T100738
2024-04-16 eeb86aaf2f73a02600195ce2637dde6caf858a88
commit
已修改5个文件
1201 ■■■■ 文件已修改
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataExtractService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataSyncService.java 1138 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/utils/CustomLock.java 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-web/resources/LicenseKey.dat 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -202,7 +202,7 @@
            ExecutorService executorService = queryThreadMap.get(tableName);
            if (executorService != null) {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                    executorService.shutdownNow();
                }
                queryThreadMap.remove(tableName);
            }
product-server-data-center/src/main/java/com/product/data/center/service/DataExtractService.java
@@ -533,7 +533,7 @@
                                    maybeUpdate.getData().sort((o1, o2) -> DateUtil.compare(o1.getDate(extractUpdateTimeField), o2.getDate(extractUpdateTimeField)));
                                    batchResultEntity = targetNewDao.updateBatch(maybeUpdate, updateFilterEntity, false);
                                    batchResultEntity = targetNewDao.updateBatch(maybeUpdate, updateFilterEntity, true);
                                    WriteExtractUtil.append("更新提取过滤后数据:" + extractTargetTable + ",需要更新的条数:" + maybeUpdate.getRows() + ",耗时:" + tempTestTimer2.intervalMs());
                                    targetNewDao.closeConnection();
product-server-data-center/src/main/java/com/product/data/center/service/DataSyncService.java
@@ -43,21 +43,21 @@
 */
@Component
public class DataSyncService {
    @Autowired
    private BaseDao baseDao;
    @Autowired
    private JournalManagerService journalManagerService;
    @Autowired
    private BaseDao baseDao;
    @Autowired
    private JournalManagerService journalManagerService;
    public static final String cryptPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJlJtWr3FRT-T2HzJk0Jhxbt-Qm2fupACtwpqeLnz9iXfXGWdSvkc4OUpmM4pNGOa1fBTB_RNu5f1-dm74QEEV0CAwEAAQ";
    //    public static final String cryptPrivateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmUm1avcVFP5PYfMmTQmHFu35CbZ-6kAK3Cmp4ufP2Jd9cZZ1K-Rzg5SmYzik0Y5rV8FMH9E27l_X52bvhAQRXQIDAQABAkBHyE6ekqpatGS0N8s91DJguHwg4kc4p1julMwrp-abREwrfW1RiwN4YIhwmMHBw_971RCtN55XJPVb7shx7Bh5AiEAy_pCJaRgeHUlZfQxOu7eBoswbimm4xks2Q7_4HqoT28CIQDAYelqjDNgeigKwVUrgOmKBSqldDxqXXixVDBebMSF8wIhAILviYiKRNbuM-yHXRa8gM9oh9UfbZ536Z8IDt61PdeHAiBjT2f2F4_CAu0-uBSmU3K7S_V62akCY2QVbldVtyIv3wIhAKB4lbrlUL3OC7DDtRaYNaCQdkYqDVV2eRZgQdzLnQTo";
    public static final String cryptPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJlJtWr3FRT-T2HzJk0Jhxbt-Qm2fupACtwpqeLnz9iXfXGWdSvkc4OUpmM4pNGOa1fBTB_RNu5f1-dm74QEEV0CAwEAAQ";
    //    public static final String cryptPrivateKey = "MIIBVQIBADANBgkqhkiG9w0BAQEFAASCAT8wggE7AgEAAkEAmUm1avcVFP5PYfMmTQmHFu35CbZ-6kAK3Cmp4ufP2Jd9cZZ1K-Rzg5SmYzik0Y5rV8FMH9E27l_X52bvhAQRXQIDAQABAkBHyE6ekqpatGS0N8s91DJguHwg4kc4p1julMwrp-abREwrfW1RiwN4YIhwmMHBw_971RCtN55XJPVb7shx7Bh5AiEAy_pCJaRgeHUlZfQxOu7eBoswbimm4xks2Q7_4HqoT28CIQDAYelqjDNgeigKwVUrgOmKBSqldDxqXXixVDBebMSF8wIhAILviYiKRNbuM-yHXRa8gM9oh9UfbZ536Z8IDt61PdeHAiBjT2f2F4_CAu0-uBSmU3K7S_V62akCY2QVbldVtyIv3wIhAKB4lbrlUL3OC7DDtRaYNaCQdkYqDVV2eRZgQdzLnQTo";
//    public static final String signPublicKey = "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAJkgrw6CzG6odwYfbN8Yp77vjfs3ufjaR8S3xLQiZBmsMFv5T9Mkt4EkDpKZRx9BoAOf_a0zyhq8lbk78kDcKFUCAwEAAQ";
    public static final String signPrivateKey = "MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAmSCvDoLMbqh3Bh9s3xinvu-N-ze5-NpHxLfEtCJkGawwW_lP0yS3gSQOkplHH0GgA5_9rTPKGryVuTvyQNwoVQIDAQABAkAWJTnr-VKjdk2wXv8ZzLEF1hNMj6SfrsHOW11hR8_-PkVSZmCzxlBNaREtixV538GdWZhjMbmlbvApa4c9TZaBAiEA1t8X6SyHAVKZRfGeheAWA0iyJRw9RZoW0RIH0qUoKXECIQC2cBX8jUDZdtZzGVQhUnnmmXVH4Tu_kaXjHFO1DwNbJQIgA7EVkhYHw8gNhhweoyI0fp3zIZwYmWeKWNE8fSwFQqECIEQGtVwPe4_a7QnL9v_Z1hRzQjUEOhgrgfSWWmwX5gN1AiAJsxMXJN81YsKlacXJZXofJMhGHheJ3xnF1UkntC96HQ";
    public static final String signPrivateKey = "MIIBUwIBADANBgkqhkiG9w0BAQEFAASCAT0wggE5AgEAAkEAmSCvDoLMbqh3Bh9s3xinvu-N-ze5-NpHxLfEtCJkGawwW_lP0yS3gSQOkplHH0GgA5_9rTPKGryVuTvyQNwoVQIDAQABAkAWJTnr-VKjdk2wXv8ZzLEF1hNMj6SfrsHOW11hR8_-PkVSZmCzxlBNaREtixV538GdWZhjMbmlbvApa4c9TZaBAiEA1t8X6SyHAVKZRfGeheAWA0iyJRw9RZoW0RIH0qUoKXECIQC2cBX8jUDZdtZzGVQhUnnmmXVH4Tu_kaXjHFO1DwNbJQIgA7EVkhYHw8gNhhweoyI0fp3zIZwYmWeKWNE8fSwFQqECIEQGtVwPe4_a7QnL9v_Z1hRzQjUEOhgrgfSWWmwX5gN1AiAJsxMXJN81YsKlacXJZXofJMhGHheJ3xnF1UkntC96HQ";
    private static CustomLock customLock = new CustomLock();
    private static CustomLock customLock = new CustomLock();
    public static void test() throws NoSuchAlgorithmException, InvalidKeySpecException {
        JSONArray jsonArray = new JSONArray();
    public static void test() throws NoSuchAlgorithmException, InvalidKeySpecException {
        JSONArray jsonArray = new JSONArray();
//        JSONObject obj1 = new JSONObject();
//        obj1.put(CmnConst.TABLE_NAME, "TT_T_CO_EMP_DESC");
@@ -65,601 +65,607 @@
//        obj1.put(CmnConst.UPDATE_TIME, "updated_utc_datetime");
//        jsonArray.add(obj1);
        JSONObject obj2 = new JSONObject();
        obj2.put(CmnConst.TABLE_NAME, "TT_T_CO_EMPANDEMPLOYKIND");
        obj2.put(CmnConst.UNIQUE_NAME, "EMPANDEMPLOYKIND_ID");
        jsonArray.add(obj2);
        JSONObject obj2 = new JSONObject();
        obj2.put(CmnConst.TABLE_NAME, "TT_T_CO_EMPANDEMPLOYKIND");
        obj2.put(CmnConst.UNIQUE_NAME, "EMPANDEMPLOYKIND_ID");
        jsonArray.add(obj2);
        String signedData = RSAUtil.privateEncrypt(jsonArray.toString(), RSAUtil.getPrivateKey(signPrivateKey));
        System.out.println(signedData.equals("UADjiSAUii_PYdXTx-Jud6nY6cmsrQlsNfWK3vnIsuxg8rlugO-GUe0PetQxtvGuPrUR8hBcBrJUEsamI8rXyVp7WYdlBRsZmraglklb1orLspNSPgQzgSPHP6jhnW5OlTYMbdu_aU1r328jIfjwCk_hjHhN20ed7JW1OUXyQEQ"));
        String encodedData = RSAUtil.publicEncrypt(signedData, RSAUtil.getPublicKey(cryptPublicKey));
        System.out.println(encodedData);
        String signedData = RSAUtil.privateEncrypt(jsonArray.toString(), RSAUtil.getPrivateKey(signPrivateKey));
        System.out.println(signedData.equals("UADjiSAUii_PYdXTx-Jud6nY6cmsrQlsNfWK3vnIsuxg8rlugO-GUe0PetQxtvGuPrUR8hBcBrJUEsamI8rXyVp7WYdlBRsZmraglklb1orLspNSPgQzgSPHP6jhnW5OlTYMbdu_aU1r328jIfjwCk_hjHhN20ed7JW1OUXyQEQ"));
        String encodedData = RSAUtil.publicEncrypt(signedData, RSAUtil.getPublicKey(cryptPublicKey));
        System.out.println(encodedData);
//        dataSync(encodedData);
    }
    }
    public void runTimeTask(String uuid) {
        FieldSetEntity fse = baseDao.getFieldSetEntity("product_sys_data_sync_mes_sub", uuid, false);
        if (FieldSetEntity.isEmpty(fse)) {
            return;
        }
        JSONObject objects = BaseUtil.fieldSetEntityToJson(fse);
        markProcess(this.dataSync(objects, true));
    }
    public void runTimeTask(String uuid) {
        FieldSetEntity fse = baseDao.getFieldSetEntity("product_sys_data_sync_mes_sub", uuid, false);
        if (FieldSetEntity.isEmpty(fse)) {
            return;
        }
        JSONObject objects = BaseUtil.fieldSetEntityToJson(fse);
        markProcess(this.dataSync(objects, true));
    }
    /**
     * 重新处理
     *
     * @param logUUID
     */
    public void reDeal(String logUUID) {
        asynRedeal(logUUID);
    }
    /**
     * 重新处理
     *
     * @param logUUID
     */
    public void reDeal(String logUUID) {
        asynRedeal(logUUID);
    }
    @Async
    public void asynRedeal(String logUUID) {
        FieldSetEntity logFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUUID, false);
        if (FieldSetEntity.isEmpty(logFse) || !"3".equals(logFse.getString(CmnConst.TYPE))) {
            return;
        }
        String otherInfo = logFse.getString(CmnConst.OTHER_INFO);
        SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(otherInfo), SyncDataLogEntity.class);
        Object syncInfo = syncDataLogEntity.getSyncInfo();
        if (syncInfo instanceof String && syncDataLogEntity.getErrorType() == 1) {
            //解密错误类型 废弃日志逻辑代码未实现
            dataSync((String) syncInfo);
        } else if (syncDataLogEntity.getErrorType() == 2 || syncDataLogEntity.getErrorType() == 3) {
            //运行过程中产生的错误
            markProcess(dataSync((JSONObject) syncInfo));
        }
    @Async
    public void asynRedeal(String logUUID) {
        FieldSetEntity logFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUUID, false);
        if (FieldSetEntity.isEmpty(logFse) || !"3".equals(logFse.getString(CmnConst.TYPE))) {
            return;
        }
        String otherInfo = logFse.getString(CmnConst.OTHER_INFO);
        SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(otherInfo), SyncDataLogEntity.class);
        Object syncInfo = syncDataLogEntity.getSyncInfo();
        if (syncInfo instanceof String && syncDataLogEntity.getErrorType() == 1) {
            //解密错误类型 废弃日志逻辑代码未实现
            dataSync((String) syncInfo);
        } else if (syncDataLogEntity.getErrorType() == 2 || syncDataLogEntity.getErrorType() == 3) {
            //运行过程中产生的错误
            markProcess(dataSync((JSONObject) syncInfo));
        }
    }
    }
    /**
     * 标记日志
     *
     * @param lists
     */
    private void markProcess(List<JournalEntity> lists) {
        if (lists == null || lists.isEmpty()) {
            return;
        }
        long count = lists.stream().filter(item -> item.getResult() != 1).count();
        try {
            SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(lists.get(0).getOther_info()), SyncDataLogEntity.class);
            JSONObject syncInfo = (JSONObject) syncDataLogEntity.getSyncInfo();
            if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) {
                String tableName = syncInfo.getString(CmnConst.TABLE_NAME);
                baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" +
                                " where type=3 and UPPER(other_info) like concat('%',UPPER(?),'%')"
                        , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), "\"table_name\":\"" + tableName + "\""});
            }
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
        }
    }
    /**
     * 标记日志
     *
     * @param lists
     */
    private void markProcess(List<JournalEntity> lists) {
        if (lists == null || lists.isEmpty()) {
            return;
        }
        long count = lists.stream().filter(item -> item.getResult() != 1).count();
        try {
            SyncDataLogEntity syncDataLogEntity = JSONObject.toJavaObject(JSON.parseObject(lists.get(0).getOther_info()), SyncDataLogEntity.class);
            JSONObject syncInfo = (JSONObject) syncDataLogEntity.getSyncInfo();
            if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) {
                String tableName = syncInfo.getString(CmnConst.TABLE_NAME);
                baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" +
                                " where type=3 and config_uuid in (select uuid from product_sys_data_sync_mes_sub where UPPER(table_name)=UPPER(?)) and (result=0 or deal_result=0)"
                        , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), tableName});
            }
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
        }
    }
    public List<JournalEntity> dataSync(JSONObject jsonArray) {
        return dataSync(jsonArray, false);
    }
    public List<JournalEntity> dataSync(JSONObject jsonArray) {
        return dataSync(jsonArray, false);
    }
    public List<JournalEntity> dataSync(JSONObject jsonObject, boolean timeTaskTrigger) {
        List<JournalEntity> journalEntities = new ArrayList<>();
    public List<JournalEntity> dataSync(JSONObject jsonObject, boolean timeTaskTrigger) {
        List<JournalEntity> journalEntities = new ArrayList<>();
        SyncDataLogEntity<JSONObject> syncDataLogEntity = new SyncDataLogEntity();
        syncDataLogEntity.setSyncInfo(jsonObject);
        syncDataLogEntity.setTimeTaskTrigger(timeTaskTrigger);
        Calendar c1 = Calendar.getInstance();
        boolean lockStatus = false;
        String lockKey = null;
        Dao sourceDao = null;
        try {
            String tableName = jsonObject.getString(CmnConst.TABLE_NAME);
            String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(",");
            if (ArrayUtil.isEmpty(uniqueSignFieldName)) {
                throw new BaseException(ErrorCode.NO_TABLE_OR_UNIQUE_SIGN);
            }
            // 获取同步配置信息
            DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
            if (DataTableEntity.isEmpty(syncConfigDte)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
            if (FieldSetEntity.isEmpty(syncConfigFse)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            lockKey = jsonObject.getString(CmnConst.TABLE_NAME);
            if (!customLock.tryLock(lockKey)) {
                syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!");
                journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName,
                        null, null, null, 0L));
                return null;
            }
            lockStatus = true;
            if (jsonObject == null || jsonObject.isEmpty()) {
                throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
            }
        SyncDataLogEntity<JSONObject> syncDataLogEntity = new SyncDataLogEntity();
        syncDataLogEntity.setSyncInfo(jsonObject);
        syncDataLogEntity.setTimeTaskTrigger(timeTaskTrigger);
        Calendar c1 = Calendar.getInstance();
        boolean lockStatus = false;
        String lockKey = null;
        Dao sourceDao = null;
        try {
            try {
                String tableName = jsonObject.getString(CmnConst.TABLE_NAME);
                String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(",");
                if (ArrayUtil.isEmpty(uniqueSignFieldName)) {
                    throw new BaseException(ErrorCode.NO_TABLE_OR_UNIQUE_SIGN);
                }
                // 获取同步配置信息
                DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
                if (DataTableEntity.isEmpty(syncConfigDte)) {
                    throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
                }
                FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
                if (FieldSetEntity.isEmpty(syncConfigFse)) {
                    throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
                }
                lockKey = jsonObject.getString(CmnConst.TABLE_NAME);
                if (!customLock.tryLock(lockKey, 120)) {
                    syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!");
                    journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName,
                            null, null, null, 0L));
                    return null;
                }
                lockStatus = true;
                if (jsonObject == null || jsonObject.isEmpty()) {
                    throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
                }
            // 获取来源连接和目标连接
            if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.DATA_SOURCE))) {
                throw new BaseException(ErrorCode.SYNC_GET_DATA_SOURCE_FAIL);
            }
            FieldSetEntity sourceDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, syncConfigFse.getString(CmnConst.DATA_SOURCE), false);
            DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSourceFse);
            sourceDao = sourceDbe.getDao();
            if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE))) {
                throw new BaseException(ErrorCode.SYNC_GET_TARGET_DATA_SOURCE_FAIL);
            }
            DataTableEntity targetDataSourceDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, BaseUtil.buildQuestionMarkFilter("uuid", syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE).split(","), true));
            List<Dao> targetDaoList = Lists.newArrayList();
            FieldSetEntity targetDataSourceFse;
            for (int i = 0; i < targetDataSourceDte.getRows(); i++) {
                targetDataSourceFse = targetDataSourceDte.getFieldSetEntity(i);
                targetDaoList.add(new DataBaseEntity(targetDataSourceFse).getDao());
            }
                // 获取来源连接和目标连接
                if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.DATA_SOURCE))) {
                    throw new BaseException(ErrorCode.SYNC_GET_DATA_SOURCE_FAIL);
                }
                FieldSetEntity sourceDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, syncConfigFse.getString(CmnConst.DATA_SOURCE), false);
                DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSourceFse);
                sourceDao = sourceDbe.getDao();
                if (StringUtils.isEmpty(syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE))) {
                    throw new BaseException(ErrorCode.SYNC_GET_TARGET_DATA_SOURCE_FAIL);
                }
                DataTableEntity targetDataSourceDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, BaseUtil.buildQuestionMarkFilter("uuid", syncConfigFse.getString(CmnConst.TARGET_DATA_SOURCE).split(","), true));
                List<Dao> targetDaoList = Lists.newArrayList();
                FieldSetEntity targetDataSourceFse;
                for (int i = 0; i < targetDataSourceDte.getRows(); i++) {
                    targetDataSourceFse = targetDataSourceDte.getFieldSetEntity(i);
                    targetDaoList.add(new DataBaseEntity(targetDataSourceFse).getDao());
                }
            // 数据处理
            String updateTimeFieldName;
            DataTableEntity sourceDataDte;
            FieldSetEntity sourceDataFse;
            DataTableEntity targetDataDte = null;
            List<String> sourceUniqueKeyList;// 来源数据的唯一键集合
            List<String> targetUniqueKeyList;// 目标数据的唯一键集合
            List<String> sourceOnlyKeyList;// 来源数据单独存在的唯一键集合
            List<String> targetOnlyKeyList;// 目标数据单独存在的唯一键集合
            List<String> publicKeyList;// 两者都存在的唯一键集合
            DataTableEntity addDte;
            DataTableEntity updateDte;
                // 数据处理
                String updateTimeFieldName;
                DataTableEntity sourceDataDte;
                FieldSetEntity sourceDataFse;
                DataTableEntity targetDataDte = null;
                List<String> sourceUniqueKeyList;// 来源数据的唯一键集合
                List<String> targetUniqueKeyList;// 目标数据的唯一键集合
                List<String> sourceOnlyKeyList;// 来源数据单独存在的唯一键集合
                List<String> targetOnlyKeyList;// 目标数据单独存在的唯一键集合
                List<String> publicKeyList;// 两者都存在的唯一键集合
                DataTableEntity addDte;
                DataTableEntity updateDte;
//            StringBuilder errorInfo = new StringBuilder(1024);
            int dataSourceCount = 0;
            try {
                tableName = tableName.toLowerCase(Locale.ROOT);
                int dataSourceCount = 0;
                try {
                    tableName = tableName.toLowerCase(Locale.ROOT);
//                uniqueSignFieldName = uniqueSignFieldName.toLowerCase(Locale.ROOT);
                updateTimeFieldName = jsonObject.getString(CmnConst.UPDATE_TIME);
                if (StringUtils.isEmpty(updateTimeFieldName)) {
                    // 没有更新标识,执行覆盖对比更新
                    sourceDataDte = sourceDao.getList(tableName, "");
                    sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                    //dao对应目标数据源信息的下标
                    for (Dao targetDao : targetDaoList) {
                        addDte = new DataTableEntity();
                        updateDte = new DataTableEntity();
                        TimeInterval timer = DateUtil.timer();
                        try {
                            targetDataDte = targetDao.getList(tableName, "");
                            if (!DataTableEntity.isEmpty(targetDataDte)) {
                                targetUniqueKeyList = targetDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
                                targetOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, -1);
                                publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
                                for (int j = 0; j < sourceDataDte.getRows(); j++) {
                                    sourceDataFse = sourceDataDte.getFieldSetEntity(j);
                                    sourceDataFse.setTableName(tableName);
                                    if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                        addDte.addFieldSetEntity(sourceDataFse);
                                    } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                        updateDte.addFieldSetEntity(sourceDataFse);
                                    }
                                }
                                String filter = " ( ";
                                for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                    String uniqueSign = uniqueSignFieldName[i];
                                    if (i > 0) {
                                        filter += " and ";
                    updateTimeFieldName = jsonObject.getString(CmnConst.UPDATE_TIME);
                    if (StringUtils.isEmpty(updateTimeFieldName)) {
                        // 没有更新标识,执行覆盖对比更新
                        sourceDataDte = sourceDao.getList(tableName, "");
                        sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                        //dao对应目标数据源信息的下标
                        for (Dao targetDao : targetDaoList) {
                            addDte = new DataTableEntity();
                            updateDte = new DataTableEntity();
                            TimeInterval timer = DateUtil.timer();
                            try {
                                targetDataDte = targetDao.getList(tableName, "");
                                if (!DataTableEntity.isEmpty(targetDataDte)) {
                                    targetUniqueKeyList = targetDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                    sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
                                    targetOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, -1);
                                    publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
                                    for (int j = 0; j < sourceDataDte.getRows(); j++) {
                                        sourceDataFse = sourceDataDte.getFieldSetEntity(j);
                                        sourceDataFse.setTableName(tableName);
                                        if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                            addDte.addFieldSetEntity(sourceDataFse);
                                        } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                            updateDte.addFieldSetEntity(sourceDataFse);
                                        }
                                    }
                                    String filter = " ( ";
                                    for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                        String uniqueSign = uniqueSignFieldName[i];
                                        if (i > 0) {
                                            filter += " and ";
                                    }
                                    filter += " " + uniqueSign + "=? ";
                                }
                                filter += " ) ";
                                String[] lower = new String[uniqueSignFieldName.length];
                                for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                    lower[i] = uniqueSignFieldName[i].toLowerCase();
                                }
                                targetDao.updateBatch(updateDte, new UpdateFilterEntity(filter, lower), true);
                                if (!targetOnlyKeyList.isEmpty()) {
                                    List<Object> delParams = new ArrayList<>();
                                    StringBuilder delFilter = new StringBuilder();
                                    for (int i = 0; i < targetOnlyKeyList.size(); i++) {
                                        delParams.addAll(Arrays.asList(getUniqueValue(targetOnlyKeyList.get(i))));
                                        if (i > 0) {
                                            delFilter.append(" OR ");
                                        }
                                        delFilter.append(" ( ");
                                        for (int j = 0; j < uniqueSignFieldName.length; j++) {
                                            if (j > 0) {
                                                delFilter.append(" and ");
                                            }
                                            delFilter.append(uniqueSignFieldName[j]).append("=? ");
                                        }
                                        delFilter.append(" ) ");
                                    }
                                        }
                                        filter += " " + uniqueSign + "=? ";
                                    }
                                    filter += " ) ";
                                    String[] lower = new String[uniqueSignFieldName.length];
                                    for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                        lower[i] = uniqueSignFieldName[i].toLowerCase();
                                    }
                                    targetDao.updateBatch(updateDte, new UpdateFilterEntity(filter, lower), true);
                                    if (!targetOnlyKeyList.isEmpty()) {
                                        List<Object> delParams = new ArrayList<>();
                                        StringBuilder delFilter = new StringBuilder();
                                        for (int i = 0; i < targetOnlyKeyList.size(); i++) {
                                            delParams.addAll(Arrays.asList(getUniqueValue(targetOnlyKeyList.get(i))));
                                            if (i > 0) {
                                                delFilter.append(" OR ");
                                            }
                                            delFilter.append(" ( ");
                                            for (int j = 0; j < uniqueSignFieldName.length; j++) {
                                                if (j > 0) {
                                                    delFilter.append(" and ");
                                                }
                                                delFilter.append(uniqueSignFieldName[j]).append("=? ");
                                            }
                                            delFilter.append(" ) ");
                                        }
                                    targetDao.delete(tableName, delFilter.toString(), delParams.toArray());
                                }
                            } else {
                                addDte = sourceDataDte;
                                addDte.getMeta().setTableName(new Object[]{tableName});
                            }
                            targetDao.addBatch(addDte);
                            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                            journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte.getRows(), tableName,
                                    targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
                        } catch (Exception e) {
                            e.printStackTrace();
                            SpringMVCContextHolder.getSystemLogger().error(e);
                            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                            syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
                            syncDataLogEntity.setErrorType(3);
                            journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte != null ? targetDataDte.getRows() : 0, tableName,
                                    targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
                        } finally {
                            dataSourceCount++;
                        }
                    }
                } else {
                    // 存在更新标识,执行增量更新
                    updateTimeFieldName = updateTimeFieldName.toLowerCase(Locale.ROOT);
                    dataSourceCount = 0;
                    for (Dao targetDao : targetDaoList) {
                        TimeInterval timer = DateUtil.timer();
                        StringBuilder sql = new StringBuilder(128);
                        String maxValue = null;
                        String minValue = null;
                        int totalCount = 0;
                        try {
                            sql.append("select max(").append(updateTimeFieldName).append(") max_value from ").append(tableName);
                            FieldSetEntity paramFse = targetDao.getOne(sql.toString());
                            StringBuilder filter = new StringBuilder(128);
                            if (paramFse != null) {
                                if (!StringUtils.isEmpty(paramFse.getString("max_value"))) {
                                    filter.append(updateTimeFieldName).append(">=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), paramFse.getDate("max_value")))
                                            .append(" and ").append(updateTimeFieldName).append("<=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), c1.getTime()));
                                }
                            }
                            sql.setLength(0);
                            sql.append("select count(*) total_count from ").append(tableName);
                            if (filter.length() > 0) {
                                sql.append(" where ").append(filter);
                            }
                            paramFse = sourceDao.getOne(sql.toString());
                            totalCount = StringUtils.isEmpty(paramFse.getString("total_count")) ? 0 : paramFse.getInteger("total_count");
                            int batchCount = StringUtils.isEmpty(syncConfigFse.getString(CmnConst.INCREMENT_BATCH_COUNT)) ? 1000 : syncConfigFse.getInteger(CmnConst.INCREMENT_BATCH_COUNT);
                            int totalPage = (int) Math.ceil((double) totalCount / batchCount);
                            for (int j = 0; j < totalPage; j++) {
                                addDte = new DataTableEntity();
                                updateDte = new DataTableEntity();
                                sourceDataDte = sourceDao.getList(tableName, filter.toString(), new Object[]{}, j + 1, batchCount);
                                sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                sql.setLength(0);
                                sql.append("select ");
                                filter.setLength(0);
                                        targetDao.delete(tableName, delFilter.toString(), delParams.toArray());
                                    }
                                } else {
                                    addDte = sourceDataDte;
                                    addDte.getMeta().setTableName(new Object[]{tableName});
                                }
                                targetDao.addBatch(addDte);
                                syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                                journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte.getRows(), tableName,
                                        targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
                            } catch (Exception e) {
                                e.printStackTrace();
                                SpringMVCContextHolder.getSystemLogger().error(e);
                                syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                                syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
                                syncDataLogEntity.setErrorType(3);
                                journalEntities.add(getLogRecord(syncDataLogEntity, targetDataDte != null ? targetDataDte.getRows() : 0, tableName,
                                        targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
                            } finally {
                                dataSourceCount++;
                            }
                        }
                    } else {
                        // 存在更新标识,执行增量更新
                        updateTimeFieldName = updateTimeFieldName.toLowerCase(Locale.ROOT);
                        dataSourceCount = 0;
                        for (Dao targetDao : targetDaoList) {
                            TimeInterval timer = DateUtil.timer();
                            StringBuilder sql = new StringBuilder(128);
                            String maxValue = null;
                            String minValue = null;
                            int totalCount = 0;
                            try {
                                sql.append("select max(").append(updateTimeFieldName).append(") max_value from ").append(tableName);
                                FieldSetEntity paramFse = targetDao.getOne(sql.toString());
                                StringBuilder filter = new StringBuilder(128);
                                if (paramFse != null) {
                                    if (!StringUtils.isEmpty(paramFse.getString("max_value"))) {
                                        filter.append(updateTimeFieldName).append(">=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), paramFse.getDate("max_value")))
                                                .append(" and ").append(updateTimeFieldName).append("<=").append(SqlTransferUtil.str2Date(sourceDao.getDataBaseType().getValue(), c1.getTime()));
                                    }
                                }
                                sql.setLength(0);
                                sql.append("select count(*) total_count from ").append(tableName);
                                if (filter.length() > 0) {
                                    sql.append(" where ").append(filter);
                                }
                                paramFse = sourceDao.getOne(sql.toString());
                                totalCount = StringUtils.isEmpty(paramFse.getString("total_count")) ? 0 : paramFse.getInteger("total_count");
                                int batchCount = StringUtils.isEmpty(syncConfigFse.getString(CmnConst.INCREMENT_BATCH_COUNT)) ? 1000 : syncConfigFse.getInteger(CmnConst.INCREMENT_BATCH_COUNT);
                                int totalPage = (int) Math.ceil((double) totalCount / batchCount);
                                for (int j = 0; j < totalPage; j++) {
                                    addDte = new DataTableEntity();
                                    updateDte = new DataTableEntity();
                                    sourceDataDte = sourceDao.getList(tableName, filter.toString(), new Object[]{}, j + 1, batchCount);
                                    sourceUniqueKeyList = sourceDataDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                    sql.setLength(0);
                                    sql.append("select ");
                                    filter.setLength(0);
                                for (int i = 0; i < sourceUniqueKeyList.size(); i++) {
                                    if (i > 0) {
                                        filter.append(" or ");
                                    }
                                    filter.append(" ( ");
                                    for (int k = 0; k < uniqueSignFieldName.length; k++) {
                                        if (i == 0) {
                                            if (k > 0) {
                                                sql.append(",");
                                            }
                                            sql.append(uniqueSignFieldName[k]);
                                    for (int i = 0; i < sourceUniqueKeyList.size(); i++) {
                                        if (i > 0) {
                                            filter.append(" or ");
                                        }
                                        filter.append(" ( ");
                                        for (int k = 0; k < uniqueSignFieldName.length; k++) {
                                            if (i == 0) {
                                                if (k > 0) {
                                                    sql.append(",");
                                                }
                                                sql.append(uniqueSignFieldName[k]);
                                        }
                                        if (k > 0) {
                                            filter.append(" AND ");
                                        }
                                        filter.append(uniqueSignFieldName[k]).append("= ? ");
                                    }
                                    filter.append(" ) ");
                                }
                                            }
                                            if (k > 0) {
                                                filter.append(" AND ");
                                            }
                                            filter.append(uniqueSignFieldName[k]).append("= ? ");
                                        }
                                        filter.append(" ) ");
                                    }
                                sql.append(" from ").append(tableName).append(" where ").append(filter);
                                    sql.append(" from ").append(tableName).append(" where ").append(filter);
                                String[][] objects = sourceUniqueKeyList.stream().map(item -> getUniqueValue(item)).toArray(String[][]::new);
                                String[] parmas = ArrayUtil.addAll(objects);
                                DataTableEntity paramDte = targetDao.getList(sql.toString(), parmas);
                                if (!DataTableEntity.isEmpty(paramDte)) {
                                    targetUniqueKeyList = paramDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                    sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
                                    publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
                                    for (int k = 0; k < sourceDataDte.getRows(); k++) {
                                        sourceDataFse = sourceDataDte.getFieldSetEntity(k);
                                        String uniqueValue = getUniqueValue(sourceDataFse, uniqueSignFieldName);
                                        //获取最小唯一值
                                        if (StringUtils.isEmpty(minValue)) {
                                            minValue = uniqueValue;
                                        } else {
                                            minValue = getAimID(minValue, uniqueValue, 0);
                                        }
                                        //获取最大唯一值
                                        if (StringUtils.isEmpty(maxValue)) {
                                            maxValue = uniqueValue;
                                        } else {
                                            maxValue = getAimID(minValue, uniqueValue, 1);
                                        }
                                        sourceDataFse = sourceDataDte.getFieldSetEntity(k);
                                        sourceDataFse.setTableName(tableName);
                                        if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                            addDte.addFieldSetEntity(sourceDataFse);
                                        } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                            updateDte.addFieldSetEntity(sourceDataFse);
                                        }
                                    }
                                    String filterUp = " ( ";
                                    for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                        String uniqueSign = uniqueSignFieldName[i];
                                        if (i > 0) {
                                            filterUp += " and ";
                                    String[][] objects = sourceUniqueKeyList.stream().map(item -> getUniqueValue(item)).toArray(String[][]::new);
                                    String[] parmas = ArrayUtil.addAll(objects);
                                    DataTableEntity paramDte = targetDao.getList(sql.toString(), parmas);
                                    if (!DataTableEntity.isEmpty(paramDte)) {
                                        targetUniqueKeyList = paramDte.getData().stream().map(item -> getUniqueValue(item, uniqueSignFieldName)).collect(Collectors.toList());
                                        sourceOnlyKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 1);
                                        publicKeyList = dealDataUniqueKeyList(sourceUniqueKeyList, targetUniqueKeyList, 0);
                                        for (int k = 0; k < sourceDataDte.getRows(); k++) {
                                            sourceDataFse = sourceDataDte.getFieldSetEntity(k);
                                            String uniqueValue = getUniqueValue(sourceDataFse, uniqueSignFieldName);
                                            //获取最小唯一值
                                            if (StringUtils.isEmpty(minValue)) {
                                                minValue = uniqueValue;
                                            } else {
                                                minValue = getAimID(minValue, uniqueValue, 0);
                                            }
                                            //获取最大唯一值
                                            if (StringUtils.isEmpty(maxValue)) {
                                                maxValue = uniqueValue;
                                            } else {
                                                maxValue = getAimID(minValue, uniqueValue, 1);
                                            }
                                            sourceDataFse = sourceDataDte.getFieldSetEntity(k);
                                            sourceDataFse.setTableName(tableName);
                                            if (sourceOnlyKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                                addDte.addFieldSetEntity(sourceDataFse);
                                            } else if (publicKeyList.contains(getUniqueValue(sourceDataFse, uniqueSignFieldName))) {
                                                updateDte.addFieldSetEntity(sourceDataFse);
                                            }
                                        }
                                        String filterUp = " ( ";
                                        for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                            String uniqueSign = uniqueSignFieldName[i];
                                            if (i > 0) {
                                                filterUp += " and ";
                                        }
                                        filterUp += " " + uniqueSign + "=? ";
                                    }
                                    filterUp += " ) ";
                                            }
                                            filterUp += " " + uniqueSign + "=? ";
                                        }
                                        filterUp += " ) ";
                                    String[] lower = new String[uniqueSignFieldName.length];
                                    for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                        lower[i] = uniqueSignFieldName[i].toLowerCase();
                                    }
                                    targetDao.updateBatch(updateDte, new UpdateFilterEntity(filterUp, lower), true);
                                } else {
                                    addDte = sourceDataDte;
                                    addDte.getMeta().setTableName(new Object[]{tableName});
                                }
                                targetDao.addBatch(addDte);
                            }
                            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                            journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
                                    minValue, maxValue,
                                    targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
                        } catch (Exception e) {
                            e.printStackTrace();
                            SpringMVCContextHolder.getSystemLogger().error(e);
                            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                            syncDataLogEntity.setErrorType(3);
                            syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
                            journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
                                    minValue, maxValue,
                                    targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
                        } finally {
                            dataSourceCount++;
                        }
                    }
                }
            } catch (Exception e) {
                throw e;
            } finally {
                if (targetDaoList != null && !targetDaoList.isEmpty()) {
                    for (int i1 = 0; i1 < targetDaoList.size(); i1++) {
                        Dao dao = targetDaoList.get(i1);
                        if (dao != null) {
                            dao.closeConnection();
                        }
                    }
                }
                                        String[] lower = new String[uniqueSignFieldName.length];
                                        for (int i = 0; i < uniqueSignFieldName.length; i++) {
                                            lower[i] = uniqueSignFieldName[i].toLowerCase();
                                        }
                                        targetDao.updateBatch(updateDte, new UpdateFilterEntity(filterUp, lower), true);
                                    } else {
                                        addDte = sourceDataDte;
                                        addDte.getMeta().setTableName(new Object[]{tableName});
                                    }
                                    targetDao.addBatch(addDte);
                                }
                                syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                                journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
                                        minValue, maxValue,
                                        targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs()));
                            } catch (Exception e) {
                                e.printStackTrace();
                                SpringMVCContextHolder.getSystemLogger().error(e);
                                syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                                syncDataLogEntity.setErrorType(3);
                                syncDataLogEntity.setOtherInfo("目标数据源配置uuid:" + targetDataSourceDte.getString(dataSourceCount, "uuid"));
                                journalEntities.add(getLogRecord(syncDataLogEntity, totalCount, tableName,
                                        minValue, maxValue,
                                        targetDataSourceDte.getString(dataSourceCount, CmnConst.UUID), timer.intervalMs(), e));
                            } finally {
                                dataSourceCount++;
                            }
                        }
                    }
                } catch (Exception e) {
                    throw e;
                } finally {
                    if (targetDaoList != null && !targetDaoList.isEmpty()) {
                        for (int i1 = 0; i1 < targetDaoList.size(); i1++) {
                            Dao dao = targetDaoList.get(i1);
                            if (dao != null) {
                                dao.closeConnection();
                            }
                        }
                    }
            }
                }
            return journalEntities;
        } catch (Exception e) {
            e.printStackTrace();
            syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
            syncDataLogEntity.setErrorType(2);
            journalEntities.add(getLogRecord(syncDataLogEntity, null, null, null, 0L
                    , e));
            return journalEntities;
        } finally {
            if (journalEntities != null) {
                for (int i = 0; i < journalEntities.size(); i++) {
                    journalManagerService.autoCreateJournal(journalEntities.get(i));
                }
            }
            if (lockStatus && !StringUtils.isEmpty(lockKey)) {
                customLock.unLock(lockKey);
            }
            if(sourceDao!=null){
                sourceDao.closeConnection();
            }
                return journalEntities;
            } catch (Exception e) {
                e.printStackTrace();
                syncDataLogEntity = new SyncDataLogEntity(syncDataLogEntity);
                syncDataLogEntity.setErrorType(2);
                journalEntities.add(getLogRecord(syncDataLogEntity, null, null, null, 0L
                        , e));
                return journalEntities;
            } finally {
                if (journalEntities != null) {
                    for (int i = 0; i < journalEntities.size(); i++) {
                        journalManagerService.autoCreateJournal(journalEntities.get(i));
                    }
                }
                if (lockStatus && !StringUtils.isEmpty(lockKey)) {
                    customLock.unLock(lockKey);
                }
                if (sourceDao != null) {
                    sourceDao.closeConnection();
                }
        }
    }
            }
        } finally {
            if (lockStatus && !StringUtils.isEmpty(lockKey)) {
                customLock.unLock(lockKey);
            }
        }
    }
    private static String[] getUniqueValue(String value) {
        return value.split("\\*-\\*_~!");
    }
    private static String[] getUniqueValue(String value) {
        return value.split("\\*-\\*_~!");
    }
    private String getUniqueValue(FieldSetEntity item, String[] uniqueFieldNames) {
        String name = "";
        for (int i = 0; i < uniqueFieldNames.length; i++) {
            if (i > 0) {
                name += "*-*_~!";
            }
            name += item.getString(uniqueFieldNames[i].toLowerCase());
        }
        return name;
    }
    private String getUniqueValue(FieldSetEntity item, String[] uniqueFieldNames) {
        String name = "";
        for (int i = 0; i < uniqueFieldNames.length; i++) {
            if (i > 0) {
                name += "*-*_~!";
            }
            name += item.getString(uniqueFieldNames[i].toLowerCase());
        }
        return name;
    }
    /**
     * 日志只是记录一条,反正每次
     * 覆盖  都是全部重新搞
     * 增量  都是查询子库取了一个最大的时间作为标准
     * 点击日志中的重新处理只是为了触发这个口子而已,和实际已经操作了好多数据剩余好多数据并没有好大的关系(查询和插入数据依然采用分批,不然数据量大了内存要崩)
     * 数据不进行回滚,遇到错误直接写入到日志中(因为若是回滚,那么就相当于所有的线都没有获取到新的内容,不回滚数据可以让没问题的线正常处理)
     *
     * @param encodedData
     */
    @Deprecated
    public synchronized void dataSync(String encodedData) {
        TimeInterval timer = DateUtil.timer();
        JSONArray jsonArray = null;
        List<JournalEntity> journalEntities = new ArrayList<>();
        try {
            // 获取同步配置信息
            DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
            if (DataTableEntity.isEmpty(syncConfigDte)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
            if (FieldSetEntity.isEmpty(syncConfigFse)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            String cryptPrivateKey = syncConfigFse.getString(CmnConst.DECRYPT_KEY);
            String signPublicKey = syncConfigFse.getString(CmnConst.SIGN_KEY);
            String signedData = RSAUtil.privateDecrypt(encodedData, RSAUtil.getPrivateKey(cryptPrivateKey));
            String jsonArrayStr = RSAUtil.publicDecrypt(signedData, RSAUtil.getPublicKey(signPublicKey));
    /**
     * 日志只是记录一条,反正每次
     * 覆盖  都是全部重新搞
     * 增量  都是查询子库取了一个最大的时间作为标准
     * 点击日志中的重新处理只是为了触发这个口子而已,和实际已经操作了好多数据剩余好多数据并没有好大的关系(查询和插入数据依然采用分批,不然数据量大了内存要崩)
     * 数据不进行回滚,遇到错误直接写入到日志中(因为若是回滚,那么就相当于所有的线都没有获取到新的内容,不回滚数据可以让没问题的线正常处理)
     *
     * @param encodedData
     */
    @Deprecated
    public synchronized void dataSync(String encodedData) {
        TimeInterval timer = DateUtil.timer();
        JSONArray jsonArray = null;
        List<JournalEntity> journalEntities = new ArrayList<>();
        try {
            // 获取同步配置信息
            DataTableEntity syncConfigDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MES);
            if (DataTableEntity.isEmpty(syncConfigDte)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            FieldSetEntity syncConfigFse = syncConfigDte.getFieldSetEntity(0);
            if (FieldSetEntity.isEmpty(syncConfigFse)) {
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            String cryptPrivateKey = syncConfigFse.getString(CmnConst.DECRYPT_KEY);
            String signPublicKey = syncConfigFse.getString(CmnConst.SIGN_KEY);
            String signedData = RSAUtil.privateDecrypt(encodedData, RSAUtil.getPrivateKey(cryptPrivateKey));
            String jsonArrayStr = RSAUtil.publicDecrypt(signedData, RSAUtil.getPublicKey(signPublicKey));
            if (StringUtils.isEmpty(jsonArrayStr)) {
                throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
            }
            jsonArray = JSON.parseArray(jsonArrayStr);
        } catch (Exception e) {
            SpringMVCContextHolder.getSystemLogger().error(e);
            SyncDataLogEntity<String> syncDataLogEntity = new SyncDataLogEntity();
            syncDataLogEntity.setErrorType(1);
            syncDataLogEntity.setSyncInfo(encodedData);
            JournalEntity journalEntity = (getLogRecord(syncDataLogEntity, null, null, null, timer.intervalMs()
                    , e));
            journalManagerService.autoCreateJournal(journalEntity);
        }
        if (jsonArray != null) {
            try {
                for (int i = 0; i < jsonArray.size(); i++) {
                    this.dataSync(jsonArray.getJSONObject(i));
                }
            } catch (Exception e) {
                //不对错误做任何操作请在调用的方法中进行处理
                e.printStackTrace();
            }
        }
    }
            if (StringUtils.isEmpty(jsonArrayStr)) {
                throw new BaseException(ErrorCode.SYNC_GET_PARAM_FAIL);
            }
            jsonArray = JSON.parseArray(jsonArrayStr);
        } catch (Exception e) {
            SpringMVCContextHolder.getSystemLogger().error(e);
            SyncDataLogEntity<String> syncDataLogEntity = new SyncDataLogEntity();
            syncDataLogEntity.setErrorType(1);
            syncDataLogEntity.setSyncInfo(encodedData);
            JournalEntity journalEntity = (getLogRecord(syncDataLogEntity, null, null, null, timer.intervalMs()
                    , e));
            journalManagerService.autoCreateJournal(journalEntity);
        }
        if (jsonArray != null) {
            try {
                for (int i = 0; i < jsonArray.size(); i++) {
                    this.dataSync(jsonArray.getJSONObject(i));
                }
            } catch (Exception e) {
                //不对错误做任何操作请在调用的方法中进行处理
                e.printStackTrace();
            }
        }
    }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
                                       String dataSourceUid, long elapsedTime, Throwable throwable) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, throwable);
    }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
                                       String dataSourceUid, long elapsedTime, Throwable throwable) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, throwable);
    }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
                                       String dataSourceUid, long elapsedTime) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, null);
    }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid,
                                       String dataSourceUid, long elapsedTime) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, null, null, dataSourceUid, elapsedTime, null);
    }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
                                       String maxId, String dataSourceUid, long elapsedTime) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, minId, maxId, dataSourceUid, elapsedTime, null);
    }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
                                       String maxId, String dataSourceUid, long elapsedTime) {
        return getLogRecord(syncDataLogEntity, dataCount, configUid, minId, maxId, dataSourceUid, elapsedTime, null);
    }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
                                       String maxId, String dataSourceUid, long elapsedTime, Throwable throwable) {
        JournalEntity journalEntity = new JournalEntity();
        journalEntity.setType(3);
        journalEntity.setDeal_flag(0);
        journalEntity.setResult(syncDataLogEntity.getErrorType() == -1 ? 1 : 0);
        String configUUID = "";
        if (syncDataLogEntity.getSyncInfo() != null) {
            JSONObject syncInfo = JSONObject.parseObject(syncDataLogEntity.getSyncInfo().toString());
            configUUID = syncInfo.getString(CmnConst.UUID);
        }
        journalEntity.setConfigUuid(StringUtils.isEmpty(configUUID) ? "1" : configUUID);
        journalEntity.setCount(dataCount);
        journalEntity.setMin_id(minId);
        journalEntity.setMax_id(maxId);
        journalEntity.setData_source(dataSourceUid);
        journalEntity.setSingle_duration(elapsedTime);
        journalEntity.setOther_info(syncDataLogEntity.toString());
        journalEntity.setCreated_utc_datetime(new Date());
        if (journalEntity.getResult() != 1 && throwable != null) {
            journalEntity.setError(journalManagerService.getStackTrace(throwable));
        }
        journalEntity.setDeal_result(journalEntity.getResult() == 1 ? 1 : 0);
        return journalEntity;
    }
    private JournalEntity getLogRecord(SyncDataLogEntity syncDataLogEntity, Integer dataCount, String configUid, String minId,
                                       String maxId, String dataSourceUid, long elapsedTime, Throwable throwable) {
        JournalEntity journalEntity = new JournalEntity();
        journalEntity.setType(3);
        journalEntity.setDeal_flag(0);
        journalEntity.setResult(syncDataLogEntity.getErrorType() == -1 ? 1 : 0);
        String configUUID = "";
        if (syncDataLogEntity.getSyncInfo() != null) {
            JSONObject syncInfo = JSONObject.parseObject(syncDataLogEntity.getSyncInfo().toString());
            configUUID = syncInfo.getString(CmnConst.UUID);
        }
        journalEntity.setConfigUuid(StringUtils.isEmpty(configUUID) ? "1" : configUUID);
        journalEntity.setCount(dataCount);
        journalEntity.setMin_id(minId);
        journalEntity.setMax_id(maxId);
        journalEntity.setData_source(dataSourceUid);
        journalEntity.setSingle_duration(elapsedTime);
        journalEntity.setOther_info(syncDataLogEntity.toString());
        journalEntity.setCreated_utc_datetime(new Date());
        if (journalEntity.getResult() != 1 && throwable != null) {
            journalEntity.setError(journalManagerService.getStackTrace(throwable));
        }
        journalEntity.setDeal_result(journalEntity.getResult() == 1 ? 1 : 0);
        return journalEntity;
    }
    private String getAimID(String s1, String s2, int sign) {
        if (org.apache.commons.lang3.StringUtils.isEmpty(s1) && org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
            return null;
        }
        if (org.apache.commons.lang3.StringUtils.isEmpty(s1) || org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
            return org.apache.commons.lang3.StringUtils.isEmpty(s1) ? s2 : s1;
        }
        String numberRegexp = "\\d{1,11}";
        if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) {
            boolean b = Long.parseLong(s1) > Long.parseLong(s2);
            if (sign > 0) {
                return b ? s1 : s2;
            } else {
                return b ? s2 : s1;
            }
        } else {
            if (sign > 0) {
                return s1.compareTo(s2) > 0 ? s1 : s2;
            } else {
                return s1.compareTo(s2) > 0 ? s2 : s1;
            }
        }
    }
    private String getAimID(String s1, String s2, int sign) {
        if (org.apache.commons.lang3.StringUtils.isEmpty(s1) && org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
            return null;
        }
        if (org.apache.commons.lang3.StringUtils.isEmpty(s1) || org.apache.commons.lang3.StringUtils.isEmpty(s2)) {
            return org.apache.commons.lang3.StringUtils.isEmpty(s1) ? s2 : s1;
        }
        String numberRegexp = "\\d{1,11}";
        if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) {
            boolean b = Long.parseLong(s1) > Long.parseLong(s2);
            if (sign > 0) {
                return b ? s1 : s2;
            } else {
                return b ? s2 : s1;
            }
        } else {
            if (sign > 0) {
                return s1.compareTo(s2) > 0 ? s1 : s2;
            } else {
                return s1.compareTo(s2) > 0 ? s2 : s1;
            }
        }
    }
    /**
     * 处理唯一键集合
     *
     * @param sourceList 来源
     * @param targetList 目标
     * @param sign       1 来源单独存在;0 两者共有;-1 目标单独存在
     * @return
     */
    private <T> List<T> dealDataUniqueKeyList(List<T> sourceList, List<T> targetList, int sign) {
        List<T> sourceUniqueKeyList = Lists.newArrayList();
        sourceUniqueKeyList.addAll(sourceList);
        List<T> targetUniqueKeyList = Lists.newArrayList();
        targetUniqueKeyList.addAll(targetList);
        List<T> resultObjList;
        if (sign == 1) {
            sourceUniqueKeyList.removeAll(targetUniqueKeyList);
            resultObjList = sourceUniqueKeyList;
        } else if (sign == -1) {
            targetUniqueKeyList.removeAll(sourceUniqueKeyList);
            resultObjList = targetUniqueKeyList;
        } else if (sign == 0) {
            sourceUniqueKeyList.retainAll(targetUniqueKeyList);
            resultObjList = sourceUniqueKeyList;
        } else {
            throw new BaseException(ErrorCode.SYNC_NO_DATA_DEAL_WAY);
        }
        return transferListType(resultObjList);
    }
    /**
     * 处理唯一键集合
     *
     * @param sourceList 来源
     * @param targetList 目标
     * @param sign       1 来源单独存在;0 两者共有;-1 目标单独存在
     * @return
     */
    private <T> List<T> dealDataUniqueKeyList(List<T> sourceList, List<T> targetList, int sign) {
        List<T> sourceUniqueKeyList = Lists.newArrayList();
        sourceUniqueKeyList.addAll(sourceList);
        List<T> targetUniqueKeyList = Lists.newArrayList();
        targetUniqueKeyList.addAll(targetList);
        List<T> resultObjList;
        if (sign == 1) {
            sourceUniqueKeyList.removeAll(targetUniqueKeyList);
            resultObjList = sourceUniqueKeyList;
        } else if (sign == -1) {
            targetUniqueKeyList.removeAll(sourceUniqueKeyList);
            resultObjList = targetUniqueKeyList;
        } else if (sign == 0) {
            sourceUniqueKeyList.retainAll(targetUniqueKeyList);
            resultObjList = sourceUniqueKeyList;
        } else {
            throw new BaseException(ErrorCode.SYNC_NO_DATA_DEAL_WAY);
        }
        return transferListType(resultObjList);
    }
    /**
     * 转换list中obj为string
     *
     * @param list
     * @return
     */
    private <T> List<T> transferListType(List<T> list) {
        List<T> resultList = Lists.newArrayList();
        list.forEach(k -> {
            if (k == null) {
                resultList.add(null);
            } else {
                resultList.add(k);
            }
        });
        return resultList;
    }
    /**
     * 转换list中obj为string
     *
     * @param list
     * @return
     */
    private <T> List<T> transferListType(List<T> list) {
        List<T> resultList = Lists.newArrayList();
        list.forEach(k -> {
            if (k == null) {
                resultList.add(null);
            } else {
                resultList.add(k);
            }
        });
        return resultList;
    }
    /**
     * 追加错误信息
     *
     * @param errorInfo
     * @param e
     */
    private void appendError(StringBuilder errorInfo, Throwable e) {
        if (errorInfo.length() > 0) {
            errorInfo.append("\n");
        }
        errorInfo.append(journalManagerService.getStackTrace(e));
    }
    /**
     * 追加错误信息
     *
     * @param errorInfo
     * @param e
     */
    private void appendError(StringBuilder errorInfo, Throwable e) {
        if (errorInfo.length() > 0) {
            errorInfo.append("\n");
        }
        errorInfo.append(journalManagerService.getStackTrace(e));
    }
}
product-server-data-center/src/main/java/com/product/data/center/utils/CustomLock.java
@@ -1,11 +1,15 @@
package com.product.data.center.utils;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ArrayUtil;
import com.product.core.spring.context.SpringMVCContextHolder;
import org.apache.commons.lang3.StringUtils;
import javax.swing.*;
import java.text.DateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -19,6 +23,7 @@
    private Map<String, Long> obj = new ConcurrentHashMap<>();
    private Map<String, Long> threadObj = new ConcurrentHashMap<>();
    private long detectionWaitTime = 1000L;
@@ -41,12 +46,64 @@
        return this.tryLock(ArrayUtil.join(key, ","));
    }
    public static void main(String[] args) {
        long timemillis = 1709654400039L;
        //转换为年月日时分秒
        String format = DateUtil.format(DateUtil.date(timemillis), "yyyy-MM-dd HH:mm:ss");
        System.out.println(format);
    }
    public boolean tryLock(String key, long timeoutMinute) {
        if (!StringUtils.isEmpty(key)) {
            key = key.toUpperCase();
            synchronized (key.intern()) {
                if (!this.obj.containsKey(key)) {
                    lock(key);
                    //获取当前线程
                    long id = Thread.currentThread().getId();
                    threadObj.put(key, id);
                    return true;
                } else if (timeoutMinute > 0) {
                    //获取锁进入开始时间
                    long startTime = obj.get(key);
                    //获取当前时间
                    long currentTime = System.currentTimeMillis();
                    //将分钟转换为毫秒
                    long timeout = timeoutMinute * 60 * 1000;
                    //如果当前时间减去开始时间大于等于超时时间
                    if (currentTime - startTime >= timeout) {
                        long threadId = threadObj.get(key);
                        //根据线程id获取线程
                        Thread[] threads = ThreadUtil.getThreads();
                        for (Thread thread : threads) {
                            if (thread.getId() == threadId) {
                                //中断线程
                                ThreadUtil.interrupt(thread, true);
                                SpringMVCContextHolder.getSystemLogger().error("线程:" + thread.getName() + "超时被中断");
                                return tryLock(key, timeoutMinute);
                            }
                        }
                        return true;
                    }
                }
            }
        }
        return false;
    }
    public boolean tryLock(String key) {
        if (!StringUtils.isEmpty(key)) {
            key = key.toUpperCase();
            synchronized (key.intern()) {
                if (!this.obj.containsKey(key)) {
                    lock(key);
                    //获取当前线程
                    long id = Thread.currentThread().getId();
                    threadObj.put(key, id);
                    return true;
                }
            }
product-server-web/resources/LicenseKey.dat
@@ -1 +1 @@

