许鹏程
2024-05-15 3212d131b28e5f4b09bb1222861370310a00ffc9
product-server-data-center/src/main/java/com/product/data/center/service/CommonService.java
@@ -4,6 +4,7 @@
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.IdUtil;
import com.product.admin.service.PublicService;
import com.product.common.lang.StringUtils;
@@ -27,9 +28,7 @@
import javax.annotation.Resource;
import java.text.DecimalFormat;
import java.time.LocalDate;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -42,170 +41,377 @@
@Service("dataCommonService")
public class CommonService extends AbstractBaseService implements ICommonService {
    @Resource
    SysJobService sysJobService;
   @Resource
   SysJobService sysJobService;
    @Resource
    PublicService publicService;
   @Resource
   PublicService publicService;
    @Resource
    SyncDelRecordService syncDelRecordService;
   @Resource
   SyncDelRecordService syncDelRecordService;
    @Resource
    MesExternalService mesExternalService;
   @Resource
   MesExternalService mesExternalService;
    @Value("${data.system.name}")
    private String dataSystemName;
   @Value("${data.system.name}")
   private String dataSystemName;
    public void deleteCenterLog() {
        try {
            //在mysql的information_schema 表中查询指定表是否存在
            String tableName = "da_product_sys_data_center_log";
            //归档7天前的数据
            //截止日期
            Date endTime = DateUtil.offset(new Date(), DateField.DAY_OF_YEAR, -7);
   public void deleteCenterLogV2() {
      try {
            //格式化日期
            String endTimeStr = DateUtil.format(endTime, "yyyy-MM-dd");
         StringBuilder querySql = new StringBuilder(64);
            //根据截止日期查询表中最大、最小的创建时间根据创建时间进行分表按月分表
            String sql = "select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) )";
         //select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime,min(id) min_id,max(id) max_id, date_format(created_utc_datetime,'%Y-%m') current_month  from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) group by date_format(created_utc_datetime,'%Y-%m')
         querySql.append(" select min(id) min_id,max(id) max_id, date_format(created_utc_datetime,'%Y%m') current_month  ");
         querySql.append(" from product_sys_data_center_log ");
         querySql.append(" where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) ");
         querySql.append(" group by date_format(created_utc_datetime,'%Y%m') ");
            FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false);
         DataTableEntity dt = getBaseDao().listTable(querySql.toString(), new Object[]{});
            if (fs == null) {
                return;
            }
            //最小时间
            Date minTime = fs.getDate("minTime");
         if (DataTableEntity.isEmpty(dt)) {
            return;
         }
         //在mysql的information_schema 表中查询指定表是否存在
         String tableName = "da_product_sys_data_center_log";
            //最大时间
            Date maxTime = fs.getDate("maxTime");
         //封装所有表名
         Set<String> allTableName = dt.getData().stream().map(e -> tableName + "_" + e.getString("current_month")).collect(Collectors.toSet());
            //根据最小时间和最大时间获取区间内(包含最小时间和最大时间)的年月日期格式为yyyy-MM
         //在mysql的information_schema 表中查询所有的表
         querySql.setLength(0);
         querySql.append(" select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", allTableName.size(), true));
         String dataBaseName = getBaseDao().getDataBaseName();
         Object[] params = new Object[allTableName.size() + 1];
         params[0] = dataBaseName;
         int index = 1;
         for (String tableNameStr : allTableName) {
            params[index] = tableNameStr;
            index++;
         }
         DataTableEntity dataTableEntity = getBaseDao().listTable(querySql.toString(), params);
            List<DateTime> tableSuffixList = DateUtil.rangeToList(minTime, maxTime, DateField.MONTH);
         Set<String> existsTable = dataTableEntity.getData().stream().map(e -> e.getString("table_name")).collect(Collectors.toSet());
            //查询数据库中是否存在表
         //对比表名,创建不存在的表
         allTableName.removeAll(existsTable);
         for (String tableNameStr : allTableName) {
            String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log";
            getBaseDao().executeUpdate(createTableSql, new Object[]{});
         }
            Set<String> tableNameSet = tableSuffixList.stream().map(e -> tableName + "_" + DateUtil.format(e, "yyyyMM")).collect(Collectors.toSet());
         querySql.setLength(0);
         //查询data_center_log表的uuid字段 created_utc_datetime最新的数据根据 config_uuid 和 type进行分组 且 result = 1 or ( deal_flag = 1 AND deal_result = 1 ) 每组只取最新的一条数据
         querySql.append(" SELECT uuid,row_num,(select uuid from product_sys_data_center_log where pre_step_uuid = uuid and type=2) as pre_step_uuid ");
         querySql.append(" FROM ( ");
         querySql.append("      SELECT ");
         querySql.append("      uuid, ");
         querySql.append("      ROW_NUMBER() OVER (PARTITION BY config_uuid, type ORDER BY created_utc_datetime DESC) AS row_num ");
         querySql.append("         FROM product_sys_data_center_log ");
         querySql.append("         WHERE result = 1 OR (deal_flag = 1 AND deal_result = 1) ");
         querySql.append("   ) AS subquery ");
         querySql.append("   WHERE row_num = 1  ");
            //查询数据库中存在的表
            String dataBaseName = getBaseDao().getDataBaseName();
            sql = "select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", tableNameSet.size(), true);
            Object[] params = new Object[tableNameSet.size() + 1];
            params[0] = dataBaseName;
            int index = 1;
            for (String tableNameStr : tableNameSet) {
                params[index] = tableNameStr;
                index++;
            }
            DataTableEntity dataTableEntity = getBaseDao().listTable(sql, params);
         //查询需要保留的数据uuid
         DataTableEntity retainData = getBaseDao().listTable(querySql.toString(), new Object[]{});
         String[] retainUuids = DataTableEntity.isEmpty(retainData) ? null : new String[retainData.getRows() * 2];
         if (!DataTableEntity.isEmpty(retainData)) {
            for (int i = 0; i < retainData.getRows(); i++) {
               retainUuids[i] = retainData.getString(i, "uuid");
               retainUuids[i + retainData.getRows()] = retainData.getString(i, "pre_step_uuid");
            }
         }
            if (!DataTableEntity.isEmpty(dataTableEntity)) {
                for (int i = 0; i < dataTableEntity.getRows(); i++) {
                    String tableNameStr = dataTableEntity.getString(i, "table_name");
                    tableNameSet.remove(tableNameStr);
                }
            }
            //创建不存在的表,表结构与原表一致
            for (String tableNameStr : tableNameSet) {
                String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log";
                getBaseDao().executeUpdate(createTableSql, new Object[]{});
            }
            //查询各类型配置最后一次成功的数据
            String sqlFilter = "SELECT * FROM (SELECT uuid\n" +
                    "FROM product_sys_data_center_log\n" +
                    "WHERE uuid IN (\n" +
                    "\tSELECT uuid\n" +
                    "\tFROM product_sys_data_center_log\n" +
                    "\tWHERE id IN (\n" +
                    "\t\tSELECT max(a.id)\n" +
                    "\t\tFROM product_sys_data_center_log a\n" +
                    "\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" +
                    "\t\tWHERE a.type = 1\n" +
                    "\t\t\tAND b.type = 2\n" +
                    "\t\t\tAND (a.result = 1\n" +
                    "\t\t\t\tOR (a.deal_flag = 1\n" +
                    "\t\t\t\t\tAND a.deal_result = 1))\n" +
                    "\t\t\tAND (b.result = 1\n" +
                    "\t\t\t\tOR (b.deal_flag = 1\n" +
                    "\t\t\t\t\tAND b.deal_result = 1))\n" +
                    "\t\tGROUP BY a.config_uuid\n" +
                    "\t)\n" +
                    "\tUNION ALL\n" +
                    "\tSELECT aa.uuid\n" +
                    "\tFROM product_sys_data_center_log aa\n" +
                    "\t\tJOIN product_sys_data_center_log bb ON aa.pre_step_uuid = bb.uuid\n" +
                    "\tWHERE aa.type = 2\n" +
                    "\t\tAND bb.type = 1\n" +
                    "\t\tAND bb.id IN (\n" +
                    "\t\t\tSELECT max(a.id)\n" +
                    "\t\t\tFROM product_sys_data_center_log a\n" +
                    "\t\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" +
                    "\t\t\tWHERE a.type = 1\n" +
                    "\t\t\t\tAND b.type = 2\n" +
                    "\t\t\t\tAND (a.result = 1\n" +
                    "\t\t\t\t\tOR (a.deal_flag = 1\n" +
                    "\t\t\t\t\t\tAND a.deal_result = 1))\n" +
                    "\t\t\t\tAND (b.result = 1\n" +
                    "\t\t\t\t\tOR (b.deal_flag = 1\n" +
                    "\t\t\t\t\t\tAND b.deal_result = 1))\n" +
                    "\t\t\tGROUP BY a.config_uuid\n" +
                    "\t\t)\n" +
                    "\tUNION ALL\n" +
                    "\tSELECT uuid\n" +
                    "\tFROM product_sys_data_center_log a\n" +
                    "\tWHERE a.type = 5\n" +
                    "\t\tAND detail != 3\n" +
                    "\t\tAND (a.result = 1\n" +
                    "\t\t\tOR (a.deal_flag = 1\n" +
                    "\t\t\t\tAND a.deal_result = 1))\n" +
                    ")) ddd";
            //插入数据根据创建时间的年月分别插入到对应的表中
            for (DateTime dateTime : tableSuffixList) {
                String insertSql = "insert into " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
                        " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime <now() + interval -7 day";
                getBaseDao().executeUpdate(insertSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")});
         for (int i = 0; i < dt.getRows(); i++) {
            Integer minId = dt.getInt(i, "min_id");
            Integer maxId = dt.getInt(i, "max_id");
            String currentMonth = dt.getString(i, "current_month");
            //查询mysql中是否存在表
            querySql.setLength(0);
            int pageSize = 500;
            //分页查询采集日志数据7天前的最大和最小id
            querySql.append(" select a.uuid as log_collect_uuid,b.uuid as log_extract_uuid from product_sys_data_center_log a");
            querySql.append(" join product_sys_data_center_log b ");
            querySql.append(" on a.uuid=b.pre_step_uuid ");
            querySql.append(" where a.id >= ? and a.id <= ? and a.type=1 and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) and a.type=1 and b.type=2 limit ").append(pageSize);
            while (true) {
               DataTableEntity queryDt = getBaseDao().listTable(querySql.toString(), new Object[]{minId, maxId});
               if (DataTableEntity.isEmpty(queryDt)) {
                  break;
               }
               //将查询出的log_collect_uuid和log_extract_uuid分别存入两个list中
               List<String> logCollectUuidList = new ArrayList<>();
               List<String> logExtractUuidList = new ArrayList<>();
               for (int j = 0; j < queryDt.getRows(); j++) {
                  logCollectUuidList.add(queryDt.getString(j, "log_collect_uuid"));
                  logExtractUuidList.add(queryDt.getString(j, "log_extract_uuid"));
               }
               //根据查询出的uuid将数据插入到对应的表中,使用insert into select的方式 条件是 uuid in logCollectUuidList 和 logExtractUuidList 的数据
               StringBuilder insertSql = new StringBuilder(64);
               insertSql.append(" insert into ").append(tableName).append("_").append(currentMonth);
               //将logCollectUuidList和logExtractUuidList合并为一个数组且数组中如果有rangeUuids中的数据则排除
               Set<String> uuidSet = new HashSet<>();
               uuidSet.addAll(logCollectUuidList);
               uuidSet.addAll(logExtractUuidList);
               uuidSet.removeAll(Arrays.asList(retainUuids));
               String[] uuids = uuidSet.toArray(new String[0]);
               insertSql.append(" select * from product_sys_data_center_log").append(" where (").append(BaseUtil.buildQuestionMarkFilter("uuid", uuids.length, true)).append(")");
               getBaseDao().executeUpdate(insertSql.toString(), uuids);
               //删除采集、提取日志数据
               StringBuilder deleteSql = new StringBuilder(64);
               deleteSql.append(" delete a from product_sys_data_center_log a ");
               deleteSql.append(" where (").append(BaseUtil.buildQuestionMarkFilter("a.uuid", uuids.length, true)).append(")");
               getBaseDao().executeUpdate(deleteSql.toString(), uuids);
               if (pageSize != queryDt.getRows()) {
                  break;
               }
            }
            //分页删除type>2的数据
            querySql.setLength(0);
            querySql.append(" select uuid from product_sys_data_center_log a");
            querySql.append(" where id >= ? and id <= ? and type>2 and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) )  limit ").append(pageSize * 2);
            while (true) {
               DataTableEntity queryDt = getBaseDao().listTable(querySql.toString(), new Object[]{minId, maxId});
               if (DataTableEntity.isEmpty(queryDt)) {
                  break;
               }
               //将查询出的uuid存入list中
               List<String> uuidList = queryDt.getData().stream().map(e -> e.getString("uuid")).collect(Collectors.toList());
               //排除rangeUuids中的数据
               uuidList.removeAll(Arrays.asList(retainUuids));
               //根据查询出的uuid将数据插入到对应的表中,使用insert into select的方式 条件是 uuid in uuidList 的数据
               StringBuilder insertSql = new StringBuilder(64);
               insertSql.append(" insert into ").append(tableName).append("_").append(currentMonth);
               insertSql.append(" select * from product_sys_data_center_log").append(" where (").append(BaseUtil.buildQuestionMarkFilter("uuid", uuidList.size(), true)).append(")");
               getBaseDao().executeUpdate(insertSql.toString(), uuidList.toArray());
               //删除其他日志数据
               StringBuilder deleteSql = new StringBuilder(64);
               deleteSql.append(" delete a from product_sys_data_center_log a ");
               deleteSql.append(" where (").append(BaseUtil.buildQuestionMarkFilter("a.uuid", uuidList.size(), true)).append(")");
               getBaseDao().executeUpdate(deleteSql.toString(), uuidList.toArray());
               if (pageSize * 2 != queryDt.getRows()) {
                  break;
               }
            }
         }
      } catch (Exception e) {
         SpringMVCContextHolder.getSystemLogger().error("deleteCenterLogV2 error");
         SpringMVCContextHolder.getSystemLogger().error(e);
         e.printStackTrace();
         throw e;
      }
   }
   public void deleteCenterLog() {
      this.deleteCenterLogV2();
   }
   public void deleteCenterLogBak() {
      try {
         //在mysql的information_schema 表中查询指定表是否存在
         String tableName = "da_product_sys_data_center_log";
         //归档7天前的数据
         //截止日期
         Date endTime = DateUtil.offset(new Date(), DateField.DAY_OF_YEAR, -7);
         //格式化日期
         String endTimeStr = DateUtil.format(endTime, "yyyy-MM-dd");
         //根据截止日期查询表中最大、最小的创建时间根据创建时间进行分表按月分表
         String sql = "select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) group by date_format(created_utc_datetime,'%Y-%m') ";
         FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false);
         if (fs == null) {
            return;
         }
         //最小时间
         Date minTime = fs.getDate("minTime");
         //最大时间
         Date maxTime = fs.getDate("maxTime");
         //根据最小时间和最大时间获取区间内(包含最小时间和最大时间)的年月日期格式为yyyy-MM
         List<DateTime> tableSuffixList = DateUtil.rangeToList(minTime, maxTime, DateField.MONTH);
         //查询数据库中是否存在表
         Set<String> tableNameSet = tableSuffixList.stream().map(e -> tableName + "_" + DateUtil.format(e, "yyyyMM")).collect(Collectors.toSet());
         //查询数据库中存在的表
         String dataBaseName = getBaseDao().getDataBaseName();
         sql = "select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", tableNameSet.size(), true);
         Object[] params = new Object[tableNameSet.size() + 1];
         params[0] = dataBaseName;
         int index = 1;
         for (String tableNameStr : tableNameSet) {
            params[index] = tableNameStr;
            index++;
         }
         DataTableEntity dataTableEntity = getBaseDao().listTable(sql, params);
         if (!DataTableEntity.isEmpty(dataTableEntity)) {
            for (int i = 0; i < dataTableEntity.getRows(); i++) {
               String tableNameStr = dataTableEntity.getString(i, "table_name");
               tableNameSet.remove(tableNameStr);
            }
         }
         //创建不存在的表,表结构与原表一致
         for (String tableNameStr : tableNameSet) {
            String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log";
            getBaseDao().executeUpdate(createTableSql, new Object[]{});
         }
         //查询各类型配置最后一次成功的数据
         String sqlFilter = "SELECT * FROM (SELECT uuid\n" +
               "FROM product_sys_data_center_log\n" +
               "WHERE uuid IN (\n" +
               "\tSELECT uuid\n" +
               "\tFROM product_sys_data_center_log\n" +
               "\tWHERE id IN (\n" +
               "\t\tSELECT max(a.id)\n" +
               "\t\tFROM product_sys_data_center_log a\n" +
               "\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" +
               "\t\tWHERE a.type = 1\n" +
               "\t\t\tAND b.type = 2\n" +
               "\t\t\tAND (a.result = 1\n" +
               "\t\t\t\tOR (a.deal_flag = 1\n" +
               "\t\t\t\t\tAND a.deal_result = 1))\n" +
               "\t\t\tAND (b.result = 1\n" +
               "\t\t\t\tOR (b.deal_flag = 1\n" +
               "\t\t\t\t\tAND b.deal_result = 1))\n" +
               "\t\tGROUP BY a.config_uuid\n" +
               "\t)\n" +
               "\tUNION ALL\n" +
               "\tSELECT aa.uuid\n" +
               "\tFROM product_sys_data_center_log aa\n" +
               "\t\tJOIN product_sys_data_center_log bb ON aa.pre_step_uuid = bb.uuid\n" +
               "\tWHERE aa.type = 2\n" +
               "\t\tAND bb.type = 1\n" +
               "\t\tAND bb.id IN (\n" +
               "\t\t\tSELECT max(a.id)\n" +
               "\t\t\tFROM product_sys_data_center_log a\n" +
               "\t\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" +
               "\t\t\tWHERE a.type = 1\n" +
               "\t\t\t\tAND b.type = 2\n" +
               "\t\t\t\tAND (a.result = 1\n" +
               "\t\t\t\t\tOR (a.deal_flag = 1\n" +
               "\t\t\t\t\t\tAND a.deal_result = 1))\n" +
               "\t\t\t\tAND (b.result = 1\n" +
               "\t\t\t\t\tOR (b.deal_flag = 1\n" +
               "\t\t\t\t\t\tAND b.deal_result = 1))\n" +
               "\t\t\tGROUP BY a.config_uuid\n" +
               "\t\t)\n" +
               "\tUNION ALL\n" +
               "\tSELECT uuid\n" +
               "\tFROM product_sys_data_center_log a\n" +
               "\tWHERE a.type = 5\n" +
               "\t\tAND detail != 3\n" +
               "\t\tAND (a.result = 1\n" +
               "\t\t\tOR (a.deal_flag = 1\n" +
               "\t\t\t\tAND a.deal_result = 1))\n" +
               ")) ddd";
         //插入数据根据创建时间的年月分别插入到对应的表中
         for (DateTime dateTime : tableSuffixList) {
            String selectSql = "select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
                  " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime <now() + interval -7 day";
            //获取datetime所在月份的第一天0点0分0秒 和最后一天23点59分59秒
            DateTime beginOfMonth = DateUtil.beginOfMonth(dateTime);
            DateTime endOfMonth = DateUtil.endOfMonth(dateTime);
            int currentPage = 1;
            int totalPage;
            do {
               //分页查询数据
               DataTableEntity dt = getBaseDao().listTable(selectSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")}, currentPage, 2000);
               if (DataTableEntity.isEmpty(dt)) {
                  break;
               }
               totalPage = dt.getSqle().getTotalpage();
               //更改dt的表名进行插入
               dt.getMeta().setTableName(new Object[]{tableName + "_" + DateUtil.format(dateTime, "yyyyMM")});
               getBaseDao().add(dt);
            } while (totalPage > currentPage);
//            String insertSql = "insert into " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
//                  " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime <now() + interval -7 day";
//            getBaseDao().executeUpdate(insertSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")});
//                //删除数据已插入的数据
                StringBuilder deleteSql = new StringBuilder();
                deleteSql.append(" DELETE a FROM product_sys_data_center_log a INNER JOIN ");
                deleteSql.append(" product_sys_data_center_log b on a.uuid=b.pre_step_uuid and a.type=1 and b.type=2 ");
                deleteSql.append(" where ");
                deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
                deleteSql.append(" and a.type=1 ");
                deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
                //先删除采集日志数据
                getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
            StringBuilder deleteSql = new StringBuilder();
//            deleteSql.append(" DELETE a FROM product_sys_data_center_log a INNER JOIN ");
//            deleteSql.append(" product_sys_data_center_log b on a.uuid=b.pre_step_uuid and a.type=1 and b.type=2 ");
//            deleteSql.append(" where ");
//            deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
//            deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
//            deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
//            deleteSql.append(" and a.type=1 ");
//            deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
//            //先删除采集日志数据
//            getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
                deleteSql.setLength(0);
                //再删除提取日志数据
                deleteSql.append(" DELETE a FROM product_sys_data_center_log a LEFT JOIN ");
                deleteSql.append(" product_sys_data_center_log b on b.uuid=a.pre_step_uuid and b.type=1 and a.type=2 ");
                deleteSql.append(" where ");
                deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
                deleteSql.append(" and a.type=2 and b.id is null  ");
                deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
                getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
            //删除数据已插入的数据
                deleteSql.setLength(0);
                //再删除其他日志数据
                deleteSql.append(" DELETE FROM product_sys_data_center_log a  ");
                deleteSql.append(" where ");
                deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
                deleteSql.append(" and a.type>2 ");
                deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
                getBaseDao().executeUpdate(deleteSql.toString());
            }
            //查询data_center_log表中的创建时间是7天前的数据 且开始时间是dateTime的数据
            StringBuilder querySql = new StringBuilder(64);
            querySql.append("select max(id) max_id,min(id) min_id from product_sys_data_center_log ");
            querySql.append(" where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s')");
            querySql.append("select id from ");
            querySql.append(tableName).append("_").append(DateUtil.format(dateTime, "yyyyMM"));
            querySql.append(" where type=1 and (result=1 or (deal_flag=1 and deal_result=1))");
            deleteSql.setLength(0);
            //再删除提取日志数据
            deleteSql.append(" DELETE a FROM product_sys_data_center_log a LEFT JOIN ");
            deleteSql.append(" product_sys_data_center_log b on b.uuid=a.pre_step_uuid and b.type=1 and a.type=2 ");
            deleteSql.append(" where ");
            deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
            deleteSql.append(" and a.type=2 and b.id is null  ");
            deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
            getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
            deleteSql.setLength(0);
            //再删除其他日志数据
            deleteSql.append(" DELETE FROM product_sys_data_center_log a  ");
            deleteSql.append(" where ");
            deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
            deleteSql.append(" and a.type>2 ");
            deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
            getBaseDao().executeUpdate(deleteSql.toString());
         }
//         fs = null;
//         do {
@@ -215,419 +421,419 @@
//            //检查是否还有数据
//            fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false);
//         } while (fs != null && fs.getInteger("del_count") != null && fs.getInteger("del_count") > 0);
        } catch (Exception e) {
            e.printStackTrace();
            throw e;
        }
    }
      } catch (Exception e) {
         e.printStackTrace();
         throw e;
      }
   }
    /**
     * 创建定时任务
     *
     * @param taskName          任务名称
     * @param isUsed            是否启用
     * @param corn              执行时间
     * @param executeExpression 执行表达式(bean.method)
     */
    private String createTimeTask(String timeTaskUuid, String taskName, boolean isUsed, String corn, String executeExpression) {
        FieldSetEntity fse = new FieldSetEntity();
        try {
            fse.setTableName("product_sys_timed_task");
            fse.setValue("uuid", timeTaskUuid);
            fse.setValue("job_name", taskName);//任务名称
            fse.setValue("job_group", "system");//分组
            fse.setValue("invoke_target", executeExpression);//调用目标字符串
            fse.setValue("cron_expression", corn);//cron表达式
            fse.setValue("misfire_policy", "3");//错误执行策略  只执行一次
            fse.setValue("concurrent", 0);//不允许并发执行
            fse.setValue("remark", taskName);
            fse.setValue("created_by", SpringMVCContextHolder.getCurrentUser().getUser_id());
            fse.setValue("created_utc_datetime", new Date());
            fse.setValue("status", isUsed ? 1 : 0);
            fse.setValue("is_conceal", 1);
   /**
    * 创建定时任务
    *
    * @param taskName          任务名称
    * @param isUsed            是否启用
    * @param corn              执行时间
    * @param executeExpression 执行表达式(bean.method)
    */
   private String createTimeTask(String timeTaskUuid, String taskName, boolean isUsed, String corn, String executeExpression) {
      FieldSetEntity fse = new FieldSetEntity();
      try {
         fse.setTableName("product_sys_timed_task");
         fse.setValue("uuid", timeTaskUuid);
         fse.setValue("job_name", taskName);//任务名称
         fse.setValue("job_group", "system");//分组
         fse.setValue("invoke_target", executeExpression);//调用目标字符串
         fse.setValue("cron_expression", corn);//cron表达式
         fse.setValue("misfire_policy", "3");//错误执行策略  只执行一次
         fse.setValue("concurrent", 0);//不允许并发执行
         fse.setValue("remark", taskName);
         fse.setValue("created_by", SpringMVCContextHolder.getCurrentUser().getUser_id());
         fse.setValue("created_utc_datetime", new Date());
         fse.setValue("status", isUsed ? 1 : 0);
         fse.setValue("is_conceal", 1);
            if (!StringUtils.isEmpty(timeTaskUuid)) {
                BaseUtil.createCreatorAndCreationTime(fse);
                sysJobService.updateJob(fse);
            } else {
                sysJobService.insertJob(fse);
            }
            return fse.getUUID();
        } catch (Exception e) {
            e.printStackTrace();
            throw new BaseException(ErrorCode.CRATED_TIMED_TASK_FAIL);
        }
    }
         if (!StringUtils.isEmpty(timeTaskUuid)) {
            BaseUtil.createCreatorAndCreationTime(fse);
            sysJobService.updateJob(fse);
         } else {
            sysJobService.insertJob(fse);
         }
         return fse.getUUID();
      } catch (Exception e) {
         e.printStackTrace();
         throw new BaseException(ErrorCode.CRATED_TIMED_TASK_FAIL);
      }
   }
    /**
     * 通用保存
     *
     * @param fse
     * @param t
     */
    private void commonSave(FieldSetEntity fse, CallBackReturnValue<String, FieldSetEntity> t) {
        boolean isAdd = false;
        BaseUtil.createCreatorAndCreationTime(fse);
        if (StringUtils.isEmpty(fse.getUUID())) {
            fse.setValue(CmnConst.UUID, IdUtil.randomUUID());
            isAdd = true;
        } else if (!StringUtils.isEmpty(fse.getUUID()) && "add".equals(fse.getString(CoreConst.SYSTEM_DATA_OPERATE_TYPE))) {
            isAdd = true;
        }
        String timeTasksUid = t.invokeMethod(fse);
        fse.setValue(CmnConst.TIME_TASK_UUID, timeTasksUid);
        if (isAdd) {
            getBaseDao().add(fse);
        } else {
            getBaseDao().update(fse);
        }
    }
   /**
    * 通用保存
    *
    * @param fse
    * @param t
    */
   private void commonSave(FieldSetEntity fse, CallBackReturnValue<String, FieldSetEntity> t) {
      boolean isAdd = false;
      BaseUtil.createCreatorAndCreationTime(fse);
      if (StringUtils.isEmpty(fse.getUUID())) {
         fse.setValue(CmnConst.UUID, IdUtil.randomUUID());
         isAdd = true;
      } else if (!StringUtils.isEmpty(fse.getUUID()) && "add".equals(fse.getString(CoreConst.SYSTEM_DATA_OPERATE_TYPE))) {
         isAdd = true;
      }
      String timeTasksUid = t.invokeMethod(fse);
      fse.setValue(CmnConst.TIME_TASK_UUID, timeTasksUid);
      if (isAdd) {
         getBaseDao().add(fse);
      } else {
         getBaseDao().update(fse);
      }
   }
    /**
     * 保存MES数据同步配置
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    @Transactional
    @Override
    public String saveMesSyncDataConfig(FieldSetEntity fse) throws BaseException {
        DataTableEntity subDt = fse.getSubDataTable("product_sys_data_sync_mes_sub");
        //固定uuid
        fse.setValue("uuid", 1);
        DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "del_type=1");
        DataTableEntity subDataTable = null;
        if (!DataTableEntity.isEmpty(subDt)) {
            subDataTable = subDt.clones();
            int b = 0;
            for (int i = 0; i < subDt.getRows(); i++) {
                FieldSetEntity subFse = subDt.getFieldSetEntity(i);
                String dataType = subFse.getString("~type~");
                String uuid = subFse.getString(CmnConst.UUID);
                if ("del".equals(dataType)) {
                    String timeTaskUid = subFse.getString(CmnConst.TIME_TASK_UUID);
                    FieldSetEntity task = getBaseDao().getFieldSetEntity("product_sys_timed_task", timeTaskUid, false);
                    if (task != null) {
                        cancelTimeTask(task);
                    }
                    subDataTable.setFieldValue(b, "config_uuid", subFse.getUUID());
                } else {
                    subFse.setValue("master_uuid", fse.getUUID());
                    commonSave(subFse, fs -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "SYNC:" + fs.getString("table_name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("execution_time"), "dataSyncService.runTimeTask('" + fs.getUUID() + "')"));
                    subDt.removeFieldSetEntity(i);
                    subDataTable.setFieldValue(b, "uuid", subFse.getUUID());
                    i--;
                }
                b++;
            }
            if (!DataTableEntity.isEmpty(subDt)) {
                Object[] objects = subDt.getData().stream().map(item -> item.getUUID()).toArray();
                getBaseDao().delete("product_sys_data_sync_mes_sub", objects);
            }
        }
        fse.removeSubData("product_sys_data_sync_mes_sub");
        getBaseDao().saveFieldSetEntity(fse);
        //创建触发器
   /**
    * 保存MES数据同步配置
    *
    * @param fse
    * @return
    * @throws BaseException
    */
   @Transactional
   @Override
   public String saveMesSyncDataConfig(FieldSetEntity fse) throws BaseException {
      DataTableEntity subDt = fse.getSubDataTable("product_sys_data_sync_mes_sub");
      //固定uuid
      fse.setValue("uuid", 1);
      DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "del_type=1");
      DataTableEntity subDataTable = null;
      if (!DataTableEntity.isEmpty(subDt)) {
         subDataTable = subDt.clones();
         int b = 0;
         for (int i = 0; i < subDt.getRows(); i++) {
            FieldSetEntity subFse = subDt.getFieldSetEntity(i);
            String dataType = subFse.getString("~type~");
            String uuid = subFse.getString(CmnConst.UUID);
            if ("del".equals(dataType)) {
               String timeTaskUid = subFse.getString(CmnConst.TIME_TASK_UUID);
               FieldSetEntity task = getBaseDao().getFieldSetEntity("product_sys_timed_task", timeTaskUid, false);
               if (task != null) {
                  cancelTimeTask(task);
               }
               subDataTable.setFieldValue(b, "config_uuid", subFse.getUUID());
            } else {
               subFse.setValue("master_uuid", fse.getUUID());
               commonSave(subFse, fs -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "SYNC:" + fs.getString("table_name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("execution_time"), "dataSyncService.runTimeTask('" + fs.getUUID() + "')"));
               subDt.removeFieldSetEntity(i);
               subDataTable.setFieldValue(b, "uuid", subFse.getUUID());
               i--;
            }
            b++;
         }
         if (!DataTableEntity.isEmpty(subDt)) {
            Object[] objects = subDt.getData().stream().map(item -> item.getUUID()).toArray();
            getBaseDao().delete("product_sys_data_sync_mes_sub", objects);
         }
      }
      fse.removeSubData("product_sys_data_sync_mes_sub");
      getBaseDao().saveFieldSetEntity(fse);
      //创建触发器
//           //1 主库删除后同步删除子库
        syncDelRecordService.createTrigger(subDataTable, fse.getString("data_source"), fse.getString("target_data_source"));
        return fse.getUUID();
    }
      syncDelRecordService.createTrigger(subDataTable, fse.getString("data_source"), fse.getString("target_data_source"));
      return fse.getUUID();
   }
    /**
     * 取消定时任务
     *
     * @param fse
     * @throws BaseException
     */
    private void cancelTimeTask(FieldSetEntity fse) throws BaseException {
        try {
            this.sysJobService.deleteJob(fse);
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
            throw new BaseException(ErrorCode.CANCEL_TIME_TASK_FAIL);
        }
    }
   /**
    * 取消定时任务
    *
    * @param fse
    * @throws BaseException
    */
   private void cancelTimeTask(FieldSetEntity fse) throws BaseException {
      try {
         this.sysJobService.deleteJob(fse);
      } catch (Exception e) {
         e.printStackTrace();
         SpringMVCContextHolder.getSystemLogger().error(e);
         throw new BaseException(ErrorCode.CANCEL_TIME_TASK_FAIL);
      }
   }
    /**
     * 采集配置保存
     *
     * @param fse
     * @return
     */
    @Override
    @Transactional
    public synchronized String saveCollectConfig(FieldSetEntity fse) throws BaseException {
        boolean isAdd = false;
        if (StringUtils.isEmpty(fse.getUUID())) {
            if (StringUtils.isEmpty(dataSystemName)) {
                //数据来源系统名称不能为空
                throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_CAN_NOT_EMPTY);
            }
            Pattern r = Pattern.compile("^[A-Za-z-]+$");
            Matcher m = r.matcher(dataSystemName);
            if (!m.matches()) {
                //数据来源系统名称格式不符
                throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_FORMAT_UNQUALIFIED);
            }
            //新增
            FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL("SELECT\n" + "\t ifnull(MAX( id ),0)+1 id " + "FROM\n" + "\t( SELECT CONVERT(REPLACE ( id, ?, '' ),UNSIGNED int) id FROM product_sys_data_collect WHERE id LIKE concat(?,'%') ) a", new Object[]{dataSystemName, dataSystemName}, false);
            if (!FieldSetEntity.isEmpty(fs) && !StringUtils.isEmpty(fs.getString("id"))) {
                String id = fs.getString("id");
                DecimalFormat decimalFormat = new DecimalFormat("000000");
                id = decimalFormat.format(Integer.valueOf(id));
                fse.setValue("id", dataSystemName + id);
            } else {
                throw new BaseException(ErrorCode.UNIQUE_COLLECT_CREATE_FAIL);
            }
            isAdd = true;
        }
   /**
    * 采集配置保存
    *
    * @param fse
    * @return
    */
   @Override
   @Transactional
   public synchronized String saveCollectConfig(FieldSetEntity fse) throws BaseException {
      boolean isAdd = false;
      if (StringUtils.isEmpty(fse.getUUID())) {
         if (StringUtils.isEmpty(dataSystemName)) {
            //数据来源系统名称不能为空
            throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_CAN_NOT_EMPTY);
         }
         Pattern r = Pattern.compile("^[A-Za-z-]+$");
         Matcher m = r.matcher(dataSystemName);
         if (!m.matches()) {
            //数据来源系统名称格式不符
            throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_FORMAT_UNQUALIFIED);
         }
         //新增
         FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL("SELECT\n" + "\t ifnull(MAX( id ),0)+1 id " + "FROM\n" + "\t( SELECT CONVERT(REPLACE ( id, ?, '' ),UNSIGNED int) id FROM product_sys_data_collect WHERE id LIKE concat(?,'%') ) a", new Object[]{dataSystemName, dataSystemName}, false);
         if (!FieldSetEntity.isEmpty(fs) && !StringUtils.isEmpty(fs.getString("id"))) {
            String id = fs.getString("id");
            DecimalFormat decimalFormat = new DecimalFormat("000000");
            id = decimalFormat.format(Integer.valueOf(id));
            fse.setValue("id", dataSystemName + id);
         } else {
            throw new BaseException(ErrorCode.UNIQUE_COLLECT_CREATE_FAIL);
         }
         isAdd = true;
      }
        commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DC:" + fs.getString("name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("frequency"), "dataCollectService.dataCollect('" + fs.getUUID() + "')"));
        saveDelRecordConfig(1, fse.getUUID());
        if (!"ch-kt".equals(dataSystemName)) {
            //子库保存后发送到主服务
            fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
            mesExternalService.remoteSaveCollectConfig(fse);
        }
        return fse.getUUID();
    }
      commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DC:" + fs.getString("name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("frequency"), "dataCollectService.dataCollect('" + fs.getUUID() + "')"));
      saveDelRecordConfig(1, fse.getUUID());
      if (!"ch-kt".equals(dataSystemName)) {
         //子库保存后发送到主服务
         fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
         mesExternalService.remoteSaveCollectConfig(fse);
      }
      return fse.getUUID();
   }
    /**
     * 采集配置详情
     *
     * @param fse
     * @return
     */
    @Override
    public FieldSetEntity findCollectConfig(FieldSetEntity fse) throws BaseException {
        return publicService.getFieldSetEntity(fse, false);
    }
   /**
    * 采集配置详情
    *
    * @param fse
    * @return
    */
   @Override
   public FieldSetEntity findCollectConfig(FieldSetEntity fse) throws BaseException {
      return publicService.getFieldSetEntity(fse, false);
   }
    /**
     * 采集配置删除
     *
     * @param fse
     */
    @Override
    @Transactional
    public void delCollectConfig(FieldSetEntity fse) throws BaseException {
        String uuids = fse.getUUID();
        String[] split = uuids.split(",");
        DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM  product_sys_data_collect WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
        if (!DataTableEntity.isEmpty(dt)) {
            for (int i = 0; i < dt.getRows(); i++) {
                cancelTimeTask(dt.getFieldSetEntity(i));
            }
        }
        for (String s : split) {
            cancelDelRecordConfig(1, s);
        }
        publicService.delete(fse);
    }
   /**
    * 采集配置删除
    *
    * @param fse
    */
   @Override
   @Transactional
   public void delCollectConfig(FieldSetEntity fse) throws BaseException {
      String uuids = fse.getUUID();
      String[] split = uuids.split(",");
      DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM  product_sys_data_collect WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
      if (!DataTableEntity.isEmpty(dt)) {
         for (int i = 0; i < dt.getRows(); i++) {
            cancelTimeTask(dt.getFieldSetEntity(i));
         }
      }
      for (String s : split) {
         cancelDelRecordConfig(1, s);
      }
      publicService.delete(fse);
   }
    /**
     * 提取配置保存
     *
     * @param fse
     * @return
     */
    @Override
    @Transactional
    public String saveExtractConfig(FieldSetEntity fse) throws BaseException {
        boolean isAdd = !StringUtils.isEmpty(fse.getUUID());
        commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DE:" + fs.getString("extract_name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("extract_time"), "dataExtractService.startExtractData('" + fs.getUUID() + "')"));
        saveDelRecordConfig(2, fse.getUUID());
        if ("ch-kt".equals(dataSystemName)) {
            //主服务保存后分发到子服务中
            fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
            mesExternalService.remoteSaveExtractConfig(fse);
        }
        return fse.getUUID();
    }
   /**
    * 提取配置保存
    *
    * @param fse
    * @return
    */
   @Override
   @Transactional
   public String saveExtractConfig(FieldSetEntity fse) throws BaseException {
      boolean isAdd = !StringUtils.isEmpty(fse.getUUID());
      commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DE:" + fs.getString("extract_name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("extract_time"), "dataExtractService.startExtractData('" + fs.getUUID() + "')"));
      saveDelRecordConfig(2, fse.getUUID());
      if ("ch-kt".equals(dataSystemName)) {
         //主服务保存后分发到子服务中
         fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
         mesExternalService.remoteSaveExtractConfig(fse);
      }
      return fse.getUUID();
   }
    /**
     * 提取配置查询
     *
     * @param fse
     * @return
     */
    @Override
    public FieldSetEntity findExtractConfig(FieldSetEntity fse) throws BaseException {
        return publicService.getFieldSetEntity(fse, true);
    }
   /**
    * 提取配置查询
    *
    * @param fse
    * @return
    */
   @Override
   public FieldSetEntity findExtractConfig(FieldSetEntity fse) throws BaseException {
      return publicService.getFieldSetEntity(fse, true);
   }
    /**
     * 提取配置删除
     *
     * @param fse
     */
    @Override
    @Transactional
    public void delExtractConfig(FieldSetEntity fse) throws BaseException {
        String uuids = fse.getUUID();
        String[] split = uuids.split(",");
        DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM  product_sys_data_extract_config WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
        if (!DataTableEntity.isEmpty(dt)) {
            for (int i = 0; i < dt.getRows(); i++) {
                cancelTimeTask(dt.getFieldSetEntity(i));
            }
        }
        for (String s : split) {
            cancelDelRecordConfig(2, s);
        }
        publicService.delete(fse);
    }
   /**
    * 提取配置删除
    *
    * @param fse
    */
   @Override
   @Transactional
   public void delExtractConfig(FieldSetEntity fse) throws BaseException {
      String uuids = fse.getUUID();
      String[] split = uuids.split(",");
      DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM  product_sys_data_extract_config WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
      if (!DataTableEntity.isEmpty(dt)) {
         for (int i = 0; i < dt.getRows(); i++) {
            cancelTimeTask(dt.getFieldSetEntity(i));
         }
      }
      for (String s : split) {
         cancelDelRecordConfig(2, s);
      }
      publicService.delete(fse);
   }
    /**
     * 归档配置保存
     *
     * @param fse
     * @return
     */
    @Override
    @Transactional
    public String saveArchivingConfig(FieldSetEntity fse) throws BaseException {
        String sourceDataSource = fse.getString("source_data_source");
        String targetDataSource = fse.getString("target_data_source");
        String[] dataSource = (sourceDataSource + "," + targetDataSource).split(",");
        StringBuilder sql = new StringBuilder();
        sql.append("SELECT db_type num FROM product_sys_data_sync_manager WHERE ");
        sql.append(BaseUtil.buildQuestionMarkFilter("uuid", dataSource.length, true));
        sql.append(" GROUP BY db_type ");
        DataTableEntity dt = getBaseDao().listTable(sql.toString(), dataSource);
        if (DataTableEntity.isEmpty(dt)) {
            throw new BaseException(ErrorCode.VERIFY_DATA_SOURCE_TYPE_FAIL);
        }
   /**
    * 归档配置保存
    *
    * @param fse
    * @return
    */
   @Override
   @Transactional
   public String saveArchivingConfig(FieldSetEntity fse) throws BaseException {
      String sourceDataSource = fse.getString("source_data_source");
      String targetDataSource = fse.getString("target_data_source");
      String[] dataSource = (sourceDataSource + "," + targetDataSource).split(",");
      StringBuilder sql = new StringBuilder();
      sql.append("SELECT db_type num FROM product_sys_data_sync_manager WHERE ");
      sql.append(BaseUtil.buildQuestionMarkFilter("uuid", dataSource.length, true));
      sql.append(" GROUP BY db_type ");
      DataTableEntity dt = getBaseDao().listTable(sql.toString(), dataSource);
      if (DataTableEntity.isEmpty(dt)) {
         throw new BaseException(ErrorCode.VERIFY_DATA_SOURCE_TYPE_FAIL);
      }
        /* 2022年10月10日 15:33:47 6c 需要能够支持不同数据库间的归档
        if (dt.getRows() != 1) {
            throw new BaseException(ErrorCode.DATA_SOURCE_INCONFORMITY);
        }*/
        if (StringUtils.equalsAny(dt.getString(0, "num"), "0", "1", "5")) {
            boolean isAdd = !StringUtils.isEmpty(fse.getUUID());
            commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DA:" + fs.getString("name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("execute_time"), "dataArchivingService.dataArchivingEntry('" + fs.getUUID() + "')"));
            if ("ch-kt".equals(dataSystemName)) {
                //主服务保存后分发到子服务中
                fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
                mesExternalService.remoteSaveArchiveConfig(fse);
            }
        }
      if (StringUtils.equalsAny(dt.getString(0, "num"), "0", "1", "5")) {
         boolean isAdd = !StringUtils.isEmpty(fse.getUUID());
         commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DA:" + fs.getString("name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("execute_time"), "dataArchivingService.dataArchivingEntry('" + fs.getUUID() + "')"));
         if ("ch-kt".equals(dataSystemName)) {
            //主服务保存后分发到子服务中
            fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
            mesExternalService.remoteSaveArchiveConfig(fse);
         }
      }
        return fse.getUUID();
    }
      return fse.getUUID();
   }
    /**
     * 归档配置详情
     *
     * @param fse
     * @return
     */
    @Override
    public FieldSetEntity findArchivingConfig(FieldSetEntity fse) throws BaseException {
        return publicService.getFieldSetEntity(fse, false);
    }
   /**
    * 归档配置详情
    *
    * @param fse
    * @return
    */
   @Override
   public FieldSetEntity findArchivingConfig(FieldSetEntity fse) throws BaseException {
      return publicService.getFieldSetEntity(fse, false);
   }
    /**
     * 归档配置删除
     *
     * @param fse
     */
    @Override
    @Transactional
    public void delArchivingConfig(FieldSetEntity fse) throws BaseException {
        String uuids = fse.getUUID();
        String[] split = uuids.split(",");
        DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM  product_sys_data_archiving_config WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
        if (!DataTableEntity.isEmpty(dt)) {
            for (int i = 0; i < dt.getRows(); i++) {
                cancelTimeTask(dt.getFieldSetEntity(i));
            }
        }
        publicService.delete(fse);
    }
   /**
    * 归档配置删除
    *
    * @param fse
    */
   @Override
   @Transactional
   public void delArchivingConfig(FieldSetEntity fse) throws BaseException {
      String uuids = fse.getUUID();
      String[] split = uuids.split(",");
      DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM  product_sys_data_archiving_config WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
      if (!DataTableEntity.isEmpty(dt)) {
         for (int i = 0; i < dt.getRows(); i++) {
            cancelTimeTask(dt.getFieldSetEntity(i));
         }
      }
      publicService.delete(fse);
   }
    /**
     * 订单数据验证配置保存
     *
     * @param fse
     * @return
     */
    @Transactional
    @Override
    public String saveOrderDataVerification(FieldSetEntity fse) {
        fse.setValue(CmnConst.UUID, "1");
        commonSave(fse, fs -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "订单数据同步验证", fs.getBoolean(CmnConst.IS_USED), fs.getString("frequency_verification"), "orderDataValidationService.verificationEntryPoint"));
        return fse.getUUID();
    }
   /**
    * 订单数据验证配置保存
    *
    * @param fse
    * @return
    */
   @Transactional
   @Override
   public String saveOrderDataVerification(FieldSetEntity fse) {
      fse.setValue(CmnConst.UUID, "1");
      commonSave(fse, fs -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "订单数据同步验证", fs.getBoolean(CmnConst.IS_USED), fs.getString("frequency_verification"), "orderDataValidationService.verificationEntryPoint"));
      return fse.getUUID();
   }
    /**
     * 保存删除记录配置
     *
     * @param type
     * @param uuid
     */
    public void saveDelRecordConfig(int type, String uuid) {
        String sql = getDelRecordSql(type);
   /**
    * 保存删除记录配置
    *
    * @param type
    * @param uuid
    */
   public void saveDelRecordConfig(int type, String uuid) {
      String sql = getDelRecordSql(type);
        DataTableEntity dt = getBaseDao().listTable(sql, new Object[]{uuid, dataSystemName});
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity fse = dt.getFieldSetEntity(i);
            fse.setTableName(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG);
            fse.setValue("delete_sync_content", 1);
            syncDelRecordService.createTrigger(fse);
        }
      DataTableEntity dt = getBaseDao().listTable(sql, new Object[]{uuid, dataSystemName});
      for (int i = 0; i < dt.getRows(); i++) {
         FieldSetEntity fse = dt.getFieldSetEntity(i);
         fse.setTableName(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG);
         fse.setValue("delete_sync_content", 1);
         syncDelRecordService.createTrigger(fse);
      }
    }
   }
    private String getDelRecordSql(int type) {
        StringBuffer sql = new StringBuffer();
        sql.append("\n SELECT ");
        sql.append("\n c.uuid, ");
        sql.append("\n c.random_suffix,");
        sql.append("\n collect.uuid config_uuid, ");
        sql.append("\n collect.data_source, ");
        sql.append("\n extract.extract_target_source target_data_source, ");
        sql.append("\n collect.auto_field master_key_field, ");
        sql.append("\n collect.source_table table_name, ");
        sql.append("\n collect.id collect_id,2 del_type, ");
        sql.append("\n extract.collect_source_field collect_field,");
        sql.append("\n extract.extract_unique_field pre_master_field");
        sql.append("\n FROM ");
        sql.append("\n product_sys_data_collect collect ");
        sql.append("\n JOIN product_sys_data_extract_config extract ON upper( case when length(collect.target_table)>0 then collect.target_table else collect.source_table end ) = upper( extract.extract_source_table ) ");
        sql.append("\n LEFT JOIN product_mes_del_record_config c ON collect.uuid=c.config_uuid and c.del_type=2 ");
        sql.append(" WHERE ");
        if (1 == type) {
            //采集
            sql.append("\n collect.uuid=? ");
        } else {
            //提取
            sql.append("\n extract.uuid=? ");
        }
        sql.append(" and  collect.id like concat(?,'%')");
        return sql.toString();
    }
   private String getDelRecordSql(int type) {
      StringBuffer sql = new StringBuffer();
      sql.append("\n SELECT ");
      sql.append("\n c.uuid, ");
      sql.append("\n c.random_suffix,");
      sql.append("\n collect.uuid config_uuid, ");
      sql.append("\n collect.data_source, ");
      sql.append("\n extract.extract_target_source target_data_source, ");
      sql.append("\n collect.auto_field master_key_field, ");
      sql.append("\n collect.source_table table_name, ");
      sql.append("\n collect.id collect_id,2 del_type, ");
      sql.append("\n extract.collect_source_field collect_field,");
      sql.append("\n extract.extract_unique_field pre_master_field");
      sql.append("\n FROM ");
      sql.append("\n product_sys_data_collect collect ");
      sql.append("\n JOIN product_sys_data_extract_config extract ON upper( case when length(collect.target_table)>0 then collect.target_table else collect.source_table end ) = upper( extract.extract_source_table ) ");
      sql.append("\n LEFT JOIN product_mes_del_record_config c ON collect.uuid=c.config_uuid and c.del_type=2 ");
      sql.append(" WHERE ");
      if (1 == type) {
         //采集
         sql.append("\n collect.uuid=? ");
      } else {
         //提取
         sql.append("\n extract.uuid=? ");
      }
      sql.append(" and  collect.id like concat(?,'%')");
      return sql.toString();
   }
    public void cancelDelRecordConfig(int type, String uuid) {
        FieldSetEntity fieldSetEntity = new FieldSetEntity();
        fieldSetEntity.setTableName("temp");
        if (type == 1) {
            fieldSetEntity.setValue("uuid", uuid);
            syncDelRecordService.deleteTrigger(fieldSetEntity);
        } else {
            //提取
            DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "UPPER(source_table) in (select upper(extract_source_table) from product_sys_data_extract_config where uuid =?) ", new Object[]{uuid});
            if (DataTableEntity.isEmpty(dt)) {
                return;
            }
            Object[] uuids = dt.getUuids();
            for (Object o : uuids) {
                fieldSetEntity.setValue("uuid", o);
                syncDelRecordService.deleteTrigger(fieldSetEntity);
            }
        }
   public void cancelDelRecordConfig(int type, String uuid) {
      FieldSetEntity fieldSetEntity = new FieldSetEntity();
      fieldSetEntity.setTableName("temp");
      if (type == 1) {
         fieldSetEntity.setValue("uuid", uuid);
         syncDelRecordService.deleteTrigger(fieldSetEntity);
      } else {
         //提取
         DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "UPPER(source_table) in (select upper(extract_source_table) from product_sys_data_extract_config where uuid =?) ", new Object[]{uuid});
         if (DataTableEntity.isEmpty(dt)) {
            return;
         }
         Object[] uuids = dt.getUuids();
         for (Object o : uuids) {
            fieldSetEntity.setValue("uuid", o);
            syncDelRecordService.deleteTrigger(fieldSetEntity);
         }
      }
    }
   }
}