package com.product.data.center.service;
|
|
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.TimeInterval;
|
import cn.hutool.core.util.ArrayUtil;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.google.common.collect.Lists;
|
import com.product.common.utils.StringUtils;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.center.config.CmnConst;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.entity.JournalEntity;
|
import com.product.data.center.entity.SyncDataLogEntity;
|
import com.product.data.center.utils.CustomLock;
|
import com.product.data.center.utils.RSAUtil;
|
import com.product.data.center.utils.SqlTransferUtil;
|
import com.product.datasource.dao.Dao;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.datasource.entity.UpdateFilterEntity;
|
import com.product.util.BaseUtil;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.stereotype.Component;
|
|
import java.security.NoSuchAlgorithmException;
|
import java.security.spec.InvalidKeySpecException;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
import java.util.stream.Stream;
|
|
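/**
 * Copyright © 6c
 *
 * @Date 2022-08-03 9:59
 * @Author 6c
 * @Description Copies table data from a configured source data source to one or more
 * target data sources (full overwrite or incremental by update time) and records the
 * outcome of every run as journal entries.
 */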
|
@Component
|
public class DataSyncService {
|
// DAO for the platform database; this class uses it to read sync configuration and
// log rows and to update the data-center log table.
@Autowired
|
private BaseDao baseDao;
|
// Persists journal entries and renders stack traces for them.
@Autowired
|
private JournalManagerService journalManagerService;
|
|
// Public key used by test() to encrypt a signed payload.
// The matching private key (commented out below) is read from the sync configuration at runtime.
public static final String cryptPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJlJtWr3FRT-T2HzJk0Jhxbt-Qm2fupACtwpqeLnz9iXfXGWdSvkc4OUpmM4pNGOa1fBTB_RNu5f1-dm74QEEV0CAwEAAQ";
|
// public static final String cryptPrivateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmUm1avcVFP5PYfMmTQmHFu35CbZ-6kAK3Cmp4ufP2Jd9cZZ1K-Rzg5SmYzik0Y5rV8FMH9E27l_X52bvhAQRXQIDAQABAkBHyE6ekqpatGS0N8s91DJguHwg4kc4p1julMwrp-abREwrfW1RiwN4YIhwmMHBw_971RCtN55XJPVb7shx7Bh5AiEAy_pCJaRgeHUlZfQxOu7eBoswbimm4xks2Q7_4HqoT28CIQDAYelqjDNgeigKwVUrgOmKBSqldDxqXXixVDBebMSF8wIhAILviYiKRNbuM-yHXRa8gM9oh9UfbZ536Z8IDt61PdeHAiBjT2f2F4_CAu0-uBSmU3K7S_V62akCY2QVbldVtyIv3wIhAKB4lbrlUL3OC7DDtRaYNaCQdkYqDVV2eRZgQdzLnQTo";
|
// public static final String signPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJkgrw6CzG6odwYfbN8Yp77vjfs3ufjaR8S3xLQiZBmsMFv5T9Mkt4EkDpKZRx9BoAOf_a0zyhq8lbk78kDcKFUCAwEAAQ";
|
// Private key used by test() to sign a payload.
// NOTE(review): a private key is embedded in source as a public constant; consider moving it
// to protected configuration and rotating it — confirm whether any caller depends on this field.
public static final String signPrivateKey = "MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAmSCvDoLMbqh3Bh9s3xinvu-N-ze5-NpHxLfEtCJkGawwW_lP0yS3gSQOkplHH0GgA5_9rTPKGryVuTvyQNwoVQIDAQABAkAWJTnr-VKjdk2wXv8ZzLEF1hNMj6SfrsHOW11hR8_-PkVSZmCzxlBNaREtixV538GdWZhjMbmlbvApa4c9TZaBAiEA1t8X6SyHAVKZRfGeheAWA0iyJRw9RZoW0RIH0qUoKXECIQC2cBX8jUDZdtZzGVQhUnnmmXVH4Tu_kaXjHFO1DwNbJQIgA7EVkhYHw8gNhhweoyI0fp3zIZwYmWeKWNE8fSwFQqECIEQGtVwPe4_a7QnL9v_Z1hRzQjUEOhgrgfSWWmwX5gN1AiAJsxMXJN81YsKlacXJZXofJMhGHheJ3xnF1UkntC96HQ";
|
|
// Shared lock; dataSync acquires it keyed by table name so one table is not synced concurrently.
private static CustomLock customLock = new CustomLock();
|
|
|
public static void test() throws NoSuchAlgorithmException, InvalidKeySpecException {
|
JSONArray jsonArray = new JSONArray();
|
|
// JSONObject obj1 = new JSONObject();
|
// obj1.put(CmnConst.TABLE_NAME, "TT_T_CO_EMP_DESC");
|
// obj1.put(CmnConst.UNIQUE_NAME, "EMP_NO");
|
// obj1.put(CmnConst.UPDATE_TIME, "updated_utc_datetime");
|
// jsonArray.add(obj1);
|
|
JSONObject obj2 = new JSONObject();
|
obj2.put(CmnConst.TABLE_NAME, "TT_T_CO_EMPANDEMPLOYKIND");
|
obj2.put(CmnConst.UNIQUE_NAME, "EMPANDEMPLOYKIND_ID");
|
jsonArray.add(obj2);
|
|
|
String signedData = RSAUtil.privateEncrypt(jsonArray.toString(), RSAUtil.getPrivateKey(signPrivateKey));
|
System.out.println(signedData.equals("UADjiSAUii_PYdXTx-Jud6nY6cmsrQlsNfWK3vnIsuxg8rlugO-GUe0PetQxtvGuPrUR8hBcBrJUEsamI8rXyVp7WYdlBRsZmraglklb1orLspNSPgQzgSPHP6jhnW5OlTYMbdu_aU1r328jIfjwCk_hjHhN20ed7JW1OUXyQEQ"));
|
String encodedData = RSAUtil.publicEncrypt(signedData, RSAUtil.getPublicKey(cryptPublicKey));
|
System.out.println(encodedData);
|
// dataSync(encodedData);
|
}
|
|
|
public void runTimeTask(String uuid) {
|
FieldSetEntity fse = baseDao.getFieldSetEntity("product_sys_data_sync_mes_sub", uuid, false);
|
if (FieldSetEntity.isEmpty(fse)) {
|
return;
|
}
|
JSONObject objects = BaseUtil.fieldSetEntityToJson(fse);
|
markProcess(this.dataSync(objects, true));
|
}
|
|
/**
|
* 重新处理
|
*
|
* @param logUUID
|
*/
|
public void reDeal(String logUUID) {
|
asynRedeal(logUUID);
|
}
|
|
@Async
|
public void asynRedeal(String logUUID) {
|
FieldSetEntity logFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUUID, false);
|
if (FieldSetEntity.isEmpty(logFse) || !"3".equals(logFse.getString(CmnConst.TYPE))) {
|
return;
|
}
|
String otherInfo = logFse.getString(CmnConst.OTHER_INFO);
|
SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(otherInfo), SyncDataLogEntity.class);
|
Object syncInfo = syncDataLogEntity.getSyncInfo();
|
if (syncInfo instanceof String && syncDataLogEntity.getErrorType() == 1) {
|
//解密错误类型 废弃日志逻辑代码未实现
|
dataSync((String) syncInfo);
|
} else if (syncDataLogEntity.getErrorType() == 2 || syncDataLogEntity.getErrorType() == 3) {
|
//运行过程中产生的错误
|
markProcess(dataSync((JSONObject) syncInfo));
|
}
|
|
}
|
|
/**
|
* 标记日志
|
*
|
* @param lists
|
*/
|
private void markProcess(List<JournalEntity> lists) {
|
if (lists == null || lists.isEmpty()) {
|
return;
|
}
|
long count = lists.stream().filter(item -> item.getResult() != 1).count();
|
try {
|
SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(lists.get(0).getOther_info()), SyncDataLogEntity.class);
|
JSONObject syncInfo = (JSONObject) syncDataLogEntity.getSyncInfo();
|
if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) {
|
String tableName = syncInfo.getString(CmnConst.TABLE_NAME);
|
baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" +
|
" where type=3 and config_uuid in (select uuid from product_sys_data_sync_mes_sub where UPPER(table_name)=UPPER(?)) and (result=0 or deal_result=0)"
|
, new Object[]{1, count == lists.size() ? 1 : 0, new Date(), tableName});
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
|
|
public List<JournalEntity> dataSync(JSONObject jsonArray) {
|
return dataSync(jsonArray, false);
|
}
|
|
public List<JournalEntity> dataSync(JSONObject jsonObject, boolean timeTaskTrigger) {
|
List<JournalEntity> journalEntities = new ArrayList<>();
|
|
SyncDataLogEntity<JSONObject> syncDataLogEntity = new SyncDataLogEntity();
|
syncDataLogEntity.setSyncInfo(jsonObject);
|
syncDataLogEntity.setTimeTaskTrigger(timeTaskTrigger);
|
Calendar c1 = Calendar.getInstance();
|
boolean lockStatus = false;
|
String lockKey = null;
|
Dao sourceDao = null;
|
try {
|
try {
|
String tableName = jsonObject.getString(CmnConst.TABLE_NAME);
|
String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(",");
|
if (ArrayUtil.isEmpty(uniqueSignFieldName)) {
|
throw new BaseException(ErrorCode.NO_TABLE_OR_UNIQUE_SIGN);
|
}
|
// 获取同步配置信息
|
DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
|
if (DataTableEntity.isEmpty(syncConfigDte)) {
|
throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
|
}
|
FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
|
if (FieldSetEntity.isEmpty(syncConfigFse)) {
|
throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
|
}
|
lockKey = jsonObject.getString(CmnConst.TABLE_NAME);
|
if (!customLock.tryLock(lockKey, 120)) {
|
syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!");
|
journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName,
|
null, null, null, 0L));
|
return null;
|
}
|
lockStatus = true;
|
if (jsonObject == null || jsonObject.isEmpty()) {
|
throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
|
}
|
|
// 获取来源连接和目标连接
|
if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.DATA_SOURCE))) {
|
throw new BaseException(ErrorCode.SYNC_GET_DATA_SOURCE_FAIL);
|
}
|
FieldSetEntity sourceDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, syncConfigFse.getString(CmnConst.DATA_SOURCE), false);
|
DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSourceFse);
|
sourceDao = sourceDbe.getDao();
|
if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE))) {
|
throw new BaseException(ErrorCode.SYNC_GET_TARGET_DATA_SOURCE_FAIL);
|
}
|
DataTableEntity targetDataSourceDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, BaseUtil.buildQuestionMarkFilter("uuid", syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE).split(","), true));
|
List<Dao> targetDaoList = Lists.newArrayList();
|
FieldSetEntity targetDataSourceFse;
|
for (int i = 0; i < targetDataSourceDte.getRows(); i++) {
|
targetDataSourceFse = targetDataSourceDte.getFieldSetEntity(i);
|
targetDaoList.add(new DataBaseEntity(targetDataSourceFse).getDao());
|
}
|
|
// 数据处理
|
String updateTimeFieldName;
|
DataTableEntity sourceDataDte;
|
FieldSetEntity sourceDataFse;
|
DataTableEntity targetDataDte = null;
|
List<String> sourceUniqueKeyList;// 来源数据的唯一键集合
|
List<String> targetUniqueKeyList;// 目标数据的唯一键集合
|
List<String> sourceOnlyKeyList;// 来源数据单独存在的唯一键集合
|
List<String> targetOnlyKeyList;// 目标数据单独存在的唯一键集合
|
List<String> publicKeyList;// 两者都存在的唯一键集合
|
DataTableEntity addDte;
|
DataTableEntity updateDte;
|
// StringBuilder errorInfo = new StringBuilder(1024);
|
int dataSourceCount = 0;
|
try {
|
tableName = tableName.toLowerCase(Locale.ROOT);
|
// uniqueSignFieldName = uniqueSignFieldName.toLowerCase(Locale.ROOT);
|
updateTimeFieldName = jsonObject.getString(CmnConst.UPDATE_TIME);
|
if (StringUtils.isEmpty(updateTimeFieldName)) {
|
// 没有更新标识,执行覆盖对比更新
|
sourceDataDte = sourceDao.getList(tableName, "");
|
sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
|
//dao对应目标数据源信息的下标
|
for (Dao targetDao : targetDaoList) {
|
addDte = new DataTableEntity();
|
updateDte = new DataTableEntity();
|
TimeInterval timer = DateUtil.timer();
|
try {
|
targetDataDte = targetDao.getList(tableName, "");
|
if (!DataTableEntity.isEmpty(targetDataDte)) {
|
targetUniqueKeyList = targetDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
|
sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
|
targetOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, -1);
|
publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
|
for (int j = 0; j < sourceDataDte.getRows(); j++) {
|
sourceDataFse = sourceDataDte.getFieldSetEntity(j);
|
sourceDataFse.setTableName(tableName);
|
if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
|
addDte.addFieldSetEntity(sourceDataFse);
|
} else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
|
updateDte.addFieldSetEntity(sourceDataFse);
|
}
|
}
|
String filter = " ( ";
|
for (int i = 0; i < uniqueSignFieldName.length; i++) {
|
String uniqueSign = uniqueSignFieldName[i];
|
if (i > 0) {
|
filter += " and ";
|
|
}
|
filter += " " + uniqueSign + "=? ";
|
}
|
filter += " ) ";
|
String[] lower = new String[uniqueSignFieldName.length];
|
for (int i = 0; i < uniqueSignFieldName.length; i++) {
|
lower[i] = uniqueSignFieldName[i].toLowerCase();
|
}
|
targetDao.updateBatch(updateDte, new UpdateFilterEntity(filter, lower), true);
|
if (!targetOnlyKeyList.isEmpty()) {
|
List<Object> delParams = new ArrayList<>();
|
StringBuilder delFilter = new StringBuilder();
|
for (int i = 0; i < targetOnlyKeyList.size(); i++) {
|
delParams.addAll(Arrays.asList(getUniqueValue(targetOnlyKeyList.get(i))));
|
if (i > 0) {
|
delFilter.append(" OR ");
|
}
|
delFilter.append(" ( ");
|
for (int j = 0; j < uniqueSignFieldName.length; j++) {
|
if (j > 0) {
|
delFilter.append(" and ");
|
}
|
delFilter.append(uniqueSignFieldName[j]).append("=? ");
|
}
|
delFilter.append(" ) ");
|
}
|
|
targetDao.delete(tableName, delFilter.toString(), delParams.toArray());
|
}
|
} else {
|
addDte = sourceDataDte;
|
addDte.getMeta().setTableName(new Object[]{tableName});
|
}
|
targetDao.addBatch(addDte);
|
syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
|
journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte.getRows(), tableName,
|
targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
|
syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
|
syncDataLogEntity.setErrorType(3);
|
journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte != null ? targetDataDte.getRows() : 0, tableName,
|
targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
|
} finally {
|
dataSourceCount++;
|
}
|
}
|
} else {
|
// 存在更新标识,执行增量更新
|
updateTimeFieldName = updateTimeFieldName.toLowerCase(Locale.ROOT);
|
dataSourceCount = 0;
|
for (Dao targetDao : targetDaoList) {
|
TimeInterval timer = DateUtil.timer();
|
StringBuilder sql = new StringBuilder(128);
|
String maxValue = null;
|
String minValue = null;
|
int totalCount = 0;
|
try {
|
sql.append("select max(").append(updateTimeFieldName).append(") max_value from ").append(tableName);
|
FieldSetEntity paramFse = targetDao.getOne(sql.toString());
|
StringBuilder filter = new StringBuilder(128);
|
if (paramFse != null) {
|
if (!StringUtils.isEmpty(paramFse.getString("max_value"))) {
|
filter.append(updateTimeFieldName).append(">=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), paramFse.getDate("max_value")))
|
.append(" and ").append(updateTimeFieldName).append("<=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), c1.getTime()));
|
}
|
}
|
sql.setLength(0);
|
sql.append("select count(*) total_count from ").append(tableName);
|
if (filter.length() > 0) {
|
sql.append(" where ").append(filter);
|
}
|
paramFse = sourceDao.getOne(sql.toString());
|
totalCount = StringUtils.isEmpty(paramFse.getString("total_count")) ? 0 : paramFse.getInteger("total_count");
|
int batchCount = StringUtils.isEmpty(syncConfigFse.getString(CmnConst.INCREMENT_BATCH_COUNT)) ? 1000 : syncConfigFse.getInteger(CmnConst.INCREMENT_BATCH_COUNT);
|
int totalPage = (int) Math.ceil((double) totalCount / batchCount);
|
for (int j = 0; j < totalPage; j++) {
|
addDte = new DataTableEntity();
|
updateDte = new DataTableEntity();
|
sourceDataDte = sourceDao.getList(tableName, filter.toString(), new Object[]{}, j + 1, batchCount);
|
sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
|
sql.setLength(0);
|
sql.append("select ");
|
filter.setLength(0);
|
|
for (int i = 0; i < sourceUniqueKeyList.size(); i++) {
|
if (i > 0) {
|
filter.append(" or ");
|
}
|
filter.append(" ( ");
|
for (int k = 0; k < uniqueSignFieldName.length; k++) {
|
if (i == 0) {
|
if (k > 0) {
|
sql.append(",");
|
}
|
sql.append(uniqueSignFieldName[k]);
|
|
}
|
if (k > 0) {
|
filter.append(" AND ");
|
}
|
filter.append(uniqueSignFieldName[k]).append("= ? ");
|
}
|
filter.append(" ) ");
|
}
|
|
sql.append(" from ").append(tableName).append(" where ").append(filter);
|
|
String[][] objects = sourceUniqueKeyList.stream().map(item -> getUniqueValue(item)).toArray(String[][]::new);
|
String[] parmas = ArrayUtil.addAll(objects);
|
DataTableEntity paramDte = targetDao.getList(sql.toString(), parmas);
|
if (!DataTableEntity.isEmpty(paramDte)) {
|
targetUniqueKeyList = paramDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
|
sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
|
publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
|
for (int k = 0; k < sourceDataDte.getRows(); k++) {
|
sourceDataFse = sourceDataDte.getFieldSetEntity(k);
|
String uniqueValue = getUniqueValue(sourceDataFse, uniqueSignFieldName);
|
//获取最小唯一值
|
if (StringUtils.isEmpty(minValue)) {
|
minValue = uniqueValue;
|
} else {
|
minValue = getAimID(minValue, uniqueValue, 0);
|
}
|
//获取最大唯一值
|
if (StringUtils.isEmpty(maxValue)) {
|
maxValue = uniqueValue;
|
} else {
|
maxValue = getAimID(minValue, uniqueValue, 1);
|
}
|
sourceDataFse = sourceDataDte.getFieldSetEntity(k);
|
sourceDataFse.setTableName(tableName);
|
if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
|
addDte.addFieldSetEntity(sourceDataFse);
|
} else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
|
updateDte.addFieldSetEntity(sourceDataFse);
|
}
|
}
|
String filterUp = " ( ";
|
for (int i = 0; i < uniqueSignFieldName.length; i++) {
|
String uniqueSign = uniqueSignFieldName[i];
|
if (i > 0) {
|
filterUp += " and ";
|
|
}
|
filterUp += " " + uniqueSign + "=? ";
|
}
|
filterUp += " ) ";
|
|
String[] lower = new String[uniqueSignFieldName.length];
|
for (int i = 0; i < uniqueSignFieldName.length; i++) {
|
lower[i] = uniqueSignFieldName[i].toLowerCase();
|
}
|
targetDao.updateBatch(updateDte, new UpdateFilterEntity(filterUp, lower), true);
|
} else {
|
addDte = sourceDataDte;
|
addDte.getMeta().setTableName(new Object[]{tableName});
|
}
|
targetDao.addBatch(addDte);
|
}
|
syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
|
journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
|
minValue, maxValue,
|
targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
|
syncDataLogEntity.setErrorType(3);
|
syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
|
journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
|
minValue, maxValue,
|
targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
|
} finally {
|
dataSourceCount++;
|
}
|
}
|
}
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
if (targetDaoList != null && !targetDaoList.isEmpty()) {
|
for (int i1 = 0; i1 < targetDaoList.size(); i1++) {
|
Dao dao = targetDaoList.get(i1);
|
if (dao != null) {
|
dao.closeConnection();
|
}
|
}
|
}
|
|
}
|
|
return journalEntities;
|
} catch (Exception e) {
|
e.printStackTrace();
|
syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
|
syncDataLogEntity.setErrorType(2);
|
journalEntities.add(getLogRecord(syncDataLogEntity, null, null, null, 0L
|
, e));
|
return journalEntities;
|
} finally {
|
if (journalEntities != null) {
|
for (int i = 0; i < journalEntities.size(); i++) {
|
journalManagerService.autoCreateJournal(journalEntities.get(i));
|
}
|
}
|
if (lockStatus && !StringUtils.isEmpty(lockKey)) {
|
customLock.unLock(lockKey);
|
}
|
if (sourceDao != null) {
|
sourceDao.closeConnection();
|
}
|
|
}
|
} finally {
|
if (lockStatus && !StringUtils.isEmpty(lockKey)) {
|
customLock.unLock(lockKey);
|
}
|
}
|
}
|
|
private static String[] getUniqueValue(String value) {
|
return value.split("\\*-\\*_~!");
|
}
|
|
private String getUniqueValue(FieldSetEntity item, String[] uniqueFieldNames) {
|
String name = "";
|
for (int i = 0; i < uniqueFieldNames.length; i++) {
|
if (i > 0) {
|
name += "*-*_~!";
|
}
|
name += item.getString(uniqueFieldNames[i].toLowerCase());
|
}
|
return name;
|
}
|
|
/**
|
* 日志只是记录一条,反正每次
|
* 覆盖 都是全部重新搞
|
* 增量 都是查询子库取了一个最大的时间作为标准
|
* 点击日志中的重新处理只是为了触发这个口子而已,和实际已经操作了好多数据剩余好多数据并没有好大的关系(查询和插入数据依然采用分批,不然数据量大了内存要崩)
|
* 数据不进行回滚,遇到错误直接写入到日志中(因为若是回滚,那么就相当于所有的线都没有获取到新的内容,不回滚数据可以让没问题的线正常处理)
|
*
|
* @param encodedData
|
*/
|
@Deprecated
|
public synchronized void dataSync(String encodedData) {
|
TimeInterval timer = DateUtil.timer();
|
JSONArray jsonArray = null;
|
List<JournalEntity> journalEntities = new ArrayList<>();
|
try {
|
// 获取同步配置信息
|
DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
|
if (DataTableEntity.isEmpty(syncConfigDte)) {
|
throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
|
}
|
FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
|
if (FieldSetEntity.isEmpty(syncConfigFse)) {
|
throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
|
}
|
String cryptPrivateKey = syncConfigFse.getString(CmnConst.DECRYPT_KEY);
|
String signPublicKey = syncConfigFse.getString(CmnConst.SIGN_KEY);
|
String signedData = RSAUtil.privateDecrypt(encodedData, RSAUtil.getPrivateKey(cryptPrivateKey));
|
String jsonArrayStr = RSAUtil.publicDecrypt(signedData, RSAUtil.getPublicKey(signPublicKey));
|
|
if (StringUtils.isEmpty(jsonArrayStr)) {
|
throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
|
}
|
jsonArray = JSON.parseArray(jsonArrayStr);
|
} catch (Exception e) {
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
SyncDataLogEntity<String> syncDataLogEntity = new SyncDataLogEntity();
|
syncDataLogEntity.setErrorType(1);
|
syncDataLogEntity.setSyncInfo(encodedData);
|
JournalEntity journalEntity = (getLogRecord(syncDataLogEntity, null, null, null, timer.intervalMs()
|
, e));
|
journalManagerService.autoCreateJournal(journalEntity);
|
}
|
if (jsonArray != null) {
|
try {
|
for (int i = 0; i < jsonArray.size(); i++) {
|
this.dataSync(jsonArray.getJSONObject(i));
|
}
|
} catch (Exception e) {
|
//不对错误做任何操作请在调用的方法中进行处理
|
e.printStackTrace();
|
}
|
}
|
}
|
|
private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
|
String dataSourceUid, long elapsedTime, Throwable throwable) {
|
return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, throwable);
|
}
|
|
private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
|
String dataSourceUid, long elapsedTime) {
|
return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, null);
|
}
|
|
private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
|
String maxId, String dataSourceUid, long elapsedTime) {
|
return getLogRecord(syncDataLogEntity, dataCount, configUid, minId, maxId, dataSourceUid, elapsedTime, null);
|
}
|
|
private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
|
String maxId, String dataSourceUid, long elapsedTime, Throwable throwable) {
|
JournalEntity journalEntity = new JournalEntity();
|
journalEntity.setType(3);
|
journalEntity.setDeal_flag(0);
|
journalEntity.setResult(syncDataLogEntity.getErrorType() == -1 ? 1 : 0);
|
String configUUID = "";
|
if (syncDataLogEntity.getSyncInfo() != null) {
|
JSONObject syncInfo = JSONObject.parseObject(syncDataLogEntity.getSyncInfo().toString());
|
configUUID = syncInfo.getString(CmnConst.UUID);
|
}
|
journalEntity.setConfigUuid(StringUtils.isEmpty(configUUID) ? "1" : configUUID);
|
journalEntity.setCount(dataCount);
|
journalEntity.setMin_id(minId);
|
journalEntity.setMax_id(maxId);
|
journalEntity.setData_source(dataSourceUid);
|
journalEntity.setSingle_duration(elapsedTime);
|
journalEntity.setOther_info(syncDataLogEntity.toString());
|
journalEntity.setCreated_utc_datetime(new Date());
|
if (journalEntity.getResult() != 1 && throwable != null) {
|
journalEntity.setError(journalManagerService.getStackTrace(throwable));
|
}
|
journalEntity.setDeal_result(journalEntity.getResult() == 1 ? 1 : 0);
|
return journalEntity;
|
}
|
|
private String getAimID(String s1, String s2, int sign) {
|
if (org.apache.commons.lang3.StringUtils.isEmpty(s1) && org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
|
return null;
|
}
|
if (org.apache.commons.lang3.StringUtils.isEmpty(s1) || org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
|
return org.apache.commons.lang3.StringUtils.isEmpty(s1) ? s2 : s1;
|
}
|
String numberRegexp = "\\d{1,11}";
|
if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) {
|
boolean b = Long.parseLong(s1) > Long.parseLong(s2);
|
if (sign > 0) {
|
return b ? s1 : s2;
|
} else {
|
return b ? s2 : s1;
|
}
|
} else {
|
if (sign > 0) {
|
return s1.compareTo(s2) > 0 ? s1 : s2;
|
} else {
|
return s1.compareTo(s2) > 0 ? s2 : s1;
|
}
|
}
|
}
|
|
|
/**
|
* 处理唯一键集合
|
*
|
* @param sourceList 来源
|
* @param targetList 目标
|
* @param sign 1 来源单独存在;0 两者共有;-1 目标单独存在
|
* @return
|
*/
|
private <T> List<T> dealDataUniqueKeyList(List<T> sourceList, List<T> targetList, int sign) {
|
List<T> sourceUniqueKeyList = Lists.newArrayList();
|
sourceUniqueKeyList.addAll(sourceList);
|
List<T> targetUniqueKeyList = Lists.newArrayList();
|
targetUniqueKeyList.addAll(targetList);
|
List<T> resultObjList;
|
if (sign == 1) {
|
sourceUniqueKeyList.removeAll(targetUniqueKeyList);
|
resultObjList = sourceUniqueKeyList;
|
} else if (sign == -1) {
|
targetUniqueKeyList.removeAll(sourceUniqueKeyList);
|
resultObjList = targetUniqueKeyList;
|
} else if (sign == 0) {
|
sourceUniqueKeyList.retainAll(targetUniqueKeyList);
|
resultObjList = sourceUniqueKeyList;
|
} else {
|
throw new BaseException(ErrorCode.SYNC_NO_DATA_DEAL_WAY);
|
}
|
return transferListType(resultObjList);
|
}
|
|
/**
|
* 转换list中obj为string
|
*
|
* @param list
|
* @return
|
*/
|
private <T> List<T> transferListType(List<T> list) {
|
List<T> resultList = Lists.newArrayList();
|
list.forEach(k -> {
|
if (k == null) {
|
resultList.add(null);
|
} else {
|
resultList.add(k);
|
}
|
});
|
return resultList;
|
}
|
|
/**
|
* 追加错误信息
|
*
|
* @param errorInfo
|
* @param e
|
*/
|
private void appendError(StringBuilder errorInfo, Throwable e) {
|
if (errorInfo.length() > 0) {
|
errorInfo.append("\n");
|
}
|
errorInfo.append(journalManagerService.getStackTrace(e));
|
}
|
}
|