package com.product.data.sync.util;
|
|
import cn.hutool.core.util.IdUtil;
|
import com.google.common.collect.Lists;
|
import com.google.common.collect.Maps;
|
import com.product.common.lang.DateUtils;
|
import com.product.common.lang.StringUtils;
|
import com.product.common.utils.spring.SpringUtils;
|
import com.product.core.cache.DataPoolCacheImpl;
|
import com.product.core.config.Global;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.dao.support.BaseDaoImpl;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.sync.config.CmnConst;
|
import com.product.data.sync.config.SystemCode;
|
import com.product.util.CallBack;
|
import com.product.util.CallBackValue;
|
|
import java.sql.*;
|
import java.util.*;
|
|
/**
|
* @desc 批处理数据
|
* @Author cheng
|
* @Date 2022/1/19 10:17
|
* @Desc
|
*/
|
public class BatchData {
|
|
private Map<String, Object> defaultValueMap = new HashMap<>();
|
/**
|
* 当前连接
|
*/
|
private Connection currentConnection;
|
/**
|
* 来源连接
|
*/
|
private Connection sourceConnection;
|
|
/**
|
* 批处理条数
|
*/
|
private int batchCount;
|
/**
|
* 每页查询条数
|
*/
|
private int pageSize;
|
|
private BaseDao baseDao;
|
|
private ExceptionLog exceptionLog;
|
/**
|
* 系统表
|
*/
|
private String systemTable;
|
/**
|
* 数据来源表
|
*/
|
private String sourceTable;
|
|
private boolean isUpdate;
|
|
private boolean isDelete;
|
|
private FieldSetEntity syncFieldSet;
|
/**
|
* 要添加的字段结合
|
*/
|
private String[] columnNames;
|
|
private String templateSql;
|
|
private int currentPage = 1;
|
|
private boolean threadSelect = false;
|
|
|
private String mainTableName;
|
|
|
private ThreadSelectManager threadSelectManager;
|
|
|
private boolean totalNumberSelect = false;
|
|
private int totalPage = -1;
|
|
public Connection getCurrentConnection() {
|
return currentConnection;
|
}
|
|
public Connection getSourceConnection() {
|
return sourceConnection;
|
}
|
|
public BaseDao getBaseDao() {
|
return baseDao;
|
}
|
|
public String getSystemTable() {
|
return systemTable;
|
}
|
|
public String getSourceTable() {
|
return sourceTable;
|
}
|
|
public int getCurrentPage() {
|
return currentPage;
|
}
|
|
public int getPageSize() {
|
return pageSize;
|
}
|
|
public Map<String, Object> getDefaultValueMap() {
|
return defaultValueMap;
|
}
|
|
public String[] getColumnNames() {
|
return columnNames;
|
}
|
|
public void setColumnNames(String[] columnNames) {
|
this.columnNames = columnNames;
|
}
|
|
|
public int getTotalPage() {
|
return totalPage;
|
}
|
|
|
/**
|
* 默认构造
|
* 批处理 2000
|
* 每页查询 100000
|
*/
|
public BatchData(FieldSetEntity syncConfig) {
|
initOperation(syncConfig);
|
currentConnection = ConnectionManager.getConnection();
|
pageSize = 100000;
|
batchCount = 2000;
|
}
|
|
|
/**
|
* 初始构造
|
* 初始化连接
|
*
|
* @param batchCount 批处理条数
|
* @param pageSize 每页查询条数
|
*/
|
public BatchData(FieldSetEntity syncConfig, int batchCount, int pageSize) {
|
initOperation(syncConfig);
|
currentConnection = ConnectionManager.getConnection();
|
this.batchCount = batchCount;
|
this.pageSize = pageSize;
|
}
|
|
/**
|
* 初始化默认值
|
*/
|
private void initOperation(FieldSetEntity syncConfig) {
|
this.syncFieldSet = syncConfig;
|
defaultValueMap.put(CmnConst.CREATED_BY, 1);
|
defaultValueMap.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime());
|
defaultValueMap.put("org_level_uuid", "00000000-0000-0000-0000-000000000000");
|
CallBackValue<String> getUuidMethod = (o) -> IdUtil.randomUUID();
|
defaultValueMap.put("uuid", getUuidMethod);
|
this.baseDao = SpringUtils.getBean(BaseDaoImpl.class);
|
this.exceptionLog = SpringUtils.getBean(ExceptionLog.class);
|
getSyncParams();
|
}
|
|
/**
|
* 批处理添加
|
* 自动生成uuid 默认字段
|
*
|
* @param resultSet 结果集
|
* @param fieldNames 要插入的字段集合 (可选) 不传入该参数则添加查询结果集中所有的字段 并验证该字段在系统表中是否存在(product_sys_datamodel_field)表
|
*/
|
public void batchAdd(ResultSet resultSet, Collection<String> fieldNames) {
|
if (resultSet != null && StringUtils.isEmpty(systemTable)) {
|
// 字段set
|
List<String> fieldNameList = fieldNames == null ? Lists.newArrayList() : Lists.newArrayList(fieldNames.iterator());
|
if (fieldNameList == null || fieldNameList.size() <= 0) {
|
String[] columnNames = getColumnNames(resultSet);
|
Map<String, Map<Object, Object>> systemFieldMap = getSystemFieldByCache(systemTable);
|
//比较字段在缓存中是否存在
|
this.comparisonFieldName(systemTable, columnNames, fieldNameList, systemFieldMap);
|
//添加默认字段
|
defaultValueMap.forEach((k, v) -> {
|
//判断默认字段是否在缓存中 或字段是已在查询结果中
|
if (systemFieldMap.get(k) != null) {
|
fieldNameList.add(k);
|
}
|
});
|
}
|
try {
|
|
// 判断是否有待添加的字段
|
if (fieldNameList.size() > 0) {
|
String templateSql = getInsertTemplateSql(systemTable, fieldNameList);
|
//将结果集移到最后一行
|
resultSet.last();
|
//记录总行数
|
int totalRow = resultSet.getRow();
|
//再见结果集移到第一行
|
resultSet.first();
|
while (resultSet.next()) {
|
|
}
|
}
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
|
|
/**
|
* 获取插入字段
|
*
|
* @param resultSet
|
* @param fieldNameList
|
* @return
|
*/
|
private List<String> getInsertFieldNames(CustomResultSet resultSet, List<String> fieldNameList) {
|
if (fieldNameList == null) {
|
fieldNameList = new ArrayList<>();
|
}
|
if (this.columnNames == null || this.columnNames.length <= 0) {
|
String[] columnNames = getColumnNames(resultSet.getResultSet());
|
this.columnNames = columnNames;
|
}
|
Map<String, Map<Object, Object>> systemFieldMap = getSystemFieldByCache(systemTable);
|
//比较字段在缓存中是否存在
|
this.comparisonFieldName(systemTable, columnNames, fieldNameList, systemFieldMap);
|
//添加默认字段
|
List<String> finalFieldNameList = fieldNameList;
|
List<String> deleteDefaultKey = new ArrayList<>();
|
for (Map.Entry<String, Object> v : defaultValueMap.entrySet()) {
|
//判断默认字段是否在缓存中 或字段是已在查询结果中
|
if (systemFieldMap.get(v.getKey()) != null) {
|
finalFieldNameList.add(v.getKey());
|
} else {
|
deleteDefaultKey.add(v.getKey());
|
}
|
}
|
if (deleteDefaultKey.size() > 0) {
|
for (String key : deleteDefaultKey) {
|
defaultValueMap.remove(key);
|
}
|
|
}
|
return finalFieldNameList;
|
}
|
|
private String getInsertTemplateSql(String systemTable, List<String> fieldNameList) {
|
StringBuilder sqlTemplate = new StringBuilder();
|
sqlTemplate.append(" INSERT INTO `").append(systemTable).append("` (");
|
int i = 0;
|
Iterator<String> iterator = fieldNameList.iterator();
|
// 字段
|
StringBuilder fieldTemplate = new StringBuilder();
|
//占位符
|
StringBuilder placeholderTemplate = new StringBuilder();
|
while (iterator.hasNext()) {
|
String fieldName = iterator.next();
|
if (i > 0) {
|
fieldTemplate.append(",");
|
placeholderTemplate.append(",");
|
} else {
|
i++;
|
}
|
placeholderTemplate.append("?");
|
fieldTemplate.append("`").append(fieldName).append("`");
|
}
|
sqlTemplate.append(fieldTemplate).append(") values (").append(placeholderTemplate).append(")");
|
return sqlTemplate.toString();
|
}
|
|
private void readResultSet(ResultSet resultSet, List<String> columnNames) {
|
}
|
|
/**
|
* 比较字段在系统中是否存在
|
*
|
* @param columnNames 需要比对的字段
|
* @param fieldNameList 比较后返回的结果字段集合
|
* @param systemFieldMap 系统字段map 字段名为 key value 为field缓存表的记录 对比后会删除成功对比的字段
|
*/
|
private void comparisonFieldName(String systemTable, String[] columnNames, List<String> fieldNameList, Map<String, Map<Object, Object>> systemFieldMap) {
|
for (String columnName : columnNames) {
|
if (systemFieldMap.get(columnName) == null) {
|
SpringMVCContextHolder.getSystemLogger().info("警告:查询结果列在系统表中不存在,跳过该字段:" + systemTable + "." + columnName);
|
continue;
|
} else if (!fieldNameList.contains(columnName)) {
|
//字段在集合中不存在
|
fieldNameList.add(columnName);
|
}
|
defaultValueMap.remove(columnName);
|
systemFieldMap.remove(columnName);
|
}
|
}
|
|
/**
|
* 获取字段缓存根据表名
|
*
|
* @param tableName 表名
|
* @return
|
* @throws BaseException
|
*/
|
private Map<String, Map<Object, Object>> getSystemFieldByCache(String tableName) throws BaseException {
|
DataPoolCacheImpl dataPoolCache = DataPoolCacheImpl.getInstance();
|
DataTableEntity tableInfo = dataPoolCache.getCacheData("所有表信息", new String[]{tableName});
|
if (DataTableEntity.isEmpty(tableInfo) || StringUtils.isEmpty(tableInfo.getString(0, CmnConst.UUID))) {
|
throw new BaseException(SystemCode.GET_TABLE_CACHE_FAIL.getValue(), SystemCode.GET_TABLE_CACHE_FAIL.getText());
|
}
|
String table_uuid = tableInfo.getString(0, CmnConst.UUID);
|
DataTableEntity fieldInfo = dataPoolCache.getCacheData("表字段信息", new String[]{table_uuid});
|
if (DataTableEntity.isEmpty(fieldInfo)) {
|
throw new BaseException(SystemCode.GET_FIELD_CACHE_FAIL.getValue(), SystemCode.GET_FIELD_CACHE_FAIL.getText());
|
}
|
Map<String, Map<Object, Object>> resultMap = new HashMap<>();
|
for (int i = 0; i < fieldInfo.getRows(); i++) {
|
FieldSetEntity fs = fieldInfo.getFieldSetEntity(i);
|
Map<Object, Object> values = fs.getValues();
|
String field_name = fs.getString(CmnConst.FIELD_NAME);
|
resultMap.put(field_name, Maps.newHashMap(values));
|
}
|
|
return resultMap;
|
}
|
|
/**
|
* 获取结果集所有的列名
|
*
|
* @param resultSet 结果集
|
* @return
|
* @throws BaseException
|
*/
|
public static String[] getColumnNames(ResultSet resultSet) throws BaseException {
|
try {
|
ResultSetMetaData metaData = resultSet.getMetaData();
|
if (metaData != null) {
|
int columnCount = getColumnCount(resultSet);
|
String[] fieldName = new String[columnCount];
|
for (int i = 1; i <= columnCount; i++) {
|
String columnName = metaData.getColumnLabel(i);
|
fieldName[i - 1] = columnName;
|
}
|
return fieldName;
|
}
|
throw new BaseException(SystemCode.GET_RESULTSET_COLUMN_NAME_FAIL.getValue(), SystemCode.GET_RESULTSET_COLUMN_NAME_FAIL.getText());
|
} catch (Exception e) {
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw new BaseException(SystemCode.GET_RESULTSET_COLUMN_NAME_FAIL.getValue(), SystemCode.GET_RESULTSET_COLUMN_NAME_FAIL.getText() + (e.getMessage() != null ? "," + e.getMessage() : ""));
|
}
|
}
|
|
/**
|
* 获取结果集中列的数量
|
*
|
* @param resultSet 结果集
|
* @return
|
* @throws BaseException
|
*/
|
public static int getColumnCount(ResultSet resultSet) throws BaseException {
|
try {
|
ResultSetMetaData metaData = resultSet.getMetaData();
|
if (metaData != null) {
|
return metaData.getColumnCount();
|
}
|
} catch (Exception e) {
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw new BaseException(SystemCode.GET_RESULTSET_COLUMN_COUNT_FAIL.getValue(), SystemCode.GET_RESULTSET_COLUMN_COUNT_FAIL.getText() + (e.getMessage() != null ? "," + e.getMessage() : ""));
|
}
|
return 0;
|
}
|
|
|
private void getSyncParams() {
|
DataTableEntity dataTableEntity = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG_SUB, "sync_config_uuid = ?", new String[]{});
|
String idField = dataTableEntity.getString(0, CmnConst.DATA_ORIGIN_FIELD);
|
//数据源表名
|
String dataOriginName = syncFieldSet.getString(CmnConst.DATA_ORIGIN_NAME);
|
this.sourceTable = dataOriginName;
|
//获取系统表
|
String tableName = syncFieldSet.getString(CmnConst.SYSTEM_TABLE_NAME);
|
this.systemTable = tableName;
|
//保存前调用的方法
|
String savePreEvent = syncFieldSet.getString(CmnConst.SAVE_PRE_EVENT);
|
//保存后调用的方法
|
String postSaveEvent = syncFieldSet.getString(CmnConst.POST_SAVE_EVENT);
|
//查询条件
|
String syncCondition = syncFieldSet.getString(CmnConst.SYNC_CONDITION);
|
//是否修改
|
this.isUpdate = syncFieldSet.getBoolean(CmnConst.IS_UPDATE);
|
//是否删除
|
this.isUpdate = syncFieldSet.getBoolean(CmnConst.IS_DELETE);
|
// //删除标识值
|
// String deleteValue = syncFieldSet.getString(CmnConst.DELETE_VALUE);
|
//获取数据库连接信息
|
// String configUuid = syncFieldSet.getString(CmnConst.DATABASE_CONFIG_UUID);
|
String databaseType = syncFieldSet.getString(CmnConst.DATABASE_TYPE);//数据库类型
|
String ipAddress = syncFieldSet.getString(CmnConst.IP_ADDRESS);//ip地址
|
String databaseName = syncFieldSet.getString(CmnConst.DATABASE_NAME);//数据库名称
|
String portNumber = syncFieldSet.getString(CmnConst.PORT_NUMBER);//端口号
|
String userName = syncFieldSet.getString(CmnConst.USER_NAME);//用户名
|
String userPassword = syncFieldSet.getString(CmnConst.USER_PASSWORD);//密码
|
String instantiation = syncFieldSet.getString(CmnConst.INSTANTIATION);//实例名
|
String url;
|
String driver;
|
//创建日志
|
String logUuid = exceptionLog.addExceptionLog(syncFieldSet.getUUID(), dataOriginName, tableName);
|
StringBuilder sql = new StringBuilder();
|
if ("mysql".equals(databaseType)) {
|
driver = "com.mysql.cj.jdbc.Driver";
|
url = "jdbc:mysql://" + ipAddress + ":" + portNumber + "/" + databaseName + "?useSSL=false&serverTimezone=UTC";
|
|
if (StringUtils.isEmpty(syncCondition)) {
|
sql.append("SELECT ? FROM ? LIMIT ?,?");//参数失败 改为字符串拼接
|
} else {
|
sql.append("SELECT ? FROM ? ").append(" WHERE ").append(syncCondition).append(" LIMIT ?,?");//参数失败 改为字符串拼接
|
}
|
} else if ("oracle".equals(databaseType)) {
|
driver = "oracle.jdbc.driver.OracleDriver";
|
url = "jdbc:oracle:thin:@" + ipAddress + ":" + portNumber + ":orcl";
|
if (StringUtils.isEmpty(syncCondition)) {
|
sql.append("SELECT ? FROM (SELECT ROWNUM AS rowno, t.* FROM ? t WHERE ROWNUM <= ?) t2 WHERE t2.rowno > ?");
|
} else {
|
sql.append("SELECT ? FROM (SELECT ROWNUM AS rowno, t.* FROM ? t WHERE ").append(syncCondition).append("AND ROWNUM <= ?) t2 WHERE t2.rowno > ?");
|
}
|
} else if ("informix".equals(databaseType)) {
|
driver = "com.informix.jdbc.IfxDriver";
|
url = "jdbc:informix-sqli://" + ipAddress + ":" + portNumber + "/" + databaseName + ":informixserver=" + instantiation;
|
if (StringUtils.isEmpty(syncCondition)) {
|
//跳过?行,获取?行 字段名 表名
|
sql.append(" SELECT SKIP ? FIRST ? ? FROM ? ");
|
} else {
|
sql.append(" SELECT SKIP ? FIRST ? ? FROM ? WHERE ").append(syncCondition);
|
}
|
} else if ("sqlserver".equals(databaseType)) {
|
|
SpringMVCContextHolder.getSystemLogger().error("唯一字段获取不严谨:" + idField);
|
driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
|
url = "jdbc:sqlserver://" + ipAddress + ":" + portNumber + ";DataBaseName=" + databaseName;
|
if (StringUtils.isEmpty(syncCondition)) {
|
//? 查询条数 ? 查询字段 ? 表名 idField 唯一字段名 ? 开始位置 idField 唯一字段名 ? 表名
|
sql.append("SELECT TOP ? ? FROM ? WHERE ").append(idField).append(" NOT IN(SELECT TOP ? ").append(idField).append(" FROM ?)");
|
} else {
|
sql.append("SELECT TOP ? ? FROM ? WHERE ").append(idField).append(" NOT IN(SELECT TOP ? ").append(idField).append(" FROM ? WHERE ").append(syncCondition).append(" ) AND ").append(syncCondition);
|
}
|
} else {
|
BaseException baseException = new BaseException(SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getValue(), SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getText());
|
exceptionLog.upExceptionLog(null, logUuid, baseException, 0, 0, 0, 0, 0);
|
throw baseException;
|
}
|
this.sourceConnection = this.getSourceConnection(driver, ipAddress, url, userName, userPassword);
|
}
|
|
private Connection getSourceConnection(String driver, String ipAddress, String url, String userName, String userPassword) {
|
// 判断 ip地址是否为 / 当ip地址 = / 时 则认为本地同步
|
String sourceType = Global.getSystemConfig("data.source.type", "");
|
if (sourceType != null && "/".equals(ipAddress)) {
|
driver = Global.getSystemConfig("data.source.driver", "");
|
url = Global.getSystemConfig("data.source.url", "");
|
userName = Global.getSystemConfig("data.source.user", "");
|
userPassword = Global.getSystemConfig("data.source.password", "");
|
}
|
// 远程地址获取数据库连接
|
return ConnectionManager.getConnection(driver, url, userName, userPassword);
|
}
|
|
|
/**
|
* @param sql 需要查询的sql
|
* @param systemTable 目标表
|
* @param pageSize 每页查询条数
|
* @param batchCount 每批次处理数量
|
*/
|
public BatchData(String sql, String systemTable, int pageSize, int batchCount) {
|
this.pageSize = pageSize;
|
this.batchCount = batchCount;
|
this.currentConnection = ConnectionManager.getConnection();
|
this.sourceConnection = ConnectionManager.getConnection();
|
this.templateSql = sql;
|
this.systemTable = systemTable;
|
defaultValueMap.put(CmnConst.CREATED_BY, 1);
|
defaultValueMap.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime());
|
defaultValueMap.put("org_level_uuid", "00000000-0000-0000-0000-000000000000");
|
CallBackValue getUuidMethod = (o) -> IdUtil.randomUUID();
|
defaultValueMap.put("uuid", getUuidMethod);
|
}
|
|
/**
|
* 获取分页
|
*
|
* @param currentPage
|
* @return
|
*/
|
private String getMysqlLimit(int currentPage) {
|
return " limit " + (currentPage == 1 ? 0 : (currentPage - 1) * this.pageSize) + "," + this.pageSize;
|
}
|
|
|
public static void main(String[] args) {
|
String sql = "SELECT * FROM TABLE [[TABLE_NAME]]";
|
String substring = sql.substring(sql.indexOf("[[") + 2, sql.indexOf("]]"));
|
System.out.println(substring);
|
}
|
|
/**
|
* 获取查询sql
|
*
|
* @param currentPage
|
* @return
|
*/
|
public String getQuerySql(int currentPage) {
|
String mysqlLimit = this.getMysqlLimit(currentPage);
|
if (!StringUtils.isEmpty(this.templateSql)) {
|
int startIndex = this.templateSql.indexOf("[[");
|
if (startIndex != -1 && this.templateSql.indexOf("]]") > startIndex) {
|
this.mainTableName = this.templateSql.substring(startIndex + 2, this.templateSql.indexOf("]]") - "_limit".length());
|
return this.templateSql.replace("[[" + mainTableName + "_limit" + "]]", mysqlLimit);
|
}
|
}
|
return templateSql + mysqlLimit;
|
}
|
|
|
/**
|
* 批处理导入数据 用于数据同步结束后数据处理方法
|
*
|
* @param isDelete 是否删除目标表数据
|
* @param callBack 自定义回调 传入每条数据的 map key=字段名称 value=值 不可改变map中的key (增加或修改) 只能改变值
|
*/
|
public void batchImprovedCoverage(boolean isDelete, CallBack<Object> callBack) {
|
long startTime = System.currentTimeMillis();
|
SpringMVCContextHolder.getSystemLogger().info("开始处理同步表数据,准备插入数据表:" + this.systemTable);
|
int pageDataNumber;
|
String templateSql = null;
|
int totalCount = -1;
|
List<String> insertFieldNames = null;
|
try {
|
if (isDelete) {
|
PreparedStatement preparedStatement = this.currentConnection.prepareStatement("truncate table " + this.systemTable);
|
preparedStatement.execute();
|
preparedStatement.close();
|
}
|
this.currentConnection.setAutoCommit(false);
|
PreparedStatement pst = null;
|
int dataNumber = 0;
|
List<String> fieldNames = new ArrayList<>();
|
do {
|
pageDataNumber = 0;
|
if ((this.currentPage - 1) % 3 == 0) {
|
this.sourceConnection.close();
|
this.sourceConnection = ConnectionManager.getConnection();
|
}
|
String query_sql = getQuerySql(this.currentPage);
|
if (totalCount == -1 && this.mainTableName != null) {
|
//开启关联主表分页查询
|
ResultSet resultSet = this.currentConnection.prepareStatement("SELECT ifnull(COUNT(1),0) totalCount FROM " + this.mainTableName).executeQuery();
|
if (resultSet != null) {
|
if (resultSet.next()) {
|
//查询到总条数
|
totalCount = resultSet.getInt("totalCount");
|
if (totalCount > 0) {
|
double v = totalCount / Double.valueOf(this.pageSize);
|
this.totalPage = (int) Math.ceil(v);
|
if (this.totalPage > 0) {
|
//能够获取到总页数则开启线程查询
|
this.totalNumberSelect = true;
|
// threadSelectManager = new ThreadSelectManager(this);
|
// if (threadSelectManager.startQuery()) {
|
// this.threadSelect = true;
|
// }
|
}
|
}
|
}
|
}
|
}
|
long s = System.currentTimeMillis();
|
CustomResultSet resultSet;
|
if (!this.threadSelect) {
|
SpringMVCContextHolder.getSystemLogger().info("查询第 " + this.currentPage + " 页数据");
|
SpringMVCContextHolder.getSystemLogger().info("查询sql:" + query_sql);
|
resultSet = new CustomResultSet(this.sourceConnection.prepareStatement(query_sql).executeQuery());
|
SpringMVCContextHolder.getSystemLogger().info("查询第 " + this.currentPage + " 页数据结束,耗时:" + (System.currentTimeMillis() - s) + "ms");
|
|
} else {
|
resultSet = threadSelectManager.getNext();
|
SpringMVCContextHolder.getSystemLogger().info("获取下一页页数据结束,耗时:" + (System.currentTimeMillis() - s) + "ms");
|
}
|
if (resultSet != null) {
|
if (templateSql == null) {
|
// 获取添加字段
|
insertFieldNames = getInsertFieldNames(resultSet, null);
|
// 获取插入的sql模板
|
templateSql = getInsertTemplateSql(systemTable, insertFieldNames);
|
if (StringUtils.isEmpty(templateSql)) {
|
// 没有获取到insert 语句
|
throw new BaseException(new Exception(""));
|
}
|
pst = this.currentConnection.prepareStatement(templateSql);
|
}
|
if (this.columnNames == null || this.columnNames.length < 0) {
|
SpringMVCContextHolder.getSystemLogger().error("错误,获取插入字段集合失败");
|
throw new Exception("错误,获取插入字段集合失败");
|
}
|
while (resultSet.next()) {
|
pageDataNumber++;
|
dataNumber++;
|
int i = 1;
|
Map<String, Object> values = callBack != null ? new HashMap<>() : null;
|
//是否需要插入字段到集合中 回调不为空时
|
boolean isInsertField = fieldNames.size() >= 0 && callBack != null;
|
for (; i <= this.columnNames.length; i++) {
|
// 是否插入字段到集合
|
if (isInsertField) {
|
fieldNames.add(this.columnNames[i - 1]);
|
}
|
// 如果回调 不是空将value放入map
|
if (callBack != null) {
|
values.put(this.columnNames[i - 1], resultSet.getObject(this.columnNames[i - 1]));
|
} else {
|
pst.setObject(i, resultSet.getObject(this.columnNames[i - 1]));
|
}
|
}
|
for (Map.Entry<String, Object> v : defaultValueMap.entrySet()) {
|
Object value = v.getValue();
|
if (v.getValue() instanceof CallBackValue) {
|
CallBackValue<String> callback = (CallBackValue<String>) value;
|
value = callback.method(v.getKey());
|
}
|
// 如果回调 不是空将value放入map
|
if (callBack == null) {
|
pst.setObject(i, value);
|
} else {
|
values.put(v.getKey(), value);
|
}
|
i++;
|
}
|
if (callBack != null) {
|
int paramSize = values.size();
|
callBack.method(values);
|
if (values.size() != paramSize) {
|
throw new Exception("执行自定义回调后参数数量不一致,执行前参数数量:" + paramSize + ",执行后参数数量:" + values.size() + "\n\t" + values);
|
}
|
int j = 1;
|
//回调结束后将插入的值写入
|
for (; j <= insertFieldNames.size(); j++) {
|
Object o = values.get(insertFieldNames.get(j - 1));
|
pst.setObject(j, o);
|
}
|
}
|
pst.addBatch();
|
//已遍历的数量是否符合每批次提交的数量
|
if (dataNumber % this.batchCount == 0) {
|
try {
|
long l2 = System.currentTimeMillis();
|
//提交批处理
|
pst.executeBatch();
|
this.currentConnection.commit();
|
pst.clearBatch();
|
System.out.println("清理");
|
SpringMVCContextHolder.getSystemLogger().info("执行批处理并提交成功,共耗时:" + (System.currentTimeMillis() - l2) + "ms");
|
dataNumber = 0;
|
} catch (Exception e) {
|
//批处理错误
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
SpringMVCContextHolder.getSystemLogger().error("执行批处理并提交错误,插入目标:" + systemTable);
|
}
|
}
|
}
|
totalCount -= pageDataNumber;
|
//查询页数+1
|
if (resultSet != null && resultSet.getResultSet() != null && !resultSet.getResultSet().isClosed()) {
|
|
resultSet.getResultSet().close();
|
}
|
if (!this.threadSelect) {
|
this.currentPage++;
|
}
|
} else {
|
if (this.totalPage == -1) {
|
break;
|
} else {
|
this.currentPage++;
|
}
|
}
|
} while ((this.totalNumberSelect == true && (totalPage - currentPage) >= 0) || (this.totalNumberSelect != true && (pageDataNumber > 0 && pageDataNumber % this.pageSize == 0)));
|
try {
|
//将不足每批次提交的数据最后做提交处理
|
//提交批处理
|
pst.executeBatch();
|
this.currentConnection.commit();
|
pst.clearBatch();
|
SpringMVCContextHolder.getSystemLogger().info("处理数据完成,插入目标:" + systemTable + ",共耗时:" + (System.currentTimeMillis() - startTime) + "ms");
|
} catch (Exception e) {
|
//批处理错误
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
SpringMVCContextHolder.getSystemLogger().error("处理数据错误,插入目标:" + systemTable);
|
}
|
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
SpringMVCContextHolder.getSystemLogger().error("处理数据错误,插入目标:" + systemTable);
|
|
}
|
|
// String[] columnNames = getColumnNames(resultSet);
|
// Map<String, Map<Object, Object>> systemFieldMap = getSystemFieldByCache(systemTable);
|
// //比较字段在缓存中是否存在
|
// this.comparisonFieldName(systemTable, columnNames, fieldNameList, systemFieldMap);
|
}
|
|
public void closeConnection() {
|
try {
|
if (this.currentConnection != null && !this.currentConnection.isClosed()) {
|
this.currentConnection.close();
|
}
|
if (this.sourceConnection != null && !this.sourceConnection.isClosed()) {
|
this.sourceConnection.close();
|
}
|
} catch (Exception e) {
|
|
}
|
}
|
|
|
}
|