package com.product.data.center.service; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.TimeInterval; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.RandomUtil; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.product.core.cache.util.RedisUtil; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.center.config.CmnConst; import com.product.data.center.config.ErrorCode; import com.product.data.center.entity.JournalEntity; import com.product.data.center.utils.CustomLock; import com.product.data.center.utils.QuerySqlParseUtil; import com.product.data.center.utils.SqlTransferUtil; import com.product.data.center.utils.WriteUtil; import com.product.data.service.SyncDataConfigService; import com.product.datasource.config.DataBaseType; import com.product.datasource.dao.Dao; import com.product.datasource.entity.DataBaseEntity; import com.product.datasource.service.RedisService; import com.product.util.BaseUtil; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.SQLException; import java.text.SimpleDateFormat; import java.util.*; import java.util.stream.Collectors; /** * @Author cheng * @Date 2022/7/26 9:41 * @Desc 数据归档 */ @Service public class DataArchivingService extends AbstractBaseService { @Autowired JournalManagerService journalManagerService; @Autowired DataArchivingQueue dataArchivingQueue; @Resource private SyncDataConfigService syncDataConfigService; private static CustomLock lock = new CustomLock(); private Map getCollectIds(String[] deleteDataSource, String tableName) { StringBuilder sql = new StringBuilder("SELECT "); sql.append("id,data_source FROM product_sys_data_collect WHERE "); List params = new ArrayList<>(); for (int i = 0; i < deleteDataSource.length; i++) { String source = deleteDataSource[i]; if (i > 0) { sql.append(" or "); } sql.append("( data_source = ? and LOWER(source_table) = ? )"); params.add(source); params.add(tableName.toLowerCase()); } DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), params.toArray()); if (DataTableEntity.isEmpty(dataTableEntity)) { return null; } Map collect = dataTableEntity.getData().stream().collect(Collectors.toMap(item -> item.getString("data_source"), item -> item.getString("id"))); if (collect.size() != deleteDataSource.length) { throw new BaseException(ErrorCode.GET_COLLECT_ID_FAIL); } return collect; } public void reDeal(String logUuid) { FieldSetEntity fs = getBaseDao().getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUuid, false); if (!FieldSetEntity.isEmpty(fs)) { String configUid = fs.getString(CmnConst.CONFIG_UUID); reDealArchiving(configUid); } fs.setValue(CmnConst.DEAL_FLAG, 1); getBaseDao().update(fs); } @Async public void reDealArchiving(String uuid) { this.dataArchivingEntry(uuid); } /** * 扫码库删除 * * @param configFse * @throws BaseException */ private String sweepCodeLibrary(FieldSetEntity configFse) throws BaseException, SQLException { JournalEntity journalEntity = new JournalEntity(); journalEntity.setDetail(3); TimeInterval timer = DateUtil.timer(); int deleteSuccessCount = 0; try { String deleteDataSource = configFse.getString("delete_data_source"); WriteUtil.append("DA-deleteDataSource:" + deleteDataSource); WriteUtil.append("DA-sub_delete_select_filter:" + configFse.getString("delete_select_filter")); if (StringUtils.isEmpty(deleteDataSource)) { return null; } String[] deleteDataSourceArray = deleteDataSource.split(","); // DataBaseEntity dbe = new DataBaseEntity(deleteDataSource); String source_data_validation = configFse.getString("source_data_validation"); if (StringUtils.isEmpty(source_data_validation)) { return null; } String deleteDataTable = configFse.getString("delete_data_table"); if (StringUtils.isEmpty(deleteDataTable)) { return null; } String collectIdField = configFse.getString("collect_id_field"); DataBaseEntity validationDbe = new DataBaseEntity(source_data_validation); //扫码库删除唯一标识 String deleteUniqueField = configFse.getString("delete_unique_field"); //验证表 String validationTableName = configFse.getString("table_data_validation"); //验证唯一字段 String validationUniqueField = configFse.getString("validation_unique_field"); //忽略对比字段 List ignoreComparisonFields = !StringUtils.isEmpty(configFse.getString("ignore_comparison_field")) ? Lists.newArrayList(Arrays.asList(configFse.getString("ignore_comparison_field").split(","))) : null; int pageSize = 500; DataTableEntity list; Dao dao = null; int currentCount = 0; Map collectIds = getCollectIds(deleteDataSource.split(","), deleteDataTable); try { String minID = null; String maxID = null; for (int j = 0; j < deleteDataSourceArray.length; j++) { if (dao != null) { dao.closeConnection(); } dao = new DataBaseEntity(deleteDataSourceArray[j]).getDao(); do { // String sql = "SELECT * FROM (SELECT * from " + deleteDataTable + " where " + configFse.getString("delete_select_filter") + " order by " + deleteUniqueField + " ) A "; // if (!Objects.isNull(minID) && !Objects.isNull(maxID)) { // sql += " where " + deleteUniqueField + " > " + maxID; // } String filter = configFse.getString("delete_select_filter"); if (!Objects.isNull(minID) && !Objects.isNull(maxID)) { filter += " and ( " + deleteUniqueField + " > " + maxID + ")"; } list = dao.getList(deleteDataTable, filter, new Object[]{}, deleteUniqueField, 1, pageSize); WriteUtil.append("DA-删除子库数据-表名:" + deleteDataTable); if (DataTableEntity.isEmpty(list)) { break; } currentCount = list.getRows(); Object[] uniqueValues = list.getData().stream().map(item -> item.getString(deleteUniqueField)).toArray(); DataTableEntity validationData = validationDbe.getDao().getList(validationTableName, collectIdField + "= ? AND " + BaseUtil.buildQuestionMarkFilter(validationUniqueField, uniqueValues.length, true), ArrayUtil.addAll(new Object[]{collectIds.get(deleteDataSourceArray[j])}, uniqueValues)); validationDbe.getDao().closeConnection(); if (DataTableEntity.isEmpty(validationData)) { break; } Map collectMap = validationData.getData().stream().collect(Collectors.toMap( (item) -> item.getString(validationUniqueField), item -> item.getValues())); List deleteUniqueValueList = new ArrayList<>(); for (int i = 0; i < list.getRows(); i++) { FieldSetEntity fs = list.getFieldSetEntity(i); String uniqueValue = fs.getString(deleteUniqueField); Map map = collectMap.get(uniqueValue); if (ignoreComparisonFields != null && map != null) { for (int i1 = 0; i1 < ignoreComparisonFields.size(); i1++) { try { map.remove(ignoreComparisonFields.get(i1)); fs.remove(ignoreComparisonFields.get(i1)); } catch (Exception e) { e.printStackTrace(); } } } if (map == null || !new JSONObject((Map) fs.getValues()).equals(new JSONObject(map))) { list.removeFieldSetEntity(i); i--; continue; } deleteUniqueValueList.add(uniqueValue); minID = getAimID(minID, uniqueValue, 0); maxID = getAimID(maxID, uniqueValue, 1); } collectMap.clear(); if (deleteUniqueValueList.size() > 0) { deleteSuccessCount += dao.deleteRInt(configFse.getString("delete_data_table"), BaseUtil.buildQuestionMarkFilter(deleteUniqueField, deleteUniqueValueList.size(), true), deleteUniqueValueList.toArray()); WriteUtil.append("DA-删除子库数据-已经删除条数:" + deleteSuccessCount); } } while (currentCount == pageSize); } journalEntity.setResult(1); journalEntity.setMin_id(minID); journalEntity.setMax_id(maxID); } catch (Exception e) { throw e; } finally { if (dao != null) { dao.closeConnection(); } } } catch (Exception e) { SpringMVCContextHolder.getSystemLogger().error(e); journalEntity.setResult(0); journalEntity.setError(journalManagerService.getStackTrace(e)); throw e; } finally { journalEntity.setCount(deleteSuccessCount); journalEntity.setCreated_utc_datetime(new Date()); journalEntity.setSingle_duration(timer.intervalMs()); journalEntity.setType(5); journalEntity.setConfigUuid(configFse.getUUID()); } if (journalEntity != null && (journalEntity.getCount() > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1)) { return journalManagerService.autoCreateJournal(journalEntity).getUUID(); } return null; } /** * 数据归档入口 * * @param uuid 归档配置uuid */ public void dataArchivingEntry(String uuid) { if (lock.tryLock(uuid)) { try { dataArchivingEntryLock(uuid); } catch (Exception e) { throw e; } finally { lock.unLock(uuid); } return; } SpringMVCContextHolder.getSystemLogger().info("跳过执行:" + uuid); } public void dataArchivingEntryLock(String uuid) { SpringMVCContextHolder.getSystemLogger().info("开始执行归档》》》》" + uuid); WriteUtil.append("DA》》》"); WriteUtil.append("DA-已经获取到锁"); TimeInterval timer = DateUtil.timer(); String curDateTimeStr = DateUtil.date().toString(); int archivingSuccessCount = 0; JournalEntity journalEntity = new JournalEntity(); journalEntity.setResult(1); try { //获取归档服务配置详情 FieldSetEntity configFse = getBaseDao().getFieldSetEntity(CmnConst.DATA_ARCHIVING_TABLE, uuid, false); TimeInterval tempTestTimer = DateUtil.timer(); //来源数据表 String sourceTable = configFse.getString("source_table"); boolean canExecuteClearFlag = canExecuteClear(sourceTable); String deleteSubLogUUID = null; if (canExecuteClearFlag) { WriteUtil.append("DA-删除扫码库》》》"); //删除扫码库已提取到mes主库且根据配置条件过滤的数据 KT特有 // deleteSubLogUUID = this.sweepCodeLibrary(configFse); //更改为异步执行 ThreadUtil.execAsync(() -> { try { sweepCodeLibrary(configFse); } catch (SQLException e) { e.printStackTrace(); } }); } //来源数据源 String sourceDataSource = configFse.getString("source_data_source"); DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSource); //目标数据源 String targetDataSource = configFse.getString("target_data_source"); DataBaseEntity targetDbe = new DataBaseEntity(targetDataSource); //目标数据表前缀 String targetTablePrefix = configFse.getString("target_table_prefix"); if (targetTablePrefix.endsWith("_")) { targetTablePrefix = targetTablePrefix.substring(0, targetTablePrefix.length() - 1); } //来源采集ID字段 String sourceCollectIdField = configFse.getString("source_collect_id_field"); //来源唯一标识字段 由MES子库生成的值 String sourceUniqueField = configFse.getString("source_unique_field"); //唯一字段 String uniqueField = configFse.getString("unique_field"); // 时间字段 String timeField = configFse.getString("time_field"); // 提取时间字段 String extractTimeField = configFse.getString(CmnConst.EXTRACT_TIME_FIELD); Dao sourceDao = sourceDbe.getDao(); Dao targetDao = targetDbe.getDao(); Set targetTableSet; try { DataArchivingServiceImpl service = new DataArchivingServiceImpl(sourceDao, targetDao, sourceTable, uuid, sourceDbe.getDbName(), targetDbe.getDbName()); String keyPrefix = "DA_STORE:" + sourceTable + ":"; boolean serializeFlag = true; int outTime = 60 * 60;// 60 min RedisService readRedis = StringUtils.isEmpty(configFse.getString("redis_config_uuid")) ? new RedisService() : new RedisService(new DataBaseEntity(configFse.getString("redis_config_uuid"))); tempTestTimer = DateUtil.timer(); WriteUtil.append("DA-获取表名集合》》》"); if (!StringUtils.isEmpty(timeField)) { targetTableSet = QuerySqlParseUtil.getAllTableName(targetDao, targetDbe.getDbName(), sourceTable); } else { targetTableSet = Sets.newLinkedHashSet(); targetTableSet.add(targetTablePrefix); } WriteUtil.append("DA-获取表名集合耗时:" + tempTestTimer.intervalMs()); FieldSetEntity paramFse = getBaseDao().getFieldSetBySQL("select max(statistics_final_time) statistics_final_time from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false); Date preMaxTime = paramFse.getDate("statistics_final_time"); // 没有成功的最大统计时间:要么是第一次归档,要么是归档的表没有设定时间字段(所有数据都在一张表上,每次都会拉取这张表当时的所有内容) boolean turnRedisFilterFlag = preMaxTime == null; boolean turnTargetDBClearFlag = false; if (turnRedisFilterFlag) { // 若是没有成功的日志,表示为第一次归档,跳过目标库数据清理 FieldSetEntity logExistsFse = getBaseDao().getFieldSetBySQL("select count(1) count_value from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false); turnTargetDBClearFlag = "0".equals(logExistsFse.getString("count_value")); } StringBuilder paramSql = new StringBuilder(128); paramSql.append("select max(").append(uniqueField).append(") max_id,min(").append(uniqueField).append(") min_id"); if (!StringUtils.isEmpty(timeField)) { paramSql.append(",max(").append(timeField).append(") max_update_time,min(").append(timeField).append(") min_update_time"); } if (!StringUtils.isEmpty(extractTimeField)) { paramSql.append(",max(").append(extractTimeField).append(") max_extract_time,min(").append(extractTimeField).append(") min_extract_time"); } paramSql.append(" from ").append(sourceTable); StringBuilder filterSb = new StringBuilder(128); if (preMaxTime != null && (!StringUtils.isEmpty(timeField) || !StringUtils.isEmpty(extractTimeField))) { int sourceDbType = sourceDbe.getDbType().getValue(); if (!StringUtils.isEmpty(timeField)) { filterSb.append("(").append(timeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE)) .append(" and ").append(timeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")"); } if (!StringUtils.isEmpty(extractTimeField)) { if (filterSb.length() > 0) { filterSb.append(" or "); } filterSb.append("(").append(extractTimeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE)) .append(" and ").append(extractTimeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")"); } paramSql.append(" where (").append(filterSb).append(")"); } paramFse = sourceDao.getOne(paramSql.toString()); if (filterSb.length() > 0) { filterSb.insert(0, "("); filterSb.append(") and "); } if (StringUtils.isEmpty(paramFse.getString("max_id"))) { WriteUtil.append("最大id为空,跳出,执行sql:" + paramSql); return; } filterSb.append(uniqueField).append("<='").append(paramFse.getString("max_id")).append("'"); if (!StringUtils.isEmpty(configFse.getString("select_filter")) && !StringUtils.isEmpty(configFse.getString("select_filter").trim())) { filterSb.append(" and (").append(configFse.getString("select_filter")).append(")"); } WriteUtil.append("DA-sql-filter:" + filterSb); Date statisticsStartTime = getAimDate(paramFse.getDate("min_update_time"), paramFse.getDate("min_extract_time"), 0); Date statisticsFinalTime = getAimDate(paramFse.getDate("max_update_time"), paramFse.getDate("max_extract_time"), 0); String maxID = paramFse.getString("max_id"); String minID = paramFse.getString("min_id"); String splitTableType = "1".equals(configFse.getString("split_table_type")) ? "1" : "0"; //开启队列查询时,先清空队列防止上次未处理完的数据占用内存 dataArchivingQueue.clear(sourceTable); dataArchivingQueue.query(sourceDbe, sourceTable, filterSb.toString(), null, uniqueField, minID); DataTableEntity allDte; Map> groupDteMap; do { allDte = dataArchivingQueue.get(sourceTable); if (DataTableEntity.isEmpty(allDte)) { WriteUtil.append("DA-从队列中获取内容为空,执行睡眠跳过..."); Thread.sleep(RandomUtil.randomInt(800, 1200)); continue; } WriteUtil.append("DA-从队列中获取内容非空,执行插入..."); groupDteMap = dteGroupByTime(allDte, timeField, splitTableType); FieldSetEntity tempFse; String field; Object value; Date date; Date updateTime; Date extractTime; Map> sourceTableUniqueByCollectId = null; for (Map.Entry> entry : groupDteMap.entrySet()) { String time = entry.getKey(); List dteList = entry.getValue(); for (DataTableEntity list : dteList) { //创建表(不存在才创建) tempTestTimer = DateUtil.timer(); WriteUtil.append("DA-创建表》》》"); String tableName = service.createTable(targetTablePrefix, time, targetDbe); WriteUtil.append("DA-创建表耗时:" + tempTestTimer.intervalMs()); targetTableSet.add(tableName); FieldSetEntity fs = list.getFieldSetEntity(0); tempTestTimer = DateUtil.timer(); WriteUtil.append("DA-数据筛选》》》"); if (!StringUtils.isEmpty(sourceCollectIdField)) { sourceTableUniqueByCollectId = new HashMap<>(); } // 没有成功的最大统计时间:要么是第一次归档,要么是归档的表没有设定时间字段(所有数据都在一张表上,每次都会拉取这张表当时的所有内容),跳过验证 if (!turnRedisFilterFlag) { // 验证是否存在redis中,若是存在,比较时间字段值的大小,若是查询出数据中的时间更靠近当前时间,那么重置redis中的时间和过期时间,若是redis中的时间更靠近当前时间,则剔除数据集中的数据,并重置过期时间;若是不存在,则正常执行,先删除然后新增 for (int i = list.getRows() - 1; i >= 0; i--) { tempFse = list.getFieldSetEntity(i); updateTime = tempFse.getDate(timeField); extractTime = tempFse.getDate(extractTimeField); date = getAimDate(updateTime, extractTime, 0); field = tempFse.getString(uniqueField); value = readRedis.get(keyPrefix + field, serializeFlag); String sourceUniqueValue = tempFse.getString(sourceUniqueField); String collectId = tempFse.getString(sourceCollectIdField); if (!StringUtils.isEmpty(sourceCollectIdField) && sourceTableUniqueByCollectId != null && !StringUtils.isEmpty(collectId) && !StringUtils.isEmpty(sourceUniqueValue)) { Set uniqueValues = sourceTableUniqueByCollectId.computeIfAbsent(collectId, k -> new HashSet<>()); uniqueValues.add(sourceUniqueValue); } if (value != null && !StringUtils.isEmpty(value.toString())) { if (((Date) value).compareTo(date) <= 0) { readRedis.set(keyPrefix + field, date, serializeFlag); readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag); statisticsStartTime = getAimDate(statisticsStartTime, date, 0); statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1); minID = getAimID(minID, field, 0); maxID = getAimID(maxID, field, 1); } else { list.removeFieldSetEntity(i); readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag); } } else { readRedis.set(keyPrefix + field, date, serializeFlag); readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag); statisticsStartTime = getAimDate(statisticsStartTime, date, 0); statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1); minID = getAimID(minID, field, 0); maxID = getAimID(maxID, field, 1); } } } WriteUtil.append("DA-数据筛选耗时:" + tempTestTimer.intervalMs()); if (DataTableEntity.isEmpty(list)) { continue; } //重设表名 fs.setTableName(tableName); list.setMeta(fs.getMeta()); Connection connection = sourceDao.getConnection(); //设置手动提交 connection.setAutoCommit(false); try { tempTestTimer = DateUtil.timer(); WriteUtil.append("DA-清理数据》》》"); // 拉取全量数据到空表(第一次归档),跳过清理 if (!turnTargetDBClearFlag) { // 清理数据 DataTableEntity clearDte = clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list); WriteUtil.append("DA-清理数据量:" + clearDte.getRows()); } WriteUtil.append("DA-清理数据耗时:" + tempTestTimer.intervalMs()); // 新增数据 tempTestTimer = DateUtil.timer(); WriteUtil.append("DA-新增数据量:" + list.getRows()); if (list.getRows() > 0) { WriteUtil.append("DA-新增数据》》》表名:" + list.getFieldSetEntity(0).getMeta().getTableName()[0]); try { targetDao.addBatch(list); } catch (Exception e) { if (turnTargetDBClearFlag) { clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list); targetDao.addBatch(list); } else { throw e; } } } WriteUtil.append("DA-新增数据耗时:" + tempTestTimer.intervalMs()); // 提交 connection.commit(); } catch (Exception e) { //若批量添加失败回滚删除 try { connection.rollback(); } catch (Exception er) { e.printStackTrace(); throw er; } throw e; } finally { //重设连接为自动提交 connection.setAutoCommit(true); } archivingSuccessCount += list.getRows(); } } } while (!dataArchivingQueue.checkQueryFinish(sourceTable) || !dataArchivingQueue.checkInsertQueueEmpty(sourceTable)); journalEntity.setSingle_duration(timer.intervalMs()); journalEntity.setStatistics_start_time(statisticsStartTime); journalEntity.setStatistics_final_time(statisticsFinalTime); journalEntity.setMin_id(minID); journalEntity.setMax_id(maxID); WriteUtil.append("DA-循环完毕"); } catch (Exception e) { WriteUtil.append("error:\n" + journalManagerService.getStackTrace(e)); throw e; } finally { targetDao.closeConnection(); sourceDao.closeConnection(); // 关闭线程 dataArchivingQueue.shutdownQueryThread(sourceTable); } // 删除mes主库的内容 if (canExecuteClearFlag) { WriteUtil.append("DA-删除mes主库内容》》》"); String deleteMasterLogUUID = deleteMasterData(sourceTable, configFse, sourceDao, targetDao, uniqueField, timeField, extractTimeField, targetTableSet, deleteSubLogUUID); if (!StringUtils.isEmpty(deleteMasterLogUUID)) { journalEntity.setPre_step_uuid(deleteMasterLogUUID); } else if (!StringUtils.isEmpty(deleteSubLogUUID)) { journalEntity.setPre_step_uuid(deleteMasterLogUUID); } WriteUtil.append("DA-删除mes主库内容完毕"); } String errorInfo = dataArchivingQueue.getErrorLog(sourceTable); if (!StringUtils.isEmpty(errorInfo)) { journalEntity.setError(errorInfo); journalEntity.setResult(0); } } catch (Exception e) { SpringMVCContextHolder.getSystemLogger().error(e); if (!StringUtils.isEmpty(journalEntity.getError())) { journalEntity.setError(journalEntity.getError() + "\n" + journalManagerService.getStackTrace(e)); } else { journalEntity.setError(journalManagerService.getStackTrace(e)); } journalEntity.setResult(0); } finally { journalEntity.setSingle_duration(timer.intervalMs()); journalEntity.setCount(archivingSuccessCount); journalEntity.setConfigUuid(uuid); journalEntity.setType(5); journalEntity.setDetail(4); journalEntity.setCreated_utc_datetime(new Date()); FieldSetEntity curLogFse = null; if (archivingSuccessCount > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1 || !StringUtils.isEmpty(journalEntity.getPre_step_uuid())) { curLogFse = journalManagerService.autoCreateJournal(journalEntity); } // 将日志表中执行失败的记录标记为已经重新处理 WriteUtil.append("DA-将日志表中执行失败的记录标记为已经重新处理"); DataTableEntity logDte; if (curLogFse == null) { logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=?", new Object[]{uuid}); } else { logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=? and uuid<>?", new Object[]{uuid, curLogFse.getUUID()}); } for (int i = 0; i < logDte.getRows(); i++) { journalManagerService.writeBackReDealResult(logDte.getFieldSetEntity(i), true); } WriteUtil.append("DA-执行完毕"); } } /** * 清理归档重复数据 * * @param sourceTable * @param targetTableSet * @param uniqueField * @param targetDao */ private DataTableEntity clearArchiveRepeatData(String sourceTable, Set targetTableSet, String uniqueField, Dao targetDao, DataTableEntity list) { StringBuilder clearSql = new StringBuilder(128); clearSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet, Arrays.asList(uniqueField, "{#table_name#}"), false, " where " + BaseUtil.buildQuestionMarkFilter(uniqueField, list.getFieldAllValues(uniqueField), true))) .append("\nselect ").append(uniqueField).append(",_table_name from ").append(sourceTable); DataTableEntity clearDte = targetDao.getList(clearSql.toString()); if (!DataTableEntity.isEmpty(clearDte)) { Set clearTableSet = Sets.newHashSet(); clearTableSet.addAll(Arrays.asList(clearDte.getFieldAllValues("_table_name"))); for (Object targetTableName : clearTableSet) { targetDao.delete(targetTableName.toString(), BaseUtil.buildQuestionMarkFilter(uniqueField, clearDte.getRows(), true), clearDte.getData().stream().map(item -> item.getString(uniqueField)).toArray()); } } return clearDte; } /** * 清理判定,每天切换的时候可以执行一次 * * @param tableName * @return */ public boolean canExecuteClear(String tableName) { String dateStr = DateUtil.format(DateUtil.date(), "yyyy-MM-dd"); final String KEY = "DE_CLEAR_STORE"; Object preDate = RedisUtil.getHash(KEY, tableName); if (dateStr.equals(preDate)) { return false; } else { RedisUtil.setHash(KEY, tableName, dateStr); return true; } } /** * 清理主库数据 * * @param sourceTable 源表名 * @param configFse 配置fse * @param sourceDao 源dao * @param targetDao 目标dao * @param uniqueField 唯一字段 * @param timeField 更新(归档)时间字段 * @param extractTimeField 提取时间字段 * @param targetTableSet 目标表集合 * @param deleteSubLogUUID 删除子表日志uuid * @return */ private String deleteMasterData(String sourceTable, FieldSetEntity configFse, Dao sourceDao, Dao targetDao, String uniqueField, String timeField, String extractTimeField, Set targetTableSet, String deleteSubLogUUID) { FieldSetEntity deleteMasterLogFse = new FieldSetEntity(); deleteMasterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG); TimeInterval deleteMasterLogTimer = DateUtil.timer(); String minID = ""; String maxID = ""; Date statisticsStartTime = null; Date statisticsFinalTime = null; try { StringBuilder sql = new StringBuilder(128); sql.append("select count(*) count_value from ").append(sourceTable); if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) { sql.append(" where ").append(configFse.getString("source_select_filter")); } else { sql.append(" where 1=2"); } FieldSetEntity paramFse = sourceDao.getOne(sql.toString()); WriteUtil.append("DA-sourceDao.getOne(sql.toString())-sql:" + sql); int totalCount = StringUtils.isEmpty(paramFse.getString("count_value")) ? 0 : paramFse.getInteger("count_value"); int delTotalCount = 0; if (totalCount > 0) { int pageSize = 1000; int totalPage = totalCount / pageSize + (totalCount % pageSize == 0 ? 0 : 1); sql.setLength(0); sql.append("select ").append(uniqueField).append(" from ").append(sourceTable); if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) { sql.append(" where ").append(configFse.getString("source_select_filter")); } StringBuilder existSql = new StringBuilder(128); existSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet)) .append("\nselect ").append(uniqueField).append(" from ").append(sourceTable); StringBuilder tempSql = new StringBuilder(128); for (int i = 0; i < totalPage; i++) { DataTableEntity delDte = sourceDao.getList(sql.toString(), new Object[]{}, 1, pageSize); WriteUtil.append("DA-删除的数据-delDte:" + Arrays.toString(delDte.getFieldAllValues(uniqueField))); if (DataTableEntity.isEmpty(delDte)) { continue; } // 验证归档库里面存在,仅删除存在,不存在的保留 tempSql.setLength(0); tempSql.append(existSql); tempSql.append(" where ").append(BaseUtil.buildQuestionMarkFilter(uniqueField, delDte.getFieldAllValues(uniqueField), true)); WriteUtil.append("DA-tempSql:" + tempSql); DataTableEntity existDte = targetDao.getList(tempSql.toString(), new Object[]{}); if (existDte.getRows() > 0) { FieldSetEntity existFse; for (int j = 0; j < existDte.getRows(); j++) { existFse = existDte.getFieldSetEntity(j); Date updateTime = existFse.getDate(timeField); Date extractTime = existFse.getDate(extractTimeField); Date date = getAimDate(updateTime, extractTime, 0); statisticsStartTime = getAimDate(statisticsStartTime, date, 0); statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1); minID = getAimID(minID, existFse.getString(uniqueField), 0); maxID = getAimID(maxID, existFse.getString(uniqueField), 1); } delTotalCount += existDte.getRows(); sourceDao.delete(sourceTable, BaseUtil.buildQuestionMarkFilter(uniqueField, existDte.getRows(), true), existDte.getData().stream().map(item -> item.getString(uniqueField)).toArray()); } } WriteUtil.append("DA-删除总条数:" + delTotalCount); } deleteMasterLogFse.setValue(CmnConst.COUNT, delTotalCount); deleteMasterLogFse.setValue(CmnConst.RESULT, 1); } catch (Exception e) { e.printStackTrace(); deleteMasterLogFse.setValue(CmnConst.RESULT, 0); deleteMasterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e)); } finally { targetDao.closeConnection(); sourceDao.closeConnection(); deleteMasterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date()); deleteMasterLogFse.setValue(CmnConst.TYPE, 5); deleteMasterLogFse.setValue(CmnConst.DETAIL, 6); deleteMasterLogFse.setValue(CmnConst.PRE_STEP_UUID, deleteSubLogUUID); deleteMasterLogFse.setValue(CmnConst.DEAL_FLAG, 0); deleteMasterLogFse.setValue(CmnConst.DEAL_RESULT, 1); deleteMasterLogFse.setValue(CmnConst.MIN_ID, minID); deleteMasterLogFse.setValue(CmnConst.MAX_ID, maxID); deleteMasterLogFse.setValue(CmnConst.SINGLE_DURATION, deleteMasterLogTimer.intervalMs()); deleteMasterLogFse.setValue(CmnConst.CONFIG_UUID, configFse.getUUID()); deleteMasterLogFse.setValue(CmnConst.STATISTICS_START_TIME, statisticsStartTime); deleteMasterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, statisticsFinalTime); if ((!StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.COUNT)) && deleteMasterLogFse.getInteger(CmnConst.COUNT) > 0) || !StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.ERROR)) || !"1".equals(deleteMasterLogFse.getString(CmnConst.RESULT))) { getBaseDao().add(deleteMasterLogFse); } } return deleteMasterLogFse.getUUID(); } /** * 获取指定的日期,若有一个为空,那么直接获取另外一个的值;否则按照指定取值 * * @param d1 * @param d2 * @param sign 大于0,取两者中大的,就是更靠近当前时间的;否则取小的,就是更远离当前时间的 * @return */ private Date getAimDate(Date d1, Date d2, int sign) { if (d1 == null && d2 == null) { return null; } if (d1 == null || d2 == null) { return d1 == null ? d2 : d1; } if (sign > 0) { return d1.compareTo(d2) > 0 ? d1 : d2; } else { return d1.compareTo(d2) > 0 ? d2 : d1; } } /** * 获取指定的ID,那么直接获取另外一个的值;否则按照指定取值 * * @param s1 * @param s2 * @param sign 大于0,取两者中大的;否则取小的 * @return */ private String getAimID(String s1, String s2, int sign) { if (StringUtils.isEmpty(s1) && StringUtils.isEmpty(s2)) { return null; } if (StringUtils.isEmpty(s1) || StringUtils.isEmpty(s2)) { return StringUtils.isEmpty(s1) ? s2 : s1; } String numberRegexp = "\\d{1,11}"; if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) { boolean b = Long.parseLong(s1) > Long.parseLong(s2); if (sign > 0) { return b ? s1 : s2; } else { return b ? s2 : s1; } } else { if (sign > 0) { return s1.compareTo(s2) > 0 ? s1 : s2; } else { return s1.compareTo(s2) > 0 ? s2 : s1; } } } /** * 将dte按照年份分组 * * @param dte * @param timeField 时间字段 * @param splitTableType 分表方式,0-年,1-月 * @return Map<时间, List < dte数据>> */ private Map> dteGroupByTime(DataTableEntity dte, String timeField, String splitTableType) { Map> groupDteMap = Maps.newHashMap(); FieldSetEntity fse; String time; List groupDteList; if (StringUtils.isEmpty(timeField)) { groupDteList = Lists.newArrayList(); groupDte(groupDteList, dte); groupDteMap.put("0", groupDteList); } else { SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM"); for (int i = 0; i < dte.getRows(); i++) { fse = dte.getFieldSetEntity(i); if (fse.getDate(timeField) == null) { throw new BaseException(ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getValue(), ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getText() + "\ntable_name:" + fse.getTableName() + "\ndata:" + fse.getValues()); } if ("1".equals(splitTableType)) { time = dateFormat.format(fse.getDate(timeField)); } else { time = String.valueOf(DateUtil.year(fse.getDate(timeField))); } groupDteList = groupDteMap.computeIfAbsent(time, k -> Lists.newArrayList()); groupAddDte(groupDteList, fse); } } return groupDteMap; } private void groupDte(List list, DataTableEntity allDte) { if (list == null) { throw new BaseException(ErrorCode.DATA_ARCHIVE_GROUP_CONTAINER_IS_NULL); } for (int i = 0; i < allDte.getRows(); i++) { groupAddDte(list, allDte.getFieldSetEntity(i)); } } private void groupAddDte(List list, FieldSetEntity fse) { DataTableEntity dte; if (list.isEmpty()) { dte = new DataTableEntity(); list.add(dte); } else { dte = list.get(list.size() - 1); if (dte.getRows() >= DataArchivingQueue.INSERT_PAGE_SIZE) { dte = new DataTableEntity(); list.add(dte); } } dte.addFieldSetEntity(fse); } class DataArchivingServiceImpl { private Dao sourceDao; private Dao targetDao; private String sourceTable; private String configUid; private String sourceDbName; private String targetDbName; public DataArchivingServiceImpl(Dao sourceDao, Dao targetDao, String sourceTable, String configUid, String sourceDbName, String targetDbName) { this.sourceDao = sourceDao; this.targetDao = targetDao; this.sourceTable = sourceTable; this.configUid = configUid; this.sourceDbName = sourceDbName; this.targetDbName = targetDbName; } /** * 验证数据表是否存在 * * @param tableName 数据表名 * @return */ public boolean dataTableIsExists(String tableName) throws BaseException { try { Connection connection = targetDao.getConnection(); DatabaseMetaData metaData = targetDao.getConnection().getMetaData(); ResultSet tables = metaData.getTables(null, null, tableName, new String[]{"TABLE"}); boolean result = tables.next(); tables.close(); connection.close(); return result; } catch (Exception e) { e.printStackTrace(); throw new BaseException(e); } } /** * 创建表 * * @param prefix 表名前缀 */ public String createTable(String prefix, String time, DataBaseEntity dbe) throws BaseException { String tableName; if (!"0".equals(time)) { tableName = prefix + (prefix.lastIndexOf("_") != prefix.length() - 1 ? "_" : "") + time; } else { tableName = prefix.endsWith("_") ? prefix.substring(0, prefix.length() - 1) : prefix; } if (dataTableIsExists(tableName)) { return tableName; } JSONObject tableInfoObj = getTableInfo(tableName); String sql = getSql(tableInfoObj); SpringMVCContextHolder.getSystemLogger().error("sql:\n" + sql); //先创建记录再执行ddl语句不然报错后ddl无法回滚 saveCreateTableRecord(tableName, time); targetDao.executeSql(sql); // syncDataConfigService.addTableField(dbe,dbe.getUuid(),tableName); return tableName; } /** * 根据表名获取来源数据源对应表的结构信息 * * @param tableName * @return */ private JSONObject getTableInfo(String tableName) { JSONObject tableInfoObj = new JSONObject(); tableInfoObj.put(CmnConst.NAME, tableName); DataTableEntity tempDte; FieldSetEntity tempFse; JSONObject fieldInfoObj = new JSONObject(new LinkedHashMap<>()); tableInfoObj.put(CmnConst.FIELD, fieldInfoObj); JSONObject indexInfoObj = new JSONObject(new LinkedHashMap<>()); tableInfoObj.put(CmnConst.INDEX, indexInfoObj); JSONObject singleFieldInfoObj; if (DataBaseType.MYSQL.equals(sourceDao.getDataBaseType())) { // mysql // 表 tempFse = sourceDao.getOne("information_schema.`TABLES`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable}); tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("table_comment")); // 字段表 tempDte = sourceDao.getList("information_schema.`COLUMNS`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable}, "ordinal_position", 1, Integer.MAX_VALUE); for (int i = 0; i < tempDte.getRows(); i++) { tempFse = tempDte.getFieldSetEntity(i); singleFieldInfoObj = new JSONObject(); fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj); singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type")); singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("character_maximum_length")) ? tempFse.getString("numeric_precision") : tempFse.getString("character_maximum_length")); singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("numeric_scale")); singleFieldInfoObj.put(CmnConst.NULLABLE, "NO".equalsIgnoreCase(tempFse.getString("is_nullable")) ? 0 : 1); singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("column_comment")); } // 索引表 StringBuilder sql = new StringBuilder(128); sql.append("select index_name,non_unique,group_concat(column_name) column_name"); sql.append("\nfrom information_schema.`STATISTICS`"); sql.append("\nwhere table_schema=? and table_name=?"); sql.append("\ngroup by index_name,non_unique"); tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, sourceTable}); for (int i = 0; i < tempDte.getRows(); i++) { tempFse = tempDte.getFieldSetEntity(i); singleFieldInfoObj = new JSONObject(); indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj); singleFieldInfoObj.put(CmnConst.TYPE, "PRIMARY".equalsIgnoreCase(tempFse.getString("index_name")) ? CmnConst.PRIMARY : ("1".equals(tempFse.getString("non_unique")) ? CmnConst.NORMAL : CmnConst.UNIQUE)); singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name")); } } else if (DataBaseType.ORACLE.equals(sourceDao.getDataBaseType())) { // oracle // 表 String upperTableName = sourceTable.toUpperCase(); tempFse = sourceDao.getOne("SYS.USER_TAB_COMMENTS", "table_name=?", new Object[]{upperTableName}); tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments")); // 字段表 StringBuilder sql = new StringBuilder(128); sql.append("SELECT TC.COLUMN_NAME,DATA_TYPE,DATA_LENGTH,DATA_PRECISION,DATA_SCALE,NULLABLE,CHAR_LENGTH,COMMENTS FROM SYS.USER_TAB_COLUMNS TC"); sql.append("\nLEFT JOIN USER_COL_COMMENTS CC ON TC.TABLE_NAME=CC.TABLE_NAME AND TC.COLUMN_NAME=CC.COLUMN_NAME"); sql.append("\nWHERE TC.TABLE_NAME=?"); sql.append("\nORDER BY TC.COLUMN_ID"); tempDte = sourceDao.getList(sql.toString(), new Object[]{upperTableName}); for (int i = 0; i < tempDte.getRows(); i++) { tempFse = tempDte.getFieldSetEntity(i); singleFieldInfoObj = new JSONObject(); fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj); singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type").contains("VARCHAR") ? "varchar" : tempFse.getString("data_type")); singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("data_precision")) ? tempFse.getString("char_length") : tempFse.getString("data_precision")); singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("data_scale")); singleFieldInfoObj.put(CmnConst.NULLABLE, "N".equalsIgnoreCase(tempFse.getString("nullable")) ? 0 : 1); singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments")); } // 索引表 sql.setLength(0); sql.append("SELECT DIC.INDEX_NAME,WM_CONCAT(DIC.COLUMN_NAME) column_name FROM SYS.DBA_IND_COLUMNS DIC"); sql.append("\nLEFT JOIN SYS.DBA_INDEXES DI ON DIC.INDEX_NAME=DI.INDEX_NAME"); sql.append("\nWHERE UNIQUENESS='NONUNIQUE' AND DIC.TABLE_OWNER=? AND DI.TABLE_NAME=?"); sql.append("\nGROUP BY DIC.INDEX_NAME"); tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName}); for (int i = 0; i < tempDte.getRows(); i++) { tempFse = tempDte.getFieldSetEntity(i); singleFieldInfoObj = new JSONObject(); indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj); singleFieldInfoObj.put(CmnConst.TYPE, CmnConst.NORMAL); singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name")); } // 约束表 C-检查,写到字段里面;R-外键,不要;P-主键;U-唯一键 sql.setLength(0); sql.append("SELECT DC.CONSTRAINT_NAME index_name,CONSTRAINT_TYPE,LISTAGG(COLUMN_NAME, ',') WITHIN GROUP(ORDER BY DCC.POSITION) column_name FROM SYS.DBA_CONS_COLUMNS DCC"); sql.append("\nLEFT JOIN SYS.DBA_CONSTRAINTS DC ON DCC.CONSTRAINT_NAME=DC.CONSTRAINT_NAME"); sql.append("\nWHERE DCC.OWNER=? AND DCC.TABLE_NAME=? AND CONSTRAINT_TYPE IN ('P','U')"); sql.append("\nGROUP BY DC.CONSTRAINT_NAME,CONSTRAINT_TYPE"); tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName}); WriteUtil.append("DA-DDL-:" + sql + " |||库名:" + sourceDbName + " |||表名:" + upperTableName); for (int i = 0; i < tempDte.getRows(); i++) { tempFse = tempDte.getFieldSetEntity(i); singleFieldInfoObj = new JSONObject(); indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj); singleFieldInfoObj.put(CmnConst.TYPE, "P".equalsIgnoreCase(tempFse.getString("constraint_type")) ? CmnConst.PRIMARY : CmnConst.UNIQUE); singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name")); } WriteUtil.append("DA-DDL-创表信息:" + tableInfoObj); } else { throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL); } return tableInfoObj; } /** * 根据表结构信息,拼接DDL创建表sql语句 * * @param tableInfoObj * @return */ private String getSql(JSONObject tableInfoObj) { StringBuilder sql = new StringBuilder(128); JSONObject fieldInfoObj = tableInfoObj.getJSONObject(CmnConst.FIELD); JSONObject indexInfoObj = tableInfoObj.getJSONObject(CmnConst.INDEX); JSONObject singleFieldInfoObj; sql.append("CREATE TABLE ").append(tableInfoObj.getString(CmnConst.NAME)).append(" ("); if (DataBaseType.MYSQL.equals(targetDao.getDataBaseType())) { // mysql for (String field : fieldInfoObj.keySet()) { singleFieldInfoObj = fieldInfoObj.getJSONObject(field); sql.append("\n `").append(field.toLowerCase(Locale.ROOT)).append("` "); if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("timestamp") || singleFieldInfoObj.getString(CmnConst.TYPE).contains("TIMESTAMP")) { sql.append("timestamp "); } else if (singleFieldInfoObj.getIntValue(CmnConst.DECIMAL) > 0) { sql.append("decimal(").append(singleFieldInfoObj.getString(CmnConst.INTEGER)).append(",").append(singleFieldInfoObj.getString(CmnConst.DECIMAL)).append(") "); } else if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("int")) { sql.append("int(0) "); } else if ("number".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { if ("0".equals(singleFieldInfoObj.getString(CmnConst.DECIMAL))) { sql.append("int(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") "); } else { sql.append("decimal(22,4) "); } } else if ("date".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { sql.append("datetime(0) "); } else if ("blob".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { sql.append("blob "); } else if ("text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT).endsWith("text") || (("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) && singleFieldInfoObj.getIntValue(CmnConst.INTEGER) >= 4000) ) { sql.append("text "); } else { sql.append(singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT)).append("(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") "); } if ("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { sql.append(" CHARACTER SET utf8mb4 COLLATE utf8mb4_bin "); } if ("0".equals(singleFieldInfoObj.getString(CmnConst.NULLABLE))) { sql.append("not null "); } if (!StringUtils.isEmpty(singleFieldInfoObj.getString(CmnConst.COMMENT))) { sql.append(" comment '").append(singleFieldInfoObj.getString(CmnConst.COMMENT)).append("'"); } sql.append(","); } for (String indexName : indexInfoObj.keySet()) { singleFieldInfoObj = indexInfoObj.getJSONObject(indexName); if (CmnConst.PRIMARY.equalsIgnoreCase(indexName) || CmnConst.PRIMARY.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { sql.append("\n PRIMARY KEY (`").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append("`) USING BTREE,"); } else if (CmnConst.UNIQUE.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { sql.append("\n UNIQUE INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,"); } else if (CmnConst.NORMAL.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { sql.append("\n INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,"); } } sql.deleteCharAt(sql.length() - 1); sql.append("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin "); if (!StringUtils.isEmpty(tableInfoObj.getString(CmnConst.COMMENT))) { sql.append(" COMMENT = '").append(tableInfoObj.getString(CmnConst.COMMENT)).append("'"); } } else { throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL); } return sql.toString(); } private String saveCreateTableRecord(String tableName, String time) { /*=====================================================*/ // 新增创建表记录 FieldSetEntity fse = new FieldSetEntity(); fse.setTableName(CmnConst.DATA_ARCHIVING_SUB_TABLE); fse.setValue("table_name", tableName); fse.setValue("parent_uuid", this.configUid); fse.setValue("data_time", time); BaseUtil.createCreatorAndCreationTime(fse); getBaseDao().saveFieldSetEntity(fse); /*=====================================================*/ return tableName; } private void createIndex(String targetTable, String time) throws BaseException { StringBuilder sql = new StringBuilder(); sql.append("\n SELECT DBMS_METADATA.GET_DDL('INDEX',u.index_name) as create_index_statement,u.INDEX_NAME "); sql.append("\n from USER_INDEXES u where u.TABLE_NAME=? "); String primaryIndexName = getPrimaryIndexName(); List params = new ArrayList<>(); params.add(sourceTable); if (!StringUtils.isEmpty(primaryIndexName)) { sql.append(" and u.index_name != ? "); params.add(primaryIndexName); } DataTableEntity list = sourceDao.getList(sql.toString(), params.toArray()); if (!DataTableEntity.isEmpty(list)) { for (int i = 0; i < list.getRows(); i++) { //循环获取创建索引语句 String createIndexStatement = list.getString(i, "create_index_statement"); String indexName = list.getString(i, "index_name"); if (!StringUtils.isEmpty(createIndexStatement)) { //新的索引名称 String newIndexName = this.getIndexName(indexName, time); //将建索引ddl createIndexStatement = createIndexStatement.replaceAll(sourceTable, targetTable) .replaceAll("\"" + indexName + "\"", "\"" + newIndexName + "\""); //执行创建索引 targetDao.executeSql(createIndexStatement); } } } } /** * 获取新的索引名称 * * @param indexName * @return */ private String getIndexName(String indexName, String time) { //新的索引名称 String newIndexName; if (indexName.length() <= 26) { newIndexName = indexName + time; } else { //超过26位随机生成索引名称 newIndexName = RandomUtil.randomString(10) + time; } return newIndexName; } /** * 获取主键索引的名名称 * * @return */ private String getPrimaryIndexName() { StringBuilder sql = new StringBuilder(); sql.append("\n SELECT a.index_name from user_constraints a "); sql.append("\n WHERE a.constraint_type = 'P' "); sql.append("\n AND a.table_name = ? "); FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()}); return one != null ? one.getString("index_name") : null; } /** * 获取创建表语句(包含主键索引) * * @return */ private String getCreateTableStatement() { StringBuilder sql = new StringBuilder(); sql.append(" SELECT DBMS_METADATA.GET_DDL(U.OBJECT_TYPE, u.object_name) create_table_statement "); sql.append(" from USER_OBJECTS u "); sql.append(" where U.OBJECT_TYPE ='TABLE' and u.object_name=? "); FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()}); return one != null ? one.getString("create_table_statement") : null; } /** * 获取其他索引名称 * * @return */ private List getOtherIndexName() { StringBuilder sql = new StringBuilder(); sql.append(" SELECT INDEX_NAME FROM USER_INDEXES "); sql.append("\n WHERE TABLE_NAME=? AND INDEX_NAME NOT IN ( "); sql.append("\n SELECT a.index_name from user_constraints a "); sql.append("\n WHERE a.constraint_type = 'P' "); sql.append("\n AND a.TABLE_NAME = ? ) "); DataTableEntity list = sourceDao.getList(sql.toString(), new Object[]{this.sourceTable, this.sourceTable}); if (!DataTableEntity.isEmpty(list)) { return list.getData().stream().map(item -> item.getString("index_name")).collect(Collectors.toList()); } return null; } } }