package com.product.data.center.service; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.TimeInterval; import cn.hutool.core.util.ArrayUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.product.common.utils.StringUtils; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.center.config.CmnConst; import com.product.data.center.config.ErrorCode; import com.product.data.center.entity.JournalEntity; import com.product.data.center.entity.SyncDataLogEntity; import com.product.data.center.utils.CustomLock; import com.product.data.center.utils.RSAUtil; import com.product.data.center.utils.SqlTransferUtil; import com.product.datasource.dao.Dao; import com.product.datasource.entity.DataBaseEntity; import com.product.datasource.entity.UpdateFilterEntity; import com.product.util.BaseUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import java.security.NoSuchAlgorithmException; import java.security.spec.InvalidKeySpecException; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; /** * Copyright © 6c * * @Date 2022年08月03日 9:59 * @Author 6c * @Description */ @Component public class DataSyncService { @Autowired private BaseDao baseDao; @Autowired private JournalManagerService journalManagerService; public static final String cryptPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJlJtWr3FRT-T2HzJk0Jhxbt-Qm2fupACtwpqeLnz9iXfXGWdSvkc4OUpmM4pNGOa1fBTB_RNu5f1-dm74QEEV0CAwEAAQ"; // public static final String cryptPrivateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmUm1avcVFP5PYfMmTQmHFu35CbZ-6kAK3Cmp4ufP2Jd9cZZ1K-Rzg5SmYzik0Y5rV8FMH9E27l_X52bvhAQRXQIDAQABAkBHyE6ekqpatGS0N8s91DJguHwg4kc4p1julMwrp-abREwrfW1RiwN4YIhwmMHBw_971RCtN55XJPVb7shx7Bh5AiEAy_pCJaRgeHUlZfQxOu7eBoswbimm4xks2Q7_4HqoT28CIQDAYelqjDNgeigKwVUrgOmKBSqldDxqXXixVDBebMSF8wIhAILviYiKRNbuM-yHXRa8gM9oh9UfbZ536Z8IDt61PdeHAiBjT2f2F4_CAu0-uBSmU3K7S_V62akCY2QVbldVtyIv3wIhAKB4lbrlUL3OC7DDtRaYNaCQdkYqDVV2eRZgQdzLnQTo"; // public static final String signPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJkgrw6CzG6odwYfbN8Yp77vjfs3ufjaR8S3xLQiZBmsMFv5T9Mkt4EkDpKZRx9BoAOf_a0zyhq8lbk78kDcKFUCAwEAAQ"; public static final String signPrivateKey = "MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAmSCvDoLMbqh3Bh9s3xinvu-N-ze5-NpHxLfEtCJkGawwW_lP0yS3gSQOkplHH0GgA5_9rTPKGryVuTvyQNwoVQIDAQABAkAWJTnr-VKjdk2wXv8ZzLEF1hNMj6SfrsHOW11hR8_-PkVSZmCzxlBNaREtixV538GdWZhjMbmlbvApa4c9TZaBAiEA1t8X6SyHAVKZRfGeheAWA0iyJRw9RZoW0RIH0qUoKXECIQC2cBX8jUDZdtZzGVQhUnnmmXVH4Tu_kaXjHFO1DwNbJQIgA7EVkhYHw8gNhhweoyI0fp3zIZwYmWeKWNE8fSwFQqECIEQGtVwPe4_a7QnL9v_Z1hRzQjUEOhgrgfSWWmwX5gN1AiAJsxMXJN81YsKlacXJZXofJMhGHheJ3xnF1UkntC96HQ"; private static CustomLock customLock = new CustomLock(); public static void test() throws NoSuchAlgorithmException, InvalidKeySpecException { JSONArray jsonArray = new JSONArray(); // JSONObject obj1 = new JSONObject(); // obj1.put(CmnConst.TABLE_NAME, "TT_T_CO_EMP_DESC"); // obj1.put(CmnConst.UNIQUE_NAME, "EMP_NO"); // obj1.put(CmnConst.UPDATE_TIME, "updated_utc_datetime"); // jsonArray.add(obj1); JSONObject obj2 = new JSONObject(); obj2.put(CmnConst.TABLE_NAME, "TT_T_CO_EMPANDEMPLOYKIND"); obj2.put(CmnConst.UNIQUE_NAME, "EMPANDEMPLOYKIND_ID"); jsonArray.add(obj2); String signedData = RSAUtil.privateEncrypt(jsonArray.toString(), RSAUtil.getPrivateKey(signPrivateKey)); System.out.println(signedData.equals("UADjiSAUii_PYdXTx-Jud6nY6cmsrQlsNfWK3vnIsuxg8rlugO-GUe0PetQxtvGuPrUR8hBcBrJUEsamI8rXyVp7WYdlBRsZmraglklb1orLspNSPgQzgSPHP6jhnW5OlTYMbdu_aU1r328jIfjwCk_hjHhN20ed7JW1OUXyQEQ")); String encodedData = RSAUtil.publicEncrypt(signedData, RSAUtil.getPublicKey(cryptPublicKey)); System.out.println(encodedData); // dataSync(encodedData); } public void runTimeTask(String uuid) { FieldSetEntity fse = baseDao.getFieldSetEntity("product_sys_data_sync_mes_sub", uuid, false); if (FieldSetEntity.isEmpty(fse)) { return; } JSONObject objects = BaseUtil.fieldSetEntityToJson(fse); markProcess(this.dataSync(objects, true)); } /** * 重新处理 * * @param logUUID */ public void reDeal(String logUUID) { asynRedeal(logUUID); } @Async public void asynRedeal(String logUUID) { FieldSetEntity logFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUUID, false); if (FieldSetEntity.isEmpty(logFse) || !"3".equals(logFse.getString(CmnConst.TYPE))) { return; } String otherInfo = logFse.getString(CmnConst.OTHER_INFO); SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(otherInfo), SyncDataLogEntity.class); Object syncInfo = syncDataLogEntity.getSyncInfo(); if (syncInfo instanceof String && syncDataLogEntity.getErrorType() == 1) { //解密错误类型 废弃日志逻辑代码未实现 dataSync((String) syncInfo); } else if (syncDataLogEntity.getErrorType() == 2 || syncDataLogEntity.getErrorType() == 3) { //运行过程中产生的错误 markProcess(dataSync((JSONObject) syncInfo)); } } /** * 标记日志 * * @param lists */ private void markProcess(List lists) { if (lists == null || lists.isEmpty()) { return; } long count = lists.stream().filter(item -> item.getResult() != 1).count(); try { SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(lists.get(0).getOther_info()), SyncDataLogEntity.class); JSONObject syncInfo = (JSONObject) syncDataLogEntity.getSyncInfo(); if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) { String tableName = syncInfo.getString(CmnConst.TABLE_NAME); baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" + " where type=3 and config_uuid in (select uuid from product_sys_data_sync_mes_sub where UPPER(table_name)=UPPER(?)) and (result=0 or deal_result=0)" , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), tableName}); } } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } } public List dataSync(JSONObject jsonArray) { return dataSync(jsonArray, false); } public List dataSync(JSONObject jsonObject, boolean timeTaskTrigger) { List journalEntities = new ArrayList<>(); SyncDataLogEntity syncDataLogEntity = new SyncDataLogEntity(); syncDataLogEntity.setSyncInfo(jsonObject); syncDataLogEntity.setTimeTaskTrigger(timeTaskTrigger); Calendar c1 = Calendar.getInstance(); boolean lockStatus = false; String lockKey = null; Dao sourceDao = null; try { try { String tableName = jsonObject.getString(CmnConst.TABLE_NAME); String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(","); if (ArrayUtil.isEmpty(uniqueSignFieldName)) { throw new BaseException(ErrorCode.NO_TABLE_OR_UNIQUE_SIGN); } // 获取同步配置信息 DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES); if (DataTableEntity.isEmpty(syncConfigDte)) { throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL); } FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0); if (FieldSetEntity.isEmpty(syncConfigFse)) { throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL); } lockKey = jsonObject.getString(CmnConst.TABLE_NAME); if (!customLock.tryLock(lockKey, 120)) { syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!"); journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName, null, null, null, 0L)); return null; } lockStatus = true; if (jsonObject == null || jsonObject.isEmpty()) { throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL); } // 获取来源连接和目标连接 if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.DATA_SOURCE))) { throw new BaseException(ErrorCode.SYNC_GET_DATA_SOURCE_FAIL); } FieldSetEntity sourceDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, syncConfigFse.getString(CmnConst.DATA_SOURCE), false); DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSourceFse); sourceDao = sourceDbe.getDao(); if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE))) { throw new BaseException(ErrorCode.SYNC_GET_TARGET_DATA_SOURCE_FAIL); } DataTableEntity targetDataSourceDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, BaseUtil.buildQuestionMarkFilter("uuid", syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE).split(","), true)); List targetDaoList = Lists.newArrayList(); FieldSetEntity targetDataSourceFse; for (int i = 0; i < targetDataSourceDte.getRows(); i++) { targetDataSourceFse = targetDataSourceDte.getFieldSetEntity(i); targetDaoList.add(new DataBaseEntity(targetDataSourceFse).getDao()); } // 数据处理 String updateTimeFieldName; DataTableEntity sourceDataDte; FieldSetEntity sourceDataFse; DataTableEntity targetDataDte = null; List sourceUniqueKeyList;// 来源数据的唯一键集合 List targetUniqueKeyList;// 目标数据的唯一键集合 List sourceOnlyKeyList;// 来源数据单独存在的唯一键集合 List targetOnlyKeyList;// 目标数据单独存在的唯一键集合 List publicKeyList;// 两者都存在的唯一键集合 DataTableEntity addDte; DataTableEntity updateDte; // StringBuilder errorInfo = new StringBuilder(1024); int dataSourceCount = 0; try { tableName = tableName.toLowerCase(Locale.ROOT); // uniqueSignFieldName = uniqueSignFieldName.toLowerCase(Locale.ROOT); updateTimeFieldName = jsonObject.getString(CmnConst.UPDATE_TIME); if (StringUtils.isEmpty(updateTimeFieldName)) { // 没有更新标识,执行覆盖对比更新 sourceDataDte = sourceDao.getList(tableName, ""); sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); //dao对应目标数据源信息的下标 for (Dao targetDao : targetDaoList) { addDte = new DataTableEntity(); updateDte = new DataTableEntity(); TimeInterval timer = DateUtil.timer(); try { targetDataDte = targetDao.getList(tableName, ""); if (!DataTableEntity.isEmpty(targetDataDte)) { targetUniqueKeyList = targetDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1); targetOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, -1); publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0); for (int j = 0; j < sourceDataDte.getRows(); j++) { sourceDataFse = sourceDataDte.getFieldSetEntity(j); sourceDataFse.setTableName(tableName); if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { addDte.addFieldSetEntity(sourceDataFse); } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { updateDte.addFieldSetEntity(sourceDataFse); } } String filter = " ( "; for (int i = 0; i < uniqueSignFieldName.length; i++) { String uniqueSign = uniqueSignFieldName[i]; if (i > 0) { filter += " and "; } filter += " " + uniqueSign + "=? "; } filter += " ) "; String[] lower = new String[uniqueSignFieldName.length]; for (int i = 0; i < uniqueSignFieldName.length; i++) { lower[i] = uniqueSignFieldName[i].toLowerCase(); } targetDao.updateBatch(updateDte, new UpdateFilterEntity(filter, lower), true); if (!targetOnlyKeyList.isEmpty()) { List delParams = new ArrayList<>(); StringBuilder delFilter = new StringBuilder(); for (int i = 0; i < targetOnlyKeyList.size(); i++) { delParams.addAll(Arrays.asList(getUniqueValue(targetOnlyKeyList.get(i)))); if (i > 0) { delFilter.append(" OR "); } delFilter.append(" ( "); for (int j = 0; j < uniqueSignFieldName.length; j++) { if (j > 0) { delFilter.append(" and "); } delFilter.append(uniqueSignFieldName[j]).append("=? "); } delFilter.append(" ) "); } targetDao.delete(tableName, delFilter.toString(), delParams.toArray()); } } else { addDte = sourceDataDte; addDte.getMeta().setTableName(new Object[]{tableName}); } targetDao.addBatch(addDte); syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte.getRows(), tableName, targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs())); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid")); syncDataLogEntity.setErrorType(3); journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte != null ? targetDataDte.getRows() : 0, tableName, targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e)); } finally { dataSourceCount++; } } } else { // 存在更新标识,执行增量更新 updateTimeFieldName = updateTimeFieldName.toLowerCase(Locale.ROOT); dataSourceCount = 0; for (Dao targetDao : targetDaoList) { TimeInterval timer = DateUtil.timer(); StringBuilder sql = new StringBuilder(128); String maxValue = null; String minValue = null; int totalCount = 0; try { sql.append("select max(").append(updateTimeFieldName).append(") max_value from ").append(tableName); FieldSetEntity paramFse = targetDao.getOne(sql.toString()); StringBuilder filter = new StringBuilder(128); if (paramFse != null) { if (!StringUtils.isEmpty(paramFse.getString("max_value"))) { filter.append(updateTimeFieldName).append(">=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), paramFse.getDate("max_value"))) .append(" and ").append(updateTimeFieldName).append("<=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), c1.getTime())); } } sql.setLength(0); sql.append("select count(*) total_count from ").append(tableName); if (filter.length() > 0) { sql.append(" where ").append(filter); } paramFse = sourceDao.getOne(sql.toString()); totalCount = StringUtils.isEmpty(paramFse.getString("total_count")) ? 0 : paramFse.getInteger("total_count"); int batchCount = StringUtils.isEmpty(syncConfigFse.getString(CmnConst.INCREMENT_BATCH_COUNT)) ? 1000 : syncConfigFse.getInteger(CmnConst.INCREMENT_BATCH_COUNT); int totalPage = (int) Math.ceil((double) totalCount / batchCount); for (int j = 0; j < totalPage; j++) { addDte = new DataTableEntity(); updateDte = new DataTableEntity(); sourceDataDte = sourceDao.getList(tableName, filter.toString(), new Object[]{}, j + 1, batchCount); sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); sql.setLength(0); sql.append("select "); filter.setLength(0); for (int i = 0; i < sourceUniqueKeyList.size(); i++) { if (i > 0) { filter.append(" or "); } filter.append(" ( "); for (int k = 0; k < uniqueSignFieldName.length; k++) { if (i == 0) { if (k > 0) { sql.append(","); } sql.append(uniqueSignFieldName[k]); } if (k > 0) { filter.append(" AND "); } filter.append(uniqueSignFieldName[k]).append("= ? "); } filter.append(" ) "); } sql.append(" from ").append(tableName).append(" where ").append(filter); String[][] objects = sourceUniqueKeyList.stream().map(item -> getUniqueValue(item)).toArray(String[][]::new); String[] parmas = ArrayUtil.addAll(objects); DataTableEntity paramDte = targetDao.getList(sql.toString(), parmas); if (!DataTableEntity.isEmpty(paramDte)) { targetUniqueKeyList = paramDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1); publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0); for (int k = 0; k < sourceDataDte.getRows(); k++) { sourceDataFse = sourceDataDte.getFieldSetEntity(k); String uniqueValue = getUniqueValue(sourceDataFse, uniqueSignFieldName); //获取最小唯一值 if (StringUtils.isEmpty(minValue)) { minValue = uniqueValue; } else { minValue = getAimID(minValue, uniqueValue, 0); } //获取最大唯一值 if (StringUtils.isEmpty(maxValue)) { maxValue = uniqueValue; } else { maxValue = getAimID(minValue, uniqueValue, 1); } sourceDataFse = sourceDataDte.getFieldSetEntity(k); sourceDataFse.setTableName(tableName); if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { addDte.addFieldSetEntity(sourceDataFse); } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { updateDte.addFieldSetEntity(sourceDataFse); } } String filterUp = " ( "; for (int i = 0; i < uniqueSignFieldName.length; i++) { String uniqueSign = uniqueSignFieldName[i]; if (i > 0) { filterUp += " and "; } filterUp += " " + uniqueSign + "=? "; } filterUp += " ) "; String[] lower = new String[uniqueSignFieldName.length]; for (int i = 0; i < uniqueSignFieldName.length; i++) { lower[i] = uniqueSignFieldName[i].toLowerCase(); } targetDao.updateBatch(updateDte, new UpdateFilterEntity(filterUp, lower), true); } else { addDte = sourceDataDte; addDte.getMeta().setTableName(new Object[]{tableName}); } targetDao.addBatch(addDte); } syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName, minValue, maxValue, targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs())); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); syncDataLogEntity.setErrorType(3); syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid")); journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName, minValue, maxValue, targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e)); } finally { dataSourceCount++; } } } } catch (Exception e) { throw e; } finally { if (targetDaoList != null && !targetDaoList.isEmpty()) { for (int i1 = 0; i1 < targetDaoList.size(); i1++) { Dao dao = targetDaoList.get(i1); if (dao != null) { dao.closeConnection(); } } } } return journalEntities; } catch (Exception e) { e.printStackTrace(); syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); syncDataLogEntity.setErrorType(2); journalEntities.add(getLogRecord(syncDataLogEntity, null, null, null, 0L , e)); return journalEntities; } finally { if (journalEntities != null) { for (int i = 0; i < journalEntities.size(); i++) { journalManagerService.autoCreateJournal(journalEntities.get(i)); } } if (lockStatus && !StringUtils.isEmpty(lockKey)) { customLock.unLock(lockKey); } if (sourceDao != null) { sourceDao.closeConnection(); } } } finally { if (lockStatus && !StringUtils.isEmpty(lockKey)) { customLock.unLock(lockKey); } } } private static String[] getUniqueValue(String value) { return value.split("\\*-\\*_~!"); } private String getUniqueValue(FieldSetEntity item, String[] uniqueFieldNames) { String name = ""; for (int i = 0; i < uniqueFieldNames.length; i++) { if (i > 0) { name += "*-*_~!"; } name += item.getString(uniqueFieldNames[i].toLowerCase()); } return name; } /** * 日志只是记录一条,反正每次 * 覆盖 都是全部重新搞 * 增量 都是查询子库取了一个最大的时间作为标准 * 点击日志中的重新处理只是为了触发这个口子而已,和实际已经操作了好多数据剩余好多数据并没有好大的关系(查询和插入数据依然采用分批,不然数据量大了内存要崩) * 数据不进行回滚,遇到错误直接写入到日志中(因为若是回滚,那么就相当于所有的线都没有获取到新的内容,不回滚数据可以让没问题的线正常处理) * * @param encodedData */ @Deprecated public synchronized void dataSync(String encodedData) { TimeInterval timer = DateUtil.timer(); JSONArray jsonArray = null; List journalEntities = new ArrayList<>(); try { // 获取同步配置信息 DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES); if (DataTableEntity.isEmpty(syncConfigDte)) { throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL); } FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0); if (FieldSetEntity.isEmpty(syncConfigFse)) { throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL); } String cryptPrivateKey = syncConfigFse.getString(CmnConst.DECRYPT_KEY); String signPublicKey = syncConfigFse.getString(CmnConst.SIGN_KEY); String signedData = RSAUtil.privateDecrypt(encodedData, RSAUtil.getPrivateKey(cryptPrivateKey)); String jsonArrayStr = RSAUtil.publicDecrypt(signedData, RSAUtil.getPublicKey(signPublicKey)); if (StringUtils.isEmpty(jsonArrayStr)) { throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL); } jsonArray = JSON.parseArray(jsonArrayStr); } catch (Exception e) { SpringMVCContextHolder.getSystemLogger().error(e); SyncDataLogEntity syncDataLogEntity = new SyncDataLogEntity(); syncDataLogEntity.setErrorType(1); syncDataLogEntity.setSyncInfo(encodedData); JournalEntity journalEntity = (getLogRecord(syncDataLogEntity, null, null, null, timer.intervalMs() , e)); journalManagerService.autoCreateJournal(journalEntity); } if (jsonArray != null) { try { for (int i = 0; i < jsonArray.size(); i++) { this.dataSync(jsonArray.getJSONObject(i)); } } catch (Exception e) { //不对错误做任何操作请在调用的方法中进行处理 e.printStackTrace(); } } } private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String dataSourceUid, long elapsedTime, Throwable throwable) { return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, throwable); } private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String dataSourceUid, long elapsedTime) { return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, null); } private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId, String maxId, String dataSourceUid, long elapsedTime) { return getLogRecord(syncDataLogEntity, dataCount, configUid, minId, maxId, dataSourceUid, elapsedTime, null); } private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId, String maxId, String dataSourceUid, long elapsedTime, Throwable throwable) { JournalEntity journalEntity = new JournalEntity(); journalEntity.setType(3); journalEntity.setDeal_flag(0); journalEntity.setResult(syncDataLogEntity.getErrorType() == -1 ? 1 : 0); String configUUID = ""; if (syncDataLogEntity.getSyncInfo() != null) { JSONObject syncInfo = JSONObject.parseObject(syncDataLogEntity.getSyncInfo().toString()); configUUID = syncInfo.getString(CmnConst.UUID); } journalEntity.setConfigUuid(StringUtils.isEmpty(configUUID) ? "1" : configUUID); journalEntity.setCount(dataCount); journalEntity.setMin_id(minId); journalEntity.setMax_id(maxId); journalEntity.setData_source(dataSourceUid); journalEntity.setSingle_duration(elapsedTime); journalEntity.setOther_info(syncDataLogEntity.toString()); journalEntity.setCreated_utc_datetime(new Date()); if (journalEntity.getResult() != 1 && throwable != null) { journalEntity.setError(journalManagerService.getStackTrace(throwable)); } journalEntity.setDeal_result(journalEntity.getResult() == 1 ? 1 : 0); return journalEntity; } private String getAimID(String s1, String s2, int sign) { if (org.apache.commons.lang3.StringUtils.isEmpty(s1) && org.apache.commons.lang3.StringUtils.isEmpty(s2)) { return null; } if (org.apache.commons.lang3.StringUtils.isEmpty(s1) || org.apache.commons.lang3.StringUtils.isEmpty(s2)) { return org.apache.commons.lang3.StringUtils.isEmpty(s1) ? s2 : s1; } String numberRegexp = "\\d{1,11}"; if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) { boolean b = Long.parseLong(s1) > Long.parseLong(s2); if (sign > 0) { return b ? s1 : s2; } else { return b ? s2 : s1; } } else { if (sign > 0) { return s1.compareTo(s2) > 0 ? s1 : s2; } else { return s1.compareTo(s2) > 0 ? s2 : s1; } } } /** * 处理唯一键集合 * * @param sourceList 来源 * @param targetList 目标 * @param sign 1 来源单独存在;0 两者共有;-1 目标单独存在 * @return */ private List dealDataUniqueKeyList(List sourceList, List targetList, int sign) { List sourceUniqueKeyList = Lists.newArrayList(); sourceUniqueKeyList.addAll(sourceList); List targetUniqueKeyList = Lists.newArrayList(); targetUniqueKeyList.addAll(targetList); List resultObjList; if (sign == 1) { sourceUniqueKeyList.removeAll(targetUniqueKeyList); resultObjList = sourceUniqueKeyList; } else if (sign == -1) { targetUniqueKeyList.removeAll(sourceUniqueKeyList); resultObjList = targetUniqueKeyList; } else if (sign == 0) { sourceUniqueKeyList.retainAll(targetUniqueKeyList); resultObjList = sourceUniqueKeyList; } else { throw new BaseException(ErrorCode.SYNC_NO_DATA_DEAL_WAY); } return transferListType(resultObjList); } /** * 转换list中obj为string * * @param list * @return */ private List transferListType(List list) { List resultList = Lists.newArrayList(); list.forEach(k -> { if (k == null) { resultList.add(null); } else { resultList.add(k); } }); return resultList; } /** * 追加错误信息 * * @param errorInfo * @param e */ private void appendError(StringBuilder errorInfo, Throwable e) { if (errorInfo.length() > 0) { errorInfo.append("\n"); } errorInfo.append(journalManagerService.getStackTrace(e)); } }