package com.product.data.service; import cn.hutool.core.util.IdUtil; import com.google.common.collect.Lists; import com.product.common.lang.DateUtils; import com.product.common.lang.ExceptionUtils; import com.product.common.lang.StringUtils; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.config.CmnConst; import com.product.data.config.DatabaseType; import com.product.data.config.ErrorCode; import com.product.data.connection.ConnectionManager; import com.product.data.entity.QueryResultEntity; import com.product.data.utli.CommonUtils; import com.product.data.utli.QueryDataService; import com.product.util.CallBack; import com.product.util.CallBackValue; import com.product.util.SystemParamReplace; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.*; /** * @Author cheng * @Date 2022/2/13 17:25 * @Desc 鍚屾鏁版嵁涔嬪悗澶勭悊鎵цService */ public class SyncDataAfterProcessExecuteService { /** * 鐩爣琛ㄥ悕 */ private String targetTable; /** * 鎬绘潯鏁� */ private int totalNumber; /** * 鏌ヨ妯℃澘sql */ private String queryTemplateSql; /** * 鏌ヨ妯℃澘sql */ private String insertTemplateSql; /** * 澶勭悊浠诲姟涓昏〃uuid */ private String processUuid; /** * 鏄惁娓呯┖琛ㄦ暟鎹� */ private boolean isClearTableData; /** * 绯荤粺杩炴帴 */ private Connection systemConnection = ConnectionManager.getSystemConnection(); /** * 榛樿鍊糾ap */ private Map<String, Object> defaultValue = new HashMap<>(); /** * 褰撳墠椤� */ private int currentPage = 1; /** * 鎬婚〉鏁� */ private int totalPage = -1; /** * 姣忛〉鏌ヨ鏉℃暟 */ private int pageSize = 100000; /** * 姣忔壒娆℃彁浜ゆ潯鏁� */ private int batchSize = 5000; /** * 鏌ヨ鏁版嵁service */ private QueryDataService queryDataService; private Map<String, Map<Object, Object>> systemFieldByCache; private void initDefaultValue() { defaultValue.put(CmnConst.CREATED_BY, 1); defaultValue.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime()); defaultValue.put("org_level_uuid", "00000000-0000-0000-0000-000000000000"); CallBackValue<String> getUuidMethod = (o) -> IdUtil.randomUUID(); defaultValue.put(CmnConst.UUID, getUuidMethod); String[] fields = defaultValue.keySet().toArray(new String[]{}); for (String field : fields) { if (!systemFieldByCache.containsKey(field)) { this.defaultValue.remove(field); } } } /** * 娣诲姞榛樿鍊� * 瀛楁鍚嶇О蹇呴€夊湪鐩爣琛ㄤ腑 * * @param fieldName * @param value */ public void addDefaultValue(String fieldName, Object value) { if (this.systemFieldByCache.containsKey(fieldName)) { this.defaultValue.put(fieldName, value); } } public void setPageSize(int pageSize) { this.pageSize = pageSize; } /** * 鍒濆鏋勯€� * * @param targetTable 鐩爣琛ㄥ悕 * @param totalNumber 瑕佹煡璇㈡暟鎹殑鎬绘潯鏁� * @param queryTemplateSql 鏌ヨ妯℃澘sql * @param processUuid 鍚屾浠诲姟鍚庡鐞� uuid * @param isClearTableData 鏄惁娓呯┖琛ㄦ暟鎹� */ public SyncDataAfterProcessExecuteService(String targetTable, int totalNumber, String queryTemplateSql, String processUuid, boolean isClearTableData) throws BaseException { try { this.targetTable = targetTable; this.totalNumber = totalNumber; this.queryTemplateSql = queryTemplateSql; this.processUuid = processUuid; this.isClearTableData = isClearTableData; //鍒濆鍖栨煡璇ervice this.queryDataService = new QueryDataService(this.systemConnection); // this.initInsertSql(); this.systemConnection.setAutoCommit(false); if (this.totalNumber > 0) { this.totalPage = (int) Math.ceil(totalNumber / Double.valueOf(this.pageSize)); } //鑾峰彇鐩爣琛ㄧ郴缁熷瓧娈� this.systemFieldByCache = CommonUtils.getSystemFieldByCache(targetTable); //鍒濆鍖栭粯璁ゅ€� this.initDefaultValue(); } catch (BaseException e) { e.printStackTrace(); this.outPutError("鍒濆鍖朣yncDataAfterProcessExecuteService鏋勯€犲け璐�", e); throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_SERVICE_INIT_FAIL); } catch (Exception e) { e.printStackTrace(); this.outPutError("鍒濆鍖朣yncDataAfterProcessExecuteService鏋勯€犲け璐�", e); throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_SERVICE_INIT_FAIL); } } private List<String> valueFields; private void initInsertSql() throws BaseException { if (StringUtils.isEmpty(this.targetTable)) { throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_TARGET_TABLE_EMPTY); } StringBuilder sql = new StringBuilder(" INSERT INTO `"); sql.append(this.targetTable).append("` ( "); String values = ""; this.valueFields = new ArrayList<>(); for (int i = 0; i < this.queryFields.size(); i++) { String queryField = this.queryFields.get(i); this.valueFields.add(queryField); if (this.defaultValue.containsKey(queryField)) { this.defaultValue.remove(queryField); } sql.append(CommonUtils.getFieldName(queryField, DatabaseType.MySql)); values += "?"; if (i + 1 < this.queryFields.size()) { sql.append(","); values += ","; } else if (this.defaultValue.size() > 0) { String[] fields = this.defaultValue.keySet().toArray(new String[]{}); sql.append(","); values += ","; for (int j = 0; j < fields.length; j++) { this.valueFields.add(fields[j]); sql.append(CommonUtils.getFieldName(fields[j], DatabaseType.MySql)); values += "?"; if (j + 1 < fields.length) { sql.append(","); values += ","; } } } } sql.append(" ) VALUES (").append(values).append(") "); this.insertTemplateSql = sql.toString(); } /** * 鏌ヨ瀛楁 */ private List<String> queryFields; private FieldSetEntity pageParam; /** * 鏌ヨ涓嬩竴椤垫暟鎹� * * @return */ private QueryResultEntity queryNextPageData() throws BaseException { long startTime = System.currentTimeMillis(); if (pageParam == null) { pageParam = new FieldSetEntity(); pageParam.setTableName("temp"); pageParam.setValue("end_size", this.pageSize); } pageParam.setValue("start_size", (this.currentPage - 1) * this.pageSize); // String sql = String.format(this.queryTemplateSql, , this.pageSize); String sql = SystemParamReplace.formParamsReplace(this.queryTemplateSql, pageParam); this.outPutMsg("寮€濮嬫煡璇㈢ " + this.currentPage + " 椤垫暟鎹紝sql:\n" + sql); QueryResultEntity result = this.queryDataService.getResult(sql, queryFields); //褰撴煡璇㈠瓧娈典负绌� 骞朵笖鏌ヨ缁撴灉涓嶄负绌烘椂鑾峰彇绗竴鏉℃煡璇㈢粨鏋滅殑鎵€鏈夊瓧娈� 閬垮厤澶氭鑾峰彇瀛楁淇℃伅 if (this.queryFields == null || this.queryFields.size() <= 0) { if (result == null || result.getColumnNames() == null || result.getColumnNames().size() <= 0) { //娌℃湁鑾峰彇鍒版煡璇㈢殑瀛楁淇℃伅 throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_QUERY_RESULT_FAIL_EMPTY); } else { this.queryFields = result.getColumnNames(); initInsertSql(); } } this.outPutMsg("鏌ヨ瀹屾垚锛屾湰娆℃煡璇㈠埌 " + result.getSize() + " 鏉℃暟鎹紝鑰楁椂锛�" + ((System.currentTimeMillis() - startTime) / 1000) + " 绉�"); this.currentPage++; return result; } private PreparedStatement pst; private int notCommitNumber = 0; private int commitNumber = 0; private int errorNumber = 0; public void executeProcess(CallBack<Map<String, Object>> callBack) throws BaseException { try { if (this.isClearTableData) { PreparedStatement pst = this.systemConnection.prepareStatement("truncate table " + this.targetTable); pst.execute(); pst.close(); this.outPutMsg("鎵ц娓呯┖琛ㄦ垚鍔燂細" + this.targetTable); } execute(callBack); } catch (BaseException e) { e.printStackTrace(); this.outPutError("鎵ц鍚屾鏁版嵁鍚庡鐞嗛敊璇�", e); throw e; } catch (Exception e) { e.printStackTrace(); this.outPutError("鎵ц鍚屾鏁版嵁鍚庡鐞嗛敊璇�", e); throw new BaseException(e); } finally { //閲婃斁璧勬簮 close(); //灏嗗璞¢兘缃畁ull 鍥犱负澶勭悊瀹屾垚鍚庡閮ㄨ皟鐢ㄦ柟娉曞彲鑳戒細閫掑綊澶勭悊鍏朵粬閫犳垚璧勬簮娴垂 this.valueFields = null; this.queryDataService = null; this.pst = null; this.defaultValue = null; this.queryFields = null; this.insertTemplateSql = null; this.queryTemplateSql = null; } } private void execute(CallBack<Map<String, Object>> callBack) throws BaseException, SQLException { //鏌ヨ鏁版嵁 QueryResultEntity result = this.queryNextPageData(); if (this.currentPage - 1 == 1) { //鏌ヨ鐨勭涓€椤靛垵濮嬪寲棰勭紪璇戝璞� this.pst = this.systemConnection.prepareStatement(this.insertTemplateSql); } if (result != null && result.getSize() > 0) { //鏌ヨ鍒版暟鎹� for (int i = 0; i < result.getResult().size(); i++) { Map<String, Object> valuesMap = result.getResult().get(i); if (this.defaultValue.size() > 0) { this.defaultValue.forEach((k, v) -> { if (v instanceof CallBackValue) { v = ((CallBackValue) v).method(k); } valuesMap.put(k, v); }); } if (callBack != null) { callBack.method(valuesMap); } for (int j = 0; j < this.valueFields.size(); j++) { this.pst.setObject(j + 1, valuesMap.get(this.valueFields.get(j))); } try { this.pst.addBatch(); notCommitNumber++; } catch (Exception e) { this.outPutError("鎵瑰鐞嗗鍔犲け璐�", e); errorNumber++; } if (notCommitNumber > 0 && notCommitNumber % this.batchSize == 0) { commit(); } } } //this.currentPage - 1 鏄洜涓哄湪璋冪敤鏌ヨ涓嬩竴椤垫暟鎹柟娉曠粨鏉熷悗 +1 if (this.totalPage != -1 && this.currentPage - 1 < this.totalPage) { //鏈夋€婚〉鏁� 涓斿綋鍓嶉〉鏁板皬浜庢€婚〉鏁� //杩涘叆閫掑綊 娓呯┖褰撳墠鏌ヨ鐨勭粨鏋滈泦 鍚﹀垯椤垫暟杩囧鍚庡崰鐢ㄨ繃澶氱殑璧勬簮 if (result != null) { result.clear(); } this.execute(callBack); } else if (result != null && result.getSize() == this.pageSize) { //娌℃湁鎸囧畾鎬婚〉鏁帮紝鏌ヨ鍒版暟鎹潯鏁�=姣忛〉鏌ヨ鏉℃暟 鍙兘杩樻湁涓嬩竴椤� //杩涘叆閫掑綊 娓呯┖褰撳墠鏌ヨ鐨勭粨鏋滈泦 鍚﹀垯椤垫暟杩囧鍚庡崰鐢ㄨ繃澶氱殑璧勬簮 if (result != null) { result.clear(); } this.execute(callBack); } else { // 缁撴潫鏌ヨ鎵ц鏂规硶瀹屾垚 commit(); this.outPutMsg("鍚屾浠诲姟鍚庡鐞嗘柟娉曟墽琛屽畬鎴愶紝processUuid:" + this.processUuid); this.outPutMsg("鏈鎴愬姛鎻掑叆鏁版嵁:" + this.commitNumber + "鏉�"); } } /** * 鎻愪氦 */ private void commit() { if (this.notCommitNumber > 0) { try { this.pst.executeBatch(); this.systemConnection.commit(); this.pst.clearBatch(); } catch (Exception e) { errorNumber += this.notCommitNumber; this.notCommitNumber = 0; this.outPutError("鎵瑰鐞嗘彁浜ゅけ璐�", e); } finally { this.commitNumber += this.notCommitNumber; this.notCommitNumber = 0; } } } private void close() { try { if (this.pst != null && !this.pst.isClosed()) { pst.close(); } if (this.systemConnection != null && !this.systemConnection.isClosed()) { this.systemConnection.close(); } } catch (Exception e) { } } /** * 杈撳嚭閿欒鏃ュ織 * * @param error * @param e */ private void outPutError(String error, Exception e) { if (e != null) { String stackTraceAsString = ExceptionUtils.getStackTraceAsString(e); error += (!StringUtils.isEmpty(error) ? ":\n" : "") + stackTraceAsString; } this.outPutError(error); } private void outPutError(String error) { SpringMVCContextHolder.getSystemLogger().error(error); } private void outPutMsg(String msg) { SpringMVCContextHolder.getSystemLogger().info(msg); } }