package com.product.data.center.service; import cn.hutool.core.date.DateField; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.IdUtil; import com.product.admin.service.PublicService; import com.product.common.lang.StringUtils; import com.product.core.config.CoreConst; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.core.transfer.Transactional; import com.product.data.center.config.CmnConst; import com.product.data.center.config.ErrorCode; import com.product.data.center.service.ide.ICommonService; import com.product.data.center.utils.CallBackReturnValue; import com.product.datasource.entity.DataBaseEntity; import com.product.quartz.service.impl.SysJobService; import com.product.util.BaseUtil; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.text.DecimalFormat; import java.time.LocalDate; import java.util.*; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; /** * @Author cheng * @Date 2022/7/22 9:43 * @Desc */ @Service("dataCommonService") public class CommonService extends AbstractBaseService implements ICommonService { @Resource SysJobService sysJobService; @Resource PublicService publicService; @Resource SyncDelRecordService syncDelRecordService; @Resource MesExternalService mesExternalService; @Value("${data.system.name}") private String dataSystemName; @Transactional public void deleteCenterLogV2() { try { StringBuilder querySql = new StringBuilder(64); //select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime,min(id) min_id,max(id) max_id, date_format(created_utc_datetime,'%Y-%m') current_month from product_sys_data_center_log where created_utc_datetime allTableName = dt.getData().stream().map(e -> tableName + "_" + e.getString("current_month")).collect(Collectors.toSet()); //在mysql的information_schema 表中查询所有的表 querySql.setLength(0); querySql.append(" select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", allTableName.size(), true)); String dataBaseName = getBaseDao().getDataBaseName(); Object[] params = new Object[allTableName.size() + 1]; params[0] = dataBaseName; int index = 1; for (String tableNameStr : allTableName) { params[index] = tableNameStr; index++; } DataTableEntity dataTableEntity = getBaseDao().listTable(querySql.toString(), params); Set existsTable = dataTableEntity.getData().stream().map(e -> e.getString("table_name")).collect(Collectors.toSet()); //对比表名,创建不存在的表 allTableName.removeAll(existsTable); for (String tableNameStr : allTableName) { String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log"; getBaseDao().executeUpdate(createTableSql, new Object[]{}); } querySql.setLength(0); //查询data_center_log表的uuid字段 created_utc_datetime最新的数据根据 config_uuid 和 type进行分组 且 result = 1 or ( deal_flag = 1 AND deal_result = 1 ) 每组只取最新的一条数据 querySql.append(" SELECT uuid,row_num,(select uuid from product_sys_data_center_log where pre_step_uuid = uuid and type=2) as pre_step_uuid "); querySql.append(" FROM ( "); querySql.append(" SELECT "); querySql.append(" uuid, "); querySql.append(" ROW_NUMBER() OVER (PARTITION BY config_uuid, type ORDER BY created_utc_datetime DESC) AS row_num "); querySql.append(" FROM product_sys_data_center_log "); querySql.append(" WHERE result = 1 OR (deal_flag = 1 AND deal_result = 1) "); querySql.append(" ) AS subquery "); querySql.append(" WHERE row_num = 1 "); //查询需要保留的数据uuid DataTableEntity retainData = getBaseDao().listTable(querySql.toString(), new Object[]{}); String[] retainUuids = DataTableEntity.isEmpty(retainData) ? null : new String[retainData.getRows() * 2]; if (!DataTableEntity.isEmpty(retainData)) { for (int i = 0; i < retainData.getRows(); i++) { retainUuids[i] = retainData.getString(i, "uuid"); retainUuids[i + retainData.getRows()] = retainData.getString(i, "pre_step_uuid"); } } for (int i = 0; i < dt.getRows(); i++) { Integer minId = dt.getInt(i, "min_id"); Integer maxId = dt.getInt(i, "max_id"); String currentMonth = dt.getString(i, "current_month"); //查询mysql中是否存在表 querySql.setLength(0); int pageSize = 500; //分页查询采集日志数据7天前的最大和最小id querySql.append(" select a.uuid as log_collect_uuid,b.uuid as log_extract_uuid from product_sys_data_center_log a"); querySql.append(" join product_sys_data_center_log b "); querySql.append(" on a.uuid=b.pre_step_uuid "); querySql.append(" where a.id >= ? and a.id <= ? and a.type=1 and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) and a.type=1 and b.type=2 limit ").append(pageSize); while (true) { DataTableEntity queryDt = getBaseDao().listTable(querySql.toString(), new Object[]{minId, maxId}); if (DataTableEntity.isEmpty(queryDt)) { break; } //将查询出的log_collect_uuid和log_extract_uuid分别存入两个list中 List logCollectUuidList = new ArrayList<>(); List logExtractUuidList = new ArrayList<>(); for (int j = 0; j < queryDt.getRows(); j++) { logCollectUuidList.add(queryDt.getString(j, "log_collect_uuid")); logExtractUuidList.add(queryDt.getString(j, "log_extract_uuid")); } //根据查询出的uuid将数据插入到对应的表中,使用insert into select的方式 条件是 uuid in logCollectUuidList 和 logExtractUuidList 的数据 StringBuilder insertSql = new StringBuilder(64); insertSql.append(" insert into ").append(tableName).append("_").append(currentMonth); //将logCollectUuidList和logExtractUuidList合并为一个数组且数组中如果有rangeUuids中的数据则排除 Set uuidSet = new HashSet<>(); uuidSet.addAll(logCollectUuidList); uuidSet.addAll(logExtractUuidList); uuidSet.removeAll(Arrays.asList(retainUuids)); String[] uuids = uuidSet.toArray(new String[0]); insertSql.append(" select * from product_sys_data_center_log").append(" where (").append(BaseUtil.buildQuestionMarkFilter("uuid", uuids.length, true)).append(")"); //已经在表中的数据进行排除 insertSql.append(" and uuid not in (select uuid from ").append(tableName).append("_").append(currentMonth).append(" )"); getBaseDao().executeUpdate(insertSql.toString(), uuids); //删除采集、提取日志数据 StringBuilder deleteSql = new StringBuilder(64); deleteSql.append(" delete a from product_sys_data_center_log a "); deleteSql.append(" where (").append(BaseUtil.buildQuestionMarkFilter("a.uuid", uuids.length, true)).append(")"); getBaseDao().executeUpdate(deleteSql.toString(), uuids); if (pageSize != queryDt.getRows()) { break; } } //分页删除type>2的数据 querySql.setLength(0); querySql.append(" select uuid,id from product_sys_data_center_log a"); querySql.append(" where id >= ? and id <= ? and type>2 and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) limit ").append(pageSize * 2); while (true) { DataTableEntity queryDt = getBaseDao().listTable(querySql.toString(), new Object[]{minId, maxId}); if (DataTableEntity.isEmpty(queryDt)) { break; } //将查询出的uuid存入list中 List uuidList = queryDt.getData().stream().map(e -> e.getString("uuid")).collect(Collectors.toList()); //排除rangeUuids中的数据 uuidList.removeAll(Arrays.asList(retainUuids)); if (uuidList.isEmpty()) { if (queryDt.getRows() < pageSize * 2) { break; } else { minId = queryDt.getData().stream().min(Comparator.comparingInt(e -> e.getInteger("id"))).get().getInteger("id"); } } //根据查询出的uuid将数据插入到对应的表中,使用insert into select的方式 条件是 uuid in uuidList 的数据 StringBuilder insertSql = new StringBuilder(64); insertSql.append(" insert into ").append(tableName).append("_").append(currentMonth); insertSql.append(" select * from product_sys_data_center_log").append(" where (").append(BaseUtil.buildQuestionMarkFilter("uuid", uuidList.size(), true)).append(")"); //已经在表中的数据进行排除 insertSql.append(" and uuid not in (select uuid from ").append(tableName).append("_").append(currentMonth).append(" )"); getBaseDao().executeUpdate(insertSql.toString(), uuidList.toArray()); //删除其他日志数据 StringBuilder deleteSql = new StringBuilder(64); deleteSql.append(" delete a from product_sys_data_center_log a "); deleteSql.append(" where (").append(BaseUtil.buildQuestionMarkFilter("a.uuid", uuidList.size(), true)).append(")"); getBaseDao().executeUpdate(deleteSql.toString(), uuidList.toArray()); if (pageSize * 2 != queryDt.getRows()) { break; } } } } catch (Exception e) { SpringMVCContextHolder.getSystemLogger().error("deleteCenterLogV2 error"); SpringMVCContextHolder.getSystemLogger().error(e); e.printStackTrace(); throw e; } } public void deleteCenterLog() { this.deleteCenterLogV2(); } public void deleteCenterLogBak() { try { //在mysql的information_schema 表中查询指定表是否存在 String tableName = "da_product_sys_data_center_log"; //归档7天前的数据 //截止日期 Date endTime = DateUtil.offset(new Date(), DateField.DAY_OF_YEAR, -7); //格式化日期 String endTimeStr = DateUtil.format(endTime, "yyyy-MM-dd"); //根据截止日期查询表中最大、最小的创建时间根据创建时间进行分表按月分表 String sql = "select min(created_utc_datetime) as minTime,max(created_utc_datetime) as maxTime from product_sys_data_center_log where created_utc_datetime tableSuffixList = DateUtil.rangeToList(minTime, maxTime, DateField.MONTH); //查询数据库中是否存在表 Set tableNameSet = tableSuffixList.stream().map(e -> tableName + "_" + DateUtil.format(e, "yyyyMM")).collect(Collectors.toSet()); //查询数据库中存在的表 String dataBaseName = getBaseDao().getDataBaseName(); sql = "select TABLE_NAME table_name from information_schema.tables where table_schema = ? and " + BaseUtil.buildQuestionMarkFilter("TABLE_NAME", tableNameSet.size(), true); Object[] params = new Object[tableNameSet.size() + 1]; params[0] = dataBaseName; int index = 1; for (String tableNameStr : tableNameSet) { params[index] = tableNameStr; index++; } DataTableEntity dataTableEntity = getBaseDao().listTable(sql, params); if (!DataTableEntity.isEmpty(dataTableEntity)) { for (int i = 0; i < dataTableEntity.getRows(); i++) { String tableNameStr = dataTableEntity.getString(i, "table_name"); tableNameSet.remove(tableNameStr); } } //创建不存在的表,表结构与原表一致 for (String tableNameStr : tableNameSet) { String createTableSql = "create table " + tableNameStr + " like product_sys_data_center_log"; getBaseDao().executeUpdate(createTableSql, new Object[]{}); } //查询各类型配置最后一次成功的数据 String sqlFilter = "SELECT * FROM (SELECT uuid\n" + "FROM product_sys_data_center_log\n" + "WHERE uuid IN (\n" + "\tSELECT uuid\n" + "\tFROM product_sys_data_center_log\n" + "\tWHERE id IN (\n" + "\t\tSELECT max(a.id)\n" + "\t\tFROM product_sys_data_center_log a\n" + "\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" + "\t\tWHERE a.type = 1\n" + "\t\t\tAND b.type = 2\n" + "\t\t\tAND (a.result = 1\n" + "\t\t\t\tOR (a.deal_flag = 1\n" + "\t\t\t\t\tAND a.deal_result = 1))\n" + "\t\t\tAND (b.result = 1\n" + "\t\t\t\tOR (b.deal_flag = 1\n" + "\t\t\t\t\tAND b.deal_result = 1))\n" + "\t\tGROUP BY a.config_uuid\n" + "\t)\n" + "\tUNION ALL\n" + "\tSELECT aa.uuid\n" + "\tFROM product_sys_data_center_log aa\n" + "\t\tJOIN product_sys_data_center_log bb ON aa.pre_step_uuid = bb.uuid\n" + "\tWHERE aa.type = 2\n" + "\t\tAND bb.type = 1\n" + "\t\tAND bb.id IN (\n" + "\t\t\tSELECT max(a.id)\n" + "\t\t\tFROM product_sys_data_center_log a\n" + "\t\t\t\tJOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid\n" + "\t\t\tWHERE a.type = 1\n" + "\t\t\t\tAND b.type = 2\n" + "\t\t\t\tAND (a.result = 1\n" + "\t\t\t\t\tOR (a.deal_flag = 1\n" + "\t\t\t\t\t\tAND a.deal_result = 1))\n" + "\t\t\t\tAND (b.result = 1\n" + "\t\t\t\t\tOR (b.deal_flag = 1\n" + "\t\t\t\t\t\tAND b.deal_result = 1))\n" + "\t\t\tGROUP BY a.config_uuid\n" + "\t\t)\n" + "\tUNION ALL\n" + "\tSELECT uuid\n" + "\tFROM product_sys_data_center_log a\n" + "\tWHERE a.type = 5\n" + "\t\tAND detail != 3\n" + "\t\tAND (a.result = 1\n" + "\t\t\tOR (a.deal_flag = 1\n" + "\t\t\t\tAND a.deal_result = 1))\n" + ")) ddd"; //插入数据根据创建时间的年月分别插入到对应的表中 for (DateTime dateTime : tableSuffixList) { String selectSql = "select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" + " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime currentPage); // String insertSql = "insert into " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " select * from product_sys_data_center_log where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') + interval 1 month" + // " and id >(select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) and (result = 1 or ( deal_flag = 1 AND deal_result = 1 ) ) and created_utc_datetime = (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) "); // deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) "); // deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) "); // deleteSql.append(" and a.type=1 "); // deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") "); // //先删除采集日志数据 // getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{}); //删除数据已插入的数据 //查询data_center_log表中的创建时间是7天前的数据 且开始时间是dateTime的数据 StringBuilder querySql = new StringBuilder(64); querySql.append("select max(id) max_id,min(id) min_id from product_sys_data_center_log "); querySql.append(" where created_utc_datetime >= str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s') and created_utc_datetime < str_to_date(concat(?, '-01 00:00:00'), '%Y-%m-%d %H:%i:%s')"); querySql.append("select id from "); querySql.append(tableName).append("_").append(DateUtil.format(dateTime, "yyyyMM")); querySql.append(" where type=1 and (result=1 or (deal_flag=1 and deal_result=1))"); deleteSql.setLength(0); //再删除提取日志数据 deleteSql.append(" DELETE a FROM product_sys_data_center_log a LEFT JOIN "); deleteSql.append(" product_sys_data_center_log b on b.uuid=a.pre_step_uuid and b.type=1 and a.type=2 "); deleteSql.append(" where "); deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) "); deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) "); deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) "); deleteSql.append(" and a.type=2 and b.id is null "); deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") "); getBaseDao().executeUpdate(deleteSql.toString(), new Object[]{}); deleteSql.setLength(0); //再删除其他日志数据 deleteSql.append(" DELETE FROM product_sys_data_center_log a "); deleteSql.append(" where "); deleteSql.append(" a.id>= (select ifnull(min(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) "); deleteSql.append(" and a.id <= (select ifnull(max(id),0) from " + tableName + "_" + DateUtil.format(dateTime, "yyyyMM") + " ) "); deleteSql.append(" and (a.result = 1 or ( a.deal_flag = 1 AND a.deal_result = 1 ) ) "); deleteSql.append(" and a.type>2 "); deleteSql.append(" and a.uuid not in (").append(sqlFilter).append(") "); getBaseDao().executeUpdate(deleteSql.toString()); } // fs = null; // do { // this.getBaseDao().executeUpdate("DELETE FROM product_sys_data_center_log WHERE uuid in (select * from (select * from product_sys_data_center_log_del_v limit 10000) a ) "); // //检查是否还有数据 // sql = "select 1 as del_count from product_sys_data_center_log_del_v limit 1"; // //检查是否还有数据 // fs = getBaseDao().getFieldSetEntityBySQL(sql, new Object[]{}, false); // } while (fs != null && fs.getInteger("del_count") != null && fs.getInteger("del_count") > 0); } catch (Exception e) { e.printStackTrace(); throw e; } } /** * 创建定时任务 * * @param taskName 任务名称 * @param isUsed 是否启用 * @param corn 执行时间 * @param executeExpression 执行表达式(bean.method) */ private String createTimeTask(String timeTaskUuid, String taskName, boolean isUsed, String corn, String executeExpression) { FieldSetEntity fse = new FieldSetEntity(); try { fse.setTableName("product_sys_timed_task"); fse.setValue("uuid", timeTaskUuid); fse.setValue("job_name", taskName);//任务名称 fse.setValue("job_group", "system");//分组 fse.setValue("invoke_target", executeExpression);//调用目标字符串 fse.setValue("cron_expression", corn);//cron表达式 fse.setValue("misfire_policy", "3");//错误执行策略 只执行一次 fse.setValue("concurrent", 0);//不允许并发执行 fse.setValue("remark", taskName); fse.setValue("created_by", SpringMVCContextHolder.getCurrentUser().getUser_id()); fse.setValue("created_utc_datetime", new Date()); fse.setValue("status", isUsed ? 1 : 0); fse.setValue("is_conceal", 1); if (!StringUtils.isEmpty(timeTaskUuid)) { BaseUtil.createCreatorAndCreationTime(fse); sysJobService.updateJob(fse); } else { sysJobService.insertJob(fse); } return fse.getUUID(); } catch (Exception e) { e.printStackTrace(); throw new BaseException(ErrorCode.CRATED_TIMED_TASK_FAIL); } } /** * 通用保存 * * @param fse * @param t */ private void commonSave(FieldSetEntity fse, CallBackReturnValue t) { boolean isAdd = false; BaseUtil.createCreatorAndCreationTime(fse); if (StringUtils.isEmpty(fse.getUUID())) { fse.setValue(CmnConst.UUID, IdUtil.randomUUID()); isAdd = true; } else if (!StringUtils.isEmpty(fse.getUUID()) && "add".equals(fse.getString(CoreConst.SYSTEM_DATA_OPERATE_TYPE))) { isAdd = true; } String timeTasksUid = t.invokeMethod(fse); fse.setValue(CmnConst.TIME_TASK_UUID, timeTasksUid); if (isAdd) { getBaseDao().add(fse); } else { getBaseDao().update(fse); } } /** * 保存MES数据同步配置 * * @param fse * @return * @throws BaseException */ @Transactional @Override public String saveMesSyncDataConfig(FieldSetEntity fse) throws BaseException { DataTableEntity subDt = fse.getSubDataTable("product_sys_data_sync_mes_sub"); //固定uuid fse.setValue("uuid", 1); DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "del_type=1"); DataTableEntity subDataTable = null; if (!DataTableEntity.isEmpty(subDt)) { subDataTable = subDt.clones(); int b = 0; for (int i = 0; i < subDt.getRows(); i++) { FieldSetEntity subFse = subDt.getFieldSetEntity(i); String dataType = subFse.getString("~type~"); String uuid = subFse.getString(CmnConst.UUID); if ("del".equals(dataType)) { String timeTaskUid = subFse.getString(CmnConst.TIME_TASK_UUID); FieldSetEntity task = getBaseDao().getFieldSetEntity("product_sys_timed_task", timeTaskUid, false); if (task != null) { cancelTimeTask(task); } subDataTable.setFieldValue(b, "config_uuid", subFse.getUUID()); } else { subFse.setValue("master_uuid", fse.getUUID()); commonSave(subFse, fs -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "SYNC:" + fs.getString("table_name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("execution_time"), "dataSyncService.runTimeTask('" + fs.getUUID() + "')")); subDt.removeFieldSetEntity(i); subDataTable.setFieldValue(b, "uuid", subFse.getUUID()); i--; } b++; } if (!DataTableEntity.isEmpty(subDt)) { Object[] objects = subDt.getData().stream().map(item -> item.getUUID()).toArray(); getBaseDao().delete("product_sys_data_sync_mes_sub", objects); } } fse.removeSubData("product_sys_data_sync_mes_sub"); getBaseDao().saveFieldSetEntity(fse); //创建触发器 // //1 主库删除后同步删除子库 syncDelRecordService.createTrigger(subDataTable, fse.getString("data_source"), fse.getString("target_data_source")); return fse.getUUID(); } /** * 取消定时任务 * * @param fse * @throws BaseException */ private void cancelTimeTask(FieldSetEntity fse) throws BaseException { try { this.sysJobService.deleteJob(fse); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); throw new BaseException(ErrorCode.CANCEL_TIME_TASK_FAIL); } } /** * 采集配置保存 * * @param fse * @return */ @Override @Transactional public synchronized String saveCollectConfig(FieldSetEntity fse) throws BaseException { boolean isAdd = false; if (StringUtils.isEmpty(fse.getUUID())) { if (StringUtils.isEmpty(dataSystemName)) { //数据来源系统名称不能为空 throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_CAN_NOT_EMPTY); } Pattern r = Pattern.compile("^[A-Za-z-]+$"); Matcher m = r.matcher(dataSystemName); if (!m.matches()) { //数据来源系统名称格式不符 throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_FORMAT_UNQUALIFIED); } //新增 FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL("SELECT\n" + "\t ifnull(MAX( id ),0)+1 id " + "FROM\n" + "\t( SELECT CONVERT(REPLACE ( id, ?, '' ),UNSIGNED int) id FROM product_sys_data_collect WHERE id LIKE concat(?,'%') ) a", new Object[]{dataSystemName, dataSystemName}, false); if (!FieldSetEntity.isEmpty(fs) && !StringUtils.isEmpty(fs.getString("id"))) { String id = fs.getString("id"); DecimalFormat decimalFormat = new DecimalFormat("000000"); id = decimalFormat.format(Integer.valueOf(id)); fse.setValue("id", dataSystemName + id); } else { throw new BaseException(ErrorCode.UNIQUE_COLLECT_CREATE_FAIL); } isAdd = true; } commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DC:" + fs.getString("name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("frequency"), "dataCollectService.dataCollect('" + fs.getUUID() + "')")); saveDelRecordConfig(1, fse.getUUID()); if (!"ch-kt".equals(dataSystemName)) { //子库保存后发送到主服务 fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update"); mesExternalService.remoteSaveCollectConfig(fse); } return fse.getUUID(); } /** * 采集配置详情 * * @param fse * @return */ @Override public FieldSetEntity findCollectConfig(FieldSetEntity fse) throws BaseException { return publicService.getFieldSetEntity(fse, false); } /** * 采集配置删除 * * @param fse */ @Override @Transactional public void delCollectConfig(FieldSetEntity fse) throws BaseException { String uuids = fse.getUUID(); String[] split = uuids.split(","); DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM product_sys_data_collect WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")"); if (!DataTableEntity.isEmpty(dt)) { for (int i = 0; i < dt.getRows(); i++) { cancelTimeTask(dt.getFieldSetEntity(i)); } } for (String s : split) { cancelDelRecordConfig(1, s); } publicService.delete(fse); } /** * 提取配置保存 * * @param fse * @return */ @Override @Transactional public String saveExtractConfig(FieldSetEntity fse) throws BaseException { boolean isAdd = !StringUtils.isEmpty(fse.getUUID()); commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DE:" + fs.getString("extract_name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("extract_time"), "dataExtractService.startExtractData('" + fs.getUUID() + "')")); saveDelRecordConfig(2, fse.getUUID()); if ("ch-kt".equals(dataSystemName)) { //主服务保存后分发到子服务中 fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update"); mesExternalService.remoteSaveExtractConfig(fse); } return fse.getUUID(); } /** * 提取配置查询 * * @param fse * @return */ @Override public FieldSetEntity findExtractConfig(FieldSetEntity fse) throws BaseException { return publicService.getFieldSetEntity(fse, true); } /** * 提取配置删除 * * @param fse */ @Override @Transactional public void delExtractConfig(FieldSetEntity fse) throws BaseException { String uuids = fse.getUUID(); String[] split = uuids.split(","); DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM product_sys_data_extract_config WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")"); if (!DataTableEntity.isEmpty(dt)) { for (int i = 0; i < dt.getRows(); i++) { cancelTimeTask(dt.getFieldSetEntity(i)); } } for (String s : split) { cancelDelRecordConfig(2, s); } publicService.delete(fse); } /** * 归档配置保存 * * @param fse * @return */ @Override @Transactional public String saveArchivingConfig(FieldSetEntity fse) throws BaseException { String sourceDataSource = fse.getString("source_data_source"); String targetDataSource = fse.getString("target_data_source"); String[] dataSource = (sourceDataSource + "," + targetDataSource).split(","); StringBuilder sql = new StringBuilder(); sql.append("SELECT db_type num FROM product_sys_data_sync_manager WHERE "); sql.append(BaseUtil.buildQuestionMarkFilter("uuid", dataSource.length, true)); sql.append(" GROUP BY db_type "); DataTableEntity dt = getBaseDao().listTable(sql.toString(), dataSource); if (DataTableEntity.isEmpty(dt)) { throw new BaseException(ErrorCode.VERIFY_DATA_SOURCE_TYPE_FAIL); } /* 2022年10月10日 15:33:47 6c 需要能够支持不同数据库间的归档 if (dt.getRows() != 1) { throw new BaseException(ErrorCode.DATA_SOURCE_INCONFORMITY); }*/ if (StringUtils.equalsAny(dt.getString(0, "num"), "0", "1", "5")) { boolean isAdd = !StringUtils.isEmpty(fse.getUUID()); commonSave(fse, (fs) -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "DA:" + fs.getString("name"), fs.getBoolean(CmnConst.IS_USED), fs.getString("execute_time"), "dataArchivingService.dataArchivingEntry('" + fs.getUUID() + "')")); if ("ch-kt".equals(dataSystemName)) { //主服务保存后分发到子服务中 fse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, isAdd ? "add" : "update"); mesExternalService.remoteSaveArchiveConfig(fse); } } return fse.getUUID(); } /** * 归档配置详情 * * @param fse * @return */ @Override public FieldSetEntity findArchivingConfig(FieldSetEntity fse) throws BaseException { return publicService.getFieldSetEntity(fse, false); } /** * 归档配置删除 * * @param fse */ @Override @Transactional public void delArchivingConfig(FieldSetEntity fse) throws BaseException { String uuids = fse.getUUID(); String[] split = uuids.split(","); DataTableEntity dt = getBaseDao().listTable("product_sys_timed_task", "uuid in " + "(SELECT time_task_uuid FROM product_sys_data_archiving_config WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", split, true) + ")"); if (!DataTableEntity.isEmpty(dt)) { for (int i = 0; i < dt.getRows(); i++) { cancelTimeTask(dt.getFieldSetEntity(i)); } } publicService.delete(fse); } /** * 订单数据验证配置保存 * * @param fse * @return */ @Transactional @Override public String saveOrderDataVerification(FieldSetEntity fse) { fse.setValue(CmnConst.UUID, "1"); commonSave(fse, fs -> createTimeTask(fs.getString(CmnConst.TIME_TASK_UUID), "订单数据同步验证", fs.getBoolean(CmnConst.IS_USED), fs.getString("frequency_verification"), "orderDataValidationService.verificationEntryPoint")); return fse.getUUID(); } /** * 保存删除记录配置 * * @param type * @param uuid */ public void saveDelRecordConfig(int type, String uuid) { String sql = getDelRecordSql(type); DataTableEntity dt = getBaseDao().listTable(sql, new Object[]{uuid, dataSystemName}); for (int i = 0; i < dt.getRows(); i++) { FieldSetEntity fse = dt.getFieldSetEntity(i); fse.setTableName(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG); fse.setValue("delete_sync_content", 1); syncDelRecordService.createTrigger(fse); } } private String getDelRecordSql(int type) { StringBuffer sql = new StringBuffer(); sql.append("\n SELECT "); sql.append("\n c.uuid, "); sql.append("\n c.random_suffix,"); sql.append("\n collect.uuid config_uuid, "); sql.append("\n collect.data_source, "); sql.append("\n extract.extract_target_source target_data_source, "); sql.append("\n collect.auto_field master_key_field, "); sql.append("\n collect.source_table table_name, "); sql.append("\n collect.id collect_id,2 del_type, "); sql.append("\n extract.collect_source_field collect_field,"); sql.append("\n extract.extract_unique_field pre_master_field"); sql.append("\n FROM "); sql.append("\n product_sys_data_collect collect "); sql.append("\n JOIN product_sys_data_extract_config extract ON upper( case when length(collect.target_table)>0 then collect.target_table else collect.source_table end ) = upper( extract.extract_source_table ) "); sql.append("\n LEFT JOIN product_mes_del_record_config c ON collect.uuid=c.config_uuid and c.del_type=2 "); sql.append(" WHERE "); if (1 == type) { //采集 sql.append("\n collect.uuid=? "); } else { //提取 sql.append("\n extract.uuid=? "); } sql.append(" and collect.id like concat(?,'%')"); return sql.toString(); } public void cancelDelRecordConfig(int type, String uuid) { FieldSetEntity fieldSetEntity = new FieldSetEntity(); fieldSetEntity.setTableName("temp"); if (type == 1) { fieldSetEntity.setValue("uuid", uuid); syncDelRecordService.deleteTrigger(fieldSetEntity); } else { //提取 DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "UPPER(source_table) in (select upper(extract_source_table) from product_sys_data_extract_config where uuid =?) ", new Object[]{uuid}); if (DataTableEntity.isEmpty(dt)) { return; } Object[] uuids = dt.getUuids(); for (Object o : uuids) { fieldSetEntity.setValue("uuid", o); syncDelRecordService.deleteTrigger(fieldSetEntity); } } } }