package com.product.data.center.service;
|
|
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.date.TimeInterval;
|
import cn.hutool.core.util.IdUtil;
|
import com.alibaba.fastjson.JSONObject;
|
import com.google.common.collect.Lists;
|
import com.product.core.config.CoreConst;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.center.config.CmnConst;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.entity.BatchConfigEntity;
|
import com.product.data.center.entity.BatchExecuteEntity;
|
import com.product.data.center.entity.QueryDataConfigEntity;
|
import com.product.data.center.service.ide.IBatchService;
|
import com.product.data.center.utils.CallBackReturnValue;
|
import com.product.data.center.utils.SqlTransferUtil;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.datasource.service.RedisService;
|
import com.product.util.BaseUtil;
|
import com.product.util.CallBack;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.util.*;
|
|
/**
|
* Copyright © 6c
|
*
|
* @Date 2022年07月12日 16:00
|
* @Author 6c
|
* @Description 数据采集
|
*/
|
@Component
|
public class DataCollectService extends AbstractBaseService {
|
@Autowired
|
private BaseDao baseDao;
|
@Autowired
|
private IBatchService batchService;
|
@Autowired
|
private JournalManagerService journalManagerService;
|
|
@Resource
|
MesExternalService mesExternalService;
|
|
// 采集最大批次
|
public static final long COLLECT_MAX_BATCH_COUNT = 100L;
|
|
public static void main(String[] args) {
|
Calendar calendar = Calendar.getInstance();
|
int hour = calendar.get(Calendar.HOUR_OF_DAY);
|
int minute = calendar.get(Calendar.MINUTE);
|
if (hour == 1 && minute == 10) {
|
System.out.println("2222222");
|
}
|
}
|
|
/**
|
* 数据采集-定时任务执行方法
|
*
|
* @param uuid
|
*/
|
public void dataCollect(String uuid) {
|
|
synchronized (uuid.intern()) {
|
//凌晨1点10分开始进入此方法到1点20分不执行
|
Calendar startCalendar = Calendar.getInstance();
|
Date preExecuteTime = null;
|
Date curExecuteTime = startCalendar.getTime();
|
String curDateTimeStr = DateUtil.date().toString();
|
int minID = -1;
|
int maxID = -1;
|
FieldSetEntity dataCollectFse = null;
|
DataBaseEntity sourceDbe = null;
|
DataBaseEntity targetDbe = null;
|
try {
|
dataCollectFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_COLLECT, uuid, false);
|
if (FieldSetEntity.isEmpty(dataCollectFse)) {
|
throw new BaseException(ErrorCode.DATA_COLLECT_GET_CONFIG_FAIL);
|
}
|
FieldSetEntity targetDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, dataCollectFse.getString(CmnConst.TARGET_DATA_SOURCE), false);
|
targetDbe = new DataBaseEntity(targetDataSourceFse);
|
// 批次判定
|
String saveName = StringUtils.isEmpty(dataCollectFse.getString("target_table")) || StringUtils.isEmpty(dataCollectFse.getString("target_table").trim())
|
? dataCollectFse.getString(CmnConst.SOURCE_TABLE) : dataCollectFse.getString("target_table");
|
saveName = saveName.toLowerCase(Locale.ROOT);
|
String key = "DC:" + saveName + ":" + dataCollectFse.getString(CmnConst.ID);
|
RedisService readRedis = targetDbe.getRedisService();
|
long curBatchCount = readRedis.getHashLength(key) == null ? 0L : readRedis.getHashLength(key);
|
if (curBatchCount > COLLECT_MAX_BATCH_COUNT) {
|
SpringMVCContextHolder.getSystemLogger().error(dataCollectFse.getString(CmnConst.SOURCE_TABLE) + "当前待采集批次超过" + COLLECT_MAX_BATCH_COUNT + ",跳过本次采集");
|
return;
|
}
|
|
String dataSourceUUID = dataCollectFse.getString(CmnConst.DATA_SOURCE);
|
StringBuilder sql = new StringBuilder(128);
|
sql.append("\nselect *");
|
sql.append("\nfrom product_sys_data_center_log");
|
sql.append("\nwhere (source_uuid='' or source_uuid is null) and config_uuid=? and type=1 and result=1");
|
sql.append("\norder by statistics_final_time desc");
|
sql.append("\nlimit 1");
|
FieldSetEntity logFse = baseDao.getFieldSetBySQL(sql.toString(), new Object[]{dataCollectFse.getUUID()}, false);
|
if (!FieldSetEntity.isEmpty(logFse)) {
|
preExecuteTime = logFse.getDate(CmnConst.STATISTICS_FINAL_TIME);
|
}
|
String collectTableName = dataCollectFse.getString(CmnConst.SOURCE_TABLE);
|
StringBuilder timeFilter = new StringBuilder(64);
|
FieldSetEntity dataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, dataSourceUUID, false);
|
int dbType = dataSourceFse.getInteger(CmnConst.DB_TYPE);
|
List<String> paramSqlFilterList = Lists.newArrayList();
|
if (preExecuteTime != null && !StringUtils.isEmpty(dataCollectFse.getString(CmnConst.TIME_FIELD))) {
|
// 数据库中存在的最大统计时间-10min
|
timeFilter.append(dataCollectFse.getString(CmnConst.TIME_FIELD)).append(">=").append(SqlTransferUtil.addDate(dbType, SqlTransferUtil.str2Date(dbType, preExecuteTime), -10, SqlTransferUtil.MINUTE));
|
paramSqlFilterList.add(timeFilter.toString());
|
}
|
sourceDbe = new DataBaseEntity(dataSourceFse);
|
String standardFilter = dataCollectFse.getString(CmnConst.FILTER);
|
if (!StringUtils.isEmpty(standardFilter) && !StringUtils.isEmpty(standardFilter.trim())) {
|
paramSqlFilterList.add(standardFilter);
|
}
|
// 需要优先查询maxID
|
sql.setLength(0);
|
sql.append("select max(").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(") maxID,min(").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(") min_id from ").append(collectTableName);
|
if (!StringUtils.isEmpty(dataCollectFse.getString(CmnConst.TIME_FIELD))) {
|
// 时间向未来移动2min,即查询未来2min内产生的数据都会被查询到
|
paramSqlFilterList.add(new StringBuilder().append(dataCollectFse.getString(CmnConst.TIME_FIELD)).append("<=").append(SqlTransferUtil.addDate(dbType, SqlTransferUtil.str2Date(dbType, curDateTimeStr), 2, SqlTransferUtil.MINUTE)).toString());
|
}
|
String filter = toFilter(paramSqlFilterList);
|
if (!StringUtils.isEmpty(filter)) {
|
sql.append(" where ").append(filter);
|
}
|
FieldSetEntity paramFse = sourceDbe.getDao().getOne(sql.toString(), new Object[]{});
|
if (!StringUtils.isEmpty(paramFse.getString("maxid"))) {
|
maxID = paramFse.getInteger("maxid");
|
minID = paramFse.getInteger("min_id");
|
if (!StringUtils.isEmpty(filter)) {
|
filter = new StringBuilder(128).append("(").append(filter).append(")")
|
.append(" and ").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append("<=").append(maxID)
|
.append(" and ").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(">=").append(minID)
|
.toString();
|
} else {
|
filter = new StringBuilder(128)
|
.append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append("<=").append(maxID)
|
.append(" and ").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(">=").append(minID)
|
.toString();
|
}
|
}
|
|
QueryDataConfigEntity queryDataConfigEntity = new QueryDataConfigEntity(collectTableName, filter, dataCollectFse.getString(CmnConst.AUTO_FIELD));
|
queryDataConfigEntity.addQueryField(dataCollectFse.getString(CmnConst.COLLECT_FIELDS).split(","));
|
|
BatchConfigEntity batchConfigEntity = new BatchConfigEntity(
|
sourceDbe,
|
targetDbe,
|
queryDataConfigEntity,
|
dataDeal(),
|
errorDeal(null, dataCollectFse, startCalendar),
|
false);
|
|
batchService.executeBatch(batchConfigEntity, dataInsert(dataCollectFse, null, false));
|
} catch (Exception e) {
|
// 写日志
|
FieldSetEntity dataCenterLogFse = new FieldSetEntity();
|
dataCenterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG);
|
dataCenterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
|
dataCenterLogFse.setValue(CmnConst.TYPE, 1);
|
dataCenterLogFse.setValue(CmnConst.DETAIL, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DETAIL));
|
dataCenterLogFse.setValue(CmnConst.RESULT, 0);
|
dataCenterLogFse.setValue(CmnConst.DEAL_FLAG, 0);
|
dataCenterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e));
|
dataCenterLogFse.setValue(CmnConst.DATA_SOURCE, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DATA_SOURCE));
|
dataCenterLogFse.setValue(CmnConst.MIN_ID, minID);
|
dataCenterLogFse.setValue(CmnConst.MAX_ID, maxID);
|
Calendar finalCalendar = Calendar.getInstance();
|
dataCenterLogFse.setValue(CmnConst.SINGLE_DURATION, finalCalendar.getTimeInMillis() - startCalendar.getTimeInMillis());
|
dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, uuid);
|
dataCenterLogFse.setValue(CmnConst.STATISTICS_START_TIME, preExecuteTime);
|
dataCenterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, curExecuteTime);
|
baseDao.saveFieldSetEntity(dataCenterLogFse);
|
mesExternalService.remoteSaveCollectLog(dataCenterLogFse);
|
} finally {
|
if (sourceDbe != null && sourceDbe.getDao() != null) {
|
sourceDbe.getDao().closeConnection();
|
}
|
if (targetDbe != null && targetDbe.getRedisService() != null) {
|
targetDbe.getRedisService().close();
|
}
|
}
|
}
|
}
|
|
/**
|
* 集合转化为过滤条件
|
*
|
* @param paramSqlFilterCollection
|
* @return
|
*/
|
private String toFilter(Collection<String> paramSqlFilterCollection) {
|
StringBuilder filterSb = new StringBuilder(128);
|
paramSqlFilterCollection.forEach(singleFilter -> {
|
if (filterSb.length() > 0) {
|
filterSb.append(" and ");
|
}
|
if (!StringUtils.isEmpty(singleFilter) && !StringUtils.isEmpty(singleFilter.trim())) {
|
filterSb.append("(").append(singleFilter).append(")");
|
}
|
});
|
return filterSb.toString();
|
}
|
|
public FieldSetEntity reDeal(String uuid) {
|
return reDeal(uuid, null);
|
}
|
|
/**
|
* 重新处理(根据报错的日志信息重新拉取数据)
|
*
|
* @param uuid
|
*/
|
public FieldSetEntity reDeal(String uuid, FieldSetEntity dataExtractLogFse) {
|
Calendar startCalendar = Calendar.getInstance();
|
FieldSetEntity sourceLogFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, uuid, false);
|
if (FieldSetEntity.isEmpty(sourceLogFse)) {
|
throw new BaseException(ErrorCode.DATA_CENTER_LOG_GET_FAIL);
|
}
|
String minID = sourceLogFse.getString(CmnConst.MIN_ID);
|
String maxID = sourceLogFse.getString(CmnConst.MAX_ID);
|
Date statisticsStartTime = sourceLogFse.getDate(CmnConst.STATISTICS_START_TIME);
|
Date statisticsFinalTime = sourceLogFse.getDate(CmnConst.STATISTICS_FINAL_TIME);
|
FieldSetEntity dataCollectFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_COLLECT, sourceLogFse.getString(CmnConst.CONFIG_UUID), false);
|
DataBaseEntity sourceDbe = null;
|
DataBaseEntity targetDbe = null;
|
try {
|
String dataSourceUUID = dataCollectFse.getString(CmnConst.DATA_SOURCE);
|
StringBuilder autoFilter = new StringBuilder(64);
|
StringBuilder timeFilter = new StringBuilder(64);
|
if (!StringUtils.isEmpty(minID)) {
|
autoFilter.append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(">=").append(minID);
|
}
|
if (autoFilter.length() > 0) {
|
autoFilter.append(" and ");
|
}
|
if (!StringUtils.isEmpty(maxID)) {
|
autoFilter.append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append("<=").append(maxID);
|
}
|
FieldSetEntity dataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, dataSourceUUID, false);
|
int dbType = dataSourceFse.getInteger(CmnConst.DB_TYPE);
|
if (statisticsStartTime != null && !StringUtils.isEmpty(dataCollectFse.getString(CmnConst.TIME_FIELD))) {
|
timeFilter.append(dataCollectFse.getString(CmnConst.TIME_FIELD)).append(">=").append(SqlTransferUtil.str2Date(dbType, statisticsStartTime));
|
}
|
if (timeFilter.length() > 0) {
|
timeFilter.append(" and ");
|
}
|
if (statisticsFinalTime != null) {
|
timeFilter.append(dataCollectFse.getString(CmnConst.TIME_FIELD)).append("<=").append(SqlTransferUtil.str2Date(dbType, statisticsFinalTime));
|
}
|
|
sourceDbe = new DataBaseEntity(dataSourceFse);
|
String collectTableName = dataCollectFse.getString(CmnConst.SOURCE_TABLE);
|
QueryDataConfigEntity queryDataConfigEntity = new QueryDataConfigEntity(collectTableName, joinFilter(autoFilter, timeFilter, false), dataCollectFse.getString(CmnConst.AUTO_FIELD));
|
queryDataConfigEntity.addQueryField(dataCollectFse.getString(CmnConst.COLLECT_FIELDS).split(","));
|
FieldSetEntity targetDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, dataCollectFse.getString(CmnConst.TARGET_DATA_SOURCE), false);
|
targetDbe = new DataBaseEntity(targetDataSourceFse);
|
BatchConfigEntity batchConfigEntity = new BatchConfigEntity(
|
sourceDbe,
|
targetDbe,
|
queryDataConfigEntity,
|
dataDeal(),
|
errorDeal(sourceLogFse, dataCollectFse, startCalendar),
|
false);
|
|
if (dataExtractLogFse == null) {
|
batchService.executeBatch(batchConfigEntity, dataInsert(dataCollectFse, uuid, true));
|
|
// 回写日志
|
journalManagerService.writeBackReDealResult(sourceLogFse, true);
|
return sourceLogFse;
|
} else {
|
batchService.executeBatch(batchConfigEntity, dataInsert(dataCollectFse, dataExtractLogFse.getUUID(), true));
|
|
// 回写日志
|
journalManagerService.writeBackReDealResult(dataExtractLogFse, true);
|
return dataExtractLogFse;
|
}
|
} catch (Exception e) {
|
// 写日志
|
FieldSetEntity dataCenterLogFse = new FieldSetEntity();
|
dataCenterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG);
|
dataCenterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
|
dataCenterLogFse.setValue(CmnConst.TYPE, 1);
|
dataCenterLogFse.setValue(CmnConst.DETAIL, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DETAIL));
|
dataCenterLogFse.setValue(CmnConst.RESULT, 0);
|
dataCenterLogFse.setValue(CmnConst.DEAL_FLAG, 0);
|
dataCenterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e));
|
dataCenterLogFse.setValue(CmnConst.DATA_SOURCE, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DATA_SOURCE));
|
dataCenterLogFse.setValue(CmnConst.SOURCE_UUID, uuid);
|
dataCenterLogFse.setValue(CmnConst.MIN_ID, minID);
|
dataCenterLogFse.setValue(CmnConst.MAX_ID, maxID);
|
Calendar finalCalendar = Calendar.getInstance();
|
dataCenterLogFse.setValue(CmnConst.SINGLE_DURATION, finalCalendar.getTimeInMillis() - startCalendar.getTimeInMillis());
|
dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, sourceLogFse.getString(CmnConst.CONFIG_UUID));
|
dataCenterLogFse.setValue(CmnConst.STATISTICS_START_TIME, statisticsStartTime);
|
dataCenterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, statisticsFinalTime);
|
baseDao.saveFieldSetEntity(dataCenterLogFse);
|
|
|
mesExternalService.remoteSaveCollectLog(dataCenterLogFse);
|
if (dataExtractLogFse == null) {
|
// 回写日志
|
journalManagerService.writeBackReDealResult(sourceLogFse, false);
|
} else {
|
// 回写日志
|
journalManagerService.writeBackReDealResult(dataExtractLogFse, false);
|
}
|
} finally {
|
if (sourceDbe != null && sourceDbe.getDao() != null) {
|
sourceDbe.getDao().closeConnection();
|
}
|
if (targetDbe != null && targetDbe.getRedisService() != null) {
|
targetDbe.getRedisService().close();
|
}
|
}
|
return null;
|
}
|
|
/**
|
* 数据处理(数据采集部分没有特殊操作,原样返回)
|
*
|
* @return
|
*/
|
public CallBackReturnValue<DataTableEntity, DataTableEntity> dataDeal() {
|
return dte -> dte;
|
}
|
|
/**
|
* 数据插入到redis
|
*
|
* @param dataCollectFse
|
* @return
|
*/
|
public IBatchService.QueryDataAfterProcessing dataInsert(FieldSetEntity dataCollectFse, String sourceUUID, boolean reDealFlag) {
|
return (dt, minId, maxId, sourceDbe, targetDbe, c1) -> {
|
RedisService redisService = targetDbe.getRedisService();
|
StringBuilder errorInfo = new StringBuilder(512);
|
|
// 清理redis中已经存在且更新时间不大于当前redis中存储的值
|
TimeInterval timer = DateUtil.timer();
|
StringBuilder logUseTimeInfo = new StringBuilder(128);
|
String saveName = StringUtils.isEmpty(dataCollectFse.getString("target_table")) || StringUtils.isEmpty(dataCollectFse.getString("target_table").trim())
|
? dataCollectFse.getString(CmnConst.SOURCE_TABLE) : dataCollectFse.getString("target_table");
|
saveName = saveName.toLowerCase(Locale.ROOT);
|
List<String> curAddKeyList = Lists.newArrayList();
|
RedisService readRedis = new RedisService();
|
if (!reDealFlag) {
|
FieldSetEntity tempFse;
|
String keyPrefix = "DC_STORE:" + saveName + ":" + dataCollectFse.getString(CmnConst.ID) + ":";
|
String field;
|
Object value;
|
Date date;
|
boolean serializeFlag = true;
|
int outTime = (60 * 60) / 2;
|
for (int i = dt.getRows() - 1; i >= 0; i--) {
|
try {
|
tempFse = dt.getFieldSetEntity(i);
|
date = tempFse.getDate(dataCollectFse.getString(CmnConst.TIME_FIELD));
|
field = tempFse.getString(dataCollectFse.getString(CmnConst.AUTO_FIELD));
|
value = readRedis.get(keyPrefix + field, serializeFlag);
|
if (value != null && !StringUtils.isEmpty(value.toString())) {
|
if (((Date) value).compareTo(date) < 0) {
|
readRedis.set(keyPrefix + field, date, serializeFlag);
|
readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
|
curAddKeyList.add(keyPrefix + field);
|
} else {
|
dt.removeFieldSetEntity(i);
|
readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
|
}
|
} else {
|
readRedis.set(keyPrefix + field, date, serializeFlag);
|
readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag);
|
curAddKeyList.add(keyPrefix + field);
|
}
|
} catch (Exception e) {
|
if (errorInfo.length() < 2048) {
|
errorInfo.append(journalManagerService.getStackTrace(e));
|
} else {
|
errorInfo.append("\n...");
|
}
|
}
|
}
|
if (dt.getRows() == 0) {
|
return;
|
}
|
logUseTimeInfo.append("采集-redis数据清理耗时:" + timer.intervalMs() + ";");
|
}
|
|
JSONObject paramObj = journalManagerService.getRangeInfo(dt, dataCollectFse.getString(CmnConst.TIME_FIELD));
|
StringBuilder filter = new StringBuilder(32);
|
filter.append("min_id=? and max_id=?");
|
if (!StringUtils.isEmpty(paramObj.getString(CmnConst.STATISTICS_FINAL_TIME))) {
|
filter.append(" and statistics_start_time=").append(SqlTransferUtil.str2Date(1, paramObj.get(CmnConst.STATISTICS_FINAL_TIME)))
|
.append(" and statistics_final_time=").append(SqlTransferUtil.str2Date(1, paramObj.get(CmnConst.STATISTICS_FINAL_TIME)));
|
}
|
DataTableEntity dte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, filter.toString(), new Object[]{minId, maxId});
|
if (!DataTableEntity.isEmpty(dte) && dte.getRows() > 0) {
|
return;
|
}
|
|
// 写日志
|
FieldSetEntity dataCenterLogFse = new FieldSetEntity();
|
dataCenterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG);
|
dataCenterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
|
dataCenterLogFse.setValue(CmnConst.MIN_ID, minId);
|
dataCenterLogFse.setValue(CmnConst.MAX_ID, maxId);
|
dataCenterLogFse.setValue(CmnConst.ERROR, errorInfo.toString());
|
dataCenterLogFse.setValue(CmnConst.COUNT, dt.getRows());
|
dataCenterLogFse.setValue(CmnConst.TYPE, 1);
|
dataCenterLogFse.setValue(CmnConst.DETAIL, dataCollectFse.getString(CmnConst.DETAIL));
|
dataCenterLogFse.setValue(CmnConst.RESULT, 1);
|
dataCenterLogFse.setValue(CmnConst.DEAL_FLAG, 0);
|
dataCenterLogFse.setValue(CmnConst.SOURCE_UUID, sourceUUID);
|
dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, dataCollectFse.getUUID());
|
dataCenterLogFse.setValue(CmnConst.STATISTICS_START_TIME, paramObj.get(CmnConst.STATISTICS_START_TIME));
|
dataCenterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, paramObj.get(CmnConst.STATISTICS_FINAL_TIME));
|
String uuid = IdUtil.randomUUID();
|
dataCenterLogFse.setValue(CmnConst.UUID, uuid);
|
|
timer = DateUtil.timer();
|
redisService.setHash("DC:" + saveName + ":" + dataCollectFse.getString(CmnConst.ID), dataCenterLogFse.getUUID(), dt);
|
|
Calendar c2 = Calendar.getInstance();
|
|
dataCenterLogFse.setValue(CmnConst.SINGLE_DURATION, c2.getTimeInMillis() - c1.getTimeInMillis());
|
logUseTimeInfo.append("采集-redis数据存放耗时:" + timer.intervalMs());
|
dataCenterLogFse.setValue(CmnConst.OTHER_INFO, logUseTimeInfo.toString());
|
dataCenterLogFse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, "add");
|
try {
|
mesExternalService.remoteSaveCollectLog(dataCenterLogFse);
|
baseDao.add(dataCenterLogFse);
|
} catch (Exception e) {
|
redisService.del("DC:" + saveName + ":" + dataCollectFse.getString(CmnConst.ID), dataCenterLogFse.getUUID());
|
readRedis.del(true, curAddKeyList.toArray(new String[]{}));
|
throw e;
|
}
|
|
// 标记当前时间之前非提取失败导致的采集失败日志为成功
|
if (!reDealFlag) {
|
FieldSetEntity curLogFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, dataCenterLogFse.getUUID(), false);
|
DataTableEntity waitUpdateDte = baseDao.listTable(
|
CmnConst.PRODUCT_SYS_DATA_CENTER_LOG,
|
"id<=? and created_utc_datetime<=now() and config_uuid=? and result=0 and deal_flag=0 and error not like '采集成功,未能在%'",
|
new Object[]{curLogFse.getInteger(CmnConst.ID), curLogFse.getString(CmnConst.CONFIG_UUID)});
|
if (!DataTableEntity.isEmpty(waitUpdateDte)) {
|
StringBuilder sql = new StringBuilder(128);
|
sql.append("update product_sys_data_center_log set deal_flag=1,deal_result=1 where ").append(BaseUtil.buildQuestionMarkFilter(CmnConst.ID, waitUpdateDte.getRows(), true));
|
baseDao.executeUpdate(sql.toString(), waitUpdateDte.getFieldAllValues(CmnConst.ID));
|
}
|
}
|
};
|
}
|
|
/**
|
* 异常处理
|
*
|
* @param sourceLogFse
|
* @param dataCollectFse
|
* @param startCalendar
|
* @return
|
*/
|
public CallBack<BatchExecuteEntity> errorDeal(FieldSetEntity sourceLogFse, FieldSetEntity dataCollectFse, Calendar startCalendar) {
|
return (BatchExecuteEntity... batchExecuteEntities) -> {
|
// 写日志
|
BatchExecuteEntity batchExecuteEntity = batchExecuteEntities[0];
|
FieldSetEntity dataCenterLogFse = new FieldSetEntity();
|
dataCenterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG);
|
dataCenterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date());
|
dataCenterLogFse.setValue(CmnConst.TYPE, 1);
|
dataCenterLogFse.setValue(CmnConst.DETAIL, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DETAIL));
|
dataCenterLogFse.setValue(CmnConst.RESULT, 0);
|
dataCenterLogFse.setValue(CmnConst.DEAL_FLAG, 0);
|
dataCenterLogFse.setValue(CmnConst.ERROR, batchExecuteEntity.getErrorMessage());
|
dataCenterLogFse.setValue(CmnConst.DATA_SOURCE, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DATA_SOURCE));
|
dataCenterLogFse.setValue(CmnConst.MIN_ID, batchExecuteEntity.getMinId());
|
dataCenterLogFse.setValue(CmnConst.MAX_ID, batchExecuteEntity.getMaxId());
|
Calendar finalCalendar = Calendar.getInstance();
|
dataCenterLogFse.setValue(CmnConst.SINGLE_DURATION, finalCalendar.getTimeInMillis() - startCalendar.getTimeInMillis());
|
if (sourceLogFse != null) {
|
dataCenterLogFse.setValue(CmnConst.SOURCE_UUID, sourceLogFse.getUUID());
|
dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, sourceLogFse.getString(CmnConst.CONFIG_UUID));
|
} else {
|
dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, dataCollectFse == null ? null : dataCollectFse.getUUID());
|
}
|
|
DataTableEntity dte = batchExecuteEntity.getDte();
|
JSONObject paramObj = journalManagerService.getRangeInfo(dte, dataCollectFse.getString(CmnConst.TIME_FIELD));
|
dataCenterLogFse.setValue(CmnConst.STATISTICS_START_TIME, paramObj.get(CmnConst.STATISTICS_START_TIME));
|
dataCenterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, paramObj.get(CmnConst.STATISTICS_FINAL_TIME));
|
baseDao.saveFieldSetEntity(dataCenterLogFse);
|
};
|
}
|
|
/**
|
* 拼接过滤条件
|
*
|
* @param autoFilter
|
* @param timeFilter
|
* @return
|
*/
|
private String joinFilter(StringBuilder autoFilter, StringBuilder timeFilter, boolean isOr) {
|
StringBuilder filterSb = new StringBuilder(128);
|
if (autoFilter.length() > 0) {
|
filterSb.append("(").append(autoFilter).append(")");
|
}
|
if (timeFilter.length() > 0) {
|
if (filterSb.length() > 0) {
|
filterSb.append(" ").append(isOr ? "or" : "and").append(" ");
|
}
|
filterSb.append("(").append(timeFilter).append(")");
|
}
|
return filterSb.toString();
|
}
|
}
|