| | |
| | | */ |
| | | @Component |
| | | public class DataSyncService { |
| | | @Autowired |
| | | private BaseDao baseDao; |
| | | @Autowired |
| | | private JournalManagerService journalManagerService; |
| | | @Autowired |
| | | private BaseDao baseDao; |
| | | @Autowired |
| | | private JournalManagerService journalManagerService; |
| | | |
| | | public static final String cryptPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJlJtWr3FRT-T2HzJk0Jhxbt-Qm2fupACtwpqeLnz9iXfXGWdSvkc4OUpmM4pNGOa1fBTB_RNu5f1-dm74QEEV0CAwEAAQ"; |
| | | // public static final String cryptPrivateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmUm1avcVFP5PYfMmTQmHFu35CbZ-6kAK3Cmp4ufP2Jd9cZZ1K-Rzg5SmYzik0Y5rV8FMH9E27l_X52bvhAQRXQIDAQABAkBHyE6ekqpatGS0N8s91DJguHwg4kc4p1julMwrp-abREwrfW1RiwN4YIhwmMHBw_971RCtN55XJPVb7shx7Bh5AiEAy_pCJaRgeHUlZfQxOu7eBoswbimm4xks2Q7_4HqoT28CIQDAYelqjDNgeigKwVUrgOmKBSqldDxqXXixVDBebMSF8wIhAILviYiKRNbuM-yHXRa8gM9oh9UfbZ536Z8IDt61PdeHAiBjT2f2F4_CAu0-uBSmU3K7S_V62akCY2QVbldVtyIv3wIhAKB4lbrlUL3OC7DDtRaYNaCQdkYqDVV2eRZgQdzLnQTo"; |
| | | public static final String cryptPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJlJtWr3FRT-T2HzJk0Jhxbt-Qm2fupACtwpqeLnz9iXfXGWdSvkc4OUpmM4pNGOa1fBTB_RNu5f1-dm74QEEV0CAwEAAQ"; |
| | | // public static final String cryptPrivateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmUm1avcVFP5PYfMmTQmHFu35CbZ-6kAK3Cmp4ufP2Jd9cZZ1K-Rzg5SmYzik0Y5rV8FMH9E27l_X52bvhAQRXQIDAQABAkBHyE6ekqpatGS0N8s91DJguHwg4kc4p1julMwrp-abREwrfW1RiwN4YIhwmMHBw_971RCtN55XJPVb7shx7Bh5AiEAy_pCJaRgeHUlZfQxOu7eBoswbimm4xks2Q7_4HqoT28CIQDAYelqjDNgeigKwVUrgOmKBSqldDxqXXixVDBebMSF8wIhAILviYiKRNbuM-yHXRa8gM9oh9UfbZ536Z8IDt61PdeHAiBjT2f2F4_CAu0-uBSmU3K7S_V62akCY2QVbldVtyIv3wIhAKB4lbrlUL3OC7DDtRaYNaCQdkYqDVV2eRZgQdzLnQTo"; |
| | | // public static final String signPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJkgrw6CzG6odwYfbN8Yp77vjfs3ufjaR8S3xLQiZBmsMFv5T9Mkt4EkDpKZRx9BoAOf_a0zyhq8lbk78kDcKFUCAwEAAQ"; |
| | | public static final String signPrivateKey = "MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAmSCvDoLMbqh3Bh9s3xinvu-N-ze5-NpHxLfEtCJkGawwW_lP0yS3gSQOkplHH0GgA5_9rTPKGryVuTvyQNwoVQIDAQABAkAWJTnr-VKjdk2wXv8ZzLEF1hNMj6SfrsHOW11hR8_-PkVSZmCzxlBNaREtixV538GdWZhjMbmlbvApa4c9TZaBAiEA1t8X6SyHAVKZRfGeheAWA0iyJRw9RZoW0RIH0qUoKXECIQC2cBX8jUDZdtZzGVQhUnnmmXVH4Tu_kaXjHFO1DwNbJQIgA7EVkhYHw8gNhhweoyI0fp3zIZwYmWeKWNE8fSwFQqECIEQGtVwPe4_a7QnL9v_Z1hRzQjUEOhgrgfSWWmwX5gN1AiAJsxMXJN81YsKlacXJZXofJMhGHheJ3xnF1UkntC96HQ"; |
| | | public static final String signPrivateKey = "MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAmSCvDoLMbqh3Bh9s3xinvu-N-ze5-NpHxLfEtCJkGawwW_lP0yS3gSQOkplHH0GgA5_9rTPKGryVuTvyQNwoVQIDAQABAkAWJTnr-VKjdk2wXv8ZzLEF1hNMj6SfrsHOW11hR8_-PkVSZmCzxlBNaREtixV538GdWZhjMbmlbvApa4c9TZaBAiEA1t8X6SyHAVKZRfGeheAWA0iyJRw9RZoW0RIH0qUoKXECIQC2cBX8jUDZdtZzGVQhUnnmmXVH4Tu_kaXjHFO1DwNbJQIgA7EVkhYHw8gNhhweoyI0fp3zIZwYmWeKWNE8fSwFQqECIEQGtVwPe4_a7QnL9v_Z1hRzQjUEOhgrgfSWWmwX5gN1AiAJsxMXJN81YsKlacXJZXofJMhGHheJ3xnF1UkntC96HQ"; |
| | | |
| | | private static CustomLock customLock = new CustomLock(); |
| | | private static CustomLock customLock = new CustomLock(); |
| | | |
| | | |
| | | public static void test() throws NoSuchAlgorithmException, InvalidKeySpecException { |
| | | JSONArray jsonArray = new JSONArray(); |
| | | public static void test() throws NoSuchAlgorithmException, InvalidKeySpecException { |
| | | JSONArray jsonArray = new JSONArray(); |
| | | |
| | | // JSONObject obj1 = new JSONObject(); |
| | | // obj1.put(CmnConst.TABLE_NAME, "TT_T_CO_EMP_DESC"); |
| | |
| | | // obj1.put(CmnConst.UPDATE_TIME, "updated_utc_datetime"); |
| | | // jsonArray.add(obj1); |
| | | |
| | | JSONObject obj2 = new JSONObject(); |
| | | obj2.put(CmnConst.TABLE_NAME, "TT_T_CO_EMPANDEMPLOYKIND"); |
| | | obj2.put(CmnConst.UNIQUE_NAME, "EMPANDEMPLOYKIND_ID"); |
| | | jsonArray.add(obj2); |
| | | JSONObject obj2 = new JSONObject(); |
| | | obj2.put(CmnConst.TABLE_NAME, "TT_T_CO_EMPANDEMPLOYKIND"); |
| | | obj2.put(CmnConst.UNIQUE_NAME, "EMPANDEMPLOYKIND_ID"); |
| | | jsonArray.add(obj2); |
| | | |
| | | |
| | | String signedData = RSAUtil.privateEncrypt(jsonArray.toString(), RSAUtil.getPrivateKey(signPrivateKey)); |
| | | System.out.println(signedData.equals("UADjiSAUii_PYdXTx-Jud6nY6cmsrQlsNfWK3vnIsuxg8rlugO-GUe0PetQxtvGuPrUR8hBcBrJUEsamI8rXyVp7WYdlBRsZmraglklb1orLspNSPgQzgSPHP6jhnW5OlTYMbdu_aU1r328jIfjwCk_hjHhN20ed7JW1OUXyQEQ")); |
| | | String encodedData = RSAUtil.publicEncrypt(signedData, RSAUtil.getPublicKey(cryptPublicKey)); |
| | | System.out.println(encodedData); |
| | | String signedData = RSAUtil.privateEncrypt(jsonArray.toString(), RSAUtil.getPrivateKey(signPrivateKey)); |
| | | System.out.println(signedData.equals("UADjiSAUii_PYdXTx-Jud6nY6cmsrQlsNfWK3vnIsuxg8rlugO-GUe0PetQxtvGuPrUR8hBcBrJUEsamI8rXyVp7WYdlBRsZmraglklb1orLspNSPgQzgSPHP6jhnW5OlTYMbdu_aU1r328jIfjwCk_hjHhN20ed7JW1OUXyQEQ")); |
| | | String encodedData = RSAUtil.publicEncrypt(signedData, RSAUtil.getPublicKey(cryptPublicKey)); |
| | | System.out.println(encodedData); |
| | | // dataSync(encodedData); |
| | | } |
| | | } |
| | | |
| | | |
| | | public void runTimeTask(String uuid) { |
| | | FieldSetEntity fse = baseDao.getFieldSetEntity("product_sys_data_sync_mes_sub", uuid, false); |
| | | if (FieldSetEntity.isEmpty(fse)) { |
| | | return; |
| | | } |
| | | JSONObject objects = BaseUtil.fieldSetEntityToJson(fse); |
| | | markProcess(this.dataSync(objects, true)); |
| | | } |
| | | public void runTimeTask(String uuid) { |
| | | FieldSetEntity fse = baseDao.getFieldSetEntity("product_sys_data_sync_mes_sub", uuid, false); |
| | | if (FieldSetEntity.isEmpty(fse)) { |
| | | return; |
| | | } |
| | | JSONObject objects = BaseUtil.fieldSetEntityToJson(fse); |
| | | markProcess(this.dataSync(objects, true)); |
| | | } |
| | | |
| | | /** |
| | | * 重新处理 |
| | | * |
| | | * @param logUUID |
| | | */ |
| | | public void reDeal(String logUUID) { |
| | | asynRedeal(logUUID); |
| | | } |
| | | /** |
| | | * 重新处理 |
| | | * |
| | | * @param logUUID |
| | | */ |
| | | public void reDeal(String logUUID) { |
| | | asynRedeal(logUUID); |
| | | } |
| | | |
| | | @Async |
| | | public void asynRedeal(String logUUID) { |
| | | FieldSetEntity logFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUUID, false); |
| | | if (FieldSetEntity.isEmpty(logFse) || !"3".equals(logFse.getString(CmnConst.TYPE))) { |
| | | return; |
| | | } |
| | | String otherInfo = logFse.getString(CmnConst.OTHER_INFO); |
| | | SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(otherInfo), SyncDataLogEntity.class); |
| | | Object syncInfo = syncDataLogEntity.getSyncInfo(); |
| | | if (syncInfo instanceof String && syncDataLogEntity.getErrorType() == 1) { |
| | | //解密错误类型 废弃日志逻辑代码未实现 |
| | | dataSync((String) syncInfo); |
| | | } else if (syncDataLogEntity.getErrorType() == 2 || syncDataLogEntity.getErrorType() == 3) { |
| | | //运行过程中产生的错误 |
| | | markProcess(dataSync((JSONObject) syncInfo)); |
| | | } |
| | | @Async |
| | | public void asynRedeal(String logUUID) { |
| | | FieldSetEntity logFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUUID, false); |
| | | if (FieldSetEntity.isEmpty(logFse) || !"3".equals(logFse.getString(CmnConst.TYPE))) { |
| | | return; |
| | | } |
| | | String otherInfo = logFse.getString(CmnConst.OTHER_INFO); |
| | | SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(otherInfo), SyncDataLogEntity.class); |
| | | Object syncInfo = syncDataLogEntity.getSyncInfo(); |
| | | if (syncInfo instanceof String && syncDataLogEntity.getErrorType() == 1) { |
| | | //解密错误类型 废弃日志逻辑代码未实现 |
| | | dataSync((String) syncInfo); |
| | | } else if (syncDataLogEntity.getErrorType() == 2 || syncDataLogEntity.getErrorType() == 3) { |
| | | //运行过程中产生的错误 |
| | | markProcess(dataSync((JSONObject) syncInfo)); |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 标记日志 |
| | | * |
| | | * @param lists |
| | | */ |
| | | private void markProcess(List<JournalEntity> lists) { |
| | | if (lists == null || lists.isEmpty()) { |
| | | return; |
| | | } |
| | | long count = lists.stream().filter(item -> item.getResult() != 1).count(); |
| | | try { |
| | | SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(lists.get(0).getOther_info()), SyncDataLogEntity.class); |
| | | JSONObject syncInfo = (JSONObject) syncDataLogEntity.getSyncInfo(); |
| | | if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) { |
| | | String tableName = syncInfo.getString(CmnConst.TABLE_NAME); |
| | | baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" + |
| | | " where type=3 and UPPER(other_info) like concat('%',UPPER(?),'%')" |
| | | , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), "\"table_name\":\"" + tableName + "\""}); |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | } |
| | | } |
| | | /** |
| | | * 标记日志 |
| | | * |
| | | * @param lists |
| | | */ |
| | | private void markProcess(List<JournalEntity> lists) { |
| | | if (lists == null || lists.isEmpty()) { |
| | | return; |
| | | } |
| | | long count = lists.stream().filter(item -> item.getResult() != 1).count(); |
| | | try { |
| | | SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(lists.get(0).getOther_info()), SyncDataLogEntity.class); |
| | | JSONObject syncInfo = (JSONObject) syncDataLogEntity.getSyncInfo(); |
| | | if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) { |
| | | String tableName = syncInfo.getString(CmnConst.TABLE_NAME); |
| | | baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" + |
| | | " where type=3 and config_uuid in (select uuid from product_sys_data_sync_mes_sub where UPPER(table_name)=UPPER(?)) and (result=0 or deal_result=0)" |
| | | , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), tableName}); |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | } |
| | | } |
| | | |
| | | |
| | | public List<JournalEntity> dataSync(JSONObject jsonArray) { |
| | | return dataSync(jsonArray, false); |
| | | } |
| | | public List<JournalEntity> dataSync(JSONObject jsonArray) { |
| | | return dataSync(jsonArray, false); |
| | | } |
| | | |
| | | public List<JournalEntity> dataSync(JSONObject jsonObject, boolean timeTaskTrigger) { |
| | | List<JournalEntity> journalEntities = new ArrayList<>(); |
| | | public List<JournalEntity> dataSync(JSONObject jsonObject, boolean timeTaskTrigger) { |
| | | List<JournalEntity> journalEntities = new ArrayList<>(); |
| | | |
| | | SyncDataLogEntity<JSONObject> syncDataLogEntity = new SyncDataLogEntity(); |
| | | syncDataLogEntity.setSyncInfo(jsonObject); |
| | | syncDataLogEntity.setTimeTaskTrigger(timeTaskTrigger); |
| | | Calendar c1 = Calendar.getInstance(); |
| | | boolean lockStatus = false; |
| | | String lockKey = null; |
| | | Dao sourceDao = null; |
| | | try { |
| | | String tableName = jsonObject.getString(CmnConst.TABLE_NAME); |
| | | String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(","); |
| | | if (ArrayUtil.isEmpty(uniqueSignFieldName)) { |
| | | throw new BaseException(ErrorCode.NO_TABLE_OR_UNIQUE_SIGN); |
| | | } |
| | | // 获取同步配置信息 |
| | | DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES); |
| | | if (DataTableEntity.isEmpty(syncConfigDte)) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL); |
| | | } |
| | | FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0); |
| | | if (FieldSetEntity.isEmpty(syncConfigFse)) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL); |
| | | } |
| | | lockKey = jsonObject.getString(CmnConst.TABLE_NAME); |
| | | if (!customLock.tryLock(lockKey)) { |
| | | syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!"); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName, |
| | | null, null, null, 0L)); |
| | | return null; |
| | | } |
| | | lockStatus = true; |
| | | if (jsonObject == null || jsonObject.isEmpty()) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL); |
| | | } |
| | | SyncDataLogEntity<JSONObject> syncDataLogEntity = new SyncDataLogEntity(); |
| | | syncDataLogEntity.setSyncInfo(jsonObject); |
| | | syncDataLogEntity.setTimeTaskTrigger(timeTaskTrigger); |
| | | Calendar c1 = Calendar.getInstance(); |
| | | boolean lockStatus = false; |
| | | String lockKey = null; |
| | | Dao sourceDao = null; |
| | | try { |
| | | try { |
| | | String tableName = jsonObject.getString(CmnConst.TABLE_NAME); |
| | | String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(","); |
| | | if (ArrayUtil.isEmpty(uniqueSignFieldName)) { |
| | | throw new BaseException(ErrorCode.NO_TABLE_OR_UNIQUE_SIGN); |
| | | } |
| | | // 获取同步配置信息 |
| | | DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES); |
| | | if (DataTableEntity.isEmpty(syncConfigDte)) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL); |
| | | } |
| | | FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0); |
| | | if (FieldSetEntity.isEmpty(syncConfigFse)) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL); |
| | | } |
| | | lockKey = jsonObject.getString(CmnConst.TABLE_NAME); |
| | | if (!customLock.tryLock(lockKey, 120)) { |
| | | syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!"); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName, |
| | | null, null, null, 0L)); |
| | | return null; |
| | | } |
| | | lockStatus = true; |
| | | if (jsonObject == null || jsonObject.isEmpty()) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL); |
| | | } |
| | | |
| | | // 获取来源连接和目标连接 |
| | | if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.DATA_SOURCE))) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_DATA_SOURCE_FAIL); |
| | | } |
| | | FieldSetEntity sourceDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, syncConfigFse.getString(CmnConst.DATA_SOURCE), false); |
| | | DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSourceFse); |
| | | sourceDao = sourceDbe.getDao(); |
| | | if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE))) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_TARGET_DATA_SOURCE_FAIL); |
| | | } |
| | | DataTableEntity targetDataSourceDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, BaseUtil.buildQuestionMarkFilter("uuid", syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE).split(","), true)); |
| | | List<Dao> targetDaoList = Lists.newArrayList(); |
| | | FieldSetEntity targetDataSourceFse; |
| | | for (int i = 0; i < targetDataSourceDte.getRows(); i++) { |
| | | targetDataSourceFse = targetDataSourceDte.getFieldSetEntity(i); |
| | | targetDaoList.add(new DataBaseEntity(targetDataSourceFse).getDao()); |
| | | } |
| | | // 获取来源连接和目标连接 |
| | | if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.DATA_SOURCE))) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_DATA_SOURCE_FAIL); |
| | | } |
| | | FieldSetEntity sourceDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, syncConfigFse.getString(CmnConst.DATA_SOURCE), false); |
| | | DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSourceFse); |
| | | sourceDao = sourceDbe.getDao(); |
| | | if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE))) { |
| | | throw new BaseException(ErrorCode.SYNC_GET_TARGET_DATA_SOURCE_FAIL); |
| | | } |
| | | DataTableEntity targetDataSourceDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, BaseUtil.buildQuestionMarkFilter("uuid", syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE).split(","), true)); |
| | | List<Dao> targetDaoList = Lists.newArrayList(); |
| | | FieldSetEntity targetDataSourceFse; |
| | | for (int i = 0; i < targetDataSourceDte.getRows(); i++) { |
| | | targetDataSourceFse = targetDataSourceDte.getFieldSetEntity(i); |
| | | targetDaoList.add(new DataBaseEntity(targetDataSourceFse).getDao()); |
| | | } |
| | | |
| | | // 数据处理 |
| | | String updateTimeFieldName; |
| | | DataTableEntity sourceDataDte; |
| | | FieldSetEntity sourceDataFse; |
| | | DataTableEntity targetDataDte = null; |
| | | List<String> sourceUniqueKeyList;// 来源数据的唯一键集合 |
| | | List<String> targetUniqueKeyList;// 目标数据的唯一键集合 |
| | | List<String> sourceOnlyKeyList;// 来源数据单独存在的唯一键集合 |
| | | List<String> targetOnlyKeyList;// 目标数据单独存在的唯一键集合 |
| | | List<String> publicKeyList;// 两者都存在的唯一键集合 |
| | | DataTableEntity addDte; |
| | | DataTableEntity updateDte; |
| | | // 数据处理 |
| | | String updateTimeFieldName; |
| | | DataTableEntity sourceDataDte; |
| | | FieldSetEntity sourceDataFse; |
| | | DataTableEntity targetDataDte = null; |
| | | List<String> sourceUniqueKeyList;// 来源数据的唯一键集合 |
| | | List<String> targetUniqueKeyList;// 目标数据的唯一键集合 |
| | | List<String> sourceOnlyKeyList;// 来源数据单独存在的唯一键集合 |
| | | List<String> targetOnlyKeyList;// 目标数据单独存在的唯一键集合 |
| | | List<String> publicKeyList;// 两者都存在的唯一键集合 |
| | | DataTableEntity addDte; |
| | | DataTableEntity updateDte; |
| | | // StringBuilder errorInfo = new StringBuilder(1024); |
| | | int dataSourceCount = 0; |
| | | try { |
| | | tableName = tableName.toLowerCase(Locale.ROOT); |
| | | int dataSourceCount = 0; |
| | | try { |
| | | tableName = tableName.toLowerCase(Locale.ROOT); |
| | | // uniqueSignFieldName = uniqueSignFieldName.toLowerCase(Locale.ROOT); |
| | | updateTimeFieldName = jsonObject.getString(CmnConst.UPDATE_TIME); |
| | | if (StringUtils.isEmpty(updateTimeFieldName)) { |
| | | // 没有更新标识,执行覆盖对比更新 |
| | | sourceDataDte = sourceDao.getList(tableName, ""); |
| | | sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); |
| | | //dao对应目标数据源信息的下标 |
| | | for (Dao targetDao : targetDaoList) { |
| | | addDte = new DataTableEntity(); |
| | | updateDte = new DataTableEntity(); |
| | | TimeInterval timer = DateUtil.timer(); |
| | | try { |
| | | targetDataDte = targetDao.getList(tableName, ""); |
| | | if (!DataTableEntity.isEmpty(targetDataDte)) { |
| | | targetUniqueKeyList = targetDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); |
| | | sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1); |
| | | targetOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, -1); |
| | | publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0); |
| | | for (int j = 0; j < sourceDataDte.getRows(); j++) { |
| | | sourceDataFse = sourceDataDte.getFieldSetEntity(j); |
| | | sourceDataFse.setTableName(tableName); |
| | | if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { |
| | | addDte.addFieldSetEntity(sourceDataFse); |
| | | } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { |
| | | updateDte.addFieldSetEntity(sourceDataFse); |
| | | } |
| | | } |
| | | String filter = " ( "; |
| | | for (int i = 0; i < uniqueSignFieldName.length; i++) { |
| | | String uniqueSign = uniqueSignFieldName[i]; |
| | | if (i > 0) { |
| | | filter += " and "; |
| | | updateTimeFieldName = jsonObject.getString(CmnConst.UPDATE_TIME); |
| | | if (StringUtils.isEmpty(updateTimeFieldName)) { |
| | | // 没有更新标识,执行覆盖对比更新 |
| | | sourceDataDte = sourceDao.getList(tableName, ""); |
| | | sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); |
| | | //dao对应目标数据源信息的下标 |
| | | for (Dao targetDao : targetDaoList) { |
| | | addDte = new DataTableEntity(); |
| | | updateDte = new DataTableEntity(); |
| | | TimeInterval timer = DateUtil.timer(); |
| | | try { |
| | | targetDataDte = targetDao.getList(tableName, ""); |
| | | if (!DataTableEntity.isEmpty(targetDataDte)) { |
| | | targetUniqueKeyList = targetDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); |
| | | sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1); |
| | | targetOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, -1); |
| | | publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0); |
| | | for (int j = 0; j < sourceDataDte.getRows(); j++) { |
| | | sourceDataFse = sourceDataDte.getFieldSetEntity(j); |
| | | sourceDataFse.setTableName(tableName); |
| | | if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { |
| | | addDte.addFieldSetEntity(sourceDataFse); |
| | | } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { |
| | | updateDte.addFieldSetEntity(sourceDataFse); |
| | | } |
| | | } |
| | | String filter = " ( "; |
| | | for (int i = 0; i < uniqueSignFieldName.length; i++) { |
| | | String uniqueSign = uniqueSignFieldName[i]; |
| | | if (i > 0) { |
| | | filter += " and "; |
| | | |
| | | } |
| | | filter += " " + uniqueSign + "=? "; |
| | | } |
| | | filter += " ) "; |
| | | String[] lower = new String[uniqueSignFieldName.length]; |
| | | for (int i = 0; i < uniqueSignFieldName.length; i++) { |
| | | lower[i] = uniqueSignFieldName[i].toLowerCase(); |
| | | } |
| | | targetDao.updateBatch(updateDte, new UpdateFilterEntity(filter, lower), true); |
| | | if (!targetOnlyKeyList.isEmpty()) { |
| | | List<Object> delParams = new ArrayList<>(); |
| | | StringBuilder delFilter = new StringBuilder(); |
| | | for (int i = 0; i < targetOnlyKeyList.size(); i++) { |
| | | delParams.addAll(Arrays.asList(getUniqueValue(targetOnlyKeyList.get(i)))); |
| | | if (i > 0) { |
| | | delFilter.append(" OR "); |
| | | } |
| | | delFilter.append(" ( "); |
| | | for (int j = 0; j < uniqueSignFieldName.length; j++) { |
| | | if (j > 0) { |
| | | delFilter.append(" and "); |
| | | } |
| | | delFilter.append(uniqueSignFieldName[j]).append("=? "); |
| | | } |
| | | delFilter.append(" ) "); |
| | | } |
| | | } |
| | | filter += " " + uniqueSign + "=? "; |
| | | } |
| | | filter += " ) "; |
| | | String[] lower = new String[uniqueSignFieldName.length]; |
| | | for (int i = 0; i < uniqueSignFieldName.length; i++) { |
| | | lower[i] = uniqueSignFieldName[i].toLowerCase(); |
| | | } |
| | | targetDao.updateBatch(updateDte, new UpdateFilterEntity(filter, lower), true); |
| | | if (!targetOnlyKeyList.isEmpty()) { |
| | | List<Object> delParams = new ArrayList<>(); |
| | | StringBuilder delFilter = new StringBuilder(); |
| | | for (int i = 0; i < targetOnlyKeyList.size(); i++) { |
| | | delParams.addAll(Arrays.asList(getUniqueValue(targetOnlyKeyList.get(i)))); |
| | | if (i > 0) { |
| | | delFilter.append(" OR "); |
| | | } |
| | | delFilter.append(" ( "); |
| | | for (int j = 0; j < uniqueSignFieldName.length; j++) { |
| | | if (j > 0) { |
| | | delFilter.append(" and "); |
| | | } |
| | | delFilter.append(uniqueSignFieldName[j]).append("=? "); |
| | | } |
| | | delFilter.append(" ) "); |
| | | } |
| | | |
| | | targetDao.delete(tableName, delFilter.toString(), delParams.toArray()); |
| | | } |
| | | } else { |
| | | addDte = sourceDataDte; |
| | | addDte.getMeta().setTableName(new Object[]{tableName}); |
| | | } |
| | | targetDao.addBatch(addDte); |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte.getRows(), tableName, |
| | | targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs())); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid")); |
| | | syncDataLogEntity.setErrorType(3); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte != null ? targetDataDte.getRows() : 0, tableName, |
| | | targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e)); |
| | | } finally { |
| | | dataSourceCount++; |
| | | } |
| | | } |
| | | } else { |
| | | // 存在更新标识,执行增量更新 |
| | | updateTimeFieldName = updateTimeFieldName.toLowerCase(Locale.ROOT); |
| | | dataSourceCount = 0; |
| | | for (Dao targetDao : targetDaoList) { |
| | | TimeInterval timer = DateUtil.timer(); |
| | | StringBuilder sql = new StringBuilder(128); |
| | | String maxValue = null; |
| | | String minValue = null; |
| | | int totalCount = 0; |
| | | try { |
| | | sql.append("select max(").append(updateTimeFieldName).append(") max_value from ").append(tableName); |
| | | FieldSetEntity paramFse = targetDao.getOne(sql.toString()); |
| | | StringBuilder filter = new StringBuilder(128); |
| | | if (paramFse != null) { |
| | | if (!StringUtils.isEmpty(paramFse.getString("max_value"))) { |
| | | filter.append(updateTimeFieldName).append(">=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), paramFse.getDate("max_value"))) |
| | | .append(" and ").append(updateTimeFieldName).append("<=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), c1.getTime())); |
| | | } |
| | | } |
| | | sql.setLength(0); |
| | | sql.append("select count(*) total_count from ").append(tableName); |
| | | if (filter.length() > 0) { |
| | | sql.append(" where ").append(filter); |
| | | } |
| | | paramFse = sourceDao.getOne(sql.toString()); |
| | | totalCount = StringUtils.isEmpty(paramFse.getString("total_count")) ? 0 : paramFse.getInteger("total_count"); |
| | | int batchCount = StringUtils.isEmpty(syncConfigFse.getString(CmnConst.INCREMENT_BATCH_COUNT)) ? 1000 : syncConfigFse.getInteger(CmnConst.INCREMENT_BATCH_COUNT); |
| | | int totalPage = (int) Math.ceil((double) totalCount / batchCount); |
| | | for (int j = 0; j < totalPage; j++) { |
| | | addDte = new DataTableEntity(); |
| | | updateDte = new DataTableEntity(); |
| | | sourceDataDte = sourceDao.getList(tableName, filter.toString(), new Object[]{}, j + 1, batchCount); |
| | | sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); |
| | | sql.setLength(0); |
| | | sql.append("select "); |
| | | filter.setLength(0); |
| | | targetDao.delete(tableName, delFilter.toString(), delParams.toArray()); |
| | | } |
| | | } else { |
| | | addDte = sourceDataDte; |
| | | addDte.getMeta().setTableName(new Object[]{tableName}); |
| | | } |
| | | targetDao.addBatch(addDte); |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte.getRows(), tableName, |
| | | targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs())); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid")); |
| | | syncDataLogEntity.setErrorType(3); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte != null ? targetDataDte.getRows() : 0, tableName, |
| | | targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e)); |
| | | } finally { |
| | | dataSourceCount++; |
| | | } |
| | | } |
| | | } else { |
| | | // 存在更新标识,执行增量更新 |
| | | updateTimeFieldName = updateTimeFieldName.toLowerCase(Locale.ROOT); |
| | | dataSourceCount = 0; |
| | | for (Dao targetDao : targetDaoList) { |
| | | TimeInterval timer = DateUtil.timer(); |
| | | StringBuilder sql = new StringBuilder(128); |
| | | String maxValue = null; |
| | | String minValue = null; |
| | | int totalCount = 0; |
| | | try { |
| | | sql.append("select max(").append(updateTimeFieldName).append(") max_value from ").append(tableName); |
| | | FieldSetEntity paramFse = targetDao.getOne(sql.toString()); |
| | | StringBuilder filter = new StringBuilder(128); |
| | | if (paramFse != null) { |
| | | if (!StringUtils.isEmpty(paramFse.getString("max_value"))) { |
| | | filter.append(updateTimeFieldName).append(">=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), paramFse.getDate("max_value"))) |
| | | .append(" and ").append(updateTimeFieldName).append("<=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), c1.getTime())); |
| | | } |
| | | } |
| | | sql.setLength(0); |
| | | sql.append("select count(*) total_count from ").append(tableName); |
| | | if (filter.length() > 0) { |
| | | sql.append(" where ").append(filter); |
| | | } |
| | | paramFse = sourceDao.getOne(sql.toString()); |
| | | totalCount = StringUtils.isEmpty(paramFse.getString("total_count")) ? 0 : paramFse.getInteger("total_count"); |
| | | int batchCount = StringUtils.isEmpty(syncConfigFse.getString(CmnConst.INCREMENT_BATCH_COUNT)) ? 1000 : syncConfigFse.getInteger(CmnConst.INCREMENT_BATCH_COUNT); |
| | | int totalPage = (int) Math.ceil((double) totalCount / batchCount); |
| | | for (int j = 0; j < totalPage; j++) { |
| | | addDte = new DataTableEntity(); |
| | | updateDte = new DataTableEntity(); |
| | | sourceDataDte = sourceDao.getList(tableName, filter.toString(), new Object[]{}, j + 1, batchCount); |
| | | sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); |
| | | sql.setLength(0); |
| | | sql.append("select "); |
| | | filter.setLength(0); |
| | | |
| | | for (int i = 0; i < sourceUniqueKeyList.size(); i++) { |
| | | if (i > 0) { |
| | | filter.append(" or "); |
| | | } |
| | | filter.append(" ( "); |
| | | for (int k = 0; k < uniqueSignFieldName.length; k++) { |
| | | if (i == 0) { |
| | | if (k > 0) { |
| | | sql.append(","); |
| | | } |
| | | sql.append(uniqueSignFieldName[k]); |
| | | for (int i = 0; i < sourceUniqueKeyList.size(); i++) { |
| | | if (i > 0) { |
| | | filter.append(" or "); |
| | | } |
| | | filter.append(" ( "); |
| | | for (int k = 0; k < uniqueSignFieldName.length; k++) { |
| | | if (i == 0) { |
| | | if (k > 0) { |
| | | sql.append(","); |
| | | } |
| | | sql.append(uniqueSignFieldName[k]); |
| | | |
| | | } |
| | | if (k > 0) { |
| | | filter.append(" AND "); |
| | | } |
| | | filter.append(uniqueSignFieldName[k]).append("= ? "); |
| | | } |
| | | filter.append(" ) "); |
| | | } |
| | | } |
| | | if (k > 0) { |
| | | filter.append(" AND "); |
| | | } |
| | | filter.append(uniqueSignFieldName[k]).append("= ? "); |
| | | } |
| | | filter.append(" ) "); |
| | | } |
| | | |
| | | sql.append(" from ").append(tableName).append(" where ").append(filter); |
| | | sql.append(" from ").append(tableName).append(" where ").append(filter); |
| | | |
| | | String[][] objects = sourceUniqueKeyList.stream().map(item -> getUniqueValue(item)).toArray(String[][]::new); |
| | | String[] parmas = ArrayUtil.addAll(objects); |
| | | DataTableEntity paramDte = targetDao.getList(sql.toString(), parmas); |
| | | if (!DataTableEntity.isEmpty(paramDte)) { |
| | | targetUniqueKeyList = paramDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); |
| | | sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1); |
| | | publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0); |
| | | for (int k = 0; k < sourceDataDte.getRows(); k++) { |
| | | sourceDataFse = sourceDataDte.getFieldSetEntity(k); |
| | | String uniqueValue = getUniqueValue(sourceDataFse, uniqueSignFieldName); |
| | | //获取最小唯一值 |
| | | if (StringUtils.isEmpty(minValue)) { |
| | | minValue = uniqueValue; |
| | | } else { |
| | | minValue = getAimID(minValue, uniqueValue, 0); |
| | | } |
| | | //获取最大唯一值 |
| | | if (StringUtils.isEmpty(maxValue)) { |
| | | maxValue = uniqueValue; |
| | | } else { |
| | | maxValue = getAimID(minValue, uniqueValue, 1); |
| | | } |
| | | sourceDataFse = sourceDataDte.getFieldSetEntity(k); |
| | | sourceDataFse.setTableName(tableName); |
| | | if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { |
| | | addDte.addFieldSetEntity(sourceDataFse); |
| | | } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { |
| | | updateDte.addFieldSetEntity(sourceDataFse); |
| | | } |
| | | } |
| | | String filterUp = " ( "; |
| | | for (int i = 0; i < uniqueSignFieldName.length; i++) { |
| | | String uniqueSign = uniqueSignFieldName[i]; |
| | | if (i > 0) { |
| | | filterUp += " and "; |
| | | String[][] objects = sourceUniqueKeyList.stream().map(item -> getUniqueValue(item)).toArray(String[][]::new); |
| | | String[] parmas = ArrayUtil.addAll(objects); |
| | | DataTableEntity paramDte = targetDao.getList(sql.toString(), parmas); |
| | | if (!DataTableEntity.isEmpty(paramDte)) { |
| | | targetUniqueKeyList = paramDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList()); |
| | | sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1); |
| | | publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0); |
| | | for (int k = 0; k < sourceDataDte.getRows(); k++) { |
| | | sourceDataFse = sourceDataDte.getFieldSetEntity(k); |
| | | String uniqueValue = getUniqueValue(sourceDataFse, uniqueSignFieldName); |
| | | //获取最小唯一值 |
| | | if (StringUtils.isEmpty(minValue)) { |
| | | minValue = uniqueValue; |
| | | } else { |
| | | minValue = getAimID(minValue, uniqueValue, 0); |
| | | } |
| | | //获取最大唯一值 |
| | | if (StringUtils.isEmpty(maxValue)) { |
| | | maxValue = uniqueValue; |
| | | } else { |
| | | maxValue = getAimID(minValue, uniqueValue, 1); |
| | | } |
| | | sourceDataFse = sourceDataDte.getFieldSetEntity(k); |
| | | sourceDataFse.setTableName(tableName); |
| | | if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { |
| | | addDte.addFieldSetEntity(sourceDataFse); |
| | | } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) { |
| | | updateDte.addFieldSetEntity(sourceDataFse); |
| | | } |
| | | } |
| | | String filterUp = " ( "; |
| | | for (int i = 0; i < uniqueSignFieldName.length; i++) { |
| | | String uniqueSign = uniqueSignFieldName[i]; |
| | | if (i > 0) { |
| | | filterUp += " and "; |
| | | |
| | | } |
| | | filterUp += " " + uniqueSign + "=? "; |
| | | } |
| | | filterUp += " ) "; |
| | | } |
| | | filterUp += " " + uniqueSign + "=? "; |
| | | } |
| | | filterUp += " ) "; |
| | | |
| | | String[] lower = new String[uniqueSignFieldName.length]; |
| | | for (int i = 0; i < uniqueSignFieldName.length; i++) { |
| | | lower[i] = uniqueSignFieldName[i].toLowerCase(); |
| | | } |
| | | targetDao.updateBatch(updateDte, new UpdateFilterEntity(filterUp, lower), true); |
| | | } else { |
| | | addDte = sourceDataDte; |
| | | addDte.getMeta().setTableName(new Object[]{tableName}); |
| | | } |
| | | targetDao.addBatch(addDte); |
| | | } |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName, |
| | | minValue, maxValue, |
| | | targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs())); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | syncDataLogEntity.setErrorType(3); |
| | | syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid")); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName, |
| | | minValue, maxValue, |
| | | targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e)); |
| | | } finally { |
| | | dataSourceCount++; |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | throw e; |
| | | } finally { |
| | | if (targetDaoList != null && !targetDaoList.isEmpty()) { |
| | | for (int i1 = 0; i1 < targetDaoList.size(); i1++) { |
| | | Dao dao = targetDaoList.get(i1); |
| | | if (dao != null) { |
| | | dao.closeConnection(); |
| | | } |
| | | } |
| | | } |
| | | String[] lower = new String[uniqueSignFieldName.length]; |
| | | for (int i = 0; i < uniqueSignFieldName.length; i++) { |
| | | lower[i] = uniqueSignFieldName[i].toLowerCase(); |
| | | } |
| | | targetDao.updateBatch(updateDte, new UpdateFilterEntity(filterUp, lower), true); |
| | | } else { |
| | | addDte = sourceDataDte; |
| | | addDte.getMeta().setTableName(new Object[]{tableName}); |
| | | } |
| | | targetDao.addBatch(addDte); |
| | | } |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName, |
| | | minValue, maxValue, |
| | | targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs())); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | syncDataLogEntity.setErrorType(3); |
| | | syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid")); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName, |
| | | minValue, maxValue, |
| | | targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e)); |
| | | } finally { |
| | | dataSourceCount++; |
| | | } |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | throw e; |
| | | } finally { |
| | | if (targetDaoList != null && !targetDaoList.isEmpty()) { |
| | | for (int i1 = 0; i1 < targetDaoList.size(); i1++) { |
| | | Dao dao = targetDaoList.get(i1); |
| | | if (dao != null) { |
| | | dao.closeConnection(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | return journalEntities; |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | syncDataLogEntity.setErrorType(2); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, null, null, null, 0L |
| | | , e)); |
| | | return journalEntities; |
| | | } finally { |
| | | if (journalEntities != null) { |
| | | for (int i = 0; i < journalEntities.size(); i++) { |
| | | journalManagerService.autoCreateJournal(journalEntities.get(i)); |
| | | } |
| | | } |
| | | if (lockStatus && !StringUtils.isEmpty(lockKey)) { |
| | | customLock.unLock(lockKey); |
| | | } |
| | | if(sourceDao!=null){ |
| | | sourceDao.closeConnection(); |
| | | } |
| | | return journalEntities; |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity); |
| | | syncDataLogEntity.setErrorType(2); |
| | | journalEntities.add(getLogRecord(syncDataLogEntity, null, null, null, 0L |
| | | , e)); |
| | | return journalEntities; |
| | | } finally { |
| | | if (journalEntities != null) { |
| | | for (int i = 0; i < journalEntities.size(); i++) { |
| | | journalManagerService.autoCreateJournal(journalEntities.get(i)); |
| | | } |
| | | } |
| | | if (lockStatus && !StringUtils.isEmpty(lockKey)) { |
| | | customLock.unLock(lockKey); |
| | | } |
| | | if (sourceDao != null) { |
| | | sourceDao.closeConnection(); |
| | | } |
| | | |
| | | } |
| | | } |
| | | } |
| | | } finally { |
| | | if (lockStatus && !StringUtils.isEmpty(lockKey)) { |
| | | customLock.unLock(lockKey); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private static String[] getUniqueValue(String value) { |
| | | return value.split("\\*-\\*_~!"); |
| | | } |
| | | private static String[] getUniqueValue(String value) { |
| | | return value.split("\\*-\\*_~!"); |
| | | } |
| | | |
| | | private String getUniqueValue(FieldSetEntity item, String[] uniqueFieldNames) { |
| | | String name = ""; |
| | | for (int i = 0; i < uniqueFieldNames.length; i++) { |
| | | if (i > 0) { |
| | | name += "*-*_~!"; |
| | | } |
| | | name += item.getString(uniqueFieldNames[i].toLowerCase()); |
| | | } |
| | | return name; |
| | | } |
| | | private String getUniqueValue(FieldSetEntity item, String[] uniqueFieldNames) { |
| | | String name = ""; |
| | | for (int i = 0; i < uniqueFieldNames.length; i++) { |
| | | if (i > 0) { |
| | | name += "*-*_~!"; |
| | | } |
| | | name += item.getString(uniqueFieldNames[i].toLowerCase()); |
| | | } |
| | | return name; |
| | | } |
| | | |
/**
 * Decrypts an encoded synchronisation request and runs every sync task it contains.
 *
 * <p>Design notes carried over from the original author:
 * <ul>
 *   <li>Only a single journal record is written per run.</li>
 *   <li>Overwrite mode redoes everything from scratch every time.</li>
 *   <li>Incremental mode queries the child database and uses the greatest timestamp found there
 *       as the baseline.</li>
 *   <li>"Reprocess" in the journal UI merely re-triggers this entry point; it is unrelated to how
 *       much data was already handled or is still pending (reads and inserts stay batched,
 *       otherwise large volumes would exhaust memory).</li>
 *   <li>Data is never rolled back; errors are written to the journal instead. A rollback would
 *       leave every line without new content, whereas skipping it lets healthy lines proceed.</li>
 * </ul>
 *
 * <p>If the configuration cannot be loaded or the payload cannot be decrypted/parsed, the failure
 * is logged, a journal record with error type 1 is created and no task is run.
 *
 * @param encodedData request payload, encrypted with the crypt key and signed with the sign key
 *                    stored in the sync configuration table
 */
@Deprecated
public synchronized void dataSync(String encodedData) {
    TimeInterval timer = DateUtil.timer();
    JSONArray jsonArray = null;
    try {
        // Load the synchronisation configuration (first row of the config table).
        DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
        if (DataTableEntity.isEmpty(syncConfigDte)) {
            throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
        }
        FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
        if (FieldSetEntity.isEmpty(syncConfigFse)) {
            throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
        }
        String cryptPrivateKey = syncConfigFse.getString(CmnConst.DECRYPT_KEY);
        String signPublicKey = syncConfigFse.getString(CmnConst.SIGN_KEY);
        // Two layers: first undo the encryption, then unwrap the signature.
        String signedData = RSAUtil.privateDecrypt(encodedData, RSAUtil.getPrivateKey(cryptPrivateKey));
        String jsonArrayStr = RSAUtil.publicDecrypt(signedData, RSAUtil.getPublicKey(signPublicKey));

        if (StringUtils.isEmpty(jsonArrayStr)) {
            throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
        }
        jsonArray = JSON.parseArray(jsonArrayStr);
    } catch (Exception e) {
        SpringMVCContextHolder.getSystemLogger().error(e);
        SyncDataLogEntity<String> syncDataLogEntity = new SyncDataLogEntity<>();
        syncDataLogEntity.setErrorType(1);
        syncDataLogEntity.setSyncInfo(encodedData);
        JournalEntity journalEntity = getLogRecord(syncDataLogEntity, null, null, null, timer.intervalMs(), e);
        journalManagerService.autoCreateJournal(journalEntity);
    }
    if (jsonArray == null) {
        return;
    }
    try {
        for (int i = 0; i < jsonArray.size(); i++) {
            this.dataSync(jsonArray.getJSONObject(i));
        }
    } catch (Exception e) {
        // Errors are deliberately not handled here; the called method is responsible for them.
        // They are still routed to the system logger instead of stderr.
        SpringMVCContextHolder.getSystemLogger().error(e);
    }
}
| | | |
| | | private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, |
| | | String dataSourceUid, long elapsedTime, Throwable throwable) { |
| | | return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, throwable); |
| | | } |
| | | private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, |
| | | String dataSourceUid, long elapsedTime, Throwable throwable) { |
| | | return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, throwable); |
| | | } |
| | | |
| | | private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, |
| | | String dataSourceUid, long elapsedTime) { |
| | | return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, null); |
| | | } |
| | | private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, |
| | | String dataSourceUid, long elapsedTime) { |
| | | return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, null); |
| | | } |
| | | |
| | | private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId, |
| | | String maxId, String dataSourceUid, long elapsedTime) { |
| | | return getLogRecord(syncDataLogEntity, dataCount, configUid, minId, maxId, dataSourceUid, elapsedTime, null); |
| | | } |
| | | private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId, |
| | | String maxId, String dataSourceUid, long elapsedTime) { |
| | | return getLogRecord(syncDataLogEntity, dataCount, configUid, minId, maxId, dataSourceUid, elapsedTime, null); |
| | | } |
| | | |
| | | private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId, |
| | | String maxId, String dataSourceUid, long elapsedTime, Throwable throwable) { |
| | | JournalEntity journalEntity = new JournalEntity(); |
| | | journalEntity.setType(3); |
| | | journalEntity.setDeal_flag(0); |
| | | journalEntity.setResult(syncDataLogEntity.getErrorType() == -1 ? 1 : 0); |
| | | String configUUID = ""; |
| | | if (syncDataLogEntity.getSyncInfo() != null) { |
| | | JSONObject syncInfo = JSONObject.parseObject(syncDataLogEntity.getSyncInfo().toString()); |
| | | configUUID = syncInfo.getString(CmnConst.UUID); |
| | | } |
| | | journalEntity.setConfigUuid(StringUtils.isEmpty(configUUID) ? "1" : configUUID); |
| | | journalEntity.setCount(dataCount); |
| | | journalEntity.setMin_id(minId); |
| | | journalEntity.setMax_id(maxId); |
| | | journalEntity.setData_source(dataSourceUid); |
| | | journalEntity.setSingle_duration(elapsedTime); |
| | | journalEntity.setOther_info(syncDataLogEntity.toString()); |
| | | journalEntity.setCreated_utc_datetime(new Date()); |
| | | if (journalEntity.getResult() != 1 && throwable != null) { |
| | | journalEntity.setError(journalManagerService.getStackTrace(throwable)); |
| | | } |
| | | journalEntity.setDeal_result(journalEntity.getResult() == 1 ? 1 : 0); |
| | | return journalEntity; |
| | | } |
| | | private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId, |
| | | String maxId, String dataSourceUid, long elapsedTime, Throwable throwable) { |
| | | JournalEntity journalEntity = new JournalEntity(); |
| | | journalEntity.setType(3); |
| | | journalEntity.setDeal_flag(0); |
| | | journalEntity.setResult(syncDataLogEntity.getErrorType() == -1 ? 1 : 0); |
| | | String configUUID = ""; |
| | | if (syncDataLogEntity.getSyncInfo() != null) { |
| | | JSONObject syncInfo = JSONObject.parseObject(syncDataLogEntity.getSyncInfo().toString()); |
| | | configUUID = syncInfo.getString(CmnConst.UUID); |
| | | } |
| | | journalEntity.setConfigUuid(StringUtils.isEmpty(configUUID) ? "1" : configUUID); |
| | | journalEntity.setCount(dataCount); |
| | | journalEntity.setMin_id(minId); |
| | | journalEntity.setMax_id(maxId); |
| | | journalEntity.setData_source(dataSourceUid); |
| | | journalEntity.setSingle_duration(elapsedTime); |
| | | journalEntity.setOther_info(syncDataLogEntity.toString()); |
| | | journalEntity.setCreated_utc_datetime(new Date()); |
| | | if (journalEntity.getResult() != 1 && throwable != null) { |
| | | journalEntity.setError(journalManagerService.getStackTrace(throwable)); |
| | | } |
| | | journalEntity.setDeal_result(journalEntity.getResult() == 1 ? 1 : 0); |
| | | return journalEntity; |
| | | } |
| | | |
| | | private String getAimID(String s1, String s2, int sign) { |
| | | if (org.apache.commons.lang3.StringUtils.isEmpty(s1) && org.apache.commons.lang3.StringUtils.isEmpty(s2)) { |
| | | return null; |
| | | } |
| | | if (org.apache.commons.lang3.StringUtils.isEmpty(s1) || org.apache.commons.lang3.StringUtils.isEmpty(s2)) { |
| | | return org.apache.commons.lang3.StringUtils.isEmpty(s1) ? s2 : s1; |
| | | } |
| | | String numberRegexp = "\\d{1,11}"; |
| | | if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) { |
| | | boolean b = Long.parseLong(s1) > Long.parseLong(s2); |
| | | if (sign > 0) { |
| | | return b ? s1 : s2; |
| | | } else { |
| | | return b ? s2 : s1; |
| | | } |
| | | } else { |
| | | if (sign > 0) { |
| | | return s1.compareTo(s2) > 0 ? s1 : s2; |
| | | } else { |
| | | return s1.compareTo(s2) > 0 ? s2 : s1; |
| | | } |
| | | } |
| | | } |
| | | private String getAimID(String s1, String s2, int sign) { |
| | | if (org.apache.commons.lang3.StringUtils.isEmpty(s1) && org.apache.commons.lang3.StringUtils.isEmpty(s2)) { |
| | | return null; |
| | | } |
| | | if (org.apache.commons.lang3.StringUtils.isEmpty(s1) || org.apache.commons.lang3.StringUtils.isEmpty(s2)) { |
| | | return org.apache.commons.lang3.StringUtils.isEmpty(s1) ? s2 : s1; |
| | | } |
| | | String numberRegexp = "\\d{1,11}"; |
| | | if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) { |
| | | boolean b = Long.parseLong(s1) > Long.parseLong(s2); |
| | | if (sign > 0) { |
| | | return b ? s1 : s2; |
| | | } else { |
| | | return b ? s2 : s1; |
| | | } |
| | | } else { |
| | | if (sign > 0) { |
| | | return s1.compareTo(s2) > 0 ? s1 : s2; |
| | | } else { |
| | | return s1.compareTo(s2) > 0 ? s2 : s1; |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 处理唯一键集合 |
| | | * |
| | | * @param sourceList 来源 |
| | | * @param targetList 目标 |
| | | * @param sign 1 来源单独存在;0 两者共有;-1 目标单独存在 |
| | | * @return |
| | | */ |
| | | private <T> List<T> dealDataUniqueKeyList(List<T> sourceList, List<T> targetList, int sign) { |
| | | List<T> sourceUniqueKeyList = Lists.newArrayList(); |
| | | sourceUniqueKeyList.addAll(sourceList); |
| | | List<T> targetUniqueKeyList = Lists.newArrayList(); |
| | | targetUniqueKeyList.addAll(targetList); |
| | | List<T> resultObjList; |
| | | if (sign == 1) { |
| | | sourceUniqueKeyList.removeAll(targetUniqueKeyList); |
| | | resultObjList = sourceUniqueKeyList; |
| | | } else if (sign == -1) { |
| | | targetUniqueKeyList.removeAll(sourceUniqueKeyList); |
| | | resultObjList = targetUniqueKeyList; |
| | | } else if (sign == 0) { |
| | | sourceUniqueKeyList.retainAll(targetUniqueKeyList); |
| | | resultObjList = sourceUniqueKeyList; |
| | | } else { |
| | | throw new BaseException(ErrorCode.SYNC_NO_DATA_DEAL_WAY); |
| | | } |
| | | return transferListType(resultObjList); |
| | | } |
| | | /** |
| | | * 处理唯一键集合 |
| | | * |
| | | * @param sourceList 来源 |
| | | * @param targetList 目标 |
| | | * @param sign 1 来源单独存在;0 两者共有;-1 目标单独存在 |
| | | * @return |
| | | */ |
| | | private <T> List<T> dealDataUniqueKeyList(List<T> sourceList, List<T> targetList, int sign) { |
| | | List<T> sourceUniqueKeyList = Lists.newArrayList(); |
| | | sourceUniqueKeyList.addAll(sourceList); |
| | | List<T> targetUniqueKeyList = Lists.newArrayList(); |
| | | targetUniqueKeyList.addAll(targetList); |
| | | List<T> resultObjList; |
| | | if (sign == 1) { |
| | | sourceUniqueKeyList.removeAll(targetUniqueKeyList); |
| | | resultObjList = sourceUniqueKeyList; |
| | | } else if (sign == -1) { |
| | | targetUniqueKeyList.removeAll(sourceUniqueKeyList); |
| | | resultObjList = targetUniqueKeyList; |
| | | } else if (sign == 0) { |
| | | sourceUniqueKeyList.retainAll(targetUniqueKeyList); |
| | | resultObjList = sourceUniqueKeyList; |
| | | } else { |
| | | throw new BaseException(ErrorCode.SYNC_NO_DATA_DEAL_WAY); |
| | | } |
| | | return transferListType(resultObjList); |
| | | } |
| | | |
| | | /** |
| | | * 转换list中obj为string |
| | | * |
| | | * @param list |
| | | * @return |
| | | */ |
| | | private <T> List<T> transferListType(List<T> list) { |
| | | List<T> resultList = Lists.newArrayList(); |
| | | list.forEach(k -> { |
| | | if (k == null) { |
| | | resultList.add(null); |
| | | } else { |
| | | resultList.add(k); |
| | | } |
| | | }); |
| | | return resultList; |
| | | } |
| | | /** |
| | | * 转换list中obj为string |
| | | * |
| | | * @param list |
| | | * @return |
| | | */ |
| | | private <T> List<T> transferListType(List<T> list) { |
| | | List<T> resultList = Lists.newArrayList(); |
| | | list.forEach(k -> { |
| | | if (k == null) { |
| | | resultList.add(null); |
| | | } else { |
| | | resultList.add(k); |
| | | } |
| | | }); |
| | | return resultList; |
| | | } |
| | | |
| | | /** |
| | | * 追加错误信息 |
| | | * |
| | | * @param errorInfo |
| | | * @param e |
| | | */ |
| | | private void appendError(StringBuilder errorInfo, Throwable e) { |
| | | if (errorInfo.length() > 0) { |
| | | errorInfo.append("\n"); |
| | | } |
| | | errorInfo.append(journalManagerService.getStackTrace(e)); |
| | | } |
| | | /** |
| | | * 追加错误信息 |
| | | * |
| | | * @param errorInfo |
| | | * @param e |
| | | */ |
| | | private void appendError(StringBuilder errorInfo, Throwable e) { |
| | | if (errorInfo.length() > 0) { |
| | | errorInfo.append("\n"); |
| | | } |
| | | errorInfo.append(journalManagerService.getStackTrace(e)); |
| | | } |
| | | } |