import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.TimeInterval;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.RandomUtil;
import com.alibaba.fastjson.JSONObject;

@Service
public class DataArchivingService extends AbstractBaseService {

    @Autowired
    JournalManagerService journalManagerService;
    @Autowired
    DataArchivingQueue dataArchivingQueue;

    @Resource
    private SyncDataConfigService syncDataConfigService;

    private static CustomLock lock = new CustomLock();

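    /**
     * Looks up the collect ids for the given data sources and source table in
     * product_sys_data_collect, returning a data_source -> id map. Returns
     * null when nothing matches and fails when any requested source is missing.
     */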
    private Map<String, String> getCollectIds(String[] deleteDataSource, String tableName) {
        StringBuilder sql = new StringBuilder("SELECT ");
        sql.append("id,data_source FROM product_sys_data_collect WHERE ");
        List<Object> params = new ArrayList<>();
        for (int i = 0; i < deleteDataSource.length; i++) {
            String source = deleteDataSource[i];
            if (i > 0) {
                sql.append(" or ");
            }
            sql.append("( data_source = ? and LOWER(source_table) = ? )");
            params.add(source);
            params.add(tableName.toLowerCase());
        }
        DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), params.toArray());
        if (DataTableEntity.isEmpty(dataTableEntity)) {
            return null;
        }
        Map<String, String> collect = dataTableEntity.getData().stream().collect(Collectors.toMap(item -> item.getString("data_source"), item -> item.getString("id")));

        if (collect.size() != deleteDataSource.length) {
            throw new BaseException(ErrorCode.GET_COLLECT_ID_FAIL);
        }

        return collect;
    }

    public void reDeal(String logUuid) {
        FieldSetEntity fs = getBaseDao().getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, logUuid, false);
        if (!FieldSetEntity.isEmpty(fs)) {
            String configUid = fs.getString(CmnConst.CONFIG_UUID);
            reDealArchiving(configUid);
            fs.setValue(CmnConst.DEAL_FLAG, 1);
            getBaseDao().update(fs);
        }
    }

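    // Note: reDeal above calls this method on `this`, a self-invocation that
    // bypasses the Spring proxy, so @Async only takes effect for external callers.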
    @Async
    public void reDealArchiving(String uuid) {
        this.dataArchivingEntry(uuid);
    }

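    // Sub-library deletion below pages by the unique field in ascending order
    // and keeps the largest validated id in maxID, so each new page starts
    // after the rows that were already verified and deleted.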
    /**
     * Sweep-code library deletion: removes rows from the configured
     * sub-libraries once they have been validated against the validation table.
     *
     * @param configFse archiving configuration
     * @throws BaseException
     */
    private String sweepCodeLibrary(FieldSetEntity configFse) throws BaseException, SQLException {
        JournalEntity journalEntity = new JournalEntity();
        journalEntity.setDetail(3);
        TimeInterval timer = DateUtil.timer();
        int deleteSuccessCount = 0;
        try {
            String deleteDataSource = configFse.getString("delete_data_source");
            WriteUtil.append("DA-deleteDataSource:" + deleteDataSource);
            WriteUtil.append("DA-sub_delete_select_filter:" + configFse.getString("delete_select_filter"));
            if (StringUtils.isEmpty(deleteDataSource)) {
                return null;
            }
            String[] deleteDataSourceArray = deleteDataSource.split(",");

            String source_data_validation = configFse.getString("source_data_validation");
            if (StringUtils.isEmpty(source_data_validation)) {
                return null;
            }
            String deleteDataTable = configFse.getString("delete_data_table");
            if (StringUtils.isEmpty(deleteDataTable)) {
                return null;
            }

            String collectIdField = configFse.getString("collect_id_field");

            DataBaseEntity validationDbe = new DataBaseEntity(source_data_validation);

            // unique key used for deletion in the sweep-code library
            String deleteUniqueField = configFse.getString("delete_unique_field");

            // validation table
            String validationTableName = configFse.getString("table_data_validation");
            // unique validation field
            String validationUniqueField = configFse.getString("validation_unique_field");
            // fields excluded from the comparison
            List<String> ignoreComparisonFields =
                    !StringUtils.isEmpty(configFse.getString("ignore_comparison_field")) ?
                            Lists.newArrayList(Arrays.asList(configFse.getString("ignore_comparison_field").split(","))) : null;
            int pageSize = 500;
            DataTableEntity list;
            Dao dao = null;
            int currentCount = 0;
            Map<String, String> collectIds = getCollectIds(deleteDataSource.split(","), deleteDataTable);
            try {
                String minID = null;
                String maxID = null;
                for (int j = 0; j < deleteDataSourceArray.length; j++) {
                    if (dao != null) {
                        dao.closeConnection();
                    }
                    dao = new DataBaseEntity(deleteDataSourceArray[j]).getDao();
                    do {
                        String filter = configFse.getString("delete_select_filter");
                        if (!Objects.isNull(minID) && !Objects.isNull(maxID)) {
                            filter += " and ( " + deleteUniqueField + " > " + maxID + ")";
                        }
                        list = dao.getList(deleteDataTable, filter, new Object[]{}, deleteUniqueField, 1, pageSize);
                        WriteUtil.append("DA-deleting sub-library data, table:" + deleteDataTable);
                        if (DataTableEntity.isEmpty(list)) {
                            break;
                        }
                        currentCount = list.getRows();
                        Object[] uniqueValues = list.getData().stream().map(item -> item.getString(deleteUniqueField)).toArray();
                        DataTableEntity validationData = validationDbe.getDao().getList(validationTableName, collectIdField + "= ? AND " +
                                BaseUtil.buildQuestionMarkFilter(validationUniqueField, uniqueValues.length, true), ArrayUtil.addAll(new Object[]{collectIds.get(deleteDataSourceArray[j])}, uniqueValues));
                        validationDbe.getDao().closeConnection();
                        if (DataTableEntity.isEmpty(validationData)) {
                            break;
                        }
                        Map<String, Map> collectMap = validationData.getData().stream().collect(Collectors.toMap(
                                (item) -> item.getString(validationUniqueField),
                                item -> item.getValues()));
                        List<String> deleteUniqueValueList = new ArrayList<>();
                        for (int i = 0; i < list.getRows(); i++) {
                            FieldSetEntity fs = list.getFieldSetEntity(i);
                            String uniqueValue = fs.getString(deleteUniqueField);
                            Map<String, Object> map = collectMap.get(uniqueValue);
                            if (ignoreComparisonFields != null && map != null) {
                                for (int i1 = 0; i1 < ignoreComparisonFields.size(); i1++) {
                                    try {
                                        map.remove(ignoreComparisonFields.get(i1));
                                        fs.remove(ignoreComparisonFields.get(i1));
                                    } catch (Exception e) {
                                        e.printStackTrace();
                                    }
                                }
                            }
                            if (map == null || !new JSONObject((Map) fs.getValues()).equals(new JSONObject(map))) {
                                list.removeFieldSetEntity(i);
                                i--;
                                continue;
                            }
                            deleteUniqueValueList.add(uniqueValue);
                            minID = getAimID(minID, uniqueValue, 0);
                            maxID = getAimID(maxID, uniqueValue, 1);
                        }
                        collectMap.clear();
                        if (deleteUniqueValueList.size() > 0) {
                            deleteSuccessCount += dao.deleteRInt(configFse.getString("delete_data_table"),
                                    BaseUtil.buildQuestionMarkFilter(deleteUniqueField, deleteUniqueValueList.size(), true), deleteUniqueValueList.toArray());
                            WriteUtil.append("DA-deleting sub-library data, rows deleted so far:" + deleteSuccessCount);
                        }
                    } while (currentCount == pageSize);
                }
                journalEntity.setResult(1);
                journalEntity.setMin_id(minID);
                journalEntity.setMax_id(maxID);
            } catch (Exception e) {
                throw e;
            } finally {
                if (dao != null) {
                    dao.closeConnection();
                }
            }
        } catch (Exception e) {
            SpringMVCContextHolder.getSystemLogger().error(e);
            journalEntity.setResult(0);
            journalEntity.setError(journalManagerService.getStackTrace(e));
            throw e;
        } finally {
            journalEntity.setCount(deleteSuccessCount);
            journalEntity.setCreated_utc_datetime(new Date());
            journalEntity.setSingle_duration(timer.intervalMs());
            journalEntity.setType(5);
            journalEntity.setConfigUuid(configFse.getUUID());
        }
        if (journalEntity.getCount() > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1) {
            return journalManagerService.autoCreateJournal(journalEntity).getUUID();
        }
        return null;
    }

    /**
     * Data archiving entry point.
     *
     * @param uuid archiving config uuid
     */
    public void dataArchivingEntry(String uuid) {
        if (lock.tryLock(uuid)) {
            try {
                dataArchivingEntryLock(uuid);
            } catch (Exception e) {
                throw e;
            } finally {
                lock.unLock(uuid);
            }
            return;
        }
        SpringMVCContextHolder.getSystemLogger().info("skipping execution:" + uuid);
    }
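
    // A minimal scheduling sketch (hypothetical caller, not part of this class):
    // assuming a Spring @Scheduled bean holds a reference to this service and a
    // list of configured archive uuids, each uuid is fed to the lock-guarded entry:
    //
    //   @Scheduled(cron = "0 0/30 * * * ?")
    //   public void runArchiving() {
    //       for (String uuid : archivingConfigUuids) { // hypothetical list
    //           dataArchivingService.dataArchivingEntry(uuid);
    //       }
    //   }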

    public void dataArchivingEntryLock(String uuid) {
        SpringMVCContextHolder.getSystemLogger().info("starting archiving >>>>" + uuid);
        WriteUtil.append("DA>>>");
        WriteUtil.append("DA-lock acquired");
        TimeInterval timer = DateUtil.timer();
        String curDateTimeStr = DateUtil.date().toString();
        int archivingSuccessCount = 0;
        JournalEntity journalEntity = new JournalEntity();
        journalEntity.setResult(1);
        try {
            // load the archiving service configuration
            FieldSetEntity configFse = getBaseDao().getFieldSetEntity(CmnConst.DATA_ARCHIVING_TABLE, uuid, false);
            TimeInterval tempTestTimer = DateUtil.timer();
            // source data table
            String sourceTable = configFse.getString("source_table");
            boolean canExecuteClearFlag = canExecuteClear(sourceTable);
            String deleteSubLogUUID = null;
            if (canExecuteClearFlag) {
                WriteUtil.append("DA-deleting sweep-code library >>>");
                // delete sweep-code library data already extracted into the MES master library and matching the configured filter (KT-specific)
                // deleteSubLogUUID = this.sweepCodeLibrary(configFse);
                // changed to asynchronous execution
                ThreadUtil.execAsync(() -> {
                    try {
                        sweepCodeLibrary(configFse);
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                });
            }

            // source data source
            String sourceDataSource = configFse.getString("source_data_source");
            DataBaseEntity sourceDbe = new DataBaseEntity(sourceDataSource);
            // target data source
            String targetDataSource = configFse.getString("target_data_source");
            DataBaseEntity targetDbe = new DataBaseEntity(targetDataSource);
            // target table prefix
            String targetTablePrefix = configFse.getString("target_table_prefix");
            if (targetTablePrefix.endsWith("_")) {
                targetTablePrefix = targetTablePrefix.substring(0, targetTablePrefix.length() - 1);
            }
            // source collect id field
            String sourceCollectIdField = configFse.getString("source_collect_id_field");
            // source unique field, generated by the MES sub-library
            String sourceUniqueField = configFse.getString("source_unique_field");
            // unique field
            String uniqueField = configFse.getString("unique_field");
            // time field
            String timeField = configFse.getString("time_field");
            // extract time field
            String extractTimeField = configFse.getString(CmnConst.EXTRACT_TIME_FIELD);
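
            // Pipeline shape: dataArchivingQueue.query spins up a producer that
            // pages the source table ordered by uniqueField; the loop below
            // drains the queue, groups rows by time period, and batch-inserts
            // each group into its per-period target table.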
            Dao sourceDao = sourceDbe.getDao();
            Dao targetDao = targetDbe.getDao();
            Set<String> targetTableSet;
            try {
                DataArchivingServiceImpl service = new DataArchivingServiceImpl(sourceDao, targetDao, sourceTable, uuid, sourceDbe.getDbName(), targetDbe.getDbName());
                String keyPrefix = "DA_STORE:" + sourceTable + ":";
                boolean serializeFlag = true;
                int outTime = 60 * 60;// 60 min
                RedisService readRedis = StringUtils.isEmpty(configFse.getString("redis_config_uuid")) ? new RedisService() : new RedisService(new DataBaseEntity(configFse.getString("redis_config_uuid")));
                tempTestTimer = DateUtil.timer();
                WriteUtil.append("DA-fetching table name set >>>");
                if (!StringUtils.isEmpty(timeField)) {
                    targetTableSet = QuerySqlParseUtil.getAllTableName(targetDao, targetDbe.getDbName(), sourceTable);
                } else {
                    targetTableSet = Sets.newLinkedHashSet();
                    targetTableSet.add(targetTablePrefix);
                }
                WriteUtil.append("DA-fetching table name set took:" + tempTestTimer.intervalMs());
                FieldSetEntity paramFse = getBaseDao().getFieldSetBySQL("select max(statistics_final_time) statistics_final_time from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false);
                Date preMaxTime = paramFse.getDate("statistics_final_time");
                // no successful max statistics time yet: either this is the first archiving run, or the archived table has no time field (all data sits in one table and is pulled in full every run)
                boolean turnRedisFilterFlag = preMaxTime == null;
                boolean turnTargetDBClearFlag = false;
                if (turnRedisFilterFlag) {
                    // no successful log at all means a first archiving run, so target-library cleanup can be skipped
                    FieldSetEntity logExistsFse = getBaseDao().getFieldSetBySQL("select count(1) count_value from product_sys_data_center_log where type=5 and detail=4 and result=1 and config_uuid=?", new Object[]{uuid}, false);
                    turnTargetDBClearFlag = "0".equals(logExistsFse.getString("count_value"));
                }
                StringBuilder paramSql = new StringBuilder(128);
                paramSql.append("select max(").append(uniqueField).append(") max_id,min(").append(uniqueField).append(") min_id");
                if (!StringUtils.isEmpty(timeField)) {
                    paramSql.append(",max(").append(timeField).append(") max_update_time,min(").append(timeField).append(") min_update_time");
                }
                if (!StringUtils.isEmpty(extractTimeField)) {
                    paramSql.append(",max(").append(extractTimeField).append(") max_extract_time,min(").append(extractTimeField).append(") min_extract_time");
                }
                paramSql.append(" from ").append(sourceTable);
                StringBuilder filterSb = new StringBuilder(128);
                if (preMaxTime != null && (!StringUtils.isEmpty(timeField) || !StringUtils.isEmpty(extractTimeField))) {
                    int sourceDbType = sourceDbe.getDbType().getValue();
                    if (!StringUtils.isEmpty(timeField)) {
                        filterSb.append("(").append(timeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE))
                                .append(" and ").append(timeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")");
                    }
                    if (!StringUtils.isEmpty(extractTimeField)) {
                        if (filterSb.length() > 0) {
                            filterSb.append(" or ");
                        }
                        filterSb.append("(").append(extractTimeField).append(">=").append(SqlTransferUtil.addDate(sourceDbType, SqlTransferUtil.str2Date(sourceDbType, preMaxTime), -2, SqlTransferUtil.MINUTE))
                                .append(" and ").append(extractTimeField).append("<=").append(SqlTransferUtil.str2Date(sourceDbType, curDateTimeStr)).append(")");
                    }
                    paramSql.append(" where (").append(filterSb).append(")");
                }
                paramFse = sourceDao.getOne(paramSql.toString());
                if (filterSb.length() > 0) {
                    filterSb.insert(0, "(");
                    filterSb.append(") and ");
                }
                if (StringUtils.isEmpty(paramFse.getString("max_id"))) {
                    WriteUtil.append("max id is empty, breaking out; executed sql:" + paramSql);
                    return;
                }
                filterSb.append(uniqueField).append("<='").append(paramFse.getString("max_id")).append("'");
                if (!StringUtils.isEmpty(configFse.getString("select_filter")) && !StringUtils.isEmpty(configFse.getString("select_filter").trim())) {
                    filterSb.append(" and (").append(configFse.getString("select_filter")).append(")");
                }
                WriteUtil.append("DA-sql-filter:" + filterSb);
                Date statisticsStartTime = getAimDate(paramFse.getDate("min_update_time"), paramFse.getDate("min_extract_time"), 0);
                Date statisticsFinalTime = getAimDate(paramFse.getDate("max_update_time"), paramFse.getDate("max_extract_time"), 0);
                String maxID = paramFse.getString("max_id");
                String minID = paramFse.getString("min_id");
                String splitTableType = "1".equals(configFse.getString("split_table_type")) ? "1" : "0";
                dataArchivingQueue.query(sourceDbe, sourceTable, filterSb.toString(), null, uniqueField, minID);
                DataTableEntity allDte;
                Map<String, List<DataTableEntity>> groupDteMap;
                do {
                    allDte = dataArchivingQueue.get(sourceTable);
                    if (DataTableEntity.isEmpty(allDte)) {
                        WriteUtil.append("DA-queue returned no content, sleeping and retrying...");
                        Thread.sleep(RandomUtil.randomInt(800, 1200));
                        continue;
                    }
                    WriteUtil.append("DA-queue returned content, inserting...");
                    groupDteMap = dteGroupByTime(allDte, timeField, splitTableType);
                    FieldSetEntity tempFse;
                    String field;
                    Object value;
                    Date date;
                    Date updateTime;
                    Date extractTime;
                    Map<String, Set<String>> sourceTableUniqueByCollectId = null;
                    for (Map.Entry<String, List<DataTableEntity>> entry : groupDteMap.entrySet()) {
                        String time = entry.getKey();
                        List<DataTableEntity> dteList = entry.getValue();
                        for (DataTableEntity list : dteList) {
                            // create the table (only when it does not exist yet)
                            tempTestTimer = DateUtil.timer();
                            WriteUtil.append("DA-creating table >>>");
                            String tableName = service.createTable(targetTablePrefix, time, targetDbe);
                            WriteUtil.append("DA-creating table took:" + tempTestTimer.intervalMs());
                            targetTableSet.add(tableName);
                            FieldSetEntity fs = list.getFieldSetEntity(0);
                            tempTestTimer = DateUtil.timer();
                            WriteUtil.append("DA-filtering data >>>");
                            if (!StringUtils.isEmpty(sourceCollectIdField)) {
                                sourceTableUniqueByCollectId = new HashMap<>();
                            }
                            // no successful max statistics time: first run or no time field configured, so skip the validation
                            if (!turnRedisFilterFlag) {
                                // check redis per row: if the key exists, compare time values; when the queried row is newer, reset the redis time and expiry; when the redis value is newer, drop the row from the data set and reset the expiry; when the key is missing, proceed normally (delete then insert)
                                for (int i = list.getRows() - 1; i >= 0; i--) {
                                    tempFse = list.getFieldSetEntity(i);
                                    updateTime = tempFse.getDate(timeField);
                                    extractTime = tempFse.getDate(extractTimeField);
                                    date = getAimDate(updateTime, extractTime, 0);
                                    field = tempFse.getString(uniqueField);
                                    value = readRedis.get(keyPrefix + field, serializeFlag);
                                    String sourceUniqueValue = tempFse.getString(sourceUniqueField);
                                    String collectId = tempFse.getString(sourceCollectIdField);
                                    if (!StringUtils.isEmpty(sourceCollectIdField) && sourceTableUniqueByCollectId != null
                                            && !StringUtils.isEmpty(collectId) && !StringUtils.isEmpty(sourceUniqueValue)) {
                                        Set<String> uniqueValues = sourceTableUniqueByCollectId.computeIfAbsent(collectId, k -> new HashSet<>());
                                        uniqueValues.add(sourceUniqueValue);
                                    }
                                    if (value != null && !StringUtils.isEmpty(value.toString())) {
                                        if (((Date) value).compareTo(date) <= 0) {
                                            readRedis.set(keyPrefix + field, date, serializeFlag);
                                            readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                                            statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                                            statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                                            minID = getAimID(minID, field, 0);
                                            maxID = getAimID(maxID, field, 1);
                                        } else {
                                            list.removeFieldSetEntity(i);
                                            readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                                        }
                                    } else {
                                        readRedis.set(keyPrefix + field, date, serializeFlag);
                                        readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
                                        statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                                        statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                                        minID = getAimID(minID, field, 0);
                                        maxID = getAimID(maxID, field, 1);
                                    }
                                }
                            }
                            WriteUtil.append("DA-filtering data took:" + tempTestTimer.intervalMs());
                            if (DataTableEntity.isEmpty(list)) {
                                continue;
                            }
                            // reset the table name
                            fs.setTableName(tableName);
                            list.setMeta(fs.getMeta());
                            // open the transaction on the target connection: both the clear and the batch insert run against the target library
                            Connection connection = targetDao.getConnection();
                            // switch to manual commit
                            connection.setAutoCommit(false);
                            try {
                                tempTestTimer = DateUtil.timer();
                                WriteUtil.append("DA-clearing data >>>");
                                // a first archiving run pulls the full data set into an empty table, so the clear is skipped
                                if (!turnTargetDBClearFlag) {
                                    // clear previously archived copies of these rows
                                    DataTableEntity clearDte = clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list);
                                    WriteUtil.append("DA-rows cleared:" + clearDte.getRows());
                                }
                                WriteUtil.append("DA-clearing data took:" + tempTestTimer.intervalMs());
                                // insert the new rows
                                tempTestTimer = DateUtil.timer();
                                WriteUtil.append("DA-rows to insert:" + list.getRows());
                                if (list.getRows() > 0) {
                                    WriteUtil.append("DA-inserting >>> table:" + list.getFieldSetEntity(0).getMeta().getTableName()[0]);
                                    try {
                                        targetDao.addBatch(list);
                                    } catch (Exception e) {
                                        if (turnTargetDBClearFlag) {
                                            clearArchiveRepeatData(sourceTable, targetTableSet, uniqueField, targetDao, list);
                                            targetDao.addBatch(list);
                                        } else {
                                            throw e;
                                        }
                                    }
                                }
                                WriteUtil.append("DA-inserting took:" + tempTestTimer.intervalMs());
                                // commit
                                connection.commit();
                            } catch (Exception e) {
                                // roll back the deletes when the batch insert fails
                                try {
                                    connection.rollback();
                                } catch (Exception er) {
                                    e.printStackTrace();
                                    throw er;
                                }
                                throw e;
                            } finally {
                                // restore auto-commit on the connection
                                connection.setAutoCommit(true);
                            }
                            archivingSuccessCount += list.getRows();
                        }
                    }
                } while (!dataArchivingQueue.checkQueryFinish(sourceTable) || !dataArchivingQueue.checkInsertQueueEmpty(sourceTable));
                journalEntity.setSingle_duration(timer.intervalMs());
                journalEntity.setStatistics_start_time(statisticsStartTime);
                journalEntity.setStatistics_final_time(statisticsFinalTime);
                journalEntity.setMin_id(minID);
                journalEntity.setMax_id(maxID);
                WriteUtil.append("DA-loop finished");
            } catch (Exception e) {
                WriteUtil.append("error:\n" + journalManagerService.getStackTrace(e));
                throw e;
            } finally {
                targetDao.closeConnection();
                sourceDao.closeConnection();
                // shut down the query thread
                dataArchivingQueue.shutdownQueryThread(sourceTable);
            }
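
            // After the archive loop, rows proven to exist in the archive are
            // removed from the MES master table; canExecuteClear gates this to
            // at most once per table per day via redis.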
            // delete the contents of the MES master library
            if (canExecuteClearFlag) {
                WriteUtil.append("DA-deleting MES master library contents >>>");
                String deleteMasterLogUUID = deleteMasterData(sourceTable, configFse, sourceDao, targetDao, uniqueField, timeField, extractTimeField, targetTableSet, deleteSubLogUUID);
                if (!StringUtils.isEmpty(deleteMasterLogUUID)) {
                    journalEntity.setPre_step_uuid(deleteMasterLogUUID);
                } else if (!StringUtils.isEmpty(deleteSubLogUUID)) {
                    journalEntity.setPre_step_uuid(deleteSubLogUUID);
                }
                WriteUtil.append("DA-MES master library contents deleted");
            }
            String errorInfo = dataArchivingQueue.getErrorLog(sourceTable);
            if (!StringUtils.isEmpty(errorInfo)) {
                journalEntity.setError(errorInfo);
                journalEntity.setResult(0);
            }
        } catch (Exception e) {
            SpringMVCContextHolder.getSystemLogger().error(e);
            if (!StringUtils.isEmpty(journalEntity.getError())) {
                journalEntity.setError(journalEntity.getError() + "\n" + journalManagerService.getStackTrace(e));
            } else {
                journalEntity.setError(journalManagerService.getStackTrace(e));
            }
            journalEntity.setResult(0);
        } finally {
            journalEntity.setSingle_duration(timer.intervalMs());
            journalEntity.setCount(archivingSuccessCount);
            journalEntity.setConfigUuid(uuid);
            journalEntity.setType(5);
            journalEntity.setDetail(4);
            journalEntity.setCreated_utc_datetime(new Date());
            FieldSetEntity curLogFse = null;
            if (archivingSuccessCount > 0 || !StringUtils.isEmpty(journalEntity.getError()) || journalEntity.getResult() != 1 || !StringUtils.isEmpty(journalEntity.getPre_step_uuid())) {
                curLogFse = journalManagerService.autoCreateJournal(journalEntity);
            }

            // mark failed records in the log table as re-processed
            WriteUtil.append("DA-marking failed log records as re-processed");
            DataTableEntity logDte;
            if (curLogFse == null) {
                logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=?", new Object[]{uuid});
            } else {
                logDte = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type=5 and result=0 and deal_flag=0 and config_uuid=? and uuid<>?", new Object[]{uuid, curLogFse.getUUID()});
            }
            for (int i = 0; i < logDte.getRows(); i++) {
                journalManagerService.writeBackReDealResult(logDte.getFieldSetEntity(i), true);
            }
            WriteUtil.append("DA-finished");
        }
    }

    /**
     * Clears duplicated archived data: finds which target shards already hold
     * the incoming unique values and deletes those rows there.
     *
     * @param sourceTable
     * @param targetTableSet
     * @param uniqueField
     * @param targetDao
     * @param list incoming batch whose unique values are checked
     */
    private DataTableEntity clearArchiveRepeatData(String sourceTable, Set<String> targetTableSet, String uniqueField, Dao targetDao, DataTableEntity list) {
        StringBuilder clearSql = new StringBuilder(128);
        clearSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet, Arrays.asList(uniqueField, "{#table_name#}"), false, " where " + BaseUtil.buildQuestionMarkFilter(uniqueField, list.getFieldAllValues(uniqueField), true)))
                .append("\nselect ").append(uniqueField).append(",_table_name from ").append(sourceTable);
        DataTableEntity clearDte = targetDao.getList(clearSql.toString());
        if (!DataTableEntity.isEmpty(clearDte)) {
            Set<Object> clearTableSet = Sets.newHashSet();
            clearTableSet.addAll(Arrays.asList(clearDte.getFieldAllValues("_table_name")));
            for (Object targetTableName : clearTableSet) {
                targetDao.delete(targetTableName.toString(),
                        BaseUtil.buildQuestionMarkFilter(uniqueField, clearDte.getRows(), true),
                        clearDte.getData().stream().map(item -> item.getString(uniqueField)).toArray());
            }
        }
        return clearDte;
    }

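    // Note: the same full value list is passed to every shard that matched;
    // values absent from a given shard simply delete nothing there.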
    /**
     * Clear gate: permits at most one clear per table per day, flipping at the
     * day boundary.
     *
     * @param tableName
     * @return
     */
    public boolean canExecuteClear(String tableName) {
        String dateStr = DateUtil.format(DateUtil.date(), "yyyy-MM-dd");
        final String KEY = "DE_CLEAR_STORE";
        Object preDate = RedisUtil.getHash(KEY, tableName);
        if (dateStr.equals(preDate)) {
            return false;
        } else {
            RedisUtil.setHash(KEY, tableName, dateStr);
            return true;
        }
    }

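    // Note: the date marker is written before the clear actually runs, so a
    // clear that subsequently fails is not retried until the next day.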
    /**
     * Deletes master-library data that is confirmed to exist in the archive.
     *
     * @param sourceTable      source table name
     * @param configFse        configuration fse
     * @param sourceDao        source dao
     * @param targetDao        target dao
     * @param uniqueField      unique field
     * @param timeField        update (archive) time field
     * @param extractTimeField extract time field
     * @param targetTableSet   set of target tables
     * @param deleteSubLogUUID uuid of the sub-library deletion log
     * @return
     */
    private String deleteMasterData(String sourceTable, FieldSetEntity configFse, Dao sourceDao, Dao targetDao, String uniqueField, String timeField, String extractTimeField, Set<String> targetTableSet, String deleteSubLogUUID) {
        FieldSetEntity deleteMasterLogFse = new FieldSetEntity();
        deleteMasterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG);
        TimeInterval deleteMasterLogTimer = DateUtil.timer();
        String minID = "";
        String maxID = "";
        Date statisticsStartTime = null;
        Date statisticsFinalTime = null;
        try {
            StringBuilder sql = new StringBuilder(128);
            sql.append("select count(*) count_value from ").append(sourceTable);
            if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) {
                sql.append(" where ").append(configFse.getString("source_select_filter"));
            } else {
                sql.append(" where 1=2");
            }
            FieldSetEntity paramFse = sourceDao.getOne(sql.toString());
            WriteUtil.append("DA-sourceDao.getOne(sql.toString())-sql:" + sql);
            int totalCount = StringUtils.isEmpty(paramFse.getString("count_value")) ? 0 : paramFse.getInteger("count_value");
            int delTotalCount = 0;
            if (totalCount > 0) {
                int pageSize = 1000;
                int totalPage = totalCount / pageSize + (totalCount % pageSize == 0 ? 0 : 1);
                sql.setLength(0);
                sql.append("select ").append(uniqueField).append(" from ").append(sourceTable);
                if (!StringUtils.isEmpty(configFse.getString("source_select_filter"))) {
                    sql.append(" where ").append(configFse.getString("source_select_filter"));
                }
                StringBuilder existSql = new StringBuilder(128);
                existSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet))
                        .append("\nselect ").append(uniqueField).append(" from ").append(sourceTable);
                StringBuilder tempSql = new StringBuilder(128);
                for (int i = 0; i < totalPage; i++) {
                    DataTableEntity delDte = sourceDao.getList(sql.toString(), new Object[]{}, 1, pageSize);
                    WriteUtil.append("DA-rows to delete-delDte:" + Arrays.toString(delDte.getFieldAllValues(uniqueField)));
                    if (DataTableEntity.isEmpty(delDte)) {
                        continue;
                    }
                    // verify against the archive library: only rows that exist there are deleted, missing ones are kept
                    tempSql.setLength(0);
                    tempSql.append(existSql);
                    tempSql.append(" where ").append(BaseUtil.buildQuestionMarkFilter(uniqueField, delDte.getFieldAllValues(uniqueField), true));
                    WriteUtil.append("DA-tempSql:" + tempSql);
                    DataTableEntity existDte = targetDao.getList(tempSql.toString(), new Object[]{});
                    if (existDte.getRows() > 0) {
                        FieldSetEntity existFse;
                        for (int j = 0; j < existDte.getRows(); j++) {
                            existFse = existDte.getFieldSetEntity(j);
                            Date updateTime = existFse.getDate(timeField);
                            Date extractTime = existFse.getDate(extractTimeField);
                            Date date = getAimDate(updateTime, extractTime, 0);
                            statisticsStartTime = getAimDate(statisticsStartTime, date, 0);
                            statisticsFinalTime = getAimDate(statisticsFinalTime, date, 1);
                            minID = getAimID(minID, existFse.getString(uniqueField), 0);
                            maxID = getAimID(maxID, existFse.getString(uniqueField), 1);
                        }
                        delTotalCount += existDte.getRows();
                        sourceDao.delete(sourceTable,
                                BaseUtil.buildQuestionMarkFilter(uniqueField, existDte.getRows(), true),
                                existDte.getData().stream().map(item -> item.getString(uniqueField)).toArray());
                    }
                }
                WriteUtil.append("DA-total rows deleted:" + delTotalCount);
            }
            deleteMasterLogFse.setValue(CmnConst.COUNT, delTotalCount);
            deleteMasterLogFse.setValue(CmnConst.RESULT, 1);
        } catch (Exception e) {
            e.printStackTrace();
            deleteMasterLogFse.setValue(CmnConst.RESULT, 0);
            deleteMasterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e));
        } finally {
            targetDao.closeConnection();
            sourceDao.closeConnection();
            deleteMasterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
            deleteMasterLogFse.setValue(CmnConst.TYPE, 5);
            deleteMasterLogFse.setValue(CmnConst.DETAIL, 6);
            deleteMasterLogFse.setValue(CmnConst.PRE_STEP_UUID, deleteSubLogUUID);
            deleteMasterLogFse.setValue(CmnConst.DEAL_FLAG, 0);
            deleteMasterLogFse.setValue(CmnConst.DEAL_RESULT, 1);
            deleteMasterLogFse.setValue(CmnConst.MIN_ID, minID);
            deleteMasterLogFse.setValue(CmnConst.MAX_ID, maxID);
            deleteMasterLogFse.setValue(CmnConst.SINGLE_DURATION, deleteMasterLogTimer.intervalMs());
            deleteMasterLogFse.setValue(CmnConst.CONFIG_UUID, configFse.getUUID());
            deleteMasterLogFse.setValue(CmnConst.STATISTICS_START_TIME, statisticsStartTime);
            deleteMasterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, statisticsFinalTime);
            if ((!StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.COUNT)) && deleteMasterLogFse.getInteger(CmnConst.COUNT) > 0)
                    || !StringUtils.isEmpty(deleteMasterLogFse.getString(CmnConst.ERROR)) || !"1".equals(deleteMasterLogFse.getString(CmnConst.RESULT))) {
                getBaseDao().add(deleteMasterLogFse);
            }
        }
        return deleteMasterLogFse.getUUID();
    }

| | | /**
| | | * Clear duplicated archive data: remove rows from the target (archive) tables whose unique-field values already appear in the given batch, so re-archiving does not produce duplicates.
| | | *
| | | * @param sourceTable source table name (used as the CTE name)
| | | * @param targetTableSet target (archive) table names
| | | * @param uniqueField unique key field
| | | * @param targetDao target dao
| | | * @param list batch of rows about to be archived
| | | * @return the duplicate rows that were found and deleted
| | | */
| | | private DataTableEntity clearArchiveRepeatData(String sourceTable, Set<String> targetTableSet, String uniqueField, Dao targetDao, DataTableEntity list) { |
| | | StringBuilder clearSql = new StringBuilder(128); |
| | | clearSql.append("with ").append(QuerySqlParseUtil.getUnionTableSql(sourceTable, targetTableSet, Arrays.asList(uniqueField, "{#table_name#}"), false, " where " + BaseUtil.buildQuestionMarkFilter(uniqueField, list.getFieldAllValues(uniqueField), true))) |
| | | .append("\nselect ").append(uniqueField).append(",_table_name from ").append(sourceTable); |
| | | DataTableEntity clearDte = targetDao.getList(clearSql.toString()); |
| | | if (!DataTableEntity.isEmpty(clearDte)) { |
| | | Set<Object> clearTableSet = Sets.newHashSet(); |
| | | clearTableSet.addAll(Arrays.asList(clearDte.getFieldAllValues("_table_name"))); |
| | | for (Object targetTableName : clearTableSet) { |
| | | targetDao.delete(targetTableName.toString(), |
| | | BaseUtil.buildQuestionMarkFilter(uniqueField, clearDte.getRows(), true), |
| | | clearDte.getData().stream().map(item -> item.getString(uniqueField)).toArray()); |
| | | } |
| | | } |
| | | return clearDte; |
| | | } |
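| | | // Illustrative shape of the statement built above (assumes QuerySqlParseUtil emits a per-table UNION ALL CTE; the exact output may differ):
| | | //   with source_table as (
| | | //     select id, 't_2022' as _table_name from t_2022 where id in (?, ?)
| | | //     union all
| | | //     select id, 't_2023' as _table_name from t_2023 where id in (?, ?)
| | | //   )
| | | //   select id,_table_name from source_table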
| | | |
| | | /**
| | | * Pick one of two dates: if either is null, return the other directly; otherwise pick by sign.
| | | *
| | | * @param d1 first date
| | | * @param d2 second date
| | | * @param sign greater than 0: take the later of the two (closer to now); otherwise the earlier (further from now)
| | | * @return the chosen date, or null when both are null
| | | */
| | | private Date getAimDate(Date d1, Date d2, int sign) { |
| | | if (d1 == null && d2 == null) { |
| | | return null; |
| | | } |
| | | if (d1 == null || d2 == null) { |
| | | return d1 == null ? d2 : d1; |
| | | } |
| | | if (sign > 0) { |
| | | return d1.compareTo(d2) > 0 ? d1 : d2; |
| | | } else { |
| | | return d1.compareTo(d2) > 0 ? d2 : d1; |
| | | } |
| | | } |
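| | | // Example: getAimDate(2023-01-01, 2023-06-01, 1) returns 2023-06-01; with sign 0 it returns 2023-01-01; a null argument yields the other date.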
| | | /**
| | | * Clear-eligibility check: allows at most one execution per table per day (re-armed when the date rolls over).
| | | *
| | | * @param tableName table name, used as the Redis hash field
| | | * @return true if the clear may run now
| | | */
| | | public boolean canExecuteClear(String tableName) { |
| | | String dateStr = DateUtil.format(DateUtil.date(), "yyyy-MM-dd"); |
| | | final String KEY = "DE_CLEAR_STORE"; |
| | | Object preDate = RedisUtil.getHash(KEY, tableName); |
| | | if (dateStr.equals(preDate)) { |
| | | return false; |
| | | } else { |
| | | RedisUtil.setHash(KEY, tableName, dateStr); |
| | | return true; |
| | | } |
| | | } |
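| | | // Redis layout (illustrative): hash "DE_CLEAR_STORE" maps each table name to the "yyyy-MM-dd" of its last clear,
| | | // so the first caller on a new day updates the date and wins. Note the get/set pair is not atomic, so two callers
| | | // racing at the rollover could both return true.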
| | | |
| | | /**
| | | * Pick one of two IDs: if either is empty, return the other directly; otherwise pick by sign
| | | * (numeric comparison when both look numeric, lexicographic otherwise).
| | | *
| | | * @param s1 first ID
| | | * @param s2 second ID
| | | * @param sign greater than 0: take the larger; otherwise the smaller
| | | * @return the chosen ID, or null when both are empty
| | | */
| | | private String getAimID(String s1, String s2, int sign) { |
| | | if (StringUtils.isEmpty(s1) && StringUtils.isEmpty(s2)) { |
| | | return null; |
| | | } |
| | | if (StringUtils.isEmpty(s1) || StringUtils.isEmpty(s2)) { |
| | | return StringUtils.isEmpty(s1) ? s2 : s1; |
| | | } |
| | | String numberRegexp = "\\d{1,11}"; |
| | | if (s1.matches(numberRegexp) && s2.matches(numberRegexp)) { |
| | | boolean b = Long.parseLong(s1) > Long.parseLong(s2); |
| | | if (sign > 0) { |
| | | return b ? s1 : s2; |
| | | } else { |
| | | return b ? s2 : s1; |
| | | } |
| | | } else { |
| | | if (sign > 0) { |
| | | return s1.compareTo(s2) > 0 ? s1 : s2; |
| | | } else { |
| | | return s1.compareTo(s2) > 0 ? s2 : s1; |
| | | } |
| | | } |
| | | } |
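| | | // Example: getAimID("9", "10", 1) returns "10" (numeric compare); getAimID("A9", "A10", 1) returns "A9" (lexicographic fallback).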
| | | |
| | | /**
| | | * Group the rows of a dte by time period.
| | | *
| | | * @param dte data to group
| | | * @param timeField time field; when empty, all rows fall into the single group "0"
| | | * @param splitTableType split granularity: 0 = by year, 1 = by month
| | | * @return Map<period, List<DataTableEntity batches>>
| | | */
| | | private Map<String, List<DataTableEntity>> dteGroupByTime(DataTableEntity dte, String timeField, String splitTableType) { |
| | | Map<String, List<DataTableEntity>> groupDteMap = Maps.newHashMap(); |
| | | FieldSetEntity fse; |
| | | String time; |
| | | List<DataTableEntity> groupDteList; |
| | | if (StringUtils.isEmpty(timeField)) { |
| | | groupDteList = Lists.newArrayList(); |
| | | groupDte(groupDteList, dte); |
| | | groupDteMap.put("0", groupDteList); |
| | | } else { |
| | | SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMM"); |
| | | for (int i = 0; i < dte.getRows(); i++) { |
| | | fse = dte.getFieldSetEntity(i); |
| | | if (fse.getDate(timeField) == null) { |
| | | throw new BaseException(ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getValue(), ErrorCode.DATA_ARCHIVE_FAIL_NO_SPLIT_TABLE_DATE.getText() + "\ntable_name:" + fse.getTableName() + "\ndata:" + fse.getValues()); |
| | | } |
| | | if ("1".equals(splitTableType)) { |
| | | time = dateFormat.format(fse.getDate(timeField)); |
| | | } else { |
| | | time = String.valueOf(DateUtil.year(fse.getDate(timeField))); |
| | | } |
| | | groupDteList = groupDteMap.computeIfAbsent(time, k -> Lists.newArrayList()); |
| | | groupAddDte(groupDteList, fse); |
| | | } |
| | | } |
| | | return groupDteMap; |
| | | } |
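| | | // Example: a row dated 2023-05-12 lands in group "202305" when splitTableType is "1", and in group "2023" otherwise; an empty timeField puts every row in group "0".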
| | | |
| | | private void groupDte(List<DataTableEntity> list, DataTableEntity allDte) { |
| | | if (list == null) { |
| | | throw new BaseException(ErrorCode.DATA_ARCHIVE_GROUP_CONTAINER_IS_NULL); |
| | | } |
| | | for (int i = 0; i < allDte.getRows(); i++) { |
| | | groupAddDte(list, allDte.getFieldSetEntity(i)); |
| | | } |
| | | } |
| | | |
| | | private void groupAddDte(List<DataTableEntity> list, FieldSetEntity fse) { |
| | | DataTableEntity dte; |
| | | if (list.isEmpty()) { |
| | | dte = new DataTableEntity(); |
| | | list.add(dte); |
| | | } else { |
| | | dte = list.get(list.size() - 1); |
| | | if (dte.getRows() >= DataArchivingQueue.INSERT_PAGE_SIZE) { |
| | | dte = new DataTableEntity(); |
| | | list.add(dte); |
| | | } |
| | | } |
| | | dte.addFieldSetEntity(fse); |
| | | } |
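| | | // Each list entry is a batch capped at DataArchivingQueue.INSERT_PAGE_SIZE rows; once the current batch is full, a new DataTableEntity is started.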
| | | |
| | | |
| | | class DataArchivingServiceImpl { |
| | | |
| | | private Dao sourceDao; |
| | | private Dao targetDao; |
| | | private String sourceTable; |
| | | private String configUid; |
| | | private String sourceDbName; |
| | | private String targetDbName; |
| | | |
| | | public DataArchivingServiceImpl(Dao sourceDao, Dao targetDao, String sourceTable, String configUid, String sourceDbName, String targetDbName) { |
| | | this.sourceDao = sourceDao; |
| | | this.targetDao = targetDao; |
| | | this.sourceTable = sourceTable; |
| | | this.configUid = configUid; |
| | | this.sourceDbName = sourceDbName; |
| | | this.targetDbName = targetDbName; |
| | | } |
| | | |
| | | /**
| | | * Check whether a table exists in the target database.
| | | *
| | | * @param tableName table name
| | | * @return true if the table exists
| | | */
| | | public boolean dataTableIsExists(String tableName) throws BaseException {
| | | try {
| | | Connection connection = targetDao.getConnection();
| | | // reuse the same connection for the metadata lookup so no second connection is opened and left unclosed
| | | DatabaseMetaData metaData = connection.getMetaData();
| | | ResultSet tables = metaData.getTables(null, null, tableName, new String[]{"TABLE"});
| | | boolean result = tables.next();
| | | tables.close();
| | | connection.close();
| | | return result;
| | | } catch (Exception e) {
| | | e.printStackTrace();
| | | throw new BaseException(e);
| | | }
| | | }
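| | | // Note: DatabaseMetaData#getTables matches the name as stored; Oracle keeps unquoted identifiers in upper case, so Oracle targets may need tableName.toUpperCase() here (assumption based on standard JDBC behaviour).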
| | | |
| | | |
| | | /**
| | | * Create the target (archive) table, named from the prefix plus the period suffix.
| | | *
| | | * @param prefix table name prefix
| | | * @param time period suffix; "0" means no time split and the bare prefix is used
| | | * @param dbe target database entity
| | | * @return the resolved table name
| | | */
| | | public String createTable(String prefix, String time, DataBaseEntity dbe) throws BaseException { |
| | | String tableName; |
| | | if (!"0".equals(time)) { |
| | | tableName = prefix + (prefix.lastIndexOf("_") != prefix.length() - 1 ? "_" : "") + time; |
| | | } else { |
| | | tableName = prefix.endsWith("_") ? prefix.substring(0, prefix.length() - 1) : prefix; |
| | | } |
| | | if (dataTableIsExists(tableName)) { |
| | | return tableName; |
| | | } |
| | | JSONObject tableInfoObj = getTableInfo(tableName); |
| | | String sql = getSql(tableInfoObj); |
| | | SpringMVCContextHolder.getSystemLogger().error("sql:\n" + sql); |
| | | // Save the sub-table record before running the DDL; if an error follows, the DDL itself cannot be rolled back
| | | saveCreateTableRecord(tableName, time); |
| | | targetDao.executeSql(sql); |
| | | // syncDataConfigService.addTableField(dbe,dbe.getUuid(),tableName); |
| | | return tableName; |
| | | } |
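| | | // Example (illustrative names): prefix "t_order_" with time "202305" resolves to "t_order_202305"; with time "0" the trailing underscore is trimmed, giving "t_order".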
| | | |
| | | /**
| | | * Fetch the structure of the given table from the source data source.
| | | *
| | | * @param tableName table name
| | | * @return table info JSON containing name, comment, field map and index map
| | | */
| | | private JSONObject getTableInfo(String tableName) { |
| | | JSONObject tableInfoObj = new JSONObject(); |
| | | tableInfoObj.put(CmnConst.NAME, tableName); |
| | | DataTableEntity tempDte; |
| | | FieldSetEntity tempFse; |
| | | JSONObject fieldInfoObj = new JSONObject(new LinkedHashMap<>()); |
| | | tableInfoObj.put(CmnConst.FIELD, fieldInfoObj); |
| | | JSONObject indexInfoObj = new JSONObject(new LinkedHashMap<>()); |
| | | tableInfoObj.put(CmnConst.INDEX, indexInfoObj); |
| | | JSONObject singleFieldInfoObj; |
| | | if (DataBaseType.MYSQL.equals(sourceDao.getDataBaseType())) { |
| | | // MySQL
| | | // table comment
| | | tempFse = sourceDao.getOne("information_schema.`TABLES`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable}); |
| | | tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("table_comment")); |
| | | // column metadata
| | | tempDte = sourceDao.getList("information_schema.`COLUMNS`", "table_schema=? and table_name=?", new Object[]{sourceDbName, sourceTable}, "ordinal_position", 1, Integer.MAX_VALUE); |
| | | for (int i = 0; i < tempDte.getRows(); i++) { |
| | | tempFse = tempDte.getFieldSetEntity(i); |
| | | singleFieldInfoObj = new JSONObject(); |
| | | fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj); |
| | | singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type")); |
| | | singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("character_maximum_length")) ? tempFse.getString("numeric_precision") : tempFse.getString("character_maximum_length")); |
| | | singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("numeric_scale")); |
| | | singleFieldInfoObj.put(CmnConst.NULLABLE, "NO".equalsIgnoreCase(tempFse.getString("is_nullable")) ? 0 : 1); |
| | | singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("column_comment")); |
| | | } |
| | | // index metadata
| | | StringBuilder sql = new StringBuilder(128); |
| | | sql.append("select index_name,non_unique,group_concat(column_name) column_name"); |
| | | sql.append("\nfrom information_schema.`STATISTICS`"); |
| | | sql.append("\nwhere table_schema=? and table_name=?"); |
| | | sql.append("\ngroup by index_name,non_unique"); |
| | | tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, sourceTable}); |
| | | for (int i = 0; i < tempDte.getRows(); i++) { |
| | | tempFse = tempDte.getFieldSetEntity(i); |
| | | singleFieldInfoObj = new JSONObject(); |
| | | indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj); |
| | | singleFieldInfoObj.put(CmnConst.TYPE, "PRIMARY".equalsIgnoreCase(tempFse.getString("index_name")) ? CmnConst.PRIMARY : ("1".equals(tempFse.getString("non_unique")) ? CmnConst.NORMAL : CmnConst.UNIQUE)); |
| | | singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name")); |
| | | } |
| | | } else if (DataBaseType.ORACLE.equals(sourceDao.getDataBaseType())) { |
| | | // Oracle
| | | // table comment
| | | String upperTableName = sourceTable.toUpperCase(); |
| | | tempFse = sourceDao.getOne("SYS.USER_TAB_COMMENTS", "table_name=?", new Object[]{upperTableName}); |
| | | tableInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments")); |
| | | // column metadata
| | | StringBuilder sql = new StringBuilder(128); |
| | | sql.append("SELECT TC.COLUMN_NAME,DATA_TYPE,DATA_LENGTH,DATA_PRECISION,DATA_SCALE,NULLABLE,CHAR_LENGTH,COMMENTS FROM SYS.USER_TAB_COLUMNS TC"); |
| | | sql.append("\nLEFT JOIN USER_COL_COMMENTS CC ON TC.TABLE_NAME=CC.TABLE_NAME AND TC.COLUMN_NAME=CC.COLUMN_NAME"); |
| | | sql.append("\nWHERE TC.TABLE_NAME=?"); |
| | | sql.append("\nORDER BY TC.COLUMN_ID"); |
| | | tempDte = sourceDao.getList(sql.toString(), new Object[]{upperTableName}); |
| | | for (int i = 0; i < tempDte.getRows(); i++) { |
| | | tempFse = tempDte.getFieldSetEntity(i); |
| | | singleFieldInfoObj = new JSONObject(); |
| | | fieldInfoObj.put(tempFse.getString("column_name"), singleFieldInfoObj); |
| | | singleFieldInfoObj.put(CmnConst.TYPE, tempFse.getString("data_type").contains("VARCHAR") ? "varchar" : tempFse.getString("data_type")); |
| | | singleFieldInfoObj.put(CmnConst.INTEGER, StringUtils.isEmpty(tempFse.getString("data_precision")) |
| | | ? tempFse.getString("char_length") : tempFse.getString("data_precision")); |
| | | singleFieldInfoObj.put(CmnConst.DECIMAL, tempFse.getString("data_scale")); |
| | | singleFieldInfoObj.put(CmnConst.NULLABLE, "N".equalsIgnoreCase(tempFse.getString("nullable")) ? 0 : 1); |
| | | singleFieldInfoObj.put(CmnConst.COMMENT, tempFse.getString("comments")); |
| | | } |
| | | // index metadata (non-unique)
| | | sql.setLength(0); |
| | | sql.append("SELECT DIC.INDEX_NAME,WM_CONCAT(DIC.COLUMN_NAME) column_name FROM SYS.DBA_IND_COLUMNS DIC"); |
| | | sql.append("\nLEFT JOIN SYS.DBA_INDEXES DI ON DIC.INDEX_NAME=DI.INDEX_NAME"); |
| | | sql.append("\nWHERE UNIQUENESS='NONUNIQUE' AND DIC.TABLE_OWNER=? AND DI.TABLE_NAME=?"); |
| | | sql.append("\nGROUP BY DIC.INDEX_NAME"); |
| | | tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName}); |
| | | for (int i = 0; i < tempDte.getRows(); i++) { |
| | | tempFse = tempDte.getFieldSetEntity(i); |
| | | singleFieldInfoObj = new JSONObject(); |
| | | indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj); |
| | | singleFieldInfoObj.put(CmnConst.TYPE, CmnConst.NORMAL); |
| | | singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name")); |
| | | } |
| | | // constraint types: C = check (kept on the column definition), R = foreign key (skipped), P = primary key, U = unique key
| | | sql.setLength(0); |
| | | sql.append("SELECT DC.CONSTRAINT_NAME index_name,CONSTRAINT_TYPE,LISTAGG(COLUMN_NAME, ',') WITHIN GROUP(ORDER BY DCC.POSITION) column_name FROM SYS.DBA_CONS_COLUMNS DCC"); |
| | | sql.append("\nLEFT JOIN SYS.DBA_CONSTRAINTS DC ON DCC.CONSTRAINT_NAME=DC.CONSTRAINT_NAME"); |
| | | sql.append("\nWHERE DCC.OWNER=? AND DCC.TABLE_NAME=? AND CONSTRAINT_TYPE IN ('P','U')"); |
| | | sql.append("\nGROUP BY DC.CONSTRAINT_NAME,CONSTRAINT_TYPE"); |
| | | tempDte = sourceDao.getList(sql.toString(), new Object[]{sourceDbName, upperTableName}); |
| | | WriteUtil.append("DA-DDL-:" + sql + " |||库名:" + sourceDbName + " |||表名:" + upperTableName); |
| | | for (int i = 0; i < tempDte.getRows(); i++) { |
| | | tempFse = tempDte.getFieldSetEntity(i); |
| | | singleFieldInfoObj = new JSONObject(); |
| | | indexInfoObj.put(tempFse.getString("index_name"), singleFieldInfoObj); |
| | | singleFieldInfoObj.put(CmnConst.TYPE, "P".equalsIgnoreCase(tempFse.getString("constraint_type")) ? CmnConst.PRIMARY : CmnConst.UNIQUE); |
| | | singleFieldInfoObj.put(CmnConst.COLUMN_NAME, tempFse.getString("column_name")); |
| | | } |
| | | WriteUtil.append("DA-DDL-创表信息:" + tableInfoObj); |
| | | } else { |
| | | throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL); |
| | | } |
| | | return tableInfoObj; |
| | | } |
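| | | // Resulting JSON (illustrative; key constants assumed from CmnConst):
| | | // { "name": "t_order", "comment": "orders",
| | | //   "field": { "id": { "type": "varchar", "integer": "50", "decimal": null, "nullable": 0, "comment": "PK" }, ... },
| | | //   "index": { "PRIMARY": { "type": "primary", "column_name": "id" }, ... } }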
| | | |
| | | /**
| | | * Build the CREATE TABLE DDL statement from the table structure info.
| | | *
| | | * @param tableInfoObj table info JSON produced by getTableInfo
| | | * @return the DDL statement
| | | */
| | | private String getSql(JSONObject tableInfoObj) { |
| | | StringBuilder sql = new StringBuilder(128); |
| | | JSONObject fieldInfoObj = tableInfoObj.getJSONObject(CmnConst.FIELD); |
| | | JSONObject indexInfoObj = tableInfoObj.getJSONObject(CmnConst.INDEX); |
| | | JSONObject singleFieldInfoObj; |
| | | sql.append("CREATE TABLE ").append(tableInfoObj.getString(CmnConst.NAME)).append(" ("); |
| | | if (DataBaseType.MYSQL.equals(targetDao.getDataBaseType())) { |
| | | // MySQL target
| | | for (String field : fieldInfoObj.keySet()) { |
| | | singleFieldInfoObj = fieldInfoObj.getJSONObject(field); |
| | | sql.append("\n `").append(field.toLowerCase(Locale.ROOT)).append("` "); |
| | | if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("timestamp") || singleFieldInfoObj.getString(CmnConst.TYPE).contains("TIMESTAMP")) { |
| | | sql.append("timestamp "); |
| | | } else if (singleFieldInfoObj.getIntValue(CmnConst.DECIMAL) > 0) { |
| | | sql.append("decimal(").append(singleFieldInfoObj.getString(CmnConst.INTEGER)).append(",").append(singleFieldInfoObj.getString(CmnConst.DECIMAL)).append(") "); |
| | | } else if (singleFieldInfoObj.getString(CmnConst.TYPE).contains("int")) { |
| | | sql.append("int(0) "); |
| | | } else if ("number".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { |
| | | if ("0".equals(singleFieldInfoObj.getString(CmnConst.DECIMAL))) { |
| | | sql.append("int(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") "); |
| | | } else { |
| | | sql.append("decimal(22,4) "); |
| | | } |
| | | } else if ("date".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { |
| | | sql.append("datetime(0) "); |
| | | } else if ("blob".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { |
| | | sql.append("blob "); |
| | | } else if ("text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT).endsWith("text") || |
| | | (("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) && singleFieldInfoObj.getIntValue(CmnConst.INTEGER) >= 4000) |
| | | ) { |
| | | sql.append("text "); |
| | | } else { |
| | | sql.append(singleFieldInfoObj.getString(CmnConst.TYPE).toLowerCase(Locale.ROOT)).append("(").append(singleFieldInfoObj.getIntValue(CmnConst.INTEGER)).append(") "); |
| | | } |
| | | if ("varchar".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "varchar2".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE)) || "text".equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { |
| | | sql.append(" CHARACTER SET utf8mb4 COLLATE utf8mb4_bin "); |
| | | } |
| | | if ("0".equals(singleFieldInfoObj.getString(CmnConst.NULLABLE))) { |
| | | sql.append("not null "); |
| | | } |
| | | if (!StringUtils.isEmpty(singleFieldInfoObj.getString(CmnConst.COMMENT))) { |
| | | sql.append(" comment '").append(singleFieldInfoObj.getString(CmnConst.COMMENT)).append("'"); |
| | | } |
| | | sql.append(","); |
| | | } |
| | | for (String indexName : indexInfoObj.keySet()) { |
| | | singleFieldInfoObj = indexInfoObj.getJSONObject(indexName); |
| | | if (CmnConst.PRIMARY.equalsIgnoreCase(indexName) || CmnConst.PRIMARY.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { |
| | | sql.append("\n PRIMARY KEY (`").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append("`) USING BTREE,"); |
| | | } else if (CmnConst.UNIQUE.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { |
| | | sql.append("\n UNIQUE INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,"); |
| | | } else if (CmnConst.NORMAL.equalsIgnoreCase(singleFieldInfoObj.getString(CmnConst.TYPE))) { |
| | | sql.append("\n INDEX `").append(indexName).append("`(").append(singleFieldInfoObj.getString(CmnConst.COLUMN_NAME)).append(") USING BTREE,"); |
| | | } |
| | | } |
| | | sql.deleteCharAt(sql.length() - 1); |
| | | sql.append("\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin "); |
| | | if (!StringUtils.isEmpty(tableInfoObj.getString(CmnConst.COMMENT))) { |
| | | sql.append(" COMMENT = '").append(tableInfoObj.getString(CmnConst.COMMENT)).append("'"); |
| | | } |
| | | } else { |
| | | throw new BaseException(ErrorCode.CREATE_TARGET_TABLE_FAIL); |
| | | } |
| | | return sql.toString(); |
| | | } |
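| | | // Sample output (illustrative, MySQL target):
| | | // CREATE TABLE t_order_2023 (
| | | //   `id` varchar(50)  CHARACTER SET utf8mb4 COLLATE utf8mb4_bin not null  comment 'PK',
| | | //   PRIMARY KEY (`id`) USING BTREE
| | | // ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin  COMMENT = 'orders'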
| | | |
| | | private String saveCreateTableRecord(String tableName, String time) { |
| | | /*=====================================================*/ |
| | | // insert a record for the newly created sub-table
| | | FieldSetEntity fse = new FieldSetEntity(); |
| | | fse.setTableName(CmnConst.DATA_ARCHIVING_SUB_TABLE); |
| | | fse.setValue("table_name", tableName); |
| | | fse.setValue("parent_uuid", this.configUid); |
| | | fse.setValue("data_time", time); |
| | | BaseUtil.createCreatorAndCreationTime(fse); |
| | | getBaseDao().saveFieldSetEntity(fse); |
| | | /*=====================================================*/ |
| | | return tableName; |
| | | } |
| | | |
| | | |
| | | private void createIndex(String targetTable, String time) throws BaseException { |
| | | StringBuilder sql = new StringBuilder(); |
| | | sql.append("\n SELECT DBMS_METADATA.GET_DDL('INDEX',u.index_name) as create_index_statement,u.INDEX_NAME "); |
| | | sql.append("\n from USER_INDEXES u where u.TABLE_NAME=? "); |
| | | String primaryIndexName = getPrimaryIndexName(); |
| | | List<Object> params = new ArrayList<>(); |
| | | params.add(sourceTable); |
| | | if (!StringUtils.isEmpty(primaryIndexName)) { |
| | | sql.append(" and u.index_name != ? "); |
| | | params.add(primaryIndexName); |
| | | } |
| | | DataTableEntity list = sourceDao.getList(sql.toString(), params.toArray()); |
| | | if (!DataTableEntity.isEmpty(list)) { |
| | | for (int i = 0; i < list.getRows(); i++) { |
| | | //fetch each CREATE INDEX statement in turn
| | | String createIndexStatement = list.getString(i, "create_index_statement");
| | | String indexName = list.getString(i, "index_name");
| | | if (!StringUtils.isEmpty(createIndexStatement)) {
| | | //derive the new index name
| | | String newIndexName = this.getIndexName(indexName, time);
| | | //rewrite the DDL to the target table and new index name (plain replace, so names containing regex metacharacters stay safe)
| | | createIndexStatement = createIndexStatement.replace(sourceTable, targetTable)
| | | .replace("\"" + indexName + "\"", "\"" + newIndexName + "\"");
| | | //create the index on the target
| | | targetDao.executeSql(createIndexStatement);
| | | } |
| | | } |
| | | } |
| | | } |
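| | | // Example (illustrative): 'CREATE INDEX "IDX_ORDER_TIME" ON "T_ORDER" (...)' becomes
| | | // 'CREATE INDEX "IDX_ORDER_TIME202305" ON "T_ORDER_202305" (...)' after the two replacements,
| | | // assuming the source table name occurs in the DDL only where intended.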
| | | |
| | | /**
| | | * Derive the new index name by appending the period suffix.
| | | *
| | | * @param indexName original index name
| | | * @param time period suffix
| | | * @return the new index name
| | | */
| | | private String getIndexName(String indexName, String time) { |
| | | //new index name
| | | String newIndexName; |
| | | if (indexName.length() <= 26) { |
| | | newIndexName = indexName + time; |
| | | } else { |
| | | //beyond 26 characters, generate a random base so the result stays within Oracle's identifier length limit
| | | newIndexName = RandomUtil.randomString(10) + time; |
| | | } |
| | | return newIndexName; |
| | | } |
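| | | // Example: "IDX_ORDER_TIME" + "202305" gives "IDX_ORDER_TIME202305"; a name longer than 26 characters is replaced by a random 10-character base plus the suffix.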
| | | |
| | | /**
| | | * Get the name of the primary key index.
| | | *
| | | * @return the index name, or null when the table has no primary key constraint
| | | */
| | | private String getPrimaryIndexName() { |
| | | StringBuilder sql = new StringBuilder(); |
| | | sql.append("\n SELECT a.index_name from user_constraints a "); |
| | | sql.append("\n WHERE a.constraint_type = 'P' "); |
| | | sql.append("\n AND a.table_name = ? "); |
| | | FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()}); |
| | | return one != null ? one.getString("index_name") : null; |
| | | } |
| | | |
| | | /**
| | | * Get the CREATE TABLE statement (including the primary key index).
| | | *
| | | * @return the DDL, or null when the table is not found
| | | */
| | | private String getCreateTableStatement() { |
| | | StringBuilder sql = new StringBuilder(); |
| | | sql.append(" SELECT DBMS_METADATA.GET_DDL(U.OBJECT_TYPE, u.object_name) create_table_statement "); |
| | | sql.append(" from USER_OBJECTS u "); |
| | | sql.append(" where U.OBJECT_TYPE ='TABLE' and u.object_name=? "); |
| | | FieldSetEntity one = sourceDao.getOne(sql.toString(), new Object[]{this.sourceTable.toUpperCase()}); |
| | | return one != null ? one.getString("create_table_statement") : null; |
| | | } |
| | | |
| | | |
| | | /**
| | | * Get the names of all non-primary-key indexes on the source table.
| | | *
| | | * @return the index names, or null when there are none
| | | */
| | | private List<String> getOtherIndexName() { |
| | | StringBuilder sql = new StringBuilder(); |
| | | sql.append(" SELECT INDEX_NAME FROM USER_INDEXES "); |
| | | sql.append("\n WHERE TABLE_NAME=? AND INDEX_NAME NOT IN ( "); |
| | | sql.append("\n SELECT a.index_name from user_constraints a "); |
| | | sql.append("\n WHERE a.constraint_type = 'P' "); |
| | | sql.append("\n AND a.TABLE_NAME = ? ) "); |
| | | DataTableEntity list = sourceDao.getList(sql.toString(), new Object[]{this.sourceTable, this.sourceTable}); |
| | | if (!DataTableEntity.isEmpty(list)) { |
| | | return list.getData().stream().map(item -> item.getString("index_name")).collect(Collectors.toList()); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | |
| | | } |
| | | |
| | | } |
| | | |