package com.product.data.sync.util;
|
|
import cn.hutool.core.util.ArrayUtil;
|
import cn.hutool.core.util.IdUtil;
|
import cn.hutool.core.util.NumberUtil;
|
import com.google.common.collect.Lists;
|
import com.product.common.collect.ListUtils;
|
import com.product.common.collect.MapUtils;
|
import com.product.common.lang.DateUtils;
|
import com.product.common.lang.StringUtils;
|
import com.product.core.config.Global;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.sync.config.CmnConst;
|
import com.product.data.sync.config.SystemCode;
|
import com.product.data.sync.service.ViewDataProcessService;
|
import com.product.data.sync.service.ide.IViewDataProcessService;
|
import com.product.util.BaseUtil;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.lang.reflect.InvocationTargetException;
|
import java.sql.Connection;
|
import java.sql.PreparedStatement;
|
import java.sql.ResultSet;
|
import java.sql.SQLException;
|
import java.util.*;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
/**
|
* 定时执行 同步数据的类
|
*/
|
@Service
|
public class ScheduledTaskExecution extends AbstractBaseService {
|
|
// Collaborator that creates and updates the sync run log (addExceptionLog / upExceptionLog / addSubExceptionLog).
@Autowired
|
public ExceptionLog exceptionLog;
|
// DAO for the system database; exposed to the base service through getBaseDao()/setBaseDao().
@Autowired
|
public BaseDao baseDao;
|
|
@Override
|
public BaseDao getBaseDao() {
|
return baseDao;
|
}
|
|
@Override
|
public void setBaseDao(BaseDao baseDao) {
|
this.baseDao = baseDao;
|
}
|
|
// Service told about changed rows after a sync; syncDataAfterDispose calls it through getProxyInstance(...).
@Autowired
|
IViewDataProcessService iViewDataProcessService;
|
|
public static void main(String[] args) {
|
String driver = "com.mysql.cj.jdbc.Driver";
|
String ipAddress = "127.0.0.1";
|
String portNumber = "3306";
|
String databaseName = "product_db_v2.0.0";
|
String instantiation = "";
|
String user = "root";
|
String pwd = "root123";
|
String url = "jdbc:mysql://127.0.0.1:3306/product_db_picc?autoReconnect=true&useUnicode=true&characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai";
|
try {
|
Connection connection = DataManipulationUtils.getConnection(driver, url, user, pwd);
|
connection.setAutoCommit(false);
|
PreparedStatement pst = connection.prepareStatement("delete from picc_sync_wf_t_task where taskid=? ");
|
Thread.currentThread().sleep(1000);
|
for (int i = 0; i < 60; i++) {
|
pst.setObject(1, i + "sd");
|
pst.addBatch();
|
}
|
pst.executeBatch();
|
connection.commit();
|
System.out.println(1);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 数据同方法
|
*
|
* @param uuid 连接的数据库表名信息表uuid
|
*/
|
public void getDataSync(String uuid) throws SQLException {
|
System.out.println(Thread.currentThread().getId());
|
logger.info("定时任务线程id:" + Thread.currentThread().getId());
|
//获取表明信息表
|
FieldSetEntity syncField = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG, uuid, false);
|
String syncUuid = syncField.getUUID();
|
|
//sync_config_uuid
|
DataTableEntity dataTableEntity = baseDao.listTable(CmnConst.PRODUCT_SYS_DATABASE_SYNC_CONFIG_SUB, "sync_config_uuid = ?", new String[]{syncUuid});
|
Map<String, String> sileMap = MapUtils.newHashMap();
|
Map<String, String> syncMap = MapUtils.newHashMap();
|
Map<String, String> deleteMap = MapUtils.newHashMap();
|
//是否有uuid
|
boolean is_uuid = false;
|
//sqlServer 第一个子表字段为ID字段
|
String idField = dataTableEntity.getFieldSetEntity(0).getString(CmnConst.DATA_ORIGIN_FIELD);
|
for (int i = 0; i < dataTableEntity.getRows(); i++) {
|
FieldSetEntity fse = dataTableEntity.getFieldSetEntity(i);
|
String isSole = fse.getString(CmnConst.IS_SOLE);
|
String systemField = fse.getString(CmnConst.SYSTEM_FIELD); //本系统表字段
|
String dataOriginField = fse.getString(CmnConst.DATA_ORIGIN_FIELD); //外部数据源表字段
|
//是否删除
|
if ("1".equals(syncField.getString(CmnConst.IS_DELETE))) {
|
//是否是删除字段
|
if ("1".equals(fse.getString(CmnConst.IS_DELETE))) {
|
deleteMap.put(dataOriginField, systemField);
|
}
|
}
|
if ("uuid".equals(systemField)) {
|
is_uuid = true;
|
}
|
//1 是唯一字段
|
if ("1".equals(isSole)) {
|
sileMap.put(dataOriginField, systemField);
|
} else {
|
syncMap.put(dataOriginField, systemField);
|
}
|
}
|
this.runDataSync(syncField, sileMap, syncMap, deleteMap, is_uuid, idField);
|
}
|
|
|
/**
|
* @param syncField 同步配置表
|
* @param sileMap 唯一字段
|
* @param syncMap 同步字段
|
* @param is_uuid 是否有uuid
|
*/
|
public void runDataSync(FieldSetEntity syncField, Map<String, String> sileMap, Map<String, String> syncMap, Map<String, String> deleteMap, boolean is_uuid, String idField) {
|
//数据源表名
|
String dataOriginName = syncField.getString(CmnConst.DATA_ORIGIN_NAME);
|
//获取系统表
|
String tableName = syncField.getString(CmnConst.SYSTEM_TABLE_NAME);
|
//保存前调用的方法
|
String savePreEvent = syncField.getString(CmnConst.SAVE_PRE_EVENT);
|
//保存后调用的方法
|
String postSaveEvent = syncField.getString(CmnConst.POST_SAVE_EVENT);
|
//查询条件
|
String syncCondition = syncField.getString(CmnConst.SYNC_CONDITION);
|
//是否修改
|
String isUpdate = syncField.getString(CmnConst.IS_UPDATE);
|
//是否删除
|
String isDelete = syncField.getString(CmnConst.IS_DELETE);
|
//删除标识值
|
String deleteValue = syncField.getString(CmnConst.DELETE_VALUE);
|
//获取数据库连接信息
|
String configUuid = syncField.getString(CmnConst.DATABASE_CONFIG_UUID);
|
//同步类型 1增量 2覆盖
|
String syncType = syncField.getString(CmnConst.SYNC_TYPE);
|
FieldSetEntity configField = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATABASE_CONNECTION_CONFIG, configUuid, false);
|
String databaseType = configField.getString(CmnConst.DATABASE_TYPE);//数据库类型
|
String ipAddress = configField.getString(CmnConst.IP_ADDRESS);//ip地址
|
String databaseName = configField.getString(CmnConst.DATABASE_NAME);//数据库名称
|
String portNumber = configField.getString(CmnConst.PORT_NUMBER);//端口号
|
String userName = configField.getString(CmnConst.USER_NAME);//用户名
|
String userPassword = configField.getString(CmnConst.USER_PASSWORD);//密码
|
String instantiation = configField.getString(CmnConst.INSTANTIATION);//实例名
|
String fileName = "";
|
//创建日志
|
String logUuid = exceptionLog.addExceptionLog(syncField.getUUID(), dataOriginName, tableName);
|
//获取查询的字段
|
for (String key : sileMap.keySet()) {
|
fileName = fileName + key + ",";
|
}
|
for (String key : syncMap.keySet()) {
|
fileName = fileName + key + ",";
|
}
|
fileName = fileName.substring(0, fileName.length() - 1);
|
//连接类
|
String diver;
|
//连接url
|
String url;
|
//编辑sql
|
StringBuffer sql = new StringBuffer();
|
//参数
|
Map<String, String> map = Collections.synchronizedMap(new HashMap<>());
|
map.put("syncCondition", syncCondition);
|
if ("mysql".equals(databaseType)) {
|
diver = "com.mysql.cj.jdbc.Driver";
|
url = "jdbc:mysql://" + ipAddress + ":" + portNumber + "/" + databaseName + "?useSSL=false&serverTimezone=UTC";
|
|
if (BaseUtil.strIsNull(syncCondition)) {
|
sql.append("SELECT ? FROM ? LIMIT ?,?");//参数失败 改为字符串拼接
|
} else {
|
sql.append("SELECT ? FROM ? ").append(" WHERE ").append(syncCondition).append(" LIMIT ?,?");//参数失败 改为字符串拼接
|
}
|
} else if ("oracle".equals(databaseType)) {
|
diver = "oracle.jdbc.driver.OracleDriver";
|
url = "jdbc:oracle:thin:@" + ipAddress + ":" + portNumber + ":orcl";
|
if (BaseUtil.strIsNull(syncCondition)) {
|
sql.append("SELECT ? FROM (SELECT ROWNUM AS rowno, t.* FROM ? t WHERE ROWNUM <= ?) t2 WHERE t2.rowno > ?");
|
} else {
|
sql.append("SELECT ? FROM (SELECT ROWNUM AS rowno, t.* FROM ? t WHERE ").append(syncCondition).append("AND ROWNUM <= ?) t2 WHERE t2.rowno > ?");
|
}
|
} else if ("informix".equals(databaseType)) {
|
diver = "com.informix.jdbc.IfxDriver";
|
url = "jdbc:informix-sqli://" + ipAddress + ":" + portNumber + "/" + databaseName + ":informixserver=" + instantiation;
|
if (BaseUtil.strIsNull(syncCondition)) {
|
//跳过?行,获取?行 字段名 表名
|
sql.append(" SELECT SKIP ? FIRST ? ? FROM ? ");
|
} else {
|
sql.append(" SELECT SKIP ? FIRST ? ? FROM ? WHERE ").append(syncCondition);
|
}
|
} else if ("sqlserver".equals(databaseType)) {
|
SpringMVCContextHolder.getSystemLogger().error("唯一字段获取不严谨:" + idField);
|
diver = "com.microsoft.sqlserver.jdbc.SQLServerDriver";
|
url = "jdbc:sqlserver://" + ipAddress + ":" + portNumber + ";DataBaseName=" + databaseName;
|
if (BaseUtil.strIsNull(syncCondition)) {
|
//? 查询条数 ? 查询字段 ? 表名 idField 唯一字段名 ? 开始位置 idField 唯一字段名 ? 表名
|
sql.append("SELECT TOP ? ? FROM ? WHERE ").append(idField).append(" NOT IN(SELECT TOP ? ").append(idField).append(" FROM ?)");
|
} else {
|
sql.append("SELECT TOP ? ? FROM ? WHERE ").append(idField).append(" NOT IN(SELECT TOP ? ").append(idField).append(" FROM ? WHERE ").append(syncCondition).append(" ) AND ").append(syncCondition);
|
}
|
} else {
|
BaseException baseException = new BaseException(SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getValue(), SystemCode.SYSTEM_UNKNOWN_DATABASE_TYPE.getText());
|
exceptionLog.upExceptionLog(null, logUuid, baseException, 0, 0, 0, 0, 0);
|
return;
|
}
|
//1增量同步 2覆盖同步
|
// if ("2".equals(syncType)) {
|
// //删除表所有数据
|
// baseDao.executeUpdate("delete from " + tableName);
|
// }
|
//1增量或2覆盖同步
|
map.put(CmnConst.SYNC_TYPE, syncType);
|
map.put("databaseType", databaseType);
|
map.put("url", url);
|
map.put("sql", sql.toString());
|
map.put("fileName", fileName);
|
map.put("tableName", tableName);
|
map.put("dataOriginName", dataOriginName);
|
map.put("deleteValue", deleteValue);
|
Integer currentPage = 1;
|
map.put("currentPage", String.valueOf(currentPage));
|
map.put("pageSize", "1000");
|
map.put("logUuid", logUuid);
|
|
Connection conn = null;
|
//获取jdbc连接
|
try {
|
// 判断 ip地址是否为 / 当ip地址 = / 时 则认为本地同步
|
String sourceType = Global.getSystemConfig("data.source.type", "");
|
if (sourceType != null && sourceType.equalsIgnoreCase(databaseType) && "/".equals(ipAddress)) {
|
diver = Global.getSystemConfig("data.source.driver", "");
|
url = Global.getSystemConfig("data.source.url", "");
|
userName = Global.getSystemConfig("data.source.user", "");
|
userPassword = Global.getSystemConfig("data.source.password", "");
|
}
|
// 远程地址获取数据库连接
|
conn = DataManipulationUtils.getConnection(diver, url, userName, userPassword);
|
} catch (BaseException | ClassNotFoundException | SQLException e) {
|
exceptionLog.upExceptionLog(null, logUuid, e, 0, 0, 0, 0, 0);
|
return;
|
}
|
List<Object> changeDataRecord = null;
|
//新增同时还要修改和删除
|
if ("1".equals(isUpdate) && "1".equals(isDelete) && deleteMap.size() > 0) {
|
if (!BaseUtil.strIsNull(savePreEvent) || !BaseUtil.strIsNull(postSaveEvent)) {
|
this.runnableBatchSaveDeleteFieldSetData(conn, map, sileMap, syncMap, deleteMap, savePreEvent, postSaveEvent, is_uuid);
|
} else {
|
try {
|
//TODO
|
changeDataRecord = this.batchSaveDeleteData(conn, map, sileMap, syncMap, deleteMap);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
//新增同时还要修改
|
} else if ("1".equals(isUpdate)) {
|
//savePreEvent 保存前
|
//postSaveEvent 保存后
|
if (!BaseUtil.strIsNull(savePreEvent) || !BaseUtil.strIsNull(postSaveEvent)) {
|
// this.batchSaveFieldSetData(conn, map, sileMap, syncMap, savePreEvent, postSaveEvent, is_uuid);
|
//todo
|
changeDataRecord = this.runnableBatchSaveFieldSetData(conn, map, sileMap, syncMap, savePreEvent, postSaveEvent, is_uuid);
|
} else {
|
// 明天处理 2022年1月13日21:59:11 !!!!!!!!!!!!!!!!!!!!
|
changeDataRecord = this.batchSaveData(conn, map, sileMap, syncMap);
|
}
|
//新增同时还要删除
|
} else if ("1".equals(isDelete) && deleteMap.size() > 0) {
|
if (!BaseUtil.strIsNull(savePreEvent) || !BaseUtil.strIsNull(postSaveEvent)) {
|
changeDataRecord = this.runnableBatchAddDeleteFieldSetData(conn, map, sileMap, syncMap, deleteMap, savePreEvent, postSaveEvent, is_uuid);
|
} else {
|
changeDataRecord = this.batchAddDeleteData(conn, map, sileMap, syncMap, deleteMap);
|
}
|
//新增
|
} else {
|
if (!BaseUtil.strIsNull(savePreEvent) || !BaseUtil.strIsNull(postSaveEvent)) {
|
changeDataRecord = this.runnableBatchAddFieldSetData(conn, map, sileMap, syncMap, savePreEvent, postSaveEvent, is_uuid);
|
} else {
|
if ("2".equals(syncType)) {
|
changeDataRecord = this.batchAddData(conn, map, sileMap, syncMap);
|
} else {
|
changeDataRecord = this.batchAddData(conn, map, sileMap, syncMap, syncType);
|
}
|
|
}
|
}
|
if (changeDataRecord != null && changeDataRecord.size() > 0) {
|
syncDataAfterDispose(changeDataRecord, tableName);
|
}
|
}
|
|
/**
|
* 同步数据之后处理
|
*
|
* @throws BaseException
|
*/
|
private void syncDataAfterDispose(List<Object> recordList, String tableName) throws BaseException {
|
//recordList 此变量最大长度为 3 其中 下标 0 = 新增的数据 1 = 修改的数据 2 = 删除的数据
|
try {
|
if (recordList != null) {
|
// 新增或修改的数据 uuid
|
List<String> changeDataList = new ArrayList<>();
|
List<Integer> changeDataIds = new ArrayList<>();
|
// 修改数据的条件 (根据此条件去查询修改过数据的uuid)
|
StringBuffer updateDataFilter = new StringBuffer();
|
// 遍历记录
|
for (int i = 0; i < recordList.size(); i++) {
|
Object object = recordList.get(i);
|
//判断记录类型是为 List (传入的时候应为List)
|
if (object instanceof List) {
|
List<Object> objects = (List<Object>) object;
|
// 判断objects 中泛型的数据类型
|
if (objects != null && objects.size() > 0) {
|
Object objectValue = null;
|
// 循环取值 当取到的值不为空时赋值 停止循环
|
for (int j = 0; j < objects.size(); j++) {
|
if (objects.get(j) != null) {
|
objectValue = objects.get(j);
|
break;
|
}
|
}
|
// 没有取到值 跳过
|
if (objectValue == null) {
|
continue;
|
}
|
// 判断取值的类型
|
if (objectValue instanceof String) {
|
// 根据uuid 新增、修改、删除的数据
|
if (i < 2) {
|
// 根据uuid 修改或新增的数据
|
for (Object o : recordList) {
|
if (o == null) {
|
continue;
|
}
|
changeDataList.addAll((List<String>) o);
|
}
|
} else {
|
// 预留删除的数据处理
|
}
|
} else if (objectValue instanceof Integer) {
|
//传入的就是id
|
// 根据uuid 新增、修改、删除的数据
|
if (i < 2) {
|
// 根据uuid 修改或新增的数据
|
for (Object o : recordList) {
|
if (o == null) {
|
continue;
|
}
|
List<Integer> o1 = (List<Integer>) o;
|
changeDataIds.addAll(o1);
|
// String[] objects1 = o1.toArray(new String[]{});
|
// changeDataList.addAll(Arrays.asList(objects1));
|
}
|
} else {
|
// 预留删除的数据处理
|
}
|
} else if (objectValue instanceof Map) {
|
// 根据条件更新的数据或删除的数据
|
if (i == 1) {
|
for (Object o : recordList) {
|
// 根据条件修改的数据查询条件
|
Map<String, Object> filterMap = (Map<String, Object>) o;
|
if (filterMap == null) {
|
continue;
|
}
|
String filter = getFilterByMap(filterMap);
|
if (updateDataFilter.length() > 0) {
|
updateDataFilter.append(" OR ");
|
}
|
updateDataFilter.append(" ( ").append(filter).append(" ) ");
|
}
|
|
} else if (i == 2) {
|
//预留删除的数据处理
|
}
|
}
|
}
|
}
|
}
|
String pkField = baseDao.getPKField(tableName);
|
if (StringUtils.isEmpty(pkField)) {
|
return;
|
}
|
String updateIds = "";
|
if (updateDataFilter.length() > 0) {
|
DataTableEntity dt = baseDao.listTable(tableName, updateDataFilter.toString(), new Object[]{}, new Object[]{pkField + " as uuid"});
|
if (!DataTableEntity.isEmpty(dt)) {
|
updateIds = dt.getUuidsToString();
|
}
|
}
|
List<Integer> add_ids = null;
|
if (changeDataList.size() > 0) {
|
DataTableEntity dataTableEntity = baseDao.listTable(tableName, new Object[]{pkField + " as uuid"}, changeDataList.toArray());
|
if (!DataTableEntity.isEmpty(dataTableEntity)) {
|
Object[] uuids = dataTableEntity.getUuids();
|
add_ids = new ArrayList<>();
|
for (int i = 0; i < uuids.length; i++) {
|
String pkValue = (String) uuids[i];
|
if (NumberUtil.isNumber(pkValue)) {
|
add_ids.add(NumberUtil.parseInt(pkValue));
|
}
|
}
|
}
|
}
|
if (changeDataIds != null && changeDataIds.size() > 0) {
|
if (add_ids == null) {
|
add_ids = new ArrayList<>();
|
}
|
add_ids.addAll(changeDataIds);
|
|
}
|
IViewDataProcessService iViewDataProcessService = (IViewDataProcessService) getProxyInstance(this.iViewDataProcessService);
|
iViewDataProcessService.updateSyncRecord(tableName, add_ids.toArray(new Integer[]{}), updateIds);
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
|
private String getFilterByMap(Map<String, Object> filterMap) {
|
StringBuffer filter = new StringBuffer();
|
if (filterMap != null) {
|
filterMap.forEach((k, v) -> {
|
if (filter.length() > 0) {
|
filter.append(" AND ");
|
}
|
if (v == null) {
|
filter.append(" ( `").append(k).append("` is null ) ");
|
} else {
|
filter.append(" ( `").append(k).append("` = '").append(v).append("') ");
|
}
|
});
|
}
|
return filter.insert(0, " ( ").append(" ) ").toString();
|
}
|
|
|
public int getMaxIdValue(String tableName) {
|
String pkField = baseDao.getPKField(tableName);
|
if (!StringUtils.isEmpty(pkField)) {
|
StringBuilder sql = new StringBuilder("SELECT ");
|
sql.append("IFNULL(MAX(").append(pkField).append("),0) maxId FROM ").append(tableName);
|
FieldSetEntity fieldSetBySQL = baseDao.getFieldSetBySQL(sql.toString(), new Object[]{}, false);
|
if (fieldSetBySQL != null) {
|
Integer maxId = fieldSetBySQL.getInteger("maxId");
|
return maxId == null ? -1 : maxId.intValue();
|
}
|
}
|
return -1;
|
}
|
|
/**
|
* 原生sql批量插入
|
* 覆盖同步
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一标识字段
|
* @param syncMap 同步字段
|
* @Auth cheng 2022年1月17日21:29:42
|
*/
|
public List<Object> batchAddData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap) {
|
//系统表名
|
String tableName = map.get("tableName");
|
//每页查询条数
|
final int pageSize = 100000;
|
//批处理条数
|
final int batchCount = 5000;
|
map.put("pageSize", pageSize + "");
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
SpringMVCContextHolder.getSystemLogger().info("开始批量同步数据,每次查询条数:" + pageSize + ",批处理条数:" + batchCount);
|
List<Integer> addDataRecord = new ArrayList<>();
|
// int maxIdValue = getMaxIdValue(tableName);
|
Connection currentConn = null;
|
try {
|
//当前数据库连接
|
currentConn = ConnectionManager.getConnection();
|
System.out.println(new Date().getTime());
|
currentConn.prepareStatement("truncate table " + tableName).execute();
|
System.out.println(new Date().getTime());
|
currentConn.setAutoCommit(false);
|
//此变量插入顺序不可更改
|
List<String> fieldSet = new ArrayList<>();
|
fieldSet.addAll(syncMap.keySet());
|
fieldSet.addAll(sileMap.keySet());
|
map.put("fileName", ArrayUtil.join(fieldSet.toArray(), ","));
|
Map<String, Object> insertDefaultValues = getInsertDefaultValues(tableName, syncMap, sileMap);
|
fieldSet.addAll(insertDefaultValues.keySet());
|
fieldSet.add(CmnConst.UUID);
|
StringBuilder sqlTemplate = new StringBuilder();
|
sqlTemplate.append(" INSERT INTO `").append(tableName).append("` (");
|
int i = 0;
|
Iterator<String> iterator = fieldSet.iterator();
|
// 字段
|
StringBuilder fieldTemplate = new StringBuilder();
|
//占位符
|
StringBuilder placeholderTemplate = new StringBuilder();
|
while (iterator.hasNext()) {
|
String fieldName = iterator.next();
|
if (i > 0) {
|
fieldTemplate.append(",");
|
placeholderTemplate.append(",");
|
} else {
|
i++;
|
}
|
placeholderTemplate.append("?");
|
fieldTemplate.append("`").append(fieldName).append("`");
|
}
|
sqlTemplate.append(fieldTemplate).append(") values (").append(placeholderTemplate).append(")");
|
// fieldTemplate.setLength(0);
|
placeholderTemplate.setLength(0);
|
System.out.println(sqlTemplate);
|
//数据条数 当达到批处理提交的数量时会归零
|
int dataNumber = 0;
|
//当前页
|
int currentPage = 1;
|
//总条数
|
int totalCount = 0;
|
int errorCount = 0;
|
PreparedStatement pst = currentConn.prepareStatement(sqlTemplate.toString());
|
long currentPageStart = System.currentTimeMillis();
|
do {
|
dataNumber = 0;
|
// ResultSet resultSet = conn.prepareStatement("SELECT " + fieldTemplate.toString() + " FROM " + sourceTable).executeQuery();
|
long l = System.currentTimeMillis();
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
SpringMVCContextHolder.getSystemLogger().info("结束查询数据,当前页:" + currentPage + ",耗时:" + (System.currentTimeMillis() - l) + "ms");
|
// List<String> uuidList = new ArrayList<>();
|
if (resultSet != null) {
|
int columnCount = -1;
|
while (resultSet.next()) {
|
if (columnCount == -1) {
|
columnCount = resultSet.getMetaData().getColumnCount();
|
}
|
int j = 1;
|
for (; j <= columnCount; j++) {
|
Object value = resultSet.getObject(j);
|
pst.setObject(j, value);
|
}
|
for (Map.Entry<String, Object> v : insertDefaultValues.entrySet()) {
|
pst.setObject(j, v.getValue());
|
j++;
|
}
|
pst.setObject(j, IdUtil.randomUUID());
|
pst.addBatch();
|
//数据条数+1
|
dataNumber++;
|
if (dataNumber % batchCount == 0) {
|
try {
|
//执行批处理
|
pst.executeBatch();
|
//提交数据
|
currentConn.commit();
|
totalCount += batchCount;
|
SpringMVCContextHolder.getSystemLogger().info("执行批处理并提交,耗时:" + (System.currentTimeMillis() - currentPageStart) + "ms");
|
currentPageStart = System.currentTimeMillis();
|
// addDataRecord.addAll(uuidList);
|
} catch (SQLException e) {
|
// 记录提交批次的数量
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error("执行批处理并提交,错误," + e.getMessage());
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
errorCount += batchCount;
|
exceptionLog.addSubExceptionLog(logUuid, e, currentPage);
|
}
|
} else {
|
//记录新增批次的uuid
|
// addDataRecord.addAll(uuidList);
|
}
|
|
}
|
SpringMVCContextHolder.getSystemLogger().info("执行" + currentPage + "页,耗时:" + (System.currentTimeMillis() - l) + "ms");
|
currentPageStart = System.currentTimeMillis();
|
currentPage++;
|
map.put("currentPage", currentPage + "");
|
try {
|
resultSet.close();
|
} catch (SQLException e) {
|
}
|
}
|
} while (dataNumber == pageSize);
|
totalCount += (dataNumber % batchCount);
|
try {
|
pst.executeBatch();
|
currentConn.commit();
|
exceptionLog.upExceptionLog(conn, logUuid, null, totalCount - errorCount, 0, 0, errorCount, totalCount);
|
} catch (SQLException e) {
|
e.printStackTrace();
|
// addDataRecord = addDataRecord.subList(0, addDataRecord.size() - 2 - dataNumber);
|
SpringMVCContextHolder.getSystemLogger().error("执行批处理并提交,错误," + e.getMessage());
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
errorCount += dataNumber % batchCount;
|
exceptionLog.addSubExceptionLog(logUuid, e, currentPage - 1);
|
}
|
for (int j = 1; j <= totalCount; j++) {
|
addDataRecord.add(j);
|
}
|
exceptionLog.upExceptionLog(conn, logUuid, null, totalCount - errorCount, 0, 0, errorCount, totalCount);
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error("执行数据同步覆盖错误," + e.getMessage());
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
exceptionLog.upExceptionLog(conn, logUuid, null, 0, 0, 0, 0, 0);
|
} finally {
|
try {
|
if (currentConn == null) {
|
conn.close();
|
}
|
} catch (Exception e) {
|
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().info("批处理同步数据执行完成!!");
|
return Lists.newArrayList(addDataRecord, null, null);
|
}
|
|
private Map<String, Object> getInsertDefaultValues(String tableName, Map<String, String> syncMap, Map<String, String> sileMap) {
|
// 默认值map 用于插入数据时默认插入字段 key=字段 value=默认值
|
Map<String, Object> defaultValueMap = new HashMap<>();
|
defaultValueMap.put(CmnConst.CREATED_BY, 1);
|
defaultValueMap.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime());
|
defaultValueMap.put("org_level_uuid", "00000000-0000-0000-0000-000000000000");
|
//获取缓存中 表信息详情
|
FieldSetEntity tableInfo = BaseUtil.getSingleInfoByCache("所有表信息", new String[]{tableName});
|
Iterator<String> iterator = defaultValueMap.keySet().iterator();
|
// 循环判断是否需要增加默认值字段
|
while (iterator.hasNext()) {
|
String field = iterator.next();
|
// 判断默认值key 是否在同步字段中
|
if (syncMap.containsKey(field) || sileMap.containsKey(field)) {
|
// 存在则删除默认值
|
iterator.remove();
|
continue;
|
} else {
|
// 检查默认字段在系统表中是否存在
|
FieldSetEntity fieldInfo = BaseUtil.getSingleInfoByCache("表字段信息", new String[]{tableInfo.getUUID(), field});
|
if (fieldInfo == null || !fieldInfo.getBoolean("is_required")) {
|
//不存在
|
iterator.remove();
|
}
|
}
|
}
|
return defaultValueMap;
|
}
|
|
/**
|
* 原生sql批量插入
|
* 只新增数据,插入批量 INSERT INTO语句
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一标识字段
|
* @param syncMap 同步字段
|
* @param syncType 1增量或2覆盖同步
|
*/
|
public List<Object> batchAddData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, String syncType) {
|
Integer resultRow = 0;
|
//新增数量
|
Integer addNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//总条数
|
Integer totalNumber = 0;
|
//系统表名
|
String tableName = map.get("tableName");
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
//获取默认值
|
Map<String, Object> defaultValueMap = getInsertDefaultValues(tableName, syncMap, sileMap);
|
List<String> addDataRecord = new ArrayList<>();
|
PreparedStatement mySqlPs = null;
|
//修改影响行数
|
int result = 0;
|
try {
|
//mysql 连接
|
Connection mysqlCon = null;
|
do {
|
//判断连接是否为空或是否被关闭 cheng 2022年1月13日16:11:49
|
if (mysqlCon == null || mysqlCon.isClosed()) {
|
mysqlCon = ConnectionManager.getConnection();
|
}
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
//新增的DataTableEntity
|
resultRow = 0;
|
StringBuffer insertSql = new StringBuffer();
|
Integer insert = 0;
|
//新增计数
|
Integer addNumber = 0;
|
long l = System.currentTimeMillis();
|
while (resultSet.next()) {
|
resultRow++;
|
//覆盖同步只管新增
|
if ("2".equals(syncType)) {
|
//通过唯一字段查询数据
|
//未查询到相同数据就新增数据,查询到就跳过数据
|
StringBuffer selectSql = new StringBuffer();
|
selectSql.append(" select count(*) from `").append(tableName).append("` where ");
|
for (String key : sileMap.keySet()) {
|
String keys = resultSet.getString(key);
|
if (keys == null) {
|
selectSql.append("`").append(key).append("` = ").append(keys).append(" AND ");
|
} else {
|
selectSql.append("`").append(key).append("` = \"").append(keys).append("\" AND ");
|
}
|
}
|
String sql = selectSql.substring(0, selectSql.length() - 4);
|
PreparedStatement ps = mysqlCon.prepareStatement(sql);
|
ResultSet resultNumber = ps.executeQuery();
|
// 如果查到就大于0
|
while (resultNumber.next()) {
|
result = resultNumber.getInt(1);
|
}
|
}
|
// 大于0 就不新增
|
if (result == 0) {
|
addNumber++;
|
StringBuffer valueSql = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
String value = resultSet.getString(key);
|
if (value == null) {
|
valueSql.append(value).append(",");
|
} else {
|
valueSql.append("\"").append(value).append("\",");
|
}
|
}
|
for (String key : sileMap.keySet()) {
|
String value = resultSet.getString(key);
|
if (value == null) {
|
valueSql.append(value).append(",");
|
} else {
|
valueSql.append("\"").append(value).append("\",");
|
}
|
}
|
// 新增插入默认值 cheng 2022年1月13日16:08:34
|
for (Map.Entry<String, Object> value : defaultValueMap.entrySet()) {
|
valueSql.append("\"").append(value.getValue()).append("\",");
|
}
|
String uuid = UUID.randomUUID().toString();
|
valueSql.append("\"").append(uuid).append("\",");
|
addDataRecord.add(uuid);
|
valueSql.deleteCharAt(valueSql.length() - 1);
|
if (insertSql.length() > 0) {
|
insertSql.append(",(").append(valueSql).append(")");
|
} else {
|
StringBuffer keySql = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
keySql.append("`").append(syncMap.get(key)).append("`,");
|
}
|
for (String key : sileMap.keySet()) {
|
keySql.append("`").append(sileMap.get(key)).append("`,");
|
}
|
// 新增插入默认key cheng 2022年1月13日16:08:34
|
for (Map.Entry<String, Object> value : defaultValueMap.entrySet()) {
|
keySql.append("`").append(value.getKey()).append("`,");
|
}
|
keySql.append("`uuid`");
|
insertSql.append(" INSERT INTO `").append(tableName).append("`(").append(keySql)
|
.append(") VALUES (").append(valueSql).append(")");
|
}
|
} else {
|
//查到数据,无法新增
|
errorNum++;
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().info("结束拼接Inert into 语句,共耗时:" + (System.currentTimeMillis() - l) + "ms");
|
if (!BaseUtil.strIsNull(insertSql.toString())) {
|
try {
|
insertSql.append(";");
|
l = System.currentTimeMillis();
|
mySqlPs = mysqlCon.prepareStatement(insertSql.toString());
|
insert = mySqlPs.executeUpdate();
|
SpringMVCContextHolder.getSystemLogger().info("结束运行插入,共耗时:" + (System.currentTimeMillis() - l) + "ms");
|
addNum = addNum + insert;
|
} catch (SQLException | BaseException e) {
|
e.printStackTrace();
|
addDataRecord.clear();
|
errorNum = errorNum + addNumber;
|
exceptionLog.addSubExceptionLog(logUuid, e, currentPage);
|
}
|
}
|
//关闭结果就
|
//DataManipulationUtils.close(resultSet, null, null);
|
totalNumber = totalNumber + resultRow;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//如果条数小于1000就结束查询
|
//关闭mysql 注释 cheng 2022年1月13日15:34:52
|
DataManipulationUtils.close(resultSet, mySqlPs, null);
|
} while (resultRow == 1000);
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, 0, errorNum, totalNumber);
|
//关闭数据源连接
|
DataManipulationUtils.close(null, null, mysqlCon);
|
DataManipulationUtils.close(null, null, conn);
|
|
} catch (SQLException e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, 0, errorNum, totalNumber);
|
}
|
return Lists.newArrayList(addDataRecord);
|
}
|
|
|
/**
|
* 原生sql批量插入
|
* 只新增数据,插入批量 INSERT INTO语句
|
* 根据删除字段删除数据
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一标识字段
|
* @param syncMap 同步字段
|
*/
|
public List<Object> batchAddDeleteData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, Map<String, String> deleteMap) {
|
Integer resultRow = 0;
|
//新增数量
|
Integer addNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//总条数
|
Integer totalNumber = 0;
|
//删除条数
|
Integer deleteNumber = 0;
|
//系统表名
|
String tableName = map.get("tableName");
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
List<String> addDataRecord = new ArrayList<>();
|
List<Map<String, Object>> deleteDataRecord = new ArrayList<>();
|
PreparedStatement mySqlPs = null;
|
|
//修改影响行数
|
int result = 0;
|
try {
|
do {
|
//mysql 连接
|
Connection mysqlCon = ConnectionManager.getConnection();
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
//新增的DataTableEntity
|
resultRow = 0;
|
StringBuffer insertSql = new StringBuffer();
|
Integer insert = 0;
|
//新增计数
|
while (resultSet.next()) {
|
//是否是删除数据
|
Boolean is_delete = false;
|
resultRow++;
|
//最优先判断删除标识
|
for (String key : deleteMap.keySet()) {
|
//删除验证值
|
String deleteValue = map.get("deleteValue");
|
//值相同就删除或者不新增
|
if (deleteValue.equals(resultSet.getString(key))) {
|
is_delete = true;
|
}
|
}
|
//通过唯一字段查询数据
|
//未查询到相同数据就新增数据,查询到就跳过数据
|
StringBuffer selectSql = new StringBuffer();
|
selectSql.append(" select count(*) from `").append(tableName).append("` where ");
|
for (String key : sileMap.keySet()) {
|
String keys = resultSet.getString(key);
|
if (keys == null) {
|
selectSql.append("`").append(key).append("` = ").append(keys).append(" AND ");
|
} else {
|
selectSql.append("`").append(key).append("` = \"").append(keys).append("\" AND ");
|
}
|
}
|
String sql = selectSql.substring(0, selectSql.length() - 4);
|
PreparedStatement ps = mysqlCon.prepareStatement(sql);
|
ResultSet resultNumber = ps.executeQuery();
|
// 如果查到就大于0
|
while (resultNumber.next()) {
|
result = resultNumber.getInt(1);
|
}
|
// 大于0 或者是需要删除的数据
|
if (result == 0) {
|
//如果是要删除的数据 就不新增
|
if (is_delete) {
|
deleteNumber++;
|
continue;
|
}
|
StringBuffer valueSql = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
String value = resultSet.getString(key);
|
if (value == null) {
|
valueSql.append(value).append(",");
|
} else {
|
valueSql.append("\"").append(value).append("\",");
|
}
|
}
|
for (String key : sileMap.keySet()) {
|
String value = resultSet.getString(key);
|
if (value == null) {
|
valueSql.append(value).append(",");
|
} else {
|
valueSql.append("\"").append(value).append("\",");
|
}
|
}
|
String uuid = UUID.randomUUID().toString();
|
valueSql.append("\"").append(uuid).append("\",");
|
addDataRecord.add(uuid);
|
valueSql.deleteCharAt(valueSql.length() - 1);
|
if (insertSql.length() > 0) {
|
insertSql.append(",(").append(valueSql).append(")");
|
} else {
|
StringBuffer keySql = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
keySql.append("`").append(syncMap.get(key)).append("`,");
|
}
|
for (String key : sileMap.keySet()) {
|
keySql.append("`").append(sileMap.get(key)).append("`,");
|
}
|
keySql.append("`uuid`");
|
insertSql.append(" INSERT INTO `").append(tableName).append("`(").append(keySql)
|
.append(") VALUES (").append(valueSql).append(")");
|
}
|
//查出这条数据,但要执行删除
|
} else {
|
//执行删除方法
|
if (is_delete) {
|
StringBuffer deleteSql = new StringBuffer();
|
deleteSql.append(" delete from `").append(tableName).append("` where ");
|
Map<String, Object> filterMap = new HashMap<>();
|
for (String key : sileMap.keySet()) {
|
String keys = resultSet.getString(key);
|
filterMap.put(key, resultSet.getString(key));
|
if (keys == null) {
|
deleteSql.append("`").append(key).append("` = ").append(keys).append(" AND ");
|
} else {
|
deleteSql.append("`").append(key).append("` = \"").append(keys).append("\" AND ");
|
}
|
}
|
String delSql = deleteSql.substring(0, deleteSql.length() - 4);
|
PreparedStatement delps = mysqlCon.prepareStatement(delSql);
|
deleteNumber = deleteNumber + delps.executeUpdate();
|
deleteDataRecord.add(filterMap);
|
} else {
|
addNum++;
|
}
|
}
|
}
|
if (!BaseUtil.strIsNull(insertSql.toString())) {
|
try {
|
insertSql.append(";");
|
mySqlPs = mysqlCon.prepareStatement(insertSql.toString());
|
insert = mySqlPs.executeUpdate();
|
addNum = addNum + insert;
|
} catch (SQLException | BaseException e) {
|
errorNum = errorNum + insert;
|
exceptionLog.addSubExceptionLog(logUuid, e, currentPage);
|
addDataRecord.clear();
|
}
|
}
|
//关闭结果就
|
//DataManipulationUtils.close(resultSet, null, null);
|
totalNumber = totalNumber + resultRow;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//如果条数小于1000就结束查询
|
//关闭mysql
|
DataManipulationUtils.close(null, mySqlPs, mysqlCon);
|
} while (resultRow == 1000);
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, deleteNumber, errorNum, totalNumber);
|
//关闭数据源连接
|
DataManipulationUtils.close(null, null, conn);
|
} catch (SQLException e) {
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, deleteNumber, errorNum, totalNumber);
|
}
|
return Lists.newArrayList(addDataRecord, null, deleteDataRecord);
|
}
|
|
/**
|
* 原生sql批量插入
|
* 修改为单条修改,修改不成功的语句插入批量 INSERT INTO语句
|
* 新增修改数据,修改失败就新增
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一标识字段
|
* @param syncMap 同步字段
|
*/
|
public List<Object> batchSaveData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap) {
|
Integer resultRow = 0;
|
List<String> list = ListUtils.newArrayList();
|
//新增数量
|
Integer addNum = 0;
|
//修改数量
|
Integer upNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//总条数
|
Integer totalNumber = 0;
|
//系统表名
|
String tableName = map.get("tableName");
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
//1增量或2覆盖同步
|
String syncType = map.get(CmnConst.SYNC_TYPE);
|
List<String> addDataRecord = new ArrayList<>();
|
List<Map<String, Object>> updateDataRecord = new ArrayList<>();
|
PreparedStatement mySqlPs = null;
|
|
//修改影响行数
|
int result = 0;
|
try {
|
do {
|
//mysql 连接
|
Connection mysqlCon = ConnectionManager.getConnection();
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
//获取总列数
|
// ResultSetMetaData rsmd = resultSet.getMetaData();
|
// resultRow = rsmd.getColumnCount();
|
//新增的DataTableEntity
|
resultRow = 0;
|
StringBuffer insertSql = new StringBuffer();
|
Integer insert = 0;
|
//新增计数
|
Integer addNumber = 0;
|
while (resultSet.next()) {
|
resultRow++;
|
if ("1".equals(syncType)) {
|
StringBuffer updateSql = new StringBuffer();
|
updateSql.append(" update `").append(tableName).append("` set ");
|
for (String key : syncMap.keySet()) {
|
String keys = resultSet.getString(key);
|
if (keys == null) {
|
updateSql.append("`").append(syncMap.get(key)).append("` = ").append(keys).append(",");
|
} else {
|
updateSql.append("`").append(syncMap.get(key)).append("` = \"").append(keys).append("\",");
|
}
|
}
|
updateSql = updateSql.deleteCharAt(updateSql.length() - 1);
|
updateSql.append(" where ");
|
list.clear();
|
Map<String, Object> filterMap = new HashMap<>();
|
for (String key : sileMap.keySet()) {
|
String value = resultSet.getString(key);
|
filterMap.put(key, value);
|
list.add(value);
|
if (value == null) {
|
updateSql.append("`").append(sileMap.get(key)).append("` = ").append(value).append(" AND ");
|
} else {
|
updateSql.append("`").append(sileMap.get(key)).append("` = \"").append(value).append("\" AND ");
|
}
|
}
|
updateSql = updateSql.delete(updateSql.length() - 5, updateSql.length());
|
updateSql.append(";");
|
|
try {
|
mySqlPs = mysqlCon.prepareStatement(updateSql.toString());
|
result = mySqlPs.executeUpdate();// 返回值代表收到影响的行数
|
if (result > 0) {
|
updateDataRecord.add(filterMap);
|
}
|
} catch (SQLException | BaseException e) {
|
errorNum++;
|
exceptionLog.addSubExceptionLog(logUuid, list, e);
|
continue;
|
}
|
}
|
//修改不成功 拼装批量INSERT 语句
|
if (result == 0) {
|
addNumber++;
|
StringBuffer valueSql = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
String value = resultSet.getString(key);
|
if (value == null) {
|
valueSql.append(value).append(",");
|
} else {
|
valueSql.append("\"").append(value).append("\",");
|
}
|
}
|
list.clear();
|
for (String key : sileMap.keySet()) {
|
String value = resultSet.getString(key);
|
if (value == null) {
|
valueSql.append(value).append(",");
|
} else {
|
valueSql.append("\"").append(value).append("\",");
|
}
|
list.add(value);
|
}
|
String uuid = UUID.randomUUID().toString();
|
valueSql.append("\"").append(uuid).append("\",");
|
addDataRecord.add(uuid);
|
valueSql.deleteCharAt(valueSql.length() - 1);
|
if (insertSql.length() > 0) {
|
insertSql.append(",(").append(valueSql).append(")");
|
} else {
|
StringBuffer keySql = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
keySql.append("`").append(syncMap.get(key)).append("`,");
|
}
|
for (String key : sileMap.keySet()) {
|
keySql.append("`").append(sileMap.get(key)).append("`,");
|
}
|
keySql.append("`uuid`");
|
insertSql.append(" INSERT INTO `").append(tableName).append("`(").append(keySql)
|
.append(") VALUES (").append(valueSql).append(")");
|
}
|
} else {
|
//修改成功计数加一
|
upNum++;
|
}
|
}
|
if (!BaseUtil.strIsNull(insertSql.toString())) {
|
try {
|
insertSql.append(";");
|
mySqlPs = mysqlCon.prepareStatement(insertSql.toString());
|
insert = mySqlPs.executeUpdate();
|
addNum = addNum + insert;
|
} catch (SQLException | BaseException e) {
|
errorNum = errorNum + addNumber;
|
addDataRecord.clear();
|
exceptionLog.addSubExceptionLog(logUuid, e, currentPage);
|
}
|
}
|
//关闭结果就
|
//DataManipulationUtils.close(resultSet, null, null);
|
totalNumber = totalNumber + resultRow;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//如果条数小于1000就结束查询
|
//关闭mysql
|
DataManipulationUtils.close(null, mySqlPs, mysqlCon);
|
} while (resultRow == 1000);
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, 0, errorNum, totalNumber);
|
//关闭数据源连接
|
DataManipulationUtils.close(null, null, conn);
|
|
} catch (SQLException e) {
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, 0, errorNum, totalNumber);
|
}
|
return Lists.newArrayList(addDataRecord, updateDataRecord);
|
}
|
|
/**
|
* 原生sql批量插入
|
* 修改为单条修改,修改不成功的语句插入批量 INSERT INTO语句
|
* 先判断是否要删除, 要删除先删除,然后新增修改数据,修改失败就新增
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一标识字段
|
* @param syncMap 同步字段
|
* @param deleteMap 删除字段
|
*/
|
public List<Object> batchSaveDeleteData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, Map<String, String> deleteMap) throws Exception {
|
Integer resultRow = 0;
|
List<String> list = ListUtils.newArrayList();
|
//新增数量
|
Integer addNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//总条数
|
Integer totalNumber = 0;
|
//修改数量
|
Integer upNum = 0;
|
//删除条数
|
Integer deleteNumber = 0;
|
//系统表名
|
String tableName = map.get("tableName");
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
// 当前操作表主键自增(id)字段
|
if (true) {
|
//拿id字段 再删除此代码
|
//todo
|
throw new Exception("拿id字段 再删除此代码");
|
}
|
String pkField = "";
|
List<Map<String, String>> deleteDataRecord = new ArrayList<>();
|
List<Map<String, String>> updateDataRecord = new ArrayList<>();
|
List<String> addDataRecord = new ArrayList<>();
|
PreparedStatement mySqlPs = null;
|
|
//修改影响行数
|
int result = 0;
|
try {
|
do {
|
//mysql 连接
|
Connection mysqlCon = ConnectionManager.getConnection();
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
//新增的DataTableEntity
|
resultRow = 0;
|
StringBuffer insertSql = new StringBuffer();
|
Integer insert = 0;
|
//新增计数
|
Integer addNumber = 0;
|
while (resultSet.next()) {
|
//是否是删除数据
|
Boolean is_delete = false;
|
resultRow++;
|
//最优先判断删除标识
|
for (String key : deleteMap.keySet()) {
|
//删除验证值
|
String deleteValue = map.get("deleteValue");
|
//值相同就删除或者不新增
|
if (deleteValue.equals(resultSet.getString(key))) {
|
is_delete = true;
|
}
|
}
|
//通过唯一字段查询数据
|
//未查询到相同数据就新增数据,查询到就跳过数据
|
StringBuffer selectSql = new StringBuffer();
|
selectSql.append(" select count(*) from `").append(tableName).append("` where ");
|
for (String key : sileMap.keySet()) {
|
String keys = resultSet.getString(key);
|
if (keys == null) {
|
selectSql.append("`").append(key).append("` = ").append(keys).append(" AND ");
|
} else {
|
selectSql.append("`").append(key).append("` = \"").append(keys).append("\" AND ");
|
}
|
}
|
String sql = selectSql.substring(0, selectSql.length() - 4);
|
PreparedStatement ps = mysqlCon.prepareStatement(sql);
|
ResultSet resultNumber = ps.executeQuery();
|
// 如果查到就大于0
|
while (resultNumber.next()) {
|
result = resultNumber.getInt(1);
|
}
|
// 大于0 或者是需要删除的数据
|
if (result == 0) {
|
//如果是要删除的数据 就不新增
|
if (is_delete) {
|
deleteNumber++;
|
continue;
|
}
|
addNumber++;
|
StringBuffer valueSql = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
String value = resultSet.getString(key);
|
if (value == null) {
|
valueSql.append(value).append(",");
|
} else {
|
valueSql.append("\"").append(value).append("\",");
|
}
|
}
|
list.clear();
|
for (String key : sileMap.keySet()) {
|
String value = resultSet.getString(key);
|
if (value == null) {
|
valueSql.append(value).append(",");
|
} else {
|
valueSql.append("\"").append(value).append("\",");
|
}
|
list.add(value);
|
}
|
UUID uuid = UUID.randomUUID();
|
valueSql.append("\"").append(uuid).append("\",");
|
addDataRecord.add(uuid.toString());
|
valueSql.deleteCharAt(valueSql.length() - 1);
|
if (insertSql.length() > 0) {
|
insertSql.append(",(").append(valueSql).append(")");
|
} else {
|
StringBuffer keySql = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
keySql.append("`").append(syncMap.get(key)).append("`,");
|
}
|
for (String key : sileMap.keySet()) {
|
keySql.append("`").append(sileMap.get(key)).append("`,");
|
}
|
keySql.append("`uuid`");
|
insertSql.append(" INSERT INTO `").append(tableName).append("`(").append(keySql)
|
.append(") VALUES (").append(valueSql).append(")");
|
}
|
//查出这条数据,但要执行删除
|
} else {
|
//执行删除方法
|
if (is_delete) {
|
StringBuffer deleteSql = new StringBuffer();
|
deleteSql.append(" delete from `").append(tableName).append("` where ");
|
Map<String, String> record = new HashMap<>();
|
for (String key : sileMap.keySet()) {
|
String keys = resultSet.getString(key);
|
record.put(key, keys);
|
if (keys == null) {
|
deleteSql.append("`").append(key).append("` = ").append(keys).append(" AND ");
|
} else {
|
deleteSql.append("`").append(key).append("` = \"").append(keys).append("\" AND ");
|
}
|
}
|
deleteDataRecord.add(record);
|
|
String delSql = deleteSql.substring(0, deleteSql.length() - 4);
|
PreparedStatement delps = mysqlCon.prepareStatement(delSql);
|
deleteNumber = deleteNumber + delps.executeUpdate();
|
deleteDataRecord.add(record);
|
//查到了数据,并且不删除。那更新该条数据
|
} else {
|
StringBuffer updateSql = new StringBuffer();
|
updateSql.append(" update `").append(tableName).append("` set ");
|
for (String key : syncMap.keySet()) {
|
String keys = resultSet.getString(key);
|
if (keys == null) {
|
updateSql.append("`").append(syncMap.get(key)).append("` = ").append(keys).append(",");
|
} else {
|
updateSql.append("`").append(syncMap.get(key)).append("` = \"").append(keys).append("\",");
|
}
|
}
|
updateSql = updateSql.deleteCharAt(updateSql.length() - 1);
|
updateSql.append(" where ");
|
list.clear();
|
Map<String, String> record = new HashMap<>();
|
for (String key : sileMap.keySet()) {
|
String value = resultSet.getString(key);
|
record.put(key, value);
|
list.add(value);
|
if (value == null) {
|
updateSql.append("`").append(sileMap.get(key)).append("` = ").append(value).append(" AND ");
|
} else {
|
updateSql.append("`").append(sileMap.get(key)).append("` = \"").append(value).append("\" AND ");
|
}
|
}
|
updateSql = updateSql.delete(updateSql.length() - 5, updateSql.length());
|
updateSql.append(";");
|
|
try {
|
mySqlPs = mysqlCon.prepareStatement(updateSql.toString());
|
result = mySqlPs.executeUpdate();// 返回值代表收到影响的行数
|
upNum++;
|
updateDataRecord.add(record);
|
} catch (SQLException | BaseException e) {
|
errorNum++;
|
exceptionLog.addSubExceptionLog(logUuid, list, e);
|
continue;
|
|
}
|
}
|
}
|
}
|
if (!BaseUtil.strIsNull(insertSql.toString())) {
|
try {
|
insertSql.append(";");
|
mySqlPs = mysqlCon.prepareStatement(insertSql.toString());
|
insert = mySqlPs.executeUpdate();
|
addNum = addNum + insert;
|
} catch (SQLException | BaseException e) {
|
errorNum = errorNum + addNumber;
|
addDataRecord.clear();
|
exceptionLog.addSubExceptionLog(logUuid, e, currentPage);
|
}
|
}
|
//关闭结果就
|
//DataManipulationUtils.close(resultSet, null, null);
|
totalNumber = totalNumber + resultRow;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//如果条数小于1000就结束查询
|
//关闭mysql
|
DataManipulationUtils.close(null, mySqlPs, mysqlCon);
|
} while (resultRow == 1000);
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, deleteNumber, errorNum, totalNumber);
|
//关闭数据源连接
|
DataManipulationUtils.close(null, null, conn);
|
|
} catch (SQLException e) {
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, deleteNumber, errorNum, totalNumber);
|
}
|
return Lists.newArrayList(addDataRecord, updateDataRecord, deleteDataRecord);
|
}
|
|
|
/**
|
* FieldSetEntity 单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一字段
|
* @param syncMap 同步字段
|
* @param savePreEvent 保存前调用方法
|
* @param postSaveEvent 保存后调用方法
|
* @param is_uuid 是否有uuid字段
|
*/
|
public void batchSaveFieldSetData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, String savePreEvent, String postSaveEvent, boolean is_uuid) {
|
Integer resultRow = 0;
|
List<String> list = ListUtils.newArrayList();
|
//新增数量
|
Integer addNum = 0;
|
//修改数量
|
Integer upNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//总条数
|
Integer totalNumber = 0;
|
//系统表名
|
String tableName = map.get("tableName");
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
//1增量或2覆盖同步
|
String syncType = map.get(CmnConst.SYNC_TYPE);
|
try {
|
do {
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
resultRow = 0;
|
while (resultSet.next()) {
|
FieldSetEntity fieldSet = new FieldSetEntity();
|
fieldSet.setTableName(tableName);
|
resultRow++;
|
StringBuffer condition = new StringBuffer();
|
for (String key : syncMap.keySet()) {
|
fieldSet.setValue(syncMap.get(key), resultSet.getString(key));
|
}
|
list.clear();
|
for (String key : sileMap.keySet()) {
|
String value = resultSet.getString(key);
|
String fieldName = sileMap.get(key);
|
fieldSet.setValue(fieldName, value);
|
condition.append(fieldName).append(" = ? AND ");
|
list.add(value);
|
}
|
//拼接修改语句 如果未修改成功记录 就把数据放入新增的dataTableEntity
|
boolean is_update = false;
|
try {
|
//调用保存前方法
|
if (!BaseUtil.strIsNull(savePreEvent) && savePreEvent.indexOf(".") != -1) {
|
try {
|
DataManipulationUtils.codeCalls(savePreEvent, fieldSet);
|
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
e.printStackTrace();
|
exceptionLog.addSubExceptionLog(logUuid, list, e);
|
}
|
}
|
String term = condition.substring(0, condition.length() - 4);
|
if ("1".equals(syncType)) {
|
FieldSetEntity fieldSetEntityByFilter = baseDao.getFieldSetEntityByFilter(tableName, term, list.toArray(new String[]{}), false);
|
//含有uuid
|
if (is_uuid) {
|
is_update = baseDao.update(fieldSet);
|
//如果查出了系统原数据
|
} else if (fieldSetEntityByFilter != null) {
|
fieldSet.setValue("uuid", fieldSetEntityByFilter.getString("uuid"));
|
is_update = baseDao.update(fieldSet);
|
// 没有uuid 并且没有源数据就新增
|
} else {
|
is_update = false;
|
}
|
}
|
} catch (BaseException e) {
|
errorNum++;
|
exceptionLog.addSubExceptionLog(logUuid, list, e);
|
continue;
|
}
|
//修改不成功
|
if (!is_update) {
|
addNum++;
|
baseDao.add(fieldSet);
|
} else {
|
//修改成功计数加一
|
upNum++;
|
}
|
//调用保存后方法
|
if (!BaseUtil.strIsNull(postSaveEvent) && postSaveEvent.indexOf(".") != -1) {
|
try {
|
DataManipulationUtils.codeCalls(postSaveEvent, fieldSet);
|
} catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
|
e.printStackTrace();
|
exceptionLog.addSubExceptionLog(logUuid, list, e);
|
}
|
}
|
}
|
totalNumber = totalNumber + resultRow;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//如果条数小于1000就结束查询
|
} while (resultRow == 1000);
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, 0, errorNum, totalNumber);
|
} catch (SQLException e) {
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, 0, errorNum, totalNumber);
|
}
|
}
|
|
/**
|
* FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。
|
* 查询到1000条数据开一个线程处理,创建可缓存线程池
|
* 新增同步,只做新增操作
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一字段
|
* @param syncMap 同步字段
|
* @param savePreEvent 保存前调用方法
|
* @param postSaveEvent 保存后调用方法
|
* @param is_uuid 是否有uuid字段
|
*/
|
public List<Object> runnableBatchAddFieldSetData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, String savePreEvent, String postSaveEvent, boolean is_uuid) {
|
//新增数量
|
Integer addNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
//保存前调用
|
map.put("savePreEvent", savePreEvent);
|
//保存后调用
|
map.put("postSaveEvent", postSaveEvent);
|
//是否有uuid字段
|
map.put("is_uuid", String.valueOf(is_uuid));
|
List<String> addDataRecord = new ArrayList<>();
|
Integer totalNumber = 0;
|
//线程安全的List
|
List<Map<String, Object>> result = Collections.synchronizedList(Lists.newArrayList());
|
try {
|
ExecutorService exec = Executors.newCachedThreadPool();
|
//查询数据条数
|
Integer number = DataManipulationUtils.getResultSetRow(conn, map);
|
totalNumber = number;
|
do {
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
DataAddSynchronization async = new DataAddSynchronization(resultSet, map, sileMap, syncMap);
|
async.setExceptionLog(exceptionLog);
|
async.setBaseDao(baseDao);
|
//回调写法
|
async.setCallBack((Object[] t) -> {
|
if (t != null && t[0] != null) {
|
result.add((Map<String, Object>) t[0]);
|
}
|
});
|
Thread t = new Thread(async);
|
number = number - 1000;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//放入用于获取线程状态 并执行t.start();
|
exec.submit(t);
|
//如果条数小于1000就结束查询
|
} while (number > 0);
|
exec.shutdown();
|
while (true) {
|
if (exec.isTerminated()) {
|
//System.out.println("所有的子线程都结束了!");
|
break;
|
}
|
Thread.sleep(1000);
|
}
|
//idea 写法
|
// int numberC = result.stream().map(m -> m.get("resultRow")).filter(n -> n != null && n > 0).mapToInt(n -> n).sum();
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
List<List<String>> changeDataKeys = (List<List<String>>) m.get("changeDataKeys");
|
if (changeDataKeys != null && changeDataKeys.size() > 0) {
|
addDataRecord.addAll(changeDataKeys.get(0));
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, 0, errorNum, totalNumber);
|
} catch (SQLException | InterruptedException e) {
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
List<List<String>> changeDataKeys = (List<List<String>>) m.get("changeDataKeys");
|
if (changeDataKeys != null && changeDataKeys.size() > 0) {
|
addDataRecord.addAll(changeDataKeys.get(0));
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, 0, errorNum, totalNumber);
|
}
|
return Lists.newArrayList(addDataRecord);
|
}
|
|
/**
|
* FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。
|
* 查询到1000条数据开一个线程处理,创建可缓存线程池
|
* 新增修改同步,修改失败就新增
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一字段
|
* @param syncMap 同步字段
|
* @param deleteMap 删除字段
|
* @param savePreEvent 保存前调用方法
|
* @param postSaveEvent 保存后调用方法
|
* @param is_uuid 是否有uuid字段
|
*/
|
public List<List<Object>> runnableBatchSaveDeleteFieldSetData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, Map<String, String> deleteMap, String savePreEvent, String postSaveEvent, boolean is_uuid) {
|
//新增数量
|
Integer addNum = 0;
|
//修改数量
|
Integer upNum = 0;
|
//删除条数
|
Integer deleteNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
// 新增、修改、删除 对应的uuid 顺序对应集合中的下表 2022年1月13日21:24:53 cheng
|
List<List<Object>> executeDataKeys = Lists.newArrayList(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
//保存前调用
|
map.put("savePreEvent", savePreEvent);
|
//保存后调用
|
map.put("postSaveEvent", postSaveEvent);
|
//是否有uuid字段
|
map.put("is_uuid", String.valueOf(is_uuid));
|
Integer totalNumber = 0;
|
//线程安全的List
|
List<Map<String, Object>> result = Collections.synchronizedList(Lists.newArrayList());
|
try {
|
ExecutorService exec = Executors.newCachedThreadPool();
|
//查询数据条数
|
Integer number = DataManipulationUtils.getResultSetRow(conn, map);
|
totalNumber = number;
|
do {
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
DataSaveDeleteSynchronization async = new DataSaveDeleteSynchronization(resultSet, map, sileMap, syncMap, deleteMap);
|
async.setExceptionLog(exceptionLog);
|
async.setBaseDao(baseDao);
|
//回调写法
|
async.setCallBack((Object[] t) -> {
|
if (t != null && t[0] != null) {
|
result.add((Map<String, Object>) t[0]);
|
}
|
});
|
Thread t = new Thread(async);
|
number = number - 1000;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//放入用于获取线程状态 并执行t.start();
|
exec.submit(t);
|
//如果条数小于1000就结束查询
|
} while (number > 0);
|
exec.shutdown();
|
while (true) {
|
if (exec.isTerminated()) {
|
//System.out.println("所有的子线程都结束了!");
|
break;
|
}
|
Thread.sleep(1000);
|
}
|
//idea 写法
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
upNum = upNum + Integer.valueOf((String) m.get("upNum"));
|
deleteNum = deleteNum + Integer.valueOf((String) m.get("deleteNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
List<Object> changeDataKeys = (List<Object>) m.get("changeDataKeys");
|
if (changeDataKeys != null) {
|
for (int j = 0; j < changeDataKeys.size(); j++) {
|
List<Object> objects = executeDataKeys.get(j);
|
if (objects == null) {
|
objects = new ArrayList<>();
|
}
|
objects.addAll((List) changeDataKeys.get(j));
|
}
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, deleteNum, errorNum, totalNumber);
|
} catch (SQLException | InterruptedException e) {
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
upNum = upNum + Integer.valueOf((String) m.get("upNum"));
|
deleteNum = deleteNum + Integer.valueOf((String) m.get("deleteNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
List<Object> changeDataKeys = (List<Object>) m.get("changeDataKeys");
|
if (changeDataKeys != null) {
|
for (int j = 0; j < changeDataKeys.size(); j++) {
|
List<Object> objects = executeDataKeys.get(j);
|
if (objects == null) {
|
objects = new ArrayList<>();
|
}
|
objects.addAll((List) changeDataKeys.get(j));
|
}
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, deleteNum, errorNum, totalNumber);
|
}
|
return executeDataKeys;
|
}
|
|
/**
|
* FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。
|
* 查询到1000条数据开一个线程处理,创建可缓存线程池
|
* 新增修改同步,修改失败就新增
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一字段
|
* @param syncMap 同步字段
|
* @param savePreEvent 保存前调用方法
|
* @param postSaveEvent 保存后调用方法
|
* @param is_uuid 是否有uuid字段
|
*/
|
public List<Object> runnableBatchSaveFieldSetData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, String savePreEvent, String postSaveEvent, boolean is_uuid) {
|
//新增数量
|
Integer addNum = 0;
|
//修改数量
|
Integer upNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//源数据表名
|
String dataOriginName = map.get("dataOriginName");
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
//保存前调用
|
map.put("savePreEvent", savePreEvent);
|
//保存后调用
|
map.put("postSaveEvent", postSaveEvent);
|
//是否有uuid字段
|
map.put("is_uuid", String.valueOf(is_uuid));
|
|
// 新增、修改、删除 对应的uuid 顺序对应集合中的下表 2022年1月13日21:24:53 cheng
|
List<Object> executeDataKeys = Lists.newArrayList(new ArrayList<>(), new ArrayList<>(), new ArrayList<>());
|
Integer totalNumber = 0;
|
//线程安全的List
|
List<Map<String, Object>> result = Collections.synchronizedList(Lists.newArrayList());
|
try {
|
ExecutorService exec = Executors.newCachedThreadPool();
|
//查询数据条数
|
Integer number = DataManipulationUtils.getResultSetRow(conn, map);
|
totalNumber = number;
|
do {
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
DataSaveSynchronization async = new DataSaveSynchronization(resultSet, map, sileMap, syncMap);
|
async.setExceptionLog(exceptionLog);
|
async.setBaseDao(baseDao);
|
//回调写法
|
async.setCallBack((Object[] t) -> {
|
if (t != null && t[0] != null) {
|
result.add((Map<String, Object>) t[0]);
|
}
|
});
|
Thread t = new Thread(async);
|
number = number - 1000;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//放入用于获取线程状态 并执行t.start();
|
exec.submit(t);
|
//如果条数小于1000就结束查询
|
} while (number > 0);
|
exec.shutdown();
|
while (true) {
|
if (exec.isTerminated()) {
|
//System.out.println("所有的子线程都结束了!");
|
break;
|
}
|
Thread.sleep(1000);
|
}
|
//idea 写法
|
// int numberC = result.stream().map(m -> m.get("resultRow")).filter(n -> n != null && n > 0).mapToInt(n -> n).sum();
|
for (int i = 0; i < result.size(); i++) {
|
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
upNum = upNum + Integer.valueOf((String) m.get("upNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
List<Object> changeDataKeys = (List<Object>) m.get("changeDataKeys");
|
if (changeDataKeys != null) {
|
for (int j = 0; j < changeDataKeys.size(); j++) {
|
List<Object> objects = (List<Object>) executeDataKeys.get(j);
|
if (objects == null) {
|
objects = new ArrayList<>();
|
}
|
objects.addAll((List) changeDataKeys.get(j));
|
}
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, upNum, 0, errorNum, totalNumber);
|
} catch (SQLException | InterruptedException e) {
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
upNum = upNum + Integer.valueOf((String) m.get("upNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
List<Object> changeDataKeys = (List<Object>) m.get("changeDataKeys");
|
if (changeDataKeys != null) {
|
for (int j = 0; j < changeDataKeys.size(); j++) {
|
List<Object> objects = (List<Object>) executeDataKeys.get(j);
|
if (objects == null) {
|
objects = new ArrayList<>();
|
}
|
objects.addAll((List) changeDataKeys.get(j));
|
}
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("修改数据" + upNum);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, upNum, 0, errorNum, totalNumber);
|
}
|
return executeDataKeys;
|
}
|
|
/**
|
* FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。
|
* 查询到1000条数据开一个线程处理,创建可缓存线程池
|
* 新增同步,只做新增操作
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一字段
|
* @param syncMap 同步字段
|
* @param deleteMap 删除字段
|
* @param savePreEvent 保存前调用方法
|
* @param postSaveEvent 保存后调用方法
|
* @param is_uuid 是否有uuid字段
|
*/
|
public List<Object> runnableBatchAddDeleteFieldSetData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, Map<String, String> deleteMap, String savePreEvent, String postSaveEvent, boolean is_uuid) {
|
//新增数量
|
Integer addNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//删除数量
|
Integer deleteNumber = 0;
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
//保存前调用
|
map.put("savePreEvent", savePreEvent);
|
//保存后调用
|
map.put("postSaveEvent", postSaveEvent);
|
//是否有uuid字段
|
map.put("is_uuid", String.valueOf(is_uuid));
|
Integer totalNumber = 0;
|
//线程安全的List
|
List<Map<String, Object>> result = Collections.synchronizedList(Lists.newArrayList());
|
List<Object> resultChangeRecord = Lists.newArrayList(Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
|
try {
|
ExecutorService exec = Executors.newCachedThreadPool();
|
//查询数据条数
|
Integer number = DataManipulationUtils.getResultSetRow(conn, map);
|
totalNumber = number;
|
do {
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
DataAddDeleteSynchronization async = new DataAddDeleteSynchronization(resultSet, map, sileMap, syncMap, deleteMap);
|
async.setExceptionLog(exceptionLog);
|
async.setBaseDao(baseDao);
|
//回调写法
|
async.setCallBack((Object[] t) -> {
|
if (t != null && t[0] != null) {
|
result.add((Map<String, Object>) t[0]);
|
}
|
});
|
Thread t = new Thread(async);
|
number = number - 1000;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//放入用于获取线程状态 并执行t.start();
|
exec.submit(t);
|
//如果条数小于1000就结束查询
|
} while (number > 0);
|
exec.shutdown();
|
while (true) {
|
if (exec.isTerminated()) {
|
//System.out.println("所有的子线程都结束了!");
|
break;
|
}
|
Thread.sleep(1000);
|
}
|
//idea 写法
|
// int numberC = result.stream().map(m -> m.get("resultRow")).filter(n -> n != null && n > 0).mapToInt(n -> n).sum();
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
deleteNumber = deleteNumber + Integer.valueOf((String) m.get("deleteNumber"));
|
List<Object> changeKeys = (List<Object>) m.get("changeKeys");
|
if (changeKeys != null && changeKeys.size() > 0) {
|
for (int j = 0; j < changeKeys.size(); j++) {
|
List<Object> list = (List) resultChangeRecord.get(j);
|
list.addAll((List) changeKeys.get(j));
|
resultChangeRecord.set(i, list);
|
}
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, deleteNumber, errorNum, totalNumber);
|
} catch (SQLException | InterruptedException e) {
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
deleteNumber = deleteNumber + Integer.valueOf((String) m.get("deleteNumber"));
|
List<Object> changeKeys = (List<Object>) m.get("changeKeys");
|
if (changeKeys != null && changeKeys.size() > 0) {
|
for (int j = 0; j < changeKeys.size(); j++) {
|
List<Object> list = (List) resultChangeRecord.get(j);
|
list.addAll((List) changeKeys.get(j));
|
resultChangeRecord.set(i, list);
|
}
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, deleteNumber, errorNum, totalNumber);
|
}
|
return resultChangeRecord;
|
}
|
|
/**
|
* FieldSetEntity 通过线程单条保存,每条保存前或保存后可单独调用函数。错误日志单条记录。
|
* 查询到1000条数据开一个线程处理,创建可缓存线程池
|
* 新增同步,只做新增操作
|
*
|
* @param conn 数据库连接
|
* @param map 同步配置表的数据
|
* @param sileMap 唯一字段
|
* @param syncMap 同步字段
|
* @param deleteMap 删除字段
|
* @param savePreEvent 保存前调用方法
|
* @param postSaveEvent 保存后调用方法
|
* @param is_uuid 是否有uuid字段
|
*/
|
public List<Object> runnableBatcSaveDeleteFieldSetData(Connection conn, Map<String, String> map, Map<String, String> sileMap, Map<String, String> syncMap, Map<String, String> deleteMap, String savePreEvent, String postSaveEvent, boolean is_uuid) {
|
//新增数量
|
Integer addNum = 0;
|
//错误数量
|
Integer errorNum = 0;
|
//删除数量
|
Integer deleteNumber = 0;
|
//当前页
|
Integer currentPage = Integer.parseInt(map.get("currentPage"));
|
//日志组表uuid
|
String logUuid = map.get("logUuid");
|
//保存前调用
|
map.put("savePreEvent", savePreEvent);
|
//保存后调用
|
map.put("postSaveEvent", postSaveEvent);
|
//是否有uuid字段
|
map.put("is_uuid", String.valueOf(is_uuid));
|
Integer totalNumber = 0;
|
List<Object> resultChangeRecord = Lists.newArrayList(Lists.newArrayList(), Lists.newArrayList(), Lists.newArrayList());
|
//线程安全的List
|
List<Map<String, Object>> result = Collections.synchronizedList(Lists.newArrayList());
|
try {
|
ExecutorService exec = Executors.newCachedThreadPool();
|
//查询数据条数
|
Integer number = DataManipulationUtils.getResultSetRow(conn, map);
|
totalNumber = number;
|
do {
|
ResultSet resultSet = DataManipulationUtils.getResultSet(conn, map);
|
DataAddDeleteSynchronization async = new DataAddDeleteSynchronization(resultSet, map, sileMap, syncMap, deleteMap);
|
async.setExceptionLog(exceptionLog);
|
async.setBaseDao(baseDao);
|
//回调写法
|
async.setCallBack((Object[] t) -> {
|
if (t != null && t[0] != null) {
|
result.add((Map<String, Object>) t[0]);
|
}
|
});
|
Thread t = new Thread(async);
|
number = number - 1000;
|
currentPage++;
|
map.put("currentPage", String.valueOf(currentPage));
|
//放入用于获取线程状态 并执行t.start();
|
exec.submit(t);
|
//如果条数小于1000就结束查询
|
} while (number > 0);
|
exec.shutdown();
|
while (true) {
|
if (exec.isTerminated()) {
|
//System.out.println("所有的子线程都结束了!");
|
break;
|
}
|
Thread.sleep(1000);
|
}
|
//idea 写法
|
// int numberC = result.stream().map(m -> m.get("resultRow")).filter(n -> n != null && n > 0).mapToInt(n -> n).sum();
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
deleteNumber = deleteNumber + Integer.valueOf((String) m.get("deleteNumber"));
|
List<Object> changeKeys = (List<Object>) m.get("changeKeys");
|
if (changeKeys != null && changeKeys.size() > 0) {
|
for (int j = 0; j < changeKeys.size(); j++) {
|
List<Object> list = (List) resultChangeRecord.get(j);
|
list.addAll((List) changeKeys.get(j));
|
resultChangeRecord.set(i, list);
|
}
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, null, addNum, 0, deleteNumber, errorNum, totalNumber);
|
} catch (SQLException | InterruptedException e) {
|
for (int i = 0; i < result.size(); i++) {
|
Map<String, Object> m = result.get(i);
|
addNum = addNum + Integer.valueOf((String) m.get("addNum"));
|
errorNum = errorNum + Integer.valueOf((String) m.get("errorNum"));
|
deleteNumber = deleteNumber + Integer.valueOf((String) m.get("deleteNumber"));
|
List<Object> changeKeys = (List<Object>) m.get("changeKeys");
|
if (changeKeys != null && changeKeys.size() > 0) {
|
for (int j = 0; j < changeKeys.size(); j++) {
|
List<Object> list = (List) resultChangeRecord.get(j);
|
list.addAll((List) changeKeys.get(j));
|
resultChangeRecord.set(i, list);
|
}
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("新增数据" + addNum);
|
SpringMVCContextHolder.getSystemLogger().error("删除数据" + deleteNumber);
|
SpringMVCContextHolder.getSystemLogger().error("错误数据" + errorNum);
|
SpringMVCContextHolder.getSystemLogger().error("总数据" + totalNumber);
|
exceptionLog.upExceptionLog(conn, logUuid, e, addNum, 0, deleteNumber, errorNum, totalNumber);
|
}
|
return resultChangeRecord;
|
}
|
|
public void testDataDispose(FieldSetEntity fse) {
|
fse.setValue("su01", "【" + fse.getString("su01") + "】");
|
}
|
|
}
|