T100738
2024-04-16 eeb86aaf2f73a02600195ce2637dde6caf858a88
product-server-data-center/src/main/java/com/product/data/center/service/DataSyncService.java
@@ -43,21 +43,21 @@
 */
@Component
public class DataSyncService {
    @Autowired
    private BaseDao baseDao;
    @Autowired
    private JournalManagerService journalManagerService;
   @Autowired
   private BaseDao baseDao;
   @Autowired
   private JournalManagerService journalManagerService;
    public static final String cryptPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJlJtWr3FRT-T2HzJk0Jhxbt-Qm2fupACtwpqeLnz9iXfXGWdSvkc4OUpmM4pNGOa1fBTB_RNu5f1-dm74QEEV0CAwEAAQ";
    //    public static final String cryptPrivateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmUm1avcVFP5PYfMmTQmHFu35CbZ-6kAK3Cmp4ufP2Jd9cZZ1K-Rzg5SmYzik0Y5rV8FMH9E27l_X52bvhAQRXQIDAQABAkBHyE6ekqpatGS0N8s91DJguHwg4kc4p1julMwrp-abREwrfW1RiwN4YIhwmMHBw_971RCtN55XJPVb7shx7Bh5AiEAy_pCJaRgeHUlZfQxOu7eBoswbimm4xks2Q7_4HqoT28CIQDAYelqjDNgeigKwVUrgOmKBSqldDxqXXixVDBebMSF8wIhAILviYiKRNbuM-yHXRa8gM9oh9UfbZ536Z8IDt61PdeHAiBjT2f2F4_CAu0-uBSmU3K7S_V62akCY2QVbldVtyIv3wIhAKB4lbrlUL3OC7DDtRaYNaCQdkYqDVV2eRZgQdzLnQTo";
   public static final String cryptPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJlJtWr3FRT-T2HzJk0Jhxbt-Qm2fupACtwpqeLnz9iXfXGWdSvkc4OUpmM4pNGOa1fBTB_RNu5f1-dm74QEEV0CAwEAAQ";
   //    public static final String cryptPrivateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmUm1avcVFP5PYfMmTQmHFu35CbZ-6kAK3Cmp4ufP2Jd9cZZ1K-Rzg5SmYzik0Y5rV8FMH9E27l_X52bvhAQRXQIDAQABAkBHyE6ekqpatGS0N8s91DJguHwg4kc4p1julMwrp-abREwrfW1RiwN4YIhwmMHBw_971RCtN55XJPVb7shx7Bh5AiEAy_pCJaRgeHUlZfQxOu7eBoswbimm4xks2Q7_4HqoT28CIQDAYelqjDNgeigKwVUrgOmKBSqldDxqXXixVDBebMSF8wIhAILviYiKRNbuM-yHXRa8gM9oh9UfbZ536Z8IDt61PdeHAiBjT2f2F4_CAu0-uBSmU3K7S_V62akCY2QVbldVtyIv3wIhAKB4lbrlUL3OC7DDtRaYNaCQdkYqDVV2eRZgQdzLnQTo";
//    public static final String signPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJkgrw6CzG6odwYfbN8Yp77vjfs3ufjaR8S3xLQiZBmsMFv5T9Mkt4EkDpKZRx9BoAOf_a0zyhq8lbk78kDcKFUCAwEAAQ";
    public static final String signPrivateKey = "MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAmSCvDoLMbqh3Bh9s3xinvu-N-ze5-NpHxLfEtCJkGawwW_lP0yS3gSQOkplHH0GgA5_9rTPKGryVuTvyQNwoVQIDAQABAkAWJTnr-VKjdk2wXv8ZzLEF1hNMj6SfrsHOW11hR8_-PkVSZmCzxlBNaREtixV538GdWZhjMbmlbvApa4c9TZaBAiEA1t8X6SyHAVKZRfGeheAWA0iyJRw9RZoW0RIH0qUoKXECIQC2cBX8jUDZdtZzGVQhUnnmmXVH4Tu_kaXjHFO1DwNbJQIgA7EVkhYHw8gNhhweoyI0fp3zIZwYmWeKWNE8fSwFQqECIEQGtVwPe4_a7QnL9v_Z1hRzQjUEOhgrgfSWWmwX5gN1AiAJsxMXJN81YsKlacXJZXofJMhGHheJ3xnF1UkntC96HQ";
   public static final String signPrivateKey = "MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAmSCvDoLMbqh3Bh9s3xinvu-N-ze5-NpHxLfEtCJkGawwW_lP0yS3gSQOkplHH0GgA5_9rTPKGryVuTvyQNwoVQIDAQABAkAWJTnr-VKjdk2wXv8ZzLEF1hNMj6SfrsHOW11hR8_-PkVSZmCzxlBNaREtixV538GdWZhjMbmlbvApa4c9TZaBAiEA1t8X6SyHAVKZRfGeheAWA0iyJRw9RZoW0RIH0qUoKXECIQC2cBX8jUDZdtZzGVQhUnnmmXVH4Tu_kaXjHFO1DwNbJQIgA7EVkhYHw8gNhhweoyI0fp3zIZwYmWeKWNE8fSwFQqECIEQGtVwPe4_a7QnL9v_Z1hRzQjUEOhgrgfSWWmwX5gN1AiAJsxMXJN81YsKlacXJZXofJMhGHheJ3xnF1UkntC96HQ";
    private static CustomLock customLock = new CustomLock();
   private static CustomLock customLock = new CustomLock();
    public static void test() throws NoSuchAlgorithmException, InvalidKeySpecException {
        JSONArray jsonArray = new JSONArray();
   public static void test() throws NoSuchAlgorithmException, InvalidKeySpecException {
      JSONArray jsonArray = new JSONArray();
//        JSONObject obj1 = new JSONObject();
//        obj1.put(CmnConst.TABLE_NAME, "TT_T_CO_EMP_DESC");
@@ -65,601 +65,607 @@
//        obj1.put(CmnConst.UPDATE_TIME, "updated_utc_datetime");
//        jsonArray.add(obj1);
        JSONObject obj2 = new JSONObject();
        obj2.put(CmnConst.TABLE_NAME, "TT_T_CO_EMPANDEMPLOYKIND");
        obj2.put(CmnConst.UNIQUE_NAME, "EMPANDEMPLOYKIND_ID");
        jsonArray.add(obj2);
      JSONObject obj2 = new JSONObject();
      obj2.put(CmnConst.TABLE_NAME, "TT_T_CO_EMPANDEMPLOYKIND");
      obj2.put(CmnConst.UNIQUE_NAME, "EMPANDEMPLOYKIND_ID");
      jsonArray.add(obj2);
        String signedData = RSAUtil.privateEncrypt(jsonArray.toString(), RSAUtil.getPrivateKey(signPrivateKey));
        System.out.println(signedData.equals("UADjiSAUii_PYdXTx-Jud6nY6cmsrQlsNfWK3vnIsuxg8rlugO-GUe0PetQxtvGuPrUR8hBcBrJUEsamI8rXyVp7WYdlBRsZmraglklb1orLspNSPgQzgSPHP6jhnW5OlTYMbdu_aU1r328jIfjwCk_hjHhN20ed7JW1OUXyQEQ"));
        String encodedData = RSAUtil.publicEncrypt(signedData, RSAUtil.getPublicKey(cryptPublicKey));
        System.out.println(encodedData);
      String signedData = RSAUtil.privateEncrypt(jsonArray.toString(), RSAUtil.getPrivateKey(signPrivateKey));
      System.out.println(signedData.equals("UADjiSAUii_PYdXTx-Jud6nY6cmsrQlsNfWK3vnIsuxg8rlugO-GUe0PetQxtvGuPrUR8hBcBrJUEsamI8rXyVp7WYdlBRsZmraglklb1orLspNSPgQzgSPHP6jhnW5OlTYMbdu_aU1r328jIfjwCk_hjHhN20ed7JW1OUXyQEQ"));
      String encodedData = RSAUtil.publicEncrypt(signedData, RSAUtil.getPublicKey(cryptPublicKey));
      System.out.println(encodedData);
//        dataSync(encodedData);
    }
   }
    public void runTimeTask(String uuid) {
        FieldSetEntity fse = baseDao.getFieldSetEntity("product_sys_data_sync_mes_sub", uuid, false);
        if (FieldSetEntity.isEmpty(fse)) {
            return;
        }
        JSONObject objects = BaseUtil.fieldSetEntityToJson(fse);
        markProcess(this.dataSync(objects, true));
    }
   public void runTimeTask(String uuid) {
      FieldSetEntity fse = baseDao.getFieldSetEntity("product_sys_data_sync_mes_sub", uuid, false);
      if (FieldSetEntity.isEmpty(fse)) {
         return;
      }
      JSONObject objects = BaseUtil.fieldSetEntityToJson(fse);
      markProcess(this.dataSync(objects, true));
   }
    /**
     * 重新处理
     *
     * @param logUUID
     */
    public void reDeal(String logUUID) {
        asynRedeal(logUUID);
    }
   /**
    * 重新处理
    *
    * @param logUUID
    */
   public void reDeal(String logUUID) {
      asynRedeal(logUUID);
   }
    @Async
    public void asynRedeal(String logUUID) {
        FieldSetEntity logFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUUID, false);
        if (FieldSetEntity.isEmpty(logFse) || !"3".equals(logFse.getString(CmnConst.TYPE))) {
            return;
        }
        String otherInfo = logFse.getString(CmnConst.OTHER_INFO);
        SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(otherInfo), SyncDataLogEntity.class);
        Object syncInfo = syncDataLogEntity.getSyncInfo();
        if (syncInfo instanceof String && syncDataLogEntity.getErrorType() == 1) {
            //解密错误类型 废弃日志逻辑代码未实现
            dataSync((String) syncInfo);
        } else if (syncDataLogEntity.getErrorType() == 2 || syncDataLogEntity.getErrorType() == 3) {
            //运行过程中产生的错误
            markProcess(dataSync((JSONObject) syncInfo));
        }
   @Async
   public void asynRedeal(String logUUID) {
      FieldSetEntity logFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUUID, false);
      if (FieldSetEntity.isEmpty(logFse) || !"3".equals(logFse.getString(CmnConst.TYPE))) {
         return;
      }
      String otherInfo = logFse.getString(CmnConst.OTHER_INFO);
      SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(otherInfo), SyncDataLogEntity.class);
      Object syncInfo = syncDataLogEntity.getSyncInfo();
      if (syncInfo instanceof String && syncDataLogEntity.getErrorType() == 1) {
         //解密错误类型 废弃日志逻辑代码未实现
         dataSync((String) syncInfo);
      } else if (syncDataLogEntity.getErrorType() == 2 || syncDataLogEntity.getErrorType() == 3) {
         //运行过程中产生的错误
         markProcess(dataSync((JSONObject) syncInfo));
      }
    }
   }
    /**
     * 标记日志
     *
     * @param lists
     */
    private void markProcess(List<JournalEntity> lists) {
        if (lists == null || lists.isEmpty()) {
            return;
        }
        long count = lists.stream().filter(item -> item.getResult() != 1).count();
        try {
            SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(lists.get(0).getOther_info()), SyncDataLogEntity.class);
            JSONObject syncInfo = (JSONObject) syncDataLogEntity.getSyncInfo();
            if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) {
                String tableName = syncInfo.getString(CmnConst.TABLE_NAME);
                baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" +
                                " where type=3 and UPPER(other_info) like concat('%',UPPER(?),'%')"
                        , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), "\"table_name\":\"" + tableName + "\""});
            }
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
        }
    }
   /**
    * 标记日志
    *
    * @param lists
    */
   private void markProcess(List<JournalEntity> lists) {
      if (lists == null || lists.isEmpty()) {
         return;
      }
      long count = lists.stream().filter(item -> item.getResult() != 1).count();
      try {
         SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(lists.get(0).getOther_info()), SyncDataLogEntity.class);
         JSONObject syncInfo = (JSONObject) syncDataLogEntity.getSyncInfo();
         if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) {
            String tableName = syncInfo.getString(CmnConst.TABLE_NAME);
            baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" +
                        " where type=3 and config_uuid in (select uuid from product_sys_data_sync_mes_sub where UPPER(table_name)=UPPER(?)) and (result=0 or deal_result=0)"
                  , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), tableName});
         }
      } catch (Exception e) {
         e.printStackTrace();
         SpringMVCContextHolder.getSystemLogger().error(e);
      }
   }
    public List<JournalEntity> dataSync(JSONObject jsonArray) {
        return dataSync(jsonArray, false);
    }
   public List<JournalEntity> dataSync(JSONObject jsonArray) {
      return dataSync(jsonArray, false);
   }
    public List<JournalEntity> dataSync(JSONObject jsonObject, boolean timeTaskTrigger) {
        List<JournalEntity> journalEntities = new ArrayList<>();
   public List<JournalEntity> dataSync(JSONObject jsonObject, boolean timeTaskTrigger) {
      List<JournalEntity> journalEntities = new ArrayList<>();
        SyncDataLogEntity<JSONObject> syncDataLogEntity = new SyncDataLogEntity();
        syncDataLogEntity.setSyncInfo(jsonObject);
        syncDataLogEntity.setTimeTaskTrigger(timeTaskTrigger);
        Calendar c1 = Calendar.getInstance();
        boolean lockStatus = false;
        String lockKey = null;
        Dao sourceDao = null;
        try {
            String tableName = jsonObject.getString(CmnConst.TABLE_NAME);
            String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(",");
            if (ArrayUtil.isEmpty(uniqueSignFieldName)) {
                throw new BaseException(ErrorCode.NO_TABLE_OR_UNIQUE_SIGN);
            }
            // 获取同步配置信息
            DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
            if (DataTableEntity.isEmpty(syncConfigDte)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
            if (FieldSetEntity.isEmpty(syncConfigFse)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            lockKey = jsonObject.getString(CmnConst.TABLE_NAME);
            if (!customLock.tryLock(lockKey)) {
                syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!");
                journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName,
                        null, null, null, 0L));
                return null;
            }
            lockStatus = true;
            if (jsonObject == null || jsonObject.isEmpty()) {
                throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
            }
      SyncDataLogEntity<JSONObject> syncDataLogEntity = new SyncDataLogEntity();
      syncDataLogEntity.setSyncInfo(jsonObject);
      syncDataLogEntity.setTimeTaskTrigger(timeTaskTrigger);
      Calendar c1 = Calendar.getInstance();
      boolean lockStatus = false;
      String lockKey = null;
      Dao sourceDao = null;
      try {
         try {
            String tableName = jsonObject.getString(CmnConst.TABLE_NAME);
            String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(",");
            if (ArrayUtil.isEmpty(uniqueSignFieldName)) {
               throw new BaseException(ErrorCode.NO_TABLE_OR_UNIQUE_SIGN);
            }
            // 获取同步配置信息
            DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
            if (DataTableEntity.isEmpty(syncConfigDte)) {
               throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
            if (FieldSetEntity.isEmpty(syncConfigFse)) {
               throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            lockKey = jsonObject.getString(CmnConst.TABLE_NAME);
            if (!customLock.tryLock(lockKey, 120)) {
               syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!");
               journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName,
                     null, null, null, 0L));
               return null;
            }
            lockStatus = true;
            if (jsonObject == null || jsonObject.isEmpty()) {
               throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
            }
            // 获取来源连接和目标连接
            if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.DATA_SOURCE))) {
                throw new BaseException(ErrorCode.SYNC_GET_DATA_SOURCE_FAIL);
            }
            FieldSetEntity sourceDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, syncConfigFse.getString(CmnConst.DATA_SOURCE), false);
            DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSourceFse);
            sourceDao = sourceDbe.getDao();
            if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE))) {
                throw new BaseException(ErrorCode.SYNC_GET_TARGET_DATA_SOURCE_FAIL);
            }
            DataTableEntity targetDataSourceDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, BaseUtil.buildQuestionMarkFilter("uuid", syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE).split(","), true));
            List<Dao> targetDaoList = Lists.newArrayList();
            FieldSetEntity targetDataSourceFse;
            for (int i = 0; i < targetDataSourceDte.getRows(); i++) {
                targetDataSourceFse = targetDataSourceDte.getFieldSetEntity(i);
                targetDaoList.add(new DataBaseEntity(targetDataSourceFse).getDao());
            }
            // 获取来源连接和目标连接
            if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.DATA_SOURCE))) {
               throw new BaseException(ErrorCode.SYNC_GET_DATA_SOURCE_FAIL);
            }
            FieldSetEntity sourceDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, syncConfigFse.getString(CmnConst.DATA_SOURCE), false);
            DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSourceFse);
            sourceDao = sourceDbe.getDao();
            if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE))) {
               throw new BaseException(ErrorCode.SYNC_GET_TARGET_DATA_SOURCE_FAIL);
            }
            DataTableEntity targetDataSourceDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, BaseUtil.buildQuestionMarkFilter("uuid", syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE).split(","), true));
            List<Dao> targetDaoList = Lists.newArrayList();
            FieldSetEntity targetDataSourceFse;
            for (int i = 0; i < targetDataSourceDte.getRows(); i++) {
               targetDataSourceFse = targetDataSourceDte.getFieldSetEntity(i);
               targetDaoList.add(new DataBaseEntity(targetDataSourceFse).getDao());
            }
            // 数据处理
            String updateTimeFieldName;
            DataTableEntity sourceDataDte;
            FieldSetEntity sourceDataFse;
            DataTableEntity targetDataDte = null;
            List<String> sourceUniqueKeyList;// 来源数据的唯一键集合
            List<String> targetUniqueKeyList;// 目标数据的唯一键集合
            List<String> sourceOnlyKeyList;// 来源数据单独存在的唯一键集合
            List<String> targetOnlyKeyList;// 目标数据单独存在的唯一键集合
            List<String> publicKeyList;// 两者都存在的唯一键集合
            DataTableEntity addDte;
            DataTableEntity updateDte;
            // 数据处理
            String updateTimeFieldName;
            DataTableEntity sourceDataDte;
            FieldSetEntity sourceDataFse;
            DataTableEntity targetDataDte = null;
            List<String> sourceUniqueKeyList;// 来源数据的唯一键集合
            List<String> targetUniqueKeyList;// 目标数据的唯一键集合
            List<String> sourceOnlyKeyList;// 来源数据单独存在的唯一键集合
            List<String> targetOnlyKeyList;// 目标数据单独存在的唯一键集合
            List<String> publicKeyList;// 两者都存在的唯一键集合
            DataTableEntity addDte;
            DataTableEntity updateDte;
//            StringBuilder errorInfo = new StringBuilder(1024);
            int dataSourceCount = 0;
            try {
                tableName = tableName.toLowerCase(Locale.ROOT);
            int dataSourceCount = 0;
            try {
               tableName = tableName.toLowerCase(Locale.ROOT);
//                uniqueSignFieldName = uniqueSignFieldName.toLowerCase(Locale.ROOT);
                updateTimeFieldName = jsonObject.getString(CmnConst.UPDATE_TIME);
                if (StringUtils.isEmpty(updateTimeFieldName)) {
                    // 没有更新标识,执行覆盖对比更新
                    sourceDataDte = sourceDao.getList(tableName, "");
                    sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                    //dao对应目标数据源信息的下标
                    for (Dao targetDao : targetDaoList) {
                        addDte = new DataTableEntity();
                        updateDte = new DataTableEntity();
                        TimeInterval timer = DateUtil.timer();
                        try {
                            targetDataDte = targetDao.getList(tableName, "");
                            if (!DataTableEntity.isEmpty(targetDataDte)) {
                                targetUniqueKeyList = targetDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
                                targetOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, -1);
                                publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
                                for (int j = 0; j < sourceDataDte.getRows(); j++) {
                                    sourceDataFse = sourceDataDte.getFieldSetEntity(j);
                                    sourceDataFse.setTableName(tableName);
                                    if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                        addDte.addFieldSetEntity(sourceDataFse);
                                    } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                        updateDte.addFieldSetEntity(sourceDataFse);
                                    }
                                }
                                String filter = " ( ";
                                for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                    String uniqueSign = uniqueSignFieldName[i];
                                    if (i > 0) {
                                        filter += " and ";
               updateTimeFieldName = jsonObject.getString(CmnConst.UPDATE_TIME);
               if (StringUtils.isEmpty(updateTimeFieldName)) {
                  // 没有更新标识,执行覆盖对比更新
                  sourceDataDte = sourceDao.getList(tableName, "");
                  sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                  //dao对应目标数据源信息的下标
                  for (Dao targetDao : targetDaoList) {
                     addDte = new DataTableEntity();
                     updateDte = new DataTableEntity();
                     TimeInterval timer = DateUtil.timer();
                     try {
                        targetDataDte = targetDao.getList(tableName, "");
                        if (!DataTableEntity.isEmpty(targetDataDte)) {
                           targetUniqueKeyList = targetDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                           sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
                           targetOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, -1);
                           publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
                           for (int j = 0; j < sourceDataDte.getRows(); j++) {
                              sourceDataFse = sourceDataDte.getFieldSetEntity(j);
                              sourceDataFse.setTableName(tableName);
                              if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                 addDte.addFieldSetEntity(sourceDataFse);
                              } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                 updateDte.addFieldSetEntity(sourceDataFse);
                              }
                           }
                           String filter = " ( ";
                           for (int i = 0; i < uniqueSignFieldName.length; i++) {
                              String uniqueSign = uniqueSignFieldName[i];
                              if (i > 0) {
                                 filter += " and ";
                                    }
                                    filter += " " + uniqueSign + "=? ";
                                }
                                filter += " ) ";
                                String[] lower = new String[uniqueSignFieldName.length];
                                for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                    lower[i] = uniqueSignFieldName[i].toLowerCase();
                                }
                                targetDao.updateBatch(updateDte, new UpdateFilterEntity(filter, lower), true);
                                if (!targetOnlyKeyList.isEmpty()) {
                                    List<Object> delParams = new ArrayList<>();
                                    StringBuilder delFilter = new StringBuilder();
                                    for (int i = 0; i < targetOnlyKeyList.size(); i++) {
                                        delParams.addAll(Arrays.asList(getUniqueValue(targetOnlyKeyList.get(i))));
                                        if (i > 0) {
                                            delFilter.append(" OR ");
                                        }
                                        delFilter.append(" ( ");
                                        for (int j = 0; j < uniqueSignFieldName.length; j++) {
                                            if (j > 0) {
                                                delFilter.append(" and ");
                                            }
                                            delFilter.append(uniqueSignFieldName[j]).append("=? ");
                                        }
                                        delFilter.append(" ) ");
                                    }
                              }
                              filter += " " + uniqueSign + "=? ";
                           }
                           filter += " ) ";
                           String[] lower = new String[uniqueSignFieldName.length];
                           for (int i = 0; i < uniqueSignFieldName.length; i++) {
                              lower[i] = uniqueSignFieldName[i].toLowerCase();
                           }
                           targetDao.updateBatch(updateDte, new UpdateFilterEntity(filter, lower), true);
                           if (!targetOnlyKeyList.isEmpty()) {
                              List<Object> delParams = new ArrayList<>();
                              StringBuilder delFilter = new StringBuilder();
                              for (int i = 0; i < targetOnlyKeyList.size(); i++) {
                                 delParams.addAll(Arrays.asList(getUniqueValue(targetOnlyKeyList.get(i))));
                                 if (i > 0) {
                                    delFilter.append(" OR ");
                                 }
                                 delFilter.append(" ( ");
                                 for (int j = 0; j < uniqueSignFieldName.length; j++) {
                                    if (j > 0) {
                                       delFilter.append(" and ");
                                    }
                                    delFilter.append(uniqueSignFieldName[j]).append("=? ");
                                 }
                                 delFilter.append(" ) ");
                              }
                                    targetDao.delete(tableName, delFilter.toString(), delParams.toArray());
                                }
                            } else {
                                addDte = sourceDataDte;
                                addDte.getMeta().setTableName(new Object[]{tableName});
                            }
                            targetDao.addBatch(addDte);
                            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                            journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte.getRows(), tableName,
                                    targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
                        } catch (Exception e) {
                            e.printStackTrace();
                            SpringMVCContextHolder.getSystemLogger().error(e);
                            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                            syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
                            syncDataLogEntity.setErrorType(3);
                            journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte != null ? targetDataDte.getRows() : 0, tableName,
                                    targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
                        } finally {
                            dataSourceCount++;
                        }
                    }
                } else {
                    // 存在更新标识,执行增量更新
                    updateTimeFieldName = updateTimeFieldName.toLowerCase(Locale.ROOT);
                    dataSourceCount = 0;
                    for (Dao targetDao : targetDaoList) {
                        TimeInterval timer = DateUtil.timer();
                        StringBuilder sql = new StringBuilder(128);
                        String maxValue = null;
                        String minValue = null;
                        int totalCount = 0;
                        try {
                            sql.append("select max(").append(updateTimeFieldName).append(") max_value from ").append(tableName);
                            FieldSetEntity paramFse = targetDao.getOne(sql.toString());
                            StringBuilder filter = new StringBuilder(128);
                            if (paramFse != null) {
                                if (!StringUtils.isEmpty(paramFse.getString("max_value"))) {
                                    filter.append(updateTimeFieldName).append(">=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), paramFse.getDate("max_value")))
                                            .append(" and ").append(updateTimeFieldName).append("<=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), c1.getTime()));
                                }
                            }
                            sql.setLength(0);
                            sql.append("select count(*) total_count from ").append(tableName);
                            if (filter.length() > 0) {
                                sql.append(" where ").append(filter);
                            }
                            paramFse = sourceDao.getOne(sql.toString());
                            totalCount = StringUtils.isEmpty(paramFse.getString("total_count")) ? 0 : paramFse.getInteger("total_count");
                            int batchCount = StringUtils.isEmpty(syncConfigFse.getString(CmnConst.INCREMENT_BATCH_COUNT)) ? 1000 : syncConfigFse.getInteger(CmnConst.INCREMENT_BATCH_COUNT);
                            int totalPage = (int) Math.ceil((double) totalCount / batchCount);
                            for (int j = 0; j < totalPage; j++) {
                                addDte = new DataTableEntity();
                                updateDte = new DataTableEntity();
                                sourceDataDte = sourceDao.getList(tableName, filter.toString(), new Object[]{}, j + 1, batchCount);
                                sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                sql.setLength(0);
                                sql.append("select ");
                                filter.setLength(0);
                              targetDao.delete(tableName, delFilter.toString(), delParams.toArray());
                           }
                        } else {
                           addDte = sourceDataDte;
                           addDte.getMeta().setTableName(new Object[]{tableName});
                        }
                        targetDao.addBatch(addDte);
                        syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                        journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte.getRows(), tableName,
                              targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
                     } catch (Exception e) {
                        e.printStackTrace();
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                        syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
                        syncDataLogEntity.setErrorType(3);
                        journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte != null ? targetDataDte.getRows() : 0, tableName,
                              targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
                     } finally {
                        dataSourceCount++;
                     }
                  }
               } else {
                  // 存在更新标识,执行增量更新
                  updateTimeFieldName = updateTimeFieldName.toLowerCase(Locale.ROOT);
                  dataSourceCount = 0;
                  for (Dao targetDao : targetDaoList) {
                     TimeInterval timer = DateUtil.timer();
                     StringBuilder sql = new StringBuilder(128);
                     String maxValue = null;
                     String minValue = null;
                     int totalCount = 0;
                     try {
                        sql.append("select max(").append(updateTimeFieldName).append(") max_value from ").append(tableName);
                        FieldSetEntity paramFse = targetDao.getOne(sql.toString());
                        StringBuilder filter = new StringBuilder(128);
                        if (paramFse != null) {
                           if (!StringUtils.isEmpty(paramFse.getString("max_value"))) {
                              filter.append(updateTimeFieldName).append(">=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), paramFse.getDate("max_value")))
                                    .append(" and ").append(updateTimeFieldName).append("<=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), c1.getTime()));
                           }
                        }
                        sql.setLength(0);
                        sql.append("select count(*) total_count from ").append(tableName);
                        if (filter.length() > 0) {
                           sql.append(" where ").append(filter);
                        }
                        paramFse = sourceDao.getOne(sql.toString());
                        totalCount = StringUtils.isEmpty(paramFse.getString("total_count")) ? 0 : paramFse.getInteger("total_count");
                        int batchCount = StringUtils.isEmpty(syncConfigFse.getString(CmnConst.INCREMENT_BATCH_COUNT)) ? 1000 : syncConfigFse.getInteger(CmnConst.INCREMENT_BATCH_COUNT);
                        int totalPage = (int) Math.ceil((double) totalCount / batchCount);
                        for (int j = 0; j < totalPage; j++) {
                           addDte = new DataTableEntity();
                           updateDte = new DataTableEntity();
                           sourceDataDte = sourceDao.getList(tableName, filter.toString(), new Object[]{}, j + 1, batchCount);
                           sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                           sql.setLength(0);
                           sql.append("select ");
                           filter.setLength(0);
                                for (int i = 0; i < sourceUniqueKeyList.size(); i++) {
                                    if (i > 0) {
                                        filter.append(" or ");
                                    }
                                    filter.append(" ( ");
                                    for (int k = 0; k < uniqueSignFieldName.length; k++) {
                                        if (i == 0) {
                                            if (k > 0) {
                                                sql.append(",");
                                            }
                                            sql.append(uniqueSignFieldName[k]);
                           for (int i = 0; i < sourceUniqueKeyList.size(); i++) {
                              if (i > 0) {
                                 filter.append(" or ");
                              }
                              filter.append(" ( ");
                              for (int k = 0; k < uniqueSignFieldName.length; k++) {
                                 if (i == 0) {
                                    if (k > 0) {
                                       sql.append(",");
                                    }
                                    sql.append(uniqueSignFieldName[k]);
                                        }
                                        if (k > 0) {
                                            filter.append(" AND ");
                                        }
                                        filter.append(uniqueSignFieldName[k]).append("= ? ");
                                    }
                                    filter.append(" ) ");
                                }
                                 }
                                 if (k > 0) {
                                    filter.append(" AND ");
                                 }
                                 filter.append(uniqueSignFieldName[k]).append("= ? ");
                              }
                              filter.append(" ) ");
                           }
                                sql.append(" from ").append(tableName).append(" where ").append(filter);
                           sql.append(" from ").append(tableName).append(" where ").append(filter);
                                String[][] objects = sourceUniqueKeyList.stream().map(item -> getUniqueValue(item)).toArray(String[][]::new);
                                String[] parmas = ArrayUtil.addAll(objects);
                                DataTableEntity paramDte = targetDao.getList(sql.toString(), parmas);
                                if (!DataTableEntity.isEmpty(paramDte)) {
                                    targetUniqueKeyList = paramDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                    sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
                                    publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
                                    for (int k = 0; k < sourceDataDte.getRows(); k++) {
                                        sourceDataFse = sourceDataDte.getFieldSetEntity(k);
                                        String uniqueValue = getUniqueValue(sourceDataFse, uniqueSignFieldName);
                                        //获取最小唯一值
                                        if (StringUtils.isEmpty(minValue)) {
                                            minValue = uniqueValue;
                                        } else {
                                            minValue = getAimID(minValue, uniqueValue, 0);
                                        }
                                        //获取最大唯一值
                                        if (StringUtils.isEmpty(maxValue)) {
                                            maxValue = uniqueValue;
                                        } else {
                                            maxValue = getAimID(minValue, uniqueValue, 1);
                                        }
                                        sourceDataFse = sourceDataDte.getFieldSetEntity(k);
                                        sourceDataFse.setTableName(tableName);
                                        if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                            addDte.addFieldSetEntity(sourceDataFse);
                                        } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                            updateDte.addFieldSetEntity(sourceDataFse);
                                        }
                                    }
                                    String filterUp = " ( ";
                                    for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                        String uniqueSign = uniqueSignFieldName[i];
                                        if (i > 0) {
                                            filterUp += " and ";
                           String[][] objects = sourceUniqueKeyList.stream().map(item -> getUniqueValue(item)).toArray(String[][]::new);
                           String[] parmas = ArrayUtil.addAll(objects);
                           DataTableEntity paramDte = targetDao.getList(sql.toString(), parmas);
                           if (!DataTableEntity.isEmpty(paramDte)) {
                              targetUniqueKeyList = paramDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                              sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
                              publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
                              for (int k = 0; k < sourceDataDte.getRows(); k++) {
                                 sourceDataFse = sourceDataDte.getFieldSetEntity(k);
                                 String uniqueValue = getUniqueValue(sourceDataFse, uniqueSignFieldName);
                                 //获取最小唯一值
                                 if (StringUtils.isEmpty(minValue)) {
                                    minValue = uniqueValue;
                                 } else {
                                    minValue = getAimID(minValue, uniqueValue, 0);
                                 }
                                 //获取最大唯一值
                                 if (StringUtils.isEmpty(maxValue)) {
                                    maxValue = uniqueValue;
                                 } else {
                                    maxValue = getAimID(minValue, uniqueValue, 1);
                                 }
                                 sourceDataFse = sourceDataDte.getFieldSetEntity(k);
                                 sourceDataFse.setTableName(tableName);
                                 if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                    addDte.addFieldSetEntity(sourceDataFse);
                                 } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                    updateDte.addFieldSetEntity(sourceDataFse);
                                 }
                              }
                              String filterUp = " ( ";
                              for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                 String uniqueSign = uniqueSignFieldName[i];
                                 if (i > 0) {
                                    filterUp += " and ";
                                        }
                                        filterUp += " " + uniqueSign + "=? ";
                                    }
                                    filterUp += " ) ";
                                 }
                                 filterUp += " " + uniqueSign + "=? ";
                              }
                              filterUp += " ) ";
                                    String[] lower = new String[uniqueSignFieldName.length];
                                    for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                        lower[i] = uniqueSignFieldName[i].toLowerCase();
                                    }
                                    targetDao.updateBatch(updateDte, new UpdateFilterEntity(filterUp, lower), true);
                                } else {
                                    addDte = sourceDataDte;
                                    addDte.getMeta().setTableName(new Object[]{tableName});
                                }
                                targetDao.addBatch(addDte);
                            }
                            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                            journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
                                    minValue, maxValue,
                                    targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
                        } catch (Exception e) {
                            e.printStackTrace();
                            SpringMVCContextHolder.getSystemLogger().error(e);
                            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                            syncDataLogEntity.setErrorType(3);
                            syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
                            journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
                                    minValue, maxValue,
                                    targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
                        } finally {
                            dataSourceCount++;
                        }
                    }
                }
            } catch (Exception e) {
                throw e;
            } finally {
                if (targetDaoList != null && !targetDaoList.isEmpty()) {
                    for (int i1 = 0; i1 < targetDaoList.size(); i1++) {
                        Dao dao = targetDaoList.get(i1);
                        if (dao != null) {
                            dao.closeConnection();
                        }
                    }
                }
                              String[] lower = new String[uniqueSignFieldName.length];
                              for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                 lower[i] = uniqueSignFieldName[i].toLowerCase();
                              }
                              targetDao.updateBatch(updateDte, new UpdateFilterEntity(filterUp, lower), true);
                           } else {
                              addDte = sourceDataDte;
                              addDte.getMeta().setTableName(new Object[]{tableName});
                           }
                           targetDao.addBatch(addDte);
                        }
                        syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                        journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
                              minValue, maxValue,
                              targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
                     } catch (Exception e) {
                        e.printStackTrace();
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                        syncDataLogEntity.setErrorType(3);
                        syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
                        journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
                              minValue, maxValue,
                              targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
                     } finally {
                        dataSourceCount++;
                     }
                  }
               }
            } catch (Exception e) {
               throw e;
            } finally {
               if (targetDaoList != null && !targetDaoList.isEmpty()) {
                  for (int i1 = 0; i1 < targetDaoList.size(); i1++) {
                     Dao dao = targetDaoList.get(i1);
                     if (dao != null) {
                        dao.closeConnection();
                     }
                  }
               }
            }
            }
            return journalEntities;
        } catch (Exception e) {
            e.printStackTrace();
            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
            syncDataLogEntity.setErrorType(2);
            journalEntities.add(getLogRecord(syncDataLogEntity, null, null, null, 0L
                    , e));
            return journalEntities;
        } finally {
            if (journalEntities != null) {
                for (int i = 0; i < journalEntities.size(); i++) {
                    journalManagerService.autoCreateJournal(journalEntities.get(i));
                }
            }
            if (lockStatus && !StringUtils.isEmpty(lockKey)) {
                customLock.unLock(lockKey);
            }
            if(sourceDao!=null){
                sourceDao.closeConnection();
            }
            return journalEntities;
         } catch (Exception e) {
            e.printStackTrace();
            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
            syncDataLogEntity.setErrorType(2);
            journalEntities.add(getLogRecord(syncDataLogEntity, null, null, null, 0L
                  , e));
            return journalEntities;
         } finally {
            if (journalEntities != null) {
               for (int i = 0; i < journalEntities.size(); i++) {
                  journalManagerService.autoCreateJournal(journalEntities.get(i));
               }
            }
            if (lockStatus && !StringUtils.isEmpty(lockKey)) {
               customLock.unLock(lockKey);
            }
            if (sourceDao != null) {
               sourceDao.closeConnection();
            }
        }
    }
         }
      } finally {
         if (lockStatus && !StringUtils.isEmpty(lockKey)) {
            customLock.unLock(lockKey);
         }
      }
   }
    private static String[] getUniqueValue(String value) {
        return value.split("\\*-\\*_~!");
    }
   private static String[] getUniqueValue(String value) {
      return value.split("\\*-\\*_~!");
   }
    private String getUniqueValue(FieldSetEntity item, String[] uniqueFieldNames) {
        String name = "";
        for (int i = 0; i < uniqueFieldNames.length; i++) {
            if (i > 0) {
                name += "*-*_~!";
            }
            name += item.getString(uniqueFieldNames[i].toLowerCase());
        }
        return name;
    }
   private String getUniqueValue(FieldSetEntity item, String[] uniqueFieldNames) {
      String name = "";
      for (int i = 0; i < uniqueFieldNames.length; i++) {
         if (i > 0) {
            name += "*-*_~!";
         }
         name += item.getString(uniqueFieldNames[i].toLowerCase());
      }
      return name;
   }
    /**
     * 日志只是记录一条,反正每次
     * 覆盖  都是全部重新搞
     * 增量  都是查询子库取了一个最大的时间作为标准
     * 点击日志中的重新处理只是为了触发这个口子而已,和实际已经操作了好多数据剩余好多数据并没有好大的关系(查询和插入数据依然采用分批,不然数据量大了内存要崩)
     * 数据不进行回滚,遇到错误直接写入到日志中(因为若是回滚,那么就相当于所有的线都没有获取到新的内容,不回滚数据可以让没问题的线正常处理)
     *
     * @param encodedData
     */
    @Deprecated
    public synchronized void dataSync(String encodedData) {
        TimeInterval timer = DateUtil.timer();
        JSONArray jsonArray = null;
        List<JournalEntity> journalEntities = new ArrayList<>();
        try {
            // 获取同步配置信息
            DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
            if (DataTableEntity.isEmpty(syncConfigDte)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
            if (FieldSetEntity.isEmpty(syncConfigFse)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            String cryptPrivateKey = syncConfigFse.getString(CmnConst.DECRYPT_KEY);
            String signPublicKey = syncConfigFse.getString(CmnConst.SIGN_KEY);
            String signedData = RSAUtil.privateDecrypt(encodedData, RSAUtil.getPrivateKey(cryptPrivateKey));
            String jsonArrayStr = RSAUtil.publicDecrypt(signedData, RSAUtil.getPublicKey(signPublicKey));
   /**
    * 日志只是记录一条,反正每次
    * 覆盖  都是全部重新搞
    * 增量  都是查询子库取了一个最大的时间作为标准
    * 点击日志中的重新处理只是为了触发这个口子而已,和实际已经操作了好多数据剩余好多数据并没有好大的关系(查询和插入数据依然采用分批,不然数据量大了内存要崩)
    * 数据不进行回滚,遇到错误直接写入到日志中(因为若是回滚,那么就相当于所有的线都没有获取到新的内容,不回滚数据可以让没问题的线正常处理)
    *
    * @param encodedData
    */
   @Deprecated
   public synchronized void dataSync(String encodedData) {
      TimeInterval timer = DateUtil.timer();
      JSONArray jsonArray = null;
      List<JournalEntity> journalEntities = new ArrayList<>();
      try {
         // 获取同步配置信息
         DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
         if (DataTableEntity.isEmpty(syncConfigDte)) {
            throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
         }
         FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
         if (FieldSetEntity.isEmpty(syncConfigFse)) {
            throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
         }
         String cryptPrivateKey = syncConfigFse.getString(CmnConst.DECRYPT_KEY);
         String signPublicKey = syncConfigFse.getString(CmnConst.SIGN_KEY);
         String signedData = RSAUtil.privateDecrypt(encodedData, RSAUtil.getPrivateKey(cryptPrivateKey));
         String jsonArrayStr = RSAUtil.publicDecrypt(signedData, RSAUtil.getPublicKey(signPublicKey));
            if (StringUtils.isEmpty(jsonArrayStr)) {
                throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
            }
            jsonArray = JSON.parseArray(jsonArrayStr);
        } catch (Exception e) {
            SpringMVCContextHolder.getSystemLogger().error(e);
            SyncDataLogEntity<String> syncDataLogEntity = new SyncDataLogEntity();
            syncDataLogEntity.setErrorType(1);
            syncDataLogEntity.setSyncInfo(encodedData);
            JournalEntity journalEntity = (getLogRecord(syncDataLogEntity, null, null, null, timer.intervalMs()
                    , e));
            journalManagerService.autoCreateJournal(journalEntity);
        }
        if (jsonArray != null) {
            try {
                for (int i = 0; i < jsonArray.size(); i++) {
                    this.dataSync(jsonArray.getJSONObject(i));
                }
            } catch (Exception e) {
                //不对错误做任何操作请在调用的方法中进行处理
                e.printStackTrace();
            }
        }
    }
         if (StringUtils.isEmpty(jsonArrayStr)) {
            throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
         }
         jsonArray = JSON.parseArray(jsonArrayStr);
      } catch (Exception e) {
         SpringMVCContextHolder.getSystemLogger().error(e);
         SyncDataLogEntity<String> syncDataLogEntity = new SyncDataLogEntity();
         syncDataLogEntity.setErrorType(1);
         syncDataLogEntity.setSyncInfo(encodedData);
         JournalEntity journalEntity = (getLogRecord(syncDataLogEntity, null, null, null, timer.intervalMs()
               , e));
         journalManagerService.autoCreateJournal(journalEntity);
      }
      if (jsonArray != null) {
         try {
            for (int i = 0; i < jsonArray.size(); i++) {
               this.dataSync(jsonArray.getJSONObject(i));
            }
         } catch (Exception e) {
            //不对错误做任何操作请在调用的方法中进行处理
            e.printStackTrace();
         }
      }
   }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
                                       String dataSourceUid, long elapsedTime, Throwable throwable) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, throwable);
    }
   private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
                              String dataSourceUid, long elapsedTime, Throwable throwable) {
      return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, throwable);
   }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
                                       String dataSourceUid, long elapsedTime) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, null);
    }
   private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
                              String dataSourceUid, long elapsedTime) {
      return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, null);
   }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
                                       String maxId, String dataSourceUid, long elapsedTime) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, minId, maxId, dataSourceUid, elapsedTime, null);
    }
   private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
                              String maxId, String dataSourceUid, long elapsedTime) {
      return getLogRecord(syncDataLogEntity, dataCount, configUid, minId, maxId, dataSourceUid, elapsedTime, null);
   }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
                                       String maxId, String dataSourceUid, long elapsedTime, Throwable throwable) {
        JournalEntity journalEntity = new JournalEntity();
        journalEntity.setType(3);
        journalEntity.setDeal_flag(0);
        journalEntity.setResult(syncDataLogEntity.getErrorType() == -1 ? 1 : 0);
        String configUUID = "";
        if (syncDataLogEntity.getSyncInfo() != null) {
            JSONObject syncInfo = JSONObject.parseObject(syncDataLogEntity.getSyncInfo().toString());
            configUUID = syncInfo.getString(CmnConst.UUID);
        }
        journalEntity.setConfigUuid(StringUtils.isEmpty(configUUID) ? "1" : configUUID);
        journalEntity.setCount(dataCount);
        journalEntity.setMin_id(minId);
        journalEntity.setMax_id(maxId);
        journalEntity.setData_source(dataSourceUid);
        journalEntity.setSingle_duration(elapsedTime);
        journalEntity.setOther_info(syncDataLogEntity.toString());
        journalEntity.setCreated_utc_datetime(new Date());
        if (journalEntity.getResult() != 1 && throwable != null) {
            journalEntity.setError(journalManagerService.getStackTrace(throwable));
        }
        journalEntity.setDeal_result(journalEntity.getResult() == 1 ? 1 : 0);
        return journalEntity;
    }
   private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
                              String maxId, String dataSourceUid, long elapsedTime, Throwable throwable) {
      JournalEntity journalEntity = new JournalEntity();
      journalEntity.setType(3);
      journalEntity.setDeal_flag(0);
      journalEntity.setResult(syncDataLogEntity.getErrorType() == -1 ? 1 : 0);
      String configUUID = "";
      if (syncDataLogEntity.getSyncInfo() != null) {
         JSONObject syncInfo = JSONObject.parseObject(syncDataLogEntity.getSyncInfo().toString());
         configUUID = syncInfo.getString(CmnConst.UUID);
      }
      journalEntity.setConfigUuid(StringUtils.isEmpty(configUUID) ? "1" : configUUID);
      journalEntity.setCount(dataCount);
      journalEntity.setMin_id(minId);
      journalEntity.setMax_id(maxId);
      journalEntity.setData_source(dataSourceUid);
      journalEntity.setSingle_duration(elapsedTime);
      journalEntity.setOther_info(syncDataLogEntity.toString());
      journalEntity.setCreated_utc_datetime(new Date());
      if (journalEntity.getResult() != 1 && throwable != null) {
         journalEntity.setError(journalManagerService.getStackTrace(throwable));
      }
      journalEntity.setDeal_result(journalEntity.getResult() == 1 ? 1 : 0);
      return journalEntity;
   }
    private String getAimID(String s1, String s2, int sign) {
        if (org.apache.commons.lang3.StringUtils.isEmpty(s1) && org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
            return null;
        }
        if (org.apache.commons.lang3.StringUtils.isEmpty(s1) || org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
            return org.apache.commons.lang3.StringUtils.isEmpty(s1) ? s2 : s1;
        }
        String numberRegexp = "\\d{1,11}";
        if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) {
            boolean b = Long.parseLong(s1) > Long.parseLong(s2);
            if (sign > 0) {
                return b ? s1 : s2;
            } else {
                return b ? s2 : s1;
            }
        } else {
            if (sign > 0) {
                return s1.compareTo(s2) > 0 ? s1 : s2;
            } else {
                return s1.compareTo(s2) > 0 ? s2 : s1;
            }
        }
    }
   private String getAimID(String s1, String s2, int sign) {
      if (org.apache.commons.lang3.StringUtils.isEmpty(s1) && org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
         return null;
      }
      if (org.apache.commons.lang3.StringUtils.isEmpty(s1) || org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
         return org.apache.commons.lang3.StringUtils.isEmpty(s1) ? s2 : s1;
      }
      String numberRegexp = "\\d{1,11}";
      if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) {
         boolean b = Long.parseLong(s1) > Long.parseLong(s2);
         if (sign > 0) {
            return b ? s1 : s2;
         } else {
            return b ? s2 : s1;
         }
      } else {
         if (sign > 0) {
            return s1.compareTo(s2) > 0 ? s1 : s2;
         } else {
            return s1.compareTo(s2) > 0 ? s2 : s1;
         }
      }
   }
    /**
     * 处理唯一键集合
     *
     * @param sourceList 来源
     * @param targetList 目标
     * @param sign       1 来源单独存在;0 两者共有;-1 目标单独存在
     * @return
     */
    private <T> List<T> dealDataUniqueKeyList(List<T> sourceList, List<T> targetList, int sign) {
        List<T> sourceUniqueKeyList = Lists.newArrayList();
        sourceUniqueKeyList.addAll(sourceList);
        List<T> targetUniqueKeyList = Lists.newArrayList();
        targetUniqueKeyList.addAll(targetList);
        List<T> resultObjList;
        if (sign == 1) {
            sourceUniqueKeyList.removeAll(targetUniqueKeyList);
            resultObjList = sourceUniqueKeyList;
        } else if (sign == -1) {
            targetUniqueKeyList.removeAll(sourceUniqueKeyList);
            resultObjList = targetUniqueKeyList;
        } else if (sign == 0) {
            sourceUniqueKeyList.retainAll(targetUniqueKeyList);
            resultObjList = sourceUniqueKeyList;
        } else {
            throw new BaseException(ErrorCode.SYNC_NO_DATA_DEAL_WAY);
        }
        return transferListType(resultObjList);
    }
   /**
    * 处理唯一键集合
    *
    * @param sourceList 来源
    * @param targetList 目标
    * @param sign       1 来源单独存在;0 两者共有;-1 目标单独存在
    * @return
    */
   private <T> List<T> dealDataUniqueKeyList(List<T> sourceList, List<T> targetList, int sign) {
      List<T> sourceUniqueKeyList = Lists.newArrayList();
      sourceUniqueKeyList.addAll(sourceList);
      List<T> targetUniqueKeyList = Lists.newArrayList();
      targetUniqueKeyList.addAll(targetList);
      List<T> resultObjList;
      if (sign == 1) {
         sourceUniqueKeyList.removeAll(targetUniqueKeyList);
         resultObjList = sourceUniqueKeyList;
      } else if (sign == -1) {
         targetUniqueKeyList.removeAll(sourceUniqueKeyList);
         resultObjList = targetUniqueKeyList;
      } else if (sign == 0) {
         sourceUniqueKeyList.retainAll(targetUniqueKeyList);
         resultObjList = sourceUniqueKeyList;
      } else {
         throw new BaseException(ErrorCode.SYNC_NO_DATA_DEAL_WAY);
      }
      return transferListType(resultObjList);
   }
    /**
     * 转换list中obj为string
     *
     * @param list
     * @return
     */
    private <T> List<T> transferListType(List<T> list) {
        List<T> resultList = Lists.newArrayList();
        list.forEach(k -> {
            if (k == null) {
                resultList.add(null);
            } else {
                resultList.add(k);
            }
        });
        return resultList;
    }
   /**
    * 转换list中obj为string
    *
    * @param list
    * @return
    */
   private <T> List<T> transferListType(List<T> list) {
      List<T> resultList = Lists.newArrayList();
      list.forEach(k -> {
         if (k == null) {
            resultList.add(null);
         } else {
            resultList.add(k);
         }
      });
      return resultList;
   }
    /**
     * 追加错误信息
     *
     * @param errorInfo
     * @param e
     */
    private void appendError(StringBuilder errorInfo, Throwable e) {
        if (errorInfo.length() > 0) {
            errorInfo.append("\n");
        }
        errorInfo.append(journalManagerService.getStackTrace(e));
    }
   /**
    * 追加错误信息
    *
    * @param errorInfo
    * @param e
    */
   private void appendError(StringBuilder errorInfo, Throwable e) {
      if (errorInfo.length() > 0) {
         errorInfo.append("\n");
      }
      errorInfo.append(journalManagerService.getStackTrace(e));
   }
}