package com.product.data.service;
|
|
import cn.hutool.core.util.IdUtil;
|
import cn.hutool.core.util.NumberUtil;
|
import com.product.common.lang.DateUtils;
|
import com.product.common.lang.ExceptionUtils;
|
import com.product.common.lang.StringUtils;
|
import com.product.common.reflect.ReflectUtils;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.dao.support.BaseDaoImpl;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.spring.context.SpringBeanUtil;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.config.CmnConst;
|
import com.product.data.config.DatabaseType;
|
import com.product.data.config.ErrorCode;
|
import com.product.data.connection.ConnectionManager;
|
import com.product.data.entity.*;
|
import com.product.data.utli.CommonUtils;
|
import com.product.data.utli.QueryDataService;
|
import com.product.util.CallBackValue;
|
|
import java.lang.reflect.Method;
|
import java.sql.*;
|
import java.util.*;
|
import java.util.Date;
|
|
/**
 * @Author cheng
 * @Date 2022/2/10 13:43
 * @Desc Executes a data synchronisation task (described by the author as a singleton)
 */
|
public class SyncExecuteService {
|
|
/**
|
* 来源连接信息
|
*/
|
private DatabaseEntity dbe;
|
/**
|
* 配置信息
|
*/
|
private SyncFieldConfigEntity syncFieldConfig;
|
/**
|
* 系统连接
|
*/
|
private Connection systemConnection;
|
/**
|
* 来源连接
|
*/
|
private Connection sourceConnection;
|
/**
|
* 每页查询条数
|
*/
|
private int pageSize;
|
/**
|
* 当前查询到第几页
|
*/
|
private int currentPage = 1;
|
/**
|
* 每批次处理数据量
|
*/
|
private int batchSize;
|
|
/**
|
* 默认值
|
*/
|
private static Map<String, Object> defaultValue = new HashMap<>();
|
/**
|
* 系统缓存字段
|
*/
|
private Map<String, Map<Object, Object>> systemFields;
|
/**
|
* 查询service
|
*/
|
private QueryDataService queryDataService;
|
/**
|
* 查询字段
|
*/
|
private List<String> queryFields = new ArrayList<>();
|
/**
|
* 插入字段
|
*/
|
private List<String> insertFields = new ArrayList<>();
|
/**
|
* 当前执行记录
|
*/
|
private SyncExecuteRecordEntity executeRecord;
|
/**
|
* 上一次执行记录
|
*/
|
private SyncExecuteRecordEntity prevExecuteRecord;
|
|
private int startMaxPkValue = -1;
|
|
private List<String> valueFields = new ArrayList<>();
|
|
private BaseDao baseDao;
|
|
static {
|
//初始默认值
|
defaultValue.put(CmnConst.CREATED_BY, 1);
|
defaultValue.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime());
|
defaultValue.put("org_level_uuid", "00000000-0000-0000-0000-000000000000");
|
CallBackValue<String> getUuidMethod = (o) -> IdUtil.randomUUID();
|
defaultValue.put(CmnConst.UUID, getUuidMethod);
|
}
|
|
|
/**
|
* @param dbe
|
* @param syncFieldConfig
|
* @param prevExecuteRecord 上一次执行记录
|
* @param pageSize 每页查询条数
|
* @param batchSize 每次批处理条数
|
* @throws BaseException
|
*/
|
public SyncExecuteService(DatabaseEntity dbe, SyncFieldConfigEntity syncFieldConfig, SyncExecuteRecordEntity prevExecuteRecord, int pageSize, int batchSize) throws BaseException {
|
this.outPutLog("初始化同步Service");
|
try {
|
this.executeRecord = new SyncExecuteRecordEntity(syncFieldConfig.getTaskUuid(), syncFieldConfig.getSyncType());
|
this.prevExecuteRecord = prevExecuteRecord;
|
this.dbe = dbe;
|
this.syncFieldConfig = syncFieldConfig;
|
//获取系统连接
|
this.systemConnection = ConnectionManager.getSystemConnection();
|
//获取来源连接
|
this.sourceConnection = ConnectionManager.getConnection(dbe);
|
this.outPutLog("初始化连接成功");
|
this.systemConnection.setAutoCommit(false);
|
this.pageSize = pageSize;
|
this.batchSize = batchSize;
|
this.baseDao = SpringBeanUtil.getBean(BaseDaoImpl.class);
|
if (this.syncFieldConfig.getSyncType() == 1) {
|
this.increaseFieldMaxValue = getTargetTableFieldMaxValue(this.syncFieldConfig.getIncrementField().getTargetField());
|
this.outPutLog("增量同步最大值获取成功:" + this.increaseFieldMaxValue);
|
// 有更新且更新字段和增量字段不是同一个字段
|
if (this.syncFieldConfig.getUpdateField() != null && !this.syncFieldConfig.getIncrementField().equals(this.syncFieldConfig.getUpdateField()))
|
this.updateFieldMaxValue = getTargetTableFieldMaxValue(this.syncFieldConfig.getUpdateField().getTargetField());
|
this.outPutLog("增量同步更新最大值获取成功:" + this.updateFieldMaxValue);
|
}
|
this.outPutLog("初始化Sql模板");
|
this.initSqlTemplate();
|
this.queryDataService = new QueryDataService(this.sourceConnection);
|
this.outPutLog("开始获取目标表最大主键值");
|
this.startMaxPkValue = getSystemTableMaxPkValue() + 1;
|
this.executeRecord.setStartPosition(startMaxPkValue);
|
if (!StringUtils.isEmpty(this.syncFieldConfig.getSaveBeforeMethod())) {
|
this.outPutLog("初始化获取保存前处理bean.method:" + this.syncFieldConfig.getSaveBeforeMethod());
|
this.getSaveBeforeMethod();
|
}
|
|
this.outPutLog("同步Service初始成功");
|
|
} catch (BaseException e) {
|
e.printStackTrace();
|
this.outPutError("初始化同步Service错误", e);
|
} catch (Exception e) {
|
e.printStackTrace();
|
this.outPutError("初始化同步Service错误", e);
|
}
|
}
|
|
/**
|
* 保存期调用方法bean
|
*/
|
private Object saveBeforeBean;
|
/**
|
* 保存前调用的方法
|
* 方法的参数只能是 Map<String,Object>
|
*/
|
private Method saveBeforeMethod;
|
|
/**
|
* 获取保存前调用的方法
|
*/
|
private void getSaveBeforeMethod() {
|
String saveBeforeMethod = this.syncFieldConfig.getSaveBeforeMethod();
|
if (StringUtils.isEmpty(saveBeforeMethod)) {
|
String[] params = saveBeforeMethod.split(".");
|
if (params.length == 2) {
|
this.saveBeforeBean = SpringBeanUtil.getBean(params[0]);
|
if (this.saveBeforeBean == null) {
|
throw new BaseException(ErrorCode.GET_SAVE_BEFORE_BEAN_FAIL);
|
}
|
this.saveBeforeMethod = ReflectUtils.getAccessibleMethod(params[0], params[1], Map.class);
|
if (this.saveBeforeMethod == null) {
|
throw new BaseException(ErrorCode.GET_SAVE_BEFORE_METHOD_FAIL);
|
}
|
}
|
}
|
}
|
|
/**
|
* 保存执行同步的记录到日志表
|
*
|
* @param endPosition
|
*/
|
private void saveSyncExecuteRecord(int endPosition) {
|
try {
|
BaseDao baseDao = SpringBeanUtil.getBean(BaseDaoImpl.class);
|
if (baseDao != null) {
|
// final long connectionSystemTime = ConnectionManager.getConnectionSystemTime(this.sourceConnection, this.dbe.getDbType());
|
baseDao.saveFieldSetEntity(this.executeRecord.getRecord(endPosition, System.currentTimeMillis()));
|
}
|
} catch (Exception e) {
|
this.outPutError("保存同步数据执行记录失败", e);
|
//为了记录保存执行日志失败时,写入到系统日志文件中,方便追溯原因
|
this.outPutError(this.toString());
|
}
|
}
|
|
/**
|
* 执行保存前方法
|
*
|
* @param values
|
*/
|
private void executeSaveBeforeMethod(Map<String, Object> values) {
|
try {
|
// 判断方法和bean 是否为空 为空说明没有填写保存前执行方法或获取执行方法错误
|
if (this.saveBeforeMethod != null && this.saveBeforeBean != null) {
|
// 传入一行数据
|
this.saveBeforeMethod.invoke(this.saveBeforeBean, values);
|
}
|
} catch (Exception e) {
|
//执行保存前方法出错
|
throw new BaseException(ErrorCode.EXECUTE_BEFORE_SAVE_ERROR);
|
}
|
}
|
|
/**
|
* 查询数据sql模板
|
*/
|
private String querySqlTemplate;
|
|
/**
|
* 插入数据sql模板
|
*/
|
private String insertSqlTemplate;
|
/**
|
* 查询参数 插入 更新使用同一个变量 值可能不同
|
*/
|
private Object[] queryParams;
|
/**
|
* 查询更新的数据sql模板(来源表)
|
*/
|
private StringBuilder queryUpdateTemplate;
|
|
private String[] defaultFields;
|
|
|
/**
|
* 初始化sql模板
|
*/
|
private void initSqlTemplate() throws BaseException {
|
DatabaseType dbType = dbe.getDbType();
|
//查询数据sql模板
|
StringBuilder queryTemplate = new StringBuilder("SELECT ");
|
if (DatabaseType.Informix.equals(dbType)) {
|
queryTemplate.append(" SKIP %s FIRST %s ");
|
} else if (DatabaseType.SqlServer.equals(dbType)) {
|
queryTemplate.append(" TOP %s ");
|
}
|
//插入数据sql目标
|
StringBuilder insertTemplate = new StringBuilder("INSERT INTO `").append(syncFieldConfig.getTargetTable()).append("` (");
|
String values = "";
|
this.systemFields = CommonUtils.getSystemFieldByCache(this.syncFieldConfig.getTargetTable());
|
String[] fields = this.defaultValue.keySet().toArray(new String[]{});
|
for (String field : fields) {
|
if (!this.systemFields.containsKey(field)) {
|
this.defaultValue.remove(field);
|
}
|
}
|
for (int i = 0; i < syncFieldConfig.getSyncFieldEntityList().size(); i++) {
|
SyncFieldEntity syncFieldEntity = syncFieldConfig.getSyncFieldEntityList().get(i);
|
//检查目标表对应字段是否存在
|
Map<Object, Object> systemField = this.systemFields.get(syncFieldEntity.getTargetFieldName());
|
if (systemField == null || systemField.isEmpty()) {
|
//目标表字段不存在
|
throw new BaseException(ErrorCode.SYNC_DATA_TARGET_FIELD_NOT_EXIST);
|
}
|
this.queryFields.add(syncFieldEntity.getSourceFieldName());
|
this.insertFields.add(syncFieldEntity.getTargetFieldName());
|
valueFields.add(syncFieldEntity.getSourceFieldName());
|
this.defaultValue.remove(syncFieldEntity.getTargetFieldName());
|
queryTemplate.append(syncFieldEntity.getSourceField());
|
insertTemplate.append(syncFieldEntity.getTargetField());
|
values += "?";
|
if (i + 1 < syncFieldConfig.getSyncFieldEntityList().size()) {
|
queryTemplate.append(",");
|
insertTemplate.append(",");
|
values += ",";
|
} else {
|
//循环到最后一个字段时,循环默认字段
|
fields = defaultValue.keySet().toArray(new String[]{});
|
defaultFields = new String[fields.length];
|
for (int j = 0; j < fields.length; j++) {
|
|
defaultFields[j] = fields[j];
|
if (j == 0) {
|
values += ",";
|
insertTemplate.append(",");
|
}
|
this.insertFields.add(fields[j]);
|
insertTemplate.append(fields[j]);
|
values += "?";
|
if (j + 1 < fields.length) {
|
insertTemplate.append(",");
|
values += ",";
|
}
|
}
|
}
|
}
|
insertTemplate.append(") ");
|
//增量同步时增加条件
|
String initFilter = syncFieldConfig.getQueryFilter();
|
if (this.prevExecuteRecord != null && this.syncFieldConfig.getSyncType() == 1) {
|
String increaseFilter = this.getIncreaseFilter();
|
if (!StringUtils.isEmpty(increaseFilter)) {
|
String queryFilter = initFilter;
|
if (queryFilter != null) {
|
queryFilter += " AND ";
|
} else {
|
queryFilter = "";
|
}
|
queryFilter += " ( " + increaseFilter + " ) ";
|
|
syncFieldConfig.setQueryFilter(queryFilter);
|
}
|
}
|
if (DatabaseType.Oracle.equals(dbType)) {
|
this.appendUpdateSql(queryTemplate);
|
//Oracle 特殊分页操作
|
queryTemplate.append(" FROM (SELECT ROWNUM AS rowno, t.* FROM ")
|
.append(syncFieldConfig.getSourceTable())
|
.append(" t WHERE ROWNUM <= %s ");
|
this.appendUpdateSql(" FROM (SELECT ROWNUM AS rowno, t.* FROM ");
|
this.appendUpdateSql(syncFieldConfig.getSourceTable());
|
this.appendUpdateSql(" t WHERE ROWNUM <= %s ");
|
if (!StringUtils.isEmpty(initFilter)) {
|
this.appendUpdateSql(" AND ( ");
|
this.appendUpdateSql(initFilter);
|
this.appendUpdateSql(" ) ");
|
}
|
this.appendUpdateSql(" AND ( ");
|
this.appendUpdateSql(getUpdateQueryFilter());
|
this.appendUpdateSql(" ) ");
|
if (!StringUtils.isEmpty(syncFieldConfig.getQueryFilter())) {
|
queryTemplate.append(" AND ( ")
|
.append(syncFieldConfig.getQueryFilter())
|
.append(" ) ");
|
}
|
queryTemplate.append(") a_table WHERE rowno>%s ");
|
this.appendUpdateSql(") a_table WHERE rowno>%s ");
|
} else {
|
queryTemplate.append(" FROM ");
|
if (DatabaseType.PSQL.equals(dbType)) {
|
queryTemplate.append(" \"").append(dbe.getDbName()).append("\".").append("\"").append(dbe.getDbInstance()).append("\".");
|
}
|
queryTemplate.append(syncFieldConfig.getSourceTable()).append(" ");
|
this.appendUpdateSql(queryTemplate);
|
}
|
if (DatabaseType.SqlServer.equals(dbType)) {
|
// sqlserver 特殊分页操作
|
int startIndex = queryTemplate.length();
|
// todo sqlsver 库未正确拼接 sql
|
// queryTemplate.append(" WHERE ").append(syncFieldConfig.getUniqueField()).append(" NOT IN (SELECT TOP %s ").append(syncFieldConfig.getUniqueField());
|
// queryTemplate.append(" FROM ").append(syncFieldConfig.getSourceTable());
|
this.appendUpdateSql(queryTemplate.substring(startIndex));
|
if (!StringUtils.isEmpty(initFilter)) {
|
this.appendUpdateSql(" WHERE ( ");
|
this.appendUpdateSql(initFilter);
|
this.appendUpdateSql(" AND (");
|
this.appendUpdateSql(getUpdateQueryFilter());
|
this.appendUpdateSql(") ");
|
this.appendUpdateSql(" ) ");
|
this.appendUpdateSql(" and ");
|
this.appendUpdateSql(initFilter);
|
this.appendUpdateSql(" AND (");
|
this.appendUpdateSql(getUpdateQueryFilter());
|
this.appendUpdateSql(") ");
|
|
} else {
|
this.appendUpdateSql(" WHERE ( ");
|
this.appendUpdateSql(getUpdateQueryFilter());
|
}
|
if (!StringUtils.isEmpty(syncFieldConfig.getQueryFilter())) {
|
//主表查询条件
|
queryTemplate.append(" WHERE ").append(syncFieldConfig.getQueryFilter()).append(" ) ");
|
queryTemplate.append(" AND ").append(syncFieldConfig.getQueryFilter());
|
}
|
this.appendUpdateSql(" ) ");
|
queryTemplate.append(" ) ");
|
} else if (dbType.equals(DatabaseType.MySql) || DatabaseType.PSQL.equals(dbType)) {
|
String startStr = " WHERE ";
|
if (!StringUtils.isEmpty(syncFieldConfig.getQueryFilter())) {
|
queryTemplate.append(" WHERE ").append(syncFieldConfig.getQueryFilter());
|
}
|
if (!StringUtils.isEmpty(initFilter)) {
|
this.appendUpdateSql(" " + startStr + " ( ");
|
this.appendUpdateSql(initFilter);
|
this.appendUpdateSql(" ) ");
|
startStr = " AND ";
|
} else {
|
|
}
|
if (DatabaseType.PSQL.equals(dbType)) {
|
queryTemplate.append(" limit %s OFFSET %s");
|
} else {
|
queryTemplate.append(" limit %s,%s");
|
}
|
this.appendUpdateSql(" " + startStr + " ( ");
|
this.appendUpdateSql(getUpdateQueryFilter());
|
this.appendUpdateSql(" ) ");
|
if (DatabaseType.PSQL.equals(dbType)) {
|
this.appendUpdateSql(" limit %s OFFSET %s");
|
} else {
|
this.appendUpdateSql(" limit %s,%s");
|
}
|
|
} else if (DatabaseType.Informix.equals(dbType)) {
|
String queryFilter = syncFieldConfig.getQueryFilter();
|
if (!StringUtils.isEmpty(queryFilter)) {
|
queryTemplate.append(" WHERE ").append(queryFilter);
|
}
|
|
this.appendUpdateSql(" WHERE ");
|
if (!StringUtils.isEmpty(initFilter)) {
|
this.appendUpdateSql(initFilter);
|
this.appendUpdateSql(" and ");
|
}
|
this.appendUpdateSql(getUpdateQueryFilter());
|
}
|
insertTemplate.append(" VALUES (").append(values).append(")");
|
this.querySqlTemplate = queryTemplate.toString();
|
this.insertSqlTemplate = insertTemplate.toString();
|
System.out.println("查询sql:\n" + querySqlTemplate);
|
System.out.println("插入sql:\n" + insertSqlTemplate);
|
System.out.println("更新查询sql:\n" + this.queryUpdateTemplate);
|
|
|
}
|
|
/**
|
* 是否更新数(在增量标识字段和更新标识字段不为空时才为 true)
|
*/
|
private boolean isUpdate = true;
|
|
/**
|
* 拼接更新查询语句
|
*
|
* @param str
|
*/
|
private void appendUpdateSql(CharSequence str) {
|
if (this.syncFieldConfig != null && this.prevExecuteRecord != null && this.isUpdate) {
|
if (this.queryUpdateTemplate == null) {
|
this.queryUpdateTemplate = new StringBuilder();
|
}
|
this.queryUpdateTemplate.append(str);
|
}
|
}
|
|
/**
|
* 查询目标表字段最大值
|
*
|
* @param fieldName
|
* @return
|
*/
|
private Object getTargetTableFieldMaxValue(String fieldName) {
|
StringBuilder sql = new StringBuilder("SELECT MAX(");
|
sql.append(fieldName);
|
sql.append(") increaseValue FROM ").append(this.syncFieldConfig.getTargetTable());
|
FieldSetEntity fs = baseDao.getFieldSetEntityBySQL(sql.toString(), new Object[]{}, false);
|
return fs == null ? null : fs.getObject("increaseValue");
|
}
|
|
private Object increaseMaxValue;
|
|
/**
|
* 获取增量条件
|
*
|
* @return
|
* @throws BaseException
|
*/
|
private String getIncreaseFilter() throws BaseException {
|
if (this.prevExecuteRecord != null && this.syncFieldConfig.getSyncType() == 1 && increaseFieldMaxValue != null) {
|
Object increaseValue = this.increaseFieldMaxValue;
|
this.queryParams = new Object[]{increaseValue};
|
if (increaseValue instanceof Timestamp) {
|
long time = ((Timestamp) increaseValue).getTime();
|
increaseMaxValue = time;
|
this.executeRecord.setInsertQueryFilter(this.syncFieldConfig.getIncrementField().getTargetField() + ">" + time);
|
} else if (increaseValue instanceof Date) {
|
long time = ((Date) increaseValue).getTime();
|
increaseMaxValue = time;
|
this.executeRecord.setInsertQueryFilter(this.syncFieldConfig.getIncrementField().getTargetField() + ">" + time);
|
} else {
|
|
String charValue;
|
if (increaseValue instanceof Integer) {
|
charValue = increaseValue.toString();
|
} else {
|
charValue = (String) increaseValue;
|
}
|
if (NumberUtil.isNumber(charValue)) {
|
increaseMaxValue = charValue;
|
this.executeRecord.setInsertQueryFilter(this.syncFieldConfig.getIncrementField().getTargetField() + ">" + NumberUtil.parseInt(charValue));
|
} else {
|
// 增量标识字段只能是 时间或整数 类型
|
throw new BaseException(ErrorCode.GET_SYNC_TARGET_TABLE_MAX_INCREMENT_VALUE_FAIL);
|
}
|
}
|
return this.syncFieldConfig.getIncrementField().getSourceField() + "> ? ";
|
}
|
return "";
|
}
|
|
/**
|
* 获取增量更新条件
|
*
|
* @return
|
* @throws BaseException
|
*/
|
private String getUpdateQueryFilter() throws BaseException {
|
if (this.prevExecuteRecord != null && this.syncFieldConfig.getSyncType() == 1) {
|
|
if (this.updateFieldMaxValue != null) {
|
Object time = null;
|
|
if (updateFieldMaxValue instanceof Timestamp) {
|
time = ((Timestamp) updateFieldMaxValue).getTime();
|
} else if (updateFieldMaxValue instanceof Date) {
|
time = ((Date) updateFieldMaxValue).getTime();
|
} else {
|
// 更新标识字段只能是 时间类型
|
throw new BaseException(ErrorCode.GET_SYNC_TARGET_UPDATE_MAX_VALUE_FAIL);
|
}
|
this.executeRecord.setUpdateQueryFilter(this.syncFieldConfig.getIncrementField().getTargetField() + "<=" + increaseMaxValue + " AND " + this.syncFieldConfig.getUpdateField().getTargetField() + ">" + time);
|
return this.syncFieldConfig.getIncrementField().getSourceField() + "<=? AND " + this.syncFieldConfig.getUpdateField().getSourceField() + "> ? ";
|
} else if (this.syncFieldConfig.getIncrementField() != null && this.syncFieldConfig.getUpdateField() != null) {
|
// 没有更新最大值获取更新表示字段不为空的并且小于增量同步字段的数据做更新
|
this.executeRecord.setUpdateQueryFilter(this.syncFieldConfig.getIncrementField().getTargetField() + "<=" + increaseMaxValue + " AND " + this.syncFieldConfig.getUpdateField().getTargetField() + " is not null");
|
return this.syncFieldConfig.getIncrementField().getSourceField() + "<=? AND " + this.syncFieldConfig.getUpdateField().getSourceField() + " is not null";
|
}
|
}
|
this.isUpdate = false;
|
if (this.queryUpdateTemplate != null) {
|
this.queryUpdateTemplate.setLength(0);
|
}
|
//没有要更新的数据
|
return "";
|
}
|
|
/**
|
* 获取查询sql
|
*
|
* @return
|
*/
|
private String getQuerySql(String templateSql) {
|
DatabaseType dbType = dbe.getDbType();
|
int first = -1;
|
int second = -1;
|
if (DatabaseType.Oracle.equals(dbType)) {
|
first = currentPage * pageSize;
|
second = (currentPage - 1) * pageSize;
|
} else if (DatabaseType.MySql.equals(dbType)) {
|
if (currentPage == 1) {
|
first = 0;
|
} else {
|
first = (currentPage - 1) * pageSize;
|
}
|
second = pageSize;
|
} else if (DatabaseType.SqlServer.equals(dbType)) {
|
first = pageSize;
|
second = (currentPage - 1) * pageSize;
|
} else if (DatabaseType.Informix.equals(dbType)) {
|
first = (currentPage - 1) * pageSize;
|
second = pageSize;
|
} else if (DatabaseType.PSQL.equals(dbType)) {
|
first = pageSize;
|
second = pageSize * (currentPage - 1);
|
} else {
|
return null;
|
}
|
String sql = String.format(templateSql, first, second);
|
this.currentPage++;
|
return sql;
|
}
|
|
/**
|
* 总条数
|
*/
|
private int totalCount = 0;
|
/**
|
* 没有提交的条数
|
*/
|
private int notCommitCount = 0;
|
|
private PreparedStatement pst;
|
|
private List<Object> updateDataUniqueValues = null;
|
/**
|
* 更新数据的sql模板(目标表)
|
*/
|
private String updateTemplateSql;
|
/**
|
* 增量字段最大值
|
*/
|
private Object increaseFieldMaxValue;
|
/**
|
* 更新字段最大值
|
*/
|
private Object updateFieldMaxValue;
|
|
/**
|
* 执行更新数据
|
*
|
* @throws SQLException
|
*/
|
private void executeUpdateData() throws SQLException {
|
if (this.queryUpdateTemplate == null) {
|
this.outPutError("查询更新数据sql模板为空");
|
return;
|
}
|
if (this.updateTemplateSql == null) {
|
this.outPutError("更新数据sql模板为空");
|
return;
|
}
|
if (queryFields.size() > 0 && this.valueFields.size() > 0) {
|
if (this.currentPage == 1) {
|
this.outPutLog("开始查询更新的数据,SOURCE_TABLE:" + this.syncFieldConfig.getSourceTable() + ",TARGET_TABLE:" + this.syncFieldConfig.getTargetTable());
|
// 初始化预编译对象
|
pst = this.systemConnection.prepareStatement(this.updateTemplateSql);
|
this.outPutLog("初始化预编译对象完成");
|
}
|
if (pst == null || pst.isClosed()) {
|
this.outPutError("初预编译对象获取失败,对象为空或对象已被关闭");
|
return;
|
}
|
//获取查询sql 自动替换分页变量 sql中会有 两个占位符 %s %s 替换后会将 当前页+1
|
String querySql = getQuerySql(this.queryUpdateTemplate.toString());
|
QueryResultEntity result = this.executeQuery(querySql);
|
if (!result.isEmpty()) {
|
// 遍历查询的结果集
|
this.outPutLog("开始遍历查询结果,本次查询到数据条数:" + result.getSize());
|
for (Map<String, Object> rowMap : result.getResult()) {
|
//遍历要插入的字段
|
this.executeSaveBeforeMethod(rowMap);
|
int i = 0;
|
for (; i < valueFields.size(); i++) {
|
pst.setObject(i + 1, rowMap.get(valueFields.get(i)));
|
if (i + 1 == valueFields.size()) {
|
//条件值
|
List<SyncFieldEntity> uniqueFields = this.syncFieldConfig.getUniqueFields();
|
for (int j = 1; j <= uniqueFields.size(); j++) {
|
pst.setObject(i + 1 + j, rowMap.get(uniqueFields.get(j - 1).getSourceFieldName()));
|
}
|
|
}
|
}
|
|
this.pst.addBatch();
|
notCommitCount++;
|
if (this.notCommitCount % this.batchSize == 0) {
|
commitData(2);
|
}
|
}
|
|
}
|
//如果查询到的数据条数等于每页查询条数,继续查询下一页
|
if (result.getSize() == this.pageSize) {
|
this.executeUpdateData();
|
} else if (this.notCommitCount > 0) {
|
commitData(2);
|
}
|
}
|
}
|
|
/**
|
* 执行查询语句返回结果值
|
*
|
* @param querySql
|
* @return
|
*/
|
private QueryResultEntity executeQuery(String querySql) {
|
this.outPutLog("查询sql:\n" + querySql);
|
// 执行查询返回查询结果集
|
long startQueryTime = System.currentTimeMillis();
|
this.outPutLog("开始执行查询");
|
QueryResultEntity result = queryDataService.getResult(querySql, this.queryParams, this.queryFields);
|
this.outPutLog("执行查询完成,共耗时:" + ((System.currentTimeMillis() - startQueryTime) / 1000) + "秒");
|
return result;
|
}
|
|
/**
|
* 执行同步数据入口
|
*
|
* @throws BaseException
|
*/
|
public void executeSync() throws BaseException {
|
int endPosition = -1;
|
try {
|
executeSyncData();
|
endPosition = getSystemTableMaxPkValue();
|
} catch (BaseException e) {
|
throw e;
|
} catch (Exception e) {
|
e.printStackTrace();
|
throw new BaseException(e);
|
} finally {
|
saveSyncExecuteRecord(endPosition);
|
close();
|
}
|
}
|
|
/**
|
* 执行同步入口
|
*/
|
private void executeSyncData() throws BaseException, SQLException {
|
|
try {
|
if (queryFields.size() > 0 && this.valueFields.size() > 0) {
|
if (this.currentPage == 1) {
|
this.outPutLog("开始执行同步数据任务,SOURCE_TABLE:" + this.syncFieldConfig.getSourceTable() + ",TARGET_TABLE:" + this.syncFieldConfig.getTargetTable());
|
this.outPutLog("本次同步每页查询条数:" + this.pageSize + ",每" + batchSize + "条数据执行批量提交");
|
// 初始化预编译对象
|
pst = this.systemConnection.prepareStatement(this.insertSqlTemplate);
|
this.outPutLog("初始化预编译对象完成");
|
if (this.syncFieldConfig.getSyncType() == 0) {
|
// 覆盖同步
|
PreparedStatement pst = this.systemConnection.prepareStatement("truncate table " + syncFieldConfig.getTargetTable());
|
//执行清空表
|
pst.execute();
|
pst.close();
|
this.outPutLog("清空表操作完成");
|
this.executeRecord.setStartPosition(0);
|
}
|
}
|
if (pst == null || pst.isClosed()) {
|
this.outPutError("初预编译对象获取失败,对象为空或对象已被关闭");
|
return;
|
}
|
//获取查询sql 自动替换分页变量 sql中会有 两个占位符 %s %s 替换后会将 当前页+1
|
String querySql = getQuerySql(this.querySqlTemplate);
|
|
QueryResultEntity result = this.executeQuery(querySql);
|
if (!result.isEmpty()) {
|
// 遍历查询的结果集
|
this.outPutLog("开始遍历查询结果,本次查询到数据条数:" + result.getSize());
|
for (Map<String, Object> rowMap : result.getResult()) {
|
//遍历要插入的字段
|
// this.defaultValue.forEach((fieldName, value) -> {
|
// if (this.defaultValue.get(fieldName) != null) {
|
// value = this.defaultValue.get(fieldName);
|
// //判断是否为回调接口
|
// if (value instanceof CallBackValue) {
|
// CallBackValue<String> callback = (CallBackValue<String>) value;
|
// //执行回调方法获取value
|
// value = callback.method(fieldName);
|
// }
|
// rowMap.put(fieldName, value);
|
// }
|
// });
|
//调用保存前执行方法
|
this.executeSaveBeforeMethod(rowMap);
|
int i = 0;
|
for (; i < valueFields.size(); i++) {
|
pst.setObject(i + 1, rowMap.get(valueFields.get(i)));
|
}
|
if (this.defaultFields != null && this.defaultFields.length > 0) {
|
for (int j = 1; j <= this.defaultFields.length; j++) {
|
Object value = null;
|
String fieldName = this.defaultFields[j - 1];
|
if (this.defaultValue.get(fieldName) != null) {
|
value = this.defaultValue.get(fieldName);
|
//判断是否为回调接口
|
if (value instanceof CallBackValue) {
|
CallBackValue<String> callback = (CallBackValue<String>) value;
|
//执行回调方法获取value
|
value = callback.method(fieldName);
|
}
|
}
|
pst.setObject(i + j, value);
|
}
|
}
|
|
try {
|
this.pst.addBatch();
|
notCommitCount++;
|
} catch (Exception e) {
|
this.executeRecord.addErrorNumber(1);
|
this.outPutError("批处理增加失败", e);
|
}
|
if (this.notCommitCount % this.batchSize == 0) {
|
commitData(1);
|
}
|
}
|
|
}
|
|
if (result.getSize() > 0 && result.getSize() == this.pageSize) {
|
//进入递归 清空当前查询的结果集 否则页数过多后占用过多的资源
|
result.clear();
|
this.executeSyncData();
|
} else if (this.notCommitCount > 0) {
|
commitData(1);
|
//新增数据完成 更新历史数据
|
this.historyDataUpdate();
|
} else {
|
//没有查询到数据 更新历史数据
|
this.historyDataUpdate();
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
this.outPutError("", e);
|
throw e;
|
}
|
}
|
|
|
private static String getConcatUnique(List<SyncFieldEntity> uniqueFields) {
|
if (uniqueFields != null) {
|
String start = " concat (";
|
StringBuilder f = new StringBuilder(start);
|
for (int i = 0; i < uniqueFields.size(); i++) {
|
if (i > 0) {
|
f.append(",'*=*',");
|
}
|
f.append(uniqueFields.get(i).getTargetField().trim());
|
}
|
if (start.length() < f.length()) {
|
f.append(")");
|
return f.toString();
|
}
|
}
|
return "";
|
}
|
|
|
/**
|
* 历史数据更新 (仅用于增量同步)
|
*/
|
public void historyDataUpdate() {
|
try {
|
if (this.isUpdate == false) {
|
if (pst != null && !pst.isClosed()) {
|
pst.close();
|
}
|
return;
|
}
|
if (this.syncFieldConfig.getSyncType() == 1) {
|
if (this.syncFieldConfig.getIncrementField().equals(this.syncFieldConfig.getUpdateField()) && this.executeRecord.getIncreaseNumber() > 0) {
|
//更新标识和增量标识是同一个字段
|
StringBuilder sql = new StringBuilder();
|
List<SyncFieldEntity> uniqueFields = this.syncFieldConfig.getUniqueFields();
|
if (uniqueFields == null || uniqueFields.size() == 0) {
|
this.outPutError("唯一标识字段不存在,增量更新失败");
|
} else {
|
String targetTable = this.syncFieldConfig.getTargetTable();
|
String incrementField = this.syncFieldConfig.getIncrementField().getTargetField();
|
String targetTablePkField = this.syncFieldConfig.getTargetTablePkField();
|
sql.append(" DELETE FROM ").append(targetTable).append(" WHERE ").append(targetTablePkField).append(" in ( SELECT * FROM ( ");
|
sql.append(" SELECT ");
|
sql.append(" id ");
|
sql.append(" FROM ");
|
sql.append(targetTable);
|
sql.append(" a ");
|
sql.append(" JOIN ( ");
|
sql.append(" SELECT ");
|
sql.append(getConcatUnique(uniqueFields)).append(" uniqueValue ");
|
sql.append(" FROM ");
|
sql.append(targetTable);
|
sql.append(" GROUP BY ");
|
for (int i = 0; i < uniqueFields.size(); i++) {
|
if (i > 0) {
|
sql.append(",");
|
}
|
sql.append(" `").append(uniqueFields.get(i).getTargetFieldName()).append("` ");
|
}
|
sql.append(" HAVING ");
|
for (int i = 0; i < uniqueFields.size(); i++) {
|
if (i > 0) {
|
sql.append(" AND ");
|
}
|
sql.append(" count(`").append(uniqueFields.get(i).getTargetFieldName()).append("`)>1 ");
|
}
|
sql.append(" ) b ON ");
|
sql.append(getConcatUnique(uniqueFields)).append(" = b.`uniqueValue` ");
|
sql.append(" JOIN ( SELECT max( ").append(incrementField).append(" ) `maxValue`, ").append(getConcatUnique(uniqueFields)).append(" uniqueValue FROM ").append(targetTable).append(" GROUP BY ");
|
for (int i = 0; i < uniqueFields.size(); i++) {
|
if (i > 0) {
|
sql.append(",");
|
}
|
sql.append(" `").append(uniqueFields.get(i).getTargetFieldName()).append("` ");
|
}
|
sql.append(" ) c ON c.`maxValue` != a.").append(incrementField);
|
sql.append(" AND c.`uniqueValue` = b.`uniqueValue` ) A ) ");
|
// //查询出重复的数据删除 根据唯一字段查询 保留最近的数据
|
PreparedStatement pst = this.systemConnection.prepareStatement(sql.toString());
|
int i = pst.executeUpdate();
|
this.outPutLog("删除重复数据条数:" + i);
|
this.systemConnection.commit();
|
}
|
pst.close();
|
} else {
|
//更新标识是单独的字段
|
if (this.prevExecuteRecord != null) {
|
//上一次执行同步有查询到数据 根据上一次查询结束时间查询 更新标识字段大于结束时间的字段
|
this.currentPage = 1;
|
if (pst != null && !pst.isClosed()) {
|
pst.close();
|
}
|
if (this.defaultValue.size() > 0) {
|
this.defaultValue.forEach((k, v) -> {
|
this.valueFields.remove(this.syncFieldConfig.getTargetFieldEntity(k));
|
});
|
}
|
StringBuilder updateTemplateSql = new StringBuilder();
|
updateTemplateSql.append(" UPDATE `").append(this.syncFieldConfig.getTargetTable()).append("` SET ");
|
for (int i = 0; i < this.valueFields.size(); i++) {
|
updateTemplateSql.append(this.syncFieldConfig.getSourceFieldEntity(this.valueFields.get(i)).getTargetField()).append("=?");
|
if (i + 1 < this.valueFields.size()) {
|
updateTemplateSql.append(",");
|
} else {
|
List<SyncFieldEntity> uniqueFields = this.syncFieldConfig.getUniqueFields();
|
updateTemplateSql.append(" WHERE ");
|
for (int j = 0; j < uniqueFields.size(); j++) {
|
if (j > 0) {
|
updateTemplateSql.append(" AND ");
|
}
|
updateTemplateSql.append(uniqueFields.get(j).getTargetField()).append("=?");
|
|
}
|
// updateTemplateSql.append(" WHERE ").;
|
}
|
}
|
if (this.updateFieldMaxValue == null) {
|
this.queryParams = new Object[]{this.increaseFieldMaxValue};
|
} else {
|
this.queryParams = new Object[]{this.increaseFieldMaxValue, this.updateFieldMaxValue};
|
}
|
this.updateTemplateSql = updateTemplateSql.toString();
|
System.out.println("更新sql:\n" + this.updateTemplateSql);
|
this.executeUpdateData();
|
|
}
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 获取目标表最大主键值
|
*
|
* @return
|
*/
|
private int getSystemTableMaxPkValue() {
|
try {
|
String targetTablePkField = this.syncFieldConfig.getTargetTablePkField();
|
try (
|
PreparedStatement pst = this.systemConnection.prepareStatement("SELECT MAX(" + targetTablePkField + ") maxPkValue FROM " + this.syncFieldConfig.getTargetTable()); ResultSet resultSet = pst.executeQuery();) {
|
if (resultSet != null && resultSet.next()) {
|
return resultSet.getInt("maxPkValue");
|
}
|
}
|
throw new BaseException(ErrorCode.GET_TARGET_TABLE_PK_VALUE_FAIL);
|
} catch (BaseException e) {
|
e.printStackTrace();
|
throw e;
|
} catch (Exception e) {
|
e.printStackTrace();
|
throw new BaseException(ErrorCode.GET_TARGET_TABLE_PK_VALUE_FAIL);
|
}
|
}
|
|
/**
|
* 关闭连接相关的所有对象
|
*/
|
private void close() {
|
try {
|
if (this.pst != null && !this.pst.isClosed()) {
|
this.pst.close();
|
}
|
if (this.systemConnection != null) {
|
this.systemConnection.close();
|
}
|
if (this.sourceConnection != null) {
|
this.sourceConnection.close();
|
}
|
this.outPutLog("关闭连接");
|
} catch (Exception e) {
|
this.outPutError("关闭连接失败", e);
|
}
|
}
|
|
/**
|
* 批处理提交数据
|
*
|
* @param type 提交类型 1 新增 2 更新
|
*/
|
private void commitData(int type) throws SQLException {
|
try {
|
this.outPutLog("执行批量提交");
|
this.pst.executeBatch();
|
this.systemConnection.commit();
|
this.pst.clearBatch();
|
this.outPutLog("执行批量提交完成,共提交数据" + this.notCommitCount + "条");
|
this.totalCount += this.notCommitCount;
|
if (type == 1) {
|
this.executeRecord.addIncreaseNumber(this.notCommitCount);
|
} else if (type == 2) {
|
this.executeRecord.addUpdateNumber(this.notCommitCount);
|
}
|
} catch (Exception e) {
|
this.outPutError("执行批量提交错误,未成功提交数据" + this.notCommitCount + "条", e);
|
this.executeRecord.addErrorNumber(this.notCommitCount);
|
this.pst.clearBatch();
|
} finally {
|
this.notCommitCount = 0;
|
}
|
}
|
|
/**
|
* 输出错误日志并存放到执行记录对象中
|
*
|
* @param error
|
*/
|
private void outPutError(String error) {
|
SpringMVCContextHolder.getSystemLogger().error("执行同步数据任务错误,任务名称:" + this.syncFieldConfig.getSyncTaskName() + ",任务uuid:" + this.syncFieldConfig.getTaskUuid() + ",错误信息:\n" + error);
|
if (this.executeRecord != null) {
|
this.executeRecord.addErrorInfo(error);
|
}
|
}
|
|
/**
|
* 输出错误日志
|
*
|
* @param error
|
* @param e
|
*/
|
private void outPutError(String error, Exception e) {
|
if (e != null) {
|
String stackTraceAsString = ExceptionUtils.getStackTraceAsString(e);
|
error += (!StringUtils.isEmpty(error) ? ":\n" : "") + stackTraceAsString;
|
}
|
|
this.outPutError(error);
|
}
|
|
/**
|
* 输出info日志
|
*
|
* @param msg
|
*/
|
private void outPutLog(String msg) {
|
SpringMVCContextHolder.getSystemLogger().info(msg);
|
}
|
|
@Override
|
public String toString() {
|
return "SyncExecuteService{" +
|
"dbe=" + (dbe == null ? null : dbe.toString()) +
|
", syncFieldConfig=" + (syncFieldConfig == null ? null : syncFieldConfig.toString()) +
|
", pageSize=" + pageSize +
|
", currentPage=" + currentPage +
|
", batchSize=" + batchSize +
|
", systemFields=" + systemFields +
|
", queryFields=" + queryFields +
|
", insertFields=" + insertFields +
|
", executeRecord=" + executeRecord +
|
", startMaxPkValue=" + startMaxPkValue +
|
", querySqlTemplate='" + querySqlTemplate + '\'' +
|
", insertSqlTemplate='" + insertSqlTemplate + '\'' +
|
", totalCount=" + totalCount +
|
", notCommitCount=" + notCommitCount +
|
'}';
|
}
|
|
public SyncExecuteRecordEntity getExecuteRecord() {
|
return executeRecord;
|
}
|
|
}
|