许鹏程
2024-01-15 36dc14af5a78be3b3eb941ddc13a22d3aaf1fe3a
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingService.java
@@ -2,6 +2,7 @@
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSONObject;
@@ -49,1153 +50,1168 @@
@Service
public class DataArchivingService extends AbstractBaseService {
    @Autowired
    JournalManagerService journalManagerService;
    @Autowired
    DataArchivingQueue dataArchivingQueue;
   @Autowired
   JournalManagerService journalManagerService;
   @Autowired
   DataArchivingQueue dataArchivingQueue;
    @Resource
    private SyncDataConfigService syncDataConfigService;
   @Resource
   private SyncDataConfigService syncDataConfigService;
    private static CustomLock lock = new CustomLock();
   private static CustomLock lock = new CustomLock();
    private Map<String, String> getCollectIds(String[] deleteDataSource, String tableName) {
        StringBuilder sql = new StringBuilder("SELECT ");
        sql.append("id,data_source FROM product_sys_data_collect WHERE ");
        List<Object> params = new ArrayList<>();
        for (int i = 0; i < deleteDataSource.length; i++) {
            String source = deleteDataSource[i];
            if (i > 0) {
                sql.append(" or ");
            }
            sql.append("( data_source = ? and LOWER(source_table) = ? )");
            params.add(source);
            params.add(tableName.toLowerCase());
        }
        DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), params.toArray());
        if (DataTableEntity.isEmpty(dataTableEntity)) {
            return null;
        }
        Map<String, String> collect = dataTableEntity.getData().stream().collect(Collectors.toMap(item -> item.getString("data_source"), item -> item.getString("id")));
   private Map<String, String> getCollectIds(String[] deleteDataSource, String tableName) {
      StringBuilder sql = new StringBuilder("SELECT ");
      sql.append("id,data_source FROM product_sys_data_collect WHERE ");
      List<Object> params = new ArrayList<>();
      for (int i = 0; i < deleteDataSource.length; i++) {
         String source = deleteDataSource[i];
         if (i > 0) {
            sql.append(" or ");
         }
         sql.append("( data_source = ? and LOWER(source_table) = ? )");
         params.add(source);
         params.add(tableName.toLowerCase());
      }
      DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), params.toArray());
      if (DataTableEntity.isEmpty(dataTableEntity)) {
         return null;
      }
      Map<String, String> collect = dataTableEntity.getData().stream().collect(Collectors.toMap(item -> item.getString("data_source"), item -> item.getString("id")));
        if (collect.size() != deleteDataSource.length) {
            throw new BaseException(ErrorCode.GET_COLLECT_ID_FAIL);
        }
      if (collect.size() != deleteDataSource.length) {
         throw new BaseException(ErrorCode.GET_COLLECT_ID_FAIL);
      }
        return collect;
    }
      return collect;
   }
    public void reDeal(String logUuid) {
        FieldSetEntity fs = getBaseDao().getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUuid, false);
        if (!FieldSetEntity.isEmpty(fs)) {
            String configUid = fs.getString(CmnConst.CONFIG_UUID);
            reDealArchiving(configUid);
        }
        fs.setValue(CmnConst.DEAL_FLAG, 1);
        getBaseDao().update(fs);
    }
   public void reDeal(String logUuid) {
      FieldSetEntity fs = getBaseDao().getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUuid, false);
      if (!FieldSetEntity.isEmpty(fs)) {
         String configUid = fs.getString(CmnConst.CONFIG_UUID);
         reDealArchiving(configUid);
      }
      fs.setValue(CmnConst.DEAL_FLAG, 1);
      getBaseDao().update(fs);
   }
    @Async
    public void reDealArchiving(String uuid) {
        this.dataArchivingEntry(uuid);
    }
   @Async
   public void reDealArchiving(String uuid) {
      this.dataArchivingEntry(uuid);
   }
    /**
     * 扫码库删除
     *
     * @param configFse
     * @throws BaseException
     */
    private String sweepCodeLibrary(FieldSetEntity configFse) throws BaseException, SQLException {
        JournalEntity journalEntity = new JournalEntity();
        journalEntity.setDetail(3);
        TimeInterval timer = DateUtil.timer();
        int deleteSuccessCount = 0;
        try {
            String deleteDataSource = configFse.getString("delete_data_source");
            WriteUtil.append("DA-deleteDataSource:" + deleteDataSource);
            WriteUtil.append("DA-sub_delete_select_filter:" + configFse.getString("delete_select_filter"));
            if (StringUtils.isEmpty(deleteDataSource)) {
                return null;
            }
            String[] deleteDataSourceArray = deleteDataSource.split(",");
   /**
    * 扫码库删除
    *
    * @param configFse
    * @throws BaseException
    */
   private String sweepCodeLibrary(FieldSetEntity configFse) throws BaseException, SQLException {
      JournalEntity journalEntity = new JournalEntity();
      journalEntity.setDetail(3);
      TimeInterval timer = DateUtil.timer();
      int deleteSuccessCount = 0;
      try {
         String deleteDataSource = configFse.getString("delete_data_source");
         WriteUtil.append("DA-deleteDataSource:" + deleteDataSource);
         WriteUtil.append("DA-sub_delete_select_filter:" + configFse.getString("delete_select_filter"));
         if (StringUtils.isEmpty(deleteDataSource)) {
            return null;
         }
         String[] deleteDataSourceArray = deleteDataSource.split(",");
//            DataBaseEntity dbe = new DataBaseEntity(deleteDataSource);
            String source_data_validation = configFse.getString("source_data_validation");
            if (StringUtils.isEmpty(source_data_validation)) {
                return null;
            }
            String deleteDataTable = configFse.getString("delete_data_table");
            if (StringUtils.isEmpty(deleteDataTable)) {
                return null;
            }
         String source_data_validation = configFse.getString("source_data_validation");
         if (StringUtils.isEmpty(source_data_validation)) {
            return null;
         }
         String deleteDataTable = configFse.getString("delete_data_table");
         if (StringUtils.isEmpty(deleteDataTable)) {
            return null;
         }
            String collectIdField = configFse.getString("collect_id_field");
         String collectIdField = configFse.getString("collect_id_field");
            DataBaseEntity validationDbe = new DataBaseEntity(source_data_validation);
         DataBaseEntity validationDbe = new DataBaseEntity(source_data_validation);
            //扫码库删除唯一标识
            String deleteUniqueField = configFse.getString("delete_unique_field");
         //扫码库删除唯一标识
         String deleteUniqueField = configFse.getString("delete_unique_field");
            //验证表
            String validationTableName = configFse.getString("table_data_validation");
            //验证唯一字段
            String validationUniqueField = configFse.getString("validation_unique_field");
            //忽略对比字段
            List<String> ignoreComparisonFields =
                    !StringUtils.isEmpty(configFse.getString("ignore_comparison_field")) ?
                            Lists.newArrayList(Arrays.asList(configFse.getString("ignore_comparison_field").split(","))) : null;
            int pageSize = 500;
            DataTableEntity list;
            Dao dao = null;
            int currentCount = 0;
            Map<String, String> collectIds = getCollectIds(deleteDataSource.split(","), deleteDataTable);
            try {
                String minID = null;
                String maxID = null;
                for (int j = 0; j < deleteDataSourceArray.length; j++) {
                    if (dao != null) {
                        dao.closeConnection();
                    }
                    dao = new DataBaseEntity(deleteDataSourceArray[j]).getDao();
                    Connection connection = dao.getConnection();
                    connection.setAutoCommit(false);
                    do {
                        list = dao.getList(deleteDataTable, configFse.getString("delete_select_filter"), new Object[]{}, 1, pageSize);
                        WriteUtil.append("DA-删除子库数据-表名:" + deleteDataTable);
                        if (DataTableEntity.isEmpty(list)) {
                            break;
                        }
                        currentCount = list.getRows();
                        Object[] uniqueValues = list.getData().stream().map(item -> item.getString(deleteUniqueField)).toArray();
                        DataTableEntity validationData = validationDbe.getDao().getList(validationTableName, collectIdField + "= ? AND " +
                                BaseUtil.buildQuestionMarkFilter(validationUniqueField, uniqueValues.length, true), ArrayUtil.addAll(new Object[]{collectIds.get(deleteDataSourceArray[j])}, uniqueValues));
                        validationDbe.getDao().closeConnection();
                        if (DataTableEntity.isEmpty(validationData)) {
                            break;
                        }
                        Map<String, Map> collectMap = validationData.getData().stream().collect(Collectors.toMap(
                                (item) -> item.getString(validationUniqueField),
                                item -> item.getValues()));
                        validationData = null;
                        List<String> deleteUniqueValueList = new ArrayList<>();
                        for (int i = 0; i < list.getRows(); i++) {
                            FieldSetEntity fs = list.getFieldSetEntity(i);
                            String uniqueValue = fs.getString(deleteUniqueField);
                            Map<String, Object> map = collectMap.get(uniqueValue);
                            if (ignoreComparisonFields != null && map != null) {
                                for (int i1 = 0; i1 < ignoreComparisonFields.size(); i1++) {
                                    try {
                                        map.remove(ignoreComparisonFields.get(i1));
                                        fs.remove(ignoreComparisonFields.get(i1));
                                    } catch (Exception e) {
                                        e.printStackTrace();
                                    }
                                }
                            }
                            if (map == null || !new JSONObject((Map) fs.getValues()).equals(new JSONObject(map))) {
                                list.removeFieldSetEntity(i);
                                i--;
                                continue;
                            }
                            deleteUniqueValueList.add(uniqueValue);
                            minID = getAimID(minID, uniqueValue, 0);
                            maxID = getAimID(maxID, uniqueValue, 1);
                        }
                        collectMap.clear();
                        if (deleteUniqueValueList.size() > 0) {
                            deleteSuccessCount += dao.deleteRInt(configFse.getString("delete_data_table"),
                                    BaseUtil.buildQuestionMarkFilter(deleteUniqueField, deleteUniqueValueList.size(), true), deleteUniqueValueList.toArray());
                            WriteUtil.append("DA-删除子库数据-已经删除条数:" + deleteSuccessCount);
                        }
                    } while (currentCount == pageSize);
                    connection.commit();
                }
                journalEntity.setResult(1);
                journalEntity.setMin_id(minID);
                journalEntity.setMax_id(maxID);
            } catch (Exception e) {
                throw e;
            } finally {
                if (dao != null) {
                    dao.closeConnection();
                }
            }
        } catch (Exception e) {
            SpringMVCContextHolder.getSystemLogger().error(e);
            journalEntity.setResult(0);
            journalEntity.setError(journalManagerService.getStackTrace(e));
            throw e;
        } finally {
            journalEntity.setCount(deleteSuccessCount);
            journalEntity.setCreated_utc_datetime(new Date());
            journalEntity.setSingle_duration(timer.intervalMs());
            journalEntity.setType(5);
            journalEntity.setConfigUuid(configFse.getUUID());
        }
        if (journalEntity != null && (journalEntity.getCount() > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1)) {
            return journalManagerService.autoCreateJournal(journalEntity).getUUID();
        }
        return null;
    }
         //验证表
         String validationTableName = configFse.getString("table_data_validation");
         //验证唯一字段
         String validationUniqueField = configFse.getString("validation_unique_field");
         //忽略对比字段
         List<String> ignoreComparisonFields =
               !StringUtils.isEmpty(configFse.getString("ignore_comparison_field")) ?
                     Lists.newArrayList(Arrays.asList(configFse.getString("ignore_comparison_field").split(","))) : null;
         int pageSize = 500;
         DataTableEntity list;
         Dao dao = null;
         int currentCount = 0;
         Map<String, String> collectIds = getCollectIds(deleteDataSource.split(","), deleteDataTable);
         try {
            String minID = null;
            String maxID = null;
            for (int j = 0; j < deleteDataSourceArray.length; j++) {
               if (dao != null) {
                  dao.closeConnection();
               }
               dao = new DataBaseEntity(deleteDataSourceArray[j]).getDao();
               do {
//                  String sql = "SELECT  * FROM (SELECT * from " + deleteDataTable + " where " + configFse.getString("delete_select_filter") + " order by " + deleteUniqueField + " ) A  ";
//                  if (!Objects.isNull(minID) && !Objects.isNull(maxID)) {
//                     sql += " where " + deleteUniqueField + " > " + maxID;
//                  }
                  String filter = configFse.getString("delete_select_filter");
                  if (!Objects.isNull(minID) && !Objects.isNull(maxID)) {
                     filter += "  and ( " + deleteUniqueField + " > " + maxID + ")";
                  }
                  list = dao.getList(deleteDataTable, filter, new Object[]{}, deleteUniqueField, 1, pageSize);
                  WriteUtil.append("DA-删除子库数据-表名:" + deleteDataTable);
                  if (DataTableEntity.isEmpty(list)) {
                     break;
                  }
                  currentCount = list.getRows();
                  Object[] uniqueValues = list.getData().stream().map(item -> item.getString(deleteUniqueField)).toArray();
                  DataTableEntity validationData = validationDbe.getDao().getList(validationTableName, collectIdField + "= ? AND " +
                        BaseUtil.buildQuestionMarkFilter(validationUniqueField, uniqueValues.length, true), ArrayUtil.addAll(new Object[]{collectIds.get(deleteDataSourceArray[j])}, uniqueValues));
                  validationDbe.getDao().closeConnection();
                  if (DataTableEntity.isEmpty(validationData)) {
                     break;
                  }
                  Map<String, Map> collectMap = validationData.getData().stream().collect(Collectors.toMap(
                        (item) -> item.getString(validationUniqueField),
                        item -> item.getValues()));
                  List<String> deleteUniqueValueList = new ArrayList<>();
                  for (int i = 0; i < list.getRows(); i++) {
                     FieldSetEntity fs = list.getFieldSetEntity(i);
                     String uniqueValue = fs.getString(deleteUniqueField);
                     Map<String, Object> map = collectMap.get(uniqueValue);
                     if (ignoreComparisonFields != null && map != null) {
                        for (int i1 = 0; i1 < ignoreComparisonFields.size(); i1++) {
                           try {
                              map.remove(ignoreComparisonFields.get(i1));
                              fs.remove(ignoreComparisonFields.get(i1));
                           } catch (Exception e) {
                              e.printStackTrace();
                           }
                        }
                     }
                     if (map == null || !new JSONObject((Map) fs.getValues()).equals(new JSONObject(map))) {
                        list.removeFieldSetEntity(i);
                        i--;
                        continue;
                     }
                     deleteUniqueValueList.add(uniqueValue);
                     minID = getAimID(minID, uniqueValue, 0);
                     maxID = getAimID(maxID, uniqueValue, 1);
                  }
                  collectMap.clear();
                  if (deleteUniqueValueList.size() > 0) {
                     deleteSuccessCount += dao.deleteRInt(configFse.getString("delete_data_table"),
                           BaseUtil.buildQuestionMarkFilter(deleteUniqueField, deleteUniqueValueList.size(), true), deleteUniqueValueList.toArray());
                     WriteUtil.append("DA-删除子库数据-已经删除条数:" + deleteSuccessCount);
    /**
     * 数据归档入口
     *
     * @param uuid 归档配置uuid
     */
    public void dataArchivingEntry(String uuid) {
        if (lock.tryLock(uuid)) {
            try {
                dataArchivingEntryLock(uuid);
            } catch (Exception e) {
                throw e;
            } finally {
                lock.unLock(uuid);
            }
            return;
        }
        SpringMVCContextHolder.getSystemLogger().info("跳过执行:" + uuid);
    }
                  }
               } while (currentCount == pageSize);
            }
            journalEntity.setResult(1);
            journalEntity.setMin_id(minID);
            journalEntity.setMax_id(maxID);
         } catch (Exception e) {
            throw e;
         } finally {
            if (dao != null) {
               dao.closeConnection();
            }
         }
      } catch (Exception e) {
         SpringMVCContextHolder.getSystemLogger().error(e);
         journalEntity.setResult(0);
         journalEntity.setError(journalManagerService.getStackTrace(e));
         throw e;
      } finally {
         journalEntity.setCount(deleteSuccessCount);
         journalEntity.setCreated_utc_datetime(new Date());
         journalEntity.setSingle_duration(timer.intervalMs());
         journalEntity.setType(5);
         journalEntity.setConfigUuid(configFse.getUUID());
      }
      if (journalEntity != null && (journalEntity.getCount() > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1)) {
         return journalManagerService.autoCreateJournal(journalEntity).getUUID();
      }
      return null;
   }
    public void dataArchivingEntryLock(String uuid) {
        SpringMVCContextHolder.getSystemLogger().info("开始执行归档》》》》" + uuid);
        WriteUtil.append("DA》》》");
        WriteUtil.append("DA-已经获取到锁");
        TimeInterval timer = DateUtil.timer();
        String curDateTimeStr = DateUtil.date().toString();
        int archivingSuccessCount = 0;
        JournalEntity journalEntity = new JournalEntity();
        journalEntity.setResult(1);
        try {
            //获取归档服务配置详情
            FieldSetEntity configFse = getBaseDao().getFieldSetEntity(CmnConst.DATA_ARCHIVING_TABLE, uuid, false);
            TimeInterval tempTestTimer = DateUtil.timer();
            //来源数据表
            String sourceTable = configFse.getString("source_table");
            boolean canExecuteClearFlag = canExecuteClear(sourceTable);
            String deleteSubLogUUID = null;
            if (canExecuteClearFlag) {
                WriteUtil.append("DA-删除扫码库》》》");
                //删除扫码库已提取到mes主库且根据配置条件过滤的数据 KT特有
                deleteSubLogUUID = this.sweepCodeLibrary(configFse);
                WriteUtil.append("DA-删除扫码库耗时:" + tempTestTimer.intervalMs());
            }
   /**
    * 数据归档入口
    *
    * @param uuid 归档配置uuid
    */
   public void dataArchivingEntry(String uuid) {
      if (lock.tryLock(uuid)) {
         try {
            dataArchivingEntryLock(uuid);
         } catch (Exception e) {
            throw e;
         } finally {
            lock.unLock(uuid);
         }
         return;
      }
      SpringMVCContextHolder.getSystemLogger().info("跳过执行:" + uuid);
   }
            //来源数据源
            String sourceDataSource = configFse.getString("source_data_source");
            DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSource);
            //目标数据源
            String targetDataSource = configFse.getString("target_data_source");
            DataBaseEntity targetDbe = new DataBaseEntity(targetDataSource);
            //目标数据表前缀
            String targetTablePrefix = configFse.getString("target_table_prefix");
            if (targetTablePrefix.endsWith("_")) {
                targetTablePrefix = targetTablePrefix.substring(0, targetTablePrefix.length() - 1);
            }
            //来源采集ID字段
            String sourceCollectIdField = configFse.getString("source_collect_id_field");
            //来源唯一标识字段 由MES子库生成的值
            String sourceUniqueField = configFse.getString("source_unique_field");
            //唯一字段
            String uniqueField = configFse.getString("unique_field");
            // 时间字段
            String timeField = configFse.getString("time_field");
            // 提取时间字段
            String extractTimeField = configFse.getString(CmnConst.EXTRACT_TIME_FIELD);
   public void dataArchivingEntryLock(String uuid) {
      SpringMVCContextHolder.getSystemLogger().info("开始执行归档》》》》" + uuid);
      WriteUtil.append("DA》》》");
      WriteUtil.append("DA-已经获取到锁");
      TimeInterval timer = DateUtil.timer();
      String curDateTimeStr = DateUtil.date().toString();
      int archivingSuccessCount = 0;
      JournalEntity journalEntity = new JournalEntity();
      journalEntity.setResult(1);
      try {
         //获取归档服务配置详情
         FieldSetEntity configFse = getBaseDao().getFieldSetEntity(CmnConst.DATA_ARCHIVING_TABLE, uuid, false);
         TimeInterval tempTestTimer = DateUtil.timer();
         //来源数据表
         String sourceTable = configFse.getString("source_table");
         boolean canExecuteClearFlag = canExecuteClear(sourceTable);
         String deleteSubLogUUID = null;
            Dao sourceDao = sourceDbe.getDao();
            Dao targetDao = targetDbe.getDao();
            Set<String> targetTableSet;
            try {
                DataArchivingServiceImpl service = new DataArchivingServiceImpl(sourceDao, targetDao, sourceTable, uuid, sourceDbe.getDbName(), targetDbe.getDbName());
                String keyPrefix = "DA_STORE:" + sourceTable + ":";
                boolean serializeFlag = true;
                int outTime = 60 * 60;// 60 min
                RedisService readRedis = StringUtils.isEmpty(configFse.getString("redis_config_uuid")) ? new RedisService() : new RedisService(new DataBaseEntity(configFse.getString("redis_config_uuid")));
                tempTestTimer = DateUtil.timer();
                WriteUtil.append("DA-获取表名集合》》》");
                if (!StringUtils.isEmpty(timeField)) {
                    targetTableSet = QuerySqlParseUtil.getAllTableName(targetDao, targetDbe.getDbName(), sourceTable);
                } else {
                    targetTableSet = Sets.newLinkedHashSet();
                    targetTableSet.add(targetTablePrefix);
                }
                WriteUtil.append("DA-获取表名集合耗时:" + tempTestTimer.intervalMs());
                FieldSetEntity paramFse = getBaseDao().getFieldSetBySQL("select max(statistics_final_time) statistics_final_time from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false);
                Date preMaxTime = paramFse.getDate("statistics_final_time");
                // 没有成功的最大统计时间:要么是第一次归档,要么是归档的表没有设定时间字段(所有数据都在一张表上,每次都会拉取这张表当时的所有内容)
                boolean turnRedisFilterFlag = preMaxTime == null;
                boolean turnTargetDBClearFlag = false;
                if (turnRedisFilterFlag) {
                    // 若是没有成功的日志,表示为第一次归档,跳过目标库数据清理
                    FieldSetEntity logExistsFse = getBaseDao().getFieldSetBySQL("select count(1) count_value from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false);
                    turnTargetDBClearFlag = "0".equals(logExistsFse.getString("count_value"));
                }
                StringBuilder paramSql = new StringBuilder(128);
                paramSql.append("select max(").append(uniqueField).append(") max_id,min(").append(uniqueField).append(") min_id");
                if (!StringUtils.isEmpty(timeField)) {
                    paramSql.append(",max(").append(timeField).append(") max_update_time,min(").append(timeField).append(") min_update_time");
                }
                if (!StringUtils.isEmpty(extractTimeField)) {
                    paramSql.append(",max(").append(extractTimeField).append(") max_extract_time,min(").append(extractTimeField).append(") min_extract_time");
                }
                paramSql.append(" from ").append(sourceTable);
                StringBuilder filterSb = new StringBuilder(128);
                if (preMaxTime != null && (!StringUtils.isEmpty(timeField) || !StringUtils.isEmpty(extractTimeField))) {
                    int sourceDbType = sourceDbe.getDbType().getValue();
                    if (!StringUtils.isEmpty(timeField)) {
                        filterSb.append("(").append(timeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE))
                                .append(" and ").append(timeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")");
                    }
                    if (!StringUtils.isEmpty(extractTimeField)) {
                        if (filterSb.length() > 0) {
                            filterSb.append(" or ");
                        }
                        filterSb.append("(").append(extractTimeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE))
                                .append(" and ").append(extractTimeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")");
                    }
                    paramSql.append(" where (").append(filterSb).append(")");
                }
                paramFse = sourceDao.getOne(paramSql.toString());
                if (filterSb.length() > 0) {
                    filterSb.insert(0, "(");
                    filterSb.append(") and ");
                }
                if (StringUtils.isEmpty(paramFse.getString("max_id"))) {
                    WriteUtil.append("最大id为空,跳出,执行sql:" + paramSql);
                    return;
                }
                filterSb.append(uniqueField).append("<='").append(paramFse.getString("max_id")).append("'");
                if (!StringUtils.isEmpty(configFse.getString("select_filter")) && !StringUtils.isEmpty(configFse.getString("select_filter").trim())) {
                    filterSb.append(" and (").append(configFse.getString("select_filter")).append(")");
                }
                WriteUtil.append("DA-sql-filter:" + filterSb);
                Date statisticsStartTime = getAimDate(paramFse.getDate("min_update_time"), paramFse.getDate("min_extract_time"), 0);
                Date statisticsFinalTime = getAimDate(paramFse.getDate("max_update_time"), paramFse.getDate("max_extract_time"), 0);
                String maxID = paramFse.getString("max_id");
                String minID = paramFse.getString("min_id");
                String splitTableType = "1".equals(configFse.getString("split_table_type")) ? "1" : "0";
                dataArchivingQueue.query(sourceDbe, sourceTable, filterSb.toString(), null, uniqueField, minID);
                DataTableEntity allDte;
                Map<String, List<DataTableEntity>> groupDteMap;
                do {
                    allDte = dataArchivingQueue.get(sourceTable);
                    if (DataTableEntity.isEmpty(allDte)) {
                        WriteUtil.append("DA-从队列中获取内容为空,执行睡眠跳过...");
                        Thread.sleep(RandomUtil.randomInt(800, 1200));
                        continue;
                    }
                    WriteUtil.append("DA-从队列中获取内容非空,执行插入...");
                    groupDteMap = dteGroupByTime(allDte, timeField, splitTableType);
                    FieldSetEntity tempFse;
                    String field;
                    Object value;
                    Date date;
                    Date updateTime;
                    Date extractTime;
                    Map<String, Set<String>> sourceTableUniqueByCollectId = null;
                    for (Map.Entry<String, List<DataTableEntity>> entry : groupDteMap.entrySet()) {
                        String time = entry.getKey();
                        List<DataTableEntity> dteList = entry.getValue();
                        for (DataTableEntity list : dteList) {
                            //创建表(不存在才创建)
                            tempTestTimer = DateUtil.timer();
                            WriteUtil.append("DA-创建表》》》");
                            String tableName = service.createTable(targetTablePrefix, time, targetDbe);
                            WriteUtil.append("DA-创建表耗时:" + tempTestTimer.intervalMs());
                            targetTableSet.add(tableName);
                            FieldSetEntity fs = list.getFieldSetEntity(0);
                            tempTestTimer = DateUtil.timer();
                            WriteUtil.append("DA-数据筛选》》》");
                            if (!StringUtils.isEmpty(sourceCollectIdField)) {
                                sourceTableUniqueByCollectId = new HashMap<>();
                            }
                            // 没有成功的最大统计时间:要么是第一次归档,要么是归档的表没有设定时间字段(所有数据都在一张表上,每次都会拉取这张表当时的所有内容),跳过验证
                            if (!turnRedisFilterFlag) {
                                // 验证是否存在redis中,若是存在,比较时间字段值的大小,若是查询出数据中的时间更靠近当前时间,那么重置redis中的时间和过期时间,若是redis中的时间更靠近当前时间,则剔除数据集中的数据,并重置过期时间;若是不存在,则正常执行,先删除然后新增
                                for (int i = list.getRows() - 1; i >= 0; i--) {
                                    tempFse = list.getFieldSetEntity(i);
                                    updateTime = tempFse.getDate(timeField);
                                    extractTime = tempFse.getDate(extractTimeField);
                                    date = getAimDate(updateTime, extractTime, 0);
                                    field = tempFse.getString(uniqueField);
                                    value = readRedis.get(keyPrefix + field, serializeFlag);
                                    String sourceUniqueValue = tempFse.getString(sourceUniqueField);
                                    String collectId = tempFse.getString(sourceCollectIdField);
                                    if (!StringUtils.isEmpty(sourceCollectIdField) && sourceTableUniqueByCollectId != null
                                            && !StringUtils.isEmpty(collectId) && !StringUtils.isEmpty(sourceUniqueValue)) {
                                        Set<String> uniqueValues = sourceTableUniqueByCollectId.computeIfAbsent(collectId, k -> new HashSet<>());
                                        uniqueValues.add(sourceUniqueValue);
                                    }
                                    if (value != null && !StringUtils.isEmpty(value.toString())) {
                                        if (((Date) value).compareTo(date) <= 0) {
                                            readRedis.set(keyPrefix + field, date, serializeFlag);
                                            readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                                            statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                                            statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                                            minID = getAimID(minID, field, 0);
                                            maxID = getAimID(maxID, field, 1);
                                        } else {
                                            list.removeFieldSetEntity(i);
                                            readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                                        }
                                    } else {
                                        readRedis.set(keyPrefix + field, date, serializeFlag);
                                        readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                                        statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                                        statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                                        minID = getAimID(minID, field, 0);
                                        maxID = getAimID(maxID, field, 1);
                                    }
                                }
                            }
                            WriteUtil.append("DA-数据筛选耗时:" + tempTestTimer.intervalMs());
                            if (DataTableEntity.isEmpty(list)) {
                                continue;
                            }
                            //重设表名
                            fs.setTableName(tableName);
                            list.setMeta(fs.getMeta());
                            Connection connection = sourceDao.getConnection();
                            //设置手动提交
                            connection.setAutoCommit(false);
                            try {
                                tempTestTimer = DateUtil.timer();
                                WriteUtil.append("DA-清理数据》》》");
                                // 拉取全量数据到空表(第一次归档),跳过清理
                                if (!turnTargetDBClearFlag) {
                                    // 清理数据
                                    DataTableEntity clearDte = clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list);
                                    WriteUtil.append("DA-清理数据量:" + clearDte.getRows());
                                }
                                WriteUtil.append("DA-清理数据耗时:" + tempTestTimer.intervalMs());
                                // 新增数据
                                tempTestTimer = DateUtil.timer();
                                WriteUtil.append("DA-新增数据量:" + list.getRows());
                                if (list.getRows() > 0) {
                                    WriteUtil.append("DA-新增数据》》》表名:" + list.getFieldSetEntity(0).getMeta().getTableName()[0]);
                                    try {
                                        targetDao.addBatch(list);
                                    } catch (Exception e) {
                                        if (turnTargetDBClearFlag) {
                                            clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list);
                                            targetDao.addBatch(list);
                                        } else {
                                            throw e;
                                        }
                                    }
                                }
                                WriteUtil.append("DA-新增数据耗时:" + tempTestTimer.intervalMs());
                                // 提交
                                connection.commit();
                            } catch (Exception e) {
         if (canExecuteClearFlag) {
            WriteUtil.append("DA-删除扫码库》》》");
            //删除扫码库已提取到mes主库且根据配置条件过滤的数据 KT特有
//            deleteSubLogUUID = this.sweepCodeLibrary(configFse);
            //更改为异步执行
            ThreadUtil.execAsync(() -> {
               try {
                  sweepCodeLibrary(configFse);
               } catch (SQLException e) {
                  e.printStackTrace();
               }
            });
         }
                                //若批量添加失败回滚删除
                                try {
                                    connection.rollback();
                                } catch (Exception er) {
                                    e.printStackTrace();
                                    throw er;
                                }
                                throw e;
                            } finally {
                                //重设连接为自动提交
                                connection.setAutoCommit(true);
                            }
                            archivingSuccessCount += list.getRows();
                        }
                    }
                } while (!dataArchivingQueue.checkQueryFinish(sourceTable) || !dataArchivingQueue.checkInsertQueueEmpty(sourceTable));
                journalEntity.setSingle_duration(timer.intervalMs());
                journalEntity.setStatistics_start_time(statisticsStartTime);
                journalEntity.setStatistics_final_time(statisticsFinalTime);
                journalEntity.setMin_id(minID);
                journalEntity.setMax_id(maxID);
                WriteUtil.append("DA-循环完毕");
            } catch (Exception e) {
                WriteUtil.append("error:\n" + journalManagerService.getStackTrace(e));
                throw e;
            } finally {
                targetDao.closeConnection();
                sourceDao.closeConnection();
                // 关闭线程
                dataArchivingQueue.shutdownQueryThread(sourceTable);
            }
         //来源数据源
         String sourceDataSource = configFse.getString("source_data_source");
         DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSource);
         //目标数据源
         String targetDataSource = configFse.getString("target_data_source");
         DataBaseEntity targetDbe = new DataBaseEntity(targetDataSource);
         //目标数据表前缀
         String targetTablePrefix = configFse.getString("target_table_prefix");
         if (targetTablePrefix.endsWith("_")) {
            targetTablePrefix = targetTablePrefix.substring(0, targetTablePrefix.length() - 1);
         }
         //来源采集ID字段
         String sourceCollectIdField = configFse.getString("source_collect_id_field");
         //来源唯一标识字段 由MES子库生成的值
         String sourceUniqueField = configFse.getString("source_unique_field");
         //唯一字段
         String uniqueField = configFse.getString("unique_field");
         // 时间字段
         String timeField = configFse.getString("time_field");
         // 提取时间字段
         String extractTimeField = configFse.getString(CmnConst.EXTRACT_TIME_FIELD);
            // 删除mes主库的内容
            if (canExecuteClearFlag) {
                WriteUtil.append("DA-删除mes主库内容》》》");
                String deleteMasterLogUUID = deleteMasterData(sourceTable, configFse, sourceDao, targetDao, uniqueField, timeField, extractTimeField, targetTableSet, deleteSubLogUUID);
                if (!StringUtils.isEmpty(deleteMasterLogUUID)) {
                    journalEntity.setPre_step_uuid(deleteMasterLogUUID);
                } else if (!StringUtils.isEmpty(deleteSubLogUUID)) {
                    journalEntity.setPre_step_uuid(deleteMasterLogUUID);
                }
                WriteUtil.append("DA-删除mes主库内容完毕");
            }
            String errorInfo = dataArchivingQueue.getErrorLog(sourceTable);
            if (!StringUtils.isEmpty(errorInfo)) {
                journalEntity.setError(errorInfo);
                journalEntity.setResult(0);
            }
        } catch (Exception e) {
            SpringMVCContextHolder.getSystemLogger().error(e);
            if (!StringUtils.isEmpty(journalEntity.getError())) {
                journalEntity.setError(journalEntity.getError() + "\n" + journalManagerService.getStackTrace(e));
            } else {
                journalEntity.setError(journalManagerService.getStackTrace(e));
            }
            journalEntity.setResult(0);
        } finally {
            journalEntity.setSingle_duration(timer.intervalMs());
            journalEntity.setCount(archivingSuccessCount);
            journalEntity.setConfigUuid(uuid);
            journalEntity.setType(5);
            journalEntity.setDetail(4);
            journalEntity.setCreated_utc_datetime(new Date());
            FieldSetEntity curLogFse = null;
            if (archivingSuccessCount > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1 || !StringUtils.isEmpty(journalEntity.getPre_step_uuid())) {
                curLogFse = journalManagerService.autoCreateJournal(journalEntity);
            }
         Dao sourceDao = sourceDbe.getDao();
         Dao targetDao = targetDbe.getDao();
         Set<String> targetTableSet;
         try {
            DataArchivingServiceImpl service = new DataArchivingServiceImpl(sourceDao, targetDao, sourceTable, uuid, sourceDbe.getDbName(), targetDbe.getDbName());
            String keyPrefix = "DA_STORE:" + sourceTable + ":";
            boolean serializeFlag = true;
            int outTime = 60 * 60;// 60 min
            RedisService readRedis = StringUtils.isEmpty(configFse.getString("redis_config_uuid")) ? new RedisService() : new RedisService(new DataBaseEntity(configFse.getString("redis_config_uuid")));
            tempTestTimer = DateUtil.timer();
            WriteUtil.append("DA-获取表名集合》》》");
            if (!StringUtils.isEmpty(timeField)) {
               targetTableSet = QuerySqlParseUtil.getAllTableName(targetDao, targetDbe.getDbName(), sourceTable);
            } else {
               targetTableSet = Sets.newLinkedHashSet();
               targetTableSet.add(targetTablePrefix);
            }
            WriteUtil.append("DA-获取表名集合耗时:" + tempTestTimer.intervalMs());
            FieldSetEntity paramFse = getBaseDao().getFieldSetBySQL("select max(statistics_final_time) statistics_final_time from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false);
            Date preMaxTime = paramFse.getDate("statistics_final_time");
            // 没有成功的最大统计时间:要么是第一次归档,要么是归档的表没有设定时间字段(所有数据都在一张表上,每次都会拉取这张表当时的所有内容)
            boolean turnRedisFilterFlag = preMaxTime == null;
            boolean turnTargetDBClearFlag = false;
            if (turnRedisFilterFlag) {
               // 若是没有成功的日志,表示为第一次归档,跳过目标库数据清理
               FieldSetEntity logExistsFse = getBaseDao().getFieldSetBySQL("select count(1) count_value from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false);
               turnTargetDBClearFlag = "0".equals(logExistsFse.getString("count_value"));
            }
            StringBuilder paramSql = new StringBuilder(128);
            paramSql.append("select max(").append(uniqueField).append(") max_id,min(").append(uniqueField).append(") min_id");
            if (!StringUtils.isEmpty(timeField)) {
               paramSql.append(",max(").append(timeField).append(") max_update_time,min(").append(timeField).append(") min_update_time");
            }
            if (!StringUtils.isEmpty(extractTimeField)) {
               paramSql.append(",max(").append(extractTimeField).append(") max_extract_time,min(").append(extractTimeField).append(") min_extract_time");
            }
            paramSql.append(" from ").append(sourceTable);
            StringBuilder filterSb = new StringBuilder(128);
            if (preMaxTime != null && (!StringUtils.isEmpty(timeField) || !StringUtils.isEmpty(extractTimeField))) {
               int sourceDbType = sourceDbe.getDbType().getValue();
               if (!StringUtils.isEmpty(timeField)) {
                  filterSb.append("(").append(timeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE))
                        .append(" and ").append(timeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")");
               }
               if (!StringUtils.isEmpty(extractTimeField)) {
                  if (filterSb.length() > 0) {
                     filterSb.append(" or ");
                  }
                  filterSb.append("(").append(extractTimeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE))
                        .append(" and ").append(extractTimeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")");
               }
               paramSql.append(" where (").append(filterSb).append(")");
            }
            paramFse = sourceDao.getOne(paramSql.toString());
            if (filterSb.length() > 0) {
               filterSb.insert(0, "(");
               filterSb.append(") and ");
            }
            if (StringUtils.isEmpty(paramFse.getString("max_id"))) {
               WriteUtil.append("最大id为空,跳出,执行sql:" + paramSql);
               return;
            }
            filterSb.append(uniqueField).append("<='").append(paramFse.getString("max_id")).append("'");
            if (!StringUtils.isEmpty(configFse.getString("select_filter")) && !StringUtils.isEmpty(configFse.getString("select_filter").trim())) {
               filterSb.append(" and (").append(configFse.getString("select_filter")).append(")");
            }
            WriteUtil.append("DA-sql-filter:" + filterSb);
            Date statisticsStartTime = getAimDate(paramFse.getDate("min_update_time"), paramFse.getDate("min_extract_time"), 0);
            Date statisticsFinalTime = getAimDate(paramFse.getDate("max_update_time"), paramFse.getDate("max_extract_time"), 0);
            String maxID = paramFse.getString("max_id");
            String minID = paramFse.getString("min_id");
            String splitTableType = "1".equals(configFse.getString("split_table_type")) ? "1" : "0";
            dataArchivingQueue.query(sourceDbe, sourceTable, filterSb.toString(), null, uniqueField, minID);
            DataTableEntity allDte;
            Map<String, List<DataTableEntity>> groupDteMap;
            do {
               allDte = dataArchivingQueue.get(sourceTable);
               if (DataTableEntity.isEmpty(allDte)) {
                  WriteUtil.append("DA-从队列中获取内容为空,执行睡眠跳过...");
                  Thread.sleep(RandomUtil.randomInt(800, 1200));
                  continue;
               }
               WriteUtil.append("DA-从队列中获取内容非空,执行插入...");
               groupDteMap = dteGroupByTime(allDte, timeField, splitTableType);
               FieldSetEntity tempFse;
               String field;
               Object value;
               Date date;
               Date updateTime;
               Date extractTime;
               Map<String, Set<String>> sourceTableUniqueByCollectId = null;
               for (Map.Entry<String, List<DataTableEntity>> entry : groupDteMap.entrySet()) {
                  String time = entry.getKey();
                  List<DataTableEntity> dteList = entry.getValue();
                  for (DataTableEntity list : dteList) {
                     //创建表(不存在才创建)
                     tempTestTimer = DateUtil.timer();
                     WriteUtil.append("DA-创建表》》》");
                     String tableName = service.createTable(targetTablePrefix, time, targetDbe);
                     WriteUtil.append("DA-创建表耗时:" + tempTestTimer.intervalMs());
                     targetTableSet.add(tableName);
                     FieldSetEntity fs = list.getFieldSetEntity(0);
                     tempTestTimer = DateUtil.timer();
                     WriteUtil.append("DA-数据筛选》》》");
                     if (!StringUtils.isEmpty(sourceCollectIdField)) {
                        sourceTableUniqueByCollectId = new HashMap<>();
                     }
                     // 没有成功的最大统计时间:要么是第一次归档,要么是归档的表没有设定时间字段(所有数据都在一张表上,每次都会拉取这张表当时的所有内容),跳过验证
                     if (!turnRedisFilterFlag) {
                        // 验证是否存在redis中,若是存在,比较时间字段值的大小,若是查询出数据中的时间更靠近当前时间,那么重置redis中的时间和过期时间,若是redis中的时间更靠近当前时间,则剔除数据集中的数据,并重置过期时间;若是不存在,则正常执行,先删除然后新增
                        for (int i = list.getRows() - 1; i >= 0; i--) {
                           tempFse = list.getFieldSetEntity(i);
                           updateTime = tempFse.getDate(timeField);
                           extractTime = tempFse.getDate(extractTimeField);
                           date = getAimDate(updateTime, extractTime, 0);
                           field = tempFse.getString(uniqueField);
                           value = readRedis.get(keyPrefix + field, serializeFlag);
                           String sourceUniqueValue = tempFse.getString(sourceUniqueField);
                           String collectId = tempFse.getString(sourceCollectIdField);
                           if (!StringUtils.isEmpty(sourceCollectIdField) && sourceTableUniqueByCollectId != null
                                 && !StringUtils.isEmpty(collectId) && !StringUtils.isEmpty(sourceUniqueValue)) {
                              Set<String> uniqueValues = sourceTableUniqueByCollectId.computeIfAbsent(collectId, k -> new HashSet<>());
                              uniqueValues.add(sourceUniqueValue);
                           }
                           if (value != null && !StringUtils.isEmpty(value.toString())) {
                              if (((Date) value).compareTo(date) <= 0) {
                                 readRedis.set(keyPrefix + field, date, serializeFlag);
                                 readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                                 statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                                 statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                                 minID = getAimID(minID, field, 0);
                                 maxID = getAimID(maxID, field, 1);
                              } else {
                                 list.removeFieldSetEntity(i);
                                 readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                              }
                           } else {
                              readRedis.set(keyPrefix + field, date, serializeFlag);
                              readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                              statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                              statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                              minID = getAimID(minID, field, 0);
                              maxID = getAimID(maxID, field, 1);
                           }
                        }
                     }
                     WriteUtil.append("DA-数据筛选耗时:" + tempTestTimer.intervalMs());
                     if (DataTableEntity.isEmpty(list)) {
                        continue;
                     }
                     //重设表名
                     fs.setTableName(tableName);
                     list.setMeta(fs.getMeta());
                     Connection connection = sourceDao.getConnection();
                     //设置手动提交
                     connection.setAutoCommit(false);
                     try {
                        tempTestTimer = DateUtil.timer();
                        WriteUtil.append("DA-清理数据》》》");
                        // 拉取全量数据到空表(第一次归档),跳过清理
                        if (!turnTargetDBClearFlag) {
                           // 清理数据
                           DataTableEntity clearDte = clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list);
                           WriteUtil.append("DA-清理数据量:" + clearDte.getRows());
                        }
                        WriteUtil.append("DA-清理数据耗时:" + tempTestTimer.intervalMs());
                        // 新增数据
                        tempTestTimer = DateUtil.timer();
                        WriteUtil.append("DA-新增数据量:" + list.getRows());
                        if (list.getRows() > 0) {
                           WriteUtil.append("DA-新增数据》》》表名:" + list.getFieldSetEntity(0).getMeta().getTableName()[0]);
                           try {
                              targetDao.addBatch(list);
                           } catch (Exception e) {
                              if (turnTargetDBClearFlag) {
                                 clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list);
                                 targetDao.addBatch(list);
                              } else {
                                 throw e;
                              }
                           }
                        }
                        WriteUtil.append("DA-新增数据耗时:" + tempTestTimer.intervalMs());
                        // 提交
                        connection.commit();
                     } catch (Exception e) {
            // 将日志表中执行失败的记录标记为已经重新处理
            WriteUtil.append("DA-将日志表中执行失败的记录标记为已经重新处理");
            DataTableEntity logDte;
            if (curLogFse == null) {
                logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=?", new Object[]{uuid});
            } else {
                logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=? and uuid<>?", new Object[]{uuid, curLogFse.getUUID()});
            }
            for (int i = 0; i < logDte.getRows(); i++) {
                journalManagerService.writeBackReDealResult(logDte.getFieldSetEntity(i), true);
            }
            WriteUtil.append("DA-执行完毕");
        }
    }
                        //若批量添加失败回滚删除
                        try {
                           connection.rollback();
                        } catch (Exception er) {
                           e.printStackTrace();
                           throw er;
                        }
                        throw e;
                     } finally {
                        //重设连接为自动提交
                        connection.setAutoCommit(true);
                     }
                     archivingSuccessCount += list.getRows();
                  }
               }
            } while (!dataArchivingQueue.checkQueryFinish(sourceTable) || !dataArchivingQueue.checkInsertQueueEmpty(sourceTable));
            journalEntity.setSingle_duration(timer.intervalMs());
            journalEntity.setStatistics_start_time(statisticsStartTime);
            journalEntity.setStatistics_final_time(statisticsFinalTime);
            journalEntity.setMin_id(minID);
            journalEntity.setMax_id(maxID);
            WriteUtil.append("DA-循环完毕");
         } catch (Exception e) {
            WriteUtil.append("error:\n" + journalManagerService.getStackTrace(e));
            throw e;
         } finally {
            targetDao.closeConnection();
            sourceDao.closeConnection();
            // 关闭线程
            dataArchivingQueue.shutdownQueryThread(sourceTable);
         }
    /**
     * 清理归档重复数据
     * @param sourceTable
     * @param targetTableSet
     * @param uniqueField
     * @param targetDao
     */
    private DataTableEntity clearArchiveRepeatData(String sourceTable, Set<String> targetTableSet, String uniqueField, Dao targetDao, DataTableEntity list) {
        StringBuilder clearSql = new StringBuilder(128);
        clearSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet, Arrays.asList(uniqueField, "{#table_name#}"), false, " where " + BaseUtil.buildQuestionMarkFilter(uniqueField, list.getFieldAllValues(uniqueField), true)))
                .append("\nselect ").append(uniqueField).append(",_table_name from ").append(sourceTable);
        DataTableEntity clearDte = targetDao.getList(clearSql.toString());
        if (!DataTableEntity.isEmpty(clearDte)) {
            Set<Object> clearTableSet = Sets.newHashSet();
            clearTableSet.addAll(Arrays.asList(clearDte.getFieldAllValues("_table_name")));
            for (Object targetTableName : clearTableSet) {
                targetDao.delete(targetTableName.toString(),
                        BaseUtil.buildQuestionMarkFilter(uniqueField, clearDte.getRows(), true),
                        clearDte.getData().stream().map(item -> item.getString(uniqueField)).toArray());
            }
        }
        return clearDte;
    }
         // 删除mes主库的内容
         if (canExecuteClearFlag) {
            WriteUtil.append("DA-删除mes主库内容》》》");
            String deleteMasterLogUUID = deleteMasterData(sourceTable, configFse, sourceDao, targetDao, uniqueField, timeField, extractTimeField, targetTableSet, deleteSubLogUUID);
            if (!StringUtils.isEmpty(deleteMasterLogUUID)) {
               journalEntity.setPre_step_uuid(deleteMasterLogUUID);
            } else if (!StringUtils.isEmpty(deleteSubLogUUID)) {
               journalEntity.setPre_step_uuid(deleteMasterLogUUID);
            }
            WriteUtil.append("DA-删除mes主库内容完毕");
         }
         String errorInfo = dataArchivingQueue.getErrorLog(sourceTable);
         if (!StringUtils.isEmpty(errorInfo)) {
            journalEntity.setError(errorInfo);
            journalEntity.setResult(0);
         }
      } catch (Exception e) {
         SpringMVCContextHolder.getSystemLogger().error(e);
         if (!StringUtils.isEmpty(journalEntity.getError())) {
            journalEntity.setError(journalEntity.getError() + "\n" + journalManagerService.getStackTrace(e));
         } else {
            journalEntity.setError(journalManagerService.getStackTrace(e));
         }
         journalEntity.setResult(0);
      } finally {
         journalEntity.setSingle_duration(timer.intervalMs());
         journalEntity.setCount(archivingSuccessCount);
         journalEntity.setConfigUuid(uuid);
         journalEntity.setType(5);
         journalEntity.setDetail(4);
         journalEntity.setCreated_utc_datetime(new Date());
         FieldSetEntity curLogFse = null;
         if (archivingSuccessCount > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1 || !StringUtils.isEmpty(journalEntity.getPre_step_uuid())) {
            curLogFse = journalManagerService.autoCreateJournal(journalEntity);
         }
    /**
     * 清理判定,每天切换的时候可以执行一次
     * @param tableName
     * @return
     */
    public boolean canExecuteClear(String tableName) {
        String dateStr = DateUtil.format(DateUtil.date(), "yyyy-MM-dd");
        final String KEY = "DE_CLEAR_STORE";
        Object preDate = RedisUtil.getHash(KEY, tableName);
        if (dateStr.equals(preDate)) {
            return false;
        } else {
            RedisUtil.setHash(KEY, tableName, dateStr);
            return true;
        }
    }
         // 将日志表中执行失败的记录标记为已经重新处理
         WriteUtil.append("DA-将日志表中执行失败的记录标记为已经重新处理");
         DataTableEntity logDte;
         if (curLogFse == null) {
            logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=?", new Object[]{uuid});
         } else {
            logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=? and uuid<>?", new Object[]{uuid, curLogFse.getUUID()});
         }
         for (int i = 0; i < logDte.getRows(); i++) {
            journalManagerService.writeBackReDealResult(logDte.getFieldSetEntity(i), true);
         }
         WriteUtil.append("DA-执行完毕");
      }
   }
    /**
     * 清理主库数据
     *
     * @param sourceTable      源表名
     * @param configFse        配置fse
     * @param sourceDao        源dao
     * @param targetDao        目标dao
     * @param uniqueField      唯一字段
     * @param timeField        更新(归档)时间字段
     * @param extractTimeField 提取时间字段
     * @param targetTableSet   目标表集合
     * @param deleteSubLogUUID 删除子表日志uuid
     * @return
     */
    private String deleteMasterData(String sourceTable, FieldSetEntity configFse, Dao sourceDao, Dao targetDao, String uniqueField, String timeField, String extractTimeField, Set<String> targetTableSet, String deleteSubLogUUID) {
        FieldSetEntity deleteMasterLogFse = new FieldSetEntity();
        deleteMasterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG);
        TimeInterval deleteMasterLogTimer = DateUtil.timer();
        String minID = "";
        String maxID = "";
        Date statisticsStartTime = null;
        Date statisticsFinalTime = null;
        try {
            StringBuilder sql = new StringBuilder(128);
            sql.append("select count(*) count_value from ").append(sourceTable);
            if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) {
                sql.append(" where ").append(configFse.getString("source_select_filter"));
            } else {
                sql.append(" where  1=2");
            }
            FieldSetEntity paramFse = sourceDao.getOne(sql.toString());
            WriteUtil.append("DA-sourceDao.getOne(sql.toString())-sql:" + sql);
            int totalCount = StringUtils.isEmpty(paramFse.getString("count_value")) ? 0 : paramFse.getInteger("count_value");
            int delTotalCount = 0;
            if (totalCount > 0) {
                int pageSize = 1000;
                int totalPage = totalCount / pageSize + (totalCount % pageSize == 0 ? 0 : 1);
                sql.setLength(0);
                sql.append("select ").append(uniqueField).append(" from ").append(sourceTable);
                if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) {
                    sql.append(" where ").append(configFse.getString("source_select_filter"));
                }
                StringBuilder existSql = new StringBuilder(128);
                existSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet))
                        .append("\nselect ").append(uniqueField).append(" from ").append(sourceTable);
                StringBuilder tempSql = new StringBuilder(128);
                for (int i = 0; i < totalPage; i++) {
                    DataTableEntity delDte = sourceDao.getList(sql.toString(), new Object[]{}, 1, pageSize);
                    WriteUtil.append("DA-删除的数据-delDte:" + Arrays.toString(delDte.getFieldAllValues(uniqueField)));
                    if (DataTableEntity.isEmpty(delDte)) {
                        continue;
                    }
                    // 验证归档库里面存在,仅删除存在,不存在的保留
                    tempSql.setLength(0);
                    tempSql.append(existSql);
                    tempSql.append(" where ").append(BaseUtil.buildQuestionMarkFilter(uniqueField, delDte.getFieldAllValues(uniqueField), true));
                    WriteUtil.append("DA-tempSql:" + tempSql);
                    DataTableEntity existDte = targetDao.getList(tempSql.toString(), new Object[]{});
                    if (existDte.getRows() > 0) {
                        FieldSetEntity existFse;
                        for (int j = 0; j < existDte.getRows(); j++) {
                            existFse = existDte.getFieldSetEntity(j);
                            Date updateTime = existFse.getDate(timeField);
                            Date extractTime = existFse.getDate(extractTimeField);
                            Date date = getAimDate(updateTime, extractTime, 0);
                            statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                            statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                            minID = getAimID(minID, existFse.getString(uniqueField), 0);
                            maxID = getAimID(maxID, existFse.getString(uniqueField), 1);
                        }
                        delTotalCount += existDte.getRows();
                        sourceDao.delete(sourceTable,
                                BaseUtil.buildQuestionMarkFilter(uniqueField, existDte.getRows(), true),
                                existDte.getData().stream().map(item -> item.getString(uniqueField)).toArray());
                    }
                }
                WriteUtil.append("DA-删除总条数:" + delTotalCount);
            }
            deleteMasterLogFse.setValue(CmnConst.COUNT, delTotalCount);
            deleteMasterLogFse.setValue(CmnConst.RESULT, 1);
        } catch (Exception e) {
            e.printStackTrace();
            deleteMasterLogFse.setValue(CmnConst.RESULT, 0);
            deleteMasterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e));
        } finally {
            targetDao.closeConnection();
            sourceDao.closeConnection();
            deleteMasterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
            deleteMasterLogFse.setValue(CmnConst.TYPE, 5);
            deleteMasterLogFse.setValue(CmnConst.DETAIL, 6);
            deleteMasterLogFse.setValue(CmnConst.PRE_STEP_UUID, deleteSubLogUUID);
            deleteMasterLogFse.setValue(CmnConst.DEAL_FLAG, 0);
            deleteMasterLogFse.setValue(CmnConst.DEAL_RESULT, 1);
            deleteMasterLogFse.setValue(CmnConst.MIN_ID, minID);
            deleteMasterLogFse.setValue(CmnConst.MAX_ID, maxID);
            deleteMasterLogFse.setValue(CmnConst.SINGLE_DURATION, deleteMasterLogTimer.intervalMs());
            deleteMasterLogFse.setValue(CmnConst.CONFIG_UUID, configFse.getUUID());
            deleteMasterLogFse.setValue(CmnConst.STATISTICS_START_TIME, statisticsStartTime);
            deleteMasterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, statisticsFinalTime);
            if ((!StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.COUNT)) && deleteMasterLogFse.getInteger(CmnConst.COUNT) > 0)
                    || !StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.ERROR)) || !"1".equals(deleteMasterLogFse.getString(CmnConst.RESULT))) {
                getBaseDao().add(deleteMasterLogFse);
            }
        }
        return deleteMasterLogFse.getUUID();
    }
   /**
    * 清理归档重复数据
    *
    * @param sourceTable
    * @param targetTableSet
    * @param uniqueField
    * @param targetDao
    */
   private DataTableEntity clearArchiveRepeatData(String sourceTable, Set<String> targetTableSet, String uniqueField, Dao targetDao, DataTableEntity list) {
      StringBuilder clearSql = new StringBuilder(128);
      clearSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet, Arrays.asList(uniqueField, "{#table_name#}"), false, " where " + BaseUtil.buildQuestionMarkFilter(uniqueField, list.getFieldAllValues(uniqueField), true)))
            .append("\nselect ").append(uniqueField).append(",_table_name from ").append(sourceTable);
      DataTableEntity clearDte = targetDao.getList(clearSql.toString());
      if (!DataTableEntity.isEmpty(clearDte)) {
         Set<Object> clearTableSet = Sets.newHashSet();
         clearTableSet.addAll(Arrays.asList(clearDte.getFieldAllValues("_table_name")));
         for (Object targetTableName : clearTableSet) {
            targetDao.delete(targetTableName.toString(),
                  BaseUtil.buildQuestionMarkFilter(uniqueField, clearDte.getRows(), true),
                  clearDte.getData().stream().map(item -> item.getString(uniqueField)).toArray());
         }
      }
      return clearDte;
   }
    /**
     * 获取指定的日期,若有一个为空,那么直接获取另外一个的值;否则按照指定取值
     *
     * @param d1
     * @param d2
     * @param sign 大于0,取两者中大的,就是更靠近当前时间的;否则取小的,就是更远离当前时间的
     * @return
     */
    private Date getAimDate(Date d1, Date d2, int sign) {
        if (d1 == null && d2 == null) {
            return null;
        }
        if (d1 == null || d2 == null) {
            return d1 == null ? d2 : d1;
        }
        if (sign > 0) {
            return d1.compareTo(d2) > 0 ? d1 : d2;
        } else {
            return d1.compareTo(d2) > 0 ? d2 : d1;
        }
    }
   /**
    * 清理判定,每天切换的时候可以执行一次
    *
    * @param tableName
    * @return
    */
   public boolean canExecuteClear(String tableName) {
      String dateStr = DateUtil.format(DateUtil.date(), "yyyy-MM-dd");
      final String KEY = "DE_CLEAR_STORE";
      Object preDate = RedisUtil.getHash(KEY, tableName);
      if (dateStr.equals(preDate)) {
         return false;
      } else {
         RedisUtil.setHash(KEY, tableName, dateStr);
         return true;
      }
   }
    /**
     * 获取指定的ID,那么直接获取另外一个的值;否则按照指定取值
     *
     * @param s1
     * @param s2
     * @param sign 大于0,取两者中大的;否则取小的
     * @return
     */
    private String getAimID(String s1, String s2, int sign) {
        if (StringUtils.isEmpty(s1) && StringUtils.isEmpty(s2)) {
            return null;
        }
        if (StringUtils.isEmpty(s1) || StringUtils.isEmpty(s2)) {
            return StringUtils.isEmpty(s1) ? s2 : s1;
        }
        String numberRegexp = "\\d{1,11}";
        if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) {
            boolean b = Long.parseLong(s1) > Long.parseLong(s2);
            if (sign > 0) {
                return b ? s1 : s2;
            } else {
                return b ? s2 : s1;
            }
        } else {
            if (sign > 0) {
                return s1.compareTo(s2) > 0 ? s1 : s2;
            } else {
                return s1.compareTo(s2) > 0 ? s2 : s1;
            }
        }
    }
   /**
    * 清理主库数据
    *
    * @param sourceTable      源表名
    * @param configFse        配置fse
    * @param sourceDao        源dao
    * @param targetDao        目标dao
    * @param uniqueField      唯一字段
    * @param timeField        更新(归档)时间字段
    * @param extractTimeField 提取时间字段
    * @param targetTableSet   目标表集合
    * @param deleteSubLogUUID 删除子表日志uuid
    * @return
    */
   private String deleteMasterData(String sourceTable, FieldSetEntity configFse, Dao sourceDao, Dao targetDao, String uniqueField, String timeField, String extractTimeField, Set<String> targetTableSet, String deleteSubLogUUID) {
      FieldSetEntity deleteMasterLogFse = new FieldSetEntity();
      deleteMasterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG);
      TimeInterval deleteMasterLogTimer = DateUtil.timer();
      String minID = "";
      String maxID = "";
      Date statisticsStartTime = null;
      Date statisticsFinalTime = null;
      try {
         StringBuilder sql = new StringBuilder(128);
         sql.append("select count(*) count_value from ").append(sourceTable);
         if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) {
            sql.append(" where ").append(configFse.getString("source_select_filter"));
         } else {
            sql.append(" where  1=2");
         }
         FieldSetEntity paramFse = sourceDao.getOne(sql.toString());
         WriteUtil.append("DA-sourceDao.getOne(sql.toString())-sql:" + sql);
         int totalCount = StringUtils.isEmpty(paramFse.getString("count_value")) ? 0 : paramFse.getInteger("count_value");
         int delTotalCount = 0;
         if (totalCount > 0) {
            int pageSize = 1000;
            int totalPage = totalCount / pageSize + (totalCount % pageSize == 0 ? 0 : 1);
            sql.setLength(0);
            sql.append("select ").append(uniqueField).append(" from ").append(sourceTable);
            if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) {
               sql.append(" where ").append(configFse.getString("source_select_filter"));
            }
            StringBuilder existSql = new StringBuilder(128);
            existSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet))
                  .append("\nselect ").append(uniqueField).append(" from ").append(sourceTable);
            StringBuilder tempSql = new StringBuilder(128);
            for (int i = 0; i < totalPage; i++) {
               DataTableEntity delDte = sourceDao.getList(sql.toString(), new Object[]{}, 1, pageSize);
               WriteUtil.append("DA-删除的数据-delDte:" + Arrays.toString(delDte.getFieldAllValues(uniqueField)));
               if (DataTableEntity.isEmpty(delDte)) {
                  continue;
               }
               // 验证归档库里面存在,仅删除存在,不存在的保留
               tempSql.setLength(0);
               tempSql.append(existSql);
               tempSql.append(" where ").append(BaseUtil.buildQuestionMarkFilter(uniqueField, delDte.getFieldAllValues(uniqueField), true));
               WriteUtil.append("DA-tempSql:" + tempSql);
               DataTableEntity existDte = targetDao.getList(tempSql.toString(), new Object[]{});
               if (existDte.getRows() > 0) {
                  FieldSetEntity existFse;
                  for (int j = 0; j < existDte.getRows(); j++) {
                     existFse = existDte.getFieldSetEntity(j);
                     Date updateTime = existFse.getDate(timeField);
                     Date extractTime = existFse.getDate(extractTimeField);
                     Date date = getAimDate(updateTime, extractTime, 0);
                     statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                     statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                     minID = getAimID(minID, existFse.getString(uniqueField), 0);
                     maxID = getAimID(maxID, existFse.getString(uniqueField), 1);
                  }
                  delTotalCount += existDte.getRows();
                  sourceDao.delete(sourceTable,
                        BaseUtil.buildQuestionMarkFilter(uniqueField, existDte.getRows(), true),
                        existDte.getData().stream().map(item -> item.getString(uniqueField)).toArray());
               }
            }
            WriteUtil.append("DA-删除总条数:" + delTotalCount);
         }
         deleteMasterLogFse.setValue(CmnConst.COUNT, delTotalCount);
         deleteMasterLogFse.setValue(CmnConst.RESULT, 1);
      } catch (Exception e) {
         e.printStackTrace();
         deleteMasterLogFse.setValue(CmnConst.RESULT, 0);
         deleteMasterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e));
      } finally {
         targetDao.closeConnection();
         sourceDao.closeConnection();
         deleteMasterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
         deleteMasterLogFse.setValue(CmnConst.TYPE, 5);
         deleteMasterLogFse.setValue(CmnConst.DETAIL, 6);
         deleteMasterLogFse.setValue(CmnConst.PRE_STEP_UUID, deleteSubLogUUID);
         deleteMasterLogFse.setValue(CmnConst.DEAL_FLAG, 0);
         deleteMasterLogFse.setValue(CmnConst.DEAL_RESULT, 1);
         deleteMasterLogFse.setValue(CmnConst.MIN_ID, minID);
         deleteMasterLogFse.setValue(CmnConst.MAX_ID, maxID);
         deleteMasterLogFse.setValue(CmnConst.SINGLE_DURATION, deleteMasterLogTimer.intervalMs());
         deleteMasterLogFse.setValue(CmnConst.CONFIG_UUID, configFse.getUUID());
         deleteMasterLogFse.setValue(CmnConst.STATISTICS_START_TIME, statisticsStartTime);
         deleteMasterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, statisticsFinalTime);
         if ((!StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.COUNT)) && deleteMasterLogFse.getInteger(CmnConst.COUNT) > 0)
               || !StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.ERROR)) || !"1".equals(deleteMasterLogFse.getString(CmnConst.RESULT))) {
            getBaseDao().add(deleteMasterLogFse);
         }
      }
      return deleteMasterLogFse.getUUID();
   }
    /**
     * 将dte按照年份分组
     *
     * @param dte
     * @param timeField      时间字段
     * @param splitTableType 分表方式,0-年,1-月
     * @return Map<时间, List < dte数据>>
     */
    private Map<String, List<DataTableEntity>> dteGroupByTime(DataTableEntity dte, String timeField, String splitTableType) {
        Map<String, List<DataTableEntity>> groupDteMap = Maps.newHashMap();
        FieldSetEntity fse;
        String time;
        List<DataTableEntity> groupDteList;
        if (StringUtils.isEmpty(timeField)) {
            groupDteList = Lists.newArrayList();
            groupDte(groupDteList, dte);
            groupDteMap.put("0", groupDteList);
        } else {
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
            for (int i = 0; i < dte.getRows(); i++) {
                fse = dte.getFieldSetEntity(i);
                if (fse.getDate(timeField) == null) {
                    throw new BaseException(ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getValue(), ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getText() + "\ntable_name:" + fse.getTableName() + "\ndata:" + fse.getValues());
                }
                if ("1".equals(splitTableType)) {
                    time = dateFormat.format(fse.getDate(timeField));
                } else {
                    time = String.valueOf(DateUtil.year(fse.getDate(timeField)));
                }
                groupDteList = groupDteMap.computeIfAbsent(time, k -> Lists.newArrayList());
                groupAddDte(groupDteList, fse);
            }
        }
        return groupDteMap;
    }
   /**
    * 获取指定的日期,若有一个为空,那么直接获取另外一个的值;否则按照指定取值
    *
    * @param d1
    * @param d2
    * @param sign 大于0,取两者中大的,就是更靠近当前时间的;否则取小的,就是更远离当前时间的
    * @return
    */
   private Date getAimDate(Date d1, Date d2, int sign) {
      if (d1 == null && d2 == null) {
         return null;
      }
      if (d1 == null || d2 == null) {
         return d1 == null ? d2 : d1;
      }
      if (sign > 0) {
         return d1.compareTo(d2) > 0 ? d1 : d2;
      } else {
         return d1.compareTo(d2) > 0 ? d2 : d1;
      }
   }
    private void groupDte(List<DataTableEntity> list, DataTableEntity allDte) {
        if (list == null) {
            throw new BaseException(ErrorCode.DATA_ARCHIVE_GROUP_CONTAINER_IS_NULL);
        }
        for (int i = 0; i < allDte.getRows(); i++) {
            groupAddDte(list, allDte.getFieldSetEntity(i));
        }
    }
   /**
    * 获取指定的ID,那么直接获取另外一个的值;否则按照指定取值
    *
    * @param s1
    * @param s2
    * @param sign 大于0,取两者中大的;否则取小的
    * @return
    */
   private String getAimID(String s1, String s2, int sign) {
      if (StringUtils.isEmpty(s1) && StringUtils.isEmpty(s2)) {
         return null;
      }
      if (StringUtils.isEmpty(s1) || StringUtils.isEmpty(s2)) {
         return StringUtils.isEmpty(s1) ? s2 : s1;
      }
      String numberRegexp = "\\d{1,11}";
      if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) {
         boolean b = Long.parseLong(s1) > Long.parseLong(s2);
         if (sign > 0) {
            return b ? s1 : s2;
         } else {
            return b ? s2 : s1;
         }
      } else {
         if (sign > 0) {
            return s1.compareTo(s2) > 0 ? s1 : s2;
         } else {
            return s1.compareTo(s2) > 0 ? s2 : s1;
         }
      }
   }
    private void groupAddDte(List<DataTableEntity> list, FieldSetEntity fse) {
        DataTableEntity dte;
        if (list.isEmpty()) {
            dte = new DataTableEntity();
            list.add(dte);
        } else {
            dte = list.get(list.size() - 1);
            if (dte.getRows() >= DataArchivingQueue.INSERT_PAGE_SIZE) {
                dte = new DataTableEntity();
                list.add(dte);
            }
        }
        dte.addFieldSetEntity(fse);
    }
   /**
    * 将dte按照年份分组
    *
    * @param dte
    * @param timeField      时间字段
    * @param splitTableType 分表方式,0-年,1-月
    * @return Map<时间, List < dte数据>>
    */
   private Map<String, List<DataTableEntity>> dteGroupByTime(DataTableEntity dte, String timeField, String splitTableType) {
      Map<String, List<DataTableEntity>> groupDteMap = Maps.newHashMap();
      FieldSetEntity fse;
      String time;
      List<DataTableEntity> groupDteList;
      if (StringUtils.isEmpty(timeField)) {
         groupDteList = Lists.newArrayList();
         groupDte(groupDteList, dte);
         groupDteMap.put("0", groupDteList);
      } else {
         SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
         for (int i = 0; i < dte.getRows(); i++) {
            fse = dte.getFieldSetEntity(i);
            if (fse.getDate(timeField) == null) {
               throw new BaseException(ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getValue(), ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getText() + "\ntable_name:" + fse.getTableName() + "\ndata:" + fse.getValues());
            }
            if ("1".equals(splitTableType)) {
               time = dateFormat.format(fse.getDate(timeField));
            } else {
               time = String.valueOf(DateUtil.year(fse.getDate(timeField)));
            }
            groupDteList = groupDteMap.computeIfAbsent(time, k -> Lists.newArrayList());
            groupAddDte(groupDteList, fse);
         }
      }
      return groupDteMap;
   }
   private void groupDte(List<DataTableEntity> list, DataTableEntity allDte) {
      if (list == null) {
         throw new BaseException(ErrorCode.DATA_ARCHIVE_GROUP_CONTAINER_IS_NULL);
      }
      for (int i = 0; i < allDte.getRows(); i++) {
         groupAddDte(list, allDte.getFieldSetEntity(i));
      }
   }
   private void groupAddDte(List<DataTableEntity> list, FieldSetEntity fse) {
      DataTableEntity dte;
      if (list.isEmpty()) {
         dte = new DataTableEntity();
         list.add(dte);
      } else {
         dte = list.get(list.size() - 1);
         if (dte.getRows() >= DataArchivingQueue.INSERT_PAGE_SIZE) {
            dte = new DataTableEntity();
            list.add(dte);
         }
      }
      dte.addFieldSetEntity(fse);
   }
    class DataArchivingServiceImpl {
   class DataArchivingServiceImpl {
        private Dao sourceDao;
        private Dao targetDao;
        private String sourceTable;
        private String configUid;
        private String sourceDbName;
        private String targetDbName;
      private Dao sourceDao;
      private Dao targetDao;
      private String sourceTable;
      private String configUid;
      private String sourceDbName;
      private String targetDbName;
        public DataArchivingServiceImpl(Dao sourceDao, Dao targetDao, String sourceTable, String configUid, String sourceDbName, String targetDbName) {
            this.sourceDao = sourceDao;
            this.targetDao = targetDao;
            this.sourceTable = sourceTable;
            this.configUid = configUid;
            this.sourceDbName = sourceDbName;
            this.targetDbName = targetDbName;
        }
      public DataArchivingServiceImpl(Dao sourceDao, Dao targetDao, String sourceTable, String configUid, String sourceDbName, String targetDbName) {
         this.sourceDao = sourceDao;
         this.targetDao = targetDao;
         this.sourceTable = sourceTable;
         this.configUid = configUid;
         this.sourceDbName = sourceDbName;
         this.targetDbName = targetDbName;
      }
        /**
         * 验证数据表是否存在
         *
         * @param tableName 数据表名
         * @return
         */
        public boolean dataTableIsExists(String tableName) throws BaseException {
            try {
                Connection connection = targetDao.getConnection();
                DatabaseMetaData metaData = targetDao.getConnection().getMetaData();
                ResultSet tables = metaData.getTables(null, null, tableName, new String[]{"TABLE"});
                boolean result = tables.next();
                tables.close();
                connection.close();
                return result;
            } catch (Exception e) {
                e.printStackTrace();
                throw new BaseException(e);
            }
        }
      /**
       * 验证数据表是否存在
       *
       * @param tableName 数据表名
       * @return
       */
      public boolean dataTableIsExists(String tableName) throws BaseException {
         try {
            Connection connection = targetDao.getConnection();
            DatabaseMetaData metaData = targetDao.getConnection().getMetaData();
            ResultSet tables = metaData.getTables(null, null, tableName, new String[]{"TABLE"});
            boolean result = tables.next();
            tables.close();
            connection.close();
            return result;
         } catch (Exception e) {
            e.printStackTrace();
            throw new BaseException(e);
         }
      }
        /**
         * 创建表
         *
         * @param prefix 表名前缀
         */
        public String createTable(String prefix, String time, DataBaseEntity dbe) throws BaseException {
            String tableName;
            if (!"0".equals(time)) {
                tableName = prefix + (prefix.lastIndexOf("_") != prefix.length() - 1 ? "_" : "") + time;
            } else {
                tableName = prefix.endsWith("_") ? prefix.substring(0, prefix.length() - 1) : prefix;
            }
            if (dataTableIsExists(tableName)) {
                return tableName;
            }
            JSONObject tableInfoObj = getTableInfo(tableName);
            String sql = getSql(tableInfoObj);
            SpringMVCContextHolder.getSystemLogger().error("sql:\n" + sql);
            //先创建记录再执行ddl语句不然报错后ddl无法回滚
            saveCreateTableRecord(tableName, time);
            targetDao.executeSql(sql);
      /**
       * 创建表
       *
       * @param prefix 表名前缀
       */
      public String createTable(String prefix, String time, DataBaseEntity dbe) throws BaseException {
         String tableName;
         if (!"0".equals(time)) {
            tableName = prefix + (prefix.lastIndexOf("_") != prefix.length() - 1 ? "_" : "") + time;
         } else {
            tableName = prefix.endsWith("_") ? prefix.substring(0, prefix.length() - 1) : prefix;
         }
         if (dataTableIsExists(tableName)) {
            return tableName;
         }
         JSONObject tableInfoObj = getTableInfo(tableName);
         String sql = getSql(tableInfoObj);
         SpringMVCContextHolder.getSystemLogger().error("sql:\n" + sql);
         //先创建记录再执行ddl语句不然报错后ddl无法回滚
         saveCreateTableRecord(tableName, time);
         targetDao.executeSql(sql);
//            syncDataConfigService.addTableField(dbe,dbe.getUuid(),tableName);
            return tableName;
        }
         return tableName;
      }
        /**
         * 根据表名获取来源数据源对应表的结构信息
         *
         * @param tableName
         * @return
         */
        private JSONObject getTableInfo(String tableName) {
            JSONObject tableInfoObj = new JSONObject();
            tableInfoObj.put(CmnConst.NAME, tableName);
            DataTableEntity tempDte;
            FieldSetEntity tempFse;
            JSONObject fieldInfoObj = new JSONObject(new LinkedHashMap<>());
            tableInfoObj.put(CmnConst.FIELD, fieldInfoObj);
            JSONObject indexInfoObj = new JSONObject(new LinkedHashMap<>());
            tableInfoObj.put(CmnConst.INDEX, indexInfoObj);
            JSONObject singleFieldInfoObj;
            if (DataBaseType.MYSQL.equals(sourceDao.getDataBaseType())) {
                // mysql
                // 表
                tempFse = sourceDao.getOne("information_schema.`TABLES`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable});
                tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("table_comment"));
                // 字段表
                tempDte = sourceDao.getList("information_schema.`COLUMNS`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable}, "ordinal_position", 1, Integer.MAX_VALUE);
                for (int i = 0; i < tempDte.getRows(); i++) {
                    tempFse = tempDte.getFieldSetEntity(i);
                    singleFieldInfoObj = new JSONObject();
                    fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj);
                    singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type"));
                    singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("character_maximum_length")) ? tempFse.getString("numeric_precision") : tempFse.getString("character_maximum_length"));
                    singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("numeric_scale"));
                    singleFieldInfoObj.put(CmnConst.NULLABLE, "NO".equalsIgnoreCase(tempFse.getString("is_nullable")) ? 0 : 1);
                    singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("column_comment"));
                }
                // 索引表
                StringBuilder sql = new StringBuilder(128);
                sql.append("select index_name,non_unique,group_concat(column_name) column_name");
                sql.append("\nfrom information_schema.`STATISTICS`");
                sql.append("\nwhere table_schema=? and table_name=?");
                sql.append("\ngroup by index_name,non_unique");
                tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, sourceTable});
                for (int i = 0; i < tempDte.getRows(); i++) {
                    tempFse = tempDte.getFieldSetEntity(i);
                    singleFieldInfoObj = new JSONObject();
                    indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
                    singleFieldInfoObj.put(CmnConst.TYPE, "PRIMARY".equalsIgnoreCase(tempFse.getString("index_name")) ? CmnConst.PRIMARY : ("1".equals(tempFse.getString("non_unique")) ? CmnConst.NORMAL : CmnConst.UNIQUE));
                    singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
                }
            } else if (DataBaseType.ORACLE.equals(sourceDao.getDataBaseType())) {
                // oracle
                // 表
                String upperTableName = sourceTable.toUpperCase();
                tempFse = sourceDao.getOne("SYS.USER_TAB_COMMENTS", "table_name=?", new Object[]{upperTableName});
                tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments"));
                // 字段表
                StringBuilder sql = new StringBuilder(128);
                sql.append("SELECT TC.COLUMN_NAME,DATA_TYPE,DATA_LENGTH,DATA_PRECISION,DATA_SCALE,NULLABLE,CHAR_LENGTH,COMMENTS FROM SYS.USER_TAB_COLUMNS TC");
                sql.append("\nLEFT JOIN USER_COL_COMMENTS CC ON TC.TABLE_NAME=CC.TABLE_NAME AND TC.COLUMN_NAME=CC.COLUMN_NAME");
                sql.append("\nWHERE TC.TABLE_NAME=?");
                sql.append("\nORDER BY TC.COLUMN_ID");
                tempDte = sourceDao.getList(sql.toString(), new Object[]{upperTableName});
                for (int i = 0; i < tempDte.getRows(); i++) {
                    tempFse = tempDte.getFieldSetEntity(i);
                    singleFieldInfoObj = new JSONObject();
                    fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj);
                    singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type").contains("VARCHAR") ? "varchar" : tempFse.getString("data_type"));
                    singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("data_precision"))
                            ? tempFse.getString("char_length") : tempFse.getString("data_precision"));
                    singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("data_scale"));
                    singleFieldInfoObj.put(CmnConst.NULLABLE, "N".equalsIgnoreCase(tempFse.getString("nullable")) ? 0 : 1);
                    singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments"));
                }
                // 索引表
                sql.setLength(0);
                sql.append("SELECT DIC.INDEX_NAME,WM_CONCAT(DIC.COLUMN_NAME) column_name FROM SYS.DBA_IND_COLUMNS DIC");
                sql.append("\nLEFT JOIN SYS.DBA_INDEXES DI ON DIC.INDEX_NAME=DI.INDEX_NAME");
                sql.append("\nWHERE UNIQUENESS='NONUNIQUE' AND DIC.TABLE_OWNER=? AND DI.TABLE_NAME=?");
                sql.append("\nGROUP BY DIC.INDEX_NAME");
                tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName});
                for (int i = 0; i < tempDte.getRows(); i++) {
                    tempFse = tempDte.getFieldSetEntity(i);
                    singleFieldInfoObj = new JSONObject();
                    indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
                    singleFieldInfoObj.put(CmnConst.TYPE, CmnConst.NORMAL);
                    singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
                }
                // 约束表 C-检查,写到字段里面;R-外键,不要;P-主键;U-唯一键
                sql.setLength(0);
                sql.append("SELECT DC.CONSTRAINT_NAME index_name,CONSTRAINT_TYPE,LISTAGG(COLUMN_NAME, ',') WITHIN GROUP(ORDER BY DCC.POSITION) column_name FROM SYS.DBA_CONS_COLUMNS DCC");
                sql.append("\nLEFT JOIN SYS.DBA_CONSTRAINTS DC ON DCC.CONSTRAINT_NAME=DC.CONSTRAINT_NAME");
                sql.append("\nWHERE DCC.OWNER=? AND DCC.TABLE_NAME=? AND CONSTRAINT_TYPE IN ('P','U')");
                sql.append("\nGROUP BY DC.CONSTRAINT_NAME,CONSTRAINT_TYPE");
                tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName});
                WriteUtil.append("DA-DDL-:" + sql + " |||库名:" + sourceDbName + " |||表名:" + upperTableName);
                for (int i = 0; i < tempDte.getRows(); i++) {
                    tempFse = tempDte.getFieldSetEntity(i);
                    singleFieldInfoObj = new JSONObject();
                    indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
                    singleFieldInfoObj.put(CmnConst.TYPE, "P".equalsIgnoreCase(tempFse.getString("constraint_type")) ? CmnConst.PRIMARY : CmnConst.UNIQUE);
                    singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
                }
                WriteUtil.append("DA-DDL-创表信息:" + tableInfoObj);
            } else {
                throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL);
            }
            return tableInfoObj;
        }
      /**
       * 根据表名获取来源数据源对应表的结构信息
       *
       * @param tableName
       * @return
       */
      private JSONObject getTableInfo(String tableName) {
         JSONObject tableInfoObj = new JSONObject();
         tableInfoObj.put(CmnConst.NAME, tableName);
         DataTableEntity tempDte;
         FieldSetEntity tempFse;
         JSONObject fieldInfoObj = new JSONObject(new LinkedHashMap<>());
         tableInfoObj.put(CmnConst.FIELD, fieldInfoObj);
         JSONObject indexInfoObj = new JSONObject(new LinkedHashMap<>());
         tableInfoObj.put(CmnConst.INDEX, indexInfoObj);
         JSONObject singleFieldInfoObj;
         if (DataBaseType.MYSQL.equals(sourceDao.getDataBaseType())) {
            // mysql
            // 表
            tempFse = sourceDao.getOne("information_schema.`TABLES`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable});
            tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("table_comment"));
            // 字段表
            tempDte = sourceDao.getList("information_schema.`COLUMNS`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable}, "ordinal_position", 1, Integer.MAX_VALUE);
            for (int i = 0; i < tempDte.getRows(); i++) {
               tempFse = tempDte.getFieldSetEntity(i);
               singleFieldInfoObj = new JSONObject();
               fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj);
               singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type"));
               singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("character_maximum_length")) ? tempFse.getString("numeric_precision") : tempFse.getString("character_maximum_length"));
               singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("numeric_scale"));
               singleFieldInfoObj.put(CmnConst.NULLABLE, "NO".equalsIgnoreCase(tempFse.getString("is_nullable")) ? 0 : 1);
               singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("column_comment"));
            }
            // 索引表
            StringBuilder sql = new StringBuilder(128);
            sql.append("select index_name,non_unique,group_concat(column_name) column_name");
            sql.append("\nfrom information_schema.`STATISTICS`");
            sql.append("\nwhere table_schema=? and table_name=?");
            sql.append("\ngroup by index_name,non_unique");
            tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, sourceTable});
            for (int i = 0; i < tempDte.getRows(); i++) {
               tempFse = tempDte.getFieldSetEntity(i);
               singleFieldInfoObj = new JSONObject();
               indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
               singleFieldInfoObj.put(CmnConst.TYPE, "PRIMARY".equalsIgnoreCase(tempFse.getString("index_name")) ? CmnConst.PRIMARY : ("1".equals(tempFse.getString("non_unique")) ? CmnConst.NORMAL : CmnConst.UNIQUE));
               singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
            }
         } else if (DataBaseType.ORACLE.equals(sourceDao.getDataBaseType())) {
            // oracle
            // 表
            String upperTableName = sourceTable.toUpperCase();
            tempFse = sourceDao.getOne("SYS.USER_TAB_COMMENTS", "table_name=?", new Object[]{upperTableName});
            tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments"));
            // 字段表
            StringBuilder sql = new StringBuilder(128);
            sql.append("SELECT TC.COLUMN_NAME,DATA_TYPE,DATA_LENGTH,DATA_PRECISION,DATA_SCALE,NULLABLE,CHAR_LENGTH,COMMENTS FROM SYS.USER_TAB_COLUMNS TC");
            sql.append("\nLEFT JOIN USER_COL_COMMENTS CC ON TC.TABLE_NAME=CC.TABLE_NAME AND TC.COLUMN_NAME=CC.COLUMN_NAME");
            sql.append("\nWHERE TC.TABLE_NAME=?");
            sql.append("\nORDER BY TC.COLUMN_ID");
            tempDte = sourceDao.getList(sql.toString(), new Object[]{upperTableName});
            for (int i = 0; i < tempDte.getRows(); i++) {
               tempFse = tempDte.getFieldSetEntity(i);
               singleFieldInfoObj = new JSONObject();
               fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj);
               singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type").contains("VARCHAR") ? "varchar" : tempFse.getString("data_type"));
               singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("data_precision"))
                     ? tempFse.getString("char_length") : tempFse.getString("data_precision"));
               singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("data_scale"));
               singleFieldInfoObj.put(CmnConst.NULLABLE, "N".equalsIgnoreCase(tempFse.getString("nullable")) ? 0 : 1);
               singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments"));
            }
            // 索引表
            sql.setLength(0);
            sql.append("SELECT DIC.INDEX_NAME,WM_CONCAT(DIC.COLUMN_NAME) column_name FROM SYS.DBA_IND_COLUMNS DIC");
            sql.append("\nLEFT JOIN SYS.DBA_INDEXES DI ON DIC.INDEX_NAME=DI.INDEX_NAME");
            sql.append("\nWHERE UNIQUENESS='NONUNIQUE' AND DIC.TABLE_OWNER=? AND DI.TABLE_NAME=?");
            sql.append("\nGROUP BY DIC.INDEX_NAME");
            tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName});
            for (int i = 0; i < tempDte.getRows(); i++) {
               tempFse = tempDte.getFieldSetEntity(i);
               singleFieldInfoObj = new JSONObject();
               indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
               singleFieldInfoObj.put(CmnConst.TYPE, CmnConst.NORMAL);
               singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
            }
            // 约束表 C-检查,写到字段里面;R-外键,不要;P-主键;U-唯一键
            sql.setLength(0);
            sql.append("SELECT DC.CONSTRAINT_NAME index_name,CONSTRAINT_TYPE,LISTAGG(COLUMN_NAME, ',') WITHIN GROUP(ORDER BY DCC.POSITION) column_name FROM SYS.DBA_CONS_COLUMNS DCC");
            sql.append("\nLEFT JOIN SYS.DBA_CONSTRAINTS DC ON DCC.CONSTRAINT_NAME=DC.CONSTRAINT_NAME");
            sql.append("\nWHERE DCC.OWNER=? AND DCC.TABLE_NAME=? AND CONSTRAINT_TYPE IN ('P','U')");
            sql.append("\nGROUP BY DC.CONSTRAINT_NAME,CONSTRAINT_TYPE");
            tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName});
            WriteUtil.append("DA-DDL-:" + sql + " |||库名:" + sourceDbName + " |||表名:" + upperTableName);
            for (int i = 0; i < tempDte.getRows(); i++) {
               tempFse = tempDte.getFieldSetEntity(i);
               singleFieldInfoObj = new JSONObject();
               indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
               singleFieldInfoObj.put(CmnConst.TYPE, "P".equalsIgnoreCase(tempFse.getString("constraint_type")) ? CmnConst.PRIMARY : CmnConst.UNIQUE);
               singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
            }
            WriteUtil.append("DA-DDL-创表信息:" + tableInfoObj);
         } else {
            throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL);
         }
         return tableInfoObj;
      }
        /**
         * 根据表结构信息,拼接DDL创建表sql语句
         *
         * @param tableInfoObj
         * @return
         */
        private String getSql(JSONObject tableInfoObj) {
            StringBuilder sql = new StringBuilder(128);
            JSONObject fieldInfoObj = tableInfoObj.getJSONObject(CmnConst.FIELD);
            JSONObject indexInfoObj = tableInfoObj.getJSONObject(CmnConst.INDEX);
            JSONObject singleFieldInfoObj;
            sql.append("CREATE TABLE ").append(tableInfoObj.getString(CmnConst.NAME)).append(" (");
            if (DataBaseType.MYSQL.equals(targetDao.getDataBaseType())) {
                // mysql
                for (String field : fieldInfoObj.keySet()) {
                    singleFieldInfoObj = fieldInfoObj.getJSONObject(field);
                    sql.append("\n    `").append(field.toLowerCase(Locale.ROOT)).append("` ");
                    if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("timestamp") || singleFieldInfoObj.getString(CmnConst.TYPE).contains("TIMESTAMP")) {
                        sql.append("timestamp ");
                    } else if (singleFieldInfoObj.getIntValue(CmnConst.DECIMAL) > 0) {
                        sql.append("decimal(").append(singleFieldInfoObj.getString(CmnConst.INTEGER)).append(",").append(singleFieldInfoObj.getString(CmnConst.DECIMAL)).append(") ");
                    } else if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("int")) {
                        sql.append("int(0) ");
                    } else if ("number".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                        if ("0".equals(singleFieldInfoObj.getString(CmnConst.DECIMAL))) {
                            sql.append("int(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") ");
                        } else {
                            sql.append("decimal(22,4) ");
                        }
                    } else if ("date".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                        sql.append("datetime(0) ");
                    } else if ("blob".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                        sql.append("blob ");
                    } else if ("text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT).endsWith("text") ||
                            (("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) && singleFieldInfoObj.getIntValue(CmnConst.INTEGER) >= 4000)
                    ) {
                        sql.append("text ");
                    } else {
                        sql.append(singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT)).append("(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") ");
                    }
                    if ("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                        sql.append(" CHARACTER SET utf8mb4 COLLATE utf8mb4_bin ");
                    }
                    if ("0".equals(singleFieldInfoObj.getString(CmnConst.NULLABLE))) {
                        sql.append("not null ");
                    }
                    if (!StringUtils.isEmpty(singleFieldInfoObj.getString(CmnConst.COMMENT))) {
                        sql.append(" comment '").append(singleFieldInfoObj.getString(CmnConst.COMMENT)).append("'");
                    }
                    sql.append(",");
                }
                for (String indexName : indexInfoObj.keySet()) {
                    singleFieldInfoObj = indexInfoObj.getJSONObject(indexName);
                    if (CmnConst.PRIMARY.equalsIgnoreCase(indexName) || CmnConst.PRIMARY.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                        sql.append("\n    PRIMARY KEY (`").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append("`) USING BTREE,");
                    } else if (CmnConst.UNIQUE.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                        sql.append("\n    UNIQUE INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,");
                    } else if (CmnConst.NORMAL.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                        sql.append("\n    INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,");
                    }
                }
                sql.deleteCharAt(sql.length() - 1);
                sql.append("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ");
                if (!StringUtils.isEmpty(tableInfoObj.getString(CmnConst.COMMENT))) {
                    sql.append(" COMMENT = '").append(tableInfoObj.getString(CmnConst.COMMENT)).append("'");
                }
            } else {
                throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL);
            }
            return sql.toString();
        }
      /**
       * 根据表结构信息,拼接DDL创建表sql语句
       *
       * @param tableInfoObj
       * @return
       */
      private String getSql(JSONObject tableInfoObj) {
         StringBuilder sql = new StringBuilder(128);
         JSONObject fieldInfoObj = tableInfoObj.getJSONObject(CmnConst.FIELD);
         JSONObject indexInfoObj = tableInfoObj.getJSONObject(CmnConst.INDEX);
         JSONObject singleFieldInfoObj;
         sql.append("CREATE TABLE ").append(tableInfoObj.getString(CmnConst.NAME)).append(" (");
         if (DataBaseType.MYSQL.equals(targetDao.getDataBaseType())) {
            // mysql
            for (String field : fieldInfoObj.keySet()) {
               singleFieldInfoObj = fieldInfoObj.getJSONObject(field);
               sql.append("\n    `").append(field.toLowerCase(Locale.ROOT)).append("` ");
               if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("timestamp") || singleFieldInfoObj.getString(CmnConst.TYPE).contains("TIMESTAMP")) {
                  sql.append("timestamp ");
               } else if (singleFieldInfoObj.getIntValue(CmnConst.DECIMAL) > 0) {
                  sql.append("decimal(").append(singleFieldInfoObj.getString(CmnConst.INTEGER)).append(",").append(singleFieldInfoObj.getString(CmnConst.DECIMAL)).append(") ");
               } else if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("int")) {
                  sql.append("int(0) ");
               } else if ("number".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                  if ("0".equals(singleFieldInfoObj.getString(CmnConst.DECIMAL))) {
                     sql.append("int(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") ");
                  } else {
                     sql.append("decimal(22,4) ");
                  }
               } else if ("date".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                  sql.append("datetime(0) ");
               } else if ("blob".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                  sql.append("blob ");
               } else if ("text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT).endsWith("text") ||
                     (("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) && singleFieldInfoObj.getIntValue(CmnConst.INTEGER) >= 4000)
               ) {
                  sql.append("text ");
               } else {
                  sql.append(singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT)).append("(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") ");
               }
               if ("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                  sql.append(" CHARACTER SET utf8mb4 COLLATE utf8mb4_bin ");
               }
               if ("0".equals(singleFieldInfoObj.getString(CmnConst.NULLABLE))) {
                  sql.append("not null ");
               }
               if (!StringUtils.isEmpty(singleFieldInfoObj.getString(CmnConst.COMMENT))) {
                  sql.append(" comment '").append(singleFieldInfoObj.getString(CmnConst.COMMENT)).append("'");
               }
               sql.append(",");
            }
            for (String indexName : indexInfoObj.keySet()) {
               singleFieldInfoObj = indexInfoObj.getJSONObject(indexName);
               if (CmnConst.PRIMARY.equalsIgnoreCase(indexName) || CmnConst.PRIMARY.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                  sql.append("\n    PRIMARY KEY (`").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append("`) USING BTREE,");
               } else if (CmnConst.UNIQUE.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                  sql.append("\n    UNIQUE INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,");
               } else if (CmnConst.NORMAL.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
                  sql.append("\n    INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,");
               }
            }
            sql.deleteCharAt(sql.length() - 1);
            sql.append("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ");
            if (!StringUtils.isEmpty(tableInfoObj.getString(CmnConst.COMMENT))) {
               sql.append(" COMMENT = '").append(tableInfoObj.getString(CmnConst.COMMENT)).append("'");
            }
         } else {
            throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL);
         }
         return sql.toString();
      }
        private String saveCreateTableRecord(String tableName, String time) {
            /*=====================================================*/
            //                 新增创建表记录
            FieldSetEntity fse = new FieldSetEntity();
            fse.setTableName(CmnConst.DATA_ARCHIVING_SUB_TABLE);
            fse.setValue("table_name", tableName);
            fse.setValue("parent_uuid", this.configUid);
            fse.setValue("data_time", time);
            BaseUtil.createCreatorAndCreationTime(fse);
            getBaseDao().saveFieldSetEntity(fse);
            /*=====================================================*/
            return tableName;
        }
      private String saveCreateTableRecord(String tableName, String time) {
         /*=====================================================*/
         //                 新增创建表记录
         FieldSetEntity fse = new FieldSetEntity();
         fse.setTableName(CmnConst.DATA_ARCHIVING_SUB_TABLE);
         fse.setValue("table_name", tableName);
         fse.setValue("parent_uuid", this.configUid);
         fse.setValue("data_time", time);
         BaseUtil.createCreatorAndCreationTime(fse);
         getBaseDao().saveFieldSetEntity(fse);
         /*=====================================================*/
         return tableName;
      }
        private void createIndex(String targetTable, String time) throws BaseException {
            StringBuilder sql = new StringBuilder();
            sql.append("\n SELECT DBMS_METADATA.GET_DDL('INDEX',u.index_name) as  create_index_statement,u.INDEX_NAME ");
            sql.append("\n from USER_INDEXES u where u.TABLE_NAME=? ");
            String primaryIndexName = getPrimaryIndexName();
            List<Object> params = new ArrayList<>();
            params.add(sourceTable);
            if (!StringUtils.isEmpty(primaryIndexName)) {
                sql.append(" and u.index_name  != ? ");
                params.add(primaryIndexName);
            }
            DataTableEntity list = sourceDao.getList(sql.toString(), params.toArray());
            if (!DataTableEntity.isEmpty(list)) {
                for (int i = 0; i < list.getRows(); i++) {
                    //循环获取创建索引语句
                    String createIndexStatement = list.getString(i, "create_index_statement");
                    String indexName = list.getString(i, "index_name");
                    if (!StringUtils.isEmpty(createIndexStatement)) {
                        //新的索引名称
                        String newIndexName = this.getIndexName(indexName, time);
                        //将建索引ddl
                        createIndexStatement = createIndexStatement.replaceAll(sourceTable, targetTable)
                                .replaceAll("\"" + indexName + "\"", "\"" + newIndexName + "\"");
                        //执行创建索引
                        targetDao.executeSql(createIndexStatement);
                    }
                }
            }
      private void createIndex(String targetTable, String time) throws BaseException {
         StringBuilder sql = new StringBuilder();
         sql.append("\n SELECT DBMS_METADATA.GET_DDL('INDEX',u.index_name) as  create_index_statement,u.INDEX_NAME ");
         sql.append("\n from USER_INDEXES u where u.TABLE_NAME=? ");
         String primaryIndexName = getPrimaryIndexName();
         List<Object> params = new ArrayList<>();
         params.add(sourceTable);
         if (!StringUtils.isEmpty(primaryIndexName)) {
            sql.append(" and u.index_name  != ? ");
            params.add(primaryIndexName);
         }
         DataTableEntity list = sourceDao.getList(sql.toString(), params.toArray());
         if (!DataTableEntity.isEmpty(list)) {
            for (int i = 0; i < list.getRows(); i++) {
               //循环获取创建索引语句
               String createIndexStatement = list.getString(i, "create_index_statement");
               String indexName = list.getString(i, "index_name");
               if (!StringUtils.isEmpty(createIndexStatement)) {
                  //新的索引名称
                  String newIndexName = this.getIndexName(indexName, time);
                  //将建索引ddl
                  createIndexStatement = createIndexStatement.replaceAll(sourceTable, targetTable)
                        .replaceAll("\"" + indexName + "\"", "\"" + newIndexName + "\"");
                  //执行创建索引
                  targetDao.executeSql(createIndexStatement);
               }
            }
         }
        }
      }
        /**
         * 获取新的索引名称
         *
         * @param indexName
         * @return
         */
        private String getIndexName(String indexName, String time) {
            //新的索引名称
            String newIndexName;
            if (indexName.length() <= 26) {
                newIndexName = indexName + time;
            } else {
                //超过26位随机生成索引名称
                newIndexName = RandomUtil.randomString(10) + time;
            }
            return newIndexName;
        }
      /**
       * 获取新的索引名称
       *
       * @param indexName
       * @return
       */
      private String getIndexName(String indexName, String time) {
         //新的索引名称
         String newIndexName;
         if (indexName.length() <= 26) {
            newIndexName = indexName + time;
         } else {
            //超过26位随机生成索引名称
            newIndexName = RandomUtil.randomString(10) + time;
         }
         return newIndexName;
      }
        /**
         * 获取主键索引的名名称
         *
         * @return
         */
        private String getPrimaryIndexName() {
            StringBuilder sql = new StringBuilder();
            sql.append("\n SELECT a.index_name from user_constraints a ");
            sql.append("\n WHERE a.constraint_type = 'P' ");
            sql.append("\n AND a.table_name = ? ");
            FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()});
            return one != null ? one.getString("index_name") : null;
        }
      /**
       * 获取主键索引的名名称
       *
       * @return
       */
      private String getPrimaryIndexName() {
         StringBuilder sql = new StringBuilder();
         sql.append("\n SELECT a.index_name from user_constraints a ");
         sql.append("\n WHERE a.constraint_type = 'P' ");
         sql.append("\n AND a.table_name = ? ");
         FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()});
         return one != null ? one.getString("index_name") : null;
      }
        /**
         * 获取创建表语句(包含主键索引)
         *
         * @return
         */
        private String getCreateTableStatement() {
            StringBuilder sql = new StringBuilder();
            sql.append(" SELECT DBMS_METADATA.GET_DDL(U.OBJECT_TYPE, u.object_name) create_table_statement ");
            sql.append(" from USER_OBJECTS u ");
            sql.append(" where U.OBJECT_TYPE ='TABLE' and u.object_name=? ");
            FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()});
            return one != null ? one.getString("create_table_statement") : null;
        }
      /**
       * 获取创建表语句(包含主键索引)
       *
       * @return
       */
      private String getCreateTableStatement() {
         StringBuilder sql = new StringBuilder();
         sql.append(" SELECT DBMS_METADATA.GET_DDL(U.OBJECT_TYPE, u.object_name) create_table_statement ");
         sql.append(" from USER_OBJECTS u ");
         sql.append(" where U.OBJECT_TYPE ='TABLE' and u.object_name=? ");
         FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()});
         return one != null ? one.getString("create_table_statement") : null;
      }
        /**
         * 获取其他索引名称
         *
         * @return
         */
        private List<String> getOtherIndexName() {
            StringBuilder sql = new StringBuilder();
            sql.append(" SELECT INDEX_NAME FROM USER_INDEXES ");
            sql.append("\n WHERE TABLE_NAME=? AND  INDEX_NAME NOT IN ( ");
            sql.append("\n SELECT a.index_name from user_constraints a ");
            sql.append("\n WHERE a.constraint_type = 'P' ");
            sql.append("\n AND a.TABLE_NAME = ? ) ");
      /**
       * 获取其他索引名称
       *
       * @return
       */
      private List<String> getOtherIndexName() {
         StringBuilder sql = new StringBuilder();
         sql.append(" SELECT INDEX_NAME FROM USER_INDEXES ");
         sql.append("\n WHERE TABLE_NAME=? AND  INDEX_NAME NOT IN ( ");
         sql.append("\n SELECT a.index_name from user_constraints a ");
         sql.append("\n WHERE a.constraint_type = 'P' ");
         sql.append("\n AND a.TABLE_NAME = ? ) ");
            DataTableEntity list = sourceDao.getList(sql.toString(), new Object[]{this.sourceTable, this.sourceTable});
            if (!DataTableEntity.isEmpty(list)) {
                return list.getData().stream().map(item -> item.getString("index_name")).collect(Collectors.toList());
            }
            return null;
        }
         DataTableEntity list = sourceDao.getList(sql.toString(), new Object[]{this.sourceTable, this.sourceTable});
         if (!DataTableEntity.isEmpty(list)) {
            return list.getData().stream().map(item -> item.getString("index_name")).collect(Collectors.toList());
         }
         return null;
      }
    }
   }
}