cheng
2023-10-25 07bd010b2bbd939ade44c7b1a299fa911f8742e8
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java
@@ -54,1046 +54,1058 @@
@Service
public class MesExternalService extends AbstractBaseService implements IMesExternalService, IRemoteService, com.product.data.service.impl.IRemoteService {
   @Value("${data.system.name}")
   private String dataSystemName;
    @Value("${data.system.name}")
    private String dataSystemName;
   private CommonService commonService = null;
    private CommonService commonService = null;
   private FieldSetEntity collectLogCache = null;
    private FieldSetEntity collectLogCache = null;
   public CommonService getCommonService() {
      if (this.commonService == null) {
         this.commonService = SpringBeanUtil.getBean(CommonService.class);
      }
      return commonService;
   }
    public CommonService getCommonService() {
        if (this.commonService == null) {
            this.commonService = SpringBeanUtil.getBean(CommonService.class);
        }
        return commonService;
    }
   public static void main(String[] args) {
      Date parse = DateUtil.parse("2023-10-23 18:22:50", "yyyy-MM-dd HH:mm:ss");
      Date parse1 = DateUtil.parse("2023-10-23 18:24:11", "yyyy-MM-dd HH:mm:ss");
      Date parse2 = DateUtil.parse("2023-10-23 18:24:21", "yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args) {
        Date parse = DateUtil.parse("2023-10-23 18:22:50", "yyyy-MM-dd HH:mm:ss");
        Date parse1 = DateUtil.parse("2023-10-23 18:24:11", "yyyy-MM-dd HH:mm:ss");
        Date parse2 = DateUtil.parse("2023-10-23 18:24:21", "yyyy-MM-dd HH:mm:ss");
      Set<Date> set = Sets.newHashSet(parse, parse1, parse2);
        Set<Date> set = Sets.newHashSet(parse, parse1, parse2);
      List<Date> sets = CollectionUtil.toList(set.toArray(new Date[0]));
        List<Date> sets = CollectionUtil.toList(set.toArray(new Date[0]));
      Optional<Date> max = sets.stream().max(Comparator.comparing((a) -> a.getTime()));
      System.out.println(max.get());
        Optional<Date> max = sets.stream().max(Comparator.comparing((a) -> a.getTime()));
        System.out.println(max.get());
   }
    }
   public void splitTableData() {
      FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
      if (FieldSetEntity.isEmpty(reportDbConfig)) {
         throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
      }
      DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
      Dao reportDao = dbe.getDao();
      Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), "da_t_wip_tracking");
    public void splitTableData() {
//        FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
//        if (FieldSetEntity.isEmpty(reportDbConfig)) {
//            throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
//        }
//        DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
//        Dao reportDao = dbe.getDao();
//        Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), "da_t_wip_tracking");
//
//        for (String tableName : trackingTableSet) {
//            //获取年份从表名最后一个 下划线开始截取
//            String year = tableName.substring(tableName.lastIndexOf("_") + 1);
//            if (!StringUtils.equalsAny(year, "2023") || year.length() > 4) {
//                continue;
//            }
//            //获取表前缀
//            String tablePrefix = tableName.substring(0, tableName.lastIndexOf("_"));
//            ExecutorService executorService = Executors.newFixedThreadPool(12);
//            for (int i = 1; i <= 12; i++) {
//                final int finalI = i;
//                executorService.submit(() -> {
//                    Dao currentDao = dbe.newDao();
//                    //获取当前月份 以MM格式化
//                    String month = String.format("%02d", finalI);
//                    //检查月份对应的表是否存在
//                    String monthTableName = tablePrefix + "_" + year + month;
//                    Set<String> allTableName = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), tablePrefix + "_" + year + month);
//                    if (allTableName.size() == 0 || !allTableName.contains(monthTableName)) {
//                        //根据原始表结构创建新表
//                        String sql = "create table " + monthTableName + " like " + tableName;
//                        currentDao.executeSql(sql);
//                        SpringMVCContextHolder.getSystemLogger().info("创建表:" + monthTableName);
//                    }
//                    String sql = "INSERT INTO " + monthTableName + " SELECT * FROM " + tableName + " WHERE MONTH(update_date)=" + finalI;
//                    currentDao.executeSql(sql);
//                    currentDao.closeConnection();
//                });
//
//            }
//            executorService.shutdown();
//            while (true) {
//                try {
//                    if (executorService.awaitTermination(5, TimeUnit.SECONDS)) break;
//                    Thread.sleep(5000);
//                    SpringMVCContextHolder.getSystemLogger().info("线程等待中...");
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//
//            }
//        }
//        reportDao.closeConnection();
    }
      for (String tableName : trackingTableSet) {
         //获取年份从表名最后一个 下划线开始截取
         String year = tableName.substring(tableName.lastIndexOf("_") + 1);
         if (StringUtils.equalsAny(year, "2017", "2018") || year.length() > 4) {
            continue;
         }
         //获取表前缀
         String tablePrefix = tableName.substring(0, tableName.lastIndexOf("_"));
         ExecutorService executorService = Executors.newFixedThreadPool(12);
         for (int i = 1; i <= 12; i++) {
            if ("2023".equals(year) && i > 9) {
               break;
            }
            final int finalI = i;
            executorService.submit(() -> {
               Dao currentDao = dbe.newDao();
               //获取当前月份 以MM格式化
               String month = String.format("%02d", finalI);
               //检查月份对应的表是否存在
               String monthTableName = tablePrefix + "_" + year + month;
               Set<String> allTableName = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), tablePrefix + "_" + year + month);
               if (allTableName.size() == 0 || !allTableName.contains(monthTableName)) {
                  //根据原始表结构创建新表
                  String sql = "create table " + monthTableName + " like " + tableName;
                  currentDao.executeSql(sql);
                  SpringMVCContextHolder.getSystemLogger().info("创建表:" + monthTableName);
               }
               String sql = "INSERT INTO " + monthTableName + " SELECT * FROM " + tableName + " WHERE MONTH(update_date)=" + finalI;
               currentDao.executeSql(sql);
               currentDao.closeConnection();
            });
    /**
     * 获取历史数据
     */
    public void getHistoryData(FieldSetEntity fse) throws BaseException, ExecutionException, InterruptedException {
         }
         executorService.shutdown();
         while (true) {
            try {
               if (executorService.awaitTermination(5, TimeUnit.SECONDS)) break;
               Thread.sleep(5000);
               SpringMVCContextHolder.getSystemLogger().info("线程等待中...");
            } catch (InterruptedException e) {
               e.printStackTrace();
            }
        //机号
        String serialNumber = fse.getString("serial_number");
        SpringMVCContextHolder.getSystemLogger().info("准备回写机号数据:" + serialNumber);
        if (StringUtils.isEmpty(serialNumber)) {
            throw new BaseException(ErrorCode.SERIAL_NUMBER_IS_NULL);
        }
         }
      }
      reportDao.closeConnection();
   }
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
        FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
        if (FieldSetEntity.isEmpty(reportDbConfig)) {
            throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
        }
        DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "upper(source_table) in ('T_WIP_TRACKING','T_WIP_PRODUCT_KEYP','T_WIP_DETAIL','T_PM_PRODUCT_SN') and data_source in (select  uuid from product_sys_data_sync_manager)", new Object[]{}, new Object[]{"uuid,id,data_source,source_table"});
   /**
    * 获取历史数据
    */
   public void getHistoryData(FieldSetEntity fse) throws BaseException, ExecutionException, InterruptedException {
        Map<String, List<FieldSetEntity>> groupByCollectId = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("id")));
        Map<String, List<FieldSetEntity>> groupBySourceTable = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("source_table")));
      //机号
      String serialNumber = fse.getString("serial_number");
      SpringMVCContextHolder.getSystemLogger().info("准备回写机号数据:" + serialNumber);
      if (StringUtils.isEmpty(serialNumber)) {
         throw new BaseException(ErrorCode.SERIAL_NUMBER_IS_NULL);
      }
        DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
        Dao reportDao = dbe.getDao();
        String reportDbName = dbe.getDbName();
        Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_tracking");
        HistoryEntity trackingData = historyBeforeDispose(getData(reportDao, trackingTableSet, "serial_number",
                serialNumber, new ErrorCode[]{ErrorCode.TRACKING_TABLE_NOT_EXISTS, ErrorCode.NOT_FOUND_SERIAL_NUMBER}), CmnConst.T_WIP_TRACKING);
      FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
      if (FieldSetEntity.isEmpty(fs)) {
         throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
      }
      FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
      if (FieldSetEntity.isEmpty(reportDbConfig)) {
         throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
      }
      DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "upper(source_table) in ('T_WIP_TRACKING','T_WIP_PRODUCT_KEYP','T_WIP_DETAIL','T_PM_PRODUCT_SN') and data_source in (select  uuid from product_sys_data_sync_manager)", new Object[]{}, new Object[]{"uuid,id,data_source,source_table"});
        Set<String> keypTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_product_keyp");
        HistoryEntity keypData = historyBeforeDispose(getData(reportDao, keypTableSet, "pk_product_sn", serialNumber, new ErrorCode[]{ErrorCode.KEYP_TABLE_NOT_EXISTS, ErrorCode.KEYP_DATA_NOT_FOUND}), CmnConst.T_WIP_PRODUCT_KEYP);
      Map<String, List<FieldSetEntity>> groupByCollectId = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("id")));
      Map<String, List<FieldSetEntity>> groupBySourceTable = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("source_table")));
      DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
      Dao reportDao = dbe.getDao();
      String reportDbName = dbe.getDbName();
      Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_tracking");
      HistoryEntity trackingData = historyBeforeDispose(getData(reportDao, trackingTableSet, "serial_number",
            serialNumber, new ErrorCode[]{ErrorCode.TRACKING_TABLE_NOT_EXISTS, ErrorCode.NOT_FOUND_SERIAL_NUMBER}), CmnConst.T_WIP_TRACKING);
      Set<String> keypTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_product_keyp");
      HistoryEntity keypData = historyBeforeDispose(getData(reportDao, keypTableSet, "pk_product_sn", serialNumber, new ErrorCode[]{ErrorCode.KEYP_TABLE_NOT_EXISTS, ErrorCode.KEYP_DATA_NOT_FOUND}), CmnConst.T_WIP_PRODUCT_KEYP);
      Set<String> detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail");
      HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL);
        Set<String> detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail");
        HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL);
//      Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
//      HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
      //主库数据源配置
      String masterDataSource = fs.getString("data_source");
      dbe = new DataBaseEntity(masterDataSource);
      Map<String, Dao> groupDao = new HashMap<>();
      Dao dao = dbe.getDao();
      boolean success = false;
      try {
         Connection connection = dao.getConnection();
         connection.setAutoCommit(false);
         HistoryEntity[] historyEntities = {trackingData, keypData, detailData};
         insertMasterTableData(dao, historyEntities);
         insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities);
         connection.commit();
         batchCommit(groupDao);
         success = true;
         SpringMVCContextHolder.getSystemLogger().info("回写机号:" + serialNumber + "成功");
      } catch (Exception e) {
         e.printStackTrace();
         SpringMVCContextHolder.getSystemLogger().error("回写机号:" + serialNumber + "失败");
         SpringMVCContextHolder.getSystemLogger().error(e);
         throw new BaseException(ErrorCode.INSERT_DATA_FAIL);
      } finally {
         try {
            if (!success && !dao.getConnection().getAutoCommit()) {
               dao.getConnection().rollback();
            }
            dao.closeConnection();
            for (Dao value : groupDao.values()) {
               try (Connection connection = value.getConnection()) {
                  if (!connection.getAutoCommit()) {
                     connection.rollback();
                  }
               }
               value.closeConnection();
        //主库数据源配置
        String masterDataSource = fs.getString("data_source");
        dbe = new DataBaseEntity(masterDataSource);
        Map<String, Dao> groupDao = new HashMap<>();
        Dao dao = dbe.getDao();
        boolean success = false;
        try {
            Connection connection = dao.getConnection();
            connection.setAutoCommit(false);
            HistoryEntity[] historyEntities = {trackingData, keypData, detailData};
            insertMasterTableData(dao, historyEntities);
            insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities);
            connection.commit();
            batchCommit(groupDao);
            success = true;
            SpringMVCContextHolder.getSystemLogger().info("回写机号:" + serialNumber + "成功");
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error("回写机号:" + serialNumber + "失败");
            SpringMVCContextHolder.getSystemLogger().error(e);
            throw new BaseException(ErrorCode.INSERT_DATA_FAIL);
        } finally {
            try {
                if (!success && !dao.getConnection().getAutoCommit()) {
                    dao.getConnection().rollback();
                }
                dao.closeConnection();
                for (Dao value : groupDao.values()) {
                    try (Connection connection = value.getConnection()) {
                        if (!connection.getAutoCommit()) {
                            connection.rollback();
                        }
                    }
                    value.closeConnection();
            }
         } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
         }
      }
   }
                }
            } catch (Exception e) {
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
   public void batchCommit(Map<String, Dao> groupDao) throws SQLException {
      for (Dao value : groupDao.values()) {
         value.getConnection().commit();
      }
   }
    public void batchCommit(Map<String, Dao> groupDao) throws SQLException {
        for (Dao value : groupDao.values()) {
            value.getConnection().commit();
        }
    }
   /**
    * 插入数据到主库
    *
    * @param dao
    * @param historyEntities
    */
   public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) {
      for (HistoryEntity historyEntity : historyEntities) {
         if (historyEntity == null) {
            continue;
         }
         DataTableEntity masterDataTable = historyEntity.getMasterDataTable();
         if (DataTableEntity.isEmpty(masterDataTable)) {
            continue;
         }
         Object[] objects = masterDataTable.getData().stream().map(item -> {
            String primaryValue = item.getString(historyEntity.getPrimaryField());
            //判断是否为数值且包含小数点 小数点后面是否全是0
            if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
                  indexOf(".") + 1).matches("^0*$")) {
               //返回整数字符串
               return primaryValue.substring(0, primaryValue.indexOf("."));
            }
            return primaryValue;
         }).toArray();
         //查询主库数据是否存在
         DataTableEntity list = dao.getList(historyEntity.getTableName(),
               BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), objects.length, true),
               new String[]{historyEntity.getPrimaryField()}, objects);
         List<String> existsPrimaryValues = null;
         if (!DataTableEntity.isEmpty(list)) {
            existsPrimaryValues = list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
         }
         for (int i = 0; i < masterDataTable.getRows(); i++) {
            String primaryValue = masterDataTable.getString(i, historyEntity.getPrimaryField());
            if (existsPrimaryValues != null && existsPrimaryValues.contains(primaryValue)) {
               //数据存在跳过该数据
               continue;
            }
            FieldSetEntity fieldSetEntity = masterDataTable.getFieldSetEntity(i);
            fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
            fieldSetEntity.remove("~table_name~");
            dao.add(fieldSetEntity);
         }
      }
   }
    /**
     * 插入数据到主库
     *
     * @param dao
     * @param historyEntities
     */
    public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            DataTableEntity masterDataTable = historyEntity.getMasterDataTable();
            if (DataTableEntity.isEmpty(masterDataTable)) {
                continue;
            }
            Object[] objects = masterDataTable.getData().stream().map(item -> {
                String primaryValue = item.getString(historyEntity.getPrimaryField());
                //判断是否为数值且包含小数点 小数点后面是否全是0
                if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
                        indexOf(".") + 1).matches("^0*$")) {
                    //返回整数字符串
                    return primaryValue.substring(0, primaryValue.indexOf("."));
                }
                return primaryValue;
            }).toArray();
            //查询主库数据是否存在
            DataTableEntity list = dao.getList(historyEntity.getTableName(),
                    BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), objects.length, true),
                    new String[]{historyEntity.getPrimaryField()}, objects);
            List<String> existsPrimaryValues = null;
            if (!DataTableEntity.isEmpty(list)) {
                existsPrimaryValues = list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
            }
            for (int i = 0; i < masterDataTable.getRows(); i++) {
                String primaryValue = masterDataTable.getString(i, historyEntity.getPrimaryField());
                if (existsPrimaryValues != null && existsPrimaryValues.contains(primaryValue)) {
                    //数据存在跳过该数据
                    continue;
                }
                FieldSetEntity fieldSetEntity = masterDataTable.getFieldSetEntity(i);
                fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
                fieldSetEntity.remove("~table_name~");
                dao.add(fieldSetEntity);
            }
        }
    }
   /**
    * 插入子库表数据
    *
    * @param groupDao           子库dao
    * @param groupByCollectId   采集配置按采集id分组
    * @param groupBySourceTable 采集配置按表名分组
    * @param historyEntities    历史数据
    */
   public void insertSubTableData(Map<String, Dao> groupDao, Map<String, List<FieldSetEntity>> groupByCollectId,
                           Map<String, List<FieldSetEntity>> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception {
      for (HistoryEntity historyEntity : historyEntities) {
         if (historyEntity == null) {
            continue;
         }
         Map<String, List<FieldSetEntity>> groupData = historyEntity.getGroupData();
         if (groupData == null || groupData.isEmpty()) {
            continue;
         }
         for (Map.Entry<String, List<FieldSetEntity>> entry : groupData.entrySet()) {
            List<Dao> daoList = new ArrayList<>();
            if ("ch-kt".equals(entry.getKey())) {
               List<FieldSetEntity> fieldSetEntityList = groupBySourceTable.get(historyEntity.getTableName().toLowerCase());
               Set<String> dataSourceSet = fieldSetEntityList.stream().map(item -> item.getString("data_source")).collect(Collectors.toSet());
               for (String sourceUuid : dataSourceSet) {
                  daoList.add(getDao(groupDao, sourceUuid));
               }
            } else {
               daoList.add(getDao(groupDao, groupByCollectId.get(entry.getKey()).get(0).getString("data_source")));
            }
            for (Dao dao : daoList) {
               List<FieldSetEntity> value = entry.getValue();
               //查询已存在的数据
               DataTableEntity list = dao.getList(historyEntity.getTableName(),
                     BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), value.size(), true),
                     value.stream().map(item -> {
                        String primaryValue = item.getString(historyEntity.getPrimaryField());
                        //判断是否为数值且包含小数点 小数点后面是否全是0
                        if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
                              indexOf(".") + 1).matches("^0*$")) {
                           //返回整数字符串
                           return primaryValue.substring(0, primaryValue.indexOf("."));
                        }
                        return primaryValue;
                     }).toArray());
               List<String> existIds = DataTableEntity.isEmpty(list) ? null : list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
               for (FieldSetEntity fieldSetEntity : value) {
                  String primaryValue = fieldSetEntity.getString(historyEntity.getPrimaryField());
                  if (existIds != null && existIds.contains(NumberUtil.parseNumber(primaryValue).toString())) {
                     continue;
                  }
                  fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
                  fieldSetEntity.remove("~table_name~");
                  dao.add(fieldSetEntity);
               }
            }
         }
      }
   }
    /**
     * 插入子库表数据
     *
     * @param groupDao           子库dao
     * @param groupByCollectId   采集配置按采集id分组
     * @param groupBySourceTable 采集配置按表名分组
     * @param historyEntities    历史数据
     */
    public void insertSubTableData(Map<String, Dao> groupDao, Map<String, List<FieldSetEntity>> groupByCollectId,
                                   Map<String, List<FieldSetEntity>> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            Map<String, List<FieldSetEntity>> groupData = historyEntity.getGroupData();
            if (groupData == null || groupData.isEmpty()) {
                continue;
            }
            for (Map.Entry<String, List<FieldSetEntity>> entry : groupData.entrySet()) {
                List<Dao> daoList = new ArrayList<>();
                if ("ch-kt".equals(entry.getKey())) {
                    List<FieldSetEntity> fieldSetEntityList = groupBySourceTable.get(historyEntity.getTableName().toLowerCase());
                    Set<String> dataSourceSet = fieldSetEntityList.stream().map(item -> item.getString("data_source")).collect(Collectors.toSet());
                    for (String sourceUuid : dataSourceSet) {
                        daoList.add(getDao(groupDao, sourceUuid));
                    }
                } else {
                    daoList.add(getDao(groupDao, groupByCollectId.get(entry.getKey()).get(0).getString("data_source")));
                }
                for (Dao dao : daoList) {
                    List<FieldSetEntity> value = entry.getValue();
                    //查询已存在的数据
                    DataTableEntity list = dao.getList(historyEntity.getTableName(),
                            BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), value.size(), true),
                            value.stream().map(item -> {
                                String primaryValue = item.getString(historyEntity.getPrimaryField());
                                //判断是否为数值且包含小数点 小数点后面是否全是0
                                if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
                                        indexOf(".") + 1).matches("^0*$")) {
                                    //返回整数字符串
                                    return primaryValue.substring(0, primaryValue.indexOf("."));
                                }
                                return primaryValue;
                            }).toArray());
                    List<String> existIds = DataTableEntity.isEmpty(list) ? null : list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
                    for (FieldSetEntity fieldSetEntity : value) {
                        String primaryValue = fieldSetEntity.getString(historyEntity.getPrimaryField());
                        if (existIds != null && existIds.contains(NumberUtil.parseNumber(primaryValue).toString())) {
                            continue;
                        }
                        fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
                        fieldSetEntity.remove("~table_name~");
                        dao.add(fieldSetEntity);
                    }
                }
            }
        }
    }
   public Dao getDao(Map<String, Dao> groupDao, String sourceUuid) throws Exception {
      Dao dao = groupDao.get(sourceUuid);
      if (null == dao) {
         DataBaseEntity dbe = new DataBaseEntity(sourceUuid);
         dao = dbe.getDao();
         dao.getConnection().setAutoCommit(false);
         groupDao.put(sourceUuid, dao);
      }
      return dao;
   }
    public Dao getDao(Map<String, Dao> groupDao, String sourceUuid) throws Exception {
        Dao dao = groupDao.get(sourceUuid);
        if (null == dao) {
            DataBaseEntity dbe = new DataBaseEntity(sourceUuid);
            dao = dbe.getDao();
            dao.getConnection().setAutoCommit(false);
            groupDao.put(sourceUuid, dao);
        }
        return dao;
    }
   public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) {
      if (DataTableEntity.isEmpty(dt)) {
         return null;
      }
      HistoryEntity historyEntity = new HistoryEntity();
      historyEntity.setMoNumberField("mo_number");
      if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) {
         historyEntity.setPrimaryField("wip_id");
         historyEntity.setTimeField("update_date");
      } else if (CmnConst.T_WIP_PRODUCT_KEYP.equalsIgnoreCase(targetTableName)) {
         historyEntity.setPrimaryField("pk_id");
         historyEntity.setTimeField("pk_loadtime");
    public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) {
        if (DataTableEntity.isEmpty(dt)) {
            return null;
        }
        HistoryEntity historyEntity = new HistoryEntity();
        historyEntity.setMoNumberField("mo_number");
        if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("wip_id");
            historyEntity.setTimeField("update_date");
        } else if (CmnConst.T_WIP_PRODUCT_KEYP.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("pk_id");
            historyEntity.setTimeField("pk_loadtime");
         historyEntity.setMoNumberField("pk_mo");
      } else if (CmnConst.T_WIP_DETAIL.equalsIgnoreCase(targetTableName)) {
         historyEntity.setPrimaryField("wip_detail_id");
         historyEntity.setTimeField("update_date");
      } else if (CmnConst.T_PM_PRODUCT_SN.equalsIgnoreCase(targetTableName)) {
         historyEntity.setPrimaryField("row_id");
         historyEntity.setTimeField("update_date");
      } else {
         return null;
      }
      historyEntity.setTableName(targetTableName.toUpperCase());
      Date now = new Date();
      for (int i = 0; i < dt.getRows(); i++) {
         Map<Object, Object> values = dt.getFieldSetEntity(i).getValues();
         //遍历map中的value是否为数字,如果是数字判断小数点后面是否有值,如果没有值则转换为整数
         for (Map.Entry<Object, Object> entry : values.entrySet()) {
            if (entry.getValue() instanceof Number) {
               Number number = (Number) entry.getValue();
               if (number.doubleValue() == number.intValue()) {
                  entry.setValue(number.intValue());
               }
            }
         }
      }
      if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName) && dt.getRows() > 1) {
         List<FieldSetEntity> data = dt.getData();
         FieldSetEntity newData = null;
         for (int i = 0; i < data.size(); i++) {
            FieldSetEntity fieldSetEntity = data.get(i);
            if ("ch-kt".equals(fieldSetEntity.getValue("pre_master_key"))) {
               data.remove(i);
               dt.removeFieldSetEntity(i);
               break;
            } else if (newData == null) {
               newData = fieldSetEntity;
               continue;
            }
            historyEntity.setMoNumberField("pk_mo");
        } else if (CmnConst.T_WIP_DETAIL.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("wip_detail_id");
            historyEntity.setTimeField("update_date");
        } else if (CmnConst.T_PM_PRODUCT_SN.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("row_id");
            historyEntity.setTimeField("update_date");
        } else {
            return null;
        }
        historyEntity.setTableName(targetTableName.toUpperCase());
        Date now = new Date();
        for (int i = 0; i < dt.getRows(); i++) {
            Map<Object, Object> values = dt.getFieldSetEntity(i).getValues();
            //遍历map中的value是否为数字,如果是数字判断小数点后面是否有值,如果没有值则转换为整数
            for (Map.Entry<Object, Object> entry : values.entrySet()) {
                if (entry.getValue() instanceof Number) {
                    Number number = (Number) entry.getValue();
                    if (number.doubleValue() == number.intValue()) {
                        entry.setValue(number.intValue());
                    }
                }
            }
        }
        if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName) && dt.getRows() > 1) {
            List<FieldSetEntity> data = dt.getData();
            FieldSetEntity newData = null;
            for (int i = 0; i < data.size(); i++) {
                FieldSetEntity fieldSetEntity = data.get(i);
                if ("ch-kt".equals(fieldSetEntity.getValue("pre_master_key"))) {
                    data.remove(i);
                    dt.removeFieldSetEntity(i);
                    break;
                } else if (newData == null) {
                    newData = fieldSetEntity;
                    continue;
                }
            Date date = fieldSetEntity.getDate(historyEntity.getTimeField());
            if (date.getTime() > newData.getDate(historyEntity.getTimeField()).getTime()) {
               newData = fieldSetEntity;
            }
         }
         if (newData != null) {
            dt = new DataTableEntity();
            dt.addFieldSetEntity(newData);
         }
         //在list中取出时间最近的数据
         Optional<FieldSetEntity> max = data.stream().max(Comparator.comparing((a) -> a.getDate(historyEntity.getTimeField()).getTime()));
                Date date = fieldSetEntity.getDate(historyEntity.getTimeField());
                if (date.getTime() > newData.getDate(historyEntity.getTimeField()).getTime()) {
                    newData = fieldSetEntity;
                }
            }
            if (newData != null) {
                dt = new DataTableEntity();
                dt.addFieldSetEntity(newData);
            }
        }
        DataTableEntity subData = dt.clones();
        DataTableEntity masterData = dt.clones();
        for (int i = 0; i < subData.getRows(); i++) {
            FieldSetEntity fs = subData.getFieldSetEntity(i);
            String preMasterKey = fs.getString("pre_master_key");
            if (historyEntity.getPrimaryField().equals(preMasterKey)) {
                preMasterKey = null;
      }
      DataTableEntity subData = dt.clones();
      DataTableEntity masterData = dt.clones();
      for (int i = 0; i < subData.getRows(); i++) {
         FieldSetEntity fs = subData.getFieldSetEntity(i);
         String preMasterKey = fs.getString("pre_master_key");
         if (historyEntity.getPrimaryField().equals(preMasterKey)) {
            preMasterKey = null;
         }
         if ("ch-kt".equals(fs.getString("source_info")) && StringUtils.isEmpty(preMasterKey)) {
            preMasterKey = fs.getString(historyEntity.getPrimaryField());
         }
         fs.setValue(historyEntity.getPrimaryField(), preMasterKey);
         fs.setValue("pre_master_key", preMasterKey);
         masterData.setFieldValue(i, historyEntity.getTimeField(), now);
         fs.setValue(historyEntity.getTimeField(), now);
         fs.remove("~table_name~");
         masterData.getFieldSetEntity(i).remove("~table_name~");
         masterData.setFieldValue(i, "pre_master_key", null);
         if (StringUtils.isEmpty(historyEntity.getPrimaryField())) {
            subData.removeFieldSetEntity(i);
            masterData.removeFieldSetEntity(i);
            i--;
         }
      }
      historyEntity.setArchivedDataTable(dt);
      historyEntity.setMasterDataTable(masterData);
      historyEntity.setSubDataTable(subData);
      return historyEntity;
   }
            }
            if ("ch-kt".equals(fs.getString("source_info")) && StringUtils.isEmpty(preMasterKey)) {
                preMasterKey = fs.getString(historyEntity.getPrimaryField());
                masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField()));
            }
            fs.setValue(historyEntity.getPrimaryField(), preMasterKey);
            fs.setValue("pre_master_key", null);
            masterData.setFieldValue(i, historyEntity.getTimeField(), now);
            fs.setValue(historyEntity.getTimeField(), now);
            fs.remove("~table_name~");
            masterData.getFieldSetEntity(i).remove("~table_name~");
            if (StringUtils.isEmpty(preMasterKey)) {
                masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField()));
            }
            if (StringUtils.isEmpty(historyEntity.getPrimaryField())) {
                subData.removeFieldSetEntity(i);
                masterData.removeFieldSetEntity(i);
                i--;
            }
        }
        historyEntity.setArchivedDataTable(dt);
        historyEntity.setMasterDataTable(masterData);
        historyEntity.setSubDataTable(subData);
        return historyEntity;
    }
   public DataTableEntity getData(Dao dao, Set<String> tableSet, String filterFieldName, String
         serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException {
      if (CollectionUtil.isEmpty(tableSet)) {
         throw new BaseException(errorCodes[0]);
      }
      String[] tableArray = tableSet.toArray(new String[]{});
      CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
      //多线程查询单张表,等待所有线程查询完毕
      for (String tableName : tableArray) {
         objectCompletionService.submit(() -> dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?", new Object[]{serialNumber}));
      }
      DataTableEntity data = new DataTableEntity();
      for (int i = 0; i < tableArray.length; i++) {
         DataTableEntity dataTableEntity = objectCompletionService.take().get();
         BaseUtil.dataTableMerge(data, dataTableEntity);
      }
      return data;
   }
    public DataTableEntity getData(Dao dao, Set<String> tableSet, String filterFieldName, String
            serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException {
        if (CollectionUtil.isEmpty(tableSet)) {
            throw new BaseException(errorCodes[0]);
        }
        String[] tableArray = tableSet.toArray(new String[]{});
        StringBuilder sql = new StringBuilder();
        for (String tableName : tableArray) {
            if (sql.length() > 0) {
                sql.append(" union all ");
            }
            sql.append("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?");
        }
        DataTableEntity data = dao.getList(sql.toString(), Arrays.stream(tableArray).map((t) -> serialNumber).toArray());
//        CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
//        //多线程查询单张表,等待所有线程查询完毕
//        for (String tableName : tableArray) {
//            objectCompletionService.submit(() -> {
//                DataTableEntity list = dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?", new Object[]{serialNumber});
//                return list;
//            });
//        }
//        DataTableEntity data = new DataTableEntity();
//        for (int i = 0; i < tableArray.length; i++) {
//            DataTableEntity dataTableEntity = objectCompletionService.take().get();
//            BaseUtil.dataTableMerge(data, dataTableEntity);
//        }
        if (DataTableEntity.isEmpty(data) && !"product_sn".equals(filterFieldName)) {
            throw new BaseException(errorCodes[1]);
        }
        return data;
    }
   /**
    * 初始化制令单
    *
    * @param moNumbers
    * @throws BaseException
    */
   public void initMoBase(String[] moNumbers) throws BaseException {
      FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
      if (FieldSetEntity.isEmpty(fs)) {
         throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
      }
    /**
     * 初始化制令单
     *
     * @param moNumbers
     * @throws BaseException
     */
    public void initMoBase(String[] moNumbers) throws BaseException {
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
      String[] targetDataSource = fs.getString("target_data_source").split(",");
        String[] targetDataSource = fs.getString("target_data_source").split(",");
      Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
      StringBuilder errorMsg = new StringBuilder();
      for (Dao dao : targetDao) {
         for (String moNumber : moNumbers) {
            synchronized (moNumber.intern()) {
               try (Connection connection = dao.getConnection();
                   CallableStatement callableStatement = connection.prepareCall(
                         "{CALL SMT_T_PM_MO_BASE(?,?,?,?,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,?,null,null" +
                               ",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null)}")) {
                  callableStatement.setInt(1, 5);
                  callableStatement.registerOutParameter(2, OracleTypes.SMALLINT);
                  callableStatement.registerOutParameter(3, OracleTypes.VARCHAR);
                  callableStatement.setString(4, moNumber);
                  callableStatement.setString(5, "1");
                  callableStatement.execute();
                  //执行后返回的错误码
                  int errorCode = callableStatement.getInt(2);
                  //执行后返回的错误信息
                  String errorText = callableStatement.getString(3);
                  if (errorCode != 0) {
                     //错误的
                     errorMsg.append("制令单号:").append(moNumber);
                     errorMsg.append("\nerrorCode:").append(errorCode);
                     errorMsg.append("\nerrorText:").append(errorText);
                  }
               } catch (Exception e) {
                  SpringMVCContextHolder.getSystemLogger().error(e);
                  e.printStackTrace();
                  errorMsg.append("制令单号:").append(moNumber);
                  errorMsg.append("\n执行时未知错误:").append(e.getMessage());
               }
            }
         }
      }
      if (errorMsg.length() > 0) {
         throw new BaseException(ErrorCode.SUB_MO_BASE_INI_FAIL.getValue(), ErrorCode.SUB_MO_BASE_INI_FAIL.getText() + "。\n" + errorMsg);
      }
   }
        Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
        StringBuilder errorMsg = new StringBuilder();
        for (Dao dao : targetDao) {
            for (String moNumber : moNumbers) {
                synchronized (moNumber.intern()) {
                    try (Connection connection = dao.getConnection();
                         CallableStatement callableStatement = connection.prepareCall(
                                 "{CALL SMT_T_PM_MO_BASE(?,?,?,?,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,?,null,null" +
                                         ",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null)}")) {
                        callableStatement.setInt(1, 5);
                        callableStatement.registerOutParameter(2, OracleTypes.SMALLINT);
                        callableStatement.registerOutParameter(3, OracleTypes.VARCHAR);
                        callableStatement.setString(4, moNumber);
                        callableStatement.setString(5, "1");
                        callableStatement.execute();
                        //执行后返回的错误码
                        int errorCode = callableStatement.getInt(2);
                        //执行后返回的错误信息
                        String errorText = callableStatement.getString(3);
                        if (errorCode != 0) {
                            //错误的
                            errorMsg.append("制令单号:").append(moNumber);
                            errorMsg.append("\nerrorCode:").append(errorCode);
                            errorMsg.append("\nerrorText:").append(errorText);
                        }
                    } catch (Exception e) {
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        e.printStackTrace();
                        errorMsg.append("制令单号:").append(moNumber);
                        errorMsg.append("\n执行时未知错误:").append(e.getMessage());
                    }
                }
            }
        }
        if (errorMsg.length() > 0) {
            throw new BaseException(ErrorCode.SUB_MO_BASE_INI_FAIL.getValue(), ErrorCode.SUB_MO_BASE_INI_FAIL.getText() + "。\n" + errorMsg);
        }
    }
   /**
    * @param moNumbers 制令单号,多个逗号分隔
    * @param type      操作类型 1 创建制令单 2 更新制令单中的指定字段
    * @throws BaseException
    */
   public void updateMoBase(String[] moNumbers, int type) throws BaseException {
      FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
      if (FieldSetEntity.isEmpty(fs)) {
         throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
      }
      String dataSource = fs.getString("data_source");
      String[] targetDataSource = fs.getString("target_data_source").split(",");
      StringBuilder errorMessage = new StringBuilder();
      DataBaseEntity dbe = new DataBaseEntity(dataSource);
      for (int i = 0; i < moNumbers.length; i++) {
         String moNumber = moNumbers[i];
         try {
            synchronized (moNumber.intern()) {
               Dao dao = dbe.getDao();
               FieldSetEntity one = dao.getOne("T_PM_MO_BASE", "MO_NUMBER=?", new Object[]{moNumber});
               if (FieldSetEntity.isEmpty(one)) {
                  throw new BaseException(ErrorCode.MO_NUMBER_RECORD_SELECT_EMPTY);
               }
               one.setTableName("T_PM_MO_BASE");
               dao.closeConnection();
               Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
               StringBuilder errorMsg = new StringBuilder();
               try {
                  if (1 == type) {
                     createMoBase(errorMsg, one, targetDao);
                  } else if (2 == type) {
                     updateMoBase(errorMsg, one, targetDao);
                  }
                  if (errorMsg.length() > 0) {
                     //有错误
                     throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMsg);
                  }
               } finally {
                  for (Dao dao1 : targetDao) {
                     dao1.closeConnection();
                  }
               }
            }
         } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
            errorMessage.append(e.getMessage()).append("\n");
         }
    /**
     * @param moNumbers 制令单号,多个逗号分隔
     * @param type      操作类型 1 创建制令单 2 更新制令单中的指定字段
     * @throws BaseException
     */
    public void updateMoBase(String[] moNumbers, int type) throws BaseException {
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
        String dataSource = fs.getString("data_source");
        String[] targetDataSource = fs.getString("target_data_source").split(",");
        StringBuilder errorMessage = new StringBuilder();
        DataBaseEntity dbe = new DataBaseEntity(dataSource);
        for (int i = 0; i < moNumbers.length; i++) {
            String moNumber = moNumbers[i];
            try {
                synchronized (moNumber.intern()) {
                    Dao dao = dbe.getDao();
                    FieldSetEntity one = dao.getOne("T_PM_MO_BASE", "MO_NUMBER=?", new Object[]{moNumber});
                    if (FieldSetEntity.isEmpty(one)) {
                        throw new BaseException(ErrorCode.MO_NUMBER_RECORD_SELECT_EMPTY);
                    }
                    one.setTableName("T_PM_MO_BASE");
                    dao.closeConnection();
                    Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
                    StringBuilder errorMsg = new StringBuilder();
                    try {
                        if (1 == type) {
                            createMoBase(errorMsg, one, targetDao);
                        } else if (2 == type) {
                            updateMoBase(errorMsg, one, targetDao);
                        }
                        if (errorMsg.length() > 0) {
                            //有错误
                            throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMsg);
                        }
                    } finally {
                        for (Dao dao1 : targetDao) {
                            dao1.closeConnection();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
                errorMessage.append(e.getMessage()).append("\n");
            }
      }
      if (errorMessage.length() > 0) {
         throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMessage);
      }
   }
        }
        if (errorMessage.length() > 0) {
            throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMessage);
        }
    }
   /**
    * 创建制令单
    *
    * @param errorMsg
    * @param moBase
    * @param subDao
    */
   private void createMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
      String moNumber = moBase.getString("mo_number");
      for (int i = 0; i < subDao.length; i++) {
         Dao dao = subDao[i];
         try {
            //检查工单表是否存在
            FieldSetEntity projectBase = dao.getOne("T_PM_PROJECT_BASE", "PROJECT_ID = ? ", new Object[]{moNumber});
            if (FieldSetEntity.isEmpty(projectBase)) {
               throw new BaseException(ErrorCode.SUB_PROJECT_BASE_CAN_NOT_EMPTY);
            }
            //检查制令单表是否已存在
            FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
            if (!FieldSetEntity.isEmpty(fs)) {
               //进行更新操作
               updateMoBase(errorMsg, moBase, new Dao[]{dao});
               continue;
            }
            //将 T_PM_MO_BASE.PROJECT_ID 更改为子库中对应 T_PM_PROJECT_BASE.PROJECT_BASE_ID
            moBase.setValue("project_id", projectBase.getString("project_base_id"));
    /**
     * 创建制令单
     *
     * @param errorMsg
     * @param moBase
     * @param subDao
     */
    private void createMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
        String moNumber = moBase.getString("mo_number");
        for (int i = 0; i < subDao.length; i++) {
            Dao dao = subDao[i];
            try {
                //检查工单表是否存在
                FieldSetEntity projectBase = dao.getOne("T_PM_PROJECT_BASE", "PROJECT_ID = ? ", new Object[]{moNumber});
                if (FieldSetEntity.isEmpty(projectBase)) {
                    throw new BaseException(ErrorCode.SUB_PROJECT_BASE_CAN_NOT_EMPTY);
                }
                //检查制令单表是否已存在
                FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
                if (!FieldSetEntity.isEmpty(fs)) {
                    //进行更新操作
                    updateMoBase(errorMsg, moBase, new Dao[]{dao});
                    continue;
                }
                //将 T_PM_MO_BASE.PROJECT_ID 更改为子库中对应 T_PM_PROJECT_BASE.PROJECT_BASE_ID
                moBase.setValue("project_id", projectBase.getString("project_base_id"));
//                moBase.setValue("row_id", -1);
            dao.add(moBase);
         } catch (BaseException e) {
            errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
                  append("]").append(e.getMessageInfo());
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
         } catch (Exception e) {
            errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("创建制令单失败,未知的错误");
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
         }
      }
   }
                dao.add(moBase);
            } catch (BaseException e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
                        append("]").append(e.getMessageInfo());
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            } catch (Exception e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("创建制令单失败,未知的错误");
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
   /**
    * 更新制令单
    *
    * @param errorMsg
    * @param moBase
    * @param subDao
    */
   private void updateMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
      String moNumber = moBase.getString("mo_number");
      for (int i = 0; i < subDao.length; i++) {
         Dao dao = subDao[i];
         try {
            //检查制令单表是否已存在
            FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
            if (FieldSetEntity.isEmpty(fs)) {
               //进行创建操作
               createMoBase(errorMsg, moBase, new Dao[]{dao});
               continue;
            }
            // 当子库中制令单 input_qty 字段为 0 或者 为 null 时 更新多个字段,否则只更新target_qty 字段
            String[] updateField = StringUtils.isEmpty(fs.getString("input_qty")) || "0".equals(fs.getString("input_qty")) ?
                  new String[]{"target_qty", "owner", "mo_create_date", "mo_schedule_date", "mo_due_date", "areaid", "technicsid", "close_flag", "default_group", "end_group"}
                  : new String[]{"target_qty"};
            StringBuilder sql = new StringBuilder();
            sql.append(" UPDATE T_PM_MO_BASE SET ");
            Object[] params = new Object[updateField.length + 1];
            for (int k = 0; k < updateField.length; k++) {
               if (k > 0) {
                  sql.append(",");
               }
               params[k] = moBase.getObject(updateField[k]);
               sql.append(updateField[k].toUpperCase()).append(" = ? ");
            }
            params[updateField.length] = moNumber;
            sql.append(" WHERE MO_NUMBER = ?  ");
            dao.executeSql(sql.toString(), params);
         } catch (BaseException e) {
            errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
                  append("]").append(e.getMessageInfo());
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
         } catch (Exception e) {
            errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("更新制令单失败,未知的错误");
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
         }
      }
   }
    /**
     * 更新制令单
     *
     * @param errorMsg
     * @param moBase
     * @param subDao
     */
    private void updateMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
        String moNumber = moBase.getString("mo_number");
        for (int i = 0; i < subDao.length; i++) {
            Dao dao = subDao[i];
            try {
                //检查制令单表是否已存在
                FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
                if (FieldSetEntity.isEmpty(fs)) {
                    //进行创建操作
                    createMoBase(errorMsg, moBase, new Dao[]{dao});
                    continue;
                }
                // 当子库中制令单 input_qty 字段为 0 或者 为 null 时 更新多个字段,否则只更新target_qty 字段
                String[] updateField = StringUtils.isEmpty(fs.getString("input_qty")) || "0".equals(fs.getString("input_qty")) ?
                        new String[]{"target_qty", "owner", "mo_create_date", "mo_schedule_date", "mo_due_date", "areaid", "technicsid", "close_flag", "default_group", "end_group"}
                        : new String[]{"target_qty"};
                StringBuilder sql = new StringBuilder();
                sql.append(" UPDATE T_PM_MO_BASE SET ");
                Object[] params = new Object[updateField.length + 1];
                for (int k = 0; k < updateField.length; k++) {
                    if (k > 0) {
                        sql.append(",");
                    }
                    params[k] = moBase.getObject(updateField[k]);
                    sql.append(updateField[k].toUpperCase()).append(" = ? ");
                }
                params[updateField.length] = moNumber;
                sql.append(" WHERE MO_NUMBER = ?  ");
                dao.executeSql(sql.toString(), params);
            } catch (BaseException e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
                        append("]").append(e.getMessageInfo());
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            } catch (Exception e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("更新制令单失败,未知的错误");
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
   private String getIp(Dao dao) {
      return ((OracleDaoImpl) dao).getDataBaseEntity().getIp();
   }
    private String getIp(Dao dao) {
        return ((OracleDaoImpl) dao).getDataBaseEntity().getIp();
    }
   /**
    * 采集配置保存
    *
    * @param request
    * @throws BaseException
    */
   @Override
   @Transactional
   public void saveCollectConfig(HttpServletRequest request) throws BaseException {
      //服务名称
      String serverName = request.getHeader("server-name");
      if (StringUtils.isEmpty(serverName)) {
         throw new BaseException(ErrorCode.SERVER_NAME_CAN_NOT_EMPTY);
      }
      FieldSetEntity fse = BaseUtil.getFieldSetEntity(request, CmnConst.PRODUCT_SYS_DATA_COLLECT);
      //采集来源
      String sourceInfo = fse.getString("id");
      if (StringUtils.isEmpty(sourceInfo) || StringUtils.contains(sourceInfo, dataSystemName)) {
         throw new BaseException(ErrorCode.COLLECT_SOURCE_VALUE);
      }
      commonSave(fse);
   }
    /**
     * 采集配置保存
     *
     * @param request
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveCollectConfig(HttpServletRequest request) throws BaseException {
        //服务名称
        String serverName = request.getHeader("server-name");
        if (StringUtils.isEmpty(serverName)) {
            throw new BaseException(ErrorCode.SERVER_NAME_CAN_NOT_EMPTY);
        }
        FieldSetEntity fse = BaseUtil.getFieldSetEntity(request, CmnConst.PRODUCT_SYS_DATA_COLLECT);
        //采集来源
        String sourceInfo = fse.getString("id");
        if (StringUtils.isEmpty(sourceInfo) || StringUtils.contains(sourceInfo, dataSystemName)) {
            throw new BaseException(ErrorCode.COLLECT_SOURCE_VALUE);
        }
        commonSave(fse);
    }
   /**
    * 提取配置保存
    *
    * @param fse
    * @throws BaseException
    */
   @Override
   @Transactional
   public void saveExtractConfig(FieldSetEntity fse) throws BaseException {
      String dataSource = fse.getString(CmnConst.TABLE_SYNC_MANAGER);
      if (!StringUtils.isEmpty(dataSource)) {
         FieldSetEntity fs = JsonUtil.pareseJsonToFieldSetEntity(dataSource);
         if (!FieldSetEntity.isEmpty(fs)) {
            commonSave(fs);
         }
      }
      commonSave(fse);
      getCommonService().saveDelRecordConfig(2, fse.getUUID());
   }
    /**
     * 提取配置保存
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveExtractConfig(FieldSetEntity fse) throws BaseException {
        String dataSource = fse.getString(CmnConst.TABLE_SYNC_MANAGER);
        if (!StringUtils.isEmpty(dataSource)) {
            FieldSetEntity fs = JsonUtil.pareseJsonToFieldSetEntity(dataSource);
            if (!FieldSetEntity.isEmpty(fs)) {
                commonSave(fs);
            }
        }
        commonSave(fse);
        getCommonService().saveDelRecordConfig(2, fse.getUUID());
    }
   /**
    * 归档配置保存
    *
    * @param fse
    * @throws BaseException
    */
   @Override
   @Transactional
   public void saveArchiveConfig(FieldSetEntity fse) throws BaseException {
      commonSave(fse);
   }
    /**
     * 归档配置保存
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveArchiveConfig(FieldSetEntity fse) throws BaseException {
        commonSave(fse);
    }
   /**
    * @param fse
    * @throws BaseException
    */
   @Override
   public void saveCollectLog(FieldSetEntity fse) throws BaseException {
      this.commonSave(fse);
   }
    /**
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveCollectLog(FieldSetEntity fse) throws BaseException {
        this.commonSave(fse);
    }
   /**
    * @param fse
    * @throws BaseException
    */
   @Override
   public void saveExtractLog(FieldSetEntity fse) throws BaseException {
      this.commonSave(fse);
   }
    /**
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveExtractLog(FieldSetEntity fse) throws BaseException {
        this.commonSave(fse);
    }
   /**
    * 数据源保存会调用该方法
    *
    * @param fse
    * @throws BaseException
    */
   @Override
   public void saveSyncConnectionConfig(FieldSetEntity fse) throws BaseException {
      //子服务保存数据库连接配置后调用该方法,传输数据到主服务
      //TODO
   }
    /**
     * 数据源保存会调用该方法
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveSyncConnectionConfig(FieldSetEntity fse) throws BaseException {
        //子服务保存数据库连接配置后调用该方法,传输数据到主服务
        //TODO
    }
   /**
    * 定时任务生成日志会调用该方法
    *
    * @param fse
    * @throws BaseException
    */
   @Override
   public void saveTimeLog(FieldSetEntity fse) throws BaseException {
      //主服务提取日志保存后调用该方法传输到子服务
      //TODO
    /**
     * 定时任务生成日志会调用该方法
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveTimeLog(FieldSetEntity fse) throws BaseException {
        //主服务提取日志保存后调用该方法传输到子服务
        //TODO
   }
    }
   private void commonSave(FieldSetEntity fse) throws BaseException {
      String uuid = fse.getUUID();
      FieldSetEntity fs = getBaseDao().getFieldSetEntity(fse.getTableName(), new String[]{CmnConst.UUID}, uuid, false);
      Map<String, DataTableEntity> subData = fse.getSubData();
      if (!CollectionUtil.isEmpty(subData)) {
         DataTableEntity addDt = new DataTableEntity();
         DataTableEntity updateDt = new DataTableEntity();
         for (Map.Entry<String, DataTableEntity> entry : subData.entrySet()) {
            String tableName = entry.getKey();
            DataTableEntity value = entry.getValue();
            if (DataTableEntity.isEmpty(value)) {
               continue;
            }
            Object[] uuids = value.getUuids();
            DataTableEntity dt = getBaseDao().listTable(tableName, BaseUtil.buildQuestionMarkFilter(CmnConst.UUID, uuids.length, true), uuids, new String[]{CmnConst.UUID});
            for (int i = 0; i < value.getRows(); i++) {
               uuid = value.getString(i, CmnConst.UUID);
               List<FieldSetEntity> fieldSetEntity = dt.getFieldSetEntity(uuid);
               if (CollectionUtil.isEmpty(fieldSetEntity)) {
                  addDt.addFieldSetEntity(value.getFieldSetEntity(i));
                  value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "add");
               } else {
                  addDt.addFieldSetEntity(updateDt.getFieldSetEntity(i));
                  value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "update");
               }
            }
            if (!DataTableEntity.isEmpty(addDt)) {
               getBaseDao().add(addDt);
            }
            if (!DataTableEntity.isEmpty(updateDt)) {
               getBaseDao().update(updateDt);
            }
         }
      }
      if (FieldSetEntity.isEmpty(fs)) {
         getBaseDao().add(fse, false);
      } else {
         getBaseDao().update(fse, false);
      }
   }
    private void commonSave(FieldSetEntity fse) throws BaseException {
        String uuid = fse.getUUID();
        FieldSetEntity fs = getBaseDao().getFieldSetEntity(fse.getTableName(), new String[]{CmnConst.UUID}, uuid, false);
        Map<String, DataTableEntity> subData = fse.getSubData();
        if (!CollectionUtil.isEmpty(subData)) {
            DataTableEntity addDt = new DataTableEntity();
            DataTableEntity updateDt = new DataTableEntity();
            for (Map.Entry<String, DataTableEntity> entry : subData.entrySet()) {
                String tableName = entry.getKey();
                DataTableEntity value = entry.getValue();
                if (DataTableEntity.isEmpty(value)) {
                    continue;
                }
                Object[] uuids = value.getUuids();
                DataTableEntity dt = getBaseDao().listTable(tableName, BaseUtil.buildQuestionMarkFilter(CmnConst.UUID, uuids.length, true), uuids, new String[]{CmnConst.UUID});
                for (int i = 0; i < value.getRows(); i++) {
                    uuid = value.getString(i, CmnConst.UUID);
                    List<FieldSetEntity> fieldSetEntity = dt.getFieldSetEntity(uuid);
                    if (CollectionUtil.isEmpty(fieldSetEntity)) {
                        addDt.addFieldSetEntity(value.getFieldSetEntity(i));
                        value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "add");
                    } else {
                        addDt.addFieldSetEntity(updateDt.getFieldSetEntity(i));
                        value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "update");
                    }
                }
                if (!DataTableEntity.isEmpty(addDt)) {
                    getBaseDao().add(addDt);
                }
                if (!DataTableEntity.isEmpty(updateDt)) {
                    getBaseDao().update(updateDt);
                }
            }
        }
        if (FieldSetEntity.isEmpty(fs)) {
            getBaseDao().add(fse, false);
        } else {
            getBaseDao().update(fse, false);
        }
    }
   /**
    * 调用远程主服务器采集保存
    *
    * @param fse
    * @return
    * @throws BaseException
    */
   public FieldSetEntity remoteSaveCollectConfig(FieldSetEntity fse) throws BaseException {
      if ("ch-kt".equals(dataSystemName)) {
         return fse;
      }
      FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
      if (FieldSetEntity.isEmpty(fs)) {
         throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
      }
      //服务域名的端口
      String ipPort = fs.getString("server_url");
      String serverName = fs.getString("server_name");
      String serverUrl = ipPort + CmnConst.SAVE_COLLECT_URL;
      FieldSetEntity res = doPost(serverUrl, serverName, fse);
      return res;
   }
    /**
     * 调用远程主服务器采集保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public FieldSetEntity remoteSaveCollectConfig(FieldSetEntity fse) throws BaseException {
        if ("ch-kt".equals(dataSystemName)) {
            return fse;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
        }
        //服务域名的端口
        String ipPort = fs.getString("server_url");
        String serverName = fs.getString("server_name");
        String serverUrl = ipPort + CmnConst.SAVE_COLLECT_URL;
        FieldSetEntity res = doPost(serverUrl, serverName, fse);
        return res;
    }
   /**
    * 主服务提取保存日志后调用该方法传入到子服务保存
    *
    * @param fse
    * @return
    * @throws BaseException
    */
   public void remoteSaveExtractLog(FieldSetEntity fse) throws BaseException {
      try {
         if ("ch-kt".equals(dataSystemName)) {
            //主服务采集日志uuid
            String preStepUuid = fse.getString(CmnConst.PRE_STEP_UUID);
            StringBuilder sql = new StringBuilder();
            sql.append("\nSELECT ");
            sql.append("\nserver.* ");
            sql.append("\nFROM ").append(CmnConst.PRODUCT_SYS_DATA_COLLECT).append(" collect ");
            sql.append("\nJOIN ").append(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG).append(" log ");
            sql.append("\nON log.config_uuid=collect.uuid and log.type=1 ");
            sql.append("\nJOIN ").append(CmnConst.PRODUCT_MES_SERVER).append(" server ");
            sql.append("\nON collect.id like concat(server.server_name,'%') ");
            sql.append("\nWHERE log.uuid=? and collect.id not like concat(?,'%')  limit 1");
            FieldSetEntity fs = getBaseDao().getFieldSetBySQL(sql.toString(), new Object[]{preStepUuid, dataSystemName}, false);
            if (FieldSetEntity.isEmpty(fs)) {
               return;
            }
            String ipPort = fs.getString("server_url");
            String serverName = fs.getString("server_name");
            String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_LOG_URL;
            doPostAsync(serverUrl, serverName, fse);
         }
      } catch (Exception e) {
         //捕获异常为了使采集定时任务正常运行
         SpringMVCContextHolder.getSystemLogger().error(e);
         e.printStackTrace();
      }
   }
    /**
     * 主服务提取保存日志后调用该方法传入到子服务保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveExtractLog(FieldSetEntity fse) throws BaseException {
        try {
            if ("ch-kt".equals(dataSystemName)) {
                //主服务采集日志uuid
                String preStepUuid = fse.getString(CmnConst.PRE_STEP_UUID);
                StringBuilder sql = new StringBuilder();
                sql.append("\nSELECT ");
                sql.append("\nserver.* ");
                sql.append("\nFROM ").append(CmnConst.PRODUCT_SYS_DATA_COLLECT).append(" collect ");
                sql.append("\nJOIN ").append(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG).append(" log ");
                sql.append("\nON log.config_uuid=collect.uuid and log.type=1 ");
                sql.append("\nJOIN ").append(CmnConst.PRODUCT_MES_SERVER).append(" server ");
                sql.append("\nON collect.id like concat(server.server_name,'%') ");
                sql.append("\nWHERE log.uuid=? and collect.id not like concat(?,'%')  limit 1");
                FieldSetEntity fs = getBaseDao().getFieldSetBySQL(sql.toString(), new Object[]{preStepUuid, dataSystemName}, false);
                if (FieldSetEntity.isEmpty(fs)) {
                    return;
                }
                String ipPort = fs.getString("server_url");
                String serverName = fs.getString("server_name");
                String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_LOG_URL;
                doPostAsync(serverUrl, serverName, fse);
            }
        } catch (Exception e) {
            //捕获异常为了使采集定时任务正常运行
            SpringMVCContextHolder.getSystemLogger().error(e);
            e.printStackTrace();
        }
    }
   /**
    * 调用远程主服务器保存
    *
    * @param fse
    * @return
    * @throws BaseException
    */
   public void remoteSaveExtractConfig(FieldSetEntity fse) throws BaseException {
      DataTableEntity dt = getRemoteSubServer();
      if (DataTableEntity.isEmpty(dt)) {
         return;
      }
      FieldSetEntity extractTargetSource = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, fse.getString("extract_target_source"), false);
      if (!FieldSetEntity.isEmpty(extractTargetSource)) {
         extractTargetSource.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, CmnConst.TABLE_SYNC_MANAGER);
         fse.setValue(CmnConst.TABLE_SYNC_MANAGER, BaseUtil.fieldSetEntityToJson(extractTargetSource));
      }
      for (int i = 0; i < dt.getRows(); i++) {
         FieldSetEntity fs = dt.getFieldSetEntity(i);
         //服务域名的端口
         String ipPort = fs.getString("server_url");
         String serverName = fs.getString("server_name");
         String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_URL;
         doPost(serverUrl, serverName, fse);
      }
   }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveExtractConfig(FieldSetEntity fse) throws BaseException {
        DataTableEntity dt = getRemoteSubServer();
        if (DataTableEntity.isEmpty(dt)) {
            return;
        }
        FieldSetEntity extractTargetSource = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, fse.getString("extract_target_source"), false);
        if (!FieldSetEntity.isEmpty(extractTargetSource)) {
            extractTargetSource.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, CmnConst.TABLE_SYNC_MANAGER);
            fse.setValue(CmnConst.TABLE_SYNC_MANAGER, BaseUtil.fieldSetEntityToJson(extractTargetSource));
        }
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity fs = dt.getFieldSetEntity(i);
            //服务域名的端口
            String ipPort = fs.getString("server_url");
            String serverName = fs.getString("server_name");
            String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_URL;
            doPost(serverUrl, serverName, fse);
        }
    }
   /**
    * 调用远程主服务器保存
    *
    * @param fse
    * @return
    * @throws BaseException
    */
   public void remoteSaveArchiveConfig(FieldSetEntity fse) throws BaseException {
      DataTableEntity dt = getRemoteSubServer();
      if (DataTableEntity.isEmpty(dt)) {
         return;
      }
      for (int i = 0; i < dt.getRows(); i++) {
         FieldSetEntity fs = dt.getFieldSetEntity(i);
         //服务域名的端口
         String ipPort = fs.getString("server_url");
         String serverName = fs.getString("server_name");
         String serverUrl = ipPort + CmnConst.SAVE_ARCHIVE_URL;
         doPost(serverUrl, serverName, fse);
      }
   }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveArchiveConfig(FieldSetEntity fse) throws BaseException {
        DataTableEntity dt = getRemoteSubServer();
        if (DataTableEntity.isEmpty(dt)) {
            return;
        }
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity fs = dt.getFieldSetEntity(i);
            //服务域名的端口
            String ipPort = fs.getString("server_url");
            String serverName = fs.getString("server_name");
            String serverUrl = ipPort + CmnConst.SAVE_ARCHIVE_URL;
            doPost(serverUrl, serverName, fse);
        }
    }
   /**
    * 调用远程主服务器保存
    *
    * @param fse
    * @return
    * @throws BaseException
    */
   public void remoteSaveCollectLog(FieldSetEntity fse) throws BaseException {
      if ("ch-kt".equals(dataSystemName) || FieldSetEntity.isEmpty(fse)) {
         return;
      }
      FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
      if (FieldSetEntity.isEmpty(fs)) {
         throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
      }
      //服务域名的端口
      String ipPort = fs.getString("server_url");
      String serverName = fs.getString("server_name");
      String serverUrl = ipPort + CmnConst.SAVE_COLLECT_LOG_URL;
      doPost(serverUrl, serverName, fse);
   }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveCollectLog(FieldSetEntity fse) throws BaseException {
        if ("ch-kt".equals(dataSystemName) || FieldSetEntity.isEmpty(fse)) {
            return;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
        }
        //服务域名的端口
        String ipPort = fs.getString("server_url");
        String serverName = fs.getString("server_name");
        String serverUrl = ipPort + CmnConst.SAVE_COLLECT_LOG_URL;
        doPost(serverUrl, serverName, fse);
    }
   /**
    * 获取子服务配置
    *
    * @return
    * @throws BaseException
    */
   private DataTableEntity getRemoteSubServer() throws BaseException {
      return getBaseDao().listTable(CmnConst.PRODUCT_MES_SERVER, "server_type!=0");
   }
    /**
     * 获取子服务配置
     *
     * @return
     * @throws BaseException
     */
    private DataTableEntity getRemoteSubServer() throws BaseException {
        return getBaseDao().listTable(CmnConst.PRODUCT_MES_SERVER, "server_type!=0");
    }
   //    @Async
   void doPostAsync(String url, String serverName, FieldSetEntity fse) throws BaseException {
      doPost(url, serverName, fse);
   }
    //    @Async
    void doPostAsync(String url, String serverName, FieldSetEntity fse) throws BaseException {
        doPost(url, serverName, fse);
    }
   public String getDataSystemName() {
      return dataSystemName;
   }
    public String getDataSystemName() {
        return dataSystemName;
    }
   public boolean remoteRehandle(FieldSetEntity fse) {
      String type = fse.getString(CmnConst.TYPE);//类型
      String logUuid;
      if ("2".equals(type)) {
         logUuid = fse.getString(CmnConst.PRE_STEP_UUID);
      } else if ("1".equals(type)) {
         logUuid = fse.getUUID();
      } else {
         return false;
      }
      FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_COLLECT, "uuid =(select config_uuid FROM product_sys_data_center_log where uuid=?)", new Object[]{logUuid}, false);
      if (!FieldSetEntity.isEmpty(fs)) {
         //采集id
         String collectId = fs.getString(CmnConst.ID);
         if (collectId.indexOf(dataSystemName) == -1) {
            fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=1 and ? like concat(server_name,'%')", new Object[]{collectId}, false);
            if (!FieldSetEntity.isEmpty(fs)) {
               //服务域名的端口
               String ipPort = fs.getString("server_url");
               String serverName = fs.getString("server_name");
               String serverUrl = ipPort + CmnConst.REHANDLE_ERROR_URL;
               doPost(serverUrl, serverName, fse);
               //标记日志成功
               JournalManagerService journalManagerService = SpringUtils.getBean(JournalManagerService.class);
               journalManagerService.writeBackReDealResult(fse, true);
            }
            return true;
         }
      }
      return false;
   }
    public boolean remoteRehandle(FieldSetEntity fse) {
        String type = fse.getString(CmnConst.TYPE);//类型
        String logUuid;
        if ("2".equals(type)) {
            logUuid = fse.getString(CmnConst.PRE_STEP_UUID);
        } else if ("1".equals(type)) {
            logUuid = fse.getUUID();
        } else {
            return false;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_COLLECT, "uuid =(select config_uuid FROM product_sys_data_center_log where uuid=?)", new Object[]{logUuid}, false);
        if (!FieldSetEntity.isEmpty(fs)) {
            //采集id
            String collectId = fs.getString(CmnConst.ID);
            if (collectId.indexOf(dataSystemName) == -1) {
                fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=1 and ? like concat(server_name,'%')", new Object[]{collectId}, false);
                if (!FieldSetEntity.isEmpty(fs)) {
                    //服务域名的端口
                    String ipPort = fs.getString("server_url");
                    String serverName = fs.getString("server_name");
                    String serverUrl = ipPort + CmnConst.REHANDLE_ERROR_URL;
                    doPost(serverUrl, serverName, fse);
                    //标记日志成功
                    JournalManagerService journalManagerService = SpringUtils.getBean(JournalManagerService.class);
                    journalManagerService.writeBackReDealResult(fse, true);
                }
                return true;
            }
        }
        return false;
    }
   /**
    * post 请求
    *
    * @param url
    * @param serverName
    * @param fse
    * @throws BaseException
    */
   private FieldSetEntity doPost(String url, String serverName, FieldSetEntity fse) throws BaseException {
      fse.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, fse.getTableName());
      JSONObject requestBody = BaseUtil.fieldSetEntityToJson(fse);
      String requestData = requestBody.toJSONString();
      //签名
      String signature = SignUtil.getHmacSHA1(requestData, Global.getSystemConfig("signature.key", ""));
      try (HttpResponse response = HttpRequest.post(url)
            .contentType("application/x-www-form-urlencoded")
            .header(CoreConst.SYSTEM_LANGUAGE_CODE_, "zh-CN") //语言编码
            .header(CoreConst.SYSTEM_CLIENT_TYPE_, "Web") //客户端类型
            .header(CoreConst.SYSTEM_CLIENT_VERSION_, "1.0.0") //客户端版本
            .header("server-name", serverName) //系统名称
            .header("signature", signature) //签名
            .body("formData=" + requestData).execute()) {
         if (response.getStatus() == 200) {
            //请求成功
            String res = response.body();
            if (!StringUtils.isEmpty(res)) {
               JSONObject resBody = JSON.parseObject(res);
               if (resBody != null) {
                  String code = resBody.getString(CoreConst.API_RETURN_KEY_CODE);
                  if (!CoreConst.API_RETURN_VALUE_CODE_200.equals(code)) {
                     //服务内部抛出的错误
                     throw new BaseException("调用接口失败,", resBody.getString(CoreConst.API_RETURN_KEY_MSG));
                  }
                  String formData = resBody.getString(CoreConst.API_RETURN_KEY_DATA);
                  if (StringUtils.isEmpty(formData)) {
                     return null;
                  }
                  FieldSetEntity result = JsonUtil.pareseJsonToFieldSetEntity(formData);
                  return result;
               }
            }
         }
         throw new BaseException(ErrorCode.OPEN_API_REQUEST_FAIL);
      }
    /**
     * post 请求
     *
     * @param url
     * @param serverName
     * @param fse
     * @throws BaseException
     */
    private FieldSetEntity doPost(String url, String serverName, FieldSetEntity fse) throws BaseException {
        fse.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, fse.getTableName());
        JSONObject requestBody = BaseUtil.fieldSetEntityToJson(fse);
        String requestData = requestBody.toJSONString();
        //签名
        String signature = SignUtil.getHmacSHA1(requestData, Global.getSystemConfig("signature.key", ""));
        try (HttpResponse response = HttpRequest.post(url)
                .contentType("application/x-www-form-urlencoded")
                .header(CoreConst.SYSTEM_LANGUAGE_CODE_, "zh-CN") //语言编码
                .header(CoreConst.SYSTEM_CLIENT_TYPE_, "Web") //客户端类型
                .header(CoreConst.SYSTEM_CLIENT_VERSION_, "1.0.0") //客户端版本
                .header("server-name", serverName) //系统名称
                .header("signature", signature) //签名
                .body("formData=" + requestData).execute()) {
            if (response.getStatus() == 200) {
                //请求成功
                String res = response.body();
                if (!StringUtils.isEmpty(res)) {
                    JSONObject resBody = JSON.parseObject(res);
                    if (resBody != null) {
                        String code = resBody.getString(CoreConst.API_RETURN_KEY_CODE);
                        if (!CoreConst.API_RETURN_VALUE_CODE_200.equals(code)) {
                            //服务内部抛出的错误
                            throw new BaseException("调用接口失败,", resBody.getString(CoreConst.API_RETURN_KEY_MSG));
                        }
                        String formData = resBody.getString(CoreConst.API_RETURN_KEY_DATA);
                        if (StringUtils.isEmpty(formData)) {
                            return null;
                        }
                        FieldSetEntity result = JsonUtil.pareseJsonToFieldSetEntity(formData);
                        return result;
                    }
                }
            }
            throw new BaseException(ErrorCode.OPEN_API_REQUEST_FAIL);
        }
   }
    }
   /**
    * 获取采集日志
    *
    * @return
    */
   public FieldSetEntity getCollectLog() {
      collectLogCache = null;
      if (collectLogCache == null) {
         loadCollectLogCache();
      }
    /**
     * 获取采集日志
     *
     * @return
     */
    public FieldSetEntity getCollectLog() {
        collectLogCache = null;
        if (collectLogCache == null) {
            loadCollectLogCache();
        }
      return collectLogCache;
   }
        return collectLogCache;
    }
   public void loadCollectLogCache() {
      StringBuilder sql = new StringBuilder();
      sql.append("\n with no_extract as ( ");
      sql.append("\n    SELECT ");
      sql.append("\n       a.*  ");
      sql.append("\n    FROM ");
      sql.append("\n       product_sys_data_center_log A ");
      sql.append("\n       LEFT JOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid  ");
      sql.append("\n    WHERE ");
      sql.append("\n       a.type = 1  ");
      sql.append("\n       and a.result=1 and (a.deal_flag is null or a.deal_flag =0) ");
      sql.append("\n       AND ( ");
      sql.append("\n                b.pre_step_uuid IS NULL  ");
      sql.append("\n                OR  ");
      sql.append("\n                ( ");
      sql.append("\n                   b.pre_step_uuid=a.uuid  ");
      sql.append("\n                   AND  ");
      sql.append("\n                   (b.result=0 and (b.deal_result = 0 or b.deal_flag is null)) ");
      sql.append("\n                ) ");
      sql.append("\n       ) ");
      sql.append("\n ), ");
      sql.append("\n last_run_log as ( ");
      sql.append("\n select max(created_utc_datetime) last_time,config_uuid from product_sys_data_center_log where type=1 and result=1 and deal_flag=0 GROUP BY config_uuid ");
      sql.append("\n ) ");
      sql.append("\n     ");
      sql.append("\n SELECT ifnull(COUNT(no_extract.uuid),0) unextracted_batch ,ifnull(sum(no_extract.count),0) unextracted_total_count,b.`name`,(select last_time from last_run_log where b.uuid=last_run_log.config_uuid) last_success_time FROM  product_sys_data_collect b  ");
      sql.append("\n    left join no_extract on no_extract.config_uuid=b.uuid ");
      sql.append("\n  GROUP BY b.uuid ");
      sql.append("\n    order by b.`name` ");
    public void loadCollectLogCache() {
        StringBuilder sql = new StringBuilder();
        sql.append("\n with no_extract as ( ");
        sql.append("\n    SELECT ");
        sql.append("\n       a.*  ");
        sql.append("\n    FROM ");
        sql.append("\n       product_sys_data_center_log A ");
        sql.append("\n       LEFT JOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid  ");
        sql.append("\n    WHERE ");
        sql.append("\n       a.type = 1  ");
        sql.append("\n       and a.result=1 and (a.deal_flag is null or a.deal_flag =0) ");
        sql.append("\n       AND ( ");
        sql.append("\n                b.pre_step_uuid IS NULL  ");
        sql.append("\n                OR  ");
        sql.append("\n                ( ");
        sql.append("\n                   b.pre_step_uuid=a.uuid  ");
        sql.append("\n                   AND  ");
        sql.append("\n                   (b.result=0 and (b.deal_result = 0 or b.deal_flag is null)) ");
        sql.append("\n                ) ");
        sql.append("\n       ) ");
        sql.append("\n ), ");
        sql.append("\n last_run_log as ( ");
        sql.append("\n select max(created_utc_datetime) last_time,config_uuid from product_sys_data_center_log where type=1 and result=1 and deal_flag=0 GROUP BY config_uuid ");
        sql.append("\n ) ");
        sql.append("\n     ");
        sql.append("\n SELECT ifnull(COUNT(no_extract.uuid),0) unextracted_batch ,ifnull(sum(no_extract.count),0) unextracted_total_count,b.`name`,(select last_time from last_run_log where b.uuid=last_run_log.config_uuid) last_success_time FROM  product_sys_data_collect b  ");
        sql.append("\n    left join no_extract on no_extract.config_uuid=b.uuid ");
        sql.append("\n  GROUP BY b.uuid ");
        sql.append("\n    order by b.`name` ");
      FieldSetEntity fse = new FieldSetEntity();
      fse.setTableName("temp");
      fse.setValue("load_log_time", DateTime.now().toString());
      DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), new Object[]{});
      if (!DataTableEntity.isEmpty(dataTableEntity)) {
         fse.setValue("list", BaseUtil.dataTableEntityToJson(dataTableEntity, f -> {
            JSONObject jsonObject = f[0];
            Date lastSuccessTime = jsonObject.getDate("last_success_time");
            if (lastSuccessTime != null) {
               jsonObject.put("last_success_time", DateUtil.format(lastSuccessTime, "yyyy-MM-dd HH:mm:ss"));
            }
         }));
      } else {
         fse.setValue("list", "[]");
      }
      this.collectLogCache = fse;
   }
        FieldSetEntity fse = new FieldSetEntity();
        fse.setTableName("temp");
        fse.setValue("load_log_time", DateTime.now().toString());
        DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), new Object[]{});
        if (!DataTableEntity.isEmpty(dataTableEntity)) {
            fse.setValue("list", BaseUtil.dataTableEntityToJson(dataTableEntity, f -> {
                JSONObject jsonObject = f[0];
                Date lastSuccessTime = jsonObject.getDate("last_success_time");
                if (lastSuccessTime != null) {
                    jsonObject.put("last_success_time", DateUtil.format(lastSuccessTime, "yyyy-MM-dd HH:mm:ss"));
                }
            }));
        } else {
            fse.setValue("list", "[]");
        }
        this.collectLogCache = fse;
    }
}