package com.product.data.center.service;
|
|
import cn.hutool.core.codec.Base64;
|
import cn.hutool.core.collection.CollectionUtil;
|
import cn.hutool.core.collection.ListUtil;
|
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.io.file.FileWriter;
|
import cn.hutool.core.thread.ThreadUtil;
|
import cn.hutool.core.util.*;
|
import cn.hutool.crypto.SecureUtil;
|
import cn.hutool.crypto.symmetric.AES;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.product.core.cache.util.RedisUtil;
|
import com.product.core.config.CoreConst;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.center.config.CmnConst;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.utils.QuerySqlParseUtil;
|
import com.product.datasource.config.DataBaseType;
|
import com.product.datasource.dao.Dao;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.util.BaseUtil;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.stereotype.Service;
|
import redis.clients.jedis.Jedis;
|
|
import javax.annotation.Resource;
|
import java.io.File;
|
import java.sql.Connection;
|
import java.sql.SQLException;
|
import java.sql.Statement;
|
import java.util.*;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.LinkedTransferQueue;
|
import java.util.stream.Collectors;
|
|
/**
 * @Author cheng
 * @Date 2022/11/29 8:29
 * @Desc Synchronises delete records
 */
|
@Service
|
public class SyncDelRecordService {
|
@Resource
|
private BaseDao baseDao;
|
|
|
public void runTask() {
|
DataTableEntity configDt = baseDao.listTable(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG);
|
if (DataTableEntity.isEmpty(configDt)) {
|
return;
|
}
|
for (int i = 0; i < configDt.getRows(); i++) {
|
TriggerDeleteService triggerDeleteService =null;
|
try {
|
triggerDeleteService = new TriggerDeleteService(configDt.getFieldSetEntity(i));
|
if ("2".equals(triggerDeleteService.getDelType())) {
|
//子库采集表删除
|
triggerDeleteService.collectTableDelete();
|
} else {
|
//主库表删除
|
triggerDeleteService.mainDataBaseTableDelete();
|
}
|
} catch (Exception e) {
|
if(triggerDeleteService!=null){
|
Set<Dao> daoSet = triggerDeleteService.getDaoSet();
|
for (Dao dao : daoSet) {
|
dao.closeConnection();
|
}
|
}
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
System.out.println("完成");
|
}
|
|
private static ExecutorService singleThreadExecutor = null;
|
|
private static Queue<JSONObject> queue = null;
|
|
public static synchronized void WriteData(List<Record> records, String fileName) {
|
if (CollectionUtil.isEmpty(records)) {
|
return;
|
}
|
if (queue == null) {
|
queue = new LinkedTransferQueue<>();
|
}
|
if (StringUtils.isEmpty(fileName)) {
|
throw new BaseException(ErrorCode.FILENAME_CAN_NOT_EMPTY);
|
}
|
List<Object> collect = ListUtil.toList(records.stream().map(item -> new JSONObject((Map) item.getRecord().getValues())).toArray());
|
JSONArray objects = new JSONArray(collect);
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put(com.product.file.config.CmnConst.FILE_NAME, fileName);
|
jsonObject.put("data", objects);
|
|
byte[] key = Base64.decode("glwqQbMzmInerCSs9RoAWQ==");
|
AES aes = SecureUtil.aes(key);
|
queue.add(jsonObject);
|
if (singleThreadExecutor != null && !singleThreadExecutor.isTerminated()) {
|
return;
|
}
|
singleThreadExecutor = Executors.newSingleThreadExecutor();
|
// if(singleThreadExecutor.)
|
singleThreadExecutor.submit(() -> {
|
while (true) {
|
if (queue.isEmpty()) {
|
ThreadUtil.sleep(30000);
|
continue;
|
}
|
try {
|
JSONObject poll = queue.poll();
|
String name = poll.getString(com.product.file.config.CmnConst.FILE_NAME);
|
JSONArray data = poll.getJSONArray("data");
|
if (StringUtils.isEmpty(name) || (data == null || data.isEmpty())) {
|
SpringMVCContextHolder.getSystemLogger().error(new BaseException(ErrorCode.FILENAME_OR_DATA_EMPTY));
|
continue;
|
}
|
String now = DateUtil.format(new Date(), "yyyyMMdd");
|
String filePath = "." + File.separator + "触发器删除数据记录" + File.separator + now + File.separator + name + ".dbackups";
|
File file = new File(filePath);
|
File parentFile = file.getParentFile();
|
if (!parentFile.exists()) {
|
parentFile.mkdirs();
|
}
|
if (!file.exists()) {
|
file.createNewFile();
|
}
|
FileWriter writer = new FileWriter(file);
|
writer.append(aes.encryptBase64(data.toJSONString()));
|
writer.append("\n");
|
System.out.println("写入成功");
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
});
|
singleThreadExecutor.shutdown();
|
}
|
|
public static class Record {
|
|
private int deleteFlag = 1;
|
private Integer delTableId;
|
|
private String[] uniqueValue;
|
|
private String sourceInfo;
|
|
private String[] primaryValue;
|
|
private FieldSetEntity record;
|
|
public Record(Integer delTableId, String[] uniqueValue) {
|
this.delTableId = delTableId;
|
this.uniqueValue = uniqueValue;
|
}
|
|
public Record(Integer delTableId, String[] uniqueValue, String sourceInfo) {
|
this.delTableId = delTableId;
|
this.uniqueValue = uniqueValue;
|
this.sourceInfo = sourceInfo;
|
}
|
|
public Record(String uniqueValue[], String sourceInfo) {
|
this.uniqueValue = uniqueValue;
|
this.sourceInfo = sourceInfo;
|
}
|
|
public void setRecord(FieldSetEntity record) {
|
this.record = record;
|
}
|
|
public void setDeleteFlag(int deleteFlag) {
|
this.deleteFlag = deleteFlag;
|
}
|
|
public FieldSetEntity getRecord() {
|
return record;
|
}
|
|
public int getDeleteFlag() {
|
return deleteFlag;
|
}
|
|
public Integer getDelTableId() {
|
return delTableId;
|
}
|
|
public String[] getUniqueValue() {
|
return uniqueValue;
|
}
|
|
public void setUniqueValue(String[] uniqueValue) {
|
this.uniqueValue = uniqueValue;
|
}
|
|
public String getSourceInfo() {
|
return sourceInfo;
|
}
|
|
public String[] getPrimaryValue() {
|
return primaryValue;
|
}
|
|
public void setPrimaryValue(String[] primaryValue) {
|
this.primaryValue = primaryValue;
|
}
|
|
|
@Override
|
public boolean equals(Object o) {
|
if (this == o) return true;
|
if (!(o instanceof Record)) return false;
|
Record record = (Record) o;
|
return ArrayUtil.equals(uniqueValue, record.uniqueValue) && (Objects.equals(sourceInfo, record.sourceInfo));
|
}
|
|
@Override
|
public int hashCode() {
|
return Objects.hash(uniqueValue, sourceInfo);
|
}
|
}
|
|
|
private class TriggerDeleteService {
|
//删除记录数据源
|
private String dataSource;
|
//删除目标数据源
|
private String[] targetDataSource;
|
//表名
|
private String tableName;
|
//随机后缀
|
private String randomSuffix;
|
//唯一字段名称(多个组合唯一)
|
private String[] uniqueFields;
|
//子库存储主库唯一字段名称
|
private String preMasterField;
|
//删除记录表名称
|
private String delRecordTable;
|
//采集表id
|
private String collectId;
|
//采集来源记录字段
|
private String sourceInfoField;
|
// 删除类型 1 主库删除后同步删除子库 2 子库删除后同步到主库
|
private String delType;
|
//删除表数据记录
|
private DataTableEntity deleteTableRecord;
|
|
private Dao sourceDao;
|
|
private String targetTable;
|
|
public String getDelType() {
|
return delType;
|
}
|
|
private Set<Dao> daoSet=new HashSet<>();
|
|
public Set<Dao> getDaoSet() {
|
return daoSet;
|
}
|
|
public TriggerDeleteService(FieldSetEntity fse) throws SQLException {
|
//数据源
|
this.dataSource = fse.getString("data_source");
|
//目标数据源
|
this.targetDataSource = fse.getString("target_data_source").split(",");
|
//表名
|
this.tableName = StringUtils.lowerCase(fse.getString("table_name"));
|
//随机后缀
|
this.randomSuffix = fse.getString("random_suffix");
|
//唯一字段
|
this.uniqueFields = StringUtils.lowerCase(fse.getString("unique_field")).split(",");
|
//主库存储子库字段
|
this.preMasterField = StringUtils.lowerCase(fse.getString("pre_master_field"));
|
//删除记录表表名
|
this.delRecordTable = "trigger_table_" + this.randomSuffix;
|
// 删除类型 1 主库删除后同步删除子库 2 子库删除后同步到主库
|
this.delType = fse.getString("del_type");
|
//采集id
|
this.collectId = fse.getString("collect_id");
|
//采集ID字段
|
this.sourceInfoField = StringUtils.lowerCase(fse.getString("collect_field"));
|
if ("2".equals(this.getDelType())) {
|
//
|
StringBuilder sql = new StringBuilder();
|
sql.append("\n SELECT extract.* ");
|
sql.append("\n FROM ");
|
sql.append("\n product_sys_data_collect collect ");
|
sql.append("\n JOIN product_sys_data_extract_config extract ON ");
|
sql.append("\n upper( case when length(collect.target_table)>0 then ");
|
sql.append("\n collect.target_table else collect.source_table end ) = upper( extract.extract_source_table )");
|
sql.append(" where collect.uuid=? limit 1");
|
|
FieldSetEntity fs = baseDao.getFieldSetEntityBySQL(sql.toString(), new Object[]{fse.getString(CmnConst.CONFIG_UUID)}, false);
|
if (fs == null) {
|
throw new RuntimeException("提取配置不存在");
|
}
|
this.targetTable = fs.getString("extract_target_table");
|
}
|
if (StringUtils.isEmpty(targetTable)) {
|
this.targetTable = tableName;
|
}
|
this.targetTable = StringUtils.lowerCase(targetTable);
|
DataBaseEntity dbe = new DataBaseEntity(this.dataSource);
|
sourceDao = dbe.getDao();
|
daoSet.add(sourceDao);
|
Connection connection = sourceDao.getConnection();
|
connection.setAutoCommit(false);
|
try {
|
selectDelRecord(fse.getUUID());
|
|
} catch (Exception e) {
|
connection.close();
|
throw e;
|
}
|
}
|
|
/**
|
* 采集表删除同步到主
|
*/
|
public void collectTableDelete() throws SQLException {
|
Dao targetDao = null;
|
try {
|
if (DataTableEntity.isEmpty(this.deleteTableRecord)) {
|
return;
|
}
|
List<FieldSetEntity> data = this.deleteTableRecord.getData();
|
List<List<FieldSetEntity>> splitRecord = CollectionUtil.split(data, 1000);
|
for (List<FieldSetEntity> fieldSetEntityList : splitRecord) {
|
Map<String, List<Record>> groupUniqueValues = new HashMap<>();
|
fieldSetEntityList.stream().forEach(item -> {
|
List<Record> records = groupUniqueValues.computeIfAbsent(item.getString(this.sourceInfoField), k -> new ArrayList<>());
|
//子库的主键字段值
|
String[] uniqueValues = new String[this.uniqueFields.length];
|
for (int i = 0; i < this.uniqueFields.length; i++) {
|
uniqueValues[i] = item.getString(this.uniqueFields[i]);
|
}
|
Record record = new Record(item.getInteger(("ID_" + this.randomSuffix).toLowerCase()), uniqueValues, item.getString(this.sourceInfoField));
|
record.setDeleteFlag("2".equals(item.getString("delete_flag")) ? 2 : 1);
|
records.add(record);
|
});
|
|
if (groupUniqueValues.isEmpty()) {
|
return;
|
}
|
StringBuilder sql = new StringBuilder(128);
|
int archivingCount = 0;
|
for (List<Record> value : groupUniqueValues.values()) {
|
List<Record> collect = value.stream().filter(item -> item.getDeleteFlag() != 2).collect(Collectors.toList());
|
archivingCount += value.size() - collect.size();
|
if (!CollectionUtil.isEmpty(collect)) {
|
getUniqueValues(collect, sql);
|
}
|
}
|
if (sql.length() <= 0 && archivingCount == 0) {
|
return;
|
}
|
DataBaseEntity dbe = new DataBaseEntity(targetDataSource[0]);
|
targetDao = dbe.getDao();
|
daoSet.add(targetDao);
|
targetDao.getConnection().setAutoCommit(false);
|
if (sql.length() > 0) {
|
|
DataTableEntity dt = targetDao.getList(sql.toString(), (Object[]) null);
|
if (DataTableEntity.isEmpty(dt) && archivingCount == 0) {
|
return;
|
}
|
for (int i = 0; i < dt.getRows(); i++) {
|
//主库存的子库主键值
|
String preMaterKey = dt.getString(i, this.preMasterField);
|
//采集来源值
|
String sourceInfo = dt.getString(i, this.sourceInfoField);
|
List<Record> records = groupUniqueValues.get(sourceInfo);
|
if (CollectionUtil.isEmpty(records)) {
|
continue;
|
}
|
String[] primaryValue = new String[this.uniqueFields.length];
|
for (int i1 = 0; i1 < this.uniqueFields.length; i1++) {
|
primaryValue[i1] = dt.getString(i, this.uniqueFields[i1]);
|
}
|
int index = records.indexOf(new Record(new String[]{preMaterKey}, sourceInfo));
|
if (index < 0) {
|
continue;
|
}
|
Record record = records.get(index);
|
record.setPrimaryValue(primaryValue);
|
record.setRecord(dt.getFieldSetEntity(i));
|
}
|
}
|
|
sql.setLength(0);
|
sql.append(" DELETE FROM ").append(this.targetTable).append(" WHERE ");
|
int length = sql.length();
|
List<Record> delRecords = new ArrayList<>();
|
List<Record> lastRecord = new ArrayList<>();
|
if (archivingCount == 0 || archivingCount != this.deleteTableRecord.getRows()) {
|
for (List<Record> value : groupUniqueValues.values()) {
|
String sourceInfo = value.get(0).getSourceInfo();
|
String[] collect = value.stream().filter(item -> {
|
if (item.getDeleteFlag() == 2) {
|
lastRecord.add(item);
|
}
|
if (ArrayUtil.isEmpty(item.getPrimaryValue())) {
|
return false;
|
}
|
delRecords.add(item);
|
return true;
|
}).flatMap(item -> Arrays.stream(item.getPrimaryValue())).toArray(String[]::new);
|
if (ArrayUtil.isEmpty(collect)) {
|
continue;
|
}
|
if (length < sql.length()) {
|
sql.append(" OR ");
|
}
|
sql.append(" (").append(this.sourceInfoField).append("='").append(sourceInfo).append("' and (");
|
for (int i = 0; i < this.uniqueFields.length; i++) {
|
if (i > 0) {
|
sql.append(" AND ");
|
}
|
sql.append(BaseUtil.buildQuestionMarkFilter(uniqueFields[i], collect, true));
|
}
|
sql.append(" ) )");
|
}
|
if (lastRecord.size() > 0) {
|
delRecords.removeAll(lastRecord);
|
}
|
} else {
|
lastRecord.addAll(groupUniqueValues.values().stream().flatMap(item -> item.stream()).collect(Collectors.toList()));
|
}
|
if (sql.length() > length) {
|
targetDao.executeSql(sql.toString());
|
}
|
delRecords.addAll(lastRecord);
|
List<Record> allRecord = new ArrayList<>(delRecords);
|
|
// List<Record> records = new ArrayList<>(allRecord);
|
this.deleteArchivingData(allRecord);
|
delRecords.stream().forEach(item -> item.setDeleteFlag(1));
|
if (!CollectionUtil.isEmpty(allRecord)) {
|
//标记为归档未删除
|
allRecord.stream().forEach(item -> item.setDeleteFlag(2));
|
}
|
List<Record> collect = lastRecord.stream().filter(item -> item.getRecord() == null).collect(Collectors.toList());
|
delRecords.removeAll(collect);
|
updateRecordTableStatus(delRecords);
|
targetDao.getConnection().commit();
|
WriteData(delRecords, targetDataSource[0] + "&_" + tableName + "&_collect");
|
this.sourceDao.getConnection().commit();
|
}
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
if (targetDao != null) {
|
targetDao.closeConnection();
|
}
|
if (this.sourceDao != null) {
|
this.sourceDao.closeConnection();
|
}
|
}
|
}
|
|
/**
|
* 主库删除同步到子
|
*/
|
public void mainDataBaseTableDelete() throws SQLException {
|
DataBaseEntity[] dataBaseEntities = null;
|
|
try {
|
if (DataTableEntity.isEmpty(this.deleteTableRecord)) {
|
return;
|
}
|
List<Record> recordList = new ArrayList<>();
|
List<FieldSetEntity> data = this.deleteTableRecord.getData();
|
List<List<FieldSetEntity>> splitRecord = CollectionUtil.split(data, 1000);
|
for (List<FieldSetEntity> fieldSetEntityList : splitRecord) {
|
fieldSetEntityList.stream().forEach(item -> {
|
String[] uniqueValues = new String[this.uniqueFields.length];
|
for (int i = 0; i < this.uniqueFields.length; i++) {
|
uniqueValues[i] = item.getString(this.uniqueFields[i]);
|
}
|
if (ArrayUtil.isAllEmpty(uniqueValues)) {
|
return;
|
}
|
Record record = new Record(item.getInteger((this.randomSuffix + "_ID").toLowerCase()), uniqueValues);
|
record.setPrimaryValue(uniqueValues);
|
record.setDeleteFlag("2".equals(item.getString("delete_flag")) ? 2 : 1);
|
recordList.add(record);
|
});
|
dataBaseEntities = new DataBaseEntity[this.targetDataSource.length];
|
for (int i = 0; i < this.targetDataSource.length; i++) {
|
dataBaseEntities[i] = new DataBaseEntity(this.targetDataSource[i]);
|
Dao targetDao = dataBaseEntities[i].getDao();
|
daoSet.add(targetDao);
|
Connection connection = targetDao.getConnection();
|
connection.setAutoCommit(false);
|
DataTableEntity dt;
|
if (this.uniqueFields.length == 1) {
|
dt = targetDao.getList(tableName, BaseUtil.buildQuestionMarkFilter(this.uniqueFields[0], recordList.stream().map(item -> item.getUniqueValue()[0]).toArray(), true), null, null);
|
} else {
|
if (recordList.size() > 1000) {
|
List<List<Record>> split = ListUtil.split(recordList, 500);
|
dt = new DataTableEntity();
|
for (int j = 0; j < split.size(); j++) {
|
dt.addFieldSetEntity(targetDao.getList(tableName, getMultiplePrimaryFieldFilter(split.get(j)), null, null));
|
}
|
} else {
|
dt = targetDao.getList(tableName, getMultiplePrimaryFieldFilter(recordList), null, null);
|
}
|
|
}
|
for (Record record : recordList) {
|
int deleteFlag = record.getDeleteFlag();
|
if (2 == deleteFlag) {
|
if (dt == null) {
|
dt = new DataTableEntity();
|
}
|
FieldSetEntity fs = new FieldSetEntity();
|
fs.setTableName("temp");
|
for (int j = 0; j < this.uniqueFields.length; j++) {
|
fs.setValue(this.uniqueFields[j], record.getUniqueValue()[j]);
|
}
|
dt.addFieldSetEntity(fs);
|
}
|
}
|
if (DataTableEntity.isEmpty(dt)) {
|
continue;
|
}
|
// List<Record> records = recordList;
|
//在数据库查询到的视为全部能删除
|
// if (dt.getRows() != recordList.size()) {
|
|
// }
|
if (CollectionUtil.isEmpty(recordList)) {
|
return;
|
}
|
List<Record> records = new ArrayList<>();
|
Record record;
|
for (int j = 0; j < dt.getRows(); j++) {
|
String[] uniqueValues = new String[this.uniqueFields.length];
|
for (int k = 0; k < this.uniqueFields.length; k++) {
|
uniqueValues[k] = dt.getString(j, this.uniqueFields[k]);
|
}
|
record = new Record(uniqueValues, null);
|
int index = recordList.indexOf(record);
|
if (index == -1) {
|
continue;
|
}
|
record = recordList.get(index);
|
record.setRecord(dt.getFieldSetEntity(j));
|
records.add(record);
|
}
|
if (CollectionUtil.isEmpty(records)) {
|
return;
|
}
|
List<Record> recordSub = records.stream().filter(item -> item.getDeleteFlag() != 2 && item.getRecord() != null).collect(Collectors.toList());
|
if (!CollectionUtil.isEmpty(recordSub)) {
|
if (this.uniqueFields.length == 1) {
|
targetDao.delete(tableName, BaseUtil.buildQuestionMarkFilter(this.uniqueFields[0], recordSub.stream().map(item -> item.getUniqueValue()[0]).toArray(), true));
|
} else {
|
if (records.size() > 1000) {
|
List<List<Record>> split = ListUtil.split(recordSub, 500);
|
for (int j = 0; j < split.size(); j++) {
|
targetDao.delete(tableName, getMultiplePrimaryFieldFilter(split.get(j)));
|
}
|
} else {
|
targetDao.delete(tableName, getMultiplePrimaryFieldFilter(recordSub));
|
}
|
}
|
}
|
WriteData(recordSub, dataBaseEntities[i].getUuid() + "&_" + tableName + "&_master");
|
List<Record> deRecords = new ArrayList<>(records);
|
deleteArchivingData(deRecords);
|
records.stream().forEach(item -> item.setDeleteFlag(1));
|
if (!CollectionUtil.isEmpty(deRecords)) {
|
//标记为归档未删除
|
deRecords.stream().forEach(item -> item.setDeleteFlag(2));
|
}
|
updateRecordTableStatus(records);
|
}
|
for (DataBaseEntity dataBaseEntity : dataBaseEntities) {
|
Dao dao=dataBaseEntity.getDao();
|
daoSet.add(dao);
|
Connection connection = dao.getConnection();
|
connection.commit();
|
}
|
}
|
|
this.sourceDao.getConnection().commit();
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
if (!ArrayUtil.isEmpty(dataBaseEntities)) {
|
for (DataBaseEntity dataBaseEntity : dataBaseEntities) {
|
dataBaseEntity.getDao().closeConnection();
|
}
|
}
|
this.sourceDao.closeConnection();
|
}
|
}
|
|
/**
|
* 更新触发器删除记录状态
|
*
|
* @param recordList
|
*/
|
private void updateRecordTableStatus(List<Record> recordList) {
|
if (CollectionUtil.isEmpty(recordList)) {
|
return;
|
}
|
Map<Integer, List<Integer>> collect = recordList.stream().collect(Collectors.groupingBy(item -> Integer.valueOf(item.getDeleteFlag()), Collectors.mapping(item -> item.getDelTableId(), Collectors.toList())));
|
for (Map.Entry<Integer, List<Integer>> entry : collect.entrySet()) {
|
List<Integer> value = entry.getValue();
|
value.add(0, entry.getKey());
|
this.sourceDao.executeSql("update " + this.delRecordTable + " set DELETE_FLAG=? WHERE " + BaseUtil.buildQuestionMarkFilter("ID_" + this.randomSuffix, entry.getValue().size() - 1, true), entry.getValue().toArray());
|
}
|
}
|
|
|
/**
|
* 删除归档库
|
*
|
* @param list
|
* @return
|
* @throws SQLException
|
*/
|
private void deleteArchivingData(List<Record> list) throws SQLException {
|
if (CollectionUtil.isEmpty(list)) {
|
return;
|
}
|
List<Record> recordList = new ArrayList<>(new HashSet<>(list));
|
|
FieldSetEntity fse = baseDao.getFieldSetByFilter(CmnConst.DATA_ARCHIVING_TABLE, "UPPER(source_table)=Upper(?)", new Object[]{targetTable}, false);
|
if (FieldSetEntity.isEmpty(fse)) {
|
return;
|
}
|
String targetDataSource = fse.getString(CmnConst.TARGET_DATA_SOURCE);
|
String targetTablePrefix = fse.getString("target_table_prefix");
|
// String targetDataSource = "c94c19cf-86a8-4b84-85ed-3f3f90d7ecca";
|
// String targetTablePrefix = "da_" + this.tableName;
|
DataBaseEntity dbe = new DataBaseEntity(targetDataSource);
|
Dao targetDao = dbe.getDao();
|
daoSet.add(targetDao);
|
Connection connection = null;
|
try {
|
connection = targetDao.getConnection();
|
connection.setAutoCommit(false);
|
Set<String> allTableName = QuerySqlParseUtil.getAllTableName(targetDao, dbe.getDbName(), targetTablePrefix);
|
if (CollectionUtil.isEmpty(allTableName)) {
|
allTableName = new HashSet<>();
|
allTableName.add(targetTablePrefix);
|
}
|
StringBuilder lastFilter = null;
|
List<Record> lastNotDeleteRecord = recordList.stream().filter(item -> item.getDeleteFlag() == 2).collect(Collectors.toList());
|
if (!CollectionUtil.isEmpty(lastNotDeleteRecord)) {
|
if ("2".equals(delType)) {
|
lastFilter = new StringBuilder();
|
Map<String, List<Record>> collect = lastNotDeleteRecord.stream().collect(
|
Collectors.groupingBy(item -> StringUtils.isEmpty(item.getSourceInfo()) ? this.collectId : item.getSourceInfo()));
|
for (Map.Entry<String, List<Record>> entry : collect.entrySet()) {
|
if (lastFilter.length() > 0) {
|
lastFilter.append(" OR ");
|
}
|
lastFilter.append(" (");
|
lastFilter.append(BaseUtil.buildQuestionMarkFilter(this.preMasterField,
|
entry.getValue().stream().map(item -> item.getUniqueValue()[0]).toArray(), true));
|
lastFilter.append(" and ").append(this.sourceInfoField).append("= '").append(entry.getKey()).append("' )");
|
}
|
} else {
|
lastFilter = new StringBuilder(getMultiplePrimaryFieldFilter(lastNotDeleteRecord));
|
}
|
}
|
StringBuilder sql = new StringBuilder();
|
for (int i = 0; i < allTableName.size(); i++) {
|
String tableName = CollectionUtil.get(allTableName, i);
|
if (i > 0) {
|
sql.append(" UNION ALL \n");
|
}
|
sql.append(" SELECT *");
|
// .append(ArrayUtil.join(this.uniqueFields, ","));
|
sql.append(",'").append(tableName).append("' table_name_c_m_t ");
|
if ("2".equals(delType)) {
|
// sql.append(",").append(this.sourceInfoField).append(",").append(preMasterField);
|
}
|
sql.append(" FROM ").append(tableName);
|
sql.append(" WHERE ").append(getMultiplePrimaryFieldFilter(recordList));
|
if (!CollectionUtil.isEmpty(lastNotDeleteRecord)) {
|
sql.append(" OR (");
|
sql.append(lastFilter);
|
sql.append(" )");
|
}
|
}
|
DataTableEntity dt = targetDao.getList(sql.toString());
|
if (DataTableEntity.isEmpty(dt)) {
|
return;
|
}
|
Map<String, List<Record>> groupTableRecord = new HashMap<>();
|
for (int i = 0; i < dt.getRows(); i++) {
|
String tableName = dt.getString(i, "table_name_c_m_t");
|
String[] uniqueValues = new String[this.uniqueFields.length];
|
for (int k = 0; k < this.uniqueFields.length; k++) {
|
uniqueValues[k] = dt.getString(i, this.uniqueFields[k]);
|
}
|
Record record;
|
if ("2".equals(delType)) {
|
record = new Record(new String[]{dt.getString(i, this.preMasterField)}, dt.getString(i, this.sourceInfoField));
|
} else {
|
record = new Record(uniqueValues, null);
|
}
|
int index = recordList.indexOf(record);
|
List<Record> r = groupTableRecord.computeIfAbsent(tableName, k -> new ArrayList<>());
|
if (index != -1) {
|
record = recordList.get(index);
|
if (record.getDeleteFlag() == 2) {
|
String[] primaryValues = new String[this.uniqueFields.length];
|
for (int j = 0; j < this.uniqueFields.length; j++) {
|
primaryValues[j] = dt.getString(i, this.uniqueFields[j]);
|
}
|
record.setPrimaryValue(primaryValues);
|
record.setRecord(dt.getFieldSetEntity(i));
|
}
|
r.add(record);
|
}
|
}
|
for (Map.Entry<String, List<Record>> entry : groupTableRecord.entrySet()) {
|
if (CollectionUtil.isEmpty(entry.getValue())) {
|
entry.getValue().clear();
|
continue;
|
}
|
String filter = getMultiplePrimaryFieldFilter(entry.getValue());
|
if (StringUtils.isEmpty(filter)) {
|
entry.getValue().clear();
|
continue;
|
}
|
targetDao.delete(entry.getKey(), filter);
|
WriteData(entry.getValue(), dbe.getUuid() + "&_" + entry.getKey() + "&_archiving");
|
}
|
List<Record> collect = groupTableRecord.values().stream().flatMap(item -> item.stream()).collect(Collectors.toList());
|
connection.commit();
|
list.removeAll(collect);
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
if (connection != null) {
|
connection.close();
|
}
|
}
|
}
|
|
private String getMultiplePrimaryFieldFilter(Collection<Record> recordList) {
|
if (CollectionUtil.isEmpty(recordList)) {
|
return "1=2";
|
}
|
StringBuilder sqlFilter = new StringBuilder(128);
|
if (this.uniqueFields.length == 1) {
|
List<String> uniqueValues = new ArrayList<>();
|
for (Record item : recordList) {
|
// Record item = CollectionUtil.get(recordList,j);
|
if (ArrayUtil.isEmpty(item.getPrimaryValue())) {
|
continue;
|
}
|
uniqueValues.add(item.getPrimaryValue()[0]);
|
}
|
if (uniqueValues.size() > 0) {
|
sqlFilter.append(" (").append(BaseUtil.buildQuestionMarkFilter(this.uniqueFields[0], uniqueValues.toArray(), true)).append(" )");
|
}
|
} else {
|
for (Record item : recordList) {
|
// Record item = CollectionUtil.get(recordList,j);
|
if (ArrayUtil.isEmpty(item.getPrimaryValue())) {
|
continue;
|
}
|
if (sqlFilter.length() > 0) {
|
sqlFilter.append(" OR ");
|
}
|
sqlFilter.append("( ");
|
for (int k = 0; k < this.uniqueFields.length; k++) {
|
if (k > 0) {
|
sqlFilter.append(" and ");
|
}
|
sqlFilter.append(this.uniqueFields[k]).append("='").append(item.getPrimaryValue()[k]).append("'");
|
}
|
sqlFilter.append(" ) ");
|
}
|
}
|
if (sqlFilter.length() == 0) {
|
return "1=2";
|
}
|
return sqlFilter.toString();
|
}
|
|
|
private StringBuilder getUniqueValues(List<Record> recordList, StringBuilder sql) {
|
if (recordList == null || recordList.isEmpty()) {
|
return sql;
|
}
|
if (sql.length() > 0) {
|
sql.append("UNION ALL \n");
|
}
|
sql.append(" SELECT * ");
|
// .append(ArrayUtil.join(ArrayUtil.addAll(this.uniqueFields, new String[]{this.preMasterField, this.sourceInfoField}), ","));
|
sql.append(" FROM ").append(this.targetTable);
|
String sourceInfo = recordList.get(0).getSourceInfo().toLowerCase(Locale.ROOT);
|
sql.append(" WHERE ").append(sourceInfoField).append("='").append(sourceInfo).append("' and (");
|
sql.append(BaseUtil.buildQuestionMarkFilter(this.preMasterField, recordList.stream().map(item -> item.getUniqueValue()[0]).toArray(), true)).append(" ) ");
|
return sql;
|
}
|
|
/**
|
* 查询删除记录
|
*/
|
private void selectDelRecord(String uuid) {
|
Object o = RedisUtil.get("sync-del-record:" + uuid);
|
String filter = "DELETE_FLAG <> 1 and ";
|
if (this.sourceDao.getDataBaseType().getValue() == DataBaseType.MYSQL.getValue()) {
|
filter += " CREATE_TIME<date_add(now(),interval -5 minute) and CREATE_TIME>= date_add(DATE_FORMAT(now(),'%Y-%m-%d'),interval -2 DAY)";
|
} else {
|
filter += " CREATE_TIME< sysdate-((1/24/60)*10) ";
|
if (o != null) {
|
filter += " and CREATE_TIME>=trunc(sysdate-2)";
|
} else {
|
filter += " and CREATE_TIME>=trunc(sysdate-30)";
|
}
|
}
|
this.deleteTableRecord = this.sourceDao.getList(this.delRecordTable, filter, null, null);
|
for (int i = 0; i < this.deleteTableRecord.getRows(); i++) {
|
this.deleteTableRecord.setFieldValue(i, "source_info", StringUtils.lowerCase(this.deleteTableRecord.getString(i, "source_info")));
|
}
|
RedisUtil.set("sync-del-record:" + uuid, "true");
|
}
|
|
}
|
|
|
/**
|
* 创建oracle触发器sql
|
*
|
* @param tableName 要记录删除的表名
|
* @param uniqueFields 记录删除的主键或唯一值字段名
|
* @param randomSuffix 随机后缀
|
* @param delType 删除类型 0 主库删除 1 采集删除
|
* @param collectId 采集id
|
* @return
|
*/
|
private String getCreateTriggerSql(String tableName, String[] uniqueFields, String randomSuffix, int delType, String collectField, String collectId) {
|
StringBuffer sql = new StringBuffer();
|
sql.append(" create or replace trigger \"trigger_").append(randomSuffix).append("\" after delete on ").append(tableName);
|
sql.append(" FOR EACH ROW BEGIN ");
|
String idField = "ID_" + randomSuffix.toUpperCase();
|
sql.append(" insert into \"trigger_table_").append(randomSuffix).append("\" (" + idField + ",CREATE_TIME,DELETE_FLAG");
|
if (2 == delType) {
|
sql.append(",SOURCE_INFO, ");
|
}
|
sql.append(ArrayUtil.join(uniqueFields, ","));
|
sql.append(")values(getnewid('trigger_table_");
|
sql.append(randomSuffix).append("'),sysdate,0");
|
if (2 == delType) {
|
sql.append(",nvl(:OLD.").append(collectField.toUpperCase()).append(",'").append(collectId).append("'),");
|
sql.append(Arrays.stream(uniqueFields).map(item -> ":OLD." + item.toLowerCase()).collect(Collectors.joining()));
|
sql.append(" ");
|
}
|
sql.append(" ); END; ");
|
return sql.toString().toUpperCase();
|
}
|
|
/**
|
* 获取创建删除记录表sql
|
*
|
* @param randomSuffix
|
* @return
|
*/
|
private String getCreateTableSql(String randomSuffix, int delType, String[] uniqueFields) {
|
//修改创建字段请同步修改触发器中的insert sql
|
StringBuffer sql = new StringBuffer();
|
String idField = "ID_" + randomSuffix.toUpperCase();
|
sql.append("CREATE TABLE \"trigger_table_").append(randomSuffix).append("\"");
|
sql.append("( \"" + idField + "\" NUMBER(10,0) NOT NULL ENABLE,\n");
|
for (String uniqueField : uniqueFields) {
|
sql.append(" \"" + uniqueField.toUpperCase() + "\" VARCHAR2(255) NOT NULL ENABLE,\n");
|
}
|
if (2 == delType) {
|
//采集
|
sql.append(" \"SOURCE_INFO\" VARCHAR2(50),\n");
|
}
|
sql.append(" \"CREATE_TIME\" DATE,\n");
|
sql.append(" \"DELETE_FLAG\" NUMBER(2,0),\n");
|
sql.append(" PRIMARY KEY (\"" + idField + "\")\n");
|
sql.append(" )");
|
return sql.toString().toUpperCase();
|
}
|
|
/**
|
* 获取创建删除记录表sql
|
*
|
* @param randomSuffix
|
* @return
|
*/
|
private String getDropTableSql(String randomSuffix) {
|
//修改创建字段请同步修改触发器中的insert sql
|
StringBuffer sql = new StringBuffer();
|
sql.append("DROP TABLE trigger_table_").append(randomSuffix);
|
return sql.toString().toUpperCase();
|
}
|
|
/**
|
* 创建oracle触发器sql
|
*
|
* @param randomSuffix 随机后缀
|
* @return
|
*/
|
private String getDropTriggerSql(String randomSuffix) {
|
StringBuffer sql = new StringBuffer();
|
sql.append(" DROP TRIGGER trigger_").append(randomSuffix);
|
return sql.toString().toUpperCase();
|
}
|
|
/**
|
* 触发器是否存在
|
*
|
* @param random_suffix
|
* @param dao
|
* @return
|
*/
|
private boolean triggerIsExist(String random_suffix, Dao dao) {
|
FieldSetEntity one = dao.getOne("user_triggers", "TRIGGER_NAME=UPPER(?)", new String[]{"TRIGGER_NAME"}, new Object[]{"trigger_" + random_suffix});
|
if (FieldSetEntity.isEmpty(one)) {
|
return false;
|
}
|
return !StringUtils.isEmpty(one.getString("trigger_name"));
|
}
|
|
/**
|
* 表是否存在
|
*
|
* @param random_suffix
|
* @param dao
|
* @return
|
*/
|
private boolean tableIsExist(String random_suffix, Dao dao) {
|
FieldSetEntity one = dao.getOne("user_tables", "TABLE_NAME=UPPER(?)", new String[]{"TABLE_NAME"}, new Object[]{"trigger_table_" + random_suffix});
|
if (FieldSetEntity.isEmpty(one)) {
|
return false;
|
}
|
return !StringUtils.isEmpty(one.getString("table_name"));
|
}
|
|
/**
|
* 在来源库,创建删除记录表、触发器
|
*
|
* @param trigger
|
* @return
|
*/
|
public boolean createTriggerDDL(FieldSetEntity trigger) {
|
String randomSuffix = trigger.getString("random_suffix");
|
if (StringUtils.isEmpty(randomSuffix)) {
|
return false;
|
}
|
DataBaseEntity dbe = new DataBaseEntity(trigger.getString("data_source"));
|
Dao dao = dbe.getDao();
|
try {
|
String[] uniqueFields = trigger.getString("unique_field").split(",");
|
Integer delType = trigger.getInteger("del_type");
|
if (!tableIsExist(randomSuffix, dao)) {
|
dao.executeSql(getCreateTableSql(randomSuffix, delType, uniqueFields));
|
//初始化创建自增主键 mes
|
dao.executeSql("declare ids number; begin ids:=getnewid('trigger_table_" + randomSuffix + "'); dbms_output.put_line(ids);end;");
|
}
|
if (!triggerIsExist(randomSuffix, dao)) {
|
try (Connection conn = dao.getConnection(); Statement statement = conn.createStatement()) {
|
String createTriggerSql = getCreateTriggerSql(trigger.getString("table_name"), uniqueFields, randomSuffix, delType, trigger.getString("collect_field"), trigger.getString("collect_id"));
|
SpringMVCContextHolder.getSystemLogger().info("创建触发器SQL:\n" + createTriggerSql);
|
statement.execute(createTriggerSql);
|
}
|
}
|
// dao.executeSql();
|
return true;
|
} catch (SQLException e) {
|
e.printStackTrace();
|
return false;
|
} finally {
|
dao.closeConnection();
|
}
|
}
|
|
/**
|
* 删除触发器、删除记录表
|
*
|
* @return
|
*/
|
public boolean deleteTriggerDDL(FieldSetEntity trigger) {
|
if (FieldSetEntity.isEmpty(trigger)) {
|
return false;
|
}
|
String randomSuffix = trigger.getString("random_suffix");
|
if (StringUtils.isEmpty(randomSuffix)) {
|
return false;
|
}
|
//执行删除trigger ddl语句
|
DataBaseEntity dbe = new DataBaseEntity(trigger.getString("data_source"));
|
Dao dao = dbe.getDao();
|
try {
|
|
if (triggerIsExist(randomSuffix, dao)) {
|
dao.executeSql(getDropTriggerSql(randomSuffix));
|
}
|
if (tableIsExist(randomSuffix, dao)) {
|
dao.executeSql(getDropTableSql(randomSuffix));
|
}
|
} finally {
|
dao.closeConnection();
|
baseDao.delete(trigger.getTableName(), new String[]{trigger.getUUID()});
|
}
|
return true;
|
}
|
|
/**
|
* 删除触发器、删除记录表
|
*
|
* @param fs 数据采集配置
|
* @return
|
*/
|
public boolean deleteTrigger(FieldSetEntity fs) {
|
FieldSetEntity trigger = baseDao.getFieldSetByFilter(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "config_uuid=?", new String[]{fs.getUUID()}, false);
|
deleteTriggerDDL(trigger);
|
return true;
|
}
|
|
/**
|
* 入口 创建触发器,同步配置调用
|
*/
|
public void createTrigger(DataTableEntity dt, String data_source, String target_data_source) {
|
|
for (int i = 0; i < dt.getRows(); i++) {
|
|
if (CoreConst.SYSTEM_DATA_OPERATE_DEL.equals(dt.getString(i, "~type~")) || !"1".equals(dt.getString(i, "is_used"))) {
|
//删除子表数据
|
FieldSetEntity trigger = baseDao.getFieldSetByFilter(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "config_uuid=?", new String[]{dt.getFieldSetEntity(i).getUUID()}, false);
|
deleteTriggerDDL(trigger);
|
} else {
|
if (StringUtils.isEmpty(dt.getString(i, CmnConst.UPDATE_TIME))) {
|
//更新时间字段为空的是覆盖对比
|
continue;
|
}
|
|
dt.setFieldValue(i, "data_source", data_source);
|
dt.setFieldValue(i, "target_data_source", target_data_source);
|
dt.setFieldValue(i, "del_type", 1);
|
dt.setFieldValue(i, "master_key_field", dt.getString(i, "unique_sign"));
|
createTrigger(dt.getFieldSetEntity(i));
|
}
|
}
|
}
|
|
|
/**
|
* 入口 创建触发器,采集配置调用
|
*
|
* @param fs
|
*/
|
public void createTrigger(FieldSetEntity fs) {
|
//采集配置表
|
// if (!CmnConst.PRODUCT_SYS_DATA_SYNC_MES_SUB.equals(fs.getTableName())) {
|
// fs.setValue("del_type", 2);// 2 子库删除后同步到主库
|
// fs.setValue("delete_sync_content", 1);
|
// }
|
|
String uuid = "2".equals(fs.getString("del_type")) ? fs.getString("config_uuid") : fs.getUUID();
|
FieldSetEntity trigger = null;
|
// if (!StringUtils.isEmpty(uuid)) {//新增同步配置
|
trigger = baseDao.getFieldSetEntityByFilter(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG, "config_uuid=?", new String[]{uuid}, false);
|
if (FieldSetEntity.isEmpty(trigger)) {
|
trigger = null;
|
}
|
// }
|
if ("1".equals(fs.getString("delete_sync_content"))) {//同步删除标识
|
if (trigger == null) {
|
trigger = new FieldSetEntity();
|
trigger.setTableName(CmnConst.PRODUCT_MES_DEL_RECORD_CONFIG);
|
trigger.setValue("random_suffix", RandomUtil.randomString(5));
|
|
trigger.setValue("config_uuid", uuid);//同步配置子表数据ID
|
}
|
trigger.setValue("table_name", fs.getValue("table_name"));
|
trigger.setValue("data_source", fs.getValue("data_source"));
|
trigger.setValue("del_type", fs.getValue("del_type"));
|
trigger.setValue("unique_field", fs.getValue("master_key_field"));//
|
trigger.setValue("target_data_source", fs.getValue("target_data_source"));
|
//采集表专有的
|
trigger.setValue("collect_id", fs.getValue("collect_id"));
|
trigger.setValue("collect_field", fs.getValue("collect_field"));
|
trigger.setValue("pre_master_field", fs.getValue("pre_master_field"));
|
BaseUtil.createCreatorAndCreationTime(trigger);
|
baseDao.saveFieldSetEntity(trigger);
|
createTriggerDDL(trigger);
|
} else if (trigger != null) {//证明,不启动触发器,要删除
|
deleteTrigger(fs);
|
}
|
}
|
}
|