许鹏程
2024-05-15 3212d131b28e5f4b09bb1222861370310a00ffc9
更改采集提取日志表归档删除逻辑
已修改3个文件
269 ■■■■ 文件已修改
product-server-data-center/src/main/java/com/product/data/center/service/CommonService.java 265 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataCollectService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataExtractService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/CommonService.java
@@ -4,6 +4,7 @@
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.IdUtil;
import com.product.admin.service.PublicService;
import com.product.common.lang.StringUtils;
@@ -21,17 +22,13 @@
import com.product.datasource.entity.DataBaseEntity;
import com.product.quartz.service.impl.SysJobService;
import com.product.util.BaseUtil;
import io.swagger.models.auth.In;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.text.DecimalFormat;
import java.time.LocalDate;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -59,8 +56,172 @@
    @Value("${data.system.name}")
    private String dataSystemName;
    @Transactional
    public void deleteCenterLogV2() {
        try {
            StringBuilder querySql = new StringBuilder(64);
            //select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime,min(id) min_id,max(id) max_id, date_format(created_utc_datetime,'%Y-%m') current_month  from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) group by date_format(created_utc_datetime,'%Y-%m')
            querySql.append(" select min(id) min_id,max(id) max_id, date_format(created_utc_datetime,'%Y%m') current_month  ");
            querySql.append(" from product_sys_data_center_log ");
            querySql.append(" where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) ");
            querySql.append(" group by date_format(created_utc_datetime,'%Y%m') ");
            DataTableEntity dt = getBaseDao().listTable(querySql.toString(), new Object[]{});
            if (DataTableEntity.isEmpty(dt)) {
                return;
            }
            //在mysql的information_schema 表中查询指定表是否存在
            String tableName = "da_product_sys_data_center_log";
            //封装所有表名
            Set<String> allTableName = dt.getData().stream().map(e -> tableName + "_" + e.getString("current_month")).collect(Collectors.toSet());
            //在mysql的information_schema 表中查询所有的表
            querySql.setLength(0);
            querySql.append(" select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", allTableName.size(), true));
            String dataBaseName = getBaseDao().getDataBaseName();
            Object[] params = new Object[allTableName.size() + 1];
            params[0] = dataBaseName;
            int index = 1;
            for (String tableNameStr : allTableName) {
                params[index] = tableNameStr;
                index++;
            }
            DataTableEntity dataTableEntity = getBaseDao().listTable(querySql.toString(), params);
            Set<String> existsTable = dataTableEntity.getData().stream().map(e -> e.getString("table_name")).collect(Collectors.toSet());
            //对比表名,创建不存在的表
            allTableName.removeAll(existsTable);
            for (String tableNameStr : allTableName) {
                String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log";
                getBaseDao().executeUpdate(createTableSql, new Object[]{});
            }
            querySql.setLength(0);
            //查询data_center_log表的uuid字段 created_utc_datetime最新的数据根据 config_uuid 和 type进行分组 且 result = 1 or ( deal_flag = 1 AND deal_result = 1 ) 每组只取最新的一条数据
            querySql.append(" SELECT uuid,row_num,(select uuid from product_sys_data_center_log where pre_step_uuid = uuid and type=2) as pre_step_uuid ");
            querySql.append(" FROM ( ");
            querySql.append("        SELECT ");
            querySql.append("        uuid, ");
            querySql.append("        ROW_NUMBER() OVER (PARTITION BY config_uuid, type ORDER BY created_utc_datetime DESC) AS row_num ");
            querySql.append("            FROM product_sys_data_center_log ");
            querySql.append("            WHERE result = 1 OR (deal_flag = 1 AND deal_result = 1) ");
            querySql.append("    ) AS subquery ");
            querySql.append("    WHERE row_num = 1  ");
            //查询需要保留的数据uuid
            DataTableEntity retainData = getBaseDao().listTable(querySql.toString(), new Object[]{});
            String[] retainUuids = DataTableEntity.isEmpty(retainData) ? null : new String[retainData.getRows() * 2];
            if (!DataTableEntity.isEmpty(retainData)) {
                for (int i = 0; i < retainData.getRows(); i++) {
                    retainUuids[i] = retainData.getString(i, "uuid");
                    retainUuids[i + retainData.getRows()] = retainData.getString(i, "pre_step_uuid");
                }
            }
            for (int i = 0; i < dt.getRows(); i++) {
                Integer minId = dt.getInt(i, "min_id");
                Integer maxId = dt.getInt(i, "max_id");
                String currentMonth = dt.getString(i, "current_month");
                //查询mysql中是否存在表
                querySql.setLength(0);
                int pageSize = 500;
                //分页查询采集日志数据7天前的最大和最小id
                querySql.append(" select a.uuid as log_collect_uuid,b.uuid as log_extract_uuid from product_sys_data_center_log a");
                querySql.append(" join product_sys_data_center_log b ");
                querySql.append(" on a.uuid=b.pre_step_uuid ");
                querySql.append(" where a.id >= ? and a.id <= ? and a.type=1 and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) and a.type=1 and b.type=2 limit ").append(pageSize);
                while (true) {
                    DataTableEntity queryDt = getBaseDao().listTable(querySql.toString(), new Object[]{minId, maxId});
                    if (DataTableEntity.isEmpty(queryDt)) {
                        break;
                    }
                    //将查询出的log_collect_uuid和log_extract_uuid分别存入两个list中
                    List<String> logCollectUuidList = new ArrayList<>();
                    List<String> logExtractUuidList = new ArrayList<>();
                    for (int j = 0; j < queryDt.getRows(); j++) {
                        logCollectUuidList.add(queryDt.getString(j, "log_collect_uuid"));
                        logExtractUuidList.add(queryDt.getString(j, "log_extract_uuid"));
                    }
                    //根据查询出的uuid将数据插入到对应的表中,使用insert into select的方式 条件是 uuid in logCollectUuidList 和 logExtractUuidList 的数据
                    StringBuilder insertSql = new StringBuilder(64);
                    insertSql.append(" insert into ").append(tableName).append("_").append(currentMonth);
                    //将logCollectUuidList和logExtractUuidList合并为一个数组且数组中如果有rangeUuids中的数据则排除
                    Set<String> uuidSet = new HashSet<>();
                    uuidSet.addAll(logCollectUuidList);
                    uuidSet.addAll(logExtractUuidList);
                    uuidSet.removeAll(Arrays.asList(retainUuids));
                    String[] uuids = uuidSet.toArray(new String[0]);
                    insertSql.append(" select * from product_sys_data_center_log").append(" where (").append(BaseUtil.buildQuestionMarkFilter("uuid", uuids.length, true)).append(")");
                    getBaseDao().executeUpdate(insertSql.toString(), uuids);
                    //删除采集、提取日志数据
                    StringBuilder deleteSql = new StringBuilder(64);
                    deleteSql.append(" delete a from product_sys_data_center_log a ");
                    deleteSql.append(" where (").append(BaseUtil.buildQuestionMarkFilter("a.uuid", uuids.length, true)).append(")");
                    getBaseDao().executeUpdate(deleteSql.toString(), uuids);
                    if (pageSize != queryDt.getRows()) {
                        break;
                    }
                }
                //分页删除type>2的数据
                querySql.setLength(0);
                querySql.append(" select uuid from product_sys_data_center_log a");
                querySql.append(" where id >= ? and id <= ? and type>2 and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) )  limit ").append(pageSize * 2);
                while (true) {
                    DataTableEntity queryDt = getBaseDao().listTable(querySql.toString(), new Object[]{minId, maxId});
                    if (DataTableEntity.isEmpty(queryDt)) {
                        break;
                    }
                    //将查询出的uuid存入list中
                    List<String> uuidList = queryDt.getData().stream().map(e -> e.getString("uuid")).collect(Collectors.toList());
                    //排除rangeUuids中的数据
                    uuidList.removeAll(Arrays.asList(retainUuids));
                    //根据查询出的uuid将数据插入到对应的表中,使用insert into select的方式 条件是 uuid in uuidList 的数据
                    StringBuilder insertSql = new StringBuilder(64);
                    insertSql.append(" insert into ").append(tableName).append("_").append(currentMonth);
                    insertSql.append(" select * from product_sys_data_center_log").append(" where (").append(BaseUtil.buildQuestionMarkFilter("uuid", uuidList.size(), true)).append(")");
                    getBaseDao().executeUpdate(insertSql.toString(), uuidList.toArray());
                    //删除其他日志数据
                    StringBuilder deleteSql = new StringBuilder(64);
                    deleteSql.append(" delete a from product_sys_data_center_log a ");
                    deleteSql.append(" where (").append(BaseUtil.buildQuestionMarkFilter("a.uuid", uuidList.size(), true)).append(")");
                    getBaseDao().executeUpdate(deleteSql.toString(), uuidList.toArray());
                    if (pageSize * 2 != queryDt.getRows()) {
                        break;
                    }
                }
            }
        } catch (Exception e) {
            SpringMVCContextHolder.getSystemLogger().error("deleteCenterLogV2 error");
            SpringMVCContextHolder.getSystemLogger().error(e);
            e.printStackTrace();
            throw e;
        }
    }
    public void deleteCenterLog() {
        this.deleteCenterLogV2();
    }
    public void deleteCenterLogBak() {
        try {
            //在mysql的information_schema 表中查询指定表是否存在
            String tableName = "da_product_sys_data_center_log";
@@ -73,7 +234,7 @@
            String endTimeStr = DateUtil.format(endTime, "yyyy-MM-dd");
            //根据截止日期查询表中最大、最小的创建时间根据创建时间进行分表按月分表
            String sql = "select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) )";
            String sql = "select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) group by date_format(created_utc_datetime,'%Y-%m') ";
            FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false);
@@ -169,29 +330,67 @@
                    ")) ddd";
            //插入数据根据创建时间的年月分别插入到对应的表中
            for (DateTime dateTime : tableSuffixList) {
                String insertSql = "insert into " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
                String selectSql = "select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
                        " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime <now() + interval -7 day";
                getBaseDao().executeUpdate(insertSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")});
                //获取datetime所在月份的第一天0点0分0秒 和最后一天23点59分59秒
                DateTime beginOfMonth = DateUtil.beginOfMonth(dateTime);
                DateTime endOfMonth = DateUtil.endOfMonth(dateTime);
                int currentPage = 1;
                int totalPage;
                do {
                    //分页查询数据
                    DataTableEntity dt = getBaseDao().listTable(selectSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")}, currentPage, 2000);
                    if (DataTableEntity.isEmpty(dt)) {
                        break;
                    }
                    totalPage = dt.getSqle().getTotalpage();
                    //更改dt的表名进行插入
                    dt.getMeta().setTableName(new Object[]{tableName + "_" + DateUtil.format(dateTime, "yyyyMM")});
                    getBaseDao().add(dt);
                } while (totalPage > currentPage);
//                String insertSql = "insert into " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
//                        " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime <now() + interval -7 day";
//                getBaseDao().executeUpdate(insertSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")});
//                //删除数据已插入的数据
                StringBuilder deleteSql = new StringBuilder();
                deleteSql.append(" select a.id FROM product_sys_data_center_log a INNER JOIN ");
                deleteSql.append(" product_sys_data_center_log b on a.uuid=b.pre_step_uuid and a.type=1 and b.type=2 ");
                deleteSql.append(" where ");
                deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
                deleteSql.append(" and a.type=1 ");
                deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
                //先删除采集日志数据
//                deleteSql.append(" DELETE a FROM product_sys_data_center_log a INNER JOIN ");
//                deleteSql.append(" product_sys_data_center_log b on a.uuid=b.pre_step_uuid and a.type=1 and b.type=2 ");
//                deleteSql.append(" where ");
//                deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
//                deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
//                deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
//                deleteSql.append(" and a.type=1 ");
//                deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
//                //先删除采集日志数据
//                getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
                DataTableEntity dt = getBaseDao().listTable(deleteSql.toString(), new Object[]{});
                if (!DataTableEntity.isEmpty(dt)) {
                    List<Integer> ids = dt.getData().stream().map(row -> row.getInteger("id")).collect(Collectors.toList());
                    getBaseDao().delete("product_sys_data_center_log", BaseUtil.buildQuestionMarkFilter("id", ids.size(), true), ids.toArray());
                }
                //删除数据已插入的数据
                //查询data_center_log表中的创建时间是7天前的数据 且开始时间是dateTime的数据
                StringBuilder querySql = new StringBuilder(64);
                querySql.append("select max(id) max_id,min(id) min_id from product_sys_data_center_log ");
                querySql.append(" where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s')");
                querySql.append("select id from ");
                querySql.append(tableName).append("_").append(DateUtil.format(dateTime, "yyyyMM"));
                querySql.append(" where type=1 and (result=1 or (deal_flag=1 and deal_result=1))");
                deleteSql.setLength(0);
                //再删除提取日志数据
                deleteSql.append(" select a.id FROM product_sys_data_center_log a LEFT JOIN ");
                deleteSql.append(" DELETE a FROM product_sys_data_center_log a LEFT JOIN ");
                deleteSql.append(" product_sys_data_center_log b on b.uuid=a.pre_step_uuid and b.type=1 and a.type=2 ");
                deleteSql.append(" where ");
                deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
@@ -200,26 +399,18 @@
                deleteSql.append(" and a.type=2 and b.id is null  ");
                deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
//                getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
                dt = getBaseDao().listTable(deleteSql.toString(), new Object[]{});
                if (!DataTableEntity.isEmpty(dt)) {
                    List<Integer> ids = dt.getData().stream().map(row -> row.getInteger("id")).collect(Collectors.toList());
                    getBaseDao().delete("product_sys_data_center_log", BaseUtil.buildQuestionMarkFilter("id", ids.size(), true), ids.toArray());
                }
                getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
                deleteSql.setLength(0);
                //再删除其他日志数据
                deleteSql.append(" select a.id FROM product_sys_data_center_log a  ");
                deleteSql.append(" DELETE FROM product_sys_data_center_log a  ");
                deleteSql.append(" where ");
                deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
                deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
                deleteSql.append(" and a.type>2 ");
                deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
                dt = getBaseDao().listTable(deleteSql.toString(), new Object[]{});
                if (!DataTableEntity.isEmpty(dt)) {
                    List<Integer> ids = dt.getData().stream().map(row -> row.getInteger("id")).collect(Collectors.toList());
                    getBaseDao().delete("product_sys_data_center_log", BaseUtil.buildQuestionMarkFilter("id", ids.size(), true), ids.toArray());
                }
                getBaseDao().executeUpdate(deleteSql.toString());
            }
//            fs = null;
@@ -232,8 +423,6 @@
//            } while (fs != null && fs.getInteger("del_count") != null && fs.getInteger("del_count") > 0);
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error("删除采集提取日志错误!!!:" + e.getMessage());
            SpringMVCContextHolder.getSystemLogger().error(e);
            throw e;
        }
    }
product-server-data-center/src/main/java/com/product/data/center/service/DataCollectService.java
@@ -74,7 +74,7 @@
            Calendar calendar = Calendar.getInstance();
            int hour = calendar.get(Calendar.HOUR_OF_DAY);
            int minute = calendar.get(Calendar.MINUTE);
            if (hour == 1 && (minute >= 10 || minute<=20)){
            if (hour == 1 && (minute >= 10 && minute <= 20)) {
                return;
            }
            Calendar startCalendar = Calendar.getInstance();
product-server-data-center/src/main/java/com/product/data/center/service/DataExtractService.java
@@ -367,7 +367,7 @@
        Calendar calendar = Calendar.getInstance();
        int hour = calendar.get(Calendar.HOUR_OF_DAY);
        int minute = calendar.get(Calendar.MINUTE);
        if (hour == 1 && (minute >= 10 || minute <= 20)) {
        if (hour == 1 && (minute >= 10 && minute <= 20)) {
            return;
        }
        WriteExtractUtil.append("开始提取!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!:::" + uuid);