package com.product.data.center.service;
|
|
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.TimeInterval;
|
import cn.hutool.core.thread.ThreadUtil;
|
import cn.hutool.core.util.ArrayUtil;
|
import cn.hutool.core.util.RandomUtil;
|
import com.alibaba.fastjson.JSONObject;
|
import com.google.common.collect.Lists;
|
import com.google.common.collect.Maps;
|
import com.google.common.collect.Sets;
|
import com.product.core.cache.util.RedisUtil;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.center.config.CmnConst;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.entity.JournalEntity;
|
import com.product.data.center.utils.CustomLock;
|
import com.product.data.center.utils.QuerySqlParseUtil;
|
import com.product.data.center.utils.SqlTransferUtil;
|
import com.product.data.center.utils.WriteUtil;
|
import com.product.data.service.SyncDataConfigService;
|
import com.product.datasource.config.DataBaseType;
|
import com.product.datasource.dao.Dao;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.datasource.service.RedisService;
|
import com.product.util.BaseUtil;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.Resource;
|
import java.sql.Connection;
|
import java.sql.DatabaseMetaData;
|
import java.sql.ResultSet;
|
import java.sql.SQLException;
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
/**
|
* @Author cheng
|
* @Date 2022/7/26 9:41
|
* @Desc 数据归档
|
*/
|
@Service
|
public class DataArchivingService extends AbstractBaseService {
|
|
// Journal service: this class uses it to create journal rows, render stack traces
// and write back re-deal results.
@Autowired
|
JournalManagerService journalManagerService;
|
// Queue that runs the source query and hands fetched batches to the archiving loop.
@Autowired
|
DataArchivingQueue dataArchivingQueue;
|
|
// Injected sync-config service; not referenced by the methods visible in this part of the file.
@Resource
|
private SyncDataConfigService syncDataConfigService;
|
|
// Lock keyed by archiving-config uuid; dataArchivingEntry skips a run when tryLock(uuid) fails.
private static CustomLock lock = new CustomLock();
|
|
private Map<String, String> getCollectIds(String[] deleteDataSource, String tableName) {
|
StringBuilder sql = new StringBuilder("SELECT ");
|
sql.append("id,data_source FROM product_sys_data_collect WHERE ");
|
List<Object> params = new ArrayList<>();
|
for (int i = 0; i < deleteDataSource.length; i++) {
|
String source = deleteDataSource[i];
|
if (i > 0) {
|
sql.append(" or ");
|
}
|
sql.append("( data_source = ? and LOWER(source_table) = ? )");
|
params.add(source);
|
params.add(tableName.toLowerCase());
|
}
|
DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), params.toArray());
|
if (DataTableEntity.isEmpty(dataTableEntity)) {
|
return null;
|
}
|
Map<String, String> collect = dataTableEntity.getData().stream().collect(Collectors.toMap(item -> item.getString("data_source"), item -> item.getString("id")));
|
|
if (collect.size() != deleteDataSource.length) {
|
throw new BaseException(ErrorCode.GET_COLLECT_ID_FAIL);
|
}
|
|
return collect;
|
}
|
|
public void reDeal(String logUuid) {
|
FieldSetEntity fs = getBaseDao().getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUuid, false);
|
if (!FieldSetEntity.isEmpty(fs)) {
|
String configUid = fs.getString(CmnConst.CONFIG_UUID);
|
reDealArchiving(configUid);
|
}
|
fs.setValue(CmnConst.DEAL_FLAG, 1);
|
getBaseDao().update(fs);
|
}
|
|
@Async
|
public void reDealArchiving(String uuid) {
|
this.dataArchivingEntry(uuid);
|
}
|
|
/**
|
* 扫码库删除
|
*
|
* @param configFse
|
* @throws BaseException
|
*/
|
private String sweepCodeLibrary(FieldSetEntity configFse) throws BaseException, SQLException {
|
JournalEntity journalEntity = new JournalEntity();
|
journalEntity.setDetail(3);
|
TimeInterval timer = DateUtil.timer();
|
int deleteSuccessCount = 0;
|
try {
|
String deleteDataSource = configFse.getString("delete_data_source");
|
WriteUtil.append("DA-deleteDataSource:" + deleteDataSource);
|
WriteUtil.append("DA-sub_delete_select_filter:" + configFse.getString("delete_select_filter"));
|
if (StringUtils.isEmpty(deleteDataSource)) {
|
return null;
|
}
|
String[] deleteDataSourceArray = deleteDataSource.split(",");
|
// DataBaseEntity dbe = new DataBaseEntity(deleteDataSource);
|
|
String source_data_validation = configFse.getString("source_data_validation");
|
if (StringUtils.isEmpty(source_data_validation)) {
|
return null;
|
}
|
String deleteDataTable = configFse.getString("delete_data_table");
|
if (StringUtils.isEmpty(deleteDataTable)) {
|
return null;
|
}
|
|
String collectIdField = configFse.getString("collect_id_field");
|
|
DataBaseEntity validationDbe = new DataBaseEntity(source_data_validation);
|
|
//扫码库删除唯一标识
|
String deleteUniqueField = configFse.getString("delete_unique_field");
|
|
//验证表
|
String validationTableName = configFse.getString("table_data_validation");
|
//验证唯一字段
|
String validationUniqueField = configFse.getString("validation_unique_field");
|
//忽略对比字段
|
List<String> ignoreComparisonFields =
|
!StringUtils.isEmpty(configFse.getString("ignore_comparison_field")) ?
|
Lists.newArrayList(Arrays.asList(configFse.getString("ignore_comparison_field").split(","))) : null;
|
int pageSize = 500;
|
DataTableEntity list;
|
Dao dao = null;
|
int currentCount = 0;
|
Map<String, String> collectIds = getCollectIds(deleteDataSource.split(","), deleteDataTable);
|
try {
|
String minID = null;
|
String maxID = null;
|
for (int j = 0; j < deleteDataSourceArray.length; j++) {
|
if (dao != null) {
|
dao.closeConnection();
|
}
|
dao = new DataBaseEntity(deleteDataSourceArray[j]).getDao();
|
do {
|
// String sql = "SELECT * FROM (SELECT * from " + deleteDataTable + " where " + configFse.getString("delete_select_filter") + " order by " + deleteUniqueField + " ) A ";
|
// if (!Objects.isNull(minID) && !Objects.isNull(maxID)) {
|
// sql += " where " + deleteUniqueField + " > " + maxID;
|
// }
|
String filter = configFse.getString("delete_select_filter");
|
if (!Objects.isNull(minID) && !Objects.isNull(maxID)) {
|
filter += " and ( " + deleteUniqueField + " > " + maxID + ")";
|
}
|
list = dao.getList(deleteDataTable, filter, new Object[]{}, deleteUniqueField, 1, pageSize);
|
WriteUtil.append("DA-删除子库数据-表名:" + deleteDataTable);
|
if (DataTableEntity.isEmpty(list)) {
|
break;
|
}
|
currentCount = list.getRows();
|
Object[] uniqueValues = list.getData().stream().map(item -> item.getString(deleteUniqueField)).toArray();
|
DataTableEntity validationData = validationDbe.getDao().getList(validationTableName, collectIdField + "= ? AND " +
|
BaseUtil.buildQuestionMarkFilter(validationUniqueField, uniqueValues.length, true), ArrayUtil.addAll(new Object[]{collectIds.get(deleteDataSourceArray[j])}, uniqueValues));
|
validationDbe.getDao().closeConnection();
|
if (DataTableEntity.isEmpty(validationData)) {
|
break;
|
}
|
Map<String, Map> collectMap = validationData.getData().stream().collect(Collectors.toMap(
|
(item) -> item.getString(validationUniqueField),
|
item -> item.getValues()));
|
List<String> deleteUniqueValueList = new ArrayList<>();
|
for (int i = 0; i < list.getRows(); i++) {
|
FieldSetEntity fs = list.getFieldSetEntity(i);
|
String uniqueValue = fs.getString(deleteUniqueField);
|
Map<String, Object> map = collectMap.get(uniqueValue);
|
if (ignoreComparisonFields != null && map != null) {
|
for (int i1 = 0; i1 < ignoreComparisonFields.size(); i1++) {
|
try {
|
map.remove(ignoreComparisonFields.get(i1));
|
fs.remove(ignoreComparisonFields.get(i1));
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
if (map == null || !new JSONObject((Map) fs.getValues()).equals(new JSONObject(map))) {
|
list.removeFieldSetEntity(i);
|
i--;
|
continue;
|
}
|
deleteUniqueValueList.add(uniqueValue);
|
minID = getAimID(minID, uniqueValue, 0);
|
maxID = getAimID(maxID, uniqueValue, 1);
|
}
|
collectMap.clear();
|
if (deleteUniqueValueList.size() > 0) {
|
deleteSuccessCount += dao.deleteRInt(configFse.getString("delete_data_table"),
|
BaseUtil.buildQuestionMarkFilter(deleteUniqueField, deleteUniqueValueList.size(), true), deleteUniqueValueList.toArray());
|
WriteUtil.append("DA-删除子库数据-已经删除条数:" + deleteSuccessCount);
|
|
}
|
} while (currentCount == pageSize);
|
}
|
journalEntity.setResult(1);
|
journalEntity.setMin_id(minID);
|
journalEntity.setMax_id(maxID);
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
if (dao != null) {
|
dao.closeConnection();
|
}
|
}
|
} catch (Exception e) {
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
journalEntity.setResult(0);
|
journalEntity.setError(journalManagerService.getStackTrace(e));
|
throw e;
|
} finally {
|
journalEntity.setCount(deleteSuccessCount);
|
journalEntity.setCreated_utc_datetime(new Date());
|
journalEntity.setSingle_duration(timer.intervalMs());
|
journalEntity.setType(5);
|
journalEntity.setConfigUuid(configFse.getUUID());
|
}
|
if (journalEntity != null && (journalEntity.getCount() > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1)) {
|
return journalManagerService.autoCreateJournal(journalEntity).getUUID();
|
}
|
return null;
|
}
|
|
/**
|
* 数据归档入口
|
*
|
* @param uuid 归档配置uuid
|
*/
|
public void dataArchivingEntry(String uuid) {
|
if (lock.tryLock(uuid)) {
|
try {
|
dataArchivingEntryLock(uuid);
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
lock.unLock(uuid);
|
}
|
return;
|
}
|
SpringMVCContextHolder.getSystemLogger().info("跳过执行:" + uuid);
|
}
|
|
public void dataArchivingEntryLock(String uuid) {
|
SpringMVCContextHolder.getSystemLogger().info("开始执行归档》》》》" + uuid);
|
WriteUtil.append("DA》》》");
|
WriteUtil.append("DA-已经获取到锁");
|
TimeInterval timer = DateUtil.timer();
|
String curDateTimeStr = DateUtil.date().toString();
|
int archivingSuccessCount = 0;
|
JournalEntity journalEntity = new JournalEntity();
|
journalEntity.setResult(1);
|
try {
|
//获取归档服务配置详情
|
FieldSetEntity configFse = getBaseDao().getFieldSetEntity(CmnConst.DATA_ARCHIVING_TABLE, uuid, false);
|
TimeInterval tempTestTimer = DateUtil.timer();
|
//来源数据表
|
String sourceTable = configFse.getString("source_table");
|
boolean canExecuteClearFlag = canExecuteClear(sourceTable);
|
String deleteSubLogUUID = null;
|
|
if (canExecuteClearFlag) {
|
WriteUtil.append("DA-删除扫码库》》》");
|
//删除扫码库已提取到mes主库且根据配置条件过滤的数据 KT特有
|
// deleteSubLogUUID = this.sweepCodeLibrary(configFse);
|
//更改为异步执行
|
ThreadUtil.execAsync(() -> {
|
try {
|
sweepCodeLibrary(configFse);
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
});
|
}
|
|
//来源数据源
|
String sourceDataSource = configFse.getString("source_data_source");
|
DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSource);
|
//目标数据源
|
String targetDataSource = configFse.getString("target_data_source");
|
DataBaseEntity targetDbe = new DataBaseEntity(targetDataSource);
|
//目标数据表前缀
|
String targetTablePrefix = configFse.getString("target_table_prefix");
|
if (targetTablePrefix.endsWith("_")) {
|
targetTablePrefix = targetTablePrefix.substring(0, targetTablePrefix.length() - 1);
|
}
|
//来源采集ID字段
|
String sourceCollectIdField = configFse.getString("source_collect_id_field");
|
//来源唯一标识字段 由MES子库生成的值
|
String sourceUniqueField = configFse.getString("source_unique_field");
|
//唯一字段
|
String uniqueField = configFse.getString("unique_field");
|
// 时间字段
|
String timeField = configFse.getString("time_field");
|
// 提取时间字段
|
String extractTimeField = configFse.getString(CmnConst.EXTRACT_TIME_FIELD);
|
|
Dao sourceDao = sourceDbe.getDao();
|
Dao targetDao = targetDbe.getDao();
|
Set<String> targetTableSet;
|
try {
|
DataArchivingServiceImpl service = new DataArchivingServiceImpl(sourceDao, targetDao, sourceTable, uuid, sourceDbe.getDbName(), targetDbe.getDbName());
|
String keyPrefix = "DA_STORE:" + sourceTable + ":";
|
boolean serializeFlag = true;
|
int outTime = 60 * 60;// 60 min
|
RedisService readRedis = StringUtils.isEmpty(configFse.getString("redis_config_uuid")) ? new RedisService() : new RedisService(new DataBaseEntity(configFse.getString("redis_config_uuid")));
|
tempTestTimer = DateUtil.timer();
|
WriteUtil.append("DA-获取表名集合》》》");
|
if (!StringUtils.isEmpty(timeField)) {
|
targetTableSet = QuerySqlParseUtil.getAllTableName(targetDao, targetDbe.getDbName(), sourceTable);
|
} else {
|
targetTableSet = Sets.newLinkedHashSet();
|
targetTableSet.add(targetTablePrefix);
|
}
|
WriteUtil.append("DA-获取表名集合耗时:" + tempTestTimer.intervalMs());
|
FieldSetEntity paramFse = getBaseDao().getFieldSetBySQL("select max(statistics_final_time) statistics_final_time from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false);
|
Date preMaxTime = paramFse.getDate("statistics_final_time");
|
// 没有成功的最大统计时间:要么是第一次归档,要么是归档的表没有设定时间字段(所有数据都在一张表上,每次都会拉取这张表当时的所有内容)
|
boolean turnRedisFilterFlag = preMaxTime == null;
|
boolean turnTargetDBClearFlag = false;
|
if (turnRedisFilterFlag) {
|
// 若是没有成功的日志,表示为第一次归档,跳过目标库数据清理
|
FieldSetEntity logExistsFse = getBaseDao().getFieldSetBySQL("select count(1) count_value from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false);
|
turnTargetDBClearFlag = "0".equals(logExistsFse.getString("count_value"));
|
}
|
StringBuilder paramSql = new StringBuilder(128);
|
paramSql.append("select max(").append(uniqueField).append(") max_id,min(").append(uniqueField).append(") min_id");
|
if (!StringUtils.isEmpty(timeField)) {
|
paramSql.append(",max(").append(timeField).append(") max_update_time,min(").append(timeField).append(") min_update_time");
|
}
|
if (!StringUtils.isEmpty(extractTimeField)) {
|
paramSql.append(",max(").append(extractTimeField).append(") max_extract_time,min(").append(extractTimeField).append(") min_extract_time");
|
}
|
paramSql.append(" from ").append(sourceTable);
|
StringBuilder filterSb = new StringBuilder(128);
|
if (preMaxTime != null && (!StringUtils.isEmpty(timeField) || !StringUtils.isEmpty(extractTimeField))) {
|
int sourceDbType = sourceDbe.getDbType().getValue();
|
if (!StringUtils.isEmpty(timeField)) {
|
filterSb.append("(").append(timeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE))
|
.append(" and ").append(timeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")");
|
}
|
if (!StringUtils.isEmpty(extractTimeField)) {
|
if (filterSb.length() > 0) {
|
filterSb.append(" or ");
|
}
|
filterSb.append("(").append(extractTimeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE))
|
.append(" and ").append(extractTimeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")");
|
}
|
paramSql.append(" where (").append(filterSb).append(")");
|
}
|
paramFse = sourceDao.getOne(paramSql.toString());
|
if (filterSb.length() > 0) {
|
filterSb.insert(0, "(");
|
filterSb.append(") and ");
|
}
|
if (StringUtils.isEmpty(paramFse.getString("max_id"))) {
|
WriteUtil.append("最大id为空,跳出,执行sql:" + paramSql);
|
return;
|
}
|
filterSb.append(uniqueField).append("<='").append(paramFse.getString("max_id")).append("'");
|
if (!StringUtils.isEmpty(configFse.getString("select_filter")) && !StringUtils.isEmpty(configFse.getString("select_filter").trim())) {
|
filterSb.append(" and (").append(configFse.getString("select_filter")).append(")");
|
}
|
WriteUtil.append("DA-sql-filter:" + filterSb);
|
Date statisticsStartTime = getAimDate(paramFse.getDate("min_update_time"), paramFse.getDate("min_extract_time"), 0);
|
Date statisticsFinalTime = getAimDate(paramFse.getDate("max_update_time"), paramFse.getDate("max_extract_time"), 0);
|
String maxID = paramFse.getString("max_id");
|
String minID = paramFse.getString("min_id");
|
String splitTableType = "1".equals(configFse.getString("split_table_type")) ? "1" : "0";
|
dataArchivingQueue.query(sourceDbe, sourceTable, filterSb.toString(), null, uniqueField, minID);
|
DataTableEntity allDte;
|
Map<String, List<DataTableEntity>> groupDteMap;
|
do {
|
allDte = dataArchivingQueue.get(sourceTable);
|
if (DataTableEntity.isEmpty(allDte)) {
|
WriteUtil.append("DA-从队列中获取内容为空,执行睡眠跳过...");
|
Thread.sleep(RandomUtil.randomInt(800, 1200));
|
continue;
|
}
|
WriteUtil.append("DA-从队列中获取内容非空,执行插入...");
|
groupDteMap = dteGroupByTime(allDte, timeField, splitTableType);
|
FieldSetEntity tempFse;
|
String field;
|
Object value;
|
Date date;
|
Date updateTime;
|
Date extractTime;
|
Map<String, Set<String>> sourceTableUniqueByCollectId = null;
|
for (Map.Entry<String, List<DataTableEntity>> entry : groupDteMap.entrySet()) {
|
String time = entry.getKey();
|
List<DataTableEntity> dteList = entry.getValue();
|
for (DataTableEntity list : dteList) {
|
//创建表(不存在才创建)
|
tempTestTimer = DateUtil.timer();
|
WriteUtil.append("DA-创建表》》》");
|
String tableName = service.createTable(targetTablePrefix, time, targetDbe);
|
WriteUtil.append("DA-创建表耗时:" + tempTestTimer.intervalMs());
|
targetTableSet.add(tableName);
|
FieldSetEntity fs = list.getFieldSetEntity(0);
|
tempTestTimer = DateUtil.timer();
|
WriteUtil.append("DA-数据筛选》》》");
|
if (!StringUtils.isEmpty(sourceCollectIdField)) {
|
sourceTableUniqueByCollectId = new HashMap<>();
|
}
|
// 没有成功的最大统计时间:要么是第一次归档,要么是归档的表没有设定时间字段(所有数据都在一张表上,每次都会拉取这张表当时的所有内容),跳过验证
|
if (!turnRedisFilterFlag) {
|
// 验证是否存在redis中,若是存在,比较时间字段值的大小,若是查询出数据中的时间更靠近当前时间,那么重置redis中的时间和过期时间,若是redis中的时间更靠近当前时间,则剔除数据集中的数据,并重置过期时间;若是不存在,则正常执行,先删除然后新增
|
for (int i = list.getRows() - 1; i >= 0; i--) {
|
tempFse = list.getFieldSetEntity(i);
|
updateTime = tempFse.getDate(timeField);
|
extractTime = tempFse.getDate(extractTimeField);
|
date = getAimDate(updateTime, extractTime, 0);
|
field = tempFse.getString(uniqueField);
|
value = readRedis.get(keyPrefix + field, serializeFlag);
|
String sourceUniqueValue = tempFse.getString(sourceUniqueField);
|
String collectId = tempFse.getString(sourceCollectIdField);
|
if (!StringUtils.isEmpty(sourceCollectIdField) && sourceTableUniqueByCollectId != null
|
&& !StringUtils.isEmpty(collectId) && !StringUtils.isEmpty(sourceUniqueValue)) {
|
Set<String> uniqueValues = sourceTableUniqueByCollectId.computeIfAbsent(collectId, k -> new HashSet<>());
|
uniqueValues.add(sourceUniqueValue);
|
}
|
if (value != null && !StringUtils.isEmpty(value.toString())) {
|
if (((Date) value).compareTo(date) <= 0) {
|
readRedis.set(keyPrefix + field, date, serializeFlag);
|
readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
|
statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
|
statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
|
minID = getAimID(minID, field, 0);
|
maxID = getAimID(maxID, field, 1);
|
} else {
|
list.removeFieldSetEntity(i);
|
readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
|
}
|
} else {
|
readRedis.set(keyPrefix + field, date, serializeFlag);
|
readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
|
statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
|
statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
|
minID = getAimID(minID, field, 0);
|
maxID = getAimID(maxID, field, 1);
|
}
|
}
|
}
|
WriteUtil.append("DA-数据筛选耗时:" + tempTestTimer.intervalMs());
|
if (DataTableEntity.isEmpty(list)) {
|
continue;
|
}
|
//重设表名
|
fs.setTableName(tableName);
|
list.setMeta(fs.getMeta());
|
Connection connection = sourceDao.getConnection();
|
//设置手动提交
|
connection.setAutoCommit(false);
|
try {
|
tempTestTimer = DateUtil.timer();
|
WriteUtil.append("DA-清理数据》》》");
|
// 拉取全量数据到空表(第一次归档),跳过清理
|
if (!turnTargetDBClearFlag) {
|
// 清理数据
|
DataTableEntity clearDte = clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list);
|
WriteUtil.append("DA-清理数据量:" + clearDte.getRows());
|
}
|
WriteUtil.append("DA-清理数据耗时:" + tempTestTimer.intervalMs());
|
// 新增数据
|
tempTestTimer = DateUtil.timer();
|
WriteUtil.append("DA-新增数据量:" + list.getRows());
|
if (list.getRows() > 0) {
|
WriteUtil.append("DA-新增数据》》》表名:" + list.getFieldSetEntity(0).getMeta().getTableName()[0]);
|
try {
|
targetDao.addBatch(list);
|
} catch (Exception e) {
|
if (turnTargetDBClearFlag) {
|
clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list);
|
targetDao.addBatch(list);
|
} else {
|
throw e;
|
}
|
}
|
}
|
WriteUtil.append("DA-新增数据耗时:" + tempTestTimer.intervalMs());
|
// 提交
|
connection.commit();
|
} catch (Exception e) {
|
|
//若批量添加失败回滚删除
|
try {
|
connection.rollback();
|
} catch (Exception er) {
|
e.printStackTrace();
|
throw er;
|
}
|
throw e;
|
} finally {
|
//重设连接为自动提交
|
connection.setAutoCommit(true);
|
}
|
archivingSuccessCount += list.getRows();
|
}
|
}
|
} while (!dataArchivingQueue.checkQueryFinish(sourceTable) || !dataArchivingQueue.checkInsertQueueEmpty(sourceTable));
|
journalEntity.setSingle_duration(timer.intervalMs());
|
journalEntity.setStatistics_start_time(statisticsStartTime);
|
journalEntity.setStatistics_final_time(statisticsFinalTime);
|
journalEntity.setMin_id(minID);
|
journalEntity.setMax_id(maxID);
|
WriteUtil.append("DA-循环完毕");
|
} catch (Exception e) {
|
WriteUtil.append("error:\n" + journalManagerService.getStackTrace(e));
|
throw e;
|
} finally {
|
targetDao.closeConnection();
|
sourceDao.closeConnection();
|
// 关闭线程
|
dataArchivingQueue.shutdownQueryThread(sourceTable);
|
}
|
|
// 删除mes主库的内容
|
if (canExecuteClearFlag) {
|
WriteUtil.append("DA-删除mes主库内容》》》");
|
String deleteMasterLogUUID = deleteMasterData(sourceTable, configFse, sourceDao, targetDao, uniqueField, timeField, extractTimeField, targetTableSet, deleteSubLogUUID);
|
if (!StringUtils.isEmpty(deleteMasterLogUUID)) {
|
journalEntity.setPre_step_uuid(deleteMasterLogUUID);
|
} else if (!StringUtils.isEmpty(deleteSubLogUUID)) {
|
journalEntity.setPre_step_uuid(deleteMasterLogUUID);
|
}
|
WriteUtil.append("DA-删除mes主库内容完毕");
|
}
|
String errorInfo = dataArchivingQueue.getErrorLog(sourceTable);
|
if (!StringUtils.isEmpty(errorInfo)) {
|
journalEntity.setError(errorInfo);
|
journalEntity.setResult(0);
|
}
|
} catch (Exception e) {
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
if (!StringUtils.isEmpty(journalEntity.getError())) {
|
journalEntity.setError(journalEntity.getError() + "\n" + journalManagerService.getStackTrace(e));
|
} else {
|
journalEntity.setError(journalManagerService.getStackTrace(e));
|
}
|
journalEntity.setResult(0);
|
} finally {
|
journalEntity.setSingle_duration(timer.intervalMs());
|
journalEntity.setCount(archivingSuccessCount);
|
journalEntity.setConfigUuid(uuid);
|
journalEntity.setType(5);
|
journalEntity.setDetail(4);
|
journalEntity.setCreated_utc_datetime(new Date());
|
FieldSetEntity curLogFse = null;
|
if (archivingSuccessCount > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1 || !StringUtils.isEmpty(journalEntity.getPre_step_uuid())) {
|
curLogFse = journalManagerService.autoCreateJournal(journalEntity);
|
}
|
|
// 将日志表中执行失败的记录标记为已经重新处理
|
WriteUtil.append("DA-将日志表中执行失败的记录标记为已经重新处理");
|
DataTableEntity logDte;
|
if (curLogFse == null) {
|
logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=?", new Object[]{uuid});
|
} else {
|
logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=? and uuid<>?", new Object[]{uuid, curLogFse.getUUID()});
|
}
|
for (int i = 0; i < logDte.getRows(); i++) {
|
journalManagerService.writeBackReDealResult(logDte.getFieldSetEntity(i), true);
|
}
|
WriteUtil.append("DA-执行完毕");
|
}
|
}
|
|
/**
|
* 清理归档重复数据
|
*
|
* @param sourceTable
|
* @param targetTableSet
|
* @param uniqueField
|
* @param targetDao
|
*/
|
private DataTableEntity clearArchiveRepeatData(String sourceTable, Set<String> targetTableSet, String uniqueField, Dao targetDao, DataTableEntity list) {
|
StringBuilder clearSql = new StringBuilder(128);
|
clearSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet, Arrays.asList(uniqueField, "{#table_name#}"), false, " where " + BaseUtil.buildQuestionMarkFilter(uniqueField, list.getFieldAllValues(uniqueField), true)))
|
.append("\nselect ").append(uniqueField).append(",_table_name from ").append(sourceTable);
|
DataTableEntity clearDte = targetDao.getList(clearSql.toString());
|
if (!DataTableEntity.isEmpty(clearDte)) {
|
Set<Object> clearTableSet = Sets.newHashSet();
|
clearTableSet.addAll(Arrays.asList(clearDte.getFieldAllValues("_table_name")));
|
for (Object targetTableName : clearTableSet) {
|
targetDao.delete(targetTableName.toString(),
|
BaseUtil.buildQuestionMarkFilter(uniqueField, clearDte.getRows(), true),
|
clearDte.getData().stream().map(item -> item.getString(uniqueField)).toArray());
|
}
|
}
|
return clearDte;
|
}
|
|
/**
|
* 清理判定,每天切换的时候可以执行一次
|
*
|
* @param tableName
|
* @return
|
*/
|
public boolean canExecuteClear(String tableName) {
|
String dateStr = DateUtil.format(DateUtil.date(), "yyyy-MM-dd");
|
final String KEY = "DE_CLEAR_STORE";
|
Object preDate = RedisUtil.getHash(KEY, tableName);
|
if (dateStr.equals(preDate)) {
|
return false;
|
} else {
|
RedisUtil.setHash(KEY, tableName, dateStr);
|
return true;
|
}
|
}
|
|
/**
|
* 清理主库数据
|
*
|
* @param sourceTable 源表名
|
* @param configFse 配置fse
|
* @param sourceDao 源dao
|
* @param targetDao 目标dao
|
* @param uniqueField 唯一字段
|
* @param timeField 更新(归档)时间字段
|
* @param extractTimeField 提取时间字段
|
* @param targetTableSet 目标表集合
|
* @param deleteSubLogUUID 删除子表日志uuid
|
* @return
|
*/
|
private String deleteMasterData(String sourceTable, FieldSetEntity configFse, Dao sourceDao, Dao targetDao, String uniqueField, String timeField, String extractTimeField, Set<String> targetTableSet, String deleteSubLogUUID) {
|
FieldSetEntity deleteMasterLogFse = new FieldSetEntity();
|
deleteMasterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG);
|
TimeInterval deleteMasterLogTimer = DateUtil.timer();
|
String minID = "";
|
String maxID = "";
|
Date statisticsStartTime = null;
|
Date statisticsFinalTime = null;
|
try {
|
StringBuilder sql = new StringBuilder(128);
|
sql.append("select count(*) count_value from ").append(sourceTable);
|
if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) {
|
sql.append(" where ").append(configFse.getString("source_select_filter"));
|
} else {
|
sql.append(" where 1=2");
|
}
|
FieldSetEntity paramFse = sourceDao.getOne(sql.toString());
|
WriteUtil.append("DA-sourceDao.getOne(sql.toString())-sql:" + sql);
|
int totalCount = StringUtils.isEmpty(paramFse.getString("count_value")) ? 0 : paramFse.getInteger("count_value");
|
int delTotalCount = 0;
|
if (totalCount > 0) {
|
int pageSize = 1000;
|
int totalPage = totalCount / pageSize + (totalCount % pageSize == 0 ? 0 : 1);
|
sql.setLength(0);
|
sql.append("select ").append(uniqueField).append(" from ").append(sourceTable);
|
if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) {
|
sql.append(" where ").append(configFse.getString("source_select_filter"));
|
}
|
StringBuilder existSql = new StringBuilder(128);
|
existSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet))
|
.append("\nselect ").append(uniqueField).append(" from ").append(sourceTable);
|
StringBuilder tempSql = new StringBuilder(128);
|
for (int i = 0; i < totalPage; i++) {
|
DataTableEntity delDte = sourceDao.getList(sql.toString(), new Object[]{}, 1, pageSize);
|
WriteUtil.append("DA-删除的数据-delDte:" + Arrays.toString(delDte.getFieldAllValues(uniqueField)));
|
if (DataTableEntity.isEmpty(delDte)) {
|
continue;
|
}
|
// 验证归档库里面存在,仅删除存在,不存在的保留
|
tempSql.setLength(0);
|
tempSql.append(existSql);
|
tempSql.append(" where ").append(BaseUtil.buildQuestionMarkFilter(uniqueField, delDte.getFieldAllValues(uniqueField), true));
|
WriteUtil.append("DA-tempSql:" + tempSql);
|
DataTableEntity existDte = targetDao.getList(tempSql.toString(), new Object[]{});
|
if (existDte.getRows() > 0) {
|
FieldSetEntity existFse;
|
for (int j = 0; j < existDte.getRows(); j++) {
|
existFse = existDte.getFieldSetEntity(j);
|
Date updateTime = existFse.getDate(timeField);
|
Date extractTime = existFse.getDate(extractTimeField);
|
Date date = getAimDate(updateTime, extractTime, 0);
|
statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
|
statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
|
minID = getAimID(minID, existFse.getString(uniqueField), 0);
|
maxID = getAimID(maxID, existFse.getString(uniqueField), 1);
|
}
|
delTotalCount += existDte.getRows();
|
sourceDao.delete(sourceTable,
|
BaseUtil.buildQuestionMarkFilter(uniqueField, existDte.getRows(), true),
|
existDte.getData().stream().map(item -> item.getString(uniqueField)).toArray());
|
}
|
}
|
WriteUtil.append("DA-删除总条数:" + delTotalCount);
|
}
|
deleteMasterLogFse.setValue(CmnConst.COUNT, delTotalCount);
|
deleteMasterLogFse.setValue(CmnConst.RESULT, 1);
|
} catch (Exception e) {
|
e.printStackTrace();
|
deleteMasterLogFse.setValue(CmnConst.RESULT, 0);
|
deleteMasterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e));
|
} finally {
|
targetDao.closeConnection();
|
sourceDao.closeConnection();
|
deleteMasterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
|
deleteMasterLogFse.setValue(CmnConst.TYPE, 5);
|
deleteMasterLogFse.setValue(CmnConst.DETAIL, 6);
|
deleteMasterLogFse.setValue(CmnConst.PRE_STEP_UUID, deleteSubLogUUID);
|
deleteMasterLogFse.setValue(CmnConst.DEAL_FLAG, 0);
|
deleteMasterLogFse.setValue(CmnConst.DEAL_RESULT, 1);
|
deleteMasterLogFse.setValue(CmnConst.MIN_ID, minID);
|
deleteMasterLogFse.setValue(CmnConst.MAX_ID, maxID);
|
deleteMasterLogFse.setValue(CmnConst.SINGLE_DURATION, deleteMasterLogTimer.intervalMs());
|
deleteMasterLogFse.setValue(CmnConst.CONFIG_UUID, configFse.getUUID());
|
deleteMasterLogFse.setValue(CmnConst.STATISTICS_START_TIME, statisticsStartTime);
|
deleteMasterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, statisticsFinalTime);
|
if ((!StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.COUNT)) && deleteMasterLogFse.getInteger(CmnConst.COUNT) > 0)
|
|| !StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.ERROR)) || !"1".equals(deleteMasterLogFse.getString(CmnConst.RESULT))) {
|
getBaseDao().add(deleteMasterLogFse);
|
}
|
}
|
return deleteMasterLogFse.getUUID();
|
}
|
|
/**
|
* 获取指定的日期,若有一个为空,那么直接获取另外一个的值;否则按照指定取值
|
*
|
* @param d1
|
* @param d2
|
* @param sign 大于0,取两者中大的,就是更靠近当前时间的;否则取小的,就是更远离当前时间的
|
* @return
|
*/
|
private Date getAimDate(Date d1, Date d2, int sign) {
|
if (d1 == null && d2 == null) {
|
return null;
|
}
|
if (d1 == null || d2 == null) {
|
return d1 == null ? d2 : d1;
|
}
|
if (sign > 0) {
|
return d1.compareTo(d2) > 0 ? d1 : d2;
|
} else {
|
return d1.compareTo(d2) > 0 ? d2 : d1;
|
}
|
}
|
|
/**
|
* 获取指定的ID,那么直接获取另外一个的值;否则按照指定取值
|
*
|
* @param s1
|
* @param s2
|
* @param sign 大于0,取两者中大的;否则取小的
|
* @return
|
*/
|
private String getAimID(String s1, String s2, int sign) {
|
if (StringUtils.isEmpty(s1) && StringUtils.isEmpty(s2)) {
|
return null;
|
}
|
if (StringUtils.isEmpty(s1) || StringUtils.isEmpty(s2)) {
|
return StringUtils.isEmpty(s1) ? s2 : s1;
|
}
|
String numberRegexp = "\\d{1,11}";
|
if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) {
|
boolean b = Long.parseLong(s1) > Long.parseLong(s2);
|
if (sign > 0) {
|
return b ? s1 : s2;
|
} else {
|
return b ? s2 : s1;
|
}
|
} else {
|
if (sign > 0) {
|
return s1.compareTo(s2) > 0 ? s1 : s2;
|
} else {
|
return s1.compareTo(s2) > 0 ? s2 : s1;
|
}
|
}
|
}
|
|
/**
|
* 将dte按照年份分组
|
*
|
* @param dte
|
* @param timeField 时间字段
|
* @param splitTableType 分表方式,0-年,1-月
|
* @return Map<时间, List < dte数据>>
|
*/
|
private Map<String, List<DataTableEntity>> dteGroupByTime(DataTableEntity dte, String timeField, String splitTableType) {
|
Map<String, List<DataTableEntity>> groupDteMap = Maps.newHashMap();
|
FieldSetEntity fse;
|
String time;
|
List<DataTableEntity> groupDteList;
|
if (StringUtils.isEmpty(timeField)) {
|
groupDteList = Lists.newArrayList();
|
groupDte(groupDteList, dte);
|
groupDteMap.put("0", groupDteList);
|
} else {
|
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM");
|
for (int i = 0; i < dte.getRows(); i++) {
|
fse = dte.getFieldSetEntity(i);
|
if (fse.getDate(timeField) == null) {
|
throw new BaseException(ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getValue(), ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getText() + "\ntable_name:" + fse.getTableName() + "\ndata:" + fse.getValues());
|
}
|
if ("1".equals(splitTableType)) {
|
time = dateFormat.format(fse.getDate(timeField));
|
} else {
|
time = String.valueOf(DateUtil.year(fse.getDate(timeField)));
|
}
|
groupDteList = groupDteMap.computeIfAbsent(time, k -> Lists.newArrayList());
|
groupAddDte(groupDteList, fse);
|
}
|
}
|
return groupDteMap;
|
}
|
|
private void groupDte(List<DataTableEntity> list, DataTableEntity allDte) {
|
if (list == null) {
|
throw new BaseException(ErrorCode.DATA_ARCHIVE_GROUP_CONTAINER_IS_NULL);
|
}
|
for (int i = 0; i < allDte.getRows(); i++) {
|
groupAddDte(list, allDte.getFieldSetEntity(i));
|
}
|
}
|
|
private void groupAddDte(List<DataTableEntity> list, FieldSetEntity fse) {
|
DataTableEntity dte;
|
if (list.isEmpty()) {
|
dte = new DataTableEntity();
|
list.add(dte);
|
} else {
|
dte = list.get(list.size() - 1);
|
if (dte.getRows() >= DataArchivingQueue.INSERT_PAGE_SIZE) {
|
dte = new DataTableEntity();
|
list.add(dte);
|
}
|
}
|
dte.addFieldSetEntity(fse);
|
}
|
|
|
class DataArchivingServiceImpl {
|
|
/** DAO of the data source whose table structure is read. */
private Dao sourceDao;

/** DAO of the data source in which tables are checked and created. */
private Dao targetDao;

/** Name of the table in the source data source. */
private String sourceTable;

/** UUID of the archiving configuration; stored as parent of created-table records. */
private String configUid;

/** Schema/owner name used when querying the source data dictionary. */
private String sourceDbName;

/** Name of the target database. */
private String targetDbName;

/**
 * Creates a helper bound to one source table and one source/target DAO pair.
 *
 * @param sourceDao    DAO of the source data source
 * @param targetDao    DAO of the target data source
 * @param sourceTable  name of the source table
 * @param configUid    UUID of the archiving configuration
 * @param sourceDbName schema/owner name of the source database
 * @param targetDbName name of the target database
 */
public DataArchivingServiceImpl(Dao sourceDao, Dao targetDao, String sourceTable, String configUid, String sourceDbName, String targetDbName) {
    // Data-source handles.
    this.sourceDao = sourceDao;
    this.targetDao = targetDao;
    // Database names.
    this.sourceDbName = sourceDbName;
    this.targetDbName = targetDbName;
    // Table and configuration identity.
    this.sourceTable = sourceTable;
    this.configUid = configUid;
}
|
|
/**
|
* 验证数据表是否存在
|
*
|
* @param tableName 数据表名
|
* @return
|
*/
|
public boolean dataTableIsExists(String tableName) throws BaseException {
|
try {
|
Connection connection = targetDao.getConnection();
|
DatabaseMetaData metaData = targetDao.getConnection().getMetaData();
|
ResultSet tables = metaData.getTables(null, null, tableName, new String[]{"TABLE"});
|
boolean result = tables.next();
|
tables.close();
|
connection.close();
|
return result;
|
} catch (Exception e) {
|
e.printStackTrace();
|
throw new BaseException(e);
|
}
|
}
|
|
|
/**
|
* 创建表
|
*
|
* @param prefix 表名前缀
|
*/
|
public String createTable(String prefix, String time, DataBaseEntity dbe) throws BaseException {
|
String tableName;
|
if (!"0".equals(time)) {
|
tableName = prefix + (prefix.lastIndexOf("_") != prefix.length() - 1 ? "_" : "") + time;
|
} else {
|
tableName = prefix.endsWith("_") ? prefix.substring(0, prefix.length() - 1) : prefix;
|
}
|
if (dataTableIsExists(tableName)) {
|
return tableName;
|
}
|
JSONObject tableInfoObj = getTableInfo(tableName);
|
String sql = getSql(tableInfoObj);
|
SpringMVCContextHolder.getSystemLogger().error("sql:\n" + sql);
|
//先创建记录再执行ddl语句不然报错后ddl无法回滚
|
saveCreateTableRecord(tableName, time);
|
targetDao.executeSql(sql);
|
// syncDataConfigService.addTableField(dbe,dbe.getUuid(),tableName);
|
return tableName;
|
}
|
|
/**
|
* 根据表名获取来源数据源对应表的结构信息
|
*
|
* @param tableName
|
* @return
|
*/
|
private JSONObject getTableInfo(String tableName) {
|
JSONObject tableInfoObj = new JSONObject();
|
tableInfoObj.put(CmnConst.NAME, tableName);
|
DataTableEntity tempDte;
|
FieldSetEntity tempFse;
|
JSONObject fieldInfoObj = new JSONObject(new LinkedHashMap<>());
|
tableInfoObj.put(CmnConst.FIELD, fieldInfoObj);
|
JSONObject indexInfoObj = new JSONObject(new LinkedHashMap<>());
|
tableInfoObj.put(CmnConst.INDEX, indexInfoObj);
|
JSONObject singleFieldInfoObj;
|
if (DataBaseType.MYSQL.equals(sourceDao.getDataBaseType())) {
|
// mysql
|
// 表
|
tempFse = sourceDao.getOne("information_schema.`TABLES`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable});
|
tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("table_comment"));
|
// 字段表
|
tempDte = sourceDao.getList("information_schema.`COLUMNS`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable}, "ordinal_position", 1, Integer.MAX_VALUE);
|
for (int i = 0; i < tempDte.getRows(); i++) {
|
tempFse = tempDte.getFieldSetEntity(i);
|
singleFieldInfoObj = new JSONObject();
|
fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj);
|
singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type"));
|
singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("character_maximum_length")) ? tempFse.getString("numeric_precision") : tempFse.getString("character_maximum_length"));
|
singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("numeric_scale"));
|
singleFieldInfoObj.put(CmnConst.NULLABLE, "NO".equalsIgnoreCase(tempFse.getString("is_nullable")) ? 0 : 1);
|
singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("column_comment"));
|
}
|
// 索引表
|
StringBuilder sql = new StringBuilder(128);
|
sql.append("select index_name,non_unique,group_concat(column_name) column_name");
|
sql.append("\nfrom information_schema.`STATISTICS`");
|
sql.append("\nwhere table_schema=? and table_name=?");
|
sql.append("\ngroup by index_name,non_unique");
|
tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, sourceTable});
|
for (int i = 0; i < tempDte.getRows(); i++) {
|
tempFse = tempDte.getFieldSetEntity(i);
|
singleFieldInfoObj = new JSONObject();
|
indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
|
singleFieldInfoObj.put(CmnConst.TYPE, "PRIMARY".equalsIgnoreCase(tempFse.getString("index_name")) ? CmnConst.PRIMARY : ("1".equals(tempFse.getString("non_unique")) ? CmnConst.NORMAL : CmnConst.UNIQUE));
|
singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
|
}
|
} else if (DataBaseType.ORACLE.equals(sourceDao.getDataBaseType())) {
|
// oracle
|
// 表
|
String upperTableName = sourceTable.toUpperCase();
|
tempFse = sourceDao.getOne("SYS.USER_TAB_COMMENTS", "table_name=?", new Object[]{upperTableName});
|
tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments"));
|
// 字段表
|
StringBuilder sql = new StringBuilder(128);
|
sql.append("SELECT TC.COLUMN_NAME,DATA_TYPE,DATA_LENGTH,DATA_PRECISION,DATA_SCALE,NULLABLE,CHAR_LENGTH,COMMENTS FROM SYS.USER_TAB_COLUMNS TC");
|
sql.append("\nLEFT JOIN USER_COL_COMMENTS CC ON TC.TABLE_NAME=CC.TABLE_NAME AND TC.COLUMN_NAME=CC.COLUMN_NAME");
|
sql.append("\nWHERE TC.TABLE_NAME=?");
|
sql.append("\nORDER BY TC.COLUMN_ID");
|
tempDte = sourceDao.getList(sql.toString(), new Object[]{upperTableName});
|
for (int i = 0; i < tempDte.getRows(); i++) {
|
tempFse = tempDte.getFieldSetEntity(i);
|
singleFieldInfoObj = new JSONObject();
|
fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj);
|
singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type").contains("VARCHAR") ? "varchar" : tempFse.getString("data_type"));
|
singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("data_precision"))
|
? tempFse.getString("char_length") : tempFse.getString("data_precision"));
|
singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("data_scale"));
|
singleFieldInfoObj.put(CmnConst.NULLABLE, "N".equalsIgnoreCase(tempFse.getString("nullable")) ? 0 : 1);
|
singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments"));
|
}
|
// 索引表
|
sql.setLength(0);
|
sql.append("SELECT DIC.INDEX_NAME,WM_CONCAT(DIC.COLUMN_NAME) column_name FROM SYS.DBA_IND_COLUMNS DIC");
|
sql.append("\nLEFT JOIN SYS.DBA_INDEXES DI ON DIC.INDEX_NAME=DI.INDEX_NAME");
|
sql.append("\nWHERE UNIQUENESS='NONUNIQUE' AND DIC.TABLE_OWNER=? AND DI.TABLE_NAME=?");
|
sql.append("\nGROUP BY DIC.INDEX_NAME");
|
tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName});
|
for (int i = 0; i < tempDte.getRows(); i++) {
|
tempFse = tempDte.getFieldSetEntity(i);
|
singleFieldInfoObj = new JSONObject();
|
indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
|
singleFieldInfoObj.put(CmnConst.TYPE, CmnConst.NORMAL);
|
singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
|
}
|
// 约束表 C-检查,写到字段里面;R-外键,不要;P-主键;U-唯一键
|
sql.setLength(0);
|
sql.append("SELECT DC.CONSTRAINT_NAME index_name,CONSTRAINT_TYPE,LISTAGG(COLUMN_NAME, ',') WITHIN GROUP(ORDER BY DCC.POSITION) column_name FROM SYS.DBA_CONS_COLUMNS DCC");
|
sql.append("\nLEFT JOIN SYS.DBA_CONSTRAINTS DC ON DCC.CONSTRAINT_NAME=DC.CONSTRAINT_NAME");
|
sql.append("\nWHERE DCC.OWNER=? AND DCC.TABLE_NAME=? AND CONSTRAINT_TYPE IN ('P','U')");
|
sql.append("\nGROUP BY DC.CONSTRAINT_NAME,CONSTRAINT_TYPE");
|
tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName});
|
WriteUtil.append("DA-DDL-:" + sql + " |||库名:" + sourceDbName + " |||表名:" + upperTableName);
|
for (int i = 0; i < tempDte.getRows(); i++) {
|
tempFse = tempDte.getFieldSetEntity(i);
|
singleFieldInfoObj = new JSONObject();
|
indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj);
|
singleFieldInfoObj.put(CmnConst.TYPE, "P".equalsIgnoreCase(tempFse.getString("constraint_type")) ? CmnConst.PRIMARY : CmnConst.UNIQUE);
|
singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name"));
|
}
|
WriteUtil.append("DA-DDL-创表信息:" + tableInfoObj);
|
} else {
|
throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL);
|
}
|
return tableInfoObj;
|
}
|
|
/**
|
* 根据表结构信息,拼接DDL创建表sql语句
|
*
|
* @param tableInfoObj
|
* @return
|
*/
|
private String getSql(JSONObject tableInfoObj) {
|
StringBuilder sql = new StringBuilder(128);
|
JSONObject fieldInfoObj = tableInfoObj.getJSONObject(CmnConst.FIELD);
|
JSONObject indexInfoObj = tableInfoObj.getJSONObject(CmnConst.INDEX);
|
JSONObject singleFieldInfoObj;
|
sql.append("CREATE TABLE ").append(tableInfoObj.getString(CmnConst.NAME)).append(" (");
|
if (DataBaseType.MYSQL.equals(targetDao.getDataBaseType())) {
|
// mysql
|
for (String field : fieldInfoObj.keySet()) {
|
singleFieldInfoObj = fieldInfoObj.getJSONObject(field);
|
sql.append("\n `").append(field.toLowerCase(Locale.ROOT)).append("` ");
|
if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("timestamp") || singleFieldInfoObj.getString(CmnConst.TYPE).contains("TIMESTAMP")) {
|
sql.append("timestamp ");
|
} else if (singleFieldInfoObj.getIntValue(CmnConst.DECIMAL) > 0) {
|
sql.append("decimal(").append(singleFieldInfoObj.getString(CmnConst.INTEGER)).append(",").append(singleFieldInfoObj.getString(CmnConst.DECIMAL)).append(") ");
|
} else if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("int")) {
|
sql.append("int(0) ");
|
} else if ("number".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
|
if ("0".equals(singleFieldInfoObj.getString(CmnConst.DECIMAL))) {
|
sql.append("int(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") ");
|
} else {
|
sql.append("decimal(22,4) ");
|
}
|
} else if ("date".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
|
sql.append("datetime(0) ");
|
} else if ("blob".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
|
sql.append("blob ");
|
} else if ("text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT).endsWith("text") ||
|
(("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) && singleFieldInfoObj.getIntValue(CmnConst.INTEGER) >= 4000)
|
) {
|
sql.append("text ");
|
} else {
|
sql.append(singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT)).append("(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") ");
|
}
|
if ("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
|
sql.append(" CHARACTER SET utf8mb4 COLLATE utf8mb4_bin ");
|
}
|
if ("0".equals(singleFieldInfoObj.getString(CmnConst.NULLABLE))) {
|
sql.append("not null ");
|
}
|
if (!StringUtils.isEmpty(singleFieldInfoObj.getString(CmnConst.COMMENT))) {
|
sql.append(" comment '").append(singleFieldInfoObj.getString(CmnConst.COMMENT)).append("'");
|
}
|
sql.append(",");
|
}
|
for (String indexName : indexInfoObj.keySet()) {
|
singleFieldInfoObj = indexInfoObj.getJSONObject(indexName);
|
if (CmnConst.PRIMARY.equalsIgnoreCase(indexName) || CmnConst.PRIMARY.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
|
sql.append("\n PRIMARY KEY (`").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append("`) USING BTREE,");
|
} else if (CmnConst.UNIQUE.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
|
sql.append("\n UNIQUE INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,");
|
} else if (CmnConst.NORMAL.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) {
|
sql.append("\n INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,");
|
}
|
}
|
sql.deleteCharAt(sql.length() - 1);
|
sql.append("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ");
|
if (!StringUtils.isEmpty(tableInfoObj.getString(CmnConst.COMMENT))) {
|
sql.append(" COMMENT = '").append(tableInfoObj.getString(CmnConst.COMMENT)).append("'");
|
}
|
} else {
|
throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL);
|
}
|
return sql.toString();
|
}
|
|
private String saveCreateTableRecord(String tableName, String time) {
|
/*=====================================================*/
|
// 新增创建表记录
|
FieldSetEntity fse = new FieldSetEntity();
|
fse.setTableName(CmnConst.DATA_ARCHIVING_SUB_TABLE);
|
fse.setValue("table_name", tableName);
|
fse.setValue("parent_uuid", this.configUid);
|
fse.setValue("data_time", time);
|
BaseUtil.createCreatorAndCreationTime(fse);
|
getBaseDao().saveFieldSetEntity(fse);
|
/*=====================================================*/
|
return tableName;
|
}
|
|
|
private void createIndex(String targetTable, String time) throws BaseException {
|
StringBuilder sql = new StringBuilder();
|
sql.append("\n SELECT DBMS_METADATA.GET_DDL('INDEX',u.index_name) as create_index_statement,u.INDEX_NAME ");
|
sql.append("\n from USER_INDEXES u where u.TABLE_NAME=? ");
|
String primaryIndexName = getPrimaryIndexName();
|
List<Object> params = new ArrayList<>();
|
params.add(sourceTable);
|
if (!StringUtils.isEmpty(primaryIndexName)) {
|
sql.append(" and u.index_name != ? ");
|
params.add(primaryIndexName);
|
}
|
DataTableEntity list = sourceDao.getList(sql.toString(), params.toArray());
|
if (!DataTableEntity.isEmpty(list)) {
|
for (int i = 0; i < list.getRows(); i++) {
|
//循环获取创建索引语句
|
String createIndexStatement = list.getString(i, "create_index_statement");
|
String indexName = list.getString(i, "index_name");
|
if (!StringUtils.isEmpty(createIndexStatement)) {
|
//新的索引名称
|
String newIndexName = this.getIndexName(indexName, time);
|
//将建索引ddl
|
createIndexStatement = createIndexStatement.replaceAll(sourceTable, targetTable)
|
.replaceAll("\"" + indexName + "\"", "\"" + newIndexName + "\"");
|
//执行创建索引
|
targetDao.executeSql(createIndexStatement);
|
}
|
}
|
}
|
|
}
|
|
/**
|
* 获取新的索引名称
|
*
|
* @param indexName
|
* @return
|
*/
|
private String getIndexName(String indexName, String time) {
|
//新的索引名称
|
String newIndexName;
|
if (indexName.length() <= 26) {
|
newIndexName = indexName + time;
|
} else {
|
//超过26位随机生成索引名称
|
newIndexName = RandomUtil.randomString(10) + time;
|
}
|
return newIndexName;
|
}
|
|
/**
|
* 获取主键索引的名名称
|
*
|
* @return
|
*/
|
private String getPrimaryIndexName() {
|
StringBuilder sql = new StringBuilder();
|
sql.append("\n SELECT a.index_name from user_constraints a ");
|
sql.append("\n WHERE a.constraint_type = 'P' ");
|
sql.append("\n AND a.table_name = ? ");
|
FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()});
|
return one != null ? one.getString("index_name") : null;
|
}
|
|
/**
|
* 获取创建表语句(包含主键索引)
|
*
|
* @return
|
*/
|
private String getCreateTableStatement() {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT DBMS_METADATA.GET_DDL(U.OBJECT_TYPE, u.object_name) create_table_statement ");
|
sql.append(" from USER_OBJECTS u ");
|
sql.append(" where U.OBJECT_TYPE ='TABLE' and u.object_name=? ");
|
FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()});
|
return one != null ? one.getString("create_table_statement") : null;
|
}
|
|
|
/**
|
* 获取其他索引名称
|
*
|
* @return
|
*/
|
private List<String> getOtherIndexName() {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT INDEX_NAME FROM USER_INDEXES ");
|
sql.append("\n WHERE TABLE_NAME=? AND INDEX_NAME NOT IN ( ");
|
sql.append("\n SELECT a.index_name from user_constraints a ");
|
sql.append("\n WHERE a.constraint_type = 'P' ");
|
sql.append("\n AND a.TABLE_NAME = ? ) ");
|
|
DataTableEntity list = sourceDao.getList(sql.toString(), new Object[]{this.sourceTable, this.sourceTable});
|
if (!DataTableEntity.isEmpty(list)) {
|
return list.getData().stream().map(item -> item.getString("index_name")).collect(Collectors.toList());
|
}
|
return null;
|
}
|
|
|
}
|
|
}
|