package com.product.data.center.service;
|
|
|
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.TimeInterval;
|
import cn.hutool.core.util.ArrayUtil;
|
import cn.hutool.core.util.NumberUtil;
|
import com.alibaba.fastjson.JSONObject;
|
import com.product.core.cache.util.RedisUtil;
|
import com.product.core.config.Global;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.center.config.CmnConst;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.entity.ExtractUniqueEntity;
|
import com.product.data.center.entity.JournalEntity;
|
import com.product.data.center.utils.CustomLock;
|
import com.product.data.center.utils.WriteExtractUtil;
|
import com.product.datasource.dao.Dao;
|
import com.product.datasource.entity.BatchResultEntity;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.datasource.entity.UpdateFilterEntity;
|
import com.product.datasource.service.RedisService;
|
import com.product.util.BaseUtil;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.Resource;
|
import java.sql.Connection;
|
import java.util.*;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.TimeUnit;
|
import java.util.stream.Collectors;
|
|
/**
|
* @Author cheng
|
* @Date 2022/7/15 13:25
|
* @PackageName:com.product.data.center.service
|
* @ClassName: DataExtractService
|
* @Description: 数据提取 从Redis 到 数据库
|
* @Version 1.0
|
*/
|
@Service
|
public class DataExtractService {
|
|
// Suffix of the placeholder hash field ("impossible_" + FIX_KEY) that startExtractData
// writes into each processed Redis hash and skips when reading hash fields.
final String FIX_KEY = "2022年9月2日092542";
|
|
// DAO used for the extract configuration and data-center log queries in this service.
@Autowired
|
BaseDao baseDao;
|
|
// Creates journal records and renders stack traces for the writeLog helpers.
@Autowired
|
JournalManagerService journalManagerService;
|
|
// Receives every journal record produced here via remoteSaveExtractLog.
@Resource
|
MesExternalService mesExternalService;
|
|
// Prefix of the Redis keys that hold the cached maximum unique value per table and
// collect id, and of the "LOADING-" flags that mark a cache entry as initialised.
public final static String UNIQUE_FIX_KEY = "MES_MATER_TABLE_KEY:";
|
|
|
// Shared lock keyed by (table, collect id) that serialises access to the cached unique values.
// NOTE(review): the meaning of the 500L constructor argument is defined by CustomLock - confirm there.
private static CustomLock lock = new CustomLock(500L);
|
|
/**
|
* 检测采集成功但未提取的记录并标记为执行失败
|
*
|
* @param errorMinute 检测规定时间范围内没有被提取的时间(分钟)
|
*/
|
public void detectioncExtractIntegrality(String errorMinute) {
|
StringBuilder sql = new StringBuilder(128);
|
sql.append("\n SELECT ");
|
sql.append("\n l.uuid,l.config_uuid ");
|
sql.append("\n FROM ");
|
sql.append("\n product_sys_data_center_log l ");
|
sql.append("\n LEFT JOIN product_sys_data_center_log g ON l.uuid = g.pre_step_uuid AND g.type = 2 ");
|
sql.append("\n WHERE ");
|
sql.append("\n l.type = 1 AND l.result!=0 AND l.deal_flag!=1 and l.count>0 ");
|
sql.append("\n AND g.uuid IS NULL ");
|
sql.append("\n AND TIMESTAMPDIFF(MINUTE ,l.created_utc_datetime,NOW())> ?");
|
DataTableEntity dt = baseDao.listTable(sql.toString(), new Object[]{Integer.parseInt(errorMinute)});
|
if (!DataTableEntity.isEmpty(dt)) {
|
baseDao.executeUpdate("UPDATE product_sys_data_center_log SET error=? ,result=0,deal_flag=0,updated_utc_datetime=now() WHERE " +
|
BaseUtil.buildQuestionMarkFilter("uuid", dt.getRows(), true),
|
ArrayUtil.addAll(new Object[]{"采集成功,未能在 " + errorMinute + " 分钟内成功提取!"}, dt.getUuids()));
|
}
|
|
sql.setLength(0);
|
sql.append("\nSELECT");
|
sql.append("\nb.uuid");
|
sql.append("\n FROM");
|
sql.append("\nproduct_sys_data_center_log a");
|
sql.append("\nJOIN product_sys_data_center_log b ON a.type = 2");
|
sql.append("\nAND b.type = 1");
|
sql.append("\nAND a.pre_step_uuid = b.uuid");
|
sql.append("\nWHERE");
|
sql.append("\nb.error LIKE '采集成功,未能在%分钟内成功提取!'");
|
sql.append("\nAND a.result = 1");
|
sql.append("\nAND b.result = 0");
|
sql.append("\nAND b.deal_flag !=1");
|
dt = baseDao.listTable(sql.toString(), new Object[]{});
|
if (!DataTableEntity.isEmpty(dt)) {
|
baseDao.executeUpdate("update product_sys_data_center_log set result=1,error=null,updated_utc_datetime=now() where " + BaseUtil.buildQuestionMarkFilter("uuid", dt.getRows(), true), dt.getUuids());
|
}
|
}
|
|
/**
|
* 初始化表唯一值
|
*
|
* @param tableName
|
* @param uniqueFieldName
|
* @param collectId
|
* @param collectSourceField
|
* @param targetSource
|
* @param autoIncrementField
|
*/
|
public void initTableUniqueValue(String tableName, String uniqueFieldName, String collectId, String collectSourceField, String targetSource, String autoIncrementField) {
|
ExtractUniqueEntity extractUniqueEntity = new ExtractUniqueEntity();
|
extractUniqueEntity.setCollectSourceField(collectSourceField);
|
extractUniqueEntity.setExtractTargetTable(tableName);
|
extractUniqueEntity.setUniqueFieldName(uniqueFieldName);
|
extractUniqueEntity.setExtractTargetSource(targetSource);
|
extractUniqueEntity.setAutoIncrement(autoIncrementField);
|
extractUniqueEntity.addSourceInfo(collectId);
|
// SpringMVCContextHolder.getSystemLogger().info("开始初始化表唯一值:" + tableName);
|
// WriteExtractUtil.append("开始初始化表唯一值:" + tableName);
|
String[] lockKey = {tableName, collectId};
|
try {
|
tableName = tableName.toUpperCase();
|
String loadKey = UNIQUE_FIX_KEY + "LOADING-" + tableName + ":" + collectId;
|
if ("1".equals(((String) RedisUtil.get(loadKey)))) {
|
WriteExtractUtil.append("唯一值缓存已存在跳过:" + tableName);
|
return;
|
}
|
this.initUniqueValue(extractUniqueEntity);
|
// SpringMVCContextHolder.getSystemLogger().info("初始化表唯一值结束:" + tableName + ",耗时" + tempTestTimer.intervalMs() + " ms");
|
// WriteExtractUtil.append("初始化表唯一值结束:" + tableName + ",耗时" + tempTestTimer.intervalMs() + " ms");
|
} catch (Exception e) {
|
WriteExtractUtil.append("缓存唯一值出错:" + tableName);
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw e;
|
}
|
}
|
|
/**
|
* 初始化历史数据唯一值
|
* 在系统启动完成后自动调用
|
*/
|
@Async //异步执行
|
public void initUniqueValue() {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT ");
|
sql.append("collect.id collect_id,\n");
|
sql.append("extract_target_source,\n");
|
sql.append("extract_target_table,\n");
|
sql.append("extract.extract_unique_field,\n");
|
sql.append("extract.collect_source_field,\n");
|
sql.append("extract.auto_increment \n");
|
sql.append(" \n FROM ");
|
sql.append(" \n product_sys_data_collect collect ");
|
sql.append(" \n JOIN product_sys_data_extract_config extract ON UPPER( extract.extract_source_table )= UPPER( case when length(collect.target_table)>0 then collect.target_table else collect.source_table end ) ");
|
sql.append(" \n AND extract.is_used = 1 ");
|
sql.append(" \n AND collect.is_used =1 ");
|
|
DataTableEntity dt = baseDao.listTable(sql.toString(), (Object[]) null);
|
if (!DataTableEntity.isEmpty(dt)) {
|
Map<String, ExtractUniqueEntity> extractUniqueEntityMap = new HashMap<>();
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity fse = dt.getFieldSetEntity(i);
|
String extractTargetSource = fse.getString("extract_target_source");
|
String extractTargetTable = fse.getString("extract_target_table");
|
String uniqueFieldName = fse.getString("extract_unique_field");
|
String collectSourceField = fse.getString("collect_source_field");
|
String autoIncrement = fse.getString("auto_increment");
|
String key = extractTargetSource + ":" + extractTargetTable;
|
ExtractUniqueEntity extractUniqueEntity = extractUniqueEntityMap.get(key);
|
if (extractUniqueEntity == null) {
|
extractUniqueEntity = new ExtractUniqueEntity();
|
extractUniqueEntityMap.put(key, extractUniqueEntity);
|
extractUniqueEntity.setExtractTargetSource(extractTargetSource);
|
extractUniqueEntity.setAutoIncrement(autoIncrement);
|
extractUniqueEntity.setExtractTargetTable(extractTargetTable);
|
extractUniqueEntity.setUniqueFieldName(uniqueFieldName);
|
extractUniqueEntity.setCollectSourceField(collectSourceField);
|
}
|
extractUniqueEntity.addSourceInfo(fse.getString("collect_id"));
|
}
|
ExecutorService threadPool = Executors.newFixedThreadPool(extractUniqueEntityMap.size()
|
> 10 ? 10 : extractUniqueEntityMap.size());
|
for (ExtractUniqueEntity value : extractUniqueEntityMap.values()) {
|
WriteExtractUtil.append("提交到线程池:" + value.getExtractTargetTable());
|
threadPool.submit(() -> this.initUniqueValue(value));
|
}
|
threadPool.shutdown();
|
try {//等待直到所有任务完成
|
threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.MINUTES);
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
SpringMVCContextHolder.getSystemLogger().info("初始化加载提取表唯一值完成!!!!!!!!");
|
}
|
}
|
|
|
void initUniqueValue(ExtractUniqueEntity extractUnique) {
|
if (StringUtils.isEmpty(extractUnique.getExtractTargetSource()) || StringUtils.isEmpty(extractUnique.getExtractTargetTable())) {
|
return;
|
}
|
Set<String> collectIdSet = extractUnique.getCollectIdSet();
|
String loadKeyTemplate = UNIQUE_FIX_KEY + "LOADING-" + extractUnique.getExtractTargetTable() + ":";
|
Iterator<String> iterator = collectIdSet.iterator();
|
while (iterator.hasNext()) {
|
//初始化过了跳过
|
if ("1".equals(((String) RedisUtil.get(loadKeyTemplate + iterator.next())))) {
|
iterator.remove();
|
}
|
}
|
if (collectIdSet == null || collectIdSet.isEmpty()) {
|
return;
|
}
|
for (String collect : collectIdSet) {
|
lock.lock(extractUnique.getExtractTargetTable(), collect);
|
}
|
DataBaseEntity dbe = new DataBaseEntity(extractUnique.getExtractTargetSource());
|
Dao dao = dbe.getDao();
|
try {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT MAX(TO_NUMBER(").append(extractUnique.getUniqueFieldName()).append(")) max_value, ");
|
sql.append(extractUnique.getCollectSourceField()).append(" source_info");
|
sql.append(" FROM ").append(extractUnique.getExtractTargetTable());
|
sql.append(" GROUP BY ").append(extractUnique.getCollectSourceField());
|
WriteExtractUtil.append("【开始】开始初始化表唯一值:" + extractUnique.getExtractTargetTable() + ",分组: " + extractUnique.getCollectIdSet());
|
DataTableEntity dt = dao.getList(sql.toString(), (Object[]) null);
|
if (!DataTableEntity.isEmpty(dt)) {
|
for (int i = 0; i < dt.getRows(); i++) {
|
String maxValue = dt.getString(i, "max_value");
|
String sourceInfo = dt.getString(i, "source_info");
|
if (StringUtils.isEmpty(maxValue) || StringUtils.isEmpty(sourceInfo)) {
|
continue;
|
}
|
//放入最大值
|
RedisUtil.set(UNIQUE_FIX_KEY + extractUnique.getExtractTargetTable().toUpperCase() + ":" + sourceInfo, maxValue);
|
}
|
}
|
for (String collect : collectIdSet) {
|
RedisUtil.set(loadKeyTemplate + collect, "1");
|
}
|
WriteExtractUtil.append("【结束】初始化表唯一值结束:" + extractUnique.getExtractTargetTable() + ",分组: " + extractUnique.getCollectIdSet());
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
} finally {
|
dao.closeConnection();
|
for (String collect : collectIdSet) {
|
lock.unLock(extractUnique.getExtractTargetTable(), collect);
|
}
|
}
|
}
|
|
/**
|
* 删除表对应的唯一值
|
*
|
* @param tableName 表名
|
* @param tableUniqueByCollectId 表对应的唯一值
|
*/
|
@Deprecated
|
public static void removeTableUniqueValue(String tableName, Map<String, Set<String>> tableUniqueByCollectId) {
|
if (StringUtils.isEmpty(tableName)) {
|
return;
|
}
|
tableName = tableName.toUpperCase();
|
|
String[] lockKey = {tableName, null};
|
if (tableUniqueByCollectId != null && tableUniqueByCollectId.size() > 0) {
|
for (Map.Entry<String, Set<String>> v : tableUniqueByCollectId.entrySet()) {
|
Set<String> values = v.getValue();
|
String collectId = v.getKey();
|
lockKey[1] = collectId;
|
lock.lock(lockKey);
|
try {
|
String loadKey = UNIQUE_FIX_KEY + "LOADING-" + tableName + ":" + collectId;
|
if (!"1".equals((RedisUtil.get(loadKey)))) {
|
//表+采集id对应的唯一值集合还未初始化
|
continue;
|
}
|
RedisUtil.delSetAimValue(UNIQUE_FIX_KEY + tableName + ":" + collectId, values.toArray(new String[0]));
|
} finally {
|
lock.unLock(lockKey);
|
}
|
}
|
}
|
|
}
|
|
|
/**
|
* 添加表唯一值
|
*
|
* @param tableName
|
* @param uniqueValues
|
* @param collectId
|
*/
|
private void addTableUniqueValue(String tableName, String[] uniqueValues, String collectId) {
|
tableName = tableName.toUpperCase();
|
String[] lockKey = {tableName, collectId};
|
Optional<String> max = Arrays.stream(uniqueValues).max(Comparator.comparingInt(item -> Integer.parseInt(item)));
|
Integer maxValue = Integer.parseInt(max.get());
|
lock.lock(lockKey);
|
try {
|
String value = (String) RedisUtil.get(UNIQUE_FIX_KEY + tableName + ":" + collectId);
|
if (!StringUtils.isEmpty(value) && Integer.parseInt(value) < maxValue) {
|
RedisUtil.set(UNIQUE_FIX_KEY + tableName + ":" + collectId, String.valueOf(maxValue));
|
}
|
} finally {
|
lock.unLock(lockKey);
|
}
|
}
|
|
/**
|
* 验证表唯一值是否存在
|
*
|
* @param tableName
|
* @param value
|
* @param collectId
|
* @return
|
*/
|
private boolean isExists(String tableName, String value, String collectId) {
|
tableName = tableName.toUpperCase();
|
String[] lockKey = {tableName, collectId};
|
lock.lock(lockKey);
|
try {
|
String valueOld = (String) RedisUtil.get(UNIQUE_FIX_KEY + tableName + ":" + collectId);
|
|
if (StringUtils.isEmpty(valueOld)) {
|
return false;
|
} else if (!NumberUtil.isNumber(valueOld)) {
|
throw new BaseException(ErrorCode.UNIQUE_VALUE_NUMBER);
|
} else {
|
return Integer.parseInt(value) <= Integer.parseInt(valueOld);
|
}
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
lock.unLock(lockKey);
|
}
|
}
|
|
// Lock used by startExtractData (via tryLock, keyed by configuration uuid) so that a run is skipped
// while another run for the same configuration is still in progress.
private CustomLock extractLock = new CustomLock();
|
|
|
/**
|
* 提取数据定时任务入口方法
|
*
|
* @param uuid 提取配置uuid
|
*/
|
public void startExtractData(String uuid) throws BaseException {
|
String[] lockKey = {"startExtractData", uuid};
|
if (!extractLock.tryLock(lockKey)) {
|
WriteExtractUtil.append("已有任务在运行跳过此次运行!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!:::" + uuid);
|
return;
|
}
|
WriteExtractUtil.append("开始提取!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!:::" + uuid);
|
TimeInterval tempTestTimer = DateUtil.timer();
|
WriteExtractUtil.append("提取-等待耗时:" + tempTestTimer.intervalMs() + ",:::" + uuid);
|
tempTestTimer = DateUtil.timer();
|
try {
|
//查询提取数据配置
|
FieldSetEntity fse = baseDao.getFieldSetEntity(CmnConst.DATA_EXTRACT_TABLE, uuid, true);
|
if (FieldSetEntity.isEmpty(fse) || DataTableEntity.isEmpty(fse.getSubDataTable(CmnConst.DATA_EXTRACT_SUB_TABLE))) {
|
writeLog(uuid, false, ErrorCode.GET_EXTRACT_DATA_CONFIG_FAIL.getText(), -1, null);
|
throw new BaseException(ErrorCode.GET_EXTRACT_DATA_CONFIG_FAIL);
|
}
|
DataTableEntity fieldMappingConfig = fse.getSubDataTable(CmnConst.DATA_EXTRACT_SUB_TABLE);
|
//目标表唯一标识字段
|
String extractUniqueField = fse.getString("extract_unique_field");
|
//来源唯一标识字段
|
String sourceUniqueField = null;
|
//自动增长字段
|
String autoIncrementField = fse.getString("auto_increment");
|
for (int i = 0; i < fieldMappingConfig.getRows(); i++) {
|
if (extractUniqueField.equals(fieldMappingConfig.getString(i, "target_table_field"))) {
|
sourceUniqueField = fieldMappingConfig.getString(i, "source_redis_field");
|
// break;
|
}
|
}
|
if (StringUtils.isEmpty(sourceUniqueField)) {
|
throw new BaseException(ErrorCode.GET_EXTRACT_UNIQUE_FIELD_FAIL);
|
}
|
|
//redis 主
|
String extractSource = fse.getString("extract_source");
|
//redis 从
|
// String extractSourceBak = fse.getString("bak_redis_source");
|
//目标源配置uuid
|
String extractTargetSource = fse.getString("extract_target_source");
|
//redis数据源
|
// DataBaseEntity redisDataBase = new DataBaseEntity(extractSourceBak);
|
//目标数据源
|
DataBaseEntity targetDataBase = new DataBaseEntity(extractTargetSource);
|
//集群模式
|
DataBaseEntity redisDataBase = new DataBaseEntity(extractSource);
|
RedisService readRedisService = new RedisService(redisDataBase);
|
RedisService writeRedisService = readRedisService;
|
// redisDataBase = new DataBaseEntity(extractSource);
|
// RedisService writeRedisService = new RedisService(redisDataBase);
|
String templatePattern = fse.getString("extract_prefix_key") + ":" + fse.getString("extract_source_table").toLowerCase() + ":";
|
String pattern = templatePattern + "*";
|
|
WriteExtractUtil.append("keyp-pattern:" + pattern);
|
String[] keys = readRedisService.getKeys(pattern);
|
if (ArrayUtil.isEmpty(keys)) {
|
writeLog(uuid, true, "", 0, null);
|
readRedisService.close();
|
WriteExtractUtil.append("DE-没有找到对应的redis数据," + pattern);
|
return;
|
}
|
Dao targetDao = targetDataBase.getDao();
|
//提取更新时间标识子弹
|
String extractUpdateTimeField = fse.getString("extract_update_time_field");
|
//采集来源id存储字段
|
String collectSourceField = fse.getString("collect_source_field");
|
//提取目标表表名
|
String extractTargetTable = fse.getString("extract_target_table");
|
// 提取时间字段
|
String extractTimeField = fse.getString(CmnConst.EXTRACT_TIME_FIELD);
|
Dao targetNewDao = null;
|
WriteExtractUtil.append("提取-准备耗时:" + tempTestTimer.intervalMs());
|
try {
|
for (String key : keys) {
|
List<String> hashFields = readRedisService.getHashFields(key);
|
hashFields = redisHashFields(hashFields);
|
if (hashFields != null && hashFields.size() > 0) {
|
//采集ID
|
String collectId = key.substring(key.lastIndexOf(":") + 1);
|
initTableUniqueValue(extractTargetTable, extractUniqueField, collectId, collectSourceField, targetDataBase.getUuid(), autoIncrementField);
|
for (int ij = 0; ij < hashFields.size(); ij++) {
|
String field = hashFields.get(ij);
|
tempTestTimer = DateUtil.timer();
|
Object value = readRedisService.getHash(key, field);
|
JournalEntity logFse = new JournalEntity();
|
WriteExtractUtil.append("提取-redis数据获取耗时:" + tempTestTimer.intervalMs());
|
logFse.setOther_info("提取-redis数据获取耗时:" + tempTestTimer.intervalMs());
|
if (field == null || value == null || ("impossible_" + FIX_KEY).equals(field)) {
|
continue;
|
}
|
tempTestTimer = DateUtil.timer();
|
String collectLogUid = field;
|
// FieldSetEntity collectInfo = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, collectLogUid, false);
|
|
int totalCount = -1;
|
try {
|
// DataTableEntity extractLogs = new DataTableEntity();
|
DataTableEntity data = (DataTableEntity) value;
|
totalCount = data.getRows();
|
DataTableEntity maybeUpdate = new DataTableEntity();
|
TimeInterval tempTestTimer1 = DateUtil.timer();
|
List<String> newAutoIncrementValues = new ArrayList<>();
|
//制令单特殊处理
|
updateMoBase(data, extractTargetTable, targetDao);
|
//字段映射处理
|
data = getFieldMappingDispose(collectId,
|
collectSourceField,
|
extractTargetTable,
|
extractTimeField,
|
data,
|
fieldMappingConfig,
|
extractUniqueField,
|
extractUpdateTimeField,
|
maybeUpdate,
|
autoIncrementField,
|
targetDataBase,
|
newAutoIncrementValues
|
);
|
WriteExtractUtil.append("提取-字段映射耗时:" + tempTestTimer1.intervalMs());
|
tempTestTimer1 = DateUtil.timer();
|
|
BatchResultEntity batchResultEntity = null;
|
if (!DataTableEntity.isEmpty(maybeUpdate)) {
|
//有需要查询确认更新的数据
|
StringBuilder filter = new StringBuilder(32);
|
filter.append(extractUpdateTimeField).append(" < ? ");
|
filter.append(" AND ").append(collectSourceField).append(" =? AND ");
|
filter.append(extractUniqueField).append("= ? ");
|
targetNewDao = targetNewDao != null ? targetNewDao : targetDataBase.newDao();
|
TimeInterval tempTestTimer2 = DateUtil.timer();
|
UpdateFilterEntity updateFilterEntity = new UpdateFilterEntity(autoIncrementField + "=?",
|
new String[]{autoIncrementField});
|
WriteExtractUtil.append("开始查询数据是否需要更新:" + maybeUpdate.getRows() + ",:" + extractTargetTable);
|
Map<JSONObject, JSONObject> exist = isExist(maybeUpdate, collectId, extractTargetTable, autoIncrementField,
|
collectSourceField, extractUniqueField, extractUpdateTimeField, targetNewDao);
|
WriteExtractUtil.append("查询数据是否需要更新结束共耗时:" + tempTestTimer2.intervalMs() + ",:" + extractTargetTable);
|
for (int i = 0; i < maybeUpdate.getRows(); i++) {
|
FieldSetEntity item = maybeUpdate.getFieldSetEntity(i);
|
String sourceInfo = item.getString(collectSourceField);
|
String preMasterKey = item.getString(extractUniqueField);
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put(collectSourceField, sourceInfo);
|
jsonObject.put(extractUniqueField, preMasterKey);
|
jsonObject = exist.get(jsonObject);
|
if ((jsonObject == null || jsonObject.isEmpty())) {
|
// WriteExtractUtil.append("将数据放入新增:" + item.getString(extractUniqueField) + "," + extractTargetTable);
|
// //判定为新增的数据
|
data.addFieldSetEntity(item);
|
newAutoIncrementValues.add(item.getString(extractUniqueField));
|
maybeUpdate.removeFieldSetEntity(i);
|
i--;
|
continue;
|
}
|
item.setValue(autoIncrementField, jsonObject.get(autoIncrementField));
|
//数据库更新时间
|
Date oldDate = jsonObject.getDate(extractUpdateTimeField);
|
//数据的更新时间
|
Date newDate = item.getDate(extractUpdateTimeField);
|
int compare = DateUtil.compare(oldDate, newDate);
|
if (compare >= 0) {
|
// extractLogs.addFieldSetEntity(getFieldSetEntity(item.getString(extractUniqueField), item.getTableName(), 2, item.getString(collectSourceField)));
|
//不需要更新
|
maybeUpdate.removeFieldSetEntity(i);
|
i--;
|
continue;
|
// WriteExtractUtil.append("跳过不需要更新的数据项,主键:" + item.getString(autoIncrementField) + ",数据库时间:" + DateUtil.format(oldDate, "yyyy-MM-dd HH:mm:ss") + ",数据库时间:" + DateUtil.format(newDate, "yyyy-MM-dd HH:mm:ss"));
|
}
|
// extractLogs.addFieldSetEntity(getFieldSetEntity(item.getString(extractUniqueField), item.getTableName(), 1, item.getString(collectSourceField)));
|
}
|
if (maybeUpdate.getRows() > 0) {
|
maybeUpdate.setMeta(maybeUpdate.getFieldSetEntity(0).getMeta());
|
}
|
WriteExtractUtil.append("更新提取过滤后数据:" + extractTargetTable + ",需要更新的条数:" + maybeUpdate.getRows());
|
|
maybeUpdate.getData().sort((o1, o2) -> DateUtil.compare(o1.getDate(extractUpdateTimeField), o2.getDate(extractUpdateTimeField)));
|
|
batchResultEntity = targetNewDao.updateBatch(maybeUpdate, updateFilterEntity, false);
|
|
WriteExtractUtil.append("更新提取过滤后数据:" + extractTargetTable + ",需要更新的条数:" + maybeUpdate.getRows() + ",耗时:" + tempTestTimer2.intervalMs());
|
targetNewDao.closeConnection();
|
}
|
|
if (!DataTableEntity.isEmpty(data)) {
|
tempTestTimer1 = DateUtil.timer();
|
targetDao.addBatch(data, autoIncrementField);
|
// data.getData().stream().forEach(item -> {
|
// extractLogs.addFieldSetEntity(getFieldSetEntity(item.getString(extractUniqueField), item.getTableName(), 0, item.getString(collectSourceField)));
|
// });
|
WriteExtractUtil.append("新增数据耗时:" + tempTestTimer1.intervalMs());
|
this.addTableUniqueValue(extractTargetTable, newAutoIncrementValues.stream().toArray(String[]::new), collectId);
|
}
|
// if (!DataTableEntity.isEmpty(extractLogs)) {
|
// baseDao.add(extractLogs);
|
// }
|
// writeLog(uuid, true, "", totalCount, collectLogUid);
|
WriteExtractUtil.append("提取-新增更新数据耗时:" + tempTestTimer1.intervalMs());
|
WriteExtractUtil.append("提取-写入数据库耗时:" + tempTestTimer.intervalMs());
|
writeLog(logFse, uuid, true, "", totalCount, collectLogUid);
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
writeLog(uuid, false, e, totalCount, collectLogUid);
|
} finally {
|
//redis数据源
|
writeRedisService.setHash(key, "impossible_" + FIX_KEY, new DataTableEntity());
|
writeRedisService.delHash(key, field);
|
}
|
}
|
}
|
}
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
if (targetNewDao != null) {
|
targetNewDao.closeConnection();
|
}
|
readRedisService.close();
|
writeRedisService.close();
|
targetDao.closeConnection();
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw e;
|
} finally {
|
extractLock.unLock(lockKey);
|
}
|
}
|
|
|
/**
|
* 制令单特殊处理 将找目标库中的工单表的id字段写入project_id
|
*
|
* @param dt
|
* @param targetTable
|
* @param targetDao
|
* @throws BaseException
|
*/
|
private void updateMoBase(DataTableEntity dt, String targetTable, Dao targetDao) throws BaseException {
|
if (DataTableEntity.isEmpty(dt) || !"T_PM_MO_BASE".equalsIgnoreCase(targetTable)) {
|
return;
|
}
|
Map<String, FieldSetEntity> map = new HashMap<>();
|
List<Object> moNumberList = new ArrayList<>();
|
for (int i = 0; i < dt.getRows(); i++) {
|
String moNumber = dt.getString(i, "mo_number");
|
if (StringUtils.isEmpty(moNumber)) {
|
continue;
|
}
|
moNumberList.add(moNumber);
|
map.put(moNumber, dt.getFieldSetEntity(i));
|
}
|
//查询制令单关联的工单数据
|
DataTableEntity projectBaseDt = targetDao.getList("T_PM_PROJECT_BASE", new String[]{"PROJECT_ID,PROJECT_BASE_ID"},
|
BaseUtil.buildQuestionMarkFilter("project_id", moNumberList.size(), true), moNumberList.toArray(), null, 1, dt.getRows());
|
if (!DataTableEntity.isEmpty(projectBaseDt)) {
|
for (int i = 0; i < projectBaseDt.getRows(); i++) {
|
FieldSetEntity fs = projectBaseDt.getFieldSetEntity(i);
|
String projectId = fs.getString("project_id");
|
FieldSetEntity fieldSetEntity = map.get(projectId);
|
if (!FieldSetEntity.isEmpty(fieldSetEntity)) {
|
//将工单表id写入制令单的project_id字段
|
fieldSetEntity.setValue("project_id", fs.getString("project_base_id"));
|
map.remove(projectId);
|
}
|
}
|
}
|
if (!map.isEmpty()) {
|
throw new BaseException(ErrorCode.MO_NUMBER_MASTER_PROJECT_BASE_EMPTY);
|
}
|
}
|
|
/**
|
* @param primaryValue 主键值
|
* @param tableName 表名
|
* @param operationType 操作类型 0 insert 1 update 2 数据已存在不需要更新
|
* @return
|
*/
|
private FieldSetEntity getFieldSetEntity(String primaryValue, String tableName, int operationType, String sourceInfo) {
|
FieldSetEntity fs = new FieldSetEntity();
|
fs.setTableName("product_mes_extract_log");
|
fs.setValue("primary_value", primaryValue);
|
fs.setValue("table_name", tableName);
|
fs.setValue("operation_type", operationType);
|
fs.setValue("source_info", sourceInfo);
|
fs.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
|
return fs;
|
}
|
|
/**
|
* 将采集uuid按采集表id排序返回
|
*
|
* @param fields
|
* @return
|
*/
|
private List<String> redisHashFields(List<String> fields) {
|
if (fields != null && !fields.isEmpty()) {
|
DataTableEntity dt = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG,
|
"type = 1 and " + BaseUtil.buildQuestionMarkFilter("uuid", fields.toArray(), true),
|
new Object[]{}, new Object[]{CmnConst.UUID}, "id");
|
if (!DataTableEntity.isEmpty(dt)) {
|
return dt.getData().stream().map(item -> item.getString(CmnConst.UUID)).collect(Collectors.toList());
|
}
|
}
|
|
return Collections.emptyList();
|
}
|
|
private void writeLog(String configUid, boolean isSuccess, String errorMsg, int count, String collectLogUid) {
|
JournalEntity fse = new JournalEntity();
|
fse.setType(2);
|
fse.setError(errorMsg);
|
fse.setCount(count);
|
fse.setResult(isSuccess ? 1 : 0);
|
fse.setConfigUuid(configUid);
|
fse.setPre_step_uuid(collectLogUid);
|
fse.setCreated_utc_datetime(new Date());
|
if (!StringUtils.isEmpty(errorMsg) || count > 0 || !isSuccess) {
|
FieldSetEntity fs = journalManagerService.autoCreateJournal(fse);
|
mesExternalService.remoteSaveExtractLog(fs);
|
}
|
}
|
|
private void writeLog(JournalEntity fse, String configUid, boolean isSuccess, String errorMsg, int count, String collectLogUid) {
|
fse.setType(2);
|
fse.setError(errorMsg);
|
fse.setCount(count);
|
fse.setResult(isSuccess ? 1 : 0);
|
fse.setConfigUuid(configUid);
|
fse.setPre_step_uuid(collectLogUid);
|
fse.setCreated_utc_datetime(new Date());
|
FieldSetEntity fs = journalManagerService.autoCreateJournal(fse);
|
mesExternalService.remoteSaveExtractLog(fs);
|
}
|
|
private void writeLog(String configUid, boolean isSuccess, Throwable throwable, int count, String collectLogUid) {
|
JournalEntity fse = new JournalEntity();
|
String errorMsg = journalManagerService.getStackTrace(throwable);
|
fse.setType(2);
|
fse.setError(errorMsg);
|
fse.setCount(count);
|
fse.setResult(isSuccess ? 1 : 0);
|
fse.setConfigUuid(configUid);
|
fse.setPre_step_uuid(collectLogUid);
|
fse.setCreated_utc_datetime(new Date());
|
FieldSetEntity fs = journalManagerService.autoCreateJournal(fse);
|
mesExternalService.remoteSaveExtractLog(fs);
|
|
}
|
|
/**
|
* @param dt
|
* @param collectId 采集id
|
* @param targetTable 目标表表名
|
* @param idField id字段名称
|
* @param sourceInfoField 采集来源字段名称
|
* @param preMasterKeyField 子库主键字段名称
|
* @param extractUpdateTimeField 修改时间字段
|
* @param dao
|
* @return
|
*/
|
private Map<JSONObject, JSONObject> isExist(DataTableEntity dt, String collectId, String targetTable, String idField, String sourceInfoField, String preMasterKeyField, String extractUpdateTimeField, Dao dao) {
|
List<FieldSetEntity> data = dt.getData();
|
String dataSystemName = Global.getSystemConfig("data.system.name", "");
|
if (StringUtils.isEmpty(dataSystemName)) {
|
//数据来源系统名称不能为空
|
throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_CAN_NOT_EMPTY);
|
}
|
Map<String, List<String>> collect = data.stream().collect(Collectors.groupingBy(item -> item.getString(sourceInfoField), Collectors.mapping(item -> item.getString(preMasterKeyField), Collectors.toList())));
|
//子库采集数据的主键集合
|
// Set<String> collectData = data.stream().filter(item -> !dataSystemName.equals(item.getString(sourceInfoField))).map(item -> item.getString(preMasterKeyField)).collect(Collectors.toSet());
|
int i = 0;
|
List<String> params = new ArrayList<>();
|
StringBuilder sql = new StringBuilder();
|
for (Map.Entry<String, List<String>> v : collect.entrySet()) {
|
if (collectId.equals(v.getKey()) || dataSystemName.equals(v.getKey())) {
|
if (i > 0) {
|
sql.append(" UNION ALL ");
|
}
|
sql.append(" SELECT ").append(preMasterKeyField).append(",").append(idField).append(",").append(extractUpdateTimeField);
|
sql.append(" ,").append(sourceInfoField);
|
sql.append(" FROM ").append(targetTable);
|
sql.append(" WHERE ").append(sourceInfoField).append(" = ? and ").append(BaseUtil.buildQuestionMarkFilter(preMasterKeyField, v.getValue().size(), true));
|
params.add(v.getKey());
|
params.addAll(v.getValue());
|
}
|
i++;
|
}
|
DataTableEntity result = dao.getList(sql.toString(), params.toArray());
|
if (DataTableEntity.isEmpty(result)) {
|
return Collections.emptyMap();
|
}
|
Map<JSONObject, JSONObject> resultMap = new HashMap<>();
|
for (int j = 0; j < result.getRows(); j++) {
|
FieldSetEntity item = result.getFieldSetEntity(j);
|
JSONObject jsonObject = new JSONObject();
|
jsonObject.put(preMasterKeyField, item.getString(preMasterKeyField));
|
jsonObject.put(sourceInfoField, item.getString(sourceInfoField));
|
resultMap.put(jsonObject, new JSONObject((Map) item.getValues()));
|
}
|
return resultMap;
|
}
|
|
/**
|
* 字段映射
|
* *
|
*
|
* @param collectId 采集id
|
* @param collectSourceField 采集来源字段
|
* @param extractTargetTable 提取目标表
|
* @param data 要映射的元数据集
|
* @param fieldMappingConfig 映射关系
|
* @param extractUniqueField 目标表唯一标识字段
|
* @param updateTimeField 更新标识字段
|
* @param updateDt 可能需要更新的数据
|
* @return
|
*/
|
private DataTableEntity getFieldMappingDispose(String collectId,
|
String collectSourceField,
|
String extractTargetTable,
|
String extractTimeField,
|
DataTableEntity data,
|
DataTableEntity fieldMappingConfig,
|
String extractUniqueField,
|
String updateTimeField,
|
DataTableEntity updateDt,
|
String autoIncrementField,
|
DataBaseEntity targetDataBase,
|
List<String> newAutoIncrementValues) {
|
String dataSystemName = Global.getSystemConfig("data.system.name", "");
|
if (StringUtils.isEmpty(dataSystemName)) {
|
//数据来源系统名称不能为空
|
throw new BaseException(ErrorCode.DATA_SYSTEM_NAME_CAN_NOT_EMPTY);
|
}
|
if (!DataTableEntity.isEmpty(data)) {
|
//从数据库中查询有没有存在的主键值
|
DataTableEntity result = new DataTableEntity();
|
for (int i = 0; i < data.getRows(); i++) {
|
FieldSetEntity fse1 = data.getFieldSetEntity(i);
|
FieldSetEntity fseNew = new FieldSetEntity();
|
fseNew.setTableName(extractTargetTable);
|
//历史数据
|
String historyCollect = fse1.getString(collectSourceField);
|
for (int j = 0; j < fieldMappingConfig.getRows(); j++) {
|
String targetTableField = fieldMappingConfig.getString(j, "target_table_field");
|
String sourceRedisField = fieldMappingConfig.getString(j, "source_redis_field");
|
fseNew.setValue(targetTableField, fse1.getObject(sourceRedisField));
|
}
|
// 提取时间
|
fseNew.setValue(extractTimeField, new Date());
|
//数据系统名称与历史采集id不一致才复制当前采集的id 否则判断为历史系统数据将进行更新操作 2022年12月1日22:33:00 cheng
|
if (!dataSystemName.equals(historyCollect)) {
|
fseNew.setValue(collectSourceField, collectId);
|
} else {
|
fseNew.setValue(collectSourceField, historyCollect);
|
fseNew.setValue(extractUniqueField, fse1.getString(autoIncrementField));
|
}
|
if (StringUtils.isEmpty(fseNew.getString(extractUniqueField))) {
|
//唯一值为空
|
throw new BaseException(ErrorCode.EXTRACT_DATA_ROW_UNIQUE_VALUE_CAN_NOT_EMPTY);
|
}
|
if (StringUtils.isEmpty(updateTimeField)) {
|
throw new BaseException(new Exception("更新时间字段为空"));
|
}
|
//系统数据名称和采集来源字段的值相同 默认为更新
|
if (fseNew.getObject(updateTimeField) != null && (dataSystemName.equals(historyCollect) ||
|
// exist.contains(fseNew.getString(uniqueField))
|
isExists(extractTargetTable, fseNew.getString(extractUniqueField), collectId)
|
)) {
|
//更新字段不为空可能会有更新
|
fseNew.remove(autoIncrementField);
|
updateDt.addFieldSetEntity(fseNew);
|
} else {
|
//新增
|
result.addFieldSetEntity(fseNew);
|
newAutoIncrementValues.add(fseNew.getString(extractUniqueField));
|
}
|
}
|
return result;
|
|
}
|
return data;
|
}
|
}
|