package com.product.data.sync.util; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.IdUtil; import cn.hutool.core.util.NumberUtil; import com.google.common.collect.Lists; import com.product.common.collect.ListUtils; import com.product.common.collect.MapUtils; import com.product.common.lang.DateUtils; import com.product.common.lang.StringUtils; import com.product.core.config.Global; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.sync.config.CmnConst; import com.product.data.sync.config.SystemCode; import com.product.data.sync.service.ViewDataProcessService; import com.product.data.sync.service.ide.IViewDataProcessService; import com.product.util.BaseUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.lang.reflect.InvocationTargetException; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * 定时执行 同步数据的类 */ @Service public class ScheduledTaskExecution extends AbstractBaseService { @Autowired public ExceptionLog exceptionLog; @Autowired public BaseDao baseDao; @Override public BaseDao getBaseDao() { return baseDao; } @Override public void setBaseDao(BaseDao baseDao) { this.baseDao = baseDao; } @Autowired IViewDataProcessService iViewDataProcessService; public static void main(String[] args) { String driver = "com.mysql.cj.jdbc.Driver"; String ipAddress = "127.0.0.1"; String portNumber = "3306"; String databaseName = "product_db_v2.0.0"; String instantiation = ""; String user = "root"; String pwd = "root123"; String url = "jdbc:mysql://127.0.0.1:3306/product_db_picc?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai"; try { Connection connection = DataManipulationUtils.getConnection(driver, url, user, pwd); connection.setAutoCommit(false); PreparedStatement pst = connection.prepareStatement("delete from picc_sync_wf_t_task where taskid=? "); Thread.currentThread().sleep(1000); for (int i = 0; i < 60; i++) { pst.setObject(1, i + "sd"); pst.addBatch(); } pst.executeBatch(); connection.commit(); System.out.println(1); } catch (Exception e) { e.printStackTrace(); } } /** * 数据同方法 * * @param uuid 连接的数据库表名信息表uuid */ public void getDataSync(String uuid) throws SQLException { System.out.println(Thread.currentThread().getId()); logger.info("定时任务线程id:" + Thread.currentThread().getId()); //获取表明信息表 FieldSetEntity syncField = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG, uuid, false); String syncUuid = syncField.getUUID(); //sync_config_uuid DataTableEntity dataTableEntity = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG_SUB, "sync_config_uuid = ?", new String[]{syncUuid}); Map sileMap = MapUtils.newHashMap(); Map syncMap = MapUtils.newHashMap(); Map deleteMap = MapUtils.newHashMap(); //是否有uuid boolean is_uuid = false; //sqlServer 第一个子表字段为ID字段 String idField = dataTableEntity.getFieldSetEntity(0).getString(CmnConst.DATA_ORIGIN_FIELD); for (int i = 0; i < dataTableEntity.getRows(); i++) { FieldSetEntity fse = dataTableEntity.getFieldSetEntity(i); String isSole = fse.getString(CmnConst.IS_SOLE); String systemField = fse.getString(CmnConst.SYSTEM_FIELD); //本系统表字段 String dataOriginField = fse.getString(CmnConst.DATA_ORIGIN_FIELD); //外部数据源表字段 //是否删除 if ("1".equals(syncField.getString(CmnConst.IS_DELETE))) { //是否是删除字段 if ("1".equals(fse.getString(CmnConst.IS_DELETE))) { deleteMap.put(dataOriginField, systemField); } } if ("uuid".equals(systemField)) { is_uuid = true; } //1 是唯一字段 if ("1".equals(isSole)) { sileMap.put(dataOriginField, systemField); } else { syncMap.put(dataOriginField, systemField); } } this.runDataSync(syncField, sileMap, syncMap, deleteMap, is_uuid, idField); } /** * @param syncField 同步配置表 * @param sileMap 唯一字段 * @param syncMap 同步字段 * @param is_uuid 是否有uuid */ public void runDataSync(FieldSetEntity syncField, Map sileMap, Map syncMap, Map deleteMap, boolean is_uuid, String idField) { //数据源表名 String dataOriginName = syncField.getString(CmnConst.DATA_ORIGIN_NAME); //获取系统表 String tableName = syncField.getString(CmnConst.SYSTEM_TABLE_NAME); //保存前调用的方法 String savePreEvent = syncField.getString(CmnConst.SAVE_PRE_EVENT); //保存后调用的方法 String postSaveEvent = syncField.getString(CmnConst.POST_SAVE_EVENT); //查询条件 String syncCondition = syncField.getString(CmnConst.SYNC_CONDITION); //是否修改 String isUpdate = syncField.getString(CmnConst.IS_UPDATE); //是否删除 String isDelete = syncField.getString(CmnConst.IS_DELETE); //删除标识值 String deleteValue = syncField.getString(CmnConst.DELETE_VALUE); //获取数据库连接信息 String configUuid = syncField.getString(CmnConst.DATABASE_CONFIG_UUID); //同步类型 1增量 2覆盖 String syncType = syncField.getString(CmnConst.SYNC_TYPE); FieldSetEntity configField = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATABASE_CONNECTION_CONFIG, configUuid, false); String databaseType = configField.getString(CmnConst.DATABASE_TYPE);//数据库类型 String ipAddress = configField.getString(CmnConst.IP_ADDRESS);//ip地址 String databaseName = configField.getString(CmnConst.DATABASE_NAME);//数据库名称 String portNumber = configField.getString(CmnConst.PORT_NUMBER);//端口号 String userName = configField.getString(CmnConst.USER_NAME);//用户名 String userPassword = configField.getString(CmnConst.USER_PASSWORD);//密码 String instantiation = configField.getString(CmnConst.INSTANTIATION);//实例名 String fileName = ""; //创建日志 String logUuid = exceptionLog.addExceptionLog(syncField.getUUID(), dataOriginName, tableName); //获取查询的字段 for (String key : sileMap.keySet()) { fileName = fileName + key + ","; } for (String key : syncMap.keySet()) { fileName = fileName + key + ","; } fileName = fileName.substring(0, fileName.length() - 1); //连接类 String diver; //连接url String url; //编辑sql StringBuffer sql = new StringBuffer(); //参数 Map map = Collections.synchronizedMap(new HashMap<>()); map.put("syncCondition", syncCondition); if ("mysql".equals(databaseType)) { diver = "com.mysql.cj.jdbc.Driver"; url = "jdbc:mysql://" + ipAddress + ":" + portNumber + "/" + databaseName + "?useSSL=false&serverTimezone=UTC"; if (BaseUtil.strIsNull(syncCondition)) { sql.append("SELECT ? FROM ? LIMIT ?,?");//参数失败 改为字符串拼接 } else { sql.append("SELECT ? FROM ? ").append(" WHERE ").append(syncCondition).append(" LIMIT ?,?");//参数失败 改为字符串拼接 } } else if ("oracle".equals(databaseType)) { diver = "oracle.jdbc.driver.OracleDriver"; url = "jdbc:oracle:thin:@" + ipAddress + ":" + portNumber + ":orcl"; if (BaseUtil.strIsNull(syncCondition)) { sql.append("SELECT ? FROM (SELECT ROWNUM AS rowno, t.* FROM ? t WHERE ROWNUM <= ?) t2 WHERE t2.rowno > ?"); } else { sql.append("SELECT ? FROM (SELECT ROWNUM AS rowno, t.* FROM ? t WHERE ").append(syncCondition).append("AND ROWNUM <= ?) t2 WHERE t2.rowno > ?"); } } else if ("informix".equals(databaseType)) { diver = "com.informix.jdbc.IfxDriver"; url = "jdbc:informix-sqli://" + ipAddress + ":" + portNumber + "/" + databaseName + ":informixserver=" + instantiation; if (BaseUtil.strIsNull(syncCondition)) { //跳过?行,获取?行 字段名 表名 sql.append(" SELECT SKIP ? FIRST ? ? FROM ? "); } else { sql.append(" SELECT SKIP ? FIRST ? ? FROM ? WHERE ").append(syncCondition); } } else if ("sqlserver".equals(databaseType)) { SpringMVCContextHolder.getSystemLogger().error("唯一字段获取不严谨:" + idField); diver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; url = "jdbc:sqlserver://" + ipAddress + ":" + portNumber + ";DataBaseName=" + databaseName; if (BaseUtil.strIsNull(syncCondition)) { //? 查询条数 ? 查询字段 ? 表名 idField 唯一字段名 ? 开始位置 idField 唯一字段名 ? 表名 sql.append("SELECT TOP ? ? FROM ? WHERE ").append(idField).append(" NOT IN(SELECT TOP ? ").append(idField).append(" FROM ?)"); } else { sql.append("SELECT TOP ? ? FROM ? WHERE ").append(idField).append(" NOT IN(SELECT TOP ? ").append(idField).append(" FROM ? WHERE ").append(syncCondition).append(" ) AND ").append(syncCondition); } } else { BaseException baseException = new BaseException(SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getValue(), SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getText()); exceptionLog.upExceptionLog(null, logUuid, baseException, 0, 0, 0, 0, 0); return; } //1增量同步 2覆盖同步 // if ("2".equals(syncType)) { // //删除表所有数据 // baseDao.executeUpdate("delete from " + tableName); // } //1增量或2覆盖同步 map.put(CmnConst.SYNC_TYPE, syncType); map.put("databaseType", databaseType); map.put("url", url); map.put("sql", sql.toString()); map.put("fileName", fileName); map.put("tableName", tableName); map.put("dataOriginName", dataOriginName); map.put("deleteValue", deleteValue); Integer currentPage = 1; map.put("currentPage", String.valueOf(currentPage)); map.put("pageSize", "1000"); map.put("logUuid", logUuid); Connection conn = null; //获取jdbc连接 try { // 判断 ip地址是否为 / 当ip地址 = / 时 则认为本地同步 String sourceType = Global.getSystemConfig("data.source.type", ""); if (sourceType != null && sourceType.equalsIgnoreCase(databaseType) && "/".equals(ipAddress)) { diver = Global.getSystemConfig("data.source.driver", ""); url = Global.getSystemConfig("data.source.url", ""); userName = Global.getSystemConfig("data.source.user", ""); userPassword = Global.getSystemConfig("data.source.password", ""); } // 远程地址获取数据库连接 conn = DataManipulationUtils.getConnection(diver, url, userName, userPassword); } catch (BaseException | ClassNotFoundException | SQLException e) { exceptionLog.upExceptionLog(null, logUuid, e, 0, 0, 0, 0, 0); return; } List changeDataRecord = null; //新增同时还要修改和删除 if ("1".equals(isUpdate) && "1".equals(isDelete) && deleteMap.size() > 0) { if (!BaseUtil.strIsNull(savePreEvent) || !BaseUtil.strIsNull(postSaveEvent)) { this.runnableBatchSaveDeleteFieldSetData(conn, map, sileMap, syncMap, deleteMap, savePreEvent, postSaveEvent, is_uuid); } else { try { //TODO changeDataRecord = this.batchSaveDeleteData(conn, map, sileMap, syncMap, deleteMap); } catch (Exception e) { e.printStackTrace(); } } //新增同时还要修改 } else if ("1".equals(isUpdate)) { //savePreEvent 保存前 //postSaveEvent 保存后 if (!BaseUtil.strIsNull(savePreEvent) || !BaseUtil.strIsNull(postSaveEvent)) { // this.batchSaveFieldSetData(conn, map, sileMap, syncMap, savePreEvent, postSaveEvent, is_uuid); //todo changeDataRecord = this.runnableBatchSaveFieldSetData(conn, map, sileMap, syncMap, savePreEvent, postSaveEvent, is_uuid); } else { // 明天处理 2022年1月13日21:59:11 !!!!!!!!!!!!!!!!!!!! changeDataRecord = this.batchSaveData(conn, map, sileMap, syncMap); } //新增同时还要删除 } else if ("1".equals(isDelete) && deleteMap.size() > 0) { if (!BaseUtil.strIsNull(savePreEvent) || !BaseUtil.strIsNull(postSaveEvent)) { changeDataRecord = this.runnableBatchAddDeleteFieldSetData(conn, map, sileMap, syncMap, deleteMap, savePreEvent, postSaveEvent, is_uuid); } else { changeDataRecord = this.batchAddDeleteData(conn, map, sileMap, syncMap, deleteMap); } //新增 } else { if (!BaseUtil.strIsNull(savePreEvent) || !BaseUtil.strIsNull(postSaveEvent)) { changeDataRecord = this.runnableBatchAddFieldSetData(conn, map, sileMap, syncMap, savePreEvent, postSaveEvent, is_uuid); } else { if ("2".equals(syncType)) { changeDataRecord = this.batchAddData(conn, map, sileMap, syncMap); } else { changeDataRecord = this.batchAddData(conn, map, sileMap, syncMap, syncType); } } } if (changeDataRecord != null && changeDataRecord.size() > 0) { syncDataAfterDispose(changeDataRecord, tableName); } } /** * 同步数据之后处理 * * @throws BaseException */ private void syncDataAfterDispose(List recordList, String tableName) throws BaseException { //recordList 此变量最大长度为 3 其中 下标 0 = 新增的数据 1 = 修改的数据 2 = 删除的数据 try { if (recordList != null) { // 新增或修改的数据 uuid List changeDataList = new ArrayList<>(); List changeDataIds = new ArrayList<>(); // 修改数据的条件 (根据此条件去查询修改过数据的uuid) StringBuffer updateDataFilter = new StringBuffer(); // 遍历记录 for (int i = 0; i < recordList.size(); i++) { Object object = recordList.get(i); //判断记录类型是为 List (传入的时候应为List) if (object instanceof List) { List objects = (List) object; // 判断objects 中泛型的数据类型 if (objects != null && objects.size() > 0) { Object objectValue = null; // 循环取值 当取到的值不为空时赋值 停止循环 for (int j = 0; j < objects.size(); j++) { if (objects.get(j) != null) { objectValue = objects.get(j); break; } } // 没有取到值 跳过 if (objectValue == null) { continue; } // 判断取值的类型 if (objectValue instanceof String) { // 根据uuid 新增、修改、删除的数据 if (i < 2) { // 根据uuid 修改或新增的数据 for (Object o : recordList) { if (o == null) { continue; } changeDataList.addAll((List) o); } } else { // 预留删除的数据处理 } } else if (objectValue instanceof Integer) { //传入的就是id // 根据uuid 新增、修改、删除的数据 if (i < 2) { // 根据uuid 修改或新增的数据 for (Object o : recordList) { if (o == null) { continue; } List o1 = (List) o; changeDataIds.addAll(o1); // String[] objects1 = o1.toArray(new String[]{}); // changeDataList.addAll(Arrays.asList(objects1)); } } else { // 预留删除的数据处理 } } else if (objectValue instanceof Map) { // 根据条件更新的数据或删除的数据 if (i == 1) { for (Object o : recordList) { // 根据条件修改的数据查询条件 Map filterMap = (Map) o; if (filterMap == null) { continue; } String filter = getFilterByMap(filterMap); if (updateDataFilter.length() > 0) { updateDataFilter.append(" OR "); } updateDataFilter.append(" ( ").append(filter).append(" ) "); } } else if (i == 2) { //预留删除的数据处理 } } } } } String pkField = baseDao.getPKField(tableName); if (StringUtils.isEmpty(pkField)) { return; } String updateIds = ""; if (updateDataFilter.length() > 0) { DataTableEntity dt = baseDao.listTable(tableName, updateDataFilter.toString(), new Object[]{}, new Object[]{pkField + " as uuid"}); if (!DataTableEntity.isEmpty(dt)) { updateIds = dt.getUuidsToString(); } } List add_ids = null; if (changeDataList.size() > 0) { DataTableEntity dataTableEntity = baseDao.listTable(tableName, new Object[]{pkField + " as uuid"}, changeDataList.toArray()); if (!DataTableEntity.isEmpty(dataTableEntity)) { Object[] uuids = dataTableEntity.getUuids(); add_ids = new ArrayList<>(); for (int i = 0; i < uuids.length; i++) { String pkValue = (String) uuids[i]; if (NumberUtil.isNumber(pkValue)) { add_ids.add(NumberUtil.parseInt(pkValue)); } } } } if (changeDataIds != null && changeDataIds.size() > 0) { if (add_ids == null) { add_ids = new ArrayList<>(); } add_ids.addAll(changeDataIds); } IViewDataProcessService iViewDataProcessService = (IViewDataProcessService) getProxyInstance(this.iViewDataProcessService); iViewDataProcessService.updateSyncRecord(tableName, add_ids.toArray(new Integer[]{}), updateIds); } } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } } private String getFilterByMap(Map filterMap) { StringBuffer filter = new StringBuffer(); if (filterMap != null) { filterMap.forEach((k, v) -> { if (filter.length() > 0) { filter.append(" AND "); } if (v == null) { filter.append(" ( `").append(k).append("` is null ) "); } else { filter.append(" ( `").append(k).append("` = '").append(v).append("') "); } }); } return filter.insert(0, " ( ").append(" ) ").toString(); } public int getMaxIdValue(String tableName) { String pkField = baseDao.getPKField(tableName); if (!StringUtils.isEmpty(pkField)) { StringBuilder sql = new StringBuilder("SELECT "); sql.append("IFNULL(MAX(").append(pkField).append("),0) maxId FROM ").append(tableName); FieldSetEntity fieldSetBySQL = baseDao.getFieldSetBySQL(sql.toString(), new Object[]{}, false); if (fieldSetBySQL != null) { Integer maxId = fieldSetBySQL.getInteger("maxId"); return maxId == null ? -1 : maxId.intValue(); } } return -1; } /** * 原生sql批量插入 * 覆盖同步 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一标识字段 * @param syncMap 同步字段 * @Auth cheng 2022年1月17日21:29:42 */ public List batchAddData(Connection conn, Map map, Map sileMap, Map syncMap) { //系统表名 String tableName = map.get("tableName"); //每页查询条数 final int pageSize = 100000; //批处理条数 final int batchCount = 5000; map.put("pageSize", pageSize + ""); //日志组表uuid String logUuid = map.get("logUuid"); SpringMVCContextHolder.getSystemLogger().info("开始批量同步数据,每次查询条数:" + pageSize + ",批处理条数:" + batchCount); List addDataRecord = new ArrayList<>(); // int maxIdValue = getMaxIdValue(tableName); Connection currentConn = null; try { //当前数据库连接 currentConn = ConnectionManager.getConnection(); System.out.println(new Date().getTime()); currentConn.prepareStatement("truncate table " + tableName).execute(); System.out.println(new Date().getTime()); currentConn.setAutoCommit(false); //此变量插入顺序不可更改 List fieldSet = new ArrayList<>(); fieldSet.addAll(syncMap.keySet()); fieldSet.addAll(sileMap.keySet()); map.put("fileName", ArrayUtil.join(fieldSet.toArray(), ",")); Map insertDefaultValues = getInsertDefaultValues(tableName, syncMap, sileMap); fieldSet.addAll(insertDefaultValues.keySet()); fieldSet.add(CmnConst.UUID); StringBuilder sqlTemplate = new StringBuilder(); sqlTemplate.append(" INSERT INTO `").append(tableName).append("` ("); int i = 0; Iterator iterator = fieldSet.iterator(); // 字段 StringBuilder fieldTemplate = new StringBuilder(); //占位符 StringBuilder placeholderTemplate = new StringBuilder(); while (iterator.hasNext()) { String fieldName = iterator.next(); if (i > 0) { fieldTemplate.append(","); placeholderTemplate.append(","); } else { i++; } placeholderTemplate.append("?"); fieldTemplate.append("`").append(fieldName).append("`"); } sqlTemplate.append(fieldTemplate).append(") values (").append(placeholderTemplate).append(")"); // fieldTemplate.setLength(0); placeholderTemplate.setLength(0); System.out.println(sqlTemplate); //数据条数 当达到批处理提交的数量时会归零 int dataNumber = 0; //当前页 int currentPage = 1; //总条数 int totalCount = 0; int errorCount = 0; PreparedStatement pst = currentConn.prepareStatement(sqlTemplate.toString()); long currentPageStart = System.currentTimeMillis(); do { dataNumber = 0; // ResultSet resultSet = conn.prepareStatement("SELECT " + fieldTemplate.toString() + " FROM " + sourceTable).executeQuery(); long l = System.currentTimeMillis(); ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); SpringMVCContextHolder.getSystemLogger().info("结束查询数据,当前页:" + currentPage + ",耗时:" + (System.currentTimeMillis() - l) + "ms"); // List uuidList = new ArrayList<>(); if (resultSet != null) { int columnCount = -1; while (resultSet.next()) { if (columnCount == -1) { columnCount = resultSet.getMetaData().getColumnCount(); } int j = 1; for (; j <= columnCount; j++) { Object value = resultSet.getObject(j); pst.setObject(j, value); } for (Map.Entry v : insertDefaultValues.entrySet()) { pst.setObject(j, v.getValue()); j++; } pst.setObject(j, IdUtil.randomUUID()); pst.addBatch(); //数据条数+1 dataNumber++; if (dataNumber % batchCount == 0) { try { //执行批处理 pst.executeBatch(); //提交数据 currentConn.commit(); totalCount += batchCount; SpringMVCContextHolder.getSystemLogger().info("执行批处理并提交,耗时:" + (System.currentTimeMillis() - currentPageStart) + "ms"); currentPageStart = System.currentTimeMillis(); // addDataRecord.addAll(uuidList); } catch (SQLException e) { // 记录提交批次的数量 e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error("执行批处理并提交,错误," + e.getMessage()); SpringMVCContextHolder.getSystemLogger().error(e); errorCount += batchCount; exceptionLog.addSubExceptionLog(logUuid, e, currentPage); } } else { //记录新增批次的uuid // addDataRecord.addAll(uuidList); } } SpringMVCContextHolder.getSystemLogger().info("执行" + currentPage + "页,耗时:" + (System.currentTimeMillis() - l) + "ms"); currentPageStart = System.currentTimeMillis(); currentPage++; map.put("currentPage", currentPage + ""); try { resultSet.close(); } catch (SQLException e) { } } } while (dataNumber == pageSize); totalCount += (dataNumber % batchCount); try { pst.executeBatch(); currentConn.commit(); exceptionLog.upExceptionLog(conn, logUuid, null, totalCount - errorCount, 0, 0, errorCount, totalCount); } catch (SQLException e) { e.printStackTrace(); // addDataRecord = addDataRecord.subList(0, addDataRecord.size() - 2 - dataNumber); SpringMVCContextHolder.getSystemLogger().error("执行批处理并提交,错误," + e.getMessage()); SpringMVCContextHolder.getSystemLogger().error(e); errorCount += dataNumber % batchCount; exceptionLog.addSubExceptionLog(logUuid, e, currentPage - 1); } for (int j = 1; j <= totalCount; j++) { addDataRecord.add(j); } exceptionLog.upExceptionLog(conn, logUuid, null, totalCount - errorCount, 0, 0, errorCount, totalCount); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error("执行数据同步覆盖错误," + e.getMessage()); SpringMVCContextHolder.getSystemLogger().error(e); exceptionLog.upExceptionLog(conn, logUuid, null, 0, 0, 0, 0, 0); } finally { try { if (currentConn == null) { conn.close(); } } catch (Exception e) { } } SpringMVCContextHolder.getSystemLogger().info("批处理同步数据执行完成!!"); return Lists.newArrayList(addDataRecord, null, null); } private Map getInsertDefaultValues(String tableName, Map syncMap, Map sileMap) { // 默认值map 用于插入数据时默认插入字段 key=字段 value=默认值 Map defaultValueMap = new HashMap<>(); defaultValueMap.put(CmnConst.CREATED_BY, 1); defaultValueMap.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime()); defaultValueMap.put("org_level_uuid", "00000000-0000-0000-0000-000000000000"); //获取缓存中 表信息详情 FieldSetEntity tableInfo = BaseUtil.getSingleInfoByCache("所有表信息", new String[]{tableName}); Iterator iterator = defaultValueMap.keySet().iterator(); // 循环判断是否需要增加默认值字段 while (iterator.hasNext()) { String field = iterator.next(); // 判断默认值key 是否在同步字段中 if (syncMap.containsKey(field) || sileMap.containsKey(field)) { // 存在则删除默认值 iterator.remove(); continue; } else { // 检查默认字段在系统表中是否存在 FieldSetEntity fieldInfo = BaseUtil.getSingleInfoByCache("表字段信息", new String[]{tableInfo.getUUID(), field}); if (fieldInfo == null || !fieldInfo.getBoolean("is_required")) { //不存在 iterator.remove(); } } } return defaultValueMap; } /** * 原生sql批量插入 * 只新增数据,插入批量 INSERT INTO语句 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一标识字段 * @param syncMap 同步字段 * @param syncType 1增量或2覆盖同步 */ public List batchAddData(Connection conn, Map map, Map sileMap, Map syncMap, String syncType) { Integer resultRow = 0; //新增数量 Integer addNum = 0; //错误数量 Integer errorNum = 0; //总条数 Integer totalNumber = 0; //系统表名 String tableName = map.get("tableName"); //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); //获取默认值 Map defaultValueMap = getInsertDefaultValues(tableName, syncMap, sileMap); List addDataRecord = new ArrayList<>(); PreparedStatement mySqlPs = null; //修改影响行数 int result = 0; try { //mysql 连接 Connection mysqlCon = null; do { //判断连接是否为空或是否被关闭 cheng 2022年1月13日16:11:49 if (mysqlCon == null || mysqlCon.isClosed()) { mysqlCon = ConnectionManager.getConnection(); } ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); //新增的DataTableEntity resultRow = 0; StringBuffer insertSql = new StringBuffer(); Integer insert = 0; //新增计数 Integer addNumber = 0; long l = System.currentTimeMillis(); while (resultSet.next()) { resultRow++; //覆盖同步只管新增 if ("2".equals(syncType)) { //通过唯一字段查询数据 //未查询到相同数据就新增数据,查询到就跳过数据 StringBuffer selectSql = new StringBuffer(); selectSql.append(" select count(*) from `").append(tableName).append("` where "); for (String key : sileMap.keySet()) { String keys = resultSet.getString(key); if (keys == null) { selectSql.append("`").append(key).append("` = ").append(keys).append(" AND "); } else { selectSql.append("`").append(key).append("` = \"").append(keys).append("\" AND "); } } String sql = selectSql.substring(0, selectSql.length() - 4); PreparedStatement ps = mysqlCon.prepareStatement(sql); ResultSet resultNumber = ps.executeQuery(); // 如果查到就大于0 while (resultNumber.next()) { result = resultNumber.getInt(1); } } // 大于0 就不新增 if (result == 0) { addNumber++; StringBuffer valueSql = new StringBuffer(); for (String key : syncMap.keySet()) { String value = resultSet.getString(key); if (value == null) { valueSql.append(value).append(","); } else { valueSql.append("\"").append(value).append("\","); } } for (String key : sileMap.keySet()) { String value = resultSet.getString(key); if (value == null) { valueSql.append(value).append(","); } else { valueSql.append("\"").append(value).append("\","); } } // 新增插入默认值 cheng 2022年1月13日16:08:34 for (Map.Entry value : defaultValueMap.entrySet()) { valueSql.append("\"").append(value.getValue()).append("\","); } String uuid = UUID.randomUUID().toString(); valueSql.append("\"").append(uuid).append("\","); addDataRecord.add(uuid); valueSql.deleteCharAt(valueSql.length() - 1); if (insertSql.length() > 0) { insertSql.append(",(").append(valueSql).append(")"); } else { StringBuffer keySql = new StringBuffer(); for (String key : syncMap.keySet()) { keySql.append("`").append(syncMap.get(key)).append("`,"); } for (String key : sileMap.keySet()) { keySql.append("`").append(sileMap.get(key)).append("`,"); } // 新增插入默认key cheng 2022年1月13日16:08:34 for (Map.Entry value : defaultValueMap.entrySet()) { keySql.append("`").append(value.getKey()).append("`,"); } keySql.append("`uuid`"); insertSql.append(" INSERT INTO `").append(tableName).append("`(").append(keySql) .append(") VALUES (").append(valueSql).append(")"); } } else { //查到数据,无法新增 errorNum++; } } SpringMVCContextHolder.getSystemLogger().info("结束拼接Inert into 语句,共耗时:" + (System.currentTimeMillis() - l) + "ms"); if (!BaseUtil.strIsNull(insertSql.toString())) { try { insertSql.append(";"); l = System.currentTimeMillis(); mySqlPs = mysqlCon.prepareStatement(insertSql.toString()); insert = mySqlPs.executeUpdate(); SpringMVCContextHolder.getSystemLogger().info("结束运行插入,共耗时:" + (System.currentTimeMillis() - l) + "ms"); addNum = addNum + insert; } catch (SQLException | BaseException e) { e.printStackTrace(); addDataRecord.clear(); errorNum = errorNum + addNumber; exceptionLog.addSubExceptionLog(logUuid, e, currentPage); } } //关闭结果就 //DataManipulationUtils.close(resultSet, null, null); totalNumber = totalNumber + resultRow; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //如果条数小于1000就结束查询 //关闭mysql 注释 cheng 2022年1月13日15:34:52 DataManipulationUtils.close(resultSet, mySqlPs, null); } while (resultRow == 1000); SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, 0, errorNum, totalNumber); //关闭数据源连接 DataManipulationUtils.close(null, null, mysqlCon); DataManipulationUtils.close(null, null, conn); } catch (SQLException e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, 0, errorNum, totalNumber); } return Lists.newArrayList(addDataRecord); } /** * 原生sql批量插入 * 只新增数据,插入批量 INSERT INTO语句 * 根据删除字段删除数据 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一标识字段 * @param syncMap 同步字段 */ public List batchAddDeleteData(Connection conn, Map map, Map sileMap, Map syncMap, Map deleteMap) { Integer resultRow = 0; //新增数量 Integer addNum = 0; //错误数量 Integer errorNum = 0; //总条数 Integer totalNumber = 0; //删除条数 Integer deleteNumber = 0; //系统表名 String tableName = map.get("tableName"); //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); List addDataRecord = new ArrayList<>(); List> deleteDataRecord = new ArrayList<>(); PreparedStatement mySqlPs = null; //修改影响行数 int result = 0; try { do { //mysql 连接 Connection mysqlCon = ConnectionManager.getConnection(); ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); //新增的DataTableEntity resultRow = 0; StringBuffer insertSql = new StringBuffer(); Integer insert = 0; //新增计数 while (resultSet.next()) { //是否是删除数据 Boolean is_delete = false; resultRow++; //最优先判断删除标识 for (String key : deleteMap.keySet()) { //删除验证值 String deleteValue = map.get("deleteValue"); //值相同就删除或者不新增 if (deleteValue.equals(resultSet.getString(key))) { is_delete = true; } } //通过唯一字段查询数据 //未查询到相同数据就新增数据,查询到就跳过数据 StringBuffer selectSql = new StringBuffer(); selectSql.append(" select count(*) from `").append(tableName).append("` where "); for (String key : sileMap.keySet()) { String keys = resultSet.getString(key); if (keys == null) { selectSql.append("`").append(key).append("` = ").append(keys).append(" AND "); } else { selectSql.append("`").append(key).append("` = \"").append(keys).append("\" AND "); } } String sql = selectSql.substring(0, selectSql.length() - 4); PreparedStatement ps = mysqlCon.prepareStatement(sql); ResultSet resultNumber = ps.executeQuery(); // 如果查到就大于0 while (resultNumber.next()) { result = resultNumber.getInt(1); } // 大于0 或者是需要删除的数据 if (result == 0) { //如果是要删除的数据 就不新增 if (is_delete) { deleteNumber++; continue; } StringBuffer valueSql = new StringBuffer(); for (String key : syncMap.keySet()) { String value = resultSet.getString(key); if (value == null) { valueSql.append(value).append(","); } else { valueSql.append("\"").append(value).append("\","); } } for (String key : sileMap.keySet()) { String value = resultSet.getString(key); if (value == null) { valueSql.append(value).append(","); } else { valueSql.append("\"").append(value).append("\","); } } String uuid = UUID.randomUUID().toString(); valueSql.append("\"").append(uuid).append("\","); addDataRecord.add(uuid); valueSql.deleteCharAt(valueSql.length() - 1); if (insertSql.length() > 0) { insertSql.append(",(").append(valueSql).append(")"); } else { StringBuffer keySql = new StringBuffer(); for (String key : syncMap.keySet()) { keySql.append("`").append(syncMap.get(key)).append("`,"); } for (String key : sileMap.keySet()) { keySql.append("`").append(sileMap.get(key)).append("`,"); } keySql.append("`uuid`"); insertSql.append(" INSERT INTO `").append(tableName).append("`(").append(keySql) .append(") VALUES (").append(valueSql).append(")"); } //查出这条数据,但要执行删除 } else { //执行删除方法 if (is_delete) { StringBuffer deleteSql = new StringBuffer(); deleteSql.append(" delete from `").append(tableName).append("` where "); Map filterMap = new HashMap<>(); for (String key : sileMap.keySet()) { String keys = resultSet.getString(key); filterMap.put(key, resultSet.getString(key)); if (keys == null) { deleteSql.append("`").append(key).append("` = ").append(keys).append(" AND "); } else { deleteSql.append("`").append(key).append("` = \"").append(keys).append("\" AND "); } } String delSql = deleteSql.substring(0, deleteSql.length() - 4); PreparedStatement delps = mysqlCon.prepareStatement(delSql); deleteNumber = deleteNumber + delps.executeUpdate(); deleteDataRecord.add(filterMap); } else { addNum++; } } } if (!BaseUtil.strIsNull(insertSql.toString())) { try { insertSql.append(";"); mySqlPs = mysqlCon.prepareStatement(insertSql.toString()); insert = mySqlPs.executeUpdate(); addNum = addNum + insert; } catch (SQLException | BaseException e) { errorNum = errorNum + insert; exceptionLog.addSubExceptionLog(logUuid, e, currentPage); addDataRecord.clear(); } } //关闭结果就 //DataManipulationUtils.close(resultSet, null, null); totalNumber = totalNumber + resultRow; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //如果条数小于1000就结束查询 //关闭mysql DataManipulationUtils.close(null, mySqlPs, mysqlCon); } while (resultRow == 1000); SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, deleteNumber, errorNum, totalNumber); //关闭数据源连接 DataManipulationUtils.close(null, null, conn); } catch (SQLException e) { SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, deleteNumber, errorNum, totalNumber); } return Lists.newArrayList(addDataRecord, null, deleteDataRecord); } /** * 原生sql批量插入 * 修改为单条修改,修改不成功的语句插入批量 INSERT INTO语句 * 新增修改数据,修改失败就新增 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一标识字段 * @param syncMap 同步字段 */ public List batchSaveData(Connection conn, Map map, Map sileMap, Map syncMap) { Integer resultRow = 0; List list = ListUtils.newArrayList(); //新增数量 Integer addNum = 0; //修改数量 Integer upNum = 0; //错误数量 Integer errorNum = 0; //总条数 Integer totalNumber = 0; //系统表名 String tableName = map.get("tableName"); //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); //1增量或2覆盖同步 String syncType = map.get(CmnConst.SYNC_TYPE); List addDataRecord = new ArrayList<>(); List> updateDataRecord = new ArrayList<>(); PreparedStatement mySqlPs = null; //修改影响行数 int result = 0; try { do { //mysql 连接 Connection mysqlCon = ConnectionManager.getConnection(); ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); //获取总列数 // ResultSetMetaData rsmd = resultSet.getMetaData(); // resultRow = rsmd.getColumnCount(); //新增的DataTableEntity resultRow = 0; StringBuffer insertSql = new StringBuffer(); Integer insert = 0; //新增计数 Integer addNumber = 0; while (resultSet.next()) { resultRow++; if ("1".equals(syncType)) { StringBuffer updateSql = new StringBuffer(); updateSql.append(" update `").append(tableName).append("` set "); for (String key : syncMap.keySet()) { String keys = resultSet.getString(key); if (keys == null) { updateSql.append("`").append(syncMap.get(key)).append("` = ").append(keys).append(","); } else { updateSql.append("`").append(syncMap.get(key)).append("` = \"").append(keys).append("\","); } } updateSql = updateSql.deleteCharAt(updateSql.length() - 1); updateSql.append(" where "); list.clear(); Map filterMap = new HashMap<>(); for (String key : sileMap.keySet()) { String value = resultSet.getString(key); filterMap.put(key, value); list.add(value); if (value == null) { updateSql.append("`").append(sileMap.get(key)).append("` = ").append(value).append(" AND "); } else { updateSql.append("`").append(sileMap.get(key)).append("` = \"").append(value).append("\" AND "); } } updateSql = updateSql.delete(updateSql.length() - 5, updateSql.length()); updateSql.append(";"); try { mySqlPs = mysqlCon.prepareStatement(updateSql.toString()); result = mySqlPs.executeUpdate();// 返回值代表收到影响的行数 if (result > 0) { updateDataRecord.add(filterMap); } } catch (SQLException | BaseException e) { errorNum++; exceptionLog.addSubExceptionLog(logUuid, list, e); continue; } } //修改不成功 拼装批量INSERT 语句 if (result == 0) { addNumber++; StringBuffer valueSql = new StringBuffer(); for (String key : syncMap.keySet()) { String value = resultSet.getString(key); if (value == null) { valueSql.append(value).append(","); } else { valueSql.append("\"").append(value).append("\","); } } list.clear(); for (String key : sileMap.keySet()) { String value = resultSet.getString(key); if (value == null) { valueSql.append(value).append(","); } else { valueSql.append("\"").append(value).append("\","); } list.add(value); } String uuid = UUID.randomUUID().toString(); valueSql.append("\"").append(uuid).append("\","); addDataRecord.add(uuid); valueSql.deleteCharAt(valueSql.length() - 1); if (insertSql.length() > 0) { insertSql.append(",(").append(valueSql).append(")"); } else { StringBuffer keySql = new StringBuffer(); for (String key : syncMap.keySet()) { keySql.append("`").append(syncMap.get(key)).append("`,"); } for (String key : sileMap.keySet()) { keySql.append("`").append(sileMap.get(key)).append("`,"); } keySql.append("`uuid`"); insertSql.append(" INSERT INTO `").append(tableName).append("`(").append(keySql) .append(") VALUES (").append(valueSql).append(")"); } } else { //修改成功计数加一 upNum++; } } if (!BaseUtil.strIsNull(insertSql.toString())) { try { insertSql.append(";"); mySqlPs = mysqlCon.prepareStatement(insertSql.toString()); insert = mySqlPs.executeUpdate(); addNum = addNum + insert; } catch (SQLException | BaseException e) { errorNum = errorNum + addNumber; addDataRecord.clear(); exceptionLog.addSubExceptionLog(logUuid, e, currentPage); } } //关闭结果就 //DataManipulationUtils.close(resultSet, null, null); totalNumber = totalNumber + resultRow; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //如果条数小于1000就结束查询 //关闭mysql DataManipulationUtils.close(null, mySqlPs, mysqlCon); } while (resultRow == 1000); SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, 0, errorNum, totalNumber); //关闭数据源连接 DataManipulationUtils.close(null, null, conn); } catch (SQLException e) { SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, 0, errorNum, totalNumber); } return Lists.newArrayList(addDataRecord, updateDataRecord); } /** * 原生sql批量插入 * 修改为单条修改,修改不成功的语句插入批量 INSERT INTO语句 * 先判断是否要删除, 要删除先删除,然后新增修改数据,修改失败就新增 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一标识字段 * @param syncMap 同步字段 * @param deleteMap 删除字段 */ public List batchSaveDeleteData(Connection conn, Map map, Map sileMap, Map syncMap, Map deleteMap) throws Exception { Integer resultRow = 0; List list = ListUtils.newArrayList(); //新增数量 Integer addNum = 0; //错误数量 Integer errorNum = 0; //总条数 Integer totalNumber = 0; //修改数量 Integer upNum = 0; //删除条数 Integer deleteNumber = 0; //系统表名 String tableName = map.get("tableName"); //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); // 当前操作表主键自增(id)字段 if (true) { //拿id字段 再删除此代码 //todo throw new Exception("拿id字段 再删除此代码"); } String pkField = ""; List> deleteDataRecord = new ArrayList<>(); List> updateDataRecord = new ArrayList<>(); List addDataRecord = new ArrayList<>(); PreparedStatement mySqlPs = null; //修改影响行数 int result = 0; try { do { //mysql 连接 Connection mysqlCon = ConnectionManager.getConnection(); ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); //新增的DataTableEntity resultRow = 0; StringBuffer insertSql = new StringBuffer(); Integer insert = 0; //新增计数 Integer addNumber = 0; while (resultSet.next()) { //是否是删除数据 Boolean is_delete = false; resultRow++; //最优先判断删除标识 for (String key : deleteMap.keySet()) { //删除验证值 String deleteValue = map.get("deleteValue"); //值相同就删除或者不新增 if (deleteValue.equals(resultSet.getString(key))) { is_delete = true; } } //通过唯一字段查询数据 //未查询到相同数据就新增数据,查询到就跳过数据 StringBuffer selectSql = new StringBuffer(); selectSql.append(" select count(*) from `").append(tableName).append("` where "); for (String key : sileMap.keySet()) { String keys = resultSet.getString(key); if (keys == null) { selectSql.append("`").append(key).append("` = ").append(keys).append(" AND "); } else { selectSql.append("`").append(key).append("` = \"").append(keys).append("\" AND "); } } String sql = selectSql.substring(0, selectSql.length() - 4); PreparedStatement ps = mysqlCon.prepareStatement(sql); ResultSet resultNumber = ps.executeQuery(); // 如果查到就大于0 while (resultNumber.next()) { result = resultNumber.getInt(1); } // 大于0 或者是需要删除的数据 if (result == 0) { //如果是要删除的数据 就不新增 if (is_delete) { deleteNumber++; continue; } addNumber++; StringBuffer valueSql = new StringBuffer(); for (String key : syncMap.keySet()) { String value = resultSet.getString(key); if (value == null) { valueSql.append(value).append(","); } else { valueSql.append("\"").append(value).append("\","); } } list.clear(); for (String key : sileMap.keySet()) { String value = resultSet.getString(key); if (value == null) { valueSql.append(value).append(","); } else { valueSql.append("\"").append(value).append("\","); } list.add(value); } UUID uuid = UUID.randomUUID(); valueSql.append("\"").append(uuid).append("\","); addDataRecord.add(uuid.toString()); valueSql.deleteCharAt(valueSql.length() - 1); if (insertSql.length() > 0) { insertSql.append(",(").append(valueSql).append(")"); } else { StringBuffer keySql = new StringBuffer(); for (String key : syncMap.keySet()) { keySql.append("`").append(syncMap.get(key)).append("`,"); } for (String key : sileMap.keySet()) { keySql.append("`").append(sileMap.get(key)).append("`,"); } keySql.append("`uuid`"); insertSql.append(" INSERT INTO `").append(tableName).append("`(").append(keySql) .append(") VALUES (").append(valueSql).append(")"); } //查出这条数据,但要执行删除 } else { //执行删除方法 if (is_delete) { StringBuffer deleteSql = new StringBuffer(); deleteSql.append(" delete from `").append(tableName).append("` where "); Map record = new HashMap<>(); for (String key : sileMap.keySet()) { String keys = resultSet.getString(key); record.put(key, keys); if (keys == null) { deleteSql.append("`").append(key).append("` = ").append(keys).append(" AND "); } else { deleteSql.append("`").append(key).append("` = \"").append(keys).append("\" AND "); } } deleteDataRecord.add(record); String delSql = deleteSql.substring(0, deleteSql.length() - 4); PreparedStatement delps = mysqlCon.prepareStatement(delSql); deleteNumber = deleteNumber + delps.executeUpdate(); deleteDataRecord.add(record); //查到了数据,并且不删除。那更新该条数据 } else { StringBuffer updateSql = new StringBuffer(); updateSql.append(" update `").append(tableName).append("` set "); for (String key : syncMap.keySet()) { String keys = resultSet.getString(key); if (keys == null) { updateSql.append("`").append(syncMap.get(key)).append("` = ").append(keys).append(","); } else { updateSql.append("`").append(syncMap.get(key)).append("` = \"").append(keys).append("\","); } } updateSql = updateSql.deleteCharAt(updateSql.length() - 1); updateSql.append(" where "); list.clear(); Map record = new HashMap<>(); for (String key : sileMap.keySet()) { String value = resultSet.getString(key); record.put(key, value); list.add(value); if (value == null) { updateSql.append("`").append(sileMap.get(key)).append("` = ").append(value).append(" AND "); } else { updateSql.append("`").append(sileMap.get(key)).append("` = \"").append(value).append("\" AND "); } } updateSql = updateSql.delete(updateSql.length() - 5, updateSql.length()); updateSql.append(";"); try { mySqlPs = mysqlCon.prepareStatement(updateSql.toString()); result = mySqlPs.executeUpdate();// 返回值代表收到影响的行数 upNum++; updateDataRecord.add(record); } catch (SQLException | BaseException e) { errorNum++; exceptionLog.addSubExceptionLog(logUuid, list, e); continue; } } } } if (!BaseUtil.strIsNull(insertSql.toString())) { try { insertSql.append(";"); mySqlPs = mysqlCon.prepareStatement(insertSql.toString()); insert = mySqlPs.executeUpdate(); addNum = addNum + insert; } catch (SQLException | BaseException e) { errorNum = errorNum + addNumber; addDataRecord.clear(); exceptionLog.addSubExceptionLog(logUuid, e, currentPage); } } //关闭结果就 //DataManipulationUtils.close(resultSet, null, null); totalNumber = totalNumber + resultRow; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //如果条数小于1000就结束查询 //关闭mysql DataManipulationUtils.close(null, mySqlPs, mysqlCon); } while (resultRow == 1000); SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, deleteNumber, errorNum, totalNumber); //关闭数据源连接 DataManipulationUtils.close(null, null, conn); } catch (SQLException e) { SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, deleteNumber, errorNum, totalNumber); } return Lists.newArrayList(addDataRecord, updateDataRecord, deleteDataRecord); } /** * FieldSetEntity 单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一字段 * @param syncMap 同步字段 * @param savePreEvent 保存前调用方法 * @param postSaveEvent 保存后调用方法 * @param is_uuid 是否有uuid字段 */ public void batchSaveFieldSetData(Connection conn, Map map, Map sileMap, Map syncMap, String savePreEvent, String postSaveEvent, boolean is_uuid) { Integer resultRow = 0; List list = ListUtils.newArrayList(); //新增数量 Integer addNum = 0; //修改数量 Integer upNum = 0; //错误数量 Integer errorNum = 0; //总条数 Integer totalNumber = 0; //系统表名 String tableName = map.get("tableName"); //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); //1增量或2覆盖同步 String syncType = map.get(CmnConst.SYNC_TYPE); try { do { ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); resultRow = 0; while (resultSet.next()) { FieldSetEntity fieldSet = new FieldSetEntity(); fieldSet.setTableName(tableName); resultRow++; StringBuffer condition = new StringBuffer(); for (String key : syncMap.keySet()) { fieldSet.setValue(syncMap.get(key), resultSet.getString(key)); } list.clear(); for (String key : sileMap.keySet()) { String value = resultSet.getString(key); String fieldName = sileMap.get(key); fieldSet.setValue(fieldName, value); condition.append(fieldName).append(" = ? AND "); list.add(value); } //拼接修改语句 如果未修改成功记录 就把数据放入新增的dataTableEntity boolean is_update = false; try { //调用保存前方法 if (!BaseUtil.strIsNull(savePreEvent) && savePreEvent.indexOf(".") != -1) { try { DataManipulationUtils.codeCalls(savePreEvent, fieldSet); } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { e.printStackTrace(); exceptionLog.addSubExceptionLog(logUuid, list, e); } } String term = condition.substring(0, condition.length() - 4); if ("1".equals(syncType)) { FieldSetEntity fieldSetEntityByFilter = baseDao.getFieldSetEntityByFilter(tableName, term, list.toArray(new String[]{}), false); //含有uuid if (is_uuid) { is_update = baseDao.update(fieldSet); //如果查出了系统原数据 } else if (fieldSetEntityByFilter != null) { fieldSet.setValue("uuid", fieldSetEntityByFilter.getString("uuid")); is_update = baseDao.update(fieldSet); // 没有uuid 并且没有源数据就新增 } else { is_update = false; } } } catch (BaseException e) { errorNum++; exceptionLog.addSubExceptionLog(logUuid, list, e); continue; } //修改不成功 if (!is_update) { addNum++; baseDao.add(fieldSet); } else { //修改成功计数加一 upNum++; } //调用保存后方法 if (!BaseUtil.strIsNull(postSaveEvent) && postSaveEvent.indexOf(".") != -1) { try { DataManipulationUtils.codeCalls(postSaveEvent, fieldSet); } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { e.printStackTrace(); exceptionLog.addSubExceptionLog(logUuid, list, e); } } } totalNumber = totalNumber + resultRow; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //如果条数小于1000就结束查询 } while (resultRow == 1000); SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, 0, errorNum, totalNumber); } catch (SQLException e) { SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, 0, errorNum, totalNumber); } } /** * FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。 * 查询到1000条数据开一个线程处理,创建可缓存线程池 * 新增同步,只做新增操作 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一字段 * @param syncMap 同步字段 * @param savePreEvent 保存前调用方法 * @param postSaveEvent 保存后调用方法 * @param is_uuid 是否有uuid字段 */ public List runnableBatchAddFieldSetData(Connection conn, Map map, Map sileMap, Map syncMap, String savePreEvent, String postSaveEvent, boolean is_uuid) { //新增数量 Integer addNum = 0; //错误数量 Integer errorNum = 0; //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); //保存前调用 map.put("savePreEvent", savePreEvent); //保存后调用 map.put("postSaveEvent", postSaveEvent); //是否有uuid字段 map.put("is_uuid", String.valueOf(is_uuid)); List addDataRecord = new ArrayList<>(); Integer totalNumber = 0; //线程安全的List List> result = Collections.synchronizedList(Lists.newArrayList()); try { ExecutorService exec = Executors.newCachedThreadPool(); //查询数据条数 Integer number = DataManipulationUtils.getResultSetRow(conn, map); totalNumber = number; do { ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); DataAddSynchronization async = new DataAddSynchronization(resultSet, map, sileMap, syncMap); async.setExceptionLog(exceptionLog); async.setBaseDao(baseDao); //回调写法 async.setCallBack((Object[] t) -> { if (t != null && t[0] != null) { result.add((Map) t[0]); } }); Thread t = new Thread(async); number = number - 1000; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //放入用于获取线程状态 并执行t.start(); exec.submit(t); //如果条数小于1000就结束查询 } while (number > 0); exec.shutdown(); while (true) { if (exec.isTerminated()) { //System.out.println("所有的子线程都结束了!"); break; } Thread.sleep(1000); } //idea 写法 // int numberC = result.stream().map(m -> m.get("resultRow")).filter(n -> n != null && n > 0).mapToInt(n -> n).sum(); for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); List> changeDataKeys = (List>) m.get("changeDataKeys"); if (changeDataKeys != null && changeDataKeys.size() > 0) { addDataRecord.addAll(changeDataKeys.get(0)); } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, 0, errorNum, totalNumber); } catch (SQLException | InterruptedException e) { for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); List> changeDataKeys = (List>) m.get("changeDataKeys"); if (changeDataKeys != null && changeDataKeys.size() > 0) { addDataRecord.addAll(changeDataKeys.get(0)); } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, 0, errorNum, totalNumber); } return Lists.newArrayList(addDataRecord); } /** * FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。 * 查询到1000条数据开一个线程处理,创建可缓存线程池 * 新增修改同步,修改失败就新增 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一字段 * @param syncMap 同步字段 * @param deleteMap 删除字段 * @param savePreEvent 保存前调用方法 * @param postSaveEvent 保存后调用方法 * @param is_uuid 是否有uuid字段 */ public List> runnableBatchSaveDeleteFieldSetData(Connection conn, Map map, Map sileMap, Map syncMap, Map deleteMap, String savePreEvent, String postSaveEvent, boolean is_uuid) { //新增数量 Integer addNum = 0; //修改数量 Integer upNum = 0; //删除条数 Integer deleteNum = 0; //错误数量 Integer errorNum = 0; // 新增、修改、删除 对应的uuid 顺序对应集合中的下表 2022年1月13日21:24:53 cheng List> executeDataKeys = Lists.newArrayList(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); //保存前调用 map.put("savePreEvent", savePreEvent); //保存后调用 map.put("postSaveEvent", postSaveEvent); //是否有uuid字段 map.put("is_uuid", String.valueOf(is_uuid)); Integer totalNumber = 0; //线程安全的List List> result = Collections.synchronizedList(Lists.newArrayList()); try { ExecutorService exec = Executors.newCachedThreadPool(); //查询数据条数 Integer number = DataManipulationUtils.getResultSetRow(conn, map); totalNumber = number; do { ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); DataSaveDeleteSynchronization async = new DataSaveDeleteSynchronization(resultSet, map, sileMap, syncMap, deleteMap); async.setExceptionLog(exceptionLog); async.setBaseDao(baseDao); //回调写法 async.setCallBack((Object[] t) -> { if (t != null && t[0] != null) { result.add((Map) t[0]); } }); Thread t = new Thread(async); number = number - 1000; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //放入用于获取线程状态 并执行t.start(); exec.submit(t); //如果条数小于1000就结束查询 } while (number > 0); exec.shutdown(); while (true) { if (exec.isTerminated()) { //System.out.println("所有的子线程都结束了!"); break; } Thread.sleep(1000); } //idea 写法 for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); upNum = upNum + Integer.valueOf((String) m.get("upNum")); deleteNum = deleteNum + Integer.valueOf((String) m.get("deleteNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); List changeDataKeys = (List) m.get("changeDataKeys"); if (changeDataKeys != null) { for (int j = 0; j < changeDataKeys.size(); j++) { List objects = executeDataKeys.get(j); if (objects == null) { objects = new ArrayList<>(); } objects.addAll((List) changeDataKeys.get(j)); } } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, deleteNum, errorNum, totalNumber); } catch (SQLException | InterruptedException e) { for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); upNum = upNum + Integer.valueOf((String) m.get("upNum")); deleteNum = deleteNum + Integer.valueOf((String) m.get("deleteNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); List changeDataKeys = (List) m.get("changeDataKeys"); if (changeDataKeys != null) { for (int j = 0; j < changeDataKeys.size(); j++) { List objects = executeDataKeys.get(j); if (objects == null) { objects = new ArrayList<>(); } objects.addAll((List) changeDataKeys.get(j)); } } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, deleteNum, errorNum, totalNumber); } return executeDataKeys; } /** * FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。 * 查询到1000条数据开一个线程处理,创建可缓存线程池 * 新增修改同步,修改失败就新增 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一字段 * @param syncMap 同步字段 * @param savePreEvent 保存前调用方法 * @param postSaveEvent 保存后调用方法 * @param is_uuid 是否有uuid字段 */ public List runnableBatchSaveFieldSetData(Connection conn, Map map, Map sileMap, Map syncMap, String savePreEvent, String postSaveEvent, boolean is_uuid) { //新增数量 Integer addNum = 0; //修改数量 Integer upNum = 0; //错误数量 Integer errorNum = 0; //源数据表名 String dataOriginName = map.get("dataOriginName"); //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); //保存前调用 map.put("savePreEvent", savePreEvent); //保存后调用 map.put("postSaveEvent", postSaveEvent); //是否有uuid字段 map.put("is_uuid", String.valueOf(is_uuid)); // 新增、修改、删除 对应的uuid 顺序对应集合中的下表 2022年1月13日21:24:53 cheng List executeDataKeys = Lists.newArrayList(new ArrayList<>(), new ArrayList<>(), new ArrayList<>()); Integer totalNumber = 0; //线程安全的List List> result = Collections.synchronizedList(Lists.newArrayList()); try { ExecutorService exec = Executors.newCachedThreadPool(); //查询数据条数 Integer number = DataManipulationUtils.getResultSetRow(conn, map); totalNumber = number; do { ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); DataSaveSynchronization async = new DataSaveSynchronization(resultSet, map, sileMap, syncMap); async.setExceptionLog(exceptionLog); async.setBaseDao(baseDao); //回调写法 async.setCallBack((Object[] t) -> { if (t != null && t[0] != null) { result.add((Map) t[0]); } }); Thread t = new Thread(async); number = number - 1000; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //放入用于获取线程状态 并执行t.start(); exec.submit(t); //如果条数小于1000就结束查询 } while (number > 0); exec.shutdown(); while (true) { if (exec.isTerminated()) { //System.out.println("所有的子线程都结束了!"); break; } Thread.sleep(1000); } //idea 写法 // int numberC = result.stream().map(m -> m.get("resultRow")).filter(n -> n != null && n > 0).mapToInt(n -> n).sum(); for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); upNum = upNum + Integer.valueOf((String) m.get("upNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); List changeDataKeys = (List) m.get("changeDataKeys"); if (changeDataKeys != null) { for (int j = 0; j < changeDataKeys.size(); j++) { List objects = (List) executeDataKeys.get(j); if (objects == null) { objects = new ArrayList<>(); } objects.addAll((List) changeDataKeys.get(j)); } } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, 0, errorNum, totalNumber); } catch (SQLException | InterruptedException e) { for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); upNum = upNum + Integer.valueOf((String) m.get("upNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); List changeDataKeys = (List) m.get("changeDataKeys"); if (changeDataKeys != null) { for (int j = 0; j < changeDataKeys.size(); j++) { List objects = (List) executeDataKeys.get(j); if (objects == null) { objects = new ArrayList<>(); } objects.addAll((List) changeDataKeys.get(j)); } } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, 0, errorNum, totalNumber); } return executeDataKeys; } /** * FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。 * 查询到1000条数据开一个线程处理,创建可缓存线程池 * 新增同步,只做新增操作 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一字段 * @param syncMap 同步字段 * @param deleteMap 删除字段 * @param savePreEvent 保存前调用方法 * @param postSaveEvent 保存后调用方法 * @param is_uuid 是否有uuid字段 */ public List runnableBatchAddDeleteFieldSetData(Connection conn, Map map, Map sileMap, Map syncMap, Map deleteMap, String savePreEvent, String postSaveEvent, boolean is_uuid) { //新增数量 Integer addNum = 0; //错误数量 Integer errorNum = 0; //删除数量 Integer deleteNumber = 0; //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); //保存前调用 map.put("savePreEvent", savePreEvent); //保存后调用 map.put("postSaveEvent", postSaveEvent); //是否有uuid字段 map.put("is_uuid", String.valueOf(is_uuid)); Integer totalNumber = 0; //线程安全的List List> result = Collections.synchronizedList(Lists.newArrayList()); List resultChangeRecord = Lists.newArrayList(Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList()); try { ExecutorService exec = Executors.newCachedThreadPool(); //查询数据条数 Integer number = DataManipulationUtils.getResultSetRow(conn, map); totalNumber = number; do { ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); DataAddDeleteSynchronization async = new DataAddDeleteSynchronization(resultSet, map, sileMap, syncMap, deleteMap); async.setExceptionLog(exceptionLog); async.setBaseDao(baseDao); //回调写法 async.setCallBack((Object[] t) -> { if (t != null && t[0] != null) { result.add((Map) t[0]); } }); Thread t = new Thread(async); number = number - 1000; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //放入用于获取线程状态 并执行t.start(); exec.submit(t); //如果条数小于1000就结束查询 } while (number > 0); exec.shutdown(); while (true) { if (exec.isTerminated()) { //System.out.println("所有的子线程都结束了!"); break; } Thread.sleep(1000); } //idea 写法 // int numberC = result.stream().map(m -> m.get("resultRow")).filter(n -> n != null && n > 0).mapToInt(n -> n).sum(); for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); deleteNumber = deleteNumber + Integer.valueOf((String) m.get("deleteNumber")); List changeKeys = (List) m.get("changeKeys"); if (changeKeys != null && changeKeys.size() > 0) { for (int j = 0; j < changeKeys.size(); j++) { List list = (List) resultChangeRecord.get(j); list.addAll((List) changeKeys.get(j)); resultChangeRecord.set(i, list); } } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, deleteNumber, errorNum, totalNumber); } catch (SQLException | InterruptedException e) { for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); deleteNumber = deleteNumber + Integer.valueOf((String) m.get("deleteNumber")); List changeKeys = (List) m.get("changeKeys"); if (changeKeys != null && changeKeys.size() > 0) { for (int j = 0; j < changeKeys.size(); j++) { List list = (List) resultChangeRecord.get(j); list.addAll((List) changeKeys.get(j)); resultChangeRecord.set(i, list); } } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, deleteNumber, errorNum, totalNumber); } return resultChangeRecord; } /** * FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。 * 查询到1000条数据开一个线程处理,创建可缓存线程池 * 新增同步,只做新增操作 * * @param conn 数据库连接 * @param map 同步配置表的数据 * @param sileMap 唯一字段 * @param syncMap 同步字段 * @param deleteMap 删除字段 * @param savePreEvent 保存前调用方法 * @param postSaveEvent 保存后调用方法 * @param is_uuid 是否有uuid字段 */ public List runnableBatcSaveDeleteFieldSetData(Connection conn, Map map, Map sileMap, Map syncMap, Map deleteMap, String savePreEvent, String postSaveEvent, boolean is_uuid) { //新增数量 Integer addNum = 0; //错误数量 Integer errorNum = 0; //删除数量 Integer deleteNumber = 0; //当前页 Integer currentPage = Integer.parseInt(map.get("currentPage")); //日志组表uuid String logUuid = map.get("logUuid"); //保存前调用 map.put("savePreEvent", savePreEvent); //保存后调用 map.put("postSaveEvent", postSaveEvent); //是否有uuid字段 map.put("is_uuid", String.valueOf(is_uuid)); Integer totalNumber = 0; List resultChangeRecord = Lists.newArrayList(Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList()); //线程安全的List List> result = Collections.synchronizedList(Lists.newArrayList()); try { ExecutorService exec = Executors.newCachedThreadPool(); //查询数据条数 Integer number = DataManipulationUtils.getResultSetRow(conn, map); totalNumber = number; do { ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map); DataAddDeleteSynchronization async = new DataAddDeleteSynchronization(resultSet, map, sileMap, syncMap, deleteMap); async.setExceptionLog(exceptionLog); async.setBaseDao(baseDao); //回调写法 async.setCallBack((Object[] t) -> { if (t != null && t[0] != null) { result.add((Map) t[0]); } }); Thread t = new Thread(async); number = number - 1000; currentPage++; map.put("currentPage", String.valueOf(currentPage)); //放入用于获取线程状态 并执行t.start(); exec.submit(t); //如果条数小于1000就结束查询 } while (number > 0); exec.shutdown(); while (true) { if (exec.isTerminated()) { //System.out.println("所有的子线程都结束了!"); break; } Thread.sleep(1000); } //idea 写法 // int numberC = result.stream().map(m -> m.get("resultRow")).filter(n -> n != null && n > 0).mapToInt(n -> n).sum(); for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); deleteNumber = deleteNumber + Integer.valueOf((String) m.get("deleteNumber")); List changeKeys = (List) m.get("changeKeys"); if (changeKeys != null && changeKeys.size() > 0) { for (int j = 0; j < changeKeys.size(); j++) { List list = (List) resultChangeRecord.get(j); list.addAll((List) changeKeys.get(j)); resultChangeRecord.set(i, list); } } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, deleteNumber, errorNum, totalNumber); } catch (SQLException | InterruptedException e) { for (int i = 0; i < result.size(); i++) { Map m = result.get(i); addNum = addNum + Integer.valueOf((String) m.get("addNum")); errorNum = errorNum + Integer.valueOf((String) m.get("errorNum")); deleteNumber = deleteNumber + Integer.valueOf((String) m.get("deleteNumber")); List changeKeys = (List) m.get("changeKeys"); if (changeKeys != null && changeKeys.size() > 0) { for (int j = 0; j < changeKeys.size(); j++) { List list = (List) resultChangeRecord.get(j); list.addAll((List) changeKeys.get(j)); resultChangeRecord.set(i, list); } } } SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum); SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber); SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum); SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber); exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, deleteNumber, errorNum, totalNumber); } return resultChangeRecord; } public void testDataDispose(FieldSetEntity fse) { fse.setValue("su01", "【" + fse.getString("su01") + "】"); } }