package com.product.data.sync.util; import cn.hutool.core.util.IdUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.product.common.lang.DateUtils; import com.product.common.lang.StringUtils; import com.product.common.utils.spring.SpringUtils; import com.product.core.cache.DataPoolCacheImpl; import com.product.core.config.Global; import com.product.core.dao.BaseDao; import com.product.core.dao.support.BaseDaoImpl; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.sync.config.CmnConst; import com.product.data.sync.config.SystemCode; import com.product.util.CallBack; import com.product.util.CallBackValue; import java.sql.*; import java.util.*; /** * @desc 批处理数据 * @Author cheng * @Date 2022/1/19 10:17 * @Desc */ public class BatchData { private Map defaultValueMap = new HashMap<>(); /** * 当前连接 */ private Connection currentConnection; /** * 来源连接 */ private Connection sourceConnection; /** * 批处理条数 */ private int batchCount; /** * 每页查询条数 */ private int pageSize; private BaseDao baseDao; private ExceptionLog exceptionLog; /** * 系统表 */ private String systemTable; /** * 数据来源表 */ private String sourceTable; private boolean isUpdate; private boolean isDelete; private FieldSetEntity syncFieldSet; /** * 要添加的字段结合 */ private String[] columnNames; private String templateSql; private int currentPage = 1; private boolean threadSelect = false; private String mainTableName; private ThreadSelectManager threadSelectManager; private boolean totalNumberSelect = false; private int totalPage = -1; public Connection getCurrentConnection() { return currentConnection; } public Connection getSourceConnection() { return sourceConnection; } public BaseDao getBaseDao() { return baseDao; } public String getSystemTable() { return systemTable; } public String getSourceTable() { return sourceTable; } public int getCurrentPage() { return currentPage; } public int getPageSize() { return pageSize; } public Map getDefaultValueMap() { return defaultValueMap; } public String[] getColumnNames() { return columnNames; } public void setColumnNames(String[] columnNames) { this.columnNames = columnNames; } public int getTotalPage() { return totalPage; } /** * 默认构造 * 批处理 2000 * 每页查询 100000 */ public BatchData(FieldSetEntity syncConfig) { initOperation(syncConfig); currentConnection = ConnectionManager.getConnection(); pageSize = 100000; batchCount = 2000; } /** * 初始构造 * 初始化连接 * * @param batchCount 批处理条数 * @param pageSize 每页查询条数 */ public BatchData(FieldSetEntity syncConfig, int batchCount, int pageSize) { initOperation(syncConfig); currentConnection = ConnectionManager.getConnection(); this.batchCount = batchCount; this.pageSize = pageSize; } /** * 初始化默认值 */ private void initOperation(FieldSetEntity syncConfig) { this.syncFieldSet = syncConfig; defaultValueMap.put(CmnConst.CREATED_BY, 1); defaultValueMap.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime()); defaultValueMap.put("org_level_uuid", "00000000-0000-0000-0000-000000000000"); CallBackValue getUuidMethod = (o) -> IdUtil.randomUUID(); defaultValueMap.put("uuid", getUuidMethod); this.baseDao = SpringUtils.getBean(BaseDaoImpl.class); this.exceptionLog = SpringUtils.getBean(ExceptionLog.class); getSyncParams(); } /** * 批处理添加 * 自动生成uuid 默认字段 * * @param resultSet 结果集 * @param fieldNames 要插入的字段集合 (可选) 不传入该参数则添加查询结果集中所有的字段 并验证该字段在系统表中是否存在(product_sys_datamodel_field)表 */ public void batchAdd(ResultSet resultSet, Collection fieldNames) { if (resultSet != null && StringUtils.isEmpty(systemTable)) { // 字段set List fieldNameList = fieldNames == null ? Lists.newArrayList() : Lists.newArrayList(fieldNames.iterator()); if (fieldNameList == null || fieldNameList.size() <= 0) { String[] columnNames = getColumnNames(resultSet); Map> systemFieldMap = getSystemFieldByCache(systemTable); //比较字段在缓存中是否存在 this.comparisonFieldName(systemTable, columnNames, fieldNameList, systemFieldMap); //添加默认字段 defaultValueMap.forEach((k, v) -> { //判断默认字段是否在缓存中 或字段是已在查询结果中 if (systemFieldMap.get(k) != null) { fieldNameList.add(k); } }); } try { // 判断是否有待添加的字段 if (fieldNameList.size() > 0) { String templateSql = getInsertTemplateSql(systemTable, fieldNameList); //将结果集移到最后一行 resultSet.last(); //记录总行数 int totalRow = resultSet.getRow(); //再见结果集移到第一行 resultSet.first(); while (resultSet.next()) { } } } catch (SQLException e) { e.printStackTrace(); } } } /** * 获取插入字段 * * @param resultSet * @param fieldNameList * @return */ private List getInsertFieldNames(CustomResultSet resultSet, List fieldNameList) { if (fieldNameList == null) { fieldNameList = new ArrayList<>(); } if (this.columnNames == null || this.columnNames.length <= 0) { String[] columnNames = getColumnNames(resultSet.getResultSet()); this.columnNames = columnNames; } Map> systemFieldMap = getSystemFieldByCache(systemTable); //比较字段在缓存中是否存在 this.comparisonFieldName(systemTable, columnNames, fieldNameList, systemFieldMap); //添加默认字段 List finalFieldNameList = fieldNameList; List deleteDefaultKey = new ArrayList<>(); for (Map.Entry v : defaultValueMap.entrySet()) { //判断默认字段是否在缓存中 或字段是已在查询结果中 if (systemFieldMap.get(v.getKey()) != null) { finalFieldNameList.add(v.getKey()); } else { deleteDefaultKey.add(v.getKey()); } } if (deleteDefaultKey.size() > 0) { for (String key : deleteDefaultKey) { defaultValueMap.remove(key); } } return finalFieldNameList; } private String getInsertTemplateSql(String systemTable, List fieldNameList) { StringBuilder sqlTemplate = new StringBuilder(); sqlTemplate.append(" INSERT INTO `").append(systemTable).append("` ("); int i = 0; Iterator iterator = fieldNameList.iterator(); // 字段 StringBuilder fieldTemplate = new StringBuilder(); //占位符 StringBuilder placeholderTemplate = new StringBuilder(); while (iterator.hasNext()) { String fieldName = iterator.next(); if (i > 0) { fieldTemplate.append(","); placeholderTemplate.append(","); } else { i++; } placeholderTemplate.append("?"); fieldTemplate.append("`").append(fieldName).append("`"); } sqlTemplate.append(fieldTemplate).append(") values (").append(placeholderTemplate).append(")"); return sqlTemplate.toString(); } private void readResultSet(ResultSet resultSet, List columnNames) { } /** * 比较字段在系统中是否存在 * * @param columnNames 需要比对的字段 * @param fieldNameList 比较后返回的结果字段集合 * @param systemFieldMap 系统字段map 字段名为 key value 为field缓存表的记录 对比后会删除成功对比的字段 */ private void comparisonFieldName(String systemTable, String[] columnNames, List fieldNameList, Map> systemFieldMap) { for (String columnName : columnNames) { if (systemFieldMap.get(columnName) == null) { SpringMVCContextHolder.getSystemLogger().info("警告:查询结果列在系统表中不存在,跳过该字段:" + systemTable + "." + columnName); continue; } else if (!fieldNameList.contains(columnName)) { //字段在集合中不存在 fieldNameList.add(columnName); } defaultValueMap.remove(columnName); systemFieldMap.remove(columnName); } } /** * 获取字段缓存根据表名 * * @param tableName 表名 * @return * @throws BaseException */ private Map> getSystemFieldByCache(String tableName) throws BaseException { DataPoolCacheImpl dataPoolCache = DataPoolCacheImpl.getInstance(); DataTableEntity tableInfo = dataPoolCache.getCacheData("所有表信息", new String[]{tableName}); if (DataTableEntity.isEmpty(tableInfo) || StringUtils.isEmpty(tableInfo.getString(0, CmnConst.UUID))) { throw new BaseException(SystemCode.GET_TABLE_CACHE_FAIL.getValue(), SystemCode.GET_TABLE_CACHE_FAIL.getText()); } String table_uuid = tableInfo.getString(0, CmnConst.UUID); DataTableEntity fieldInfo = dataPoolCache.getCacheData("表字段信息", new String[]{table_uuid}); if (DataTableEntity.isEmpty(fieldInfo)) { throw new BaseException(SystemCode.GET_FIELD_CACHE_FAIL.getValue(), SystemCode.GET_FIELD_CACHE_FAIL.getText()); } Map> resultMap = new HashMap<>(); for (int i = 0; i < fieldInfo.getRows(); i++) { FieldSetEntity fs = fieldInfo.getFieldSetEntity(i); Map values = fs.getValues(); String field_name = fs.getString(CmnConst.FIELD_NAME); resultMap.put(field_name, Maps.newHashMap(values)); } return resultMap; } /** * 获取结果集所有的列名 * * @param resultSet 结果集 * @return * @throws BaseException */ public static String[] getColumnNames(ResultSet resultSet) throws BaseException { try { ResultSetMetaData metaData = resultSet.getMetaData(); if (metaData != null) { int columnCount = getColumnCount(resultSet); String[] fieldName = new String[columnCount]; for (int i = 1; i <= columnCount; i++) { String columnName = metaData.getColumnLabel(i); fieldName[i - 1] = columnName; } return fieldName; } throw new BaseException(SystemCode.GET_RESULTSET_COLUMN_NAME_FAIL.getValue(), SystemCode.GET_RESULTSET_COLUMN_NAME_FAIL.getText()); } catch (Exception e) { SpringMVCContextHolder.getSystemLogger().error(e); throw new BaseException(SystemCode.GET_RESULTSET_COLUMN_NAME_FAIL.getValue(), SystemCode.GET_RESULTSET_COLUMN_NAME_FAIL.getText() + (e.getMessage() != null ? "," + e.getMessage() : "")); } } /** * 获取结果集中列的数量 * * @param resultSet 结果集 * @return * @throws BaseException */ public static int getColumnCount(ResultSet resultSet) throws BaseException { try { ResultSetMetaData metaData = resultSet.getMetaData(); if (metaData != null) { return metaData.getColumnCount(); } } catch (Exception e) { SpringMVCContextHolder.getSystemLogger().error(e); throw new BaseException(SystemCode.GET_RESULTSET_COLUMN_COUNT_FAIL.getValue(), SystemCode.GET_RESULTSET_COLUMN_COUNT_FAIL.getText() + (e.getMessage() != null ? "," + e.getMessage() : "")); } return 0; } private void getSyncParams() { DataTableEntity dataTableEntity = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG_SUB, "sync_config_uuid = ?", new String[]{}); String idField = dataTableEntity.getString(0, CmnConst.DATA_ORIGIN_FIELD); //数据源表名 String dataOriginName = syncFieldSet.getString(CmnConst.DATA_ORIGIN_NAME); this.sourceTable = dataOriginName; //获取系统表 String tableName = syncFieldSet.getString(CmnConst.SYSTEM_TABLE_NAME); this.systemTable = tableName; //保存前调用的方法 String savePreEvent = syncFieldSet.getString(CmnConst.SAVE_PRE_EVENT); //保存后调用的方法 String postSaveEvent = syncFieldSet.getString(CmnConst.POST_SAVE_EVENT); //查询条件 String syncCondition = syncFieldSet.getString(CmnConst.SYNC_CONDITION); //是否修改 this.isUpdate = syncFieldSet.getBoolean(CmnConst.IS_UPDATE); //是否删除 this.isUpdate = syncFieldSet.getBoolean(CmnConst.IS_DELETE); // //删除标识值 // String deleteValue = syncFieldSet.getString(CmnConst.DELETE_VALUE); //获取数据库连接信息 // String configUuid = syncFieldSet.getString(CmnConst.DATABASE_CONFIG_UUID); String databaseType = syncFieldSet.getString(CmnConst.DATABASE_TYPE);//数据库类型 String ipAddress = syncFieldSet.getString(CmnConst.IP_ADDRESS);//ip地址 String databaseName = syncFieldSet.getString(CmnConst.DATABASE_NAME);//数据库名称 String portNumber = syncFieldSet.getString(CmnConst.PORT_NUMBER);//端口号 String userName = syncFieldSet.getString(CmnConst.USER_NAME);//用户名 String userPassword = syncFieldSet.getString(CmnConst.USER_PASSWORD);//密码 String instantiation = syncFieldSet.getString(CmnConst.INSTANTIATION);//实例名 String url; String driver; //创建日志 String logUuid = exceptionLog.addExceptionLog(syncFieldSet.getUUID(), dataOriginName, tableName); StringBuilder sql = new StringBuilder(); if ("mysql".equals(databaseType)) { driver = "com.mysql.cj.jdbc.Driver"; url = "jdbc:mysql://" + ipAddress + ":" + portNumber + "/" + databaseName + "?useSSL=false&serverTimezone=UTC"; if (StringUtils.isEmpty(syncCondition)) { sql.append("SELECT ? FROM ? LIMIT ?,?");//参数失败 改为字符串拼接 } else { sql.append("SELECT ? FROM ? ").append(" WHERE ").append(syncCondition).append(" LIMIT ?,?");//参数失败 改为字符串拼接 } } else if ("oracle".equals(databaseType)) { driver = "oracle.jdbc.driver.OracleDriver"; url = "jdbc:oracle:thin:@" + ipAddress + ":" + portNumber + ":orcl"; if (StringUtils.isEmpty(syncCondition)) { sql.append("SELECT ? FROM (SELECT ROWNUM AS rowno, t.* FROM ? t WHERE ROWNUM <= ?) t2 WHERE t2.rowno > ?"); } else { sql.append("SELECT ? FROM (SELECT ROWNUM AS rowno, t.* FROM ? t WHERE ").append(syncCondition).append("AND ROWNUM <= ?) t2 WHERE t2.rowno > ?"); } } else if ("informix".equals(databaseType)) { driver = "com.informix.jdbc.IfxDriver"; url = "jdbc:informix-sqli://" + ipAddress + ":" + portNumber + "/" + databaseName + ":informixserver=" + instantiation; if (StringUtils.isEmpty(syncCondition)) { //跳过?行,获取?行 字段名 表名 sql.append(" SELECT SKIP ? FIRST ? ? FROM ? "); } else { sql.append(" SELECT SKIP ? FIRST ? ? FROM ? WHERE ").append(syncCondition); } } else if ("sqlserver".equals(databaseType)) { SpringMVCContextHolder.getSystemLogger().error("唯一字段获取不严谨:" + idField); driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"; url = "jdbc:sqlserver://" + ipAddress + ":" + portNumber + ";DataBaseName=" + databaseName; if (StringUtils.isEmpty(syncCondition)) { //? 查询条数 ? 查询字段 ? 表名 idField 唯一字段名 ? 开始位置 idField 唯一字段名 ? 表名 sql.append("SELECT TOP ? ? FROM ? WHERE ").append(idField).append(" NOT IN(SELECT TOP ? ").append(idField).append(" FROM ?)"); } else { sql.append("SELECT TOP ? ? FROM ? WHERE ").append(idField).append(" NOT IN(SELECT TOP ? ").append(idField).append(" FROM ? WHERE ").append(syncCondition).append(" ) AND ").append(syncCondition); } } else { BaseException baseException = new BaseException(SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getValue(), SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getText()); exceptionLog.upExceptionLog(null, logUuid, baseException, 0, 0, 0, 0, 0); throw baseException; } this.sourceConnection = this.getSourceConnection(driver, ipAddress, url, userName, userPassword); } private Connection getSourceConnection(String driver, String ipAddress, String url, String userName, String userPassword) { // 判断 ip地址是否为 / 当ip地址 = / 时 则认为本地同步 String sourceType = Global.getSystemConfig("data.source.type", ""); if (sourceType != null && "/".equals(ipAddress)) { driver = Global.getSystemConfig("data.source.driver", ""); url = Global.getSystemConfig("data.source.url", ""); userName = Global.getSystemConfig("data.source.user", ""); userPassword = Global.getSystemConfig("data.source.password", ""); } // 远程地址获取数据库连接 return ConnectionManager.getConnection(driver, url, userName, userPassword); } /** * @param sql 需要查询的sql * @param systemTable 目标表 * @param pageSize 每页查询条数 * @param batchCount 每批次处理数量 */ public BatchData(String sql, String systemTable, int pageSize, int batchCount) { this.pageSize = pageSize; this.batchCount = batchCount; this.currentConnection = ConnectionManager.getConnection(); this.sourceConnection = ConnectionManager.getConnection(); this.templateSql = sql; this.systemTable = systemTable; defaultValueMap.put(CmnConst.CREATED_BY, 1); defaultValueMap.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime()); defaultValueMap.put("org_level_uuid", "00000000-0000-0000-0000-000000000000"); CallBackValue getUuidMethod = (o) -> IdUtil.randomUUID(); defaultValueMap.put("uuid", getUuidMethod); } /** * 获取分页 * * @param currentPage * @return */ private String getMysqlLimit(int currentPage) { return " limit " + (currentPage == 1 ? 0 : (currentPage - 1) * this.pageSize) + "," + this.pageSize; } public static void main(String[] args) { String sql = "SELECT * FROM TABLE [[TABLE_NAME]]"; String substring = sql.substring(sql.indexOf("[[") + 2, sql.indexOf("]]")); System.out.println(substring); } /** * 获取查询sql * * @param currentPage * @return */ public String getQuerySql(int currentPage) { String mysqlLimit = this.getMysqlLimit(currentPage); if (!StringUtils.isEmpty(this.templateSql)) { int startIndex = this.templateSql.indexOf("[["); if (startIndex != -1 && this.templateSql.indexOf("]]") > startIndex) { this.mainTableName = this.templateSql.substring(startIndex + 2, this.templateSql.indexOf("]]") - "_limit".length()); return this.templateSql.replace("[[" + mainTableName + "_limit" + "]]", mysqlLimit); } } return templateSql + mysqlLimit; } /** * 批处理导入数据 用于数据同步结束后数据处理方法 * * @param isDelete 是否删除目标表数据 * @param callBack 自定义回调 传入每条数据的 map key=字段名称 value=值 不可改变map中的key (增加或修改) 只能改变值 */ public void batchImprovedCoverage(boolean isDelete, CallBack callBack) { long startTime = System.currentTimeMillis(); SpringMVCContextHolder.getSystemLogger().info("开始处理同步表数据,准备插入数据表:" + this.systemTable); int pageDataNumber; String templateSql = null; int totalCount = -1; List insertFieldNames = null; try { if (isDelete) { PreparedStatement preparedStatement = this.currentConnection.prepareStatement("truncate table " + this.systemTable); preparedStatement.execute(); preparedStatement.close(); } this.currentConnection.setAutoCommit(false); PreparedStatement pst = null; int dataNumber = 0; List fieldNames = new ArrayList<>(); do { pageDataNumber = 0; if ((this.currentPage - 1) % 3 == 0) { this.sourceConnection.close(); this.sourceConnection = ConnectionManager.getConnection(); } String query_sql = getQuerySql(this.currentPage); if (totalCount == -1 && this.mainTableName != null) { //开启关联主表分页查询 ResultSet resultSet = this.currentConnection.prepareStatement("SELECT ifnull(COUNT(1),0) totalCount FROM " + this.mainTableName).executeQuery(); if (resultSet != null) { if (resultSet.next()) { //查询到总条数 totalCount = resultSet.getInt("totalCount"); if (totalCount > 0) { double v = totalCount / Double.valueOf(this.pageSize); this.totalPage = (int) Math.ceil(v); if (this.totalPage > 0) { //能够获取到总页数则开启线程查询 this.totalNumberSelect = true; // threadSelectManager = new ThreadSelectManager(this); // if (threadSelectManager.startQuery()) { // this.threadSelect = true; // } } } } } } long s = System.currentTimeMillis(); CustomResultSet resultSet; if (!this.threadSelect) { SpringMVCContextHolder.getSystemLogger().info("查询第 " + this.currentPage + " 页数据"); SpringMVCContextHolder.getSystemLogger().info("查询sql:" + query_sql); resultSet = new CustomResultSet(this.sourceConnection.prepareStatement(query_sql).executeQuery()); SpringMVCContextHolder.getSystemLogger().info("查询第 " + this.currentPage + " 页数据结束,耗时:" + (System.currentTimeMillis() - s) + "ms"); } else { resultSet = threadSelectManager.getNext(); SpringMVCContextHolder.getSystemLogger().info("获取下一页页数据结束,耗时:" + (System.currentTimeMillis() - s) + "ms"); } if (resultSet != null) { if (templateSql == null) { // 获取添加字段 insertFieldNames = getInsertFieldNames(resultSet, null); // 获取插入的sql模板 templateSql = getInsertTemplateSql(systemTable, insertFieldNames); if (StringUtils.isEmpty(templateSql)) { // 没有获取到insert 语句 throw new BaseException(new Exception("")); } pst = this.currentConnection.prepareStatement(templateSql); } if (this.columnNames == null || this.columnNames.length < 0) { SpringMVCContextHolder.getSystemLogger().error("错误,获取插入字段集合失败"); throw new Exception("错误,获取插入字段集合失败"); } while (resultSet.next()) { pageDataNumber++; dataNumber++; int i = 1; Map values = callBack != null ? new HashMap<>() : null; //是否需要插入字段到集合中 回调不为空时 boolean isInsertField = fieldNames.size() >= 0 && callBack != null; for (; i <= this.columnNames.length; i++) { // 是否插入字段到集合 if (isInsertField) { fieldNames.add(this.columnNames[i - 1]); } // 如果回调 不是空将value放入map if (callBack != null) { values.put(this.columnNames[i - 1], resultSet.getObject(this.columnNames[i - 1])); } else { pst.setObject(i, resultSet.getObject(this.columnNames[i - 1])); } } for (Map.Entry v : defaultValueMap.entrySet()) { Object value = v.getValue(); if (v.getValue() instanceof CallBackValue) { CallBackValue callback = (CallBackValue) value; value = callback.method(v.getKey()); } // 如果回调 不是空将value放入map if (callBack == null) { pst.setObject(i, value); } else { values.put(v.getKey(), value); } i++; } if (callBack != null) { int paramSize = values.size(); callBack.method(values); if (values.size() != paramSize) { throw new Exception("执行自定义回调后参数数量不一致,执行前参数数量:" + paramSize + ",执行后参数数量:" + values.size() + "\n\t" + values); } int j = 1; //回调结束后将插入的值写入 for (; j <= insertFieldNames.size(); j++) { Object o = values.get(insertFieldNames.get(j - 1)); pst.setObject(j, o); } } pst.addBatch(); //已遍历的数量是否符合每批次提交的数量 if (dataNumber % this.batchCount == 0) { try { long l2 = System.currentTimeMillis(); //提交批处理 pst.executeBatch(); this.currentConnection.commit(); pst.clearBatch(); System.out.println("清理"); SpringMVCContextHolder.getSystemLogger().info("执行批处理并提交成功,共耗时:" + (System.currentTimeMillis() - l2) + "ms"); dataNumber = 0; } catch (Exception e) { //批处理错误 e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); SpringMVCContextHolder.getSystemLogger().error("执行批处理并提交错误,插入目标:" + systemTable); } } } totalCount -= pageDataNumber; //查询页数+1 if (resultSet != null && resultSet.getResultSet() != null && !resultSet.getResultSet().isClosed()) { resultSet.getResultSet().close(); } if (!this.threadSelect) { this.currentPage++; } } else { if (this.totalPage == -1) { break; } else { this.currentPage++; } } } while ((this.totalNumberSelect == true && (totalPage - currentPage) >= 0) || (this.totalNumberSelect != true && (pageDataNumber > 0 && pageDataNumber % this.pageSize == 0))); try { //将不足每批次提交的数据最后做提交处理 //提交批处理 pst.executeBatch(); this.currentConnection.commit(); pst.clearBatch(); SpringMVCContextHolder.getSystemLogger().info("处理数据完成,插入目标:" + systemTable + ",共耗时:" + (System.currentTimeMillis() - startTime) + "ms"); } catch (Exception e) { //批处理错误 e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); SpringMVCContextHolder.getSystemLogger().error("处理数据错误,插入目标:" + systemTable); } } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); SpringMVCContextHolder.getSystemLogger().error("处理数据错误,插入目标:" + systemTable); } // String[] columnNames = getColumnNames(resultSet); // Map> systemFieldMap = getSystemFieldByCache(systemTable); // //比较字段在缓存中是否存在 // this.comparisonFieldName(systemTable, columnNames, fieldNameList, systemFieldMap); } public void closeConnection() { try { if (this.currentConnection != null && !this.currentConnection.isClosed()) { this.currentConnection.close(); } if (this.sourceConnection != null && !this.sourceConnection.isClosed()) { this.sourceConnection.close(); } } catch (Exception e) { } } }