123
许鹏程
2024-05-12 5a7cfb9266c7e03ccee44811b0fba1d4aceae6fd
product-server-data-center/src/main/java/com/product/data/center/service/CommonService.java
@@ -1,5 +1,9 @@
package com.product.data.center.service;
import cn.hutool.core.date.DateField;
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.IdUtil;
import com.product.admin.service.PublicService;
import com.product.common.lang.StringUtils;
@@ -14,16 +18,23 @@
import com.product.data.center.config.ErrorCode;
import com.product.data.center.service.ide.ICommonService;
import com.product.data.center.utils.CallBackReturnValue;
import com.product.datasource.entity.DataBaseEntity;
import com.product.quartz.service.impl.SysJobService;
import com.product.util.BaseUtil;
import io.swagger.models.auth.In;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.DecimalFormat;
import java.time.LocalDate;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
/**
 * @Author cheng
@@ -48,12 +59,184 @@
   @Value("${data.system.name}")
   private String dataSystemName;
   @Transactional
   public void deleteCenterLog() {
      for (int i = 0; i < 10; i++) {
         this.getBaseDao().executeUpdate("DELETE FROM product_sys_data_center_log WHERE uuid in (select * from (select * from product_sys_data_center_log_del_v limit 10000) a ");
      try {
         //在mysql的information_schema 表中查询指定表是否存在
         String tableName = "da_product_sys_data_center_log";
         //归档7天前的数据
         //截止日期
         Date endTime = DateUtil.offset(new Date(), DateField.DAY_OF_YEAR, -7);
         //格式化日期
         String endTimeStr = DateUtil.format(endTime, "yyyy-MM-dd");
         //根据截止日期查询表中最大、最小的创建时间根据创建时间进行分表按月分表
         String sql = "select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) )";
         FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false);
         if (fs == null) {
            return;
         }
         //最小时间
         Date minTime = fs.getDate("minTime");
         //最大时间
         Date maxTime = fs.getDate("maxTime");
         //根据最小时间和最大时间获取区间内(包含最小时间和最大时间)的年月日期格式为yyyy-MM
         List<DateTime> tableSuffixList = DateUtil.rangeToList(minTime, maxTime, DateField.MONTH);
         //查询数据库中是否存在表
         Set<String> tableNameSet = tableSuffixList.stream().map(e -> tableName + "_" + DateUtil.format(e, "yyyyMM")).collect(Collectors.toSet());
         //查询数据库中存在的表
         String dataBaseName = getBaseDao().getDataBaseName();
         sql = "select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", tableNameSet.size(), true);
         Object[] params = new Object[tableNameSet.size() + 1];
         params[0] = dataBaseName;
         int index = 1;
         for (String tableNameStr : tableNameSet) {
            params[index] = tableNameStr;
            index++;
         }
         DataTableEntity dataTableEntity = getBaseDao().listTable(sql, params);
         if (!DataTableEntity.isEmpty(dataTableEntity)) {
            for (int i = 0; i < dataTableEntity.getRows(); i++) {
               String tableNameStr = dataTableEntity.getString(i, "table_name");
               tableNameSet.remove(tableNameStr);
            }
         }
         //创建不存在的表,表结构与原表一致
         for (String tableNameStr : tableNameSet) {
            String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log";
            getBaseDao().executeUpdate(createTableSql, new Object[]{});
         }
         //查询各类型配置最后一次成功的数据
         String sqlFilter = "SELECT * FROM (SELECT uuid\n" +
               "FROM product_sys_data_center_log\n" +
               "WHERE uuid IN (\n" +
               "\tSELECT uuid\n" +
               "\tFROM product_sys_data_center_log\n" +
               "\tWHERE id IN (\n" +
               "\t\tSELECT max(a.id)\n" +
               "\t\tFROM product_sys_data_center_log a\n" +
               "\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" +
               "\t\tWHERE a.type = 1\n" +
               "\t\t\tAND b.type = 2\n" +
               "\t\t\tAND (a.result = 1\n" +
               "\t\t\t\tOR (a.deal_flag = 1\n" +
               "\t\t\t\t\tAND a.deal_result = 1))\n" +
               "\t\t\tAND (b.result = 1\n" +
               "\t\t\t\tOR (b.deal_flag = 1\n" +
               "\t\t\t\t\tAND b.deal_result = 1))\n" +
               "\t\tGROUP BY a.config_uuid\n" +
               "\t)\n" +
               "\tUNION ALL\n" +
               "\tSELECT aa.uuid\n" +
               "\tFROM product_sys_data_center_log aa\n" +
               "\t\tJOIN product_sys_data_center_log bb ON aa.pre_step_uuid = bb.uuid\n" +
               "\tWHERE aa.type = 2\n" +
               "\t\tAND bb.type = 1\n" +
               "\t\tAND bb.id IN (\n" +
               "\t\t\tSELECT max(a.id)\n" +
               "\t\t\tFROM product_sys_data_center_log a\n" +
               "\t\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" +
               "\t\t\tWHERE a.type = 1\n" +
               "\t\t\t\tAND b.type = 2\n" +
               "\t\t\t\tAND (a.result = 1\n" +
               "\t\t\t\t\tOR (a.deal_flag = 1\n" +
               "\t\t\t\t\t\tAND a.deal_result = 1))\n" +
               "\t\t\t\tAND (b.result = 1\n" +
               "\t\t\t\t\tOR (b.deal_flag = 1\n" +
               "\t\t\t\t\t\tAND b.deal_result = 1))\n" +
               "\t\t\tGROUP BY a.config_uuid\n" +
               "\t\t)\n" +
               "\tUNION ALL\n" +
               "\tSELECT uuid\n" +
               "\tFROM product_sys_data_center_log a\n" +
               "\tWHERE a.type = 5\n" +
               "\t\tAND detail != 3\n" +
               "\t\tAND (a.result = 1\n" +
               "\t\t\tOR (a.deal_flag = 1\n" +
               "\t\t\t\tAND a.deal_result = 1))\n" +
               ")) ddd";
         //插入数据根据创建时间的年月分别插入到对应的表中
         for (DateTime dateTime : tableSuffixList) {
            String insertSql = "insert into " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
                  " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime <now() + interval -7 day";
            getBaseDao().executeUpdate(insertSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")});
//                //删除数据已插入的数据
            StringBuilder deleteSql = new StringBuilder();
            deleteSql.append(" select a.id FROM product_sys_data_center_log a INNER JOIN ");
            deleteSql.append(" product_sys_data_center_log b on a.uuid=b.pre_step_uuid and a.type=1 and b.type=2 ");
            deleteSql.append(" where ");
            deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
            deleteSql.append(" and a.type=1 ");
            deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
            //先删除采集日志数据
//            getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
            DataTableEntity dt = getBaseDao().listTable(deleteSql.toString(), new Object[]{});
            if (!DataTableEntity.isEmpty(dt)) {
               List<Integer> ids = dt.getData().stream().map(row -> row.getInteger("id")).collect(Collectors.toList());
               getBaseDao().delete("product_sys_data_center_log", BaseUtil.buildQuestionMarkFilter("id", ids.size(), true), ids.toArray());
            }
            deleteSql.setLength(0);
            //再删除提取日志数据
            deleteSql.append(" select a.id FROM product_sys_data_center_log a LEFT JOIN ");
            deleteSql.append(" product_sys_data_center_log b on b.uuid=a.pre_step_uuid and b.type=1 and a.type=2 ");
            deleteSql.append(" where ");
            deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
            deleteSql.append(" and a.type=2 and b.id is null  ");
            deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
//            getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
            dt = getBaseDao().listTable(deleteSql.toString(), new Object[]{});
            if (!DataTableEntity.isEmpty(dt)) {
               List<Integer> ids = dt.getData().stream().map(row -> row.getInteger("id")).collect(Collectors.toList());
               getBaseDao().delete("product_sys_data_center_log", BaseUtil.buildQuestionMarkFilter("id", ids.size(), true), ids.toArray());
            }
            deleteSql.setLength(0);
            //再删除其他日志数据
            deleteSql.append(" select a.id FROM product_sys_data_center_log a  ");
            deleteSql.append(" where ");
            deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
            deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
            deleteSql.append(" and a.type>2 ");
            deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
            dt = getBaseDao().listTable(deleteSql.toString(), new Object[]{});
            if (!DataTableEntity.isEmpty(dt)) {
               List<Integer> ids = dt.getData().stream().map(row -> row.getInteger("id")).collect(Collectors.toList());
               getBaseDao().delete("product_sys_data_center_log", BaseUtil.buildQuestionMarkFilter("id", ids.size(), true), ids.toArray());
            }
         }
//         fs = null;
//         do {
//            this.getBaseDao().executeUpdate("DELETE FROM product_sys_data_center_log WHERE uuid in (select * from (select * from product_sys_data_center_log_del_v limit 10000) a ) ");
//            //检查是否还有数据
//            sql = "select 1 as del_count from product_sys_data_center_log_del_v limit 1";
//            //检查是否还有数据
//            fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false);
//         } while (fs != null && fs.getInteger("del_count") != null && fs.getInteger("del_count") > 0);
      } catch (Exception e) {
         e.printStackTrace();
         SpringMVCContextHolder.getSystemLogger().error("删除采集提取日志错误!!!:" + e.getMessage());
         SpringMVCContextHolder.getSystemLogger().error(e);
         throw e;
      }
   }
   /**
    * 创建定时任务