package com.product.data.service; import cn.hutool.core.util.IdUtil; import com.google.common.collect.Lists; import com.product.common.lang.DateUtils; import com.product.common.lang.ExceptionUtils; import com.product.common.lang.StringUtils; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.config.CmnConst; import com.product.data.config.DatabaseType; import com.product.data.config.ErrorCode; import com.product.data.connection.ConnectionManager; import com.product.data.entity.QueryResultEntity; import com.product.data.utli.CommonUtils; import com.product.data.utli.QueryDataService; import com.product.util.CallBack; import com.product.util.CallBackValue; import com.product.util.SystemParamReplace; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.*; /** * @Author cheng * @Date 2022/2/13 17:25 * @Desc 同步数据之后处理执行Service */ public class SyncDataAfterProcessExecuteService { /** * 目标表名 */ private String targetTable; /** * 总条数 */ private int totalNumber; /** * 查询模板sql */ private String queryTemplateSql; /** * 查询模板sql */ private String insertTemplateSql; /** * 处理任务主表uuid */ private String processUuid; /** * 是否清空表数据 */ private boolean isClearTableData; /** * 系统连接 */ private Connection systemConnection = ConnectionManager.getSystemConnection(); /** * 默认值map */ private Map defaultValue = new HashMap<>(); /** * 当前页 */ private int currentPage = 1; /** * 总页数 */ private int totalPage = -1; /** * 每页查询条数 */ private int pageSize = 100000; /** * 每批次提交条数 */ private int batchSize = 5000; /** * 查询数据service */ private QueryDataService queryDataService; private Map> systemFieldByCache; private void initDefaultValue() { defaultValue.put(CmnConst.CREATED_BY, 1); defaultValue.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime()); defaultValue.put("org_level_uuid", "00000000-0000-0000-0000-000000000000"); CallBackValue getUuidMethod = (o) -> IdUtil.randomUUID(); defaultValue.put(CmnConst.UUID, getUuidMethod); String[] fields = defaultValue.keySet().toArray(new String[]{}); for (String field : fields) { if (!systemFieldByCache.containsKey(field)) { this.defaultValue.remove(field); } } } /** * 添加默认值 * 字段名称必选在目标表中 * * @param fieldName * @param value */ public void addDefaultValue(String fieldName, Object value) { if (this.systemFieldByCache.containsKey(fieldName)) { this.defaultValue.put(fieldName, value); } } public void setPageSize(int pageSize) { this.pageSize = pageSize; } /** * 初始构造 * * @param targetTable 目标表名 * @param totalNumber 要查询数据的总条数 * @param queryTemplateSql 查询模板sql * @param processUuid 同步任务后处理 uuid * @param isClearTableData 是否清空表数据 */ public SyncDataAfterProcessExecuteService(String targetTable, int totalNumber, String queryTemplateSql, String processUuid, boolean isClearTableData) throws BaseException { try { this.targetTable = targetTable; this.totalNumber = totalNumber; this.queryTemplateSql = queryTemplateSql; this.processUuid = processUuid; this.isClearTableData = isClearTableData; //初始化查询Service this.queryDataService = new QueryDataService(this.systemConnection); // this.initInsertSql(); this.systemConnection.setAutoCommit(false); if (this.totalNumber > 0) { this.totalPage = (int) Math.ceil(totalNumber / Double.valueOf(this.pageSize)); } //获取目标表系统字段 this.systemFieldByCache = CommonUtils.getSystemFieldByCache(targetTable); //初始化默认值 this.initDefaultValue(); } catch (BaseException e) { e.printStackTrace(); this.outPutError("初始化SyncDataAfterProcessExecuteService构造失败", e); throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_SERVICE_INIT_FAIL); } catch (Exception e) { e.printStackTrace(); this.outPutError("初始化SyncDataAfterProcessExecuteService构造失败", e); throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_SERVICE_INIT_FAIL); } } private List valueFields; private void initInsertSql() throws BaseException { if (StringUtils.isEmpty(this.targetTable)) { throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_TARGET_TABLE_EMPTY); } StringBuilder sql = new StringBuilder(" INSERT INTO `"); sql.append(this.targetTable).append("` ( "); String values = ""; this.valueFields = new ArrayList<>(); for (int i = 0; i < this.queryFields.size(); i++) { String queryField = this.queryFields.get(i); this.valueFields.add(queryField); if (this.defaultValue.containsKey(queryField)) { this.defaultValue.remove(queryField); } sql.append(CommonUtils.getFieldName(queryField, DatabaseType.MySql)); values += "?"; if (i + 1 < this.queryFields.size()) { sql.append(","); values += ","; } else if (this.defaultValue.size() > 0) { String[] fields = this.defaultValue.keySet().toArray(new String[]{}); sql.append(","); values += ","; for (int j = 0; j < fields.length; j++) { this.valueFields.add(fields[j]); sql.append(CommonUtils.getFieldName(fields[j], DatabaseType.MySql)); values += "?"; if (j + 1 < fields.length) { sql.append(","); values += ","; } } } } sql.append(" ) VALUES (").append(values).append(") "); this.insertTemplateSql = sql.toString(); } /** * 查询字段 */ private List queryFields; private FieldSetEntity pageParam; /** * 查询下一页数据 * * @return */ private QueryResultEntity queryNextPageData() throws BaseException { long startTime = System.currentTimeMillis(); if (pageParam == null) { pageParam = new FieldSetEntity(); pageParam.setTableName("temp"); pageParam.setValue("end_size", this.pageSize); } pageParam.setValue("start_size", (this.currentPage - 1) * this.pageSize); // String sql = String.format(this.queryTemplateSql, , this.pageSize); String sql = SystemParamReplace.formParamsReplace(this.queryTemplateSql, pageParam); this.outPutMsg("开始查询第 " + this.currentPage + " 页数据,sql:\n" + sql); QueryResultEntity result = this.queryDataService.getResult(sql, queryFields); //当查询字段为空 并且查询结果不为空时获取第一条查询结果的所有字段 避免多次获取字段信息 if (this.queryFields == null || this.queryFields.size() <= 0) { if (result == null || result.getColumnNames() == null || result.getColumnNames().size() <= 0) { //没有获取到查询的字段信息 throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_QUERY_RESULT_FAIL_EMPTY); } else { this.queryFields = result.getColumnNames(); initInsertSql(); } } this.outPutMsg("查询完成,本次查询到 " + result.getSize() + " 条数据,耗时:" + ((System.currentTimeMillis() - startTime) / 1000) + " 秒"); this.currentPage++; return result; } private PreparedStatement pst; private int notCommitNumber = 0; private int commitNumber = 0; private int errorNumber = 0; public void executeProcess(CallBack> callBack) throws BaseException { try { if (this.isClearTableData) { PreparedStatement pst = this.systemConnection.prepareStatement("truncate table " + this.targetTable); pst.execute(); pst.close(); this.outPutMsg("执行清空表成功:" + this.targetTable); } execute(callBack); } catch (BaseException e) { e.printStackTrace(); this.outPutError("执行同步数据后处理错误", e); throw e; } catch (Exception e) { e.printStackTrace(); this.outPutError("执行同步数据后处理错误", e); throw new BaseException(e); } finally { //释放资源 close(); //将对象都置null 因为处理完成后外部调用方法可能会递归处理其他造成资源浪费 this.valueFields = null; this.queryDataService = null; this.pst = null; this.defaultValue = null; this.queryFields = null; this.insertTemplateSql = null; this.queryTemplateSql = null; } } private void execute(CallBack> callBack) throws BaseException, SQLException { //查询数据 QueryResultEntity result = this.queryNextPageData(); if (this.currentPage - 1 == 1) { //查询的第一页初始化预编译对象 this.pst = this.systemConnection.prepareStatement(this.insertTemplateSql); } if (result != null && result.getSize() > 0) { //查询到数据 for (int i = 0; i < result.getResult().size(); i++) { Map valuesMap = result.getResult().get(i); if (this.defaultValue.size() > 0) { this.defaultValue.forEach((k, v) -> { if (v instanceof CallBackValue) { v = ((CallBackValue) v).method(k); } valuesMap.put(k, v); }); } if (callBack != null) { callBack.method(valuesMap); } for (int j = 0; j < this.valueFields.size(); j++) { this.pst.setObject(j + 1, valuesMap.get(this.valueFields.get(j))); } try { this.pst.addBatch(); notCommitNumber++; } catch (Exception e) { this.outPutError("批处理增加失败", e); errorNumber++; } if (notCommitNumber > 0 && notCommitNumber % this.batchSize == 0) { commit(); } } } //this.currentPage - 1 是因为在调用查询下一页数据方法结束后 +1 if (this.totalPage != -1 && this.currentPage - 1 < this.totalPage) { //有总页数 且当前页数小于总页数 //进入递归 清空当前查询的结果集 否则页数过多后占用过多的资源 if (result != null) { result.clear(); } this.execute(callBack); } else if (result != null && result.getSize() == this.pageSize) { //没有指定总页数,查询到数据条数=每页查询条数 可能还有下一页 //进入递归 清空当前查询的结果集 否则页数过多后占用过多的资源 if (result != null) { result.clear(); } this.execute(callBack); } else { // 结束查询执行方法完成 commit(); this.outPutMsg("同步任务后处理方法执行完成,processUuid:" + this.processUuid); this.outPutMsg("本次成功插入数据:" + this.commitNumber + "条"); } } /** * 提交 */ private void commit() { if (this.notCommitNumber > 0) { try { this.pst.executeBatch(); this.systemConnection.commit(); this.pst.clearBatch(); } catch (Exception e) { errorNumber += this.notCommitNumber; this.notCommitNumber = 0; this.outPutError("批处理提交失败", e); } finally { this.commitNumber += this.notCommitNumber; this.notCommitNumber = 0; } } } private void close() { try { if (this.pst != null && !this.pst.isClosed()) { pst.close(); } if (this.systemConnection != null && !this.systemConnection.isClosed()) { this.systemConnection.close(); } } catch (Exception e) { } } /** * 输出错误日志 * * @param error * @param e */ private void outPutError(String error, Exception e) { if (e != null) { String stackTraceAsString = ExceptionUtils.getStackTraceAsString(e); error += (!StringUtils.isEmpty(error) ? ":\n" : "") + stackTraceAsString; } this.outPutError(error); } private void outPutError(String error) { SpringMVCContextHolder.getSystemLogger().error(error); } private void outPutMsg(String msg) { SpringMVCContextHolder.getSystemLogger().info(msg); } }