package com.product.data.center.service; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.TimeInterval; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.NumberUtil; import com.alibaba.fastjson.JSONObject; import com.product.core.cache.util.RedisUtil; import com.product.core.config.Global; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.center.config.CmnConst; import com.product.data.center.config.ErrorCode; import com.product.data.center.entity.ExtractUniqueEntity; import com.product.data.center.entity.JournalEntity; import com.product.data.center.utils.CustomLock; import com.product.data.center.utils.WriteExtractUtil; import com.product.datasource.dao.Dao; import com.product.datasource.entity.BatchResultEntity; import com.product.datasource.entity.DataBaseEntity; import com.product.datasource.entity.UpdateFilterEntity; import com.product.datasource.service.RedisService; import com.product.util.BaseUtil; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.sql.Connection; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * @Author cheng * @Date 2022/7/15 13:25 * @PackageName:com.product.data.center.service * @ClassName: DataExtractService * @Description: 数据提取 从Redis 到 数据库 * @Version 1.0 */ @Service public class DataExtractService { final String FIX_KEY = "2022年9月2日092542"; @Autowired BaseDao baseDao; @Autowired JournalManagerService journalManagerService; @Resource MesExternalService mesExternalService; public final static String UNIQUE_FIX_KEY = "MES_MATER_TABLE_KEY:"; private static CustomLock lock = new CustomLock(500L); /** * 检测采集成功但未提取的记录并标记为执行失败 * * @param errorMinute 检测规定时间范围内没有被提取的时间(分钟) */ public void detectioncExtractIntegrality(String errorMinute) { StringBuilder sql = new StringBuilder(128); sql.append("\n SELECT "); sql.append("\n l.uuid,l.config_uuid "); sql.append("\n FROM "); sql.append("\n product_sys_data_center_log l "); sql.append("\n LEFT JOIN product_sys_data_center_log g ON l.uuid = g.pre_step_uuid AND g.type = 2 "); sql.append("\n WHERE "); sql.append("\n l.type = 1 AND l.result!=0 AND l.deal_flag!=1 and l.count>0 "); sql.append("\n AND g.uuid IS NULL "); sql.append("\n AND TIMESTAMPDIFF(MINUTE ,l.created_utc_datetime,NOW())> ?"); DataTableEntity dt = baseDao.listTable(sql.toString(), new Object[]{Integer.parseInt(errorMinute)}); if (!DataTableEntity.isEmpty(dt)) { baseDao.executeUpdate("UPDATE product_sys_data_center_log SET error=? ,result=0,deal_flag=0,updated_utc_datetime=now() WHERE " + BaseUtil.buildQuestionMarkFilter("uuid", dt.getRows(), true), ArrayUtil.addAll(new Object[]{"采集成功,未能在 " + errorMinute + " 分钟内成功提取!"}, dt.getUuids())); } sql.setLength(0); sql.append("\nSELECT"); sql.append("\nb.uuid"); sql.append("\n FROM"); sql.append("\nproduct_sys_data_center_log a"); sql.append("\nJOIN product_sys_data_center_log b ON a.type = 2"); sql.append("\nAND b.type = 1"); sql.append("\nAND a.pre_step_uuid = b.uuid"); sql.append("\nWHERE"); sql.append("\nb.error LIKE '采集成功,未能在%分钟内成功提取!'"); sql.append("\nAND a.result = 1"); sql.append("\nAND b.result = 0"); sql.append("\nAND b.deal_flag !=1"); dt = baseDao.listTable(sql.toString(), new Object[]{}); if (!DataTableEntity.isEmpty(dt)) { baseDao.executeUpdate("update product_sys_data_center_log set result=1,error=null,updated_utc_datetime=now() where " + BaseUtil.buildQuestionMarkFilter("uuid", dt.getRows(), true), dt.getUuids()); } } /** * 初始化表唯一值 * * @param tableName * @param uniqueFieldName * @param collectId * @param collectSourceField * @param targetSource * @param autoIncrementField */ public void initTableUniqueValue(String tableName, String uniqueFieldName, String collectId, String collectSourceField, String targetSource, String autoIncrementField) { ExtractUniqueEntity extractUniqueEntity = new ExtractUniqueEntity(); extractUniqueEntity.setCollectSourceField(collectSourceField); extractUniqueEntity.setExtractTargetTable(tableName); extractUniqueEntity.setUniqueFieldName(uniqueFieldName); extractUniqueEntity.setExtractTargetSource(targetSource); extractUniqueEntity.setAutoIncrement(autoIncrementField); extractUniqueEntity.addSourceInfo(collectId); // SpringMVCContextHolder.getSystemLogger().info("开始初始化表唯一值:" + tableName); // WriteExtractUtil.append("开始初始化表唯一值:" + tableName); String[] lockKey = {tableName, collectId}; try { tableName = tableName.toUpperCase(); String loadKey = UNIQUE_FIX_KEY + "LOADING-" + tableName + ":" + collectId; if ("1".equals(((String) RedisUtil.get(loadKey)))) { WriteExtractUtil.append("唯一值缓存已存在跳过:" + tableName); return; } this.initUniqueValue(extractUniqueEntity); // SpringMVCContextHolder.getSystemLogger().info("初始化表唯一值结束:" + tableName + ",耗时" + tempTestTimer.intervalMs() + " ms"); // WriteExtractUtil.append("初始化表唯一值结束:" + tableName + ",耗时" + tempTestTimer.intervalMs() + " ms"); } catch (Exception e) { WriteExtractUtil.append("缓存唯一值出错:" + tableName); e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); throw e; } } /** * 初始化历史数据唯一值 * 在系统启动完成后自动调用 */ @Async //异步执行 public void initUniqueValue() { StringBuilder sql = new StringBuilder(); sql.append(" SELECT "); sql.append("collect.id collect_id,\n"); sql.append("extract_target_source,\n"); sql.append("extract_target_table,\n"); sql.append("extract.extract_unique_field,\n"); sql.append("extract.collect_source_field,\n"); sql.append("extract.auto_increment \n"); sql.append(" \n FROM "); sql.append(" \n product_sys_data_collect collect "); sql.append(" \n JOIN product_sys_data_extract_config extract ON UPPER( extract.extract_source_table )= UPPER( case when length(collect.target_table)>0 then collect.target_table else collect.source_table end ) "); sql.append(" \n AND extract.is_used = 1 "); sql.append(" \n AND collect.is_used =1 "); DataTableEntity dt = baseDao.listTable(sql.toString(), (Object[]) null); if (!DataTableEntity.isEmpty(dt)) { Map extractUniqueEntityMap = new HashMap<>(); for (int i = 0; i < dt.getRows(); i++) { FieldSetEntity fse = dt.getFieldSetEntity(i); String extractTargetSource = fse.getString("extract_target_source"); String extractTargetTable = fse.getString("extract_target_table"); String uniqueFieldName = fse.getString("extract_unique_field"); String collectSourceField = fse.getString("collect_source_field"); String autoIncrement = fse.getString("auto_increment"); String key = extractTargetSource + ":" + extractTargetTable; ExtractUniqueEntity extractUniqueEntity = extractUniqueEntityMap.get(key); if (extractUniqueEntity == null) { extractUniqueEntity = new ExtractUniqueEntity(); extractUniqueEntityMap.put(key, extractUniqueEntity); extractUniqueEntity.setExtractTargetSource(extractTargetSource); extractUniqueEntity.setAutoIncrement(autoIncrement); extractUniqueEntity.setExtractTargetTable(extractTargetTable); extractUniqueEntity.setUniqueFieldName(uniqueFieldName); extractUniqueEntity.setCollectSourceField(collectSourceField); } extractUniqueEntity.addSourceInfo(fse.getString("collect_id")); } ExecutorService threadPool = Executors.newFixedThreadPool(extractUniqueEntityMap.size() > 10 ? 10 : extractUniqueEntityMap.size()); for (ExtractUniqueEntity value : extractUniqueEntityMap.values()) { WriteExtractUtil.append("提交到线程池:" + value.getExtractTargetTable()); threadPool.submit(() -> this.initUniqueValue(value)); } threadPool.shutdown(); try {//等待直到所有任务完成 threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } SpringMVCContextHolder.getSystemLogger().info("初始化加载提取表唯一值完成!!!!!!!!"); } } void initUniqueValue(ExtractUniqueEntity extractUnique) { if (StringUtils.isEmpty(extractUnique.getExtractTargetSource()) || StringUtils.isEmpty(extractUnique.getExtractTargetTable())) { return; } Set collectIdSet = extractUnique.getCollectIdSet(); String loadKeyTemplate = UNIQUE_FIX_KEY + "LOADING-" + extractUnique.getExtractTargetTable() + ":"; Iterator iterator = collectIdSet.iterator(); while (iterator.hasNext()) { //初始化过了跳过 if ("1".equals(((String) RedisUtil.get(loadKeyTemplate + iterator.next())))) { iterator.remove(); } } if (collectIdSet == null || collectIdSet.isEmpty()) { return; } for (String collect : collectIdSet) { lock.lock(extractUnique.getExtractTargetTable(), collect); } DataBaseEntity dbe = new DataBaseEntity(extractUnique.getExtractTargetSource()); Dao dao = dbe.getDao(); try { StringBuilder sql = new StringBuilder(); sql.append(" SELECT MAX(TO_NUMBER(").append(extractUnique.getUniqueFieldName()).append(")) max_value, "); sql.append(extractUnique.getCollectSourceField()).append(" source_info"); sql.append(" FROM ").append(extractUnique.getExtractTargetTable()); sql.append(" GROUP BY ").append(extractUnique.getCollectSourceField()); WriteExtractUtil.append("【开始】开始初始化表唯一值:" + extractUnique.getExtractTargetTable() + ",分组: " + extractUnique.getCollectIdSet()); DataTableEntity dt = dao.getList(sql.toString(), (Object[]) null); if (!DataTableEntity.isEmpty(dt)) { for (int i = 0; i < dt.getRows(); i++) { String maxValue = dt.getString(i, "max_value"); String sourceInfo = dt.getString(i, "source_info"); if (StringUtils.isEmpty(maxValue) || StringUtils.isEmpty(sourceInfo)) { continue; } //放入最大值 RedisUtil.set(UNIQUE_FIX_KEY + extractUnique.getExtractTargetTable().toUpperCase() + ":" + sourceInfo, maxValue); } } for (String collect : collectIdSet) { RedisUtil.set(loadKeyTemplate + collect, "1"); } WriteExtractUtil.append("【结束】初始化表唯一值结束:" + extractUnique.getExtractTargetTable() + ",分组: " + extractUnique.getCollectIdSet()); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } finally { dao.closeConnection(); for (String collect : collectIdSet) { lock.unLock(extractUnique.getExtractTargetTable(), collect); } } } /** * 删除表对应的唯一值 * * @param tableName 表名 * @param tableUniqueByCollectId 表对应的唯一值 */ @Deprecated public static void removeTableUniqueValue(String tableName, Map> tableUniqueByCollectId) { if (StringUtils.isEmpty(tableName)) { return; } tableName = tableName.toUpperCase(); String[] lockKey = {tableName, null}; if (tableUniqueByCollectId != null && tableUniqueByCollectId.size() > 0) { for (Map.Entry> v : tableUniqueByCollectId.entrySet()) { Set values = v.getValue(); String collectId = v.getKey(); lockKey[1] = collectId; lock.lock(lockKey); try { String loadKey = UNIQUE_FIX_KEY + "LOADING-" + tableName + ":" + collectId; if (!"1".equals((RedisUtil.get(loadKey)))) { //表+采集id对应的唯一值集合还未初始化 continue; } RedisUtil.delSetAimValue(UNIQUE_FIX_KEY + tableName + ":" + collectId, values.toArray(new String[0])); } finally { lock.unLock(lockKey); } } } } /** * 添加表唯一值 * * @param tableName * @param uniqueValues * @param collectId */ private void addTableUniqueValue(String tableName, String[] uniqueValues, String collectId) { tableName = tableName.toUpperCase(); String[] lockKey = {tableName, collectId}; Optional max = Arrays.stream(uniqueValues).max(Comparator.comparingInt(item -> Integer.parseInt(item))); Integer maxValue = Integer.parseInt(max.get()); lock.lock(lockKey); try { String value = (String) RedisUtil.get(UNIQUE_FIX_KEY + tableName + ":" + collectId); if (!StringUtils.isEmpty(value) && Integer.parseInt(value) < maxValue) { RedisUtil.set(UNIQUE_FIX_KEY + tableName + ":" + collectId, String.valueOf(maxValue)); } } finally { lock.unLock(lockKey); } } /** * 验证表唯一值是否存在 * * @param tableName * @param value * @param collectId * @return */ private boolean isExists(String tableName, String value, String collectId) { tableName = tableName.toUpperCase(); String[] lockKey = {tableName, collectId}; lock.lock(lockKey); try { String valueOld = (String) RedisUtil.get(UNIQUE_FIX_KEY + tableName + ":" + collectId); if (StringUtils.isEmpty(valueOld)) { return false; } else if (!NumberUtil.isNumber(valueOld)) { throw new BaseException(ErrorCode.UNIQUE_VALUE_NUMBER); } else { return Integer.parseInt(value) <= Integer.parseInt(valueOld); } } catch (Exception e) { throw e; } finally { lock.unLock(lockKey); } } private CustomLock extractLock = new CustomLock(); /** * 提取数据定时任务入口方法 * * @param uuid 提取配置uuid */ public void startExtractData(String uuid) throws BaseException { String[] lockKey = {"startExtractData", uuid}; if (!extractLock.tryLock(lockKey)) { WriteExtractUtil.append("已有任务在运行跳过此次运行!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!:::" + uuid); return; } WriteExtractUtil.append("开始提取!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!:::" + uuid); TimeInterval tempTestTimer = DateUtil.timer(); WriteExtractUtil.append("提取-等待耗时:" + tempTestTimer.intervalMs() + ",:::" + uuid); tempTestTimer = DateUtil.timer(); try { //查询提取数据配置 FieldSetEntity fse = baseDao.getFieldSetEntity(CmnConst.DATA_EXTRACT_TABLE, uuid, true); if (FieldSetEntity.isEmpty(fse) || DataTableEntity.isEmpty(fse.getSubDataTable(CmnConst.DATA_EXTRACT_SUB_TABLE))) { writeLog(uuid, false, ErrorCode.GET_EXTRACT_DATA_CONFIG_FAIL.getText(), -1, null); throw new BaseException(ErrorCode.GET_EXTRACT_DATA_CONFIG_FAIL); } DataTableEntity fieldMappingConfig = fse.getSubDataTable(CmnConst.DATA_EXTRACT_SUB_TABLE); //目标表唯一标识字段 String extractUniqueField = fse.getString("extract_unique_field"); //来源唯一标识字段 String sourceUniqueField = null; //自动增长字段 String autoIncrementField = fse.getString("auto_increment"); for (int i = 0; i < fieldMappingConfig.getRows(); i++) { if (extractUniqueField.equals(fieldMappingConfig.getString(i, "target_table_field"))) { sourceUniqueField = fieldMappingConfig.getString(i, "source_redis_field"); // break; } } if (StringUtils.isEmpty(sourceUniqueField)) { throw new BaseException(ErrorCode.GET_EXTRACT_UNIQUE_FIELD_FAIL); } //redis 主 String extractSource = fse.getString("extract_source"); //redis 从 // String extractSourceBak = fse.getString("bak_redis_source"); //目标源配置uuid String extractTargetSource = fse.getString("extract_target_source"); //redis数据源 // DataBaseEntity redisDataBase = new DataBaseEntity(extractSourceBak); //目标数据源 DataBaseEntity targetDataBase = new DataBaseEntity(extractTargetSource); //集群模式 DataBaseEntity redisDataBase = new DataBaseEntity(extractSource); RedisService readRedisService = new RedisService(redisDataBase); RedisService writeRedisService = readRedisService; // redisDataBase = new DataBaseEntity(extractSource); // RedisService writeRedisService = new RedisService(redisDataBase); String templatePattern = fse.getString("extract_prefix_key") + ":" + fse.getString("extract_source_table").toLowerCase() + ":"; String pattern = templatePattern + "*"; WriteExtractUtil.append("keyp-pattern:" + pattern); String[] keys = readRedisService.getKeys(pattern); if (ArrayUtil.isEmpty(keys)) { writeLog(uuid, true, "", 0, null); readRedisService.close(); WriteExtractUtil.append("DE-没有找到对应的redis数据," + pattern); return; } Dao targetDao = targetDataBase.getDao(); //提取更新时间标识子弹 String extractUpdateTimeField = fse.getString("extract_update_time_field"); //采集来源id存储字段 String collectSourceField = fse.getString("collect_source_field"); //提取目标表表名 String extractTargetTable = fse.getString("extract_target_table"); // 提取时间字段 String extractTimeField = fse.getString(CmnConst.EXTRACT_TIME_FIELD); Dao targetNewDao = null; WriteExtractUtil.append("提取-准备耗时:" + tempTestTimer.intervalMs()); try { for (String key : keys) { List hashFields = readRedisService.getHashFields(key); hashFields = redisHashFields(hashFields); if (hashFields != null && hashFields.size() > 0) { //采集ID String collectId = key.substring(key.lastIndexOf(":") + 1); initTableUniqueValue(extractTargetTable, extractUniqueField, collectId, collectSourceField, targetDataBase.getUuid(), autoIncrementField); for (int ij = 0; ij < hashFields.size(); ij++) { String field = hashFields.get(ij); tempTestTimer = DateUtil.timer(); Object value = readRedisService.getHash(key, field); JournalEntity logFse = new JournalEntity(); WriteExtractUtil.append("提取-redis数据获取耗时:" + tempTestTimer.intervalMs()); logFse.setOther_info("提取-redis数据获取耗时:" + tempTestTimer.intervalMs()); if (field == null || value == null || ("impossible_" + FIX_KEY).equals(field)) { continue; } tempTestTimer = DateUtil.timer(); String collectLogUid = field; // FieldSetEntity collectInfo = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, collectLogUid, false); int totalCount = -1; try { // DataTableEntity extractLogs = new DataTableEntity(); DataTableEntity data = (DataTableEntity) value; totalCount = data.getRows(); DataTableEntity maybeUpdate = new DataTableEntity(); TimeInterval tempTestTimer1 = DateUtil.timer(); List newAutoIncrementValues = new ArrayList<>(); //制令单特殊处理 updateMoBase(data, extractTargetTable, targetDao); //字段映射处理 data = getFieldMappingDispose(collectId, collectSourceField, extractTargetTable, extractTimeField, data, fieldMappingConfig, extractUniqueField, extractUpdateTimeField, maybeUpdate, autoIncrementField, targetDataBase, newAutoIncrementValues ); WriteExtractUtil.append("提取-字段映射耗时:" + tempTestTimer1.intervalMs()); tempTestTimer1 = DateUtil.timer(); BatchResultEntity batchResultEntity = null; if (!DataTableEntity.isEmpty(maybeUpdate)) { //有需要查询确认更新的数据 StringBuilder filter = new StringBuilder(32); filter.append(extractUpdateTimeField).append(" < ? "); filter.append(" AND ").append(collectSourceField).append(" =? AND "); filter.append(extractUniqueField).append("= ? "); targetNewDao = targetNewDao != null ? targetNewDao : targetDataBase.newDao(); TimeInterval tempTestTimer2 = DateUtil.timer(); UpdateFilterEntity updateFilterEntity = new UpdateFilterEntity(autoIncrementField + "=?", new String[]{autoIncrementField}); WriteExtractUtil.append("开始查询数据是否需要更新:" + maybeUpdate.getRows() + ",:" + extractTargetTable); Map exist = isExist(maybeUpdate, collectId, extractTargetTable, autoIncrementField, collectSourceField, extractUniqueField, extractUpdateTimeField, targetNewDao); WriteExtractUtil.append("查询数据是否需要更新结束共耗时:" + tempTestTimer2.intervalMs() + ",:" + extractTargetTable); for (int i = 0; i < maybeUpdate.getRows(); i++) { FieldSetEntity item = maybeUpdate.getFieldSetEntity(i); String sourceInfo = item.getString(collectSourceField); String preMasterKey = item.getString(extractUniqueField); JSONObject jsonObject = new JSONObject(); jsonObject.put(collectSourceField, sourceInfo); jsonObject.put(extractUniqueField, preMasterKey); jsonObject = exist.get(jsonObject); if ((jsonObject == null || jsonObject.isEmpty())) { // WriteExtractUtil.append("将数据放入新增:" + item.getString(extractUniqueField) + "," + extractTargetTable); // //判定为新增的数据 data.addFieldSetEntity(item); newAutoIncrementValues.add(item.getString(extractUniqueField)); maybeUpdate.removeFieldSetEntity(i); i--; continue; } item.setValue(autoIncrementField, jsonObject.get(autoIncrementField)); //数据库更新时间 Date oldDate = jsonObject.getDate(extractUpdateTimeField); //数据的更新时间 Date newDate = item.getDate(extractUpdateTimeField); int compare = DateUtil.compare(oldDate, newDate); if (compare >= 0) { // extractLogs.addFieldSetEntity(getFieldSetEntity(item.getString(extractUniqueField), item.getTableName(), 2, item.getString(collectSourceField))); //不需要更新 maybeUpdate.removeFieldSetEntity(i); i--; continue; // WriteExtractUtil.append("跳过不需要更新的数据项,主键:" + item.getString(autoIncrementField) + ",数据库时间:" + DateUtil.format(oldDate, "yyyy-MM-dd HH:mm:ss") + ",数据库时间:" + DateUtil.format(newDate, "yyyy-MM-dd HH:mm:ss")); } // extractLogs.addFieldSetEntity(getFieldSetEntity(item.getString(extractUniqueField), item.getTableName(), 1, item.getString(collectSourceField))); } if (maybeUpdate.getRows() > 0) { maybeUpdate.setMeta(maybeUpdate.getFieldSetEntity(0).getMeta()); } WriteExtractUtil.append("更新提取过滤后数据:" + extractTargetTable + ",需要更新的条数:" + maybeUpdate.getRows()); maybeUpdate.getData().sort((o1, o2) -> DateUtil.compare(o1.getDate(extractUpdateTimeField), o2.getDate(extractUpdateTimeField))); batchResultEntity = targetNewDao.updateBatch(maybeUpdate, updateFilterEntity, false); WriteExtractUtil.append("更新提取过滤后数据:" + extractTargetTable + ",需要更新的条数:" + maybeUpdate.getRows() + ",耗时:" + tempTestTimer2.intervalMs()); targetNewDao.closeConnection(); } if (!DataTableEntity.isEmpty(data)) { tempTestTimer1 = DateUtil.timer(); targetDao.addBatch(data, autoIncrementField); // data.getData().stream().forEach(item -> { // extractLogs.addFieldSetEntity(getFieldSetEntity(item.getString(extractUniqueField), item.getTableName(), 0, item.getString(collectSourceField))); // }); WriteExtractUtil.append("新增数据耗时:" + tempTestTimer1.intervalMs()); this.addTableUniqueValue(extractTargetTable, newAutoIncrementValues.stream().toArray(String[]::new), collectId); } // if (!DataTableEntity.isEmpty(extractLogs)) { // baseDao.add(extractLogs); // } // writeLog(uuid, true, "", totalCount, collectLogUid); WriteExtractUtil.append("提取-新增更新数据耗时:" + tempTestTimer1.intervalMs()); WriteExtractUtil.append("提取-写入数据库耗时:" + tempTestTimer.intervalMs()); writeLog(logFse, uuid, true, "", totalCount, collectLogUid); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); writeLog(uuid, false, e, totalCount, collectLogUid); } finally { //redis数据源 writeRedisService.setHash(key, "impossible_" + FIX_KEY, new DataTableEntity()); writeRedisService.delHash(key, field); } } } } } catch (Exception e) { throw e; } finally { if (targetNewDao != null) { targetNewDao.closeConnection(); } readRedisService.close(); writeRedisService.close(); targetDao.closeConnection(); } } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); throw e; } finally { extractLock.unLock(lockKey); } } /** * 制令单特殊处理 将找目标库中的工单表的id字段写入project_id * * @param dt * @param targetTable * @param targetDao * @throws BaseException */ private void updateMoBase(DataTableEntity dt, String targetTable, Dao targetDao) throws BaseException { if (DataTableEntity.isEmpty(dt) || !"T_PM_MO_BASE".equalsIgnoreCase(targetTable)) { return; } Map map = new HashMap<>(); List moNumberList = new ArrayList<>(); for (int i = 0; i < dt.getRows(); i++) { String moNumber = dt.getString(i, "mo_number"); if (StringUtils.isEmpty(moNumber)) { continue; } moNumberList.add(moNumber); map.put(moNumber, dt.getFieldSetEntity(i)); } //查询制令单关联的工单数据 DataTableEntity projectBaseDt = targetDao.getList("T_PM_PROJECT_BASE", new String[]{"PROJECT_ID,PROJECT_BASE_ID"}, BaseUtil.buildQuestionMarkFilter("project_id", moNumberList.size(), true), moNumberList.toArray(), null, 1, dt.getRows()); if (!DataTableEntity.isEmpty(projectBaseDt)) { for (int i = 0; i < projectBaseDt.getRows(); i++) { FieldSetEntity fs = projectBaseDt.getFieldSetEntity(i); String projectId = fs.getString("project_id"); FieldSetEntity fieldSetEntity = map.get(projectId); if (!FieldSetEntity.isEmpty(fieldSetEntity)) { //将工单表id写入制令单的project_id字段 fieldSetEntity.setValue("project_id", fs.getString("project_base_id")); map.remove(projectId); } } } if (!map.isEmpty()) { throw new BaseException(ErrorCode.MO_NUMBER_MASTER_PROJECT_BASE_EMPTY); } } /** * @param primaryValue 主键值 * @param tableName 表名 * @param operationType 操作类型 0 insert 1 update 2 数据已存在不需要更新 * @return */ private FieldSetEntity getFieldSetEntity(String primaryValue, String tableName, int operationType, String sourceInfo) { FieldSetEntity fs = new FieldSetEntity(); fs.setTableName("product_mes_extract_log"); fs.setValue("primary_value", primaryValue); fs.setValue("table_name", tableName); fs.setValue("operation_type", operationType); fs.setValue("source_info", sourceInfo); fs.setValue(CmnConst.CREATED_UTC_DATETIME, new Date()); return fs; } /** * 将采集uuid按采集表id排序返回 * * @param fields * @return */ private List redisHashFields(List fields) { if (fields != null && !fields.isEmpty()) { DataTableEntity dt = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "type = 1 and " + BaseUtil.buildQuestionMarkFilter("uuid", fields.toArray(), true), new Object[]{}, new Object[]{CmnConst.UUID}, "id"); if (!DataTableEntity.isEmpty(dt)) { return dt.getData().stream().map(item -> item.getString(CmnConst.UUID)).collect(Collectors.toList()); } } return Collections.emptyList(); } private void writeLog(String configUid, boolean isSuccess, String errorMsg, int count, String collectLogUid) { JournalEntity fse = new JournalEntity(); fse.setType(2); fse.setError(errorMsg); fse.setCount(count); fse.setResult(isSuccess ? 1 : 0); fse.setConfigUuid(configUid); fse.setPre_step_uuid(collectLogUid); fse.setCreated_utc_datetime(new Date()); if (!StringUtils.isEmpty(errorMsg) || count > 0 || !isSuccess) { FieldSetEntity fs = journalManagerService.autoCreateJournal(fse); mesExternalService.remoteSaveExtractLog(fs); } } private void writeLog(JournalEntity fse, String configUid, boolean isSuccess, String errorMsg, int count, String collectLogUid) { fse.setType(2); fse.setError(errorMsg); fse.setCount(count); fse.setResult(isSuccess ? 1 : 0); fse.setConfigUuid(configUid); fse.setPre_step_uuid(collectLogUid); fse.setCreated_utc_datetime(new Date()); FieldSetEntity fs = journalManagerService.autoCreateJournal(fse); mesExternalService.remoteSaveExtractLog(fs); } private void writeLog(String configUid, boolean isSuccess, Throwable throwable, int count, String collectLogUid) { JournalEntity fse = new JournalEntity(); String errorMsg = journalManagerService.getStackTrace(throwable); fse.setType(2); fse.setError(errorMsg); fse.setCount(count); fse.setResult(isSuccess ? 1 : 0); fse.setConfigUuid(configUid); fse.setPre_step_uuid(collectLogUid); fse.setCreated_utc_datetime(new Date()); FieldSetEntity fs = journalManagerService.autoCreateJournal(fse); mesExternalService.remoteSaveExtractLog(fs); } /** * @param dt * @param collectId 采集id * @param targetTable 目标表表名 * @param idField id字段名称 * @param sourceInfoField 采集来源字段名称 * @param preMasterKeyField 子库主键字段名称 * @param extractUpdateTimeField 修改时间字段 * @param dao * @return */ private Map isExist(DataTableEntity dt, String collectId, String targetTable, String idField, String sourceInfoField, String preMasterKeyField, String extractUpdateTimeField, Dao dao) { List data = dt.getData(); String dataSystemName = Global.getSystemConfig("data.system.name", ""); if (StringUtils.isEmpty(dataSystemName)) { //数据来源系统名称不能为空 throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_CAN_NOT_EMPTY); } Map> collect = data.stream().collect(Collectors.groupingBy(item -> item.getString(sourceInfoField), Collectors.mapping(item -> item.getString(preMasterKeyField), Collectors.toList()))); //子库采集数据的主键集合 // Set collectData = data.stream().filter(item -> !dataSystemName.equals(item.getString(sourceInfoField))).map(item -> item.getString(preMasterKeyField)).collect(Collectors.toSet()); int i = 0; List params = new ArrayList<>(); StringBuilder sql = new StringBuilder(); for (Map.Entry> v : collect.entrySet()) { if (collectId.equals(v.getKey()) || dataSystemName.equals(v.getKey())) { if (i > 0) { sql.append(" UNION ALL "); } sql.append(" SELECT ").append(preMasterKeyField).append(",").append(idField).append(",").append(extractUpdateTimeField); sql.append(" ,").append(sourceInfoField); sql.append(" FROM ").append(targetTable); sql.append(" WHERE ").append(sourceInfoField).append(" = ? and ").append(BaseUtil.buildQuestionMarkFilter(preMasterKeyField, v.getValue().size(), true)); params.add(v.getKey()); params.addAll(v.getValue()); } i++; } DataTableEntity result = dao.getList(sql.toString(), params.toArray()); if (DataTableEntity.isEmpty(result)) { return Collections.emptyMap(); } Map resultMap = new HashMap<>(); for (int j = 0; j < result.getRows(); j++) { FieldSetEntity item = result.getFieldSetEntity(j); JSONObject jsonObject = new JSONObject(); jsonObject.put(preMasterKeyField, item.getString(preMasterKeyField)); jsonObject.put(sourceInfoField, item.getString(sourceInfoField)); resultMap.put(jsonObject, new JSONObject((Map) item.getValues())); } return resultMap; } /** * 字段映射 * * * * @param collectId 采集id * @param collectSourceField 采集来源字段 * @param extractTargetTable 提取目标表 * @param data 要映射的元数据集 * @param fieldMappingConfig 映射关系 * @param extractUniqueField 目标表唯一标识字段 * @param updateTimeField 更新标识字段 * @param updateDt 可能需要更新的数据 * @return */ private DataTableEntity getFieldMappingDispose(String collectId, String collectSourceField, String extractTargetTable, String extractTimeField, DataTableEntity data, DataTableEntity fieldMappingConfig, String extractUniqueField, String updateTimeField, DataTableEntity updateDt, String autoIncrementField, DataBaseEntity targetDataBase, List newAutoIncrementValues) { String dataSystemName = Global.getSystemConfig("data.system.name", ""); if (StringUtils.isEmpty(dataSystemName)) { //数据来源系统名称不能为空 throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_CAN_NOT_EMPTY); } if (!DataTableEntity.isEmpty(data)) { //从数据库中查询有没有存在的主键值 DataTableEntity result = new DataTableEntity(); for (int i = 0; i < data.getRows(); i++) { FieldSetEntity fse1 = data.getFieldSetEntity(i); FieldSetEntity fseNew = new FieldSetEntity(); fseNew.setTableName(extractTargetTable); //历史数据 String historyCollect = fse1.getString(collectSourceField); for (int j = 0; j < fieldMappingConfig.getRows(); j++) { String targetTableField = fieldMappingConfig.getString(j, "target_table_field"); String sourceRedisField = fieldMappingConfig.getString(j, "source_redis_field"); fseNew.setValue(targetTableField, fse1.getObject(sourceRedisField)); } // 提取时间 fseNew.setValue(extractTimeField, new Date()); //数据系统名称与历史采集id不一致才复制当前采集的id 否则判断为历史系统数据将进行更新操作 2022年12月1日22:33:00 cheng if (!dataSystemName.equals(historyCollect)) { fseNew.setValue(collectSourceField, collectId); } else { fseNew.setValue(collectSourceField, historyCollect); fseNew.setValue(extractUniqueField, fse1.getString(autoIncrementField)); } if (StringUtils.isEmpty(fseNew.getString(extractUniqueField))) { //唯一值为空 throw new BaseException(ErrorCode.EXTRACT_DATA_ROW_UNIQUE_VALUE_CAN_NOT_EMPTY); } if (StringUtils.isEmpty(updateTimeField)) { throw new BaseException(new Exception("更新时间字段为空")); } //系统数据名称和采集来源字段的值相同 默认为更新 if (fseNew.getObject(updateTimeField) != null && (dataSystemName.equals(historyCollect) || // exist.contains(fseNew.getString(uniqueField)) isExists(extractTargetTable, fseNew.getString(extractUniqueField), collectId) )) { //更新字段不为空可能会有更新 fseNew.remove(autoIncrementField); updateDt.addFieldSetEntity(fseNew); } else { //新增 result.addFieldSetEntity(fseNew); newAutoIncrementValues.add(fseNew.getString(extractUniqueField)); } } return result; } return data; } }