cheng
2023-10-25 07bd010b2bbd939ade44c7b1a299fa911f8742e8
commit
已修改1个文件
1958 ■■■■ 文件已修改
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java 1958 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java
@@ -54,1046 +54,1058 @@
@Service
public class MesExternalService extends AbstractBaseService implements IMesExternalService, IRemoteService, com.product.data.service.impl.IRemoteService {
    @Value("${data.system.name}")
    private String dataSystemName;
    @Value("${data.system.name}")
    private String dataSystemName;
    private CommonService commonService = null;
    private CommonService commonService = null;
    private FieldSetEntity collectLogCache = null;
    private FieldSetEntity collectLogCache = null;
    public CommonService getCommonService() {
        if (this.commonService == null) {
            this.commonService = SpringBeanUtil.getBean(CommonService.class);
        }
        return commonService;
    }
    public CommonService getCommonService() {
        if (this.commonService == null) {
            this.commonService = SpringBeanUtil.getBean(CommonService.class);
        }
        return commonService;
    }
    public static void main(String[] args) {
        Date parse = DateUtil.parse("2023-10-23 18:22:50", "yyyy-MM-dd HH:mm:ss");
        Date parse1 = DateUtil.parse("2023-10-23 18:24:11", "yyyy-MM-dd HH:mm:ss");
        Date parse2 = DateUtil.parse("2023-10-23 18:24:21", "yyyy-MM-dd HH:mm:ss");
    public static void main(String[] args) {
        Date parse = DateUtil.parse("2023-10-23 18:22:50", "yyyy-MM-dd HH:mm:ss");
        Date parse1 = DateUtil.parse("2023-10-23 18:24:11", "yyyy-MM-dd HH:mm:ss");
        Date parse2 = DateUtil.parse("2023-10-23 18:24:21", "yyyy-MM-dd HH:mm:ss");
        Set<Date> set = Sets.newHashSet(parse, parse1, parse2);
        Set<Date> set = Sets.newHashSet(parse, parse1, parse2);
        List<Date> sets = CollectionUtil.toList(set.toArray(new Date[0]));
        List<Date> sets = CollectionUtil.toList(set.toArray(new Date[0]));
        Optional<Date> max = sets.stream().max(Comparator.comparing((a) -> a.getTime()));
        System.out.println(max.get());
        Optional<Date> max = sets.stream().max(Comparator.comparing((a) -> a.getTime()));
        System.out.println(max.get());
    }
    }
    public void splitTableData() {
        FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
        if (FieldSetEntity.isEmpty(reportDbConfig)) {
            throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
        }
        DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
        Dao reportDao = dbe.getDao();
        Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), "da_t_wip_tracking");
    /**
     * Currently a no-op: every statement in the body is commented out.
     *
     * <p>The disabled code reads as a one-off job against the report database: for each
     * {@code da_t_wip_tracking} table whose suffix is the year 2023 it creates one table per
     * month (suffix {@code yyyyMM}) if missing and copies the rows of that month into it,
     * running the twelve months on a fixed thread pool.
     * NOTE(review): confirm whether this code is still needed or can be deleted.
     */
    public void splitTableData() {
//        FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
//        if (FieldSetEntity.isEmpty(reportDbConfig)) {
//            throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
//        }
//        DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
//        Dao reportDao = dbe.getDao();
//        Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), "da_t_wip_tracking");
//
//        for (String tableName : trackingTableSet) {
//            // the year is the part of the table name after its last underscore
//            String year = tableName.substring(tableName.lastIndexOf("_") + 1);
//            if (!StringUtils.equalsAny(year, "2023") || year.length() > 4) {
//                continue;
//            }
//            // table prefix: everything before the last underscore
//            String tablePrefix = tableName.substring(0, tableName.lastIndexOf("_"));
//            ExecutorService executorService = Executors.newFixedThreadPool(12);
//            for (int i = 1; i <= 12; i++) {
//                final int finalI = i;
//                executorService.submit(() -> {
//                    Dao currentDao = dbe.newDao();
//                    // current month formatted as MM
//                    String month = String.format("%02d", finalI);
//                    // check whether the table for this month already exists
//                    String monthTableName = tablePrefix + "_" + year + month;
//                    Set<String> allTableName = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), tablePrefix + "_" + year + month);
//                    if (allTableName.size() == 0 || !allTableName.contains(monthTableName)) {
//                        // create the new table with the structure of the original table
//                        String sql = "create table " + monthTableName + " like " + tableName;
//                        currentDao.executeSql(sql);
//                        SpringMVCContextHolder.getSystemLogger().info("创建表:" + monthTableName);
//                    }
//                    String sql = "INSERT INTO " + monthTableName + " SELECT * FROM " + tableName + " WHERE MONTH(update_date)=" + finalI;
//                    currentDao.executeSql(sql);
//                    currentDao.closeConnection();
//                });
//
//            }
//            executorService.shutdown();
//            while (true) {
//                try {
//                    if (executorService.awaitTermination(5, TimeUnit.SECONDS)) break;
//                    Thread.sleep(5000);
//                    SpringMVCContextHolder.getSystemLogger().info("线程等待中...");
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//
//            }
//        }
//        reportDao.closeConnection();
    }
        for (String tableName : trackingTableSet) {
            //获取年份从表名最后一个 下划线开始截取
            String year = tableName.substring(tableName.lastIndexOf("_") + 1);
            if (StringUtils.equalsAny(year, "2017", "2018") || year.length() > 4) {
                continue;
            }
            //获取表前缀
            String tablePrefix = tableName.substring(0, tableName.lastIndexOf("_"));
            ExecutorService executorService = Executors.newFixedThreadPool(12);
            for (int i = 1; i <= 12; i++) {
                if ("2023".equals(year) && i > 9) {
                    break;
                }
                final int finalI = i;
                executorService.submit(() -> {
                    Dao currentDao = dbe.newDao();
                    //获取当前月份 以MM格式化
                    String month = String.format("%02d", finalI);
                    //检查月份对应的表是否存在
                    String monthTableName = tablePrefix + "_" + year + month;
                    Set<String> allTableName = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), tablePrefix + "_" + year + month);
                    if (allTableName.size() == 0 || !allTableName.contains(monthTableName)) {
                        //根据原始表结构创建新表
                        String sql = "create table " + monthTableName + " like " + tableName;
                        currentDao.executeSql(sql);
                        SpringMVCContextHolder.getSystemLogger().info("创建表:" + monthTableName);
                    }
                    String sql = "INSERT INTO " + monthTableName + " SELECT * FROM " + tableName + " WHERE MONTH(update_date)=" + finalI;
                    currentDao.executeSql(sql);
                    currentDao.closeConnection();
                });
    /**
     * 获取历史数据
     */
    public void getHistoryData(FieldSetEntity fse) throws BaseException, ExecutionException, InterruptedException {
            }
            executorService.shutdown();
            while (true) {
                try {
                    if (executorService.awaitTermination(5, TimeUnit.SECONDS)) break;
                    Thread.sleep(5000);
                    SpringMVCContextHolder.getSystemLogger().info("线程等待中...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
        //机号
        String serialNumber = fse.getString("serial_number");
        SpringMVCContextHolder.getSystemLogger().info("准备回写机号数据:" + serialNumber);
        if (StringUtils.isEmpty(serialNumber)) {
            throw new BaseException(ErrorCode.SERIAL_NUMBER_IS_NULL);
        }
            }
        }
        reportDao.closeConnection();
    }
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
        FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
        if (FieldSetEntity.isEmpty(reportDbConfig)) {
            throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
        }
        DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "upper(source_table) in ('T_WIP_TRACKING','T_WIP_PRODUCT_KEYP','T_WIP_DETAIL','T_PM_PRODUCT_SN') and data_source in (select  uuid from product_sys_data_sync_manager)", new Object[]{}, new Object[]{"uuid,id,data_source,source_table"});
    /**
     * 获取历史数据
     */
    public void getHistoryData(FieldSetEntity fse) throws BaseException, ExecutionException, InterruptedException {
        Map<String, List<FieldSetEntity>> groupByCollectId = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("id")));
        Map<String, List<FieldSetEntity>> groupBySourceTable = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("source_table")));
        //机号
        String serialNumber = fse.getString("serial_number");
        SpringMVCContextHolder.getSystemLogger().info("准备回写机号数据:" + serialNumber);
        if (StringUtils.isEmpty(serialNumber)) {
            throw new BaseException(ErrorCode.SERIAL_NUMBER_IS_NULL);
        }
        DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
        Dao reportDao = dbe.getDao();
        String reportDbName = dbe.getDbName();
        Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_tracking");
        HistoryEntity trackingData = historyBeforeDispose(getData(reportDao, trackingTableSet, "serial_number",
                serialNumber, new ErrorCode[]{ErrorCode.TRACKING_TABLE_NOT_EXISTS, ErrorCode.NOT_FOUND_SERIAL_NUMBER}), CmnConst.T_WIP_TRACKING);
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
        FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
        if (FieldSetEntity.isEmpty(reportDbConfig)) {
            throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
        }
        DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "upper(source_table) in ('T_WIP_TRACKING','T_WIP_PRODUCT_KEYP','T_WIP_DETAIL','T_PM_PRODUCT_SN') and data_source in (select  uuid from product_sys_data_sync_manager)", new Object[]{}, new Object[]{"uuid,id,data_source,source_table"});
        Set<String> keypTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_product_keyp");
        HistoryEntity keypData = historyBeforeDispose(getData(reportDao, keypTableSet, "pk_product_sn", serialNumber, new ErrorCode[]{ErrorCode.KEYP_TABLE_NOT_EXISTS, ErrorCode.KEYP_DATA_NOT_FOUND}), CmnConst.T_WIP_PRODUCT_KEYP);
        Map<String, List<FieldSetEntity>> groupByCollectId = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("id")));
        Map<String, List<FieldSetEntity>> groupBySourceTable = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("source_table")));
        DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
        Dao reportDao = dbe.getDao();
        String reportDbName = dbe.getDbName();
        Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_tracking");
        HistoryEntity trackingData = historyBeforeDispose(getData(reportDao, trackingTableSet, "serial_number",
                serialNumber, new ErrorCode[]{ErrorCode.TRACKING_TABLE_NOT_EXISTS, ErrorCode.NOT_FOUND_SERIAL_NUMBER}), CmnConst.T_WIP_TRACKING);
        Set<String> keypTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_product_keyp");
        HistoryEntity keypData = historyBeforeDispose(getData(reportDao, keypTableSet, "pk_product_sn", serialNumber, new ErrorCode[]{ErrorCode.KEYP_TABLE_NOT_EXISTS, ErrorCode.KEYP_DATA_NOT_FOUND}), CmnConst.T_WIP_PRODUCT_KEYP);
        Set<String> detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail");
        HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL);
        Set<String> detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail");
        HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL);
//        Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
//        HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
        //主库数据源配置
        String masterDataSource = fs.getString("data_source");
        dbe = new DataBaseEntity(masterDataSource);
        Map<String, Dao> groupDao = new HashMap<>();
        Dao dao = dbe.getDao();
        boolean success = false;
        try {
            Connection connection = dao.getConnection();
            connection.setAutoCommit(false);
            HistoryEntity[] historyEntities = {trackingData, keypData, detailData};
            insertMasterTableData(dao, historyEntities);
            insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities);
            connection.commit();
            batchCommit(groupDao);
            success = true;
            SpringMVCContextHolder.getSystemLogger().info("回写机号:" + serialNumber + "成功");
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error("回写机号:" + serialNumber + "失败");
            SpringMVCContextHolder.getSystemLogger().error(e);
            throw new BaseException(ErrorCode.INSERT_DATA_FAIL);
        } finally {
            try {
                if (!success && !dao.getConnection().getAutoCommit()) {
                    dao.getConnection().rollback();
                }
                dao.closeConnection();
                for (Dao value : groupDao.values()) {
                    try (Connection connection = value.getConnection()) {
                        if (!connection.getAutoCommit()) {
                            connection.rollback();
                        }
                    }
                    value.closeConnection();
        //主库数据源配置
        String masterDataSource = fs.getString("data_source");
        dbe = new DataBaseEntity(masterDataSource);
        Map<String, Dao> groupDao = new HashMap<>();
        Dao dao = dbe.getDao();
        boolean success = false;
        try {
            Connection connection = dao.getConnection();
            connection.setAutoCommit(false);
            HistoryEntity[] historyEntities = {trackingData, keypData, detailData};
            insertMasterTableData(dao, historyEntities);
            insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities);
            connection.commit();
            batchCommit(groupDao);
            success = true;
            SpringMVCContextHolder.getSystemLogger().info("回写机号:" + serialNumber + "成功");
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error("回写机号:" + serialNumber + "失败");
            SpringMVCContextHolder.getSystemLogger().error(e);
            throw new BaseException(ErrorCode.INSERT_DATA_FAIL);
        } finally {
            try {
                if (!success && !dao.getConnection().getAutoCommit()) {
                    dao.getConnection().rollback();
                }
                dao.closeConnection();
                for (Dao value : groupDao.values()) {
                    try (Connection connection = value.getConnection()) {
                        if (!connection.getAutoCommit()) {
                            connection.rollback();
                        }
                    }
                    value.closeConnection();
                }
            } catch (Exception e) {
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
    public void batchCommit(Map<String, Dao> groupDao) throws SQLException {
        for (Dao value : groupDao.values()) {
            value.getConnection().commit();
        }
    }
    public void batchCommit(Map<String, Dao> groupDao) throws SQLException {
        for (Dao value : groupDao.values()) {
            value.getConnection().commit();
        }
    }
    /**
     * 插入数据到主库
     *
     * @param dao
     * @param historyEntities
     */
    public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            DataTableEntity masterDataTable = historyEntity.getMasterDataTable();
            if (DataTableEntity.isEmpty(masterDataTable)) {
                continue;
            }
            Object[] objects = masterDataTable.getData().stream().map(item -> {
                String primaryValue = item.getString(historyEntity.getPrimaryField());
                //判断是否为数值且包含小数点 小数点后面是否全是0
                if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
                        indexOf(".") + 1).matches("^0*$")) {
                    //返回整数字符串
                    return primaryValue.substring(0, primaryValue.indexOf("."));
                }
                return primaryValue;
            }).toArray();
            //查询主库数据是否存在
            DataTableEntity list = dao.getList(historyEntity.getTableName(),
                    BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), objects.length, true),
                    new String[]{historyEntity.getPrimaryField()}, objects);
            List<String> existsPrimaryValues = null;
            if (!DataTableEntity.isEmpty(list)) {
                existsPrimaryValues = list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
            }
            for (int i = 0; i < masterDataTable.getRows(); i++) {
                String primaryValue = masterDataTable.getString(i, historyEntity.getPrimaryField());
                if (existsPrimaryValues != null && existsPrimaryValues.contains(primaryValue)) {
                    //数据存在跳过该数据
                    continue;
                }
                FieldSetEntity fieldSetEntity = masterDataTable.getFieldSetEntity(i);
                fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
                fieldSetEntity.remove("~table_name~");
                dao.add(fieldSetEntity);
            }
        }
    }
    /**
     * 插入数据到主库
     *
     * @param dao
     * @param historyEntities
     */
    public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            DataTableEntity masterDataTable = historyEntity.getMasterDataTable();
            if (DataTableEntity.isEmpty(masterDataTable)) {
                continue;
            }
            Object[] objects = masterDataTable.getData().stream().map(item -> {
                String primaryValue = item.getString(historyEntity.getPrimaryField());
                //判断是否为数值且包含小数点 小数点后面是否全是0
                if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
                        indexOf(".") + 1).matches("^0*$")) {
                    //返回整数字符串
                    return primaryValue.substring(0, primaryValue.indexOf("."));
                }
                return primaryValue;
            }).toArray();
            //查询主库数据是否存在
            DataTableEntity list = dao.getList(historyEntity.getTableName(),
                    BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), objects.length, true),
                    new String[]{historyEntity.getPrimaryField()}, objects);
            List<String> existsPrimaryValues = null;
            if (!DataTableEntity.isEmpty(list)) {
                existsPrimaryValues = list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
            }
            for (int i = 0; i < masterDataTable.getRows(); i++) {
                String primaryValue = masterDataTable.getString(i, historyEntity.getPrimaryField());
                if (existsPrimaryValues != null && existsPrimaryValues.contains(primaryValue)) {
                    //数据存在跳过该数据
                    continue;
                }
                FieldSetEntity fieldSetEntity = masterDataTable.getFieldSetEntity(i);
                fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
                fieldSetEntity.remove("~table_name~");
                dao.add(fieldSetEntity);
            }
        }
    }
    /**
     * 插入子库表数据
     *
     * @param groupDao           子库dao
     * @param groupByCollectId   采集配置按采集id分组
     * @param groupBySourceTable 采集配置按表名分组
     * @param historyEntities    历史数据
     */
    public void insertSubTableData(Map<String, Dao> groupDao, Map<String, List<FieldSetEntity>> groupByCollectId,
                                   Map<String, List<FieldSetEntity>> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            Map<String, List<FieldSetEntity>> groupData = historyEntity.getGroupData();
            if (groupData == null || groupData.isEmpty()) {
                continue;
            }
            for (Map.Entry<String, List<FieldSetEntity>> entry : groupData.entrySet()) {
                List<Dao> daoList = new ArrayList<>();
                if ("ch-kt".equals(entry.getKey())) {
                    List<FieldSetEntity> fieldSetEntityList = groupBySourceTable.get(historyEntity.getTableName().toLowerCase());
                    Set<String> dataSourceSet = fieldSetEntityList.stream().map(item -> item.getString("data_source")).collect(Collectors.toSet());
                    for (String sourceUuid : dataSourceSet) {
                        daoList.add(getDao(groupDao, sourceUuid));
                    }
                } else {
                    daoList.add(getDao(groupDao, groupByCollectId.get(entry.getKey()).get(0).getString("data_source")));
                }
                for (Dao dao : daoList) {
                    List<FieldSetEntity> value = entry.getValue();
                    //查询已存在的数据
                    DataTableEntity list = dao.getList(historyEntity.getTableName(),
                            BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), value.size(), true),
                            value.stream().map(item -> {
                                String primaryValue = item.getString(historyEntity.getPrimaryField());
                                //判断是否为数值且包含小数点 小数点后面是否全是0
                                if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
                                        indexOf(".") + 1).matches("^0*$")) {
                                    //返回整数字符串
                                    return primaryValue.substring(0, primaryValue.indexOf("."));
                                }
                                return primaryValue;
                            }).toArray());
                    List<String> existIds = DataTableEntity.isEmpty(list) ? null : list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
                    for (FieldSetEntity fieldSetEntity : value) {
                        String primaryValue = fieldSetEntity.getString(historyEntity.getPrimaryField());
                        if (existIds != null && existIds.contains(NumberUtil.parseNumber(primaryValue).toString())) {
                            continue;
                        }
                        fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
                        fieldSetEntity.remove("~table_name~");
                        dao.add(fieldSetEntity);
                    }
                }
            }
        }
    }
    /**
     * 插入子库表数据
     *
     * @param groupDao           子库dao
     * @param groupByCollectId   采集配置按采集id分组
     * @param groupBySourceTable 采集配置按表名分组
     * @param historyEntities    历史数据
     */
    public void insertSubTableData(Map<String, Dao> groupDao, Map<String, List<FieldSetEntity>> groupByCollectId,
                                   Map<String, List<FieldSetEntity>> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            Map<String, List<FieldSetEntity>> groupData = historyEntity.getGroupData();
            if (groupData == null || groupData.isEmpty()) {
                continue;
            }
            for (Map.Entry<String, List<FieldSetEntity>> entry : groupData.entrySet()) {
                List<Dao> daoList = new ArrayList<>();
                if ("ch-kt".equals(entry.getKey())) {
                    List<FieldSetEntity> fieldSetEntityList = groupBySourceTable.get(historyEntity.getTableName().toLowerCase());
                    Set<String> dataSourceSet = fieldSetEntityList.stream().map(item -> item.getString("data_source")).collect(Collectors.toSet());
                    for (String sourceUuid : dataSourceSet) {
                        daoList.add(getDao(groupDao, sourceUuid));
                    }
                } else {
                    daoList.add(getDao(groupDao, groupByCollectId.get(entry.getKey()).get(0).getString("data_source")));
                }
                for (Dao dao : daoList) {
                    List<FieldSetEntity> value = entry.getValue();
                    //查询已存在的数据
                    DataTableEntity list = dao.getList(historyEntity.getTableName(),
                            BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), value.size(), true),
                            value.stream().map(item -> {
                                String primaryValue = item.getString(historyEntity.getPrimaryField());
                                //判断是否为数值且包含小数点 小数点后面是否全是0
                                if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
                                        indexOf(".") + 1).matches("^0*$")) {
                                    //返回整数字符串
                                    return primaryValue.substring(0, primaryValue.indexOf("."));
                                }
                                return primaryValue;
                            }).toArray());
                    List<String> existIds = DataTableEntity.isEmpty(list) ? null : list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
                    for (FieldSetEntity fieldSetEntity : value) {
                        String primaryValue = fieldSetEntity.getString(historyEntity.getPrimaryField());
                        if (existIds != null && existIds.contains(NumberUtil.parseNumber(primaryValue).toString())) {
                            continue;
                        }
                        fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
                        fieldSetEntity.remove("~table_name~");
                        dao.add(fieldSetEntity);
                    }
                }
            }
        }
    }
    public Dao getDao(Map<String, Dao> groupDao, String sourceUuid) throws Exception {
        Dao dao = groupDao.get(sourceUuid);
        if (null == dao) {
            DataBaseEntity dbe = new DataBaseEntity(sourceUuid);
            dao = dbe.getDao();
            dao.getConnection().setAutoCommit(false);
            groupDao.put(sourceUuid, dao);
        }
        return dao;
    }
    public Dao getDao(Map<String, Dao> groupDao, String sourceUuid) throws Exception {
        Dao dao = groupDao.get(sourceUuid);
        if (null == dao) {
            DataBaseEntity dbe = new DataBaseEntity(sourceUuid);
            dao = dbe.getDao();
            dao.getConnection().setAutoCommit(false);
            groupDao.put(sourceUuid, dao);
        }
        return dao;
    }
    public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) {
        if (DataTableEntity.isEmpty(dt)) {
            return null;
        }
        HistoryEntity historyEntity = new HistoryEntity();
        historyEntity.setMoNumberField("mo_number");
        if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("wip_id");
            historyEntity.setTimeField("update_date");
        } else if (CmnConst.T_WIP_PRODUCT_KEYP.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("pk_id");
            historyEntity.setTimeField("pk_loadtime");
    public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) {
        if (DataTableEntity.isEmpty(dt)) {
            return null;
        }
        HistoryEntity historyEntity = new HistoryEntity();
        historyEntity.setMoNumberField("mo_number");
        if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("wip_id");
            historyEntity.setTimeField("update_date");
        } else if (CmnConst.T_WIP_PRODUCT_KEYP.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("pk_id");
            historyEntity.setTimeField("pk_loadtime");
            historyEntity.setMoNumberField("pk_mo");
        } else if (CmnConst.T_WIP_DETAIL.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("wip_detail_id");
            historyEntity.setTimeField("update_date");
        } else if (CmnConst.T_PM_PRODUCT_SN.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("row_id");
            historyEntity.setTimeField("update_date");
        } else {
            return null;
        }
        historyEntity.setTableName(targetTableName.toUpperCase());
        Date now = new Date();
        for (int i = 0; i < dt.getRows(); i++) {
            Map<Object, Object> values = dt.getFieldSetEntity(i).getValues();
            //遍历map中的value是否为数字,如果是数字判断小数点后面是否有值,如果没有值则转换为整数
            for (Map.Entry<Object, Object> entry : values.entrySet()) {
                if (entry.getValue() instanceof Number) {
                    Number number = (Number) entry.getValue();
                    if (number.doubleValue() == number.intValue()) {
                        entry.setValue(number.intValue());
                    }
                }
            }
        }
        if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName) && dt.getRows() > 1) {
            List<FieldSetEntity> data = dt.getData();
            FieldSetEntity newData = null;
            for (int i = 0; i < data.size(); i++) {
                FieldSetEntity fieldSetEntity = data.get(i);
                if ("ch-kt".equals(fieldSetEntity.getValue("pre_master_key"))) {
                    data.remove(i);
                    dt.removeFieldSetEntity(i);
                    break;
                } else if (newData == null) {
                    newData = fieldSetEntity;
                    continue;
                }
            historyEntity.setMoNumberField("pk_mo");
        } else if (CmnConst.T_WIP_DETAIL.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("wip_detail_id");
            historyEntity.setTimeField("update_date");
        } else if (CmnConst.T_PM_PRODUCT_SN.equalsIgnoreCase(targetTableName)) {
            historyEntity.setPrimaryField("row_id");
            historyEntity.setTimeField("update_date");
        } else {
            return null;
        }
        historyEntity.setTableName(targetTableName.toUpperCase());
        Date now = new Date();
        for (int i = 0; i < dt.getRows(); i++) {
            Map<Object, Object> values = dt.getFieldSetEntity(i).getValues();
            //遍历map中的value是否为数字,如果是数字判断小数点后面是否有值,如果没有值则转换为整数
            for (Map.Entry<Object, Object> entry : values.entrySet()) {
                if (entry.getValue() instanceof Number) {
                    Number number = (Number) entry.getValue();
                    if (number.doubleValue() == number.intValue()) {
                        entry.setValue(number.intValue());
                    }
                }
            }
        }
        if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName) && dt.getRows() > 1) {
            List<FieldSetEntity> data = dt.getData();
            FieldSetEntity newData = null;
            for (int i = 0; i < data.size(); i++) {
                FieldSetEntity fieldSetEntity = data.get(i);
                if ("ch-kt".equals(fieldSetEntity.getValue("pre_master_key"))) {
                    data.remove(i);
                    dt.removeFieldSetEntity(i);
                    break;
                } else if (newData == null) {
                    newData = fieldSetEntity;
                    continue;
                }
                Date date = fieldSetEntity.getDate(historyEntity.getTimeField());
                if (date.getTime() > newData.getDate(historyEntity.getTimeField()).getTime()) {
                    newData = fieldSetEntity;
                }
            }
            if (newData != null) {
                dt = new DataTableEntity();
                dt.addFieldSetEntity(newData);
            }
            //在list中取出时间最近的数据
            Optional<FieldSetEntity> max = data.stream().max(Comparator.comparing((a) -> a.getDate(historyEntity.getTimeField()).getTime()));
                Date date = fieldSetEntity.getDate(historyEntity.getTimeField());
                if (date.getTime() > newData.getDate(historyEntity.getTimeField()).getTime()) {
                    newData = fieldSetEntity;
                }
            }
            if (newData != null) {
                dt = new DataTableEntity();
                dt.addFieldSetEntity(newData);
            }
        }
        DataTableEntity subData = dt.clones();
        DataTableEntity masterData = dt.clones();
        for (int i = 0; i < subData.getRows(); i++) {
            FieldSetEntity fs = subData.getFieldSetEntity(i);
            String preMasterKey = fs.getString("pre_master_key");
            if (historyEntity.getPrimaryField().equals(preMasterKey)) {
                preMasterKey = null;
        }
        DataTableEntity subData = dt.clones();
        DataTableEntity masterData = dt.clones();
        for (int i = 0; i < subData.getRows(); i++) {
            FieldSetEntity fs = subData.getFieldSetEntity(i);
            String preMasterKey = fs.getString("pre_master_key");
            if (historyEntity.getPrimaryField().equals(preMasterKey)) {
                preMasterKey = null;
            }
            if ("ch-kt".equals(fs.getString("source_info")) && StringUtils.isEmpty(preMasterKey)) {
                preMasterKey = fs.getString(historyEntity.getPrimaryField());
            }
            fs.setValue(historyEntity.getPrimaryField(), preMasterKey);
            fs.setValue("pre_master_key", preMasterKey);
            masterData.setFieldValue(i, historyEntity.getTimeField(), now);
            fs.setValue(historyEntity.getTimeField(), now);
            fs.remove("~table_name~");
            masterData.getFieldSetEntity(i).remove("~table_name~");
            masterData.setFieldValue(i, "pre_master_key", null);
            if (StringUtils.isEmpty(historyEntity.getPrimaryField())) {
                subData.removeFieldSetEntity(i);
                masterData.removeFieldSetEntity(i);
                i--;
            }
        }
        historyEntity.setArchivedDataTable(dt);
        historyEntity.setMasterDataTable(masterData);
        historyEntity.setSubDataTable(subData);
        return historyEntity;
    }
            }
            if ("ch-kt".equals(fs.getString("source_info")) && StringUtils.isEmpty(preMasterKey)) {
                preMasterKey = fs.getString(historyEntity.getPrimaryField());
                masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField()));
            }
            fs.setValue(historyEntity.getPrimaryField(), preMasterKey);
            fs.setValue("pre_master_key", null);
            masterData.setFieldValue(i, historyEntity.getTimeField(), now);
            fs.setValue(historyEntity.getTimeField(), now);
            fs.remove("~table_name~");
            masterData.getFieldSetEntity(i).remove("~table_name~");
            if (StringUtils.isEmpty(preMasterKey)) {
                masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField()));
            }
            if (StringUtils.isEmpty(historyEntity.getPrimaryField())) {
                subData.removeFieldSetEntity(i);
                masterData.removeFieldSetEntity(i);
                i--;
            }
        }
        historyEntity.setArchivedDataTable(dt);
        historyEntity.setMasterDataTable(masterData);
        historyEntity.setSubDataTable(subData);
        return historyEntity;
    }
    public DataTableEntity getData(Dao dao, Set<String> tableSet, String filterFieldName, String
            serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException {
        if (CollectionUtil.isEmpty(tableSet)) {
            throw new BaseException(errorCodes[0]);
        }
        String[] tableArray = tableSet.toArray(new String[]{});
        CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
        //多线程查询单张表,等待所有线程查询完毕
        for (String tableName : tableArray) {
            objectCompletionService.submit(() -> dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?", new Object[]{serialNumber}));
        }
        DataTableEntity data = new DataTableEntity();
        for (int i = 0; i < tableArray.length; i++) {
            DataTableEntity dataTableEntity = objectCompletionService.take().get();
            BaseUtil.dataTableMerge(data, dataTableEntity);
        }
        return data;
    }
    public DataTableEntity getData(Dao dao, Set<String> tableSet, String filterFieldName, String
            serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException {
        if (CollectionUtil.isEmpty(tableSet)) {
            throw new BaseException(errorCodes[0]);
        }
        String[] tableArray = tableSet.toArray(new String[]{});
        StringBuilder sql = new StringBuilder();
        for (String tableName : tableArray) {
            if (sql.length() > 0) {
                sql.append(" union all ");
            }
            sql.append("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?");
        }
        DataTableEntity data = dao.getList(sql.toString(), Arrays.stream(tableArray).map((t) -> serialNumber).toArray());
//        CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
//        //多线程查询单张表,等待所有线程查询完毕
//        for (String tableName : tableArray) {
//            objectCompletionService.submit(() -> {
//                DataTableEntity list = dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?", new Object[]{serialNumber});
//                return list;
//            });
//        }
//        DataTableEntity data = new DataTableEntity();
//        for (int i = 0; i < tableArray.length; i++) {
//            DataTableEntity dataTableEntity = objectCompletionService.take().get();
//            BaseUtil.dataTableMerge(data, dataTableEntity);
//        }
        if (DataTableEntity.isEmpty(data) && !"product_sn".equals(filterFieldName)) {
            throw new BaseException(errorCodes[1]);
        }
        return data;
    }
    /**
     * 初始化制令单
     *
     * @param moNumbers
     * @throws BaseException
     */
    public void initMoBase(String[] moNumbers) throws BaseException {
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
    /**
     * 初始化制令单
     *
     * @param moNumbers
     * @throws BaseException
     */
    public void initMoBase(String[] moNumbers) throws BaseException {
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
        String[] targetDataSource = fs.getString("target_data_source").split(",");
        String[] targetDataSource = fs.getString("target_data_source").split(",");
        Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
        StringBuilder errorMsg = new StringBuilder();
        for (Dao dao : targetDao) {
            for (String moNumber : moNumbers) {
                synchronized (moNumber.intern()) {
                    try (Connection connection = dao.getConnection();
                         CallableStatement callableStatement = connection.prepareCall(
                                 "{CALL SMT_T_PM_MO_BASE(?,?,?,?,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,?,null,null" +
                                         ",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null)}")) {
                        callableStatement.setInt(1, 5);
                        callableStatement.registerOutParameter(2, OracleTypes.SMALLINT);
                        callableStatement.registerOutParameter(3, OracleTypes.VARCHAR);
                        callableStatement.setString(4, moNumber);
                        callableStatement.setString(5, "1");
                        callableStatement.execute();
                        //执行后返回的错误码
                        int errorCode = callableStatement.getInt(2);
                        //执行后返回的错误信息
                        String errorText = callableStatement.getString(3);
                        if (errorCode != 0) {
                            //错误的
                            errorMsg.append("制令单号:").append(moNumber);
                            errorMsg.append("\nerrorCode:").append(errorCode);
                            errorMsg.append("\nerrorText:").append(errorText);
                        }
                    } catch (Exception e) {
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        e.printStackTrace();
                        errorMsg.append("制令单号:").append(moNumber);
                        errorMsg.append("\n执行时未知错误:").append(e.getMessage());
                    }
                }
            }
        }
        if (errorMsg.length() > 0) {
            throw new BaseException(ErrorCode.SUB_MO_BASE_INI_FAIL.getValue(), ErrorCode.SUB_MO_BASE_INI_FAIL.getText() + "。\n" + errorMsg);
        }
    }
        Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
        StringBuilder errorMsg = new StringBuilder();
        for (Dao dao : targetDao) {
            for (String moNumber : moNumbers) {
                synchronized (moNumber.intern()) {
                    try (Connection connection = dao.getConnection();
                         CallableStatement callableStatement = connection.prepareCall(
                                 "{CALL SMT_T_PM_MO_BASE(?,?,?,?,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,?,null,null" +
                                         ",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null)}")) {
                        callableStatement.setInt(1, 5);
                        callableStatement.registerOutParameter(2, OracleTypes.SMALLINT);
                        callableStatement.registerOutParameter(3, OracleTypes.VARCHAR);
                        callableStatement.setString(4, moNumber);
                        callableStatement.setString(5, "1");
                        callableStatement.execute();
                        //执行后返回的错误码
                        int errorCode = callableStatement.getInt(2);
                        //执行后返回的错误信息
                        String errorText = callableStatement.getString(3);
                        if (errorCode != 0) {
                            //错误的
                            errorMsg.append("制令单号:").append(moNumber);
                            errorMsg.append("\nerrorCode:").append(errorCode);
                            errorMsg.append("\nerrorText:").append(errorText);
                        }
                    } catch (Exception e) {
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        e.printStackTrace();
                        errorMsg.append("制令单号:").append(moNumber);
                        errorMsg.append("\n执行时未知错误:").append(e.getMessage());
                    }
                }
            }
        }
        if (errorMsg.length() > 0) {
            throw new BaseException(ErrorCode.SUB_MO_BASE_INI_FAIL.getValue(), ErrorCode.SUB_MO_BASE_INI_FAIL.getText() + "。\n" + errorMsg);
        }
    }
    /**
     * @param moNumbers 制令单号,多个逗号分隔
     * @param type      操作类型 1 创建制令单 2 更新制令单中的指定字段
     * @throws BaseException
     */
    public void updateMoBase(String[] moNumbers, int type) throws BaseException {
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
        String dataSource = fs.getString("data_source");
        String[] targetDataSource = fs.getString("target_data_source").split(",");
        StringBuilder errorMessage = new StringBuilder();
        DataBaseEntity dbe = new DataBaseEntity(dataSource);
        for (int i = 0; i < moNumbers.length; i++) {
            String moNumber = moNumbers[i];
            try {
                synchronized (moNumber.intern()) {
                    Dao dao = dbe.getDao();
                    FieldSetEntity one = dao.getOne("T_PM_MO_BASE", "MO_NUMBER=?", new Object[]{moNumber});
                    if (FieldSetEntity.isEmpty(one)) {
                        throw new BaseException(ErrorCode.MO_NUMBER_RECORD_SELECT_EMPTY);
                    }
                    one.setTableName("T_PM_MO_BASE");
                    dao.closeConnection();
                    Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
                    StringBuilder errorMsg = new StringBuilder();
                    try {
                        if (1 == type) {
                            createMoBase(errorMsg, one, targetDao);
                        } else if (2 == type) {
                            updateMoBase(errorMsg, one, targetDao);
                        }
                        if (errorMsg.length() > 0) {
                            //有错误
                            throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMsg);
                        }
                    } finally {
                        for (Dao dao1 : targetDao) {
                            dao1.closeConnection();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
                errorMessage.append(e.getMessage()).append("\n");
            }
    /**
     * @param moNumbers 制令单号,多个逗号分隔
     * @param type      操作类型 1 创建制令单 2 更新制令单中的指定字段
     * @throws BaseException
     */
    public void updateMoBase(String[] moNumbers, int type) throws BaseException {
        FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1  limit 1", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
        }
        String dataSource = fs.getString("data_source");
        String[] targetDataSource = fs.getString("target_data_source").split(",");
        StringBuilder errorMessage = new StringBuilder();
        DataBaseEntity dbe = new DataBaseEntity(dataSource);
        for (int i = 0; i < moNumbers.length; i++) {
            String moNumber = moNumbers[i];
            try {
                synchronized (moNumber.intern()) {
                    Dao dao = dbe.getDao();
                    FieldSetEntity one = dao.getOne("T_PM_MO_BASE", "MO_NUMBER=?", new Object[]{moNumber});
                    if (FieldSetEntity.isEmpty(one)) {
                        throw new BaseException(ErrorCode.MO_NUMBER_RECORD_SELECT_EMPTY);
                    }
                    one.setTableName("T_PM_MO_BASE");
                    dao.closeConnection();
                    Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
                    StringBuilder errorMsg = new StringBuilder();
                    try {
                        if (1 == type) {
                            createMoBase(errorMsg, one, targetDao);
                        } else if (2 == type) {
                            updateMoBase(errorMsg, one, targetDao);
                        }
                        if (errorMsg.length() > 0) {
                            //有错误
                            throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMsg);
                        }
                    } finally {
                        for (Dao dao1 : targetDao) {
                            dao1.closeConnection();
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
                errorMessage.append(e.getMessage()).append("\n");
            }
        }
        if (errorMessage.length() > 0) {
            throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMessage);
        }
    }
        }
        if (errorMessage.length() > 0) {
            throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMessage);
        }
    }
    /**
     * 创建制令单
     *
     * @param errorMsg
     * @param moBase
     * @param subDao
     */
    private void createMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
        String moNumber = moBase.getString("mo_number");
        for (int i = 0; i < subDao.length; i++) {
            Dao dao = subDao[i];
            try {
                //检查工单表是否存在
                FieldSetEntity projectBase = dao.getOne("T_PM_PROJECT_BASE", "PROJECT_ID = ? ", new Object[]{moNumber});
                if (FieldSetEntity.isEmpty(projectBase)) {
                    throw new BaseException(ErrorCode.SUB_PROJECT_BASE_CAN_NOT_EMPTY);
                }
                //检查制令单表是否已存在
                FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
                if (!FieldSetEntity.isEmpty(fs)) {
                    //进行更新操作
                    updateMoBase(errorMsg, moBase, new Dao[]{dao});
                    continue;
                }
                //将 T_PM_MO_BASE.PROJECT_ID 更改为子库中对应 T_PM_PROJECT_BASE.PROJECT_BASE_ID
                moBase.setValue("project_id", projectBase.getString("project_base_id"));
    /**
     * 创建制令单
     *
     * @param errorMsg
     * @param moBase
     * @param subDao
     */
    private void createMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
        String moNumber = moBase.getString("mo_number");
        for (int i = 0; i < subDao.length; i++) {
            Dao dao = subDao[i];
            try {
                //检查工单表是否存在
                FieldSetEntity projectBase = dao.getOne("T_PM_PROJECT_BASE", "PROJECT_ID = ? ", new Object[]{moNumber});
                if (FieldSetEntity.isEmpty(projectBase)) {
                    throw new BaseException(ErrorCode.SUB_PROJECT_BASE_CAN_NOT_EMPTY);
                }
                //检查制令单表是否已存在
                FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
                if (!FieldSetEntity.isEmpty(fs)) {
                    //进行更新操作
                    updateMoBase(errorMsg, moBase, new Dao[]{dao});
                    continue;
                }
                //将 T_PM_MO_BASE.PROJECT_ID 更改为子库中对应 T_PM_PROJECT_BASE.PROJECT_BASE_ID
                moBase.setValue("project_id", projectBase.getString("project_base_id"));
//                moBase.setValue("row_id", -1);
                dao.add(moBase);
            } catch (BaseException e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
                        append("]").append(e.getMessageInfo());
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            } catch (Exception e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("创建制令单失败,未知的错误");
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
                dao.add(moBase);
            } catch (BaseException e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
                        append("]").append(e.getMessageInfo());
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            } catch (Exception e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("创建制令单失败,未知的错误");
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
    /**
     * 更新制令单
     *
     * @param errorMsg
     * @param moBase
     * @param subDao
     */
    private void updateMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
        String moNumber = moBase.getString("mo_number");
        for (int i = 0; i < subDao.length; i++) {
            Dao dao = subDao[i];
            try {
                //检查制令单表是否已存在
                FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
                if (FieldSetEntity.isEmpty(fs)) {
                    //进行创建操作
                    createMoBase(errorMsg, moBase, new Dao[]{dao});
                    continue;
                }
                // 当子库中制令单 input_qty 字段为 0 或者 为 null 时 更新多个字段,否则只更新target_qty 字段
                String[] updateField = StringUtils.isEmpty(fs.getString("input_qty")) || "0".equals(fs.getString("input_qty")) ?
                        new String[]{"target_qty", "owner", "mo_create_date", "mo_schedule_date", "mo_due_date", "areaid", "technicsid", "close_flag", "default_group", "end_group"}
                        : new String[]{"target_qty"};
                StringBuilder sql = new StringBuilder();
                sql.append(" UPDATE T_PM_MO_BASE SET ");
                Object[] params = new Object[updateField.length + 1];
                for (int k = 0; k < updateField.length; k++) {
                    if (k > 0) {
                        sql.append(",");
                    }
                    params[k] = moBase.getObject(updateField[k]);
                    sql.append(updateField[k].toUpperCase()).append(" = ? ");
                }
                params[updateField.length] = moNumber;
                sql.append(" WHERE MO_NUMBER = ?  ");
                dao.executeSql(sql.toString(), params);
            } catch (BaseException e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
                        append("]").append(e.getMessageInfo());
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            } catch (Exception e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("更新制令单失败,未知的错误");
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
    /**
     * 更新制令单
     *
     * @param errorMsg
     * @param moBase
     * @param subDao
     */
    private void updateMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
        String moNumber = moBase.getString("mo_number");
        for (int i = 0; i < subDao.length; i++) {
            Dao dao = subDao[i];
            try {
                //检查制令单表是否已存在
                FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
                if (FieldSetEntity.isEmpty(fs)) {
                    //进行创建操作
                    createMoBase(errorMsg, moBase, new Dao[]{dao});
                    continue;
                }
                // 当子库中制令单 input_qty 字段为 0 或者 为 null 时 更新多个字段,否则只更新target_qty 字段
                String[] updateField = StringUtils.isEmpty(fs.getString("input_qty")) || "0".equals(fs.getString("input_qty")) ?
                        new String[]{"target_qty", "owner", "mo_create_date", "mo_schedule_date", "mo_due_date", "areaid", "technicsid", "close_flag", "default_group", "end_group"}
                        : new String[]{"target_qty"};
                StringBuilder sql = new StringBuilder();
                sql.append(" UPDATE T_PM_MO_BASE SET ");
                Object[] params = new Object[updateField.length + 1];
                for (int k = 0; k < updateField.length; k++) {
                    if (k > 0) {
                        sql.append(",");
                    }
                    params[k] = moBase.getObject(updateField[k]);
                    sql.append(updateField[k].toUpperCase()).append(" = ? ");
                }
                params[updateField.length] = moNumber;
                sql.append(" WHERE MO_NUMBER = ?  ");
                dao.executeSql(sql.toString(), params);
            } catch (BaseException e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
                        append("]").append(e.getMessageInfo());
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            } catch (Exception e) {
                errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("更新制令单失败,未知的错误");
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
    }
    private String getIp(Dao dao) {
        return ((OracleDaoImpl) dao).getDataBaseEntity().getIp();
    }
    private String getIp(Dao dao) {
        return ((OracleDaoImpl) dao).getDataBaseEntity().getIp();
    }
    /**
     * 采集配置保存
     *
     * @param request
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveCollectConfig(HttpServletRequest request) throws BaseException {
        //服务名称
        String serverName = request.getHeader("server-name");
        if (StringUtils.isEmpty(serverName)) {
            throw new BaseException(ErrorCode.SERVER_NAME_CAN_NOT_EMPTY);
        }
        FieldSetEntity fse = BaseUtil.getFieldSetEntity(request, CmnConst.PRODUCT_SYS_DATA_COLLECT);
        //采集来源
        String sourceInfo = fse.getString("id");
        if (StringUtils.isEmpty(sourceInfo) || StringUtils.contains(sourceInfo, dataSystemName)) {
            throw new BaseException(ErrorCode.COLLECT_SOURCE_VALUE);
        }
        commonSave(fse);
    }
    /**
     * 采集配置保存
     *
     * @param request
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveCollectConfig(HttpServletRequest request) throws BaseException {
        //服务名称
        String serverName = request.getHeader("server-name");
        if (StringUtils.isEmpty(serverName)) {
            throw new BaseException(ErrorCode.SERVER_NAME_CAN_NOT_EMPTY);
        }
        FieldSetEntity fse = BaseUtil.getFieldSetEntity(request, CmnConst.PRODUCT_SYS_DATA_COLLECT);
        //采集来源
        String sourceInfo = fse.getString("id");
        if (StringUtils.isEmpty(sourceInfo) || StringUtils.contains(sourceInfo, dataSystemName)) {
            throw new BaseException(ErrorCode.COLLECT_SOURCE_VALUE);
        }
        commonSave(fse);
    }
    /**
     * 提取配置保存
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveExtractConfig(FieldSetEntity fse) throws BaseException {
        String dataSource = fse.getString(CmnConst.TABLE_SYNC_MANAGER);
        if (!StringUtils.isEmpty(dataSource)) {
            FieldSetEntity fs = JsonUtil.pareseJsonToFieldSetEntity(dataSource);
            if (!FieldSetEntity.isEmpty(fs)) {
                commonSave(fs);
            }
        }
        commonSave(fse);
        getCommonService().saveDelRecordConfig(2, fse.getUUID());
    }
    /**
     * 提取配置保存
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveExtractConfig(FieldSetEntity fse) throws BaseException {
        String dataSource = fse.getString(CmnConst.TABLE_SYNC_MANAGER);
        if (!StringUtils.isEmpty(dataSource)) {
            FieldSetEntity fs = JsonUtil.pareseJsonToFieldSetEntity(dataSource);
            if (!FieldSetEntity.isEmpty(fs)) {
                commonSave(fs);
            }
        }
        commonSave(fse);
        getCommonService().saveDelRecordConfig(2, fse.getUUID());
    }
    /**
     * 归档配置保存
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveArchiveConfig(FieldSetEntity fse) throws BaseException {
        commonSave(fse);
    }
    /**
     * 归档配置保存
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    @Transactional
    public void saveArchiveConfig(FieldSetEntity fse) throws BaseException {
        commonSave(fse);
    }
    /**
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveCollectLog(FieldSetEntity fse) throws BaseException {
        this.commonSave(fse);
    }
    /**
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveCollectLog(FieldSetEntity fse) throws BaseException {
        this.commonSave(fse);
    }
    /**
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveExtractLog(FieldSetEntity fse) throws BaseException {
        this.commonSave(fse);
    }
    /**
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveExtractLog(FieldSetEntity fse) throws BaseException {
        this.commonSave(fse);
    }
    /**
     * 数据源保存会调用该方法
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveSyncConnectionConfig(FieldSetEntity fse) throws BaseException {
        //子服务保存数据库连接配置后调用该方法,传输数据到主服务
        //TODO
    }
    /**
     * 数据源保存会调用该方法
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveSyncConnectionConfig(FieldSetEntity fse) throws BaseException {
        //子服务保存数据库连接配置后调用该方法,传输数据到主服务
        //TODO
    }
    /**
     * 定时任务生成日志会调用该方法
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveTimeLog(FieldSetEntity fse) throws BaseException {
        //主服务提取日志保存后调用该方法传输到子服务
        //TODO
    /**
     * 定时任务生成日志会调用该方法
     *
     * @param fse
     * @throws BaseException
     */
    @Override
    public void saveTimeLog(FieldSetEntity fse) throws BaseException {
        //主服务提取日志保存后调用该方法传输到子服务
        //TODO
    }
    }
    private void commonSave(FieldSetEntity fse) throws BaseException {
        String uuid = fse.getUUID();
        FieldSetEntity fs = getBaseDao().getFieldSetEntity(fse.getTableName(), new String[]{CmnConst.UUID}, uuid, false);
        Map<String, DataTableEntity> subData = fse.getSubData();
        if (!CollectionUtil.isEmpty(subData)) {
            DataTableEntity addDt = new DataTableEntity();
            DataTableEntity updateDt = new DataTableEntity();
            for (Map.Entry<String, DataTableEntity> entry : subData.entrySet()) {
                String tableName = entry.getKey();
                DataTableEntity value = entry.getValue();
                if (DataTableEntity.isEmpty(value)) {
                    continue;
                }
                Object[] uuids = value.getUuids();
                DataTableEntity dt = getBaseDao().listTable(tableName, BaseUtil.buildQuestionMarkFilter(CmnConst.UUID, uuids.length, true), uuids, new String[]{CmnConst.UUID});
                for (int i = 0; i < value.getRows(); i++) {
                    uuid = value.getString(i, CmnConst.UUID);
                    List<FieldSetEntity> fieldSetEntity = dt.getFieldSetEntity(uuid);
                    if (CollectionUtil.isEmpty(fieldSetEntity)) {
                        addDt.addFieldSetEntity(value.getFieldSetEntity(i));
                        value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "add");
                    } else {
                        addDt.addFieldSetEntity(updateDt.getFieldSetEntity(i));
                        value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "update");
                    }
                }
                if (!DataTableEntity.isEmpty(addDt)) {
                    getBaseDao().add(addDt);
                }
                if (!DataTableEntity.isEmpty(updateDt)) {
                    getBaseDao().update(updateDt);
                }
            }
        }
        if (FieldSetEntity.isEmpty(fs)) {
            getBaseDao().add(fse, false);
        } else {
            getBaseDao().update(fse, false);
        }
    }
    private void commonSave(FieldSetEntity fse) throws BaseException {
        String uuid = fse.getUUID();
        FieldSetEntity fs = getBaseDao().getFieldSetEntity(fse.getTableName(), new String[]{CmnConst.UUID}, uuid, false);
        Map<String, DataTableEntity> subData = fse.getSubData();
        if (!CollectionUtil.isEmpty(subData)) {
            DataTableEntity addDt = new DataTableEntity();
            DataTableEntity updateDt = new DataTableEntity();
            for (Map.Entry<String, DataTableEntity> entry : subData.entrySet()) {
                String tableName = entry.getKey();
                DataTableEntity value = entry.getValue();
                if (DataTableEntity.isEmpty(value)) {
                    continue;
                }
                Object[] uuids = value.getUuids();
                DataTableEntity dt = getBaseDao().listTable(tableName, BaseUtil.buildQuestionMarkFilter(CmnConst.UUID, uuids.length, true), uuids, new String[]{CmnConst.UUID});
                for (int i = 0; i < value.getRows(); i++) {
                    uuid = value.getString(i, CmnConst.UUID);
                    List<FieldSetEntity> fieldSetEntity = dt.getFieldSetEntity(uuid);
                    if (CollectionUtil.isEmpty(fieldSetEntity)) {
                        addDt.addFieldSetEntity(value.getFieldSetEntity(i));
                        value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "add");
                    } else {
                        addDt.addFieldSetEntity(updateDt.getFieldSetEntity(i));
                        value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "update");
                    }
                }
                if (!DataTableEntity.isEmpty(addDt)) {
                    getBaseDao().add(addDt);
                }
                if (!DataTableEntity.isEmpty(updateDt)) {
                    getBaseDao().update(updateDt);
                }
            }
        }
        if (FieldSetEntity.isEmpty(fs)) {
            getBaseDao().add(fse, false);
        } else {
            getBaseDao().update(fse, false);
        }
    }
    /**
     * 调用远程主服务器采集保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public FieldSetEntity remoteSaveCollectConfig(FieldSetEntity fse) throws BaseException {
        if ("ch-kt".equals(dataSystemName)) {
            return fse;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
        }
        //服务域名的端口
        String ipPort = fs.getString("server_url");
        String serverName = fs.getString("server_name");
        String serverUrl = ipPort + CmnConst.SAVE_COLLECT_URL;
        FieldSetEntity res = doPost(serverUrl, serverName, fse);
        return res;
    }
    /**
     * 调用远程主服务器采集保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public FieldSetEntity remoteSaveCollectConfig(FieldSetEntity fse) throws BaseException {
        if ("ch-kt".equals(dataSystemName)) {
            return fse;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
        }
        //服务域名的端口
        String ipPort = fs.getString("server_url");
        String serverName = fs.getString("server_name");
        String serverUrl = ipPort + CmnConst.SAVE_COLLECT_URL;
        FieldSetEntity res = doPost(serverUrl, serverName, fse);
        return res;
    }
    /**
     * 主服务提取保存日志后调用该方法传入到子服务保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveExtractLog(FieldSetEntity fse) throws BaseException {
        try {
            if ("ch-kt".equals(dataSystemName)) {
                //主服务采集日志uuid
                String preStepUuid = fse.getString(CmnConst.PRE_STEP_UUID);
                StringBuilder sql = new StringBuilder();
                sql.append("\nSELECT ");
                sql.append("\nserver.* ");
                sql.append("\nFROM ").append(CmnConst.PRODUCT_SYS_DATA_COLLECT).append(" collect ");
                sql.append("\nJOIN ").append(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG).append(" log ");
                sql.append("\nON log.config_uuid=collect.uuid and log.type=1 ");
                sql.append("\nJOIN ").append(CmnConst.PRODUCT_MES_SERVER).append(" server ");
                sql.append("\nON collect.id like concat(server.server_name,'%') ");
                sql.append("\nWHERE log.uuid=? and collect.id not like concat(?,'%')  limit 1");
                FieldSetEntity fs = getBaseDao().getFieldSetBySQL(sql.toString(), new Object[]{preStepUuid, dataSystemName}, false);
                if (FieldSetEntity.isEmpty(fs)) {
                    return;
                }
                String ipPort = fs.getString("server_url");
                String serverName = fs.getString("server_name");
                String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_LOG_URL;
                doPostAsync(serverUrl, serverName, fse);
            }
        } catch (Exception e) {
            //捕获异常为了使采集定时任务正常运行
            SpringMVCContextHolder.getSystemLogger().error(e);
            e.printStackTrace();
        }
    }
    /**
     * 主服务提取保存日志后调用该方法传入到子服务保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveExtractLog(FieldSetEntity fse) throws BaseException {
        try {
            if ("ch-kt".equals(dataSystemName)) {
                //主服务采集日志uuid
                String preStepUuid = fse.getString(CmnConst.PRE_STEP_UUID);
                StringBuilder sql = new StringBuilder();
                sql.append("\nSELECT ");
                sql.append("\nserver.* ");
                sql.append("\nFROM ").append(CmnConst.PRODUCT_SYS_DATA_COLLECT).append(" collect ");
                sql.append("\nJOIN ").append(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG).append(" log ");
                sql.append("\nON log.config_uuid=collect.uuid and log.type=1 ");
                sql.append("\nJOIN ").append(CmnConst.PRODUCT_MES_SERVER).append(" server ");
                sql.append("\nON collect.id like concat(server.server_name,'%') ");
                sql.append("\nWHERE log.uuid=? and collect.id not like concat(?,'%')  limit 1");
                FieldSetEntity fs = getBaseDao().getFieldSetBySQL(sql.toString(), new Object[]{preStepUuid, dataSystemName}, false);
                if (FieldSetEntity.isEmpty(fs)) {
                    return;
                }
                String ipPort = fs.getString("server_url");
                String serverName = fs.getString("server_name");
                String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_LOG_URL;
                doPostAsync(serverUrl, serverName, fse);
            }
        } catch (Exception e) {
            //捕获异常为了使采集定时任务正常运行
            SpringMVCContextHolder.getSystemLogger().error(e);
            e.printStackTrace();
        }
    }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveExtractConfig(FieldSetEntity fse) throws BaseException {
        DataTableEntity dt = getRemoteSubServer();
        if (DataTableEntity.isEmpty(dt)) {
            return;
        }
        FieldSetEntity extractTargetSource = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, fse.getString("extract_target_source"), false);
        if (!FieldSetEntity.isEmpty(extractTargetSource)) {
            extractTargetSource.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, CmnConst.TABLE_SYNC_MANAGER);
            fse.setValue(CmnConst.TABLE_SYNC_MANAGER, BaseUtil.fieldSetEntityToJson(extractTargetSource));
        }
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity fs = dt.getFieldSetEntity(i);
            //服务域名的端口
            String ipPort = fs.getString("server_url");
            String serverName = fs.getString("server_name");
            String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_URL;
            doPost(serverUrl, serverName, fse);
        }
    }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveExtractConfig(FieldSetEntity fse) throws BaseException {
        DataTableEntity dt = getRemoteSubServer();
        if (DataTableEntity.isEmpty(dt)) {
            return;
        }
        FieldSetEntity extractTargetSource = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, fse.getString("extract_target_source"), false);
        if (!FieldSetEntity.isEmpty(extractTargetSource)) {
            extractTargetSource.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, CmnConst.TABLE_SYNC_MANAGER);
            fse.setValue(CmnConst.TABLE_SYNC_MANAGER, BaseUtil.fieldSetEntityToJson(extractTargetSource));
        }
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity fs = dt.getFieldSetEntity(i);
            //服务域名的端口
            String ipPort = fs.getString("server_url");
            String serverName = fs.getString("server_name");
            String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_URL;
            doPost(serverUrl, serverName, fse);
        }
    }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveArchiveConfig(FieldSetEntity fse) throws BaseException {
        DataTableEntity dt = getRemoteSubServer();
        if (DataTableEntity.isEmpty(dt)) {
            return;
        }
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity fs = dt.getFieldSetEntity(i);
            //服务域名的端口
            String ipPort = fs.getString("server_url");
            String serverName = fs.getString("server_name");
            String serverUrl = ipPort + CmnConst.SAVE_ARCHIVE_URL;
            doPost(serverUrl, serverName, fse);
        }
    }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveArchiveConfig(FieldSetEntity fse) throws BaseException {
        DataTableEntity dt = getRemoteSubServer();
        if (DataTableEntity.isEmpty(dt)) {
            return;
        }
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity fs = dt.getFieldSetEntity(i);
            //服务域名的端口
            String ipPort = fs.getString("server_url");
            String serverName = fs.getString("server_name");
            String serverUrl = ipPort + CmnConst.SAVE_ARCHIVE_URL;
            doPost(serverUrl, serverName, fse);
        }
    }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveCollectLog(FieldSetEntity fse) throws BaseException {
        if ("ch-kt".equals(dataSystemName) || FieldSetEntity.isEmpty(fse)) {
            return;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
        }
        //服务域名的端口
        String ipPort = fs.getString("server_url");
        String serverName = fs.getString("server_name");
        String serverUrl = ipPort + CmnConst.SAVE_COLLECT_LOG_URL;
        doPost(serverUrl, serverName, fse);
    }
    /**
     * 调用远程主服务器保存
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    public void remoteSaveCollectLog(FieldSetEntity fse) throws BaseException {
        if ("ch-kt".equals(dataSystemName) || FieldSetEntity.isEmpty(fse)) {
            return;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
        if (FieldSetEntity.isEmpty(fs)) {
            throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
        }
        //服务域名的端口
        String ipPort = fs.getString("server_url");
        String serverName = fs.getString("server_name");
        String serverUrl = ipPort + CmnConst.SAVE_COLLECT_LOG_URL;
        doPost(serverUrl, serverName, fse);
    }
    /**
     * 获取子服务配置
     *
     * @return
     * @throws BaseException
     */
    private DataTableEntity getRemoteSubServer() throws BaseException {
        return getBaseDao().listTable(CmnConst.PRODUCT_MES_SERVER, "server_type!=0");
    }
    /**
     * 获取子服务配置
     *
     * @return
     * @throws BaseException
     */
    private DataTableEntity getRemoteSubServer() throws BaseException {
        return getBaseDao().listTable(CmnConst.PRODUCT_MES_SERVER, "server_type!=0");
    }
    //    @Async
    void doPostAsync(String url, String serverName, FieldSetEntity fse) throws BaseException {
        doPost(url, serverName, fse);
    }
    //    @Async
    void doPostAsync(String url, String serverName, FieldSetEntity fse) throws BaseException {
        doPost(url, serverName, fse);
    }
    public String getDataSystemName() {
        return dataSystemName;
    }
    public String getDataSystemName() {
        return dataSystemName;
    }
    public boolean remoteRehandle(FieldSetEntity fse) {
        String type = fse.getString(CmnConst.TYPE);//类型
        String logUuid;
        if ("2".equals(type)) {
            logUuid = fse.getString(CmnConst.PRE_STEP_UUID);
        } else if ("1".equals(type)) {
            logUuid = fse.getUUID();
        } else {
            return false;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_COLLECT, "uuid =(select config_uuid FROM product_sys_data_center_log where uuid=?)", new Object[]{logUuid}, false);
        if (!FieldSetEntity.isEmpty(fs)) {
            //采集id
            String collectId = fs.getString(CmnConst.ID);
            if (collectId.indexOf(dataSystemName) == -1) {
                fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=1 and ? like concat(server_name,'%')", new Object[]{collectId}, false);
                if (!FieldSetEntity.isEmpty(fs)) {
                    //服务域名的端口
                    String ipPort = fs.getString("server_url");
                    String serverName = fs.getString("server_name");
                    String serverUrl = ipPort + CmnConst.REHANDLE_ERROR_URL;
                    doPost(serverUrl, serverName, fse);
                    //标记日志成功
                    JournalManagerService journalManagerService = SpringUtils.getBean(JournalManagerService.class);
                    journalManagerService.writeBackReDealResult(fse, true);
                }
                return true;
            }
        }
        return false;
    }
    public boolean remoteRehandle(FieldSetEntity fse) {
        String type = fse.getString(CmnConst.TYPE);//类型
        String logUuid;
        if ("2".equals(type)) {
            logUuid = fse.getString(CmnConst.PRE_STEP_UUID);
        } else if ("1".equals(type)) {
            logUuid = fse.getUUID();
        } else {
            return false;
        }
        FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_COLLECT, "uuid =(select config_uuid FROM product_sys_data_center_log where uuid=?)", new Object[]{logUuid}, false);
        if (!FieldSetEntity.isEmpty(fs)) {
            //采集id
            String collectId = fs.getString(CmnConst.ID);
            if (collectId.indexOf(dataSystemName) == -1) {
                fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=1 and ? like concat(server_name,'%')", new Object[]{collectId}, false);
                if (!FieldSetEntity.isEmpty(fs)) {
                    //服务域名的端口
                    String ipPort = fs.getString("server_url");
                    String serverName = fs.getString("server_name");
                    String serverUrl = ipPort + CmnConst.REHANDLE_ERROR_URL;
                    doPost(serverUrl, serverName, fse);
                    //标记日志成功
                    JournalManagerService journalManagerService = SpringUtils.getBean(JournalManagerService.class);
                    journalManagerService.writeBackReDealResult(fse, true);
                }
                return true;
            }
        }
        return false;
    }
    /**
     * post 请求
     *
     * @param url
     * @param serverName
     * @param fse
     * @throws BaseException
     */
    private FieldSetEntity doPost(String url, String serverName, FieldSetEntity fse) throws BaseException {
        fse.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, fse.getTableName());
        JSONObject requestBody = BaseUtil.fieldSetEntityToJson(fse);
        String requestData = requestBody.toJSONString();
        //签名
        String signature = SignUtil.getHmacSHA1(requestData, Global.getSystemConfig("signature.key", ""));
        try (HttpResponse response = HttpRequest.post(url)
                .contentType("application/x-www-form-urlencoded")
                .header(CoreConst.SYSTEM_LANGUAGE_CODE_, "zh-CN") //语言编码
                .header(CoreConst.SYSTEM_CLIENT_TYPE_, "Web") //客户端类型
                .header(CoreConst.SYSTEM_CLIENT_VERSION_, "1.0.0") //客户端版本
                .header("server-name", serverName) //系统名称
                .header("signature", signature) //签名
                .body("formData=" + requestData).execute()) {
            if (response.getStatus() == 200) {
                //请求成功
                String res = response.body();
                if (!StringUtils.isEmpty(res)) {
                    JSONObject resBody = JSON.parseObject(res);
                    if (resBody != null) {
                        String code = resBody.getString(CoreConst.API_RETURN_KEY_CODE);
                        if (!CoreConst.API_RETURN_VALUE_CODE_200.equals(code)) {
                            //服务内部抛出的错误
                            throw new BaseException("调用接口失败,", resBody.getString(CoreConst.API_RETURN_KEY_MSG));
                        }
                        String formData = resBody.getString(CoreConst.API_RETURN_KEY_DATA);
                        if (StringUtils.isEmpty(formData)) {
                            return null;
                        }
                        FieldSetEntity result = JsonUtil.pareseJsonToFieldSetEntity(formData);
                        return result;
                    }
                }
            }
            throw new BaseException(ErrorCode.OPEN_API_REQUEST_FAIL);
        }
    /**
     * post 请求
     *
     * @param url
     * @param serverName
     * @param fse
     * @throws BaseException
     */
    private FieldSetEntity doPost(String url, String serverName, FieldSetEntity fse) throws BaseException {
        fse.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, fse.getTableName());
        JSONObject requestBody = BaseUtil.fieldSetEntityToJson(fse);
        String requestData = requestBody.toJSONString();
        //签名
        String signature = SignUtil.getHmacSHA1(requestData, Global.getSystemConfig("signature.key", ""));
        try (HttpResponse response = HttpRequest.post(url)
                .contentType("application/x-www-form-urlencoded")
                .header(CoreConst.SYSTEM_LANGUAGE_CODE_, "zh-CN") //语言编码
                .header(CoreConst.SYSTEM_CLIENT_TYPE_, "Web") //客户端类型
                .header(CoreConst.SYSTEM_CLIENT_VERSION_, "1.0.0") //客户端版本
                .header("server-name", serverName) //系统名称
                .header("signature", signature) //签名
                .body("formData=" + requestData).execute()) {
            if (response.getStatus() == 200) {
                //请求成功
                String res = response.body();
                if (!StringUtils.isEmpty(res)) {
                    JSONObject resBody = JSON.parseObject(res);
                    if (resBody != null) {
                        String code = resBody.getString(CoreConst.API_RETURN_KEY_CODE);
                        if (!CoreConst.API_RETURN_VALUE_CODE_200.equals(code)) {
                            //服务内部抛出的错误
                            throw new BaseException("调用接口失败,", resBody.getString(CoreConst.API_RETURN_KEY_MSG));
                        }
                        String formData = resBody.getString(CoreConst.API_RETURN_KEY_DATA);
                        if (StringUtils.isEmpty(formData)) {
                            return null;
                        }
                        FieldSetEntity result = JsonUtil.pareseJsonToFieldSetEntity(formData);
                        return result;
                    }
                }
            }
            throw new BaseException(ErrorCode.OPEN_API_REQUEST_FAIL);
        }
    }
    }
    /**
     * 获取采集日志
     *
     * @return
     */
    public FieldSetEntity getCollectLog() {
        collectLogCache = null;
        if (collectLogCache == null) {
            loadCollectLogCache();
        }
    /**
     * 获取采集日志
     *
     * @return
     */
    public FieldSetEntity getCollectLog() {
        collectLogCache = null;
        if (collectLogCache == null) {
            loadCollectLogCache();
        }
        return collectLogCache;
    }
        return collectLogCache;
    }
    public void loadCollectLogCache() {
        StringBuilder sql = new StringBuilder();
        sql.append("\n with no_extract as ( ");
        sql.append("\n     SELECT ");
        sql.append("\n         a.*  ");
        sql.append("\n     FROM ");
        sql.append("\n         product_sys_data_center_log A ");
        sql.append("\n         LEFT JOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid  ");
        sql.append("\n     WHERE ");
        sql.append("\n         a.type = 1  ");
        sql.append("\n         and a.result=1 and (a.deal_flag is null or a.deal_flag =0) ");
        sql.append("\n         AND ( ");
        sql.append("\n                     b.pre_step_uuid IS NULL  ");
        sql.append("\n                     OR  ");
        sql.append("\n                     ( ");
        sql.append("\n                         b.pre_step_uuid=a.uuid  ");
        sql.append("\n                         AND  ");
        sql.append("\n                         (b.result=0 and (b.deal_result = 0 or b.deal_flag is null)) ");
        sql.append("\n                     ) ");
        sql.append("\n         ) ");
        sql.append("\n ), ");
        sql.append("\n last_run_log as ( ");
        sql.append("\n select max(created_utc_datetime) last_time,config_uuid from product_sys_data_center_log where type=1 and result=1 and deal_flag=0 GROUP BY config_uuid ");
        sql.append("\n ) ");
        sql.append("\n      ");
        sql.append("\n SELECT ifnull(COUNT(no_extract.uuid),0) unextracted_batch ,ifnull(sum(no_extract.count),0) unextracted_total_count,b.`name`,(select last_time from last_run_log where b.uuid=last_run_log.config_uuid) last_success_time FROM  product_sys_data_collect b  ");
        sql.append("\n     left join no_extract on no_extract.config_uuid=b.uuid ");
        sql.append("\n  GROUP BY b.uuid ");
        sql.append("\n     order by b.`name` ");
    public void loadCollectLogCache() {
        StringBuilder sql = new StringBuilder();
        sql.append("\n with no_extract as ( ");
        sql.append("\n     SELECT ");
        sql.append("\n         a.*  ");
        sql.append("\n     FROM ");
        sql.append("\n         product_sys_data_center_log A ");
        sql.append("\n         LEFT JOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid  ");
        sql.append("\n     WHERE ");
        sql.append("\n         a.type = 1  ");
        sql.append("\n         and a.result=1 and (a.deal_flag is null or a.deal_flag =0) ");
        sql.append("\n         AND ( ");
        sql.append("\n                     b.pre_step_uuid IS NULL  ");
        sql.append("\n                     OR  ");
        sql.append("\n                     ( ");
        sql.append("\n                         b.pre_step_uuid=a.uuid  ");
        sql.append("\n                         AND  ");
        sql.append("\n                         (b.result=0 and (b.deal_result = 0 or b.deal_flag is null)) ");
        sql.append("\n                     ) ");
        sql.append("\n         ) ");
        sql.append("\n ), ");
        sql.append("\n last_run_log as ( ");
        sql.append("\n select max(created_utc_datetime) last_time,config_uuid from product_sys_data_center_log where type=1 and result=1 and deal_flag=0 GROUP BY config_uuid ");
        sql.append("\n ) ");
        sql.append("\n      ");
        sql.append("\n SELECT ifnull(COUNT(no_extract.uuid),0) unextracted_batch ,ifnull(sum(no_extract.count),0) unextracted_total_count,b.`name`,(select last_time from last_run_log where b.uuid=last_run_log.config_uuid) last_success_time FROM  product_sys_data_collect b  ");
        sql.append("\n     left join no_extract on no_extract.config_uuid=b.uuid ");
        sql.append("\n  GROUP BY b.uuid ");
        sql.append("\n     order by b.`name` ");
        FieldSetEntity fse = new FieldSetEntity();
        fse.setTableName("temp");
        fse.setValue("load_log_time", DateTime.now().toString());
        DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), new Object[]{});
        if (!DataTableEntity.isEmpty(dataTableEntity)) {
            fse.setValue("list", BaseUtil.dataTableEntityToJson(dataTableEntity, f -> {
                JSONObject jsonObject = f[0];
                Date lastSuccessTime = jsonObject.getDate("last_success_time");
                if (lastSuccessTime != null) {
                    jsonObject.put("last_success_time", DateUtil.format(lastSuccessTime, "yyyy-MM-dd HH:mm:ss"));
                }
            }));
        } else {
            fse.setValue("list", "[]");
        }
        this.collectLogCache = fse;
    }
        FieldSetEntity fse = new FieldSetEntity();
        fse.setTableName("temp");
        fse.setValue("load_log_time", DateTime.now().toString());
        DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), new Object[]{});
        if (!DataTableEntity.isEmpty(dataTableEntity)) {
            fse.setValue("list", BaseUtil.dataTableEntityToJson(dataTableEntity, f -> {
                JSONObject jsonObject = f[0];
                Date lastSuccessTime = jsonObject.getDate("last_success_time");
                if (lastSuccessTime != null) {
                    jsonObject.put("last_success_time", DateUtil.format(lastSuccessTime, "yyyy-MM-dd HH:mm:ss"));
                }
            }));
        } else {
            fse.setValue("list", "[]");
        }
        this.collectLogCache = fse;
    }
}