package com.product.data.center.service;

import cn.hutool.core.codec.Base64;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.file.FileWriter;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.*;
import cn.hutool.crypto.SecureUtil;
import cn.hutool.crypto.symmetric.AES;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.product.core.cache.util.RedisUtil;
import com.product.core.config.CoreConst;
import com.product.core.dao.BaseDao;
import com.product.core.entity.DataTableEntity;
import com.product.core.entity.FieldSetEntity;
import com.product.core.exception.BaseException;
import com.product.core.spring.context.SpringMVCContextHolder;
import com.product.data.center.config.CmnConst;
import com.product.data.center.config.ErrorCode;
import com.product.data.center.utils.QuerySqlParseUtil;
import com.product.datasource.config.DataBaseType;
import com.product.datasource.dao.Dao;
import com.product.datasource.entity.DataBaseEntity;
import com.product.util.BaseUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import redis.clients.jedis.Jedis;

import javax.annotation.Resource;
import java.io.File;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.stream.Collectors;

/**
 * Synchronises trigger-recorded deletions between databases.
 *
 * <p>Each row of the delete-record configuration table describes one source table whose
 * deletions are captured by a database trigger into a {@code trigger_table_<suffix>} table.
 * {@link #runTask()} replays those captured deletions on the configured target data source(s)
 * and on the archive tables, and writes an encrypted backup of every removed row.
 *
 * @Author cheng
 * @Date 2022/11/29 8:29
 */
@Service
public class SyncDelRecordService {

    @Resource
    private BaseDao baseDao;

    /** Single worker that drains {@link #queue} into backup files; created lazily by {@link #WriteData}. */
    private static ExecutorService singleThreadExecutor = null;

    /** Pending backup payloads ({@code file name + data}) waiting to be written by the worker. */
    private static Queue<JSONObject> queue = null;

    /**
     * Processes every delete-record configuration once.
     *
     * <p>A failure in one configuration is logged and does not stop the remaining ones; all
     * connections opened for the failed configuration are closed.
     */
    public void runTask() {
        DataTableEntity configDt = baseDao.listTable(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG);
        if (DataTableEntity.isEmpty(configDt)) {
            return;
        }
        for (int i = 0; i < configDt.getRows(); i++) {
            TriggerDeleteService triggerDeleteService = null;
            try {
                triggerDeleteService = new TriggerDeleteService(configDt.getFieldSetEntity(i));
                if ("2".equals(triggerDeleteService.getDelType())) {
                    // Deletion happened on a collected (sub) table: replay it on the master.
                    triggerDeleteService.collectTableDelete();
                } else {
                    // Deletion happened on the master table: replay it on the sub databases.
                    triggerDeleteService.mainDataBaseTableDelete();
                }
            } catch (Exception e) {
                if (triggerDeleteService != null) {
                    Set<Dao> daoSet = triggerDeleteService.getDaoSet();
                    for (Dao dao : daoSet) {
                        dao.closeConnection();
                    }
                }
                e.printStackTrace();
                SpringMVCContextHolder.getSystemLogger().error(e);
            }
        }
        System.out.println("完成");
    }

    /**
     * Queues the given records for an encrypted backup file and starts the background writer
     * on first use.
     *
     * <p>The writer appends one AES/Base64-encrypted JSON array per queued batch to
     * {@code ./触发器删除数据记录/<yyyyMMdd>/<fileName>.dbackups}. It polls the queue and sleeps
     * 30 seconds whenever the queue is empty.
     *
     * @param records  records to back up; every element must carry a non-null
     *                 {@link Record#getRecord()}. Nothing happens when the list is empty.
     * @param fileName base name of the backup file
     * @throws BaseException if {@code fileName} is empty
     */
    public static synchronized void WriteData(List<Record> records, String fileName) {
        if (CollectionUtil.isEmpty(records)) {
            return;
        }
        if (queue == null) {
            queue = new LinkedTransferQueue<>();
        }
        if (StringUtils.isEmpty(fileName)) {
            throw new BaseException(ErrorCode.FILENAME_CAN_NOT_EMPTY);
        }
        List<Object> rows = ListUtil.toList(
                records.stream().map(item -> new JSONObject((Map) item.getRecord().getValues())).toArray());
        JSONObject payload = new JSONObject();
        payload.put(com.product.file.config.CmnConst.FILE_NAME, fileName);
        payload.put("data", new JSONArray(rows));
        queue.add(payload);

        if (singleThreadExecutor != null && !singleThreadExecutor.isTerminated()) {
            // The writer is already running and will pick the payload up.
            return;
        }

        // The cipher is only needed by the writer, so it is built once when the writer starts.
        byte[] key = Base64.decode("glwqQbMzmInerCSs9RoAWQ==");
        AES aes = SecureUtil.aes(key);
        singleThreadExecutor = Executors.newSingleThreadExecutor();
        singleThreadExecutor.submit(() -> {
            while (true) {
                if (queue.isEmpty()) {
                    ThreadUtil.sleep(30000);
                    continue;
                }
                try {
                    JSONObject poll = queue.poll();
                    String name = poll.getString(com.product.file.config.CmnConst.FILE_NAME);
                    JSONArray data = poll.getJSONArray("data");
                    if (StringUtils.isEmpty(name) || (data == null || data.isEmpty())) {
                        SpringMVCContextHolder.getSystemLogger().error(new BaseException(ErrorCode.FILENAME_OR_DATA_EMPTY));
                        continue;
                    }
                    String now = DateUtil.format(new Date(), "yyyyMMdd");
                    String filePath = "." + File.separator + "触发器删除数据记录" + File.separator + now + File.separator + name + ".dbackups";
                    File file = new File(filePath);
                    File parentFile = file.getParentFile();
                    if (!parentFile.exists()) {
                        parentFile.mkdirs();
                    }
                    if (!file.exists()) {
                        file.createNewFile();
                    }
                    FileWriter writer = new FileWriter(file);
                    writer.append(aes.encryptBase64(data.toJSONString()));
                    writer.append("\n");
                    System.out.println("写入成功");
                } catch (Exception e) {
                    e.printStackTrace();
                    SpringMVCContextHolder.getSystemLogger().error(e);
                }
            }
        });
        // No further tasks are submitted; the single task above loops forever.
        singleThreadExecutor.shutdown();
    }

    /**
     * One captured deletion.
     *
     * <p>Two records are equal when their {@code uniqueValue} arrays have equal contents and
     * their {@code sourceInfo} values are equal; all other fields are ignored by
     * {@link #equals(Object)} and {@link #hashCode()}.
     */
    public static class Record {
        /** 1 by default; 2 marks a record that is handled through the archive tables. */
        private int deleteFlag = 1;
        /** Id of the row in the delete-record table. */
        private Integer delTableId;
        /** Unique-field values captured by the trigger. */
        private String[] uniqueValue;
        /** Collection source the row came from (may be null). */
        private String sourceInfo;
        /** Unique-field values of the matching row in the target table. */
        private String[] primaryValue;
        /** The matching row read from the target or archive table. */
        private FieldSetEntity record;

        public Record(Integer delTableId, String[] uniqueValue) {
            this.delTableId = delTableId;
            this.uniqueValue = uniqueValue;
        }

        public Record(Integer delTableId, String[] uniqueValue, String sourceInfo) {
            this.delTableId = delTableId;
            this.uniqueValue = uniqueValue;
            this.sourceInfo = sourceInfo;
        }

        public Record(String uniqueValue[], String sourceInfo) {
            this.uniqueValue = uniqueValue;
            this.sourceInfo = sourceInfo;
        }

        public void setRecord(FieldSetEntity record) {
            this.record = record;
        }

        public void setDeleteFlag(int deleteFlag) {
            this.deleteFlag = deleteFlag;
        }

        public FieldSetEntity getRecord() {
            return record;
        }

        public int getDeleteFlag() {
            return deleteFlag;
        }

        public Integer getDelTableId() {
            return delTableId;
        }

        public String[] getUniqueValue() {
            return uniqueValue;
        }

        public void setUniqueValue(String[] uniqueValue) {
            this.uniqueValue = uniqueValue;
        }

        public String getSourceInfo() {
            return sourceInfo;
        }

        public String[] getPrimaryValue() {
            return primaryValue;
        }

        public void setPrimaryValue(String[] primaryValue) {
            this.primaryValue = primaryValue;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof Record)) {
                return false;
            }
            Record record = (Record) o;
            return ArrayUtil.equals(uniqueValue, record.uniqueValue) && (Objects.equals(sourceInfo, record.sourceInfo));
        }

        @Override
        public int hashCode() {
            // Hash the array by content so that equal records share a hash code;
            // Objects.hash(array, ...) would use the array's identity hash.
            return Objects.hash(Arrays.hashCode(uniqueValue), sourceInfo);
        }
    }

    /**
     * Replays the captured deletions of one configuration row.
     *
     * <p>The constructor opens the source connection and loads the pending delete records;
     * exactly one of {@link #collectTableDelete()} or {@link #mainDataBaseTableDelete()} is then
     * expected to be called, and both close the connections they used.
     */
    private class TriggerDeleteService {
        /** Data source that holds the delete-record table. */
        private String dataSource;
        /** Data source(s) the deletion is replayed on. */
        private String[] targetDataSource;
        /** Name of the table whose deletions are recorded (lower case). */
        private String tableName;
        /** Random suffix used in the trigger / delete-record table names. */
        private String randomSuffix;
        /** Names of the unique field(s); several fields form a composite key. */
        private String[] uniqueFields;
        /** Field of the master table that stores the sub database's key value. */
        private String preMasterField;
        /** Name of the delete-record table. */
        private String delRecordTable;
        /** Collection id. */
        private String collectId;
        /** Field that records the collection source. */
        private String sourceInfoField;
        /** Delete type: 1 = master deleted, replay on subs; 2 = sub deleted, replay on master. */
        private String delType;
        /** Pending rows of the delete-record table. */
        private DataTableEntity deleteTableRecord;
        private Dao sourceDao;
        /** Table the deletion is replayed on (lower case). */
        private String targetTable;
        /** Every Dao opened by this instance, so the caller can close them after a failure. */
        private Set<Dao> daoSet = new HashSet<>();

        public String getDelType() {
            return delType;
        }

        public Set<Dao> getDaoSet() {
            return daoSet;
        }

        /**
         * Reads the configuration row, resolves the target table, opens the source connection
         * with auto-commit disabled and loads the pending delete records.
         *
         * @param fse one row of the delete-record configuration table
         * @throws SQLException     if the source connection cannot be configured
         * @throws RuntimeException if {@code del_type} is 2 and no extract configuration exists
         */
        public TriggerDeleteService(FieldSetEntity fse) throws SQLException {
            this.dataSource = fse.getString("data_source");
            this.targetDataSource = fse.getString("target_data_source").split(",");
            this.tableName = StringUtils.lowerCase(fse.getString("table_name"));
            this.randomSuffix = fse.getString("random_suffix");
            this.uniqueFields = StringUtils.lowerCase(fse.getString("unique_field")).split(",");
            this.preMasterField = StringUtils.lowerCase(fse.getString("pre_master_field"));
            this.delRecordTable = "trigger_table_" + this.randomSuffix;
            this.delType = fse.getString("del_type");
            this.collectId = fse.getString("collect_id");
            this.sourceInfoField = StringUtils.lowerCase(fse.getString("collect_field"));
            if ("2".equals(this.getDelType())) {
                // The master-side table comes from the extract configuration of the collection.
                StringBuilder sql = new StringBuilder();
                sql.append("\n SELECT extract.* ");
                sql.append("\n FROM ");
                sql.append("\n product_sys_data_collect collect ");
                sql.append("\n JOIN product_sys_data_extract_config extract ON ");
                sql.append("\n upper( case when length(collect.target_table)>0 then ");
                sql.append("\n collect.target_table else collect.source_table end ) = upper( extract.extract_source_table )");
                sql.append(" where collect.uuid=? limit 1");
                FieldSetEntity fs = baseDao.getFieldSetEntityBySQL(sql.toString(), new Object[]{fse.getString(CmnConst.CONFIG_UUID)}, false);
                if (fs == null) {
                    throw new RuntimeException("提取配置不存在");
                }
                this.targetTable = fs.getString("extract_target_table");
            }
            if (StringUtils.isEmpty(targetTable)) {
                this.targetTable = tableName;
            }
            this.targetTable = StringUtils.lowerCase(targetTable);
            DataBaseEntity dbe = new DataBaseEntity(this.dataSource);
            sourceDao = dbe.getDao();
            daoSet.add(sourceDao);
            Connection connection = sourceDao.getConnection();
            connection.setAutoCommit(false);
            try {
                selectDelRecord(fse.getUUID());
            } catch (Exception e) {
                connection.close();
                throw e;
            }
        }

        /**
         * Replays deletions captured on a collected (sub) table on the master database
         * ({@code del_type} 2).
         *
         * <p>Records are processed in chunks of 1000. For each chunk the matching master rows
         * are looked up, deleted from the target table and the archive tables, the
         * delete-record table is updated, and the removed rows are backed up through
         * {@link SyncDelRecordService#WriteData}. The target and source connections are always
         * closed on exit.
         *
         * @throws SQLException on any database failure
         */
        public void collectTableDelete() throws SQLException {
            Dao targetDao = null;
            try {
                if (DataTableEntity.isEmpty(this.deleteTableRecord)) {
                    return;
                }
                List<FieldSetEntity> data = this.deleteTableRecord.getData();
                List<List<FieldSetEntity>> splitRecord = CollectionUtil.split(data, 1000);
                for (List<FieldSetEntity> fieldSetEntityList : splitRecord) {
                    // Delete records grouped by the collection source they came from.
                    Map<String, List<Record>> groupUniqueValues = new HashMap<>();
                    fieldSetEntityList.stream().forEach(item -> {
                        List<Record> records = groupUniqueValues.computeIfAbsent(item.getString(this.sourceInfoField), k -> new ArrayList<>());
                        // Key values of the row in the sub database.
                        String[] uniqueValues = new String[this.uniqueFields.length];
                        for (int i = 0; i < this.uniqueFields.length; i++) {
                            uniqueValues[i] = item.getString(this.uniqueFields[i]);
                        }
                        Record record = new Record(item.getInteger(("ID_" + this.randomSuffix).toLowerCase()), uniqueValues, item.getString(this.sourceInfoField));
                        record.setDeleteFlag("2".equals(item.getString("delete_flag")) ? 2 : 1);
                        records.add(record);
                    });
                    if (groupUniqueValues.isEmpty()) {
                        return;
                    }
                    StringBuilder sql = new StringBuilder(128);
                    // Number of records already flagged 2 (to be handled via the archive tables).
                    int archivingCount = 0;
                    for (List<Record> value : groupUniqueValues.values()) {
                        List<Record> collect = value.stream().filter(item -> item.getDeleteFlag() != 2).collect(Collectors.toList());
                        archivingCount += value.size() - collect.size();
                        if (!CollectionUtil.isEmpty(collect)) {
                            getUniqueValues(collect, sql);
                        }
                    }
                    if (sql.length() <= 0 && archivingCount == 0) {
                        return;
                    }
                    DataBaseEntity dbe = new DataBaseEntity(targetDataSource[0]);
                    targetDao = dbe.getDao();
                    daoSet.add(targetDao);
                    targetDao.getConnection().setAutoCommit(false);
                    if (sql.length() > 0) {
                        DataTableEntity dt = targetDao.getList(sql.toString(), (Object[]) null);
                        if (DataTableEntity.isEmpty(dt) && archivingCount == 0) {
                            return;
                        }
                        // dt may be empty (or null) here when only archive-flagged records exist.
                        if (!DataTableEntity.isEmpty(dt)) {
                            for (int i = 0; i < dt.getRows(); i++) {
                                // Sub-database key value stored on the master row.
                                String preMaterKey = dt.getString(i, this.preMasterField);
                                // Collection source of the master row.
                                String sourceInfo = dt.getString(i, this.sourceInfoField);
                                List<Record> records = groupUniqueValues.get(sourceInfo);
                                if (CollectionUtil.isEmpty(records)) {
                                    continue;
                                }
                                String[] primaryValue = new String[this.uniqueFields.length];
                                for (int i1 = 0; i1 < this.uniqueFields.length; i1++) {
                                    primaryValue[i1] = dt.getString(i, this.uniqueFields[i1]);
                                }
                                int index = records.indexOf(new Record(new String[]{preMaterKey}, sourceInfo));
                                if (index < 0) {
                                    continue;
                                }
                                Record record = records.get(index);
                                record.setPrimaryValue(primaryValue);
                                record.setRecord(dt.getFieldSetEntity(i));
                            }
                        }
                    }
                    sql.setLength(0);
                    sql.append(" DELETE FROM ").append(this.targetTable).append(" WHERE ");
                    int length = sql.length();
                    List<Record> delRecords = new ArrayList<>();
                    List<Record> lastRecord = new ArrayList<>();
                    if (archivingCount == 0 || archivingCount != this.deleteTableRecord.getRows()) {
                        for (List<Record> value : groupUniqueValues.values()) {
                            String sourceInfo = value.get(0).getSourceInfo();
                            String[] collect = value.stream().filter(item -> {
                                if (item.getDeleteFlag() == 2) {
                                    lastRecord.add(item);
                                }
                                if (ArrayUtil.isEmpty(item.getPrimaryValue())) {
                                    return false;
                                }
                                delRecords.add(item);
                                return true;
                            }).flatMap(item -> Arrays.stream(item.getPrimaryValue())).toArray(String[]::new);
                            if (ArrayUtil.isEmpty(collect)) {
                                continue;
                            }
                            if (length < sql.length()) {
                                sql.append(" OR ");
                            }
                            sql.append(" (").append(this.sourceInfoField).append("='").append(sourceInfo).append("' and (");
                            for (int i = 0; i < this.uniqueFields.length; i++) {
                                if (i > 0) {
                                    sql.append(" AND ");
                                }
                                sql.append(BaseUtil.buildQuestionMarkFilter(uniqueFields[i], collect, true));
                            }
                            sql.append(" ) )");
                        }
                        if (lastRecord.size() > 0) {
                            delRecords.removeAll(lastRecord);
                        }
                    } else {
                        // Every pending record is archive-flagged.
                        lastRecord.addAll(groupUniqueValues.values().stream().flatMap(item -> item.stream()).collect(Collectors.toList()));
                    }
                    if (sql.length() > length) {
                        targetDao.executeSql(sql.toString());
                    }
                    delRecords.addAll(lastRecord);
                    // deleteArchivingData removes from this copy every record it deleted.
                    List<Record> allRecord = new ArrayList<>(delRecords);
                    this.deleteArchivingData(allRecord);
                    delRecords.stream().forEach(item -> item.setDeleteFlag(1));
                    if (!CollectionUtil.isEmpty(allRecord)) {
                        // Whatever is left was not found in the archive: flag it 2 for a later run.
                        allRecord.stream().forEach(item -> item.setDeleteFlag(2));
                    }
                    List<Record> collect = lastRecord.stream().filter(item -> item.getRecord() == null).collect(Collectors.toList());
                    delRecords.removeAll(collect);
                    updateRecordTableStatus(delRecords);
                    targetDao.getConnection().commit();
                    WriteData(delRecords, targetDataSource[0] + "&_" + tableName + "&_collect");
                    this.sourceDao.getConnection().commit();
                }
            } finally {
                if (targetDao != null) {
                    targetDao.closeConnection();
                }
                if (this.sourceDao != null) {
                    this.sourceDao.closeConnection();
                }
            }
        }

        /**
         * Replays deletions captured on the master table on every configured sub database
         * ({@code del_type} 1).
         *
         * <p>NOTE(review): {@code recordList} accumulates across the 1000-row chunks and the
         * {@code DataBaseEntity} array is re-created per chunk; only the last array is closed in
         * {@code finally}. Left as in the original — confirm whether that is intended.
         *
         * @throws SQLException on any database failure
         */
        public void mainDataBaseTableDelete() throws SQLException {
            DataBaseEntity[] dataBaseEntities = null;
            try {
                if (DataTableEntity.isEmpty(this.deleteTableRecord)) {
                    return;
                }
                List<Record> recordList = new ArrayList<>();
                List<FieldSetEntity> data = this.deleteTableRecord.getData();
                List<List<FieldSetEntity>> splitRecord = CollectionUtil.split(data, 1000);
                for (List<FieldSetEntity> fieldSetEntityList : splitRecord) {
                    fieldSetEntityList.stream().forEach(item -> {
                        String[] uniqueValues = new String[this.uniqueFields.length];
                        for (int i = 0; i < this.uniqueFields.length; i++) {
                            uniqueValues[i] = item.getString(this.uniqueFields[i]);
                        }
                        if (ArrayUtil.isAllEmpty(uniqueValues)) {
                            return;
                        }
                        // The id column is ID_<suffix>, as created by getCreateTableSql and as
                        // used by updateRecordTableStatus.
                        Record record = new Record(item.getInteger(("ID_" + this.randomSuffix).toLowerCase()), uniqueValues);
                        record.setPrimaryValue(uniqueValues);
                        record.setDeleteFlag("2".equals(item.getString("delete_flag")) ? 2 : 1);
                        recordList.add(record);
                    });
                    dataBaseEntities = new DataBaseEntity[this.targetDataSource.length];
                    for (int i = 0; i < this.targetDataSource.length; i++) {
                        dataBaseEntities[i] = new DataBaseEntity(this.targetDataSource[i]);
                        Dao targetDao = dataBaseEntities[i].getDao();
                        daoSet.add(targetDao);
                        Connection connection = targetDao.getConnection();
                        connection.setAutoCommit(false);
                        DataTableEntity dt;
                        if (this.uniqueFields.length == 1) {
                            dt = targetDao.getList(tableName, BaseUtil.buildQuestionMarkFilter(this.uniqueFields[0], recordList.stream().map(item -> item.getUniqueValue()[0]).toArray(), true), null, null);
                        } else {
                            if (recordList.size() > 1000) {
                                List<List<Record>> split = ListUtil.split(recordList, 500);
                                dt = new DataTableEntity();
                                for (int j = 0; j < split.size(); j++) {
                                    dt.addFieldSetEntity(targetDao.getList(tableName, getMultiplePrimaryFieldFilter(split.get(j)), null, null));
                                }
                            } else {
                                dt = targetDao.getList(tableName, getMultiplePrimaryFieldFilter(recordList), null, null);
                            }
                        }
                        // Archive-flagged records are not in the target table any more; add a
                        // placeholder row so that they still reach deleteArchivingData.
                        for (Record flagged : recordList) {
                            int deleteFlag = flagged.getDeleteFlag();
                            if (2 == deleteFlag) {
                                if (dt == null) {
                                    dt = new DataTableEntity();
                                }
                                FieldSetEntity fs = new FieldSetEntity();
                                fs.setTableName("temp");
                                for (int j = 0; j < this.uniqueFields.length; j++) {
                                    fs.setValue(this.uniqueFields[j], flagged.getUniqueValue()[j]);
                                }
                                dt.addFieldSetEntity(fs);
                            }
                        }
                        if (DataTableEntity.isEmpty(dt)) {
                            continue;
                        }
                        if (CollectionUtil.isEmpty(recordList)) {
                            return;
                        }
                        // Every row found in the database is treated as deletable.
                        List<Record> records = new ArrayList<>();
                        for (int j = 0; j < dt.getRows(); j++) {
                            String[] uniqueValues = new String[this.uniqueFields.length];
                            for (int k = 0; k < this.uniqueFields.length; k++) {
                                uniqueValues[k] = dt.getString(j, this.uniqueFields[k]);
                            }
                            int index = recordList.indexOf(new Record(uniqueValues, null));
                            if (index == -1) {
                                continue;
                            }
                            Record matched = recordList.get(index);
                            matched.setRecord(dt.getFieldSetEntity(j));
                            records.add(matched);
                        }
                        if (CollectionUtil.isEmpty(records)) {
                            return;
                        }
                        List<Record> recordSub = records.stream().filter(item -> item.getDeleteFlag() != 2 && item.getRecord() != null).collect(Collectors.toList());
                        if (!CollectionUtil.isEmpty(recordSub)) {
                            if (this.uniqueFields.length == 1) {
                                targetDao.delete(tableName, BaseUtil.buildQuestionMarkFilter(this.uniqueFields[0], recordSub.stream().map(item -> item.getUniqueValue()[0]).toArray(), true));
                            } else {
                                if (records.size() > 1000) {
                                    List<List<Record>> split = ListUtil.split(recordSub, 500);
                                    for (int j = 0; j < split.size(); j++) {
                                        targetDao.delete(tableName, getMultiplePrimaryFieldFilter(split.get(j)));
                                    }
                                } else {
                                    targetDao.delete(tableName, getMultiplePrimaryFieldFilter(recordSub));
                                }
                            }
                        }
                        WriteData(recordSub, dataBaseEntities[i].getUuid() + "&_" + tableName + "&_master");
                        // deleteArchivingData removes from this copy every record it deleted.
                        List<Record> deRecords = new ArrayList<>(records);
                        deleteArchivingData(deRecords);
                        records.stream().forEach(item -> item.setDeleteFlag(1));
                        if (!CollectionUtil.isEmpty(deRecords)) {
                            // Whatever is left was not found in the archive: flag it 2 for a later run.
                            deRecords.stream().forEach(item -> item.setDeleteFlag(2));
                        }
                        updateRecordTableStatus(records);
                    }
                    for (DataBaseEntity dataBaseEntity : dataBaseEntities) {
                        Dao dao = dataBaseEntity.getDao();
                        daoSet.add(dao);
                        Connection connection = dao.getConnection();
                        connection.commit();
                    }
                }
                this.sourceDao.getConnection().commit();
            } finally {
                if (!ArrayUtil.isEmpty(dataBaseEntities)) {
                    for (DataBaseEntity dataBaseEntity : dataBaseEntities) {
                        // Slots after the failing / returning index are still null.
                        if (dataBaseEntity != null) {
                            dataBaseEntity.getDao().closeConnection();
                        }
                    }
                }
                this.sourceDao.closeConnection();
            }
        }

        /**
         * Writes each record's current delete flag back to the delete-record table.
         *
         * @param recordList records whose {@code DELETE_FLAG} should be stored; ignored when empty
         */
        private void updateRecordTableStatus(List<Record> recordList) {
            if (CollectionUtil.isEmpty(recordList)) {
                return;
            }
            // delete flag -> ids of the delete-record rows carrying that flag
            Map<Integer, List<Integer>> idsByFlag = recordList.stream().collect(
                    Collectors.groupingBy(item -> Integer.valueOf(item.getDeleteFlag()),
                            Collectors.mapping(item -> item.getDelTableId(), Collectors.toList())));
            for (Map.Entry<Integer, List<Integer>> entry : idsByFlag.entrySet()) {
                List<Integer> params = entry.getValue();
                // The flag is the first bind parameter, followed by the ids.
                params.add(0, entry.getKey());
                this.sourceDao.executeSql("update " + this.delRecordTable + " set DELETE_FLAG=? WHERE " + BaseUtil.buildQuestionMarkFilter("ID_" + this.randomSuffix, params.size() - 1, true), params.toArray());
            }
        }

        /**
         * Deletes the given records from the archive tables configured for the target table.
         *
         * <p>Every record that was found (and therefore deleted) in an archive table is removed
         * from {@code list}; what remains in {@code list} afterwards was not found. Nothing is
         * done when no archive configuration exists for the target table.
         *
         * @param list records to delete; modified in place
         * @throws SQLException on any database failure
         */
        private void deleteArchivingData(List<Record> list) throws SQLException {
            if (CollectionUtil.isEmpty(list)) {
                return;
            }
            // De-duplicate records that are equal by unique value and source.
            List<Record> recordList = new ArrayList<>(new HashSet<>(list));
            FieldSetEntity fse = baseDao.getFieldSetByFilter(CmnConst.DATA_ARCHIVING_TABLE, "UPPER(source_table)=Upper(?)", new Object[]{targetTable}, false);
            if (FieldSetEntity.isEmpty(fse)) {
                return;
            }
            String targetDataSource = fse.getString(CmnConst.TARGET_DATA_SOURCE);
            String targetTablePrefix = fse.getString("target_table_prefix");
            DataBaseEntity dbe = new DataBaseEntity(targetDataSource);
            Dao targetDao = dbe.getDao();
            daoSet.add(targetDao);
            Connection connection = null;
            try {
                connection = targetDao.getConnection();
                connection.setAutoCommit(false);
                Set<String> allTableName = QuerySqlParseUtil.getAllTableName(targetDao, dbe.getDbName(), targetTablePrefix);
                if (CollectionUtil.isEmpty(allTableName)) {
                    allTableName = new HashSet<>();
                    allTableName.add(targetTablePrefix);
                }
                // Extra filter for records flagged 2 on a previous run.
                StringBuilder lastFilter = null;
                List<Record> lastNotDeleteRecord = recordList.stream().filter(item -> item.getDeleteFlag() == 2).collect(Collectors.toList());
                if (!CollectionUtil.isEmpty(lastNotDeleteRecord)) {
                    if ("2".equals(delType)) {
                        lastFilter = new StringBuilder();
                        Map<String, List<Record>> bySource = lastNotDeleteRecord.stream().collect(
                                Collectors.groupingBy(item -> StringUtils.isEmpty(item.getSourceInfo()) ? this.collectId : item.getSourceInfo()));
                        for (Map.Entry<String, List<Record>> entry : bySource.entrySet()) {
                            if (lastFilter.length() > 0) {
                                lastFilter.append(" OR ");
                            }
                            lastFilter.append(" (");
                            lastFilter.append(BaseUtil.buildQuestionMarkFilter(this.preMasterField, entry.getValue().stream().map(item -> item.getUniqueValue()[0]).toArray(), true));
                            lastFilter.append(" and ").append(this.sourceInfoField).append("= '").append(entry.getKey()).append("' )");
                        }
                    } else {
                        lastFilter = new StringBuilder(getMultiplePrimaryFieldFilter(lastNotDeleteRecord));
                    }
                }
                // One SELECT per archive table, tagged with the table it came from.
                StringBuilder sql = new StringBuilder();
                for (int i = 0; i < allTableName.size(); i++) {
                    String archiveTable = CollectionUtil.get(allTableName, i);
                    if (i > 0) {
                        sql.append(" UNION ALL \n");
                    }
                    sql.append(" SELECT *");
                    sql.append(",'").append(archiveTable).append("' table_name_c_m_t ");
                    sql.append(" FROM ").append(archiveTable);
                    sql.append(" WHERE ").append(getMultiplePrimaryFieldFilter(recordList));
                    if (!CollectionUtil.isEmpty(lastNotDeleteRecord)) {
                        sql.append(" OR (");
                        sql.append(lastFilter);
                        sql.append(" )");
                    }
                }
                DataTableEntity dt = targetDao.getList(sql.toString());
                if (DataTableEntity.isEmpty(dt)) {
                    return;
                }
                // archive table name -> records found in it
                Map<String, List<Record>> groupTableRecord = new HashMap<>();
                for (int i = 0; i < dt.getRows(); i++) {
                    String archiveTable = dt.getString(i, "table_name_c_m_t");
                    String[] uniqueValues = new String[this.uniqueFields.length];
                    for (int k = 0; k < this.uniqueFields.length; k++) {
                        uniqueValues[k] = dt.getString(i, this.uniqueFields[k]);
                    }
                    Record probe;
                    if ("2".equals(delType)) {
                        probe = new Record(new String[]{dt.getString(i, this.preMasterField)}, dt.getString(i, this.sourceInfoField));
                    } else {
                        probe = new Record(uniqueValues, null);
                    }
                    int index = recordList.indexOf(probe);
                    List<Record> found = groupTableRecord.computeIfAbsent(archiveTable, k -> new ArrayList<>());
                    if (index != -1) {
                        Record matched = recordList.get(index);
                        if (matched.getDeleteFlag() == 2) {
                            // Flagged records only get their key values and row from the archive.
                            matched.setPrimaryValue(uniqueValues);
                            matched.setRecord(dt.getFieldSetEntity(i));
                        }
                        found.add(matched);
                    }
                }
                for (Map.Entry<String, List<Record>> entry : groupTableRecord.entrySet()) {
                    if (CollectionUtil.isEmpty(entry.getValue())) {
                        entry.getValue().clear();
                        continue;
                    }
                    String filter = getMultiplePrimaryFieldFilter(entry.getValue());
                    if (StringUtils.isEmpty(filter)) {
                        entry.getValue().clear();
                        continue;
                    }
                    targetDao.delete(entry.getKey(), filter);
                    WriteData(entry.getValue(), dbe.getUuid() + "&_" + entry.getKey() + "&_archiving");
                }
                List<Record> deleted = groupTableRecord.values().stream().flatMap(item -> item.stream()).collect(Collectors.toList());
                connection.commit();
                list.removeAll(deleted);
            } finally {
                if (connection != null) {
                    connection.close();
                }
            }
        }

        /**
         * Builds a WHERE fragment that matches the primary values of the given records.
         *
         * <p>Records without primary values are skipped. With a single unique field the fragment
         * comes from {@code BaseUtil.buildQuestionMarkFilter}; with several fields it is an OR of
         * {@code (f1='v1' and f2='v2')} groups with the values inlined as literals.
         *
         * @param recordList records to match
         * @return the filter, or {@code "1=2"} when nothing can be matched
         */
        private String getMultiplePrimaryFieldFilter(Collection<Record> recordList) {
            if (CollectionUtil.isEmpty(recordList)) {
                return "1=2";
            }
            StringBuilder sqlFilter = new StringBuilder(128);
            if (this.uniqueFields.length == 1) {
                List<String> uniqueValues = new ArrayList<>();
                for (Record item : recordList) {
                    if (ArrayUtil.isEmpty(item.getPrimaryValue())) {
                        continue;
                    }
                    uniqueValues.add(item.getPrimaryValue()[0]);
                }
                if (uniqueValues.size() > 0) {
                    sqlFilter.append(" (").append(BaseUtil.buildQuestionMarkFilter(this.uniqueFields[0], uniqueValues.toArray(), true)).append(" )");
                }
            } else {
                for (Record item : recordList) {
                    if (ArrayUtil.isEmpty(item.getPrimaryValue())) {
                        continue;
                    }
                    if (sqlFilter.length() > 0) {
                        sqlFilter.append(" OR ");
                    }
                    sqlFilter.append("( ");
                    for (int k = 0; k < this.uniqueFields.length; k++) {
                        if (k > 0) {
                            sqlFilter.append(" and ");
                        }
                        sqlFilter.append(this.uniqueFields[k]).append("='").append(item.getPrimaryValue()[k]).append("'");
                    }
                    sqlFilter.append(" ) ");
                }
            }
            if (sqlFilter.length() == 0) {
                return "1=2";
            }
            return sqlFilter.toString();
        }

        /**
         * Appends a SELECT on the target table that finds the master rows for the given
         * sub-database records; consecutive calls are joined with {@code UNION ALL}.
         *
         * @param recordList records of one collection source (the first element's source is used)
         * @param sql        buffer to append to
         * @return {@code sql}
         */
        private StringBuilder getUniqueValues(List<Record> recordList, StringBuilder sql) {
            if (recordList == null || recordList.isEmpty()) {
                return sql;
            }
            if (sql.length() > 0) {
                sql.append("UNION ALL \n");
            }
            sql.append(" SELECT * ");
            sql.append(" FROM ").append(this.targetTable);
            String sourceInfo = recordList.get(0).getSourceInfo().toLowerCase(Locale.ROOT);
            sql.append(" WHERE ").append(sourceInfoField).append("='").append(sourceInfo).append("' and (");
            sql.append(BaseUtil.buildQuestionMarkFilter(this.preMasterField, recordList.stream().map(item -> item.getUniqueValue()[0]).toArray(), true)).append(" ) ");
            return sql;
        }

        /**
         * Loads the pending rows ({@code DELETE_FLAG <> 1}) of the delete-record table into
         * {@link #deleteTableRecord}, lower-cases their {@code source_info} and marks the
         * configuration as processed in Redis.
         *
         * <p>On non-MySQL sources rows newer than ten minutes are skipped, and the look-back
         * window is 2 days once the Redis marker exists, 30 days otherwise.
         *
         * <p>NOTE(review): the MySQL branch matches only rows whose CREATE_TIME equals midnight
         * two days ago exactly — confirm whether a range was intended.
         *
         * @param uuid uuid of the configuration row, used for the Redis marker key
         */
        private void selectDelRecord(String uuid) {
            Object o = RedisUtil.get("sync-del-record:" + uuid);
            String filter = "DELETE_FLAG <> 1 and ";
            if (this.sourceDao.getDataBaseType().getValue() == DataBaseType.MYSQL.getValue()) {
                filter += " CREATE_TIME= date_add(DATE_FORMAT(now(),'%Y-%m-%d'),interval -2 DAY)";
            } else {
                filter += " CREATE_TIME< sysdate-((1/24/60)*10) ";
                if (o != null) {
                    filter += " and CREATE_TIME>=trunc(sysdate-2)";
                } else {
                    filter += " and CREATE_TIME>=trunc(sysdate-30)";
                }
            }
            this.deleteTableRecord = this.sourceDao.getList(this.delRecordTable, filter, null, null);
            for (int i = 0; i < this.deleteTableRecord.getRows(); i++) {
                this.deleteTableRecord.setFieldValue(i, "source_info", StringUtils.lowerCase(this.deleteTableRecord.getString(i, "source_info")));
            }
            RedisUtil.set("sync-del-record:" + uuid, "true");
        }
    }

    /**
     * Builds the Oracle {@code CREATE OR REPLACE TRIGGER} statement that copies the unique
     * field values of every deleted row into the delete-record table.
     *
     * @param tableName    table whose deletions are recorded
     * @param uniqueFields primary-key / unique field names to record
     * @param randomSuffix random suffix of the trigger and delete-record table names
     * @param delType      delete type; 2 additionally records the collection source in
     *                     {@code SOURCE_INFO}
     * @param collectField field holding the collection source (used when {@code delType} is 2)
     * @param collectId    fallback collection id when {@code collectField} is null on the row
     * @return the upper-cased statement
     */
    private String getCreateTriggerSql(String tableName, String[] uniqueFields, String randomSuffix, int delType, String collectField, String collectId) {
        boolean recordSource = 2 == delType;
        String idField = "ID_" + randomSuffix.toUpperCase();
        StringBuilder sql = new StringBuilder();
        sql.append(" create or replace trigger \"trigger_").append(randomSuffix).append("\" after delete on ").append(tableName);
        sql.append(" FOR EACH ROW BEGIN ");
        // Column list: id, bookkeeping columns, optional source, then the unique fields.
        sql.append(" insert into \"trigger_table_").append(randomSuffix).append("\" (").append(idField).append(",CREATE_TIME,DELETE_FLAG");
        if (recordSource) {
            sql.append(",SOURCE_INFO");
        }
        sql.append(",").append(ArrayUtil.join(uniqueFields, ","));
        // Value list, in the same order as the column list.
        sql.append(")values(getnewid('trigger_table_");
        sql.append(randomSuffix).append("'),sysdate,0");
        if (recordSource) {
            sql.append(",nvl(:OLD.").append(collectField.toUpperCase()).append(",'").append(collectId).append("')");
        }
        sql.append(",").append(Arrays.stream(uniqueFields).map(item -> ":OLD." + item.toLowerCase()).collect(Collectors.joining(",")));
        sql.append(" ); END; ");
        return sql.toString().toUpperCase();
    }

    /**
     * Builds the {@code CREATE TABLE} statement of the delete-record table.
     *
     * <p>Keep the columns in sync with the insert in {@link #getCreateTriggerSql}.
     *
     * @param randomSuffix random suffix of the table name
     * @param delType      delete type; 2 adds the {@code SOURCE_INFO} column
     * @param uniqueFields unique field names, one {@code VARCHAR2(255)} column each
     * @return the upper-cased statement
     */
    private String getCreateTableSql(String randomSuffix, int delType, String[] uniqueFields) {
        StringBuilder sql = new StringBuilder();
        String idField = "ID_" + randomSuffix.toUpperCase();
        sql.append("CREATE TABLE \"trigger_table_").append(randomSuffix).append("\"");
        sql.append("( \"" + idField + "\" NUMBER(10,0) NOT NULL ENABLE,\n");
        for (String uniqueField : uniqueFields) {
            sql.append(" \"" + uniqueField.toUpperCase() + "\" VARCHAR2(255) NOT NULL ENABLE,\n");
        }
        if (2 == delType) {
            // Collected tables also record where the row came from.
            sql.append(" \"SOURCE_INFO\" VARCHAR2(50),\n");
        }
        sql.append(" \"CREATE_TIME\" DATE,\n");
        sql.append(" \"DELETE_FLAG\" NUMBER(2,0),\n");
        sql.append(" PRIMARY KEY (\"" + idField + "\")\n");
        sql.append(" )");
        return sql.toString().toUpperCase();
    }

    /**
     * Builds the {@code DROP TABLE} statement of the delete-record table.
     *
     * @param randomSuffix random suffix of the table name
     * @return the upper-cased statement
     */
    private String getDropTableSql(String randomSuffix) {
        return ("DROP TABLE trigger_table_" + randomSuffix).toUpperCase();
    }

    /**
     * Builds the {@code DROP TRIGGER} statement of the delete trigger.
     *
     * @param randomSuffix random suffix of the trigger name
     * @return the upper-cased statement
     */
    private String getDropTriggerSql(String randomSuffix) {
        return (" DROP TRIGGER trigger_" + randomSuffix).toUpperCase();
    }

    /**
     * Checks {@code user_triggers} for the delete trigger.
     *
     * @param random_suffix random suffix of the trigger name
     * @param dao           Dao of the source database
     * @return {@code true} if the trigger exists
     */
    private boolean triggerIsExist(String random_suffix, Dao dao) {
        FieldSetEntity one = dao.getOne("user_triggers", "TRIGGER_NAME=UPPER(?)", new String[]{"TRIGGER_NAME"}, new Object[]{"trigger_" + random_suffix});
        if (FieldSetEntity.isEmpty(one)) {
            return false;
        }
        return !StringUtils.isEmpty(one.getString("trigger_name"));
    }

    /**
     * Checks {@code user_tables} for the delete-record table.
     *
     * @param random_suffix random suffix of the table name
     * @param dao           Dao of the source database
     * @return {@code true} if the table exists
     */
    private boolean tableIsExist(String random_suffix, Dao dao) {
        FieldSetEntity one = dao.getOne("user_tables", "TABLE_NAME=UPPER(?)", new String[]{"TABLE_NAME"}, new Object[]{"trigger_table_" + random_suffix});
        if (FieldSetEntity.isEmpty(one)) {
            return false;
        }
        return !StringUtils.isEmpty(one.getString("table_name"));
    }

    /**
     * Creates the delete-record table and the delete trigger in the source database, skipping
     * whichever already exists.
     *
     * @param trigger delete-record configuration row
     * @return {@code true} on success; {@code false} when the configuration has no random suffix
     *         or a {@link SQLException} occurred (the exception is logged)
     */
    public boolean createTriggerDDL(FieldSetEntity trigger) {
        String randomSuffix = trigger.getString("random_suffix");
        if (StringUtils.isEmpty(randomSuffix)) {
            return false;
        }
        DataBaseEntity dbe = new DataBaseEntity(trigger.getString("data_source"));
        Dao dao = dbe.getDao();
        try {
            String[] uniqueFields = trigger.getString("unique_field").split(",");
            Integer delType = trigger.getInteger("del_type");
            if (!tableIsExist(randomSuffix, dao)) {
                dao.executeSql(getCreateTableSql(randomSuffix, delType, uniqueFields));
                // Initialise the auto-increment id sequence for the new table.
                dao.executeSql("declare ids number; begin ids:=getnewid('trigger_table_" + randomSuffix + "'); dbms_output.put_line(ids);end;");
            }
            if (!triggerIsExist(randomSuffix, dao)) {
                try (Connection conn = dao.getConnection(); Statement statement = conn.createStatement()) {
                    String createTriggerSql = getCreateTriggerSql(trigger.getString("table_name"), uniqueFields, randomSuffix, delType, trigger.getString("collect_field"), trigger.getString("collect_id"));
                    SpringMVCContextHolder.getSystemLogger().info("创建触发器SQL:\n" + createTriggerSql);
                    statement.execute(createTriggerSql);
                }
            }
            return true;
        } catch (SQLException e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
            return false;
        } finally {
            dao.closeConnection();
        }
    }

    /**
     * Drops the delete trigger and the delete-record table (when present) and deletes the
     * configuration row itself, even if dropping failed.
     *
     * @param trigger delete-record configuration row
     * @return {@code false} when the row is empty or has no random suffix, {@code true} otherwise
     */
    public boolean deleteTriggerDDL(FieldSetEntity trigger) {
        if (FieldSetEntity.isEmpty(trigger)) {
            return false;
        }
        String randomSuffix = trigger.getString("random_suffix");
        if (StringUtils.isEmpty(randomSuffix)) {
            return false;
        }
        DataBaseEntity dbe = new DataBaseEntity(trigger.getString("data_source"));
        Dao dao = dbe.getDao();
        try {
            if (triggerIsExist(randomSuffix, dao)) {
                dao.executeSql(getDropTriggerSql(randomSuffix));
            }
            if (tableIsExist(randomSuffix, dao)) {
                dao.executeSql(getDropTableSql(randomSuffix));
            }
        } finally {
            dao.closeConnection();
            baseDao.delete(trigger.getTableName(), new String[]{trigger.getUUID()});
        }
        return true;
    }

    /**
     * Drops the trigger and delete-record table that belong to the given configuration.
     *
     * @param fs data collection configuration; its uuid is matched against {@code config_uuid}
     * @return always {@code true}
     */
    public boolean deleteTrigger(FieldSetEntity fs) {
        FieldSetEntity trigger = baseDao.getFieldSetByFilter(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "config_uuid=?", new String[]{fs.getUUID()}, false);
        deleteTriggerDDL(trigger);
        return true;
    }

    /**
     * Entry point for the synchronisation configuration: creates or removes the delete trigger
     * of every row.
     *
     * <p>Rows that are deleted or not in use have their trigger removed. Rows without an update
     * time field are skipped. All other rows get a trigger with {@code del_type} 1.
     *
     * @param dt                 synchronisation configuration sub-table rows
     * @param data_source        source data source
     * @param target_data_source target data source(s)
     */
    public void createTrigger(DataTableEntity dt, String data_source, String target_data_source) {
        for (int i = 0; i < dt.getRows(); i++) {
            if (CoreConst.SYSTEM_DATA_OPERATE_DEL.equals(dt.getString(i, "~type~")) || !"1".equals(dt.getString(i, "is_used"))) {
                // The row is being deleted or is disabled: remove its trigger.
                FieldSetEntity trigger = baseDao.getFieldSetByFilter(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "config_uuid=?", new String[]{dt.getFieldSetEntity(i).getUUID()}, false);
                deleteTriggerDDL(trigger);
            } else {
                if (StringUtils.isEmpty(dt.getString(i, CmnConst.UPDATE_TIME))) {
                    // An empty update-time field means overwrite comparison: no trigger needed.
                    continue;
                }
                dt.setFieldValue(i, "data_source", data_source);
                dt.setFieldValue(i, "target_data_source", target_data_source);
                dt.setFieldValue(i, "del_type", 1);
                dt.setFieldValue(i, "master_key_field", dt.getString(i, "unique_sign"));
                createTrigger(dt.getFieldSetEntity(i));
            }
        }
    }

    /**
     * Entry point for the collection configuration: creates, updates or removes the delete
     * trigger of one configuration.
     *
     * <p>When {@code delete_sync_content} is {@code "1"} the delete-record configuration is
     * created or updated and the trigger DDL is applied; otherwise an existing configuration is
     * removed together with its trigger and table.
     *
     * @param fs configuration row; for {@code del_type} 2 the owning configuration is identified
     *           by its {@code config_uuid} field, otherwise by the row's own uuid
     */
    public void createTrigger(FieldSetEntity fs) {
        String uuid = "2".equals(fs.getString("del_type")) ? fs.getString("config_uuid") : fs.getUUID();
        FieldSetEntity trigger = baseDao.getFieldSetEntityByFilter(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "config_uuid=?", new String[]{uuid}, false);
        if (FieldSetEntity.isEmpty(trigger)) {
            trigger = null;
        }
        if ("1".equals(fs.getString("delete_sync_content"))) {
            // Delete synchronisation is enabled.
            if (trigger == null) {
                trigger = new FieldSetEntity();
                trigger.setTableName(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG);
                trigger.setValue("random_suffix", RandomUtil.randomString(5));
                // Id of the owning configuration row.
                trigger.setValue("config_uuid", uuid);
            }
            trigger.setValue("table_name", fs.getValue("table_name"));
            trigger.setValue("data_source", fs.getValue("data_source"));
            trigger.setValue("del_type", fs.getValue("del_type"));
            trigger.setValue("unique_field", fs.getValue("master_key_field"));
            trigger.setValue("target_data_source", fs.getValue("target_data_source"));
            // Only used by collected tables.
            trigger.setValue("collect_id", fs.getValue("collect_id"));
            trigger.setValue("collect_field", fs.getValue("collect_field"));
            trigger.setValue("pre_master_field", fs.getValue("pre_master_field"));
            BaseUtil.createCreatorAndCreationTime(trigger);
            baseDao.saveFieldSetEntity(trigger);
            createTriggerDDL(trigger);
        } else if (trigger != null) {
            // Delete synchronisation is disabled: drop the configuration that was just looked
            // up (for del_type 2 it is keyed by config_uuid, not by fs.getUUID()).
            deleteTriggerDDL(trigger);
        }
    }
}