package com.product.data.service;
|
|
import cn.hutool.core.util.IdUtil;
|
import com.google.common.collect.Lists;
|
import com.product.common.lang.DateUtils;
|
import com.product.common.lang.ExceptionUtils;
|
import com.product.common.lang.StringUtils;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.config.CmnConst;
|
import com.product.data.config.DatabaseType;
|
import com.product.data.config.ErrorCode;
|
import com.product.data.connection.ConnectionManager;
|
import com.product.data.entity.QueryResultEntity;
|
import com.product.data.utli.CommonUtils;
|
import com.product.data.utli.QueryDataService;
|
import com.product.util.CallBack;
|
import com.product.util.CallBackValue;
|
import com.product.util.SystemParamReplace;
|
|
import java.sql.Connection;
|
import java.sql.PreparedStatement;
|
import java.sql.SQLException;
|
import java.util.*;
|
|
/**
|
* @Author cheng
|
* @Date 2022/2/13 17:25
|
* @Desc 同步数据之后处理执行Service
|
*/
|
public class SyncDataAfterProcessExecuteService {
|
/**
|
* 目标表名
|
*/
|
private String targetTable;
|
/**
|
* 总条数
|
*/
|
private int totalNumber;
|
/**
|
* 查询模板sql
|
*/
|
private String queryTemplateSql;
|
/**
|
* 查询模板sql
|
*/
|
private String insertTemplateSql;
|
/**
|
* 处理任务主表uuid
|
*/
|
private String processUuid;
|
/**
|
* 是否清空表数据
|
*/
|
private boolean isClearTableData;
|
/**
|
* 系统连接
|
*/
|
private Connection systemConnection = ConnectionManager.getSystemConnection();
|
/**
|
* 默认值map
|
*/
|
private Map<String, Object> defaultValue = new HashMap<>();
|
/**
|
* 当前页
|
*/
|
private int currentPage = 1;
|
/**
|
* 总页数
|
*/
|
private int totalPage = -1;
|
/**
|
* 每页查询条数
|
*/
|
private int pageSize = 100000;
|
/**
|
* 每批次提交条数
|
*/
|
private int batchSize = 5000;
|
/**
|
* 查询数据service
|
*/
|
private QueryDataService queryDataService;
|
|
private Map<String, Map<Object, Object>> systemFieldByCache;
|
|
private void initDefaultValue() {
|
defaultValue.put(CmnConst.CREATED_BY, 1);
|
defaultValue.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime());
|
defaultValue.put("org_level_uuid", "00000000-0000-0000-0000-000000000000");
|
CallBackValue<String> getUuidMethod = (o) -> IdUtil.randomUUID();
|
defaultValue.put(CmnConst.UUID, getUuidMethod);
|
String[] fields = defaultValue.keySet().toArray(new String[]{});
|
for (String field : fields) {
|
if (!systemFieldByCache.containsKey(field)) {
|
this.defaultValue.remove(field);
|
}
|
}
|
}
|
|
/**
|
* 添加默认值
|
* 字段名称必选在目标表中
|
*
|
* @param fieldName
|
* @param value
|
*/
|
public void addDefaultValue(String fieldName, Object value) {
|
if (this.systemFieldByCache.containsKey(fieldName)) {
|
this.defaultValue.put(fieldName, value);
|
}
|
}
|
|
public void setPageSize(int pageSize) {
|
this.pageSize = pageSize;
|
}
|
|
/**
|
* 初始构造
|
*
|
* @param targetTable 目标表名
|
* @param totalNumber 要查询数据的总条数
|
* @param queryTemplateSql 查询模板sql
|
* @param processUuid 同步任务后处理 uuid
|
* @param isClearTableData 是否清空表数据
|
*/
|
public SyncDataAfterProcessExecuteService(String targetTable, int totalNumber, String queryTemplateSql, String processUuid, boolean isClearTableData) throws BaseException {
|
try {
|
this.targetTable = targetTable;
|
this.totalNumber = totalNumber;
|
this.queryTemplateSql = queryTemplateSql;
|
this.processUuid = processUuid;
|
this.isClearTableData = isClearTableData;
|
//初始化查询Service
|
this.queryDataService = new QueryDataService(this.systemConnection);
|
// this.initInsertSql();
|
this.systemConnection.setAutoCommit(false);
|
if (this.totalNumber > 0) {
|
this.totalPage = (int) Math.ceil(totalNumber / Double.valueOf(this.pageSize));
|
}
|
//获取目标表系统字段
|
this.systemFieldByCache = CommonUtils.getSystemFieldByCache(targetTable);
|
//初始化默认值
|
this.initDefaultValue();
|
} catch (BaseException e) {
|
e.printStackTrace();
|
this.outPutError("初始化SyncDataAfterProcessExecuteService构造失败", e);
|
throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_SERVICE_INIT_FAIL);
|
} catch (Exception e) {
|
e.printStackTrace();
|
this.outPutError("初始化SyncDataAfterProcessExecuteService构造失败", e);
|
throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_SERVICE_INIT_FAIL);
|
}
|
}
|
|
private List<String> valueFields;
|
|
private void initInsertSql() throws BaseException {
|
if (StringUtils.isEmpty(this.targetTable)) {
|
throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_TARGET_TABLE_EMPTY);
|
}
|
|
StringBuilder sql = new StringBuilder(" INSERT INTO `");
|
sql.append(this.targetTable).append("` ( ");
|
String values = "";
|
this.valueFields = new ArrayList<>();
|
for (int i = 0; i < this.queryFields.size(); i++) {
|
String queryField = this.queryFields.get(i);
|
this.valueFields.add(queryField);
|
if (this.defaultValue.containsKey(queryField)) {
|
this.defaultValue.remove(queryField);
|
}
|
sql.append(CommonUtils.getFieldName(queryField, DatabaseType.MySql));
|
values += "?";
|
if (i + 1 < this.queryFields.size()) {
|
sql.append(",");
|
values += ",";
|
} else if (this.defaultValue.size() > 0) {
|
String[] fields = this.defaultValue.keySet().toArray(new String[]{});
|
sql.append(",");
|
values += ",";
|
for (int j = 0; j < fields.length; j++) {
|
this.valueFields.add(fields[j]);
|
sql.append(CommonUtils.getFieldName(fields[j], DatabaseType.MySql));
|
values += "?";
|
if (j + 1 < fields.length) {
|
sql.append(",");
|
values += ",";
|
}
|
}
|
}
|
|
}
|
sql.append(" ) VALUES (").append(values).append(") ");
|
this.insertTemplateSql = sql.toString();
|
}
|
|
/**
|
* 查询字段
|
*/
|
private List<String> queryFields;
|
|
private FieldSetEntity pageParam;
|
|
/**
|
* 查询下一页数据
|
*
|
* @return
|
*/
|
private QueryResultEntity queryNextPageData() throws BaseException {
|
long startTime = System.currentTimeMillis();
|
if (pageParam == null) {
|
pageParam = new FieldSetEntity();
|
pageParam.setTableName("temp");
|
pageParam.setValue("end_size", this.pageSize);
|
}
|
pageParam.setValue("start_size", (this.currentPage - 1) * this.pageSize);
|
// String sql = String.format(this.queryTemplateSql, , this.pageSize);
|
String sql = SystemParamReplace.formParamsReplace(this.queryTemplateSql, pageParam);
|
this.outPutMsg("开始查询第 " + this.currentPage + " 页数据,sql:\n" + sql);
|
QueryResultEntity result = this.queryDataService.getResult(sql, queryFields);
|
//当查询字段为空 并且查询结果不为空时获取第一条查询结果的所有字段 避免多次获取字段信息
|
if (this.queryFields == null || this.queryFields.size() <= 0) {
|
if (result == null || result.getColumnNames() == null || result.getColumnNames().size() <= 0) {
|
//没有获取到查询的字段信息
|
throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_QUERY_RESULT_FAIL_EMPTY);
|
} else {
|
this.queryFields = result.getColumnNames();
|
initInsertSql();
|
}
|
}
|
this.outPutMsg("查询完成,本次查询到 " + result.getSize() + " 条数据,耗时:" + ((System.currentTimeMillis() - startTime) / 1000) + " 秒");
|
this.currentPage++;
|
return result;
|
}
|
|
private PreparedStatement pst;
|
|
private int notCommitNumber = 0;
|
|
private int commitNumber = 0;
|
|
private int errorNumber = 0;
|
|
public void executeProcess(CallBack<Map<String, Object>> callBack) throws BaseException {
|
try {
|
if (this.isClearTableData) {
|
PreparedStatement pst = this.systemConnection.prepareStatement("truncate table " + this.targetTable);
|
pst.execute();
|
pst.close();
|
this.outPutMsg("执行清空表成功:" + this.targetTable);
|
}
|
execute(callBack);
|
} catch (BaseException e) {
|
e.printStackTrace();
|
this.outPutError("执行同步数据后处理错误", e);
|
throw e;
|
} catch (Exception e) {
|
e.printStackTrace();
|
this.outPutError("执行同步数据后处理错误", e);
|
throw new BaseException(e);
|
} finally {
|
//释放资源
|
close();
|
//将对象都置null 因为处理完成后外部调用方法可能会递归处理其他造成资源浪费
|
this.valueFields = null;
|
this.queryDataService = null;
|
this.pst = null;
|
this.defaultValue = null;
|
this.queryFields = null;
|
this.insertTemplateSql = null;
|
this.queryTemplateSql = null;
|
}
|
}
|
|
private void execute(CallBack<Map<String, Object>> callBack) throws BaseException, SQLException {
|
|
//查询数据
|
QueryResultEntity result = this.queryNextPageData();
|
if (this.currentPage - 1 == 1) {
|
//查询的第一页初始化预编译对象
|
this.pst = this.systemConnection.prepareStatement(this.insertTemplateSql);
|
}
|
if (result != null && result.getSize() > 0) {
|
//查询到数据
|
for (int i = 0; i < result.getResult().size(); i++) {
|
Map<String, Object> valuesMap = result.getResult().get(i);
|
if (this.defaultValue.size() > 0) {
|
this.defaultValue.forEach((k, v) -> {
|
|
if (v instanceof CallBackValue) {
|
v = ((CallBackValue) v).method(k);
|
}
|
valuesMap.put(k, v);
|
});
|
}
|
if (callBack != null) {
|
callBack.method(valuesMap);
|
}
|
for (int j = 0; j < this.valueFields.size(); j++) {
|
this.pst.setObject(j + 1, valuesMap.get(this.valueFields.get(j)));
|
}
|
try {
|
this.pst.addBatch();
|
notCommitNumber++;
|
} catch (Exception e) {
|
this.outPutError("批处理增加失败", e);
|
errorNumber++;
|
}
|
if (notCommitNumber > 0 && notCommitNumber % this.batchSize == 0) {
|
commit();
|
}
|
}
|
}
|
//this.currentPage - 1 是因为在调用查询下一页数据方法结束后 +1
|
if (this.totalPage != -1 && this.currentPage - 1 < this.totalPage) {
|
//有总页数 且当前页数小于总页数
|
//进入递归 清空当前查询的结果集 否则页数过多后占用过多的资源
|
if (result != null) {
|
result.clear();
|
}
|
this.execute(callBack);
|
} else if (result != null && result.getSize() == this.pageSize) {
|
//没有指定总页数,查询到数据条数=每页查询条数 可能还有下一页
|
//进入递归 清空当前查询的结果集 否则页数过多后占用过多的资源
|
if (result != null) {
|
result.clear();
|
}
|
this.execute(callBack);
|
} else {
|
// 结束查询执行方法完成
|
commit();
|
this.outPutMsg("同步任务后处理方法执行完成,processUuid:" + this.processUuid);
|
this.outPutMsg("本次成功插入数据:" + this.commitNumber + "条");
|
}
|
}
|
|
/**
|
* 提交
|
*/
|
private void commit() {
|
if (this.notCommitNumber > 0) {
|
try {
|
this.pst.executeBatch();
|
this.systemConnection.commit();
|
this.pst.clearBatch();
|
} catch (Exception e) {
|
errorNumber += this.notCommitNumber;
|
this.notCommitNumber = 0;
|
this.outPutError("批处理提交失败", e);
|
} finally {
|
this.commitNumber += this.notCommitNumber;
|
this.notCommitNumber = 0;
|
}
|
}
|
}
|
|
private void close() {
|
try {
|
if (this.pst != null && !this.pst.isClosed()) {
|
pst.close();
|
}
|
if (this.systemConnection != null && !this.systemConnection.isClosed()) {
|
this.systemConnection.close();
|
}
|
} catch (Exception e) {
|
|
}
|
}
|
|
|
/**
|
* 输出错误日志
|
*
|
* @param error
|
* @param e
|
*/
|
private void outPutError(String error, Exception e) {
|
if (e != null) {
|
String stackTraceAsString = ExceptionUtils.getStackTraceAsString(e);
|
error += (!StringUtils.isEmpty(error) ? ":\n" : "") + stackTraceAsString;
|
}
|
|
this.outPutError(error);
|
}
|
|
private void outPutError(String error) {
|
SpringMVCContextHolder.getSystemLogger().error(error);
|
}
|
|
private void outPutMsg(String msg) {
|
SpringMVCContextHolder.getSystemLogger().info(msg);
|
}
|
|
|
}
|