package com.product.data.center.service;
|
|
import cn.hutool.core.date.DateField;
|
import cn.hutool.core.date.DateTime;
|
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.LocalDateTimeUtil;
|
import cn.hutool.core.util.ArrayUtil;
|
import cn.hutool.core.util.IdUtil;
|
import com.product.admin.service.PublicService;
|
import com.product.common.lang.StringUtils;
|
import com.product.core.config.CoreConst;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.core.transfer.Transactional;
|
import com.product.data.center.config.CmnConst;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.service.ide.ICommonService;
|
import com.product.data.center.utils.CallBackReturnValue;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.quartz.service.impl.SysJobService;
|
import com.product.util.BaseUtil;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.Resource;
|
import java.text.DecimalFormat;
|
import java.time.LocalDate;
|
import java.util.*;
|
import java.util.regex.Matcher;
|
import java.util.regex.Pattern;
|
import java.util.stream.Collectors;
|
|
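/**
 * @Author cheng
 * @Date 2022/7/22 9:43
 * @Desc Data-center configuration service: saves, loads and deletes the collect, extract,
 *       archiving, MES-sync and order-verification configurations together with their timed
 *       tasks, and moves finished center-log rows into monthly archive tables.
 */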
|
@Service("dataCommonService")
|
public class CommonService extends AbstractBaseService implements ICommonService {
|
|
// Job service used in this class to insert, update and delete the timed tasks that belong
// to each configuration record.
@Resource
|
SysJobService sysJobService;
|

// Generic entity service; used in this class to load and delete configuration records.
@Resource
|
PublicService publicService;
|

// Creates and deletes the triggers used for delete-record synchronisation.
@Resource
|
SyncDelRecordService syncDelRecordService;
|

// Remote calls that forward a saved collect/extract/archive configuration to another service.
@Resource
|
MesExternalService mesExternalService;
|

// Value of the "data.system.name" property. It is used as the prefix of generated collect ids
// and compared with "ch-kt" to decide whether a saved configuration is forwarded remotely.
@Value("${data.system.name}")
|
private String dataSystemName;
|
|
@Transactional
|
public void deleteCenterLogV2() {
|
try {
|
|
StringBuilder querySql = new StringBuilder(64);
|
|
//select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime,min(id) min_id,max(id) max_id, date_format(created_utc_datetime,'%Y-%m') current_month from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) group by date_format(created_utc_datetime,'%Y-%m')
|
querySql.append(" select min(id) min_id,max(id) max_id, date_format(created_utc_datetime,'%Y%m') current_month ");
|
querySql.append(" from product_sys_data_center_log ");
|
querySql.append(" where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) ");
|
querySql.append(" group by date_format(created_utc_datetime,'%Y%m') ");
|
|
DataTableEntity dt = getBaseDao().listTable(querySql.toString(), new Object[]{});
|
|
if (DataTableEntity.isEmpty(dt)) {
|
return;
|
}
|
//在mysql的information_schema 表中查询指定表是否存在
|
String tableName = "da_product_sys_data_center_log";
|
|
//封装所有表名
|
Set<String> allTableName = dt.getData().stream().map(e -> tableName + "_" + e.getString("current_month")).collect(Collectors.toSet());
|
|
//在mysql的information_schema 表中查询所有的表
|
querySql.setLength(0);
|
querySql.append(" select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", allTableName.size(), true));
|
String dataBaseName = getBaseDao().getDataBaseName();
|
Object[] params = new Object[allTableName.size() + 1];
|
params[0] = dataBaseName;
|
int index = 1;
|
for (String tableNameStr : allTableName) {
|
params[index] = tableNameStr;
|
index++;
|
}
|
DataTableEntity dataTableEntity = getBaseDao().listTable(querySql.toString(), params);
|
|
Set<String> existsTable = dataTableEntity.getData().stream().map(e -> e.getString("table_name")).collect(Collectors.toSet());
|
|
//对比表名,创建不存在的表
|
allTableName.removeAll(existsTable);
|
for (String tableNameStr : allTableName) {
|
String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log";
|
getBaseDao().executeUpdate(createTableSql, new Object[]{});
|
}
|
|
querySql.setLength(0);
|
//查询data_center_log表的uuid字段 created_utc_datetime最新的数据根据 config_uuid 和 type进行分组 且 result = 1 or ( deal_flag = 1 AND deal_result = 1 ) 每组只取最新的一条数据
|
querySql.append(" SELECT uuid,row_num,(select uuid from product_sys_data_center_log where pre_step_uuid = uuid and type=2) as pre_step_uuid ");
|
querySql.append(" FROM ( ");
|
querySql.append(" SELECT ");
|
querySql.append(" uuid, ");
|
querySql.append(" ROW_NUMBER() OVER (PARTITION BY config_uuid, type ORDER BY created_utc_datetime DESC) AS row_num ");
|
querySql.append(" FROM product_sys_data_center_log ");
|
querySql.append(" WHERE result = 1 OR (deal_flag = 1 AND deal_result = 1) ");
|
querySql.append(" ) AS subquery ");
|
querySql.append(" WHERE row_num = 1 ");
|
|
|
//查询需要保留的数据uuid
|
DataTableEntity retainData = getBaseDao().listTable(querySql.toString(), new Object[]{});
|
String[] retainUuids = DataTableEntity.isEmpty(retainData) ? null : new String[retainData.getRows() * 2];
|
if (!DataTableEntity.isEmpty(retainData)) {
|
for (int i = 0; i < retainData.getRows(); i++) {
|
retainUuids[i] = retainData.getString(i, "uuid");
|
retainUuids[i + retainData.getRows()] = retainData.getString(i, "pre_step_uuid");
|
}
|
}
|
|
|
for (int i = 0; i < dt.getRows(); i++) {
|
Integer minId = dt.getInt(i, "min_id");
|
Integer maxId = dt.getInt(i, "max_id");
|
|
String currentMonth = dt.getString(i, "current_month");
|
|
//查询mysql中是否存在表
|
|
querySql.setLength(0);
|
int pageSize = 500;
|
//分页查询采集日志数据7天前的最大和最小id
|
querySql.append(" select a.uuid as log_collect_uuid,b.uuid as log_extract_uuid from product_sys_data_center_log a");
|
querySql.append(" join product_sys_data_center_log b ");
|
querySql.append(" on a.uuid=b.pre_step_uuid ");
|
querySql.append(" where a.id >= ? and a.id <= ? and a.type=1 and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) and a.type=1 and b.type=2 limit ").append(pageSize);
|
|
while (true) {
|
DataTableEntity queryDt = getBaseDao().listTable(querySql.toString(), new Object[]{minId, maxId});
|
if (DataTableEntity.isEmpty(queryDt)) {
|
break;
|
}
|
|
//将查询出的log_collect_uuid和log_extract_uuid分别存入两个list中
|
List<String> logCollectUuidList = new ArrayList<>();
|
List<String> logExtractUuidList = new ArrayList<>();
|
for (int j = 0; j < queryDt.getRows(); j++) {
|
logCollectUuidList.add(queryDt.getString(j, "log_collect_uuid"));
|
logExtractUuidList.add(queryDt.getString(j, "log_extract_uuid"));
|
}
|
//根据查询出的uuid将数据插入到对应的表中,使用insert into select的方式 条件是 uuid in logCollectUuidList 和 logExtractUuidList 的数据
|
StringBuilder insertSql = new StringBuilder(64);
|
insertSql.append(" insert into ").append(tableName).append("_").append(currentMonth);
|
|
//将logCollectUuidList和logExtractUuidList合并为一个数组且数组中如果有rangeUuids中的数据则排除
|
Set<String> uuidSet = new HashSet<>();
|
uuidSet.addAll(logCollectUuidList);
|
uuidSet.addAll(logExtractUuidList);
|
uuidSet.removeAll(Arrays.asList(retainUuids));
|
|
String[] uuids = uuidSet.toArray(new String[0]);
|
|
|
insertSql.append(" select * from product_sys_data_center_log").append(" where (").append(BaseUtil.buildQuestionMarkFilter("uuid", uuids.length, true)).append(")");
|
//已经在表中的数据进行排除
|
insertSql.append(" and uuid not in (select uuid from ").append(tableName).append("_").append(currentMonth).append(" )");
|
getBaseDao().executeUpdate(insertSql.toString(), uuids);
|
|
//删除采集、提取日志数据
|
StringBuilder deleteSql = new StringBuilder(64);
|
deleteSql.append(" delete a from product_sys_data_center_log a ");
|
deleteSql.append(" where (").append(BaseUtil.buildQuestionMarkFilter("a.uuid", uuids.length, true)).append(")");
|
getBaseDao().executeUpdate(deleteSql.toString(), uuids);
|
if (pageSize != queryDt.getRows()) {
|
break;
|
}
|
}
|
|
//分页删除type>2的数据
|
querySql.setLength(0);
|
querySql.append(" select uuid,id from product_sys_data_center_log a");
|
querySql.append(" where id >= ? and id <= ? and type>2 and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) limit ").append(pageSize * 2);
|
while (true) {
|
DataTableEntity queryDt = getBaseDao().listTable(querySql.toString(), new Object[]{minId, maxId});
|
if (DataTableEntity.isEmpty(queryDt)) {
|
break;
|
}
|
//将查询出的uuid存入list中
|
List<String> uuidList = queryDt.getData().stream().map(e -> e.getString("uuid")).collect(Collectors.toList());
|
//排除rangeUuids中的数据
|
uuidList.removeAll(Arrays.asList(retainUuids));
|
if (uuidList.isEmpty()) {
|
if (queryDt.getRows() < pageSize * 2) {
|
break;
|
} else {
|
minId = queryDt.getData().stream().min(Comparator.comparingInt(e -> e.getInteger("id"))).get().getInteger("id");
|
}
|
}
|
//根据查询出的uuid将数据插入到对应的表中,使用insert into select的方式 条件是 uuid in uuidList 的数据
|
StringBuilder insertSql = new StringBuilder(64);
|
insertSql.append(" insert into ").append(tableName).append("_").append(currentMonth);
|
insertSql.append(" select * from product_sys_data_center_log").append(" where (").append(BaseUtil.buildQuestionMarkFilter("uuid", uuidList.size(), true)).append(")");
|
//已经在表中的数据进行排除
|
insertSql.append(" and uuid not in (select uuid from ").append(tableName).append("_").append(currentMonth).append(" )");
|
getBaseDao().executeUpdate(insertSql.toString(), uuidList.toArray());
|
//删除其他日志数据
|
StringBuilder deleteSql = new StringBuilder(64);
|
deleteSql.append(" delete a from product_sys_data_center_log a ");
|
deleteSql.append(" where (").append(BaseUtil.buildQuestionMarkFilter("a.uuid", uuidList.size(), true)).append(")");
|
getBaseDao().executeUpdate(deleteSql.toString(), uuidList.toArray());
|
if (pageSize * 2 != queryDt.getRows()) {
|
break;
|
}
|
}
|
}
|
|
|
} catch (Exception e) {
|
SpringMVCContextHolder.getSystemLogger().error("deleteCenterLogV2 error");
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
e.printStackTrace();
|
throw e;
|
}
|
}
|
|
public void deleteCenterLog() {
|
this.deleteCenterLogV2();
|
}
|
|
public void deleteCenterLogBak() {
|
try {
|
//在mysql的information_schema 表中查询指定表是否存在
|
String tableName = "da_product_sys_data_center_log";
|
|
//归档7天前的数据
|
//截止日期
|
Date endTime = DateUtil.offset(new Date(), DateField.DAY_OF_YEAR, -7);
|
|
//格式化日期
|
String endTimeStr = DateUtil.format(endTime, "yyyy-MM-dd");
|
|
//根据截止日期查询表中最大、最小的创建时间根据创建时间进行分表按月分表
|
String sql = "select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime from product_sys_data_center_log where created_utc_datetime <now() + interval -7 day and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) group by date_format(created_utc_datetime,'%Y-%m') ";
|
|
FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false);
|
|
if (fs == null) {
|
return;
|
}
|
//最小时间
|
Date minTime = fs.getDate("minTime");
|
|
//最大时间
|
Date maxTime = fs.getDate("maxTime");
|
|
//根据最小时间和最大时间获取区间内(包含最小时间和最大时间)的年月日期格式为yyyy-MM
|
|
List<DateTime> tableSuffixList = DateUtil.rangeToList(minTime, maxTime, DateField.MONTH);
|
|
//查询数据库中是否存在表
|
|
Set<String> tableNameSet = tableSuffixList.stream().map(e -> tableName + "_" + DateUtil.format(e, "yyyyMM")).collect(Collectors.toSet());
|
|
//查询数据库中存在的表
|
String dataBaseName = getBaseDao().getDataBaseName();
|
sql = "select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", tableNameSet.size(), true);
|
|
Object[] params = new Object[tableNameSet.size() + 1];
|
params[0] = dataBaseName;
|
int index = 1;
|
for (String tableNameStr : tableNameSet) {
|
params[index] = tableNameStr;
|
index++;
|
}
|
DataTableEntity dataTableEntity = getBaseDao().listTable(sql, params);
|
|
if (!DataTableEntity.isEmpty(dataTableEntity)) {
|
for (int i = 0; i < dataTableEntity.getRows(); i++) {
|
String tableNameStr = dataTableEntity.getString(i, "table_name");
|
tableNameSet.remove(tableNameStr);
|
}
|
}
|
//创建不存在的表,表结构与原表一致
|
for (String tableNameStr : tableNameSet) {
|
String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log";
|
getBaseDao().executeUpdate(createTableSql, new Object[]{});
|
}
|
//查询各类型配置最后一次成功的数据
|
String sqlFilter = "SELECT * FROM (SELECT uuid\n" +
|
"FROM product_sys_data_center_log\n" +
|
"WHERE uuid IN (\n" +
|
"\tSELECT uuid\n" +
|
"\tFROM product_sys_data_center_log\n" +
|
"\tWHERE id IN (\n" +
|
"\t\tSELECT max(a.id)\n" +
|
"\t\tFROM product_sys_data_center_log a\n" +
|
"\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" +
|
"\t\tWHERE a.type = 1\n" +
|
"\t\t\tAND b.type = 2\n" +
|
"\t\t\tAND (a.result = 1\n" +
|
"\t\t\t\tOR (a.deal_flag = 1\n" +
|
"\t\t\t\t\tAND a.deal_result = 1))\n" +
|
"\t\t\tAND (b.result = 1\n" +
|
"\t\t\t\tOR (b.deal_flag = 1\n" +
|
"\t\t\t\t\tAND b.deal_result = 1))\n" +
|
"\t\tGROUP BY a.config_uuid\n" +
|
"\t)\n" +
|
"\tUNION ALL\n" +
|
"\tSELECT aa.uuid\n" +
|
"\tFROM product_sys_data_center_log aa\n" +
|
"\t\tJOIN product_sys_data_center_log bb ON aa.pre_step_uuid = bb.uuid\n" +
|
"\tWHERE aa.type = 2\n" +
|
"\t\tAND bb.type = 1\n" +
|
"\t\tAND bb.id IN (\n" +
|
"\t\t\tSELECT max(a.id)\n" +
|
"\t\t\tFROM product_sys_data_center_log a\n" +
|
"\t\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" +
|
"\t\t\tWHERE a.type = 1\n" +
|
"\t\t\t\tAND b.type = 2\n" +
|
"\t\t\t\tAND (a.result = 1\n" +
|
"\t\t\t\t\tOR (a.deal_flag = 1\n" +
|
"\t\t\t\t\t\tAND a.deal_result = 1))\n" +
|
"\t\t\t\tAND (b.result = 1\n" +
|
"\t\t\t\t\tOR (b.deal_flag = 1\n" +
|
"\t\t\t\t\t\tAND b.deal_result = 1))\n" +
|
"\t\t\tGROUP BY a.config_uuid\n" +
|
"\t\t)\n" +
|
"\tUNION ALL\n" +
|
"\tSELECT uuid\n" +
|
"\tFROM product_sys_data_center_log a\n" +
|
"\tWHERE a.type = 5\n" +
|
"\t\tAND detail != 3\n" +
|
"\t\tAND (a.result = 1\n" +
|
"\t\t\tOR (a.deal_flag = 1\n" +
|
"\t\t\t\tAND a.deal_result = 1))\n" +
|
")) ddd";
|
//插入数据根据创建时间的年月分别插入到对应的表中
|
for (DateTime dateTime : tableSuffixList) {
|
|
String selectSql = "select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
|
" and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime <now() + interval -7 day";
|
|
|
//获取datetime所在月份的第一天0点0分0秒 和最后一天23点59分59秒
|
DateTime beginOfMonth = DateUtil.beginOfMonth(dateTime);
|
DateTime endOfMonth = DateUtil.endOfMonth(dateTime);
|
|
|
int currentPage = 1;
|
|
int totalPage;
|
do {
|
//分页查询数据
|
DataTableEntity dt = getBaseDao().listTable(selectSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")}, currentPage, 2000);
|
if (DataTableEntity.isEmpty(dt)) {
|
break;
|
}
|
totalPage = dt.getSqle().getTotalpage();
|
//更改dt的表名进行插入
|
dt.getMeta().setTableName(new Object[]{tableName + "_" + DateUtil.format(dateTime, "yyyyMM")});
|
getBaseDao().add(dt);
|
} while (totalPage > currentPage);
|
// String insertSql = "insert into " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" +
|
// " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime <now() + interval -7 day";
|
// getBaseDao().executeUpdate(insertSql, new Object[]{DateUtil.format(dateTime, "yyyy-MM"), DateUtil.format(dateTime, "yyyy-MM")});
|
// //删除数据已插入的数据
|
StringBuilder deleteSql = new StringBuilder();
|
// deleteSql.append(" DELETE a FROM product_sys_data_center_log a INNER JOIN ");
|
// deleteSql.append(" product_sys_data_center_log b on a.uuid=b.pre_step_uuid and a.type=1 and b.type=2 ");
|
// deleteSql.append(" where ");
|
// deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
|
// deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
|
// deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
|
// deleteSql.append(" and a.type=1 ");
|
// deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
|
// //先删除采集日志数据
|
// getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
|
|
|
//删除数据已插入的数据
|
|
//查询data_center_log表中的创建时间是7天前的数据 且开始时间是dateTime的数据
|
|
|
StringBuilder querySql = new StringBuilder(64);
|
|
|
querySql.append("select max(id) max_id,min(id) min_id from product_sys_data_center_log ");
|
querySql.append(" where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s')");
|
|
|
querySql.append("select id from ");
|
querySql.append(tableName).append("_").append(DateUtil.format(dateTime, "yyyyMM"));
|
querySql.append(" where type=1 and (result=1 or (deal_flag=1 and deal_result=1))");
|
|
|
deleteSql.setLength(0);
|
//再删除提取日志数据
|
deleteSql.append(" DELETE a FROM product_sys_data_center_log a LEFT JOIN ");
|
deleteSql.append(" product_sys_data_center_log b on b.uuid=a.pre_step_uuid and b.type=1 and a.type=2 ");
|
deleteSql.append(" where ");
|
deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
|
deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
|
deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
|
deleteSql.append(" and a.type=2 and b.id is null ");
|
deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
|
|
getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{});
|
|
deleteSql.setLength(0);
|
//再删除其他日志数据
|
deleteSql.append(" DELETE FROM product_sys_data_center_log a ");
|
deleteSql.append(" where ");
|
deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
|
deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) ");
|
deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) ");
|
deleteSql.append(" and a.type>2 ");
|
deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") ");
|
getBaseDao().executeUpdate(deleteSql.toString());
|
}
|
|
// fs = null;
|
// do {
|
// this.getBaseDao().executeUpdate("DELETE FROM product_sys_data_center_log WHERE uuid in (select * from (select * from product_sys_data_center_log_del_v limit 10000) a ) ");
|
// //检查是否还有数据
|
// sql = "select 1 as del_count from product_sys_data_center_log_del_v limit 1";
|
// //检查是否还有数据
|
// fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false);
|
// } while (fs != null && fs.getInteger("del_count") != null && fs.getInteger("del_count") > 0);
|
} catch (Exception e) {
|
e.printStackTrace();
|
throw e;
|
}
|
}
|
|
/**
|
* 创建定时任务
|
*
|
* @param taskName 任务名称
|
* @param isUsed 是否启用
|
* @param corn 执行时间
|
* @param executeExpression 执行表达式(bean.method)
|
*/
|
private String createTimeTask(String timeTaskUuid, String taskName, boolean isUsed, String corn, String executeExpression) {
|
FieldSetEntity fse = new FieldSetEntity();
|
try {
|
fse.setTableName("product_sys_timed_task");
|
fse.setValue("uuid", timeTaskUuid);
|
fse.setValue("job_name", taskName);//任务名称
|
fse.setValue("job_group", "system");//分组
|
fse.setValue("invoke_target", executeExpression);//调用目标字符串
|
fse.setValue("cron_expression", corn);//cron表达式
|
fse.setValue("misfire_policy", "3");//错误执行策略 只执行一次
|
fse.setValue("concurrent", 0);//不允许并发执行
|
fse.setValue("remark", taskName);
|
fse.setValue("created_by", SpringMVCContextHolder.getCurrentUser().getUser_id());
|
fse.setValue("created_utc_datetime", new Date());
|
fse.setValue("status", isUsed ? 1 : 0);
|
fse.setValue("is_conceal", 1);
|
|
if (!StringUtils.isEmpty(timeTaskUuid)) {
|
BaseUtil.createCreatorAndCreationTime(fse);
|
sysJobService.updateJob(fse);
|
} else {
|
sysJobService.insertJob(fse);
|
}
|
return fse.getUUID();
|
} catch (Exception e) {
|
e.printStackTrace();
|
throw new BaseException(ErrorCode.CRATED_TIMED_TASK_FAIL);
|
}
|
}
|
|
/**
|
* 通用保存
|
*
|
* @param fse
|
* @param t
|
*/
|
private void commonSave(FieldSetEntity fse, CallBackReturnValue<String, FieldSetEntity> t) {
|
boolean isAdd = false;
|
BaseUtil.createCreatorAndCreationTime(fse);
|
if (StringUtils.isEmpty(fse.getUUID())) {
|
fse.setValue(CmnConst.UUID, IdUtil.randomUUID());
|
isAdd = true;
|
} else if (!StringUtils.isEmpty(fse.getUUID()) && "add".equals(fse.getString(CoreConst.SYSTEM_DATA_OPERATE_TYPE))) {
|
isAdd = true;
|
}
|
String timeTasksUid = t.invokeMethod(fse);
|
fse.setValue(CmnConst.TIME_TASK_UUID, timeTasksUid);
|
if (isAdd) {
|
getBaseDao().add(fse);
|
} else {
|
getBaseDao().update(fse);
|
}
|
}
|
|
|
/**
|
* 保存MES数据同步配置
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
@Transactional
|
@Override
|
public String saveMesSyncDataConfig(FieldSetEntity fse) throws BaseException {
|
DataTableEntity subDt = fse.getSubDataTable("product_sys_data_sync_mes_sub");
|
//固定uuid
|
fse.setValue("uuid", 1);
|
DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "del_type=1");
|
DataTableEntity subDataTable = null;
|
if (!DataTableEntity.isEmpty(subDt)) {
|
subDataTable = subDt.clones();
|
int b = 0;
|
for (int i = 0; i < subDt.getRows(); i++) {
|
FieldSetEntity subFse = subDt.getFieldSetEntity(i);
|
String dataType = subFse.getString("~type~");
|
String uuid = subFse.getString(CmnConst.UUID);
|
if ("del".equals(dataType)) {
|
String timeTaskUid = subFse.getString(CmnConst.TIME_TASK_UUID);
|
FieldSetEntity task = getBaseDao().getFieldSetEntity("product_sys_timed_task", timeTaskUid, false);
|
if (task != null) {
|
cancelTimeTask(task);
|
}
|
subDataTable.setFieldValue(b, "config_uuid", subFse.getUUID());
|
} else {
|
subFse.setValue("master_uuid", fse.getUUID());
|
commonSave(subFse, fs -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "SYNC:" + fs.getString("table_name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("execution_time"), "dataSyncService.runTimeTask('" + fs.getUUID() + "')"));
|
subDt.removeFieldSetEntity(i);
|
subDataTable.setFieldValue(b, "uuid", subFse.getUUID());
|
i--;
|
}
|
b++;
|
}
|
if (!DataTableEntity.isEmpty(subDt)) {
|
Object[] objects = subDt.getData().stream().map(item -> item.getUUID()).toArray();
|
getBaseDao().delete("product_sys_data_sync_mes_sub", objects);
|
}
|
}
|
fse.removeSubData("product_sys_data_sync_mes_sub");
|
getBaseDao().saveFieldSetEntity(fse);
|
//创建触发器
|
// //1 主库删除后同步删除子库
|
syncDelRecordService.createTrigger(subDataTable, fse.getString("data_source"), fse.getString("target_data_source"));
|
return fse.getUUID();
|
}
|
|
/**
|
* 取消定时任务
|
*
|
* @param fse
|
* @throws BaseException
|
*/
|
private void cancelTimeTask(FieldSetEntity fse) throws BaseException {
|
try {
|
this.sysJobService.deleteJob(fse);
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw new BaseException(ErrorCode.CANCEL_TIME_TASK_FAIL);
|
}
|
}
|
|
/**
|
* 采集配置保存
|
*
|
* @param fse
|
* @return
|
*/
|
@Override
|
@Transactional
|
public synchronized String saveCollectConfig(FieldSetEntity fse) throws BaseException {
|
boolean isAdd = false;
|
if (StringUtils.isEmpty(fse.getUUID())) {
|
if (StringUtils.isEmpty(dataSystemName)) {
|
//数据来源系统名称不能为空
|
throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_CAN_NOT_EMPTY);
|
}
|
Pattern r = Pattern.compile("^[A-Za-z-]+$");
|
Matcher m = r.matcher(dataSystemName);
|
if (!m.matches()) {
|
//数据来源系统名称格式不符
|
throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_FORMAT_UNQUALIFIED);
|
}
|
//新增
|
FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL("SELECT\n" + "\t ifnull(MAX( id ),0)+1 id " + "FROM\n" + "\t( SELECT CONVERT(REPLACE ( id, ?, '' ),UNSIGNED int) id FROM product_sys_data_collect WHERE id LIKE concat(?,'%') ) a", new Object[]{dataSystemName, dataSystemName}, false);
|
if (!FieldSetEntity.isEmpty(fs) && !StringUtils.isEmpty(fs.getString("id"))) {
|
String id = fs.getString("id");
|
DecimalFormat decimalFormat = new DecimalFormat("000000");
|
id = decimalFormat.format(Integer.valueOf(id));
|
fse.setValue("id", dataSystemName + id);
|
} else {
|
throw new BaseException(ErrorCode.UNIQUE_COLLECT_CREATE_FAIL);
|
}
|
isAdd = true;
|
}
|
|
commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DC:" + fs.getString("name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("frequency"), "dataCollectService.dataCollect('" + fs.getUUID() + "')"));
|
saveDelRecordConfig(1, fse.getUUID());
|
if (!"ch-kt".equals(dataSystemName)) {
|
//子库保存后发送到主服务
|
fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
|
mesExternalService.remoteSaveCollectConfig(fse);
|
}
|
return fse.getUUID();
|
}
|
|
/**
|
* 采集配置详情
|
*
|
* @param fse
|
* @return
|
*/
|
@Override
|
public FieldSetEntity findCollectConfig(FieldSetEntity fse) throws BaseException {
|
return publicService.getFieldSetEntity(fse, false);
|
}
|
|
/**
|
* 采集配置删除
|
*
|
* @param fse
|
*/
|
@Override
|
@Transactional
|
public void delCollectConfig(FieldSetEntity fse) throws BaseException {
|
String uuids = fse.getUUID();
|
String[] split = uuids.split(",");
|
DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM product_sys_data_collect WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
|
if (!DataTableEntity.isEmpty(dt)) {
|
for (int i = 0; i < dt.getRows(); i++) {
|
cancelTimeTask(dt.getFieldSetEntity(i));
|
}
|
}
|
for (String s : split) {
|
cancelDelRecordConfig(1, s);
|
}
|
publicService.delete(fse);
|
}
|
|
/**
|
* 提取配置保存
|
*
|
* @param fse
|
* @return
|
*/
|
@Override
|
@Transactional
|
public String saveExtractConfig(FieldSetEntity fse) throws BaseException {
|
boolean isAdd = !StringUtils.isEmpty(fse.getUUID());
|
commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DE:" + fs.getString("extract_name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("extract_time"), "dataExtractService.startExtractData('" + fs.getUUID() + "')"));
|
saveDelRecordConfig(2, fse.getUUID());
|
if ("ch-kt".equals(dataSystemName)) {
|
//主服务保存后分发到子服务中
|
fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
|
mesExternalService.remoteSaveExtractConfig(fse);
|
}
|
return fse.getUUID();
|
}
|
|
/**
|
* 提取配置查询
|
*
|
* @param fse
|
* @return
|
*/
|
@Override
|
public FieldSetEntity findExtractConfig(FieldSetEntity fse) throws BaseException {
|
return publicService.getFieldSetEntity(fse, true);
|
}
|
|
/**
|
* 提取配置删除
|
*
|
* @param fse
|
*/
|
@Override
|
@Transactional
|
public void delExtractConfig(FieldSetEntity fse) throws BaseException {
|
String uuids = fse.getUUID();
|
String[] split = uuids.split(",");
|
DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM product_sys_data_extract_config WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
|
if (!DataTableEntity.isEmpty(dt)) {
|
for (int i = 0; i < dt.getRows(); i++) {
|
cancelTimeTask(dt.getFieldSetEntity(i));
|
}
|
}
|
for (String s : split) {
|
cancelDelRecordConfig(2, s);
|
}
|
publicService.delete(fse);
|
}
|
|
/**
|
* 归档配置保存
|
*
|
* @param fse
|
* @return
|
*/
|
@Override
|
@Transactional
|
public String saveArchivingConfig(FieldSetEntity fse) throws BaseException {
|
String sourceDataSource = fse.getString("source_data_source");
|
String targetDataSource = fse.getString("target_data_source");
|
String[] dataSource = (sourceDataSource + "," + targetDataSource).split(",");
|
StringBuilder sql = new StringBuilder();
|
sql.append("SELECT db_type num FROM product_sys_data_sync_manager WHERE ");
|
sql.append(BaseUtil.buildQuestionMarkFilter("uuid", dataSource.length, true));
|
sql.append(" GROUP BY db_type ");
|
DataTableEntity dt = getBaseDao().listTable(sql.toString(), dataSource);
|
if (DataTableEntity.isEmpty(dt)) {
|
throw new BaseException(ErrorCode.VERIFY_DATA_SOURCE_TYPE_FAIL);
|
}
|
/* 2022年10月10日 15:33:47 6c 需要能够支持不同数据库间的归档
|
if (dt.getRows() != 1) {
|
throw new BaseException(ErrorCode.DATA_SOURCE_INCONFORMITY);
|
}*/
|
if (StringUtils.equalsAny(dt.getString(0, "num"), "0", "1", "5")) {
|
boolean isAdd = !StringUtils.isEmpty(fse.getUUID());
|
commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DA:" + fs.getString("name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("execute_time"), "dataArchivingService.dataArchivingEntry('" + fs.getUUID() + "')"));
|
if ("ch-kt".equals(dataSystemName)) {
|
//主服务保存后分发到子服务中
|
fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update");
|
mesExternalService.remoteSaveArchiveConfig(fse);
|
}
|
}
|
|
return fse.getUUID();
|
}
|
|
/**
|
* 归档配置详情
|
*
|
* @param fse
|
* @return
|
*/
|
@Override
|
public FieldSetEntity findArchivingConfig(FieldSetEntity fse) throws BaseException {
|
return publicService.getFieldSetEntity(fse, false);
|
}
|
|
/**
|
* 归档配置删除
|
*
|
* @param fse
|
*/
|
@Override
|
@Transactional
|
public void delArchivingConfig(FieldSetEntity fse) throws BaseException {
|
String uuids = fse.getUUID();
|
String[] split = uuids.split(",");
|
DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM product_sys_data_archiving_config WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")");
|
if (!DataTableEntity.isEmpty(dt)) {
|
for (int i = 0; i < dt.getRows(); i++) {
|
cancelTimeTask(dt.getFieldSetEntity(i));
|
}
|
}
|
publicService.delete(fse);
|
}
|
|
/**
|
* 订单数据验证配置保存
|
*
|
* @param fse
|
* @return
|
*/
|
@Transactional
|
@Override
|
public String saveOrderDataVerification(FieldSetEntity fse) {
|
fse.setValue(CmnConst.UUID, "1");
|
commonSave(fse, fs -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "订单数据同步验证", fs.getBoolean(CmnConst.IS_USED), fs.getString("frequency_verification"), "orderDataValidationService.verificationEntryPoint"));
|
return fse.getUUID();
|
}
|
|
/**
|
* 保存删除记录配置
|
*
|
* @param type
|
* @param uuid
|
*/
|
public void saveDelRecordConfig(int type, String uuid) {
|
String sql = getDelRecordSql(type);
|
|
DataTableEntity dt = getBaseDao().listTable(sql, new Object[]{uuid, dataSystemName});
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity fse = dt.getFieldSetEntity(i);
|
fse.setTableName(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG);
|
fse.setValue("delete_sync_content", 1);
|
syncDelRecordService.createTrigger(fse);
|
}
|
|
}
|
|
private String getDelRecordSql(int type) {
|
StringBuffer sql = new StringBuffer();
|
sql.append("\n SELECT ");
|
sql.append("\n c.uuid, ");
|
sql.append("\n c.random_suffix,");
|
sql.append("\n collect.uuid config_uuid, ");
|
sql.append("\n collect.data_source, ");
|
sql.append("\n extract.extract_target_source target_data_source, ");
|
sql.append("\n collect.auto_field master_key_field, ");
|
sql.append("\n collect.source_table table_name, ");
|
sql.append("\n collect.id collect_id,2 del_type, ");
|
sql.append("\n extract.collect_source_field collect_field,");
|
sql.append("\n extract.extract_unique_field pre_master_field");
|
sql.append("\n FROM ");
|
sql.append("\n product_sys_data_collect collect ");
|
sql.append("\n JOIN product_sys_data_extract_config extract ON upper( case when length(collect.target_table)>0 then collect.target_table else collect.source_table end ) = upper( extract.extract_source_table ) ");
|
sql.append("\n LEFT JOIN product_mes_del_record_config c ON collect.uuid=c.config_uuid and c.del_type=2 ");
|
sql.append(" WHERE ");
|
if (1 == type) {
|
//采集
|
sql.append("\n collect.uuid=? ");
|
} else {
|
//提取
|
sql.append("\n extract.uuid=? ");
|
}
|
sql.append(" and collect.id like concat(?,'%')");
|
return sql.toString();
|
}
|
|
public void cancelDelRecordConfig(int type, String uuid) {
|
FieldSetEntity fieldSetEntity = new FieldSetEntity();
|
fieldSetEntity.setTableName("temp");
|
if (type == 1) {
|
fieldSetEntity.setValue("uuid", uuid);
|
syncDelRecordService.deleteTrigger(fieldSetEntity);
|
} else {
|
//提取
|
DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "UPPER(source_table) in (select upper(extract_source_table) from product_sys_data_extract_config where uuid =?) ", new Object[]{uuid});
|
if (DataTableEntity.isEmpty(dt)) {
|
return;
|
}
|
Object[] uuids = dt.getUuids();
|
for (Object o : uuids) {
|
fieldSetEntity.setValue("uuid", o);
|
syncDelRecordService.deleteTrigger(fieldSetEntity);
|
}
|
}
|
|
}
|
}
|