package com.product.data.center.service; import cn.hutool.core.date.DateUtil; import cn.hutool.core.date.TimeInterval; import cn.hutool.core.util.IdUtil; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Lists; import com.product.core.config.CoreConst; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.center.config.CmnConst; import com.product.data.center.config.ErrorCode; import com.product.data.center.entity.BatchConfigEntity; import com.product.data.center.entity.BatchExecuteEntity; import com.product.data.center.entity.QueryDataConfigEntity; import com.product.data.center.service.ide.IBatchService; import com.product.data.center.utils.CallBackReturnValue; import com.product.data.center.utils.SqlTransferUtil; import com.product.datasource.entity.DataBaseEntity; import com.product.datasource.service.RedisService; import com.product.util.BaseUtil; import com.product.util.CallBack; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.*; /** * Copyright © 6c * * @Date 2022年07月12日 16:00 * @Author 6c * @Description 数据采集 */ @Component public class DataCollectService extends AbstractBaseService { @Autowired private BaseDao baseDao; @Autowired private IBatchService batchService; @Autowired private JournalManagerService journalManagerService; @Resource MesExternalService mesExternalService; // 采集最大批次 public static final long COLLECT_MAX_BATCH_COUNT = 100L; public static void main(String[] args) { Calendar calendar = Calendar.getInstance(); int hour = calendar.get(Calendar.HOUR_OF_DAY); int minute = calendar.get(Calendar.MINUTE); if (hour == 1 && minute == 10) { System.out.println("2222222"); } } /** * 数据采集-定时任务执行方法 * * @param uuid */ public void dataCollect(String uuid) { synchronized (uuid.intern()) { //凌晨1点10分开始进入此方法的休眠到1点20分 Calendar calendar = Calendar.getInstance(); int hour = calendar.get(Calendar.HOUR_OF_DAY); int minute = calendar.get(Calendar.MINUTE); if (hour == 1 && minute == 10) { try { Thread.sleep(600000); } catch (InterruptedException e) { e.printStackTrace(); } } Calendar startCalendar = Calendar.getInstance(); Date preExecuteTime = null; Date curExecuteTime = startCalendar.getTime(); String curDateTimeStr = DateUtil.date().toString(); int minID = -1; int maxID = -1; FieldSetEntity dataCollectFse = null; DataBaseEntity sourceDbe = null; DataBaseEntity targetDbe = null; try { dataCollectFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_COLLECT, uuid, false); if (FieldSetEntity.isEmpty(dataCollectFse)) { throw new BaseException(ErrorCode.DATA_COLLECT_GET_CONFIG_FAIL); } FieldSetEntity targetDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, dataCollectFse.getString(CmnConst.TARGET_DATA_SOURCE), false); targetDbe = new DataBaseEntity(targetDataSourceFse); // 批次判定 String saveName = StringUtils.isEmpty(dataCollectFse.getString("target_table")) || StringUtils.isEmpty(dataCollectFse.getString("target_table").trim()) ? dataCollectFse.getString(CmnConst.SOURCE_TABLE) : dataCollectFse.getString("target_table"); saveName = saveName.toLowerCase(Locale.ROOT); String key = "DC:" + saveName + ":" + dataCollectFse.getString(CmnConst.ID); RedisService readRedis = targetDbe.getRedisService(); long curBatchCount = readRedis.getHashLength(key) == null ? 0L : readRedis.getHashLength(key); if (curBatchCount > COLLECT_MAX_BATCH_COUNT) { SpringMVCContextHolder.getSystemLogger().error(dataCollectFse.getString(CmnConst.SOURCE_TABLE) + "当前待采集批次超过" + COLLECT_MAX_BATCH_COUNT + ",跳过本次采集"); return; } String dataSourceUUID = dataCollectFse.getString(CmnConst.DATA_SOURCE); StringBuilder sql = new StringBuilder(128); sql.append("\nselect *"); sql.append("\nfrom product_sys_data_center_log"); sql.append("\nwhere (source_uuid='' or source_uuid is null) and config_uuid=? and type=1 and result=1"); sql.append("\norder by statistics_final_time desc"); sql.append("\nlimit 1"); FieldSetEntity logFse = baseDao.getFieldSetBySQL(sql.toString(), new Object[]{dataCollectFse.getUUID()}, false); if (!FieldSetEntity.isEmpty(logFse)) { preExecuteTime = logFse.getDate(CmnConst.STATISTICS_FINAL_TIME); } String collectTableName = dataCollectFse.getString(CmnConst.SOURCE_TABLE); StringBuilder timeFilter = new StringBuilder(64); FieldSetEntity dataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, dataSourceUUID, false); int dbType = dataSourceFse.getInteger(CmnConst.DB_TYPE); List paramSqlFilterList = Lists.newArrayList(); if (preExecuteTime != null && !StringUtils.isEmpty(dataCollectFse.getString(CmnConst.TIME_FIELD))) { // 数据库中存在的最大统计时间-10min timeFilter.append(dataCollectFse.getString(CmnConst.TIME_FIELD)).append(">=").append(SqlTransferUtil.addDate(dbType, SqlTransferUtil.str2Date(dbType, preExecuteTime), -10, SqlTransferUtil.MINUTE)); paramSqlFilterList.add(timeFilter.toString()); } sourceDbe = new DataBaseEntity(dataSourceFse); String standardFilter = dataCollectFse.getString(CmnConst.FILTER); if (!StringUtils.isEmpty(standardFilter) && !StringUtils.isEmpty(standardFilter.trim())) { paramSqlFilterList.add(standardFilter); } // 需要优先查询maxID sql.setLength(0); sql.append("select max(").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(") maxID,min(").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(") min_id from ").append(collectTableName); if (!StringUtils.isEmpty(dataCollectFse.getString(CmnConst.TIME_FIELD))) { // 时间向未来移动2min,即查询未来2min内产生的数据都会被查询到 paramSqlFilterList.add(new StringBuilder().append(dataCollectFse.getString(CmnConst.TIME_FIELD)).append("<=").append(SqlTransferUtil.addDate(dbType, SqlTransferUtil.str2Date(dbType, curDateTimeStr), 2, SqlTransferUtil.MINUTE)).toString()); } String filter = toFilter(paramSqlFilterList); if (!StringUtils.isEmpty(filter)) { sql.append(" where ").append(filter); } FieldSetEntity paramFse = sourceDbe.getDao().getOne(sql.toString(), new Object[]{}); if (!StringUtils.isEmpty(paramFse.getString("maxid"))) { maxID = paramFse.getInteger("maxid"); minID = paramFse.getInteger("min_id"); if (!StringUtils.isEmpty(filter)) { filter = new StringBuilder(128).append("(").append(filter).append(")") .append(" and ").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append("<=").append(maxID) .append(" and ").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(">=").append(minID) .toString(); } else { filter = new StringBuilder(128) .append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append("<=").append(maxID) .append(" and ").append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(">=").append(minID) .toString(); } } QueryDataConfigEntity queryDataConfigEntity = new QueryDataConfigEntity(collectTableName, filter, dataCollectFse.getString(CmnConst.AUTO_FIELD)); queryDataConfigEntity.addQueryField(dataCollectFse.getString(CmnConst.COLLECT_FIELDS).split(",")); BatchConfigEntity batchConfigEntity = new BatchConfigEntity( sourceDbe, targetDbe, queryDataConfigEntity, dataDeal(), errorDeal(null, dataCollectFse, startCalendar), false); batchService.executeBatch(batchConfigEntity, dataInsert(dataCollectFse, null, false)); } catch (Exception e) { // 写日志 FieldSetEntity dataCenterLogFse = new FieldSetEntity(); dataCenterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG); dataCenterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date()); dataCenterLogFse.setValue(CmnConst.TYPE, 1); dataCenterLogFse.setValue(CmnConst.DETAIL, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DETAIL)); dataCenterLogFse.setValue(CmnConst.RESULT, 0); dataCenterLogFse.setValue(CmnConst.DEAL_FLAG, 0); dataCenterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e)); dataCenterLogFse.setValue(CmnConst.DATA_SOURCE, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DATA_SOURCE)); dataCenterLogFse.setValue(CmnConst.MIN_ID, minID); dataCenterLogFse.setValue(CmnConst.MAX_ID, maxID); Calendar finalCalendar = Calendar.getInstance(); dataCenterLogFse.setValue(CmnConst.SINGLE_DURATION, finalCalendar.getTimeInMillis() - startCalendar.getTimeInMillis()); dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, uuid); dataCenterLogFse.setValue(CmnConst.STATISTICS_START_TIME, preExecuteTime); dataCenterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, curExecuteTime); baseDao.saveFieldSetEntity(dataCenterLogFse); mesExternalService.remoteSaveCollectLog(dataCenterLogFse); } finally { if (sourceDbe != null && sourceDbe.getDao() != null) { sourceDbe.getDao().closeConnection(); } if (targetDbe != null && targetDbe.getRedisService() != null) { targetDbe.getRedisService().close(); } } } } /** * 集合转化为过滤条件 * * @param paramSqlFilterCollection * @return */ private String toFilter(Collection paramSqlFilterCollection) { StringBuilder filterSb = new StringBuilder(128); paramSqlFilterCollection.forEach(singleFilter -> { if (filterSb.length() > 0) { filterSb.append(" and "); } if (!StringUtils.isEmpty(singleFilter) && !StringUtils.isEmpty(singleFilter.trim())) { filterSb.append("(").append(singleFilter).append(")"); } }); return filterSb.toString(); } public FieldSetEntity reDeal(String uuid) { return reDeal(uuid, null); } /** * 重新处理(根据报错的日志信息重新拉取数据) * * @param uuid */ public FieldSetEntity reDeal(String uuid, FieldSetEntity dataExtractLogFse) { Calendar startCalendar = Calendar.getInstance(); FieldSetEntity sourceLogFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, uuid, false); if (FieldSetEntity.isEmpty(sourceLogFse)) { throw new BaseException(ErrorCode.DATA_CENTER_LOG_GET_FAIL); } String minID = sourceLogFse.getString(CmnConst.MIN_ID); String maxID = sourceLogFse.getString(CmnConst.MAX_ID); Date statisticsStartTime = sourceLogFse.getDate(CmnConst.STATISTICS_START_TIME); Date statisticsFinalTime = sourceLogFse.getDate(CmnConst.STATISTICS_FINAL_TIME); FieldSetEntity dataCollectFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_COLLECT, sourceLogFse.getString(CmnConst.CONFIG_UUID), false); DataBaseEntity sourceDbe = null; DataBaseEntity targetDbe = null; try { String dataSourceUUID = dataCollectFse.getString(CmnConst.DATA_SOURCE); StringBuilder autoFilter = new StringBuilder(64); StringBuilder timeFilter = new StringBuilder(64); if (!StringUtils.isEmpty(minID)) { autoFilter.append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append(">=").append(minID); } if (autoFilter.length() > 0) { autoFilter.append(" and "); } if (!StringUtils.isEmpty(maxID)) { autoFilter.append(dataCollectFse.getString(CmnConst.AUTO_FIELD)).append("<=").append(maxID); } FieldSetEntity dataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, dataSourceUUID, false); int dbType = dataSourceFse.getInteger(CmnConst.DB_TYPE); if (statisticsStartTime != null && !StringUtils.isEmpty(dataCollectFse.getString(CmnConst.TIME_FIELD))) { timeFilter.append(dataCollectFse.getString(CmnConst.TIME_FIELD)).append(">=").append(SqlTransferUtil.str2Date(dbType, statisticsStartTime)); } if (timeFilter.length() > 0) { timeFilter.append(" and "); } if (statisticsFinalTime != null) { timeFilter.append(dataCollectFse.getString(CmnConst.TIME_FIELD)).append("<=").append(SqlTransferUtil.str2Date(dbType, statisticsFinalTime)); } sourceDbe = new DataBaseEntity(dataSourceFse); String collectTableName = dataCollectFse.getString(CmnConst.SOURCE_TABLE); QueryDataConfigEntity queryDataConfigEntity = new QueryDataConfigEntity(collectTableName, joinFilter(autoFilter, timeFilter, false), dataCollectFse.getString(CmnConst.AUTO_FIELD)); queryDataConfigEntity.addQueryField(dataCollectFse.getString(CmnConst.COLLECT_FIELDS).split(",")); FieldSetEntity targetDataSourceFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, dataCollectFse.getString(CmnConst.TARGET_DATA_SOURCE), false); targetDbe = new DataBaseEntity(targetDataSourceFse); BatchConfigEntity batchConfigEntity = new BatchConfigEntity( sourceDbe, targetDbe, queryDataConfigEntity, dataDeal(), errorDeal(sourceLogFse, dataCollectFse, startCalendar), false); if (dataExtractLogFse == null) { batchService.executeBatch(batchConfigEntity, dataInsert(dataCollectFse, uuid, true)); // 回写日志 journalManagerService.writeBackReDealResult(sourceLogFse, true); return sourceLogFse; } else { batchService.executeBatch(batchConfigEntity, dataInsert(dataCollectFse, dataExtractLogFse.getUUID(), true)); // 回写日志 journalManagerService.writeBackReDealResult(dataExtractLogFse, true); return dataExtractLogFse; } } catch (Exception e) { // 写日志 FieldSetEntity dataCenterLogFse = new FieldSetEntity(); dataCenterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG); dataCenterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date()); dataCenterLogFse.setValue(CmnConst.TYPE, 1); dataCenterLogFse.setValue(CmnConst.DETAIL, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DETAIL)); dataCenterLogFse.setValue(CmnConst.RESULT, 0); dataCenterLogFse.setValue(CmnConst.DEAL_FLAG, 0); dataCenterLogFse.setValue(CmnConst.ERROR, journalManagerService.getStackTrace(e)); dataCenterLogFse.setValue(CmnConst.DATA_SOURCE, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DATA_SOURCE)); dataCenterLogFse.setValue(CmnConst.SOURCE_UUID, uuid); dataCenterLogFse.setValue(CmnConst.MIN_ID, minID); dataCenterLogFse.setValue(CmnConst.MAX_ID, maxID); Calendar finalCalendar = Calendar.getInstance(); dataCenterLogFse.setValue(CmnConst.SINGLE_DURATION, finalCalendar.getTimeInMillis() - startCalendar.getTimeInMillis()); dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, sourceLogFse.getString(CmnConst.CONFIG_UUID)); dataCenterLogFse.setValue(CmnConst.STATISTICS_START_TIME, statisticsStartTime); dataCenterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, statisticsFinalTime); baseDao.saveFieldSetEntity(dataCenterLogFse); mesExternalService.remoteSaveCollectLog(dataCenterLogFse); if (dataExtractLogFse == null) { // 回写日志 journalManagerService.writeBackReDealResult(sourceLogFse, false); } else { // 回写日志 journalManagerService.writeBackReDealResult(dataExtractLogFse, false); } } finally { if (sourceDbe != null && sourceDbe.getDao() != null) { sourceDbe.getDao().closeConnection(); } if (targetDbe != null && targetDbe.getRedisService() != null) { targetDbe.getRedisService().close(); } } return null; } /** * 数据处理(数据采集部分没有特殊操作,原样返回) * * @return */ public CallBackReturnValue dataDeal() { return dte -> dte; } /** * 数据插入到redis * * @param dataCollectFse * @return */ public IBatchService.QueryDataAfterProcessing dataInsert(FieldSetEntity dataCollectFse, String sourceUUID, boolean reDealFlag) { return (dt, minId, maxId, sourceDbe, targetDbe, c1) -> { RedisService redisService = targetDbe.getRedisService(); StringBuilder errorInfo = new StringBuilder(512); // 清理redis中已经存在且更新时间不大于当前redis中存储的值 TimeInterval timer = DateUtil.timer(); StringBuilder logUseTimeInfo = new StringBuilder(128); String saveName = StringUtils.isEmpty(dataCollectFse.getString("target_table")) || StringUtils.isEmpty(dataCollectFse.getString("target_table").trim()) ? dataCollectFse.getString(CmnConst.SOURCE_TABLE) : dataCollectFse.getString("target_table"); saveName = saveName.toLowerCase(Locale.ROOT); List curAddKeyList = Lists.newArrayList(); RedisService readRedis = new RedisService(); if (!reDealFlag) { FieldSetEntity tempFse; String keyPrefix = "DC_STORE:" + saveName + ":" + dataCollectFse.getString(CmnConst.ID) + ":"; String field; Object value; Date date; boolean serializeFlag = true; int outTime = (60 * 60) / 2; for (int i = dt.getRows() - 1; i >= 0; i--) { try { tempFse = dt.getFieldSetEntity(i); date = tempFse.getDate(dataCollectFse.getString(CmnConst.TIME_FIELD)); field = tempFse.getString(dataCollectFse.getString(CmnConst.AUTO_FIELD)); value = readRedis.get(keyPrefix + field, serializeFlag); if (value != null && !StringUtils.isEmpty(value.toString())) { if (((Date) value).compareTo(date) < 0) { readRedis.set(keyPrefix + field, date, serializeFlag); readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag); curAddKeyList.add(keyPrefix + field); } else { dt.removeFieldSetEntity(i); readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag); } } else { readRedis.set(keyPrefix + field, date, serializeFlag); readRedis.setOutTime(keyPrefix + field, outTime, serializeFlag); curAddKeyList.add(keyPrefix + field); } } catch (Exception e) { if (errorInfo.length() < 2048) { errorInfo.append(journalManagerService.getStackTrace(e)); } else { errorInfo.append("\n..."); } } } if (dt.getRows() == 0) { return; } logUseTimeInfo.append("采集-redis数据清理耗时:" + timer.intervalMs() + ";"); } JSONObject paramObj = journalManagerService.getRangeInfo(dt, dataCollectFse.getString(CmnConst.TIME_FIELD)); StringBuilder filter = new StringBuilder(32); filter.append("min_id=? and max_id=?"); if (!StringUtils.isEmpty(paramObj.getString(CmnConst.STATISTICS_FINAL_TIME))) { filter.append(" and statistics_start_time=").append(SqlTransferUtil.str2Date(1, paramObj.get(CmnConst.STATISTICS_FINAL_TIME))) .append(" and statistics_final_time=").append(SqlTransferUtil.str2Date(1, paramObj.get(CmnConst.STATISTICS_FINAL_TIME))); } DataTableEntity dte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, filter.toString(), new Object[]{minId, maxId}); if (!DataTableEntity.isEmpty(dte) && dte.getRows() > 0) { return; } // 写日志 FieldSetEntity dataCenterLogFse = new FieldSetEntity(); dataCenterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG); dataCenterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date()); dataCenterLogFse.setValue(CmnConst.MIN_ID, minId); dataCenterLogFse.setValue(CmnConst.MAX_ID, maxId); dataCenterLogFse.setValue(CmnConst.ERROR, errorInfo.toString()); dataCenterLogFse.setValue(CmnConst.COUNT, dt.getRows()); dataCenterLogFse.setValue(CmnConst.TYPE, 1); dataCenterLogFse.setValue(CmnConst.DETAIL, dataCollectFse.getString(CmnConst.DETAIL)); dataCenterLogFse.setValue(CmnConst.RESULT, 1); dataCenterLogFse.setValue(CmnConst.DEAL_FLAG, 0); dataCenterLogFse.setValue(CmnConst.SOURCE_UUID, sourceUUID); dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, dataCollectFse.getUUID()); dataCenterLogFse.setValue(CmnConst.STATISTICS_START_TIME, paramObj.get(CmnConst.STATISTICS_START_TIME)); dataCenterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, paramObj.get(CmnConst.STATISTICS_FINAL_TIME)); String uuid = IdUtil.randomUUID(); dataCenterLogFse.setValue(CmnConst.UUID, uuid); timer = DateUtil.timer(); redisService.setHash("DC:" + saveName + ":" + dataCollectFse.getString(CmnConst.ID), dataCenterLogFse.getUUID(), dt); Calendar c2 = Calendar.getInstance(); dataCenterLogFse.setValue(CmnConst.SINGLE_DURATION, c2.getTimeInMillis() - c1.getTimeInMillis()); logUseTimeInfo.append("采集-redis数据存放耗时:" + timer.intervalMs()); dataCenterLogFse.setValue(CmnConst.OTHER_INFO, logUseTimeInfo.toString()); dataCenterLogFse.setValue(CoreConst.SYSTEM_DATA_OPERATE_TYPE, "add"); try { mesExternalService.remoteSaveCollectLog(dataCenterLogFse); baseDao.add(dataCenterLogFse); } catch (Exception e) { redisService.del("DC:" + saveName + ":" + dataCollectFse.getString(CmnConst.ID), dataCenterLogFse.getUUID()); readRedis.del(true, curAddKeyList.toArray(new String[]{})); throw e; } // 标记当前时间之前非提取失败导致的采集失败日志为成功 if (!reDealFlag) { FieldSetEntity curLogFse = baseDao.getFieldSetEntity(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, dataCenterLogFse.getUUID(), false); DataTableEntity waitUpdateDte = baseDao.listTable( CmnConst.PRODUCT_SYS_DATA_CENTER_LOG, "id<=? and created_utc_datetime<=now() and config_uuid=? and result=0 and deal_flag=0 and error not like '采集成功,未能在%'", new Object[]{curLogFse.getInteger(CmnConst.ID), curLogFse.getString(CmnConst.CONFIG_UUID)}); if (!DataTableEntity.isEmpty(waitUpdateDte)) { StringBuilder sql = new StringBuilder(128); sql.append("update product_sys_data_center_log set deal_flag=1,deal_result=1 where ").append(BaseUtil.buildQuestionMarkFilter(CmnConst.ID, waitUpdateDte.getRows(), true)); baseDao.executeUpdate(sql.toString(), waitUpdateDte.getFieldAllValues(CmnConst.ID)); } } }; } /** * 异常处理 * * @param sourceLogFse * @param dataCollectFse * @param startCalendar * @return */ public CallBack errorDeal(FieldSetEntity sourceLogFse, FieldSetEntity dataCollectFse, Calendar startCalendar) { return (BatchExecuteEntity... batchExecuteEntities) -> { // 写日志 BatchExecuteEntity batchExecuteEntity = batchExecuteEntities[0]; FieldSetEntity dataCenterLogFse = new FieldSetEntity(); dataCenterLogFse.setTableName(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG); dataCenterLogFse.setValue(CmnConst.CREATED_UTC_DATETIME, new Date()); dataCenterLogFse.setValue(CmnConst.TYPE, 1); dataCenterLogFse.setValue(CmnConst.DETAIL, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DETAIL)); dataCenterLogFse.setValue(CmnConst.RESULT, 0); dataCenterLogFse.setValue(CmnConst.DEAL_FLAG, 0); dataCenterLogFse.setValue(CmnConst.ERROR, batchExecuteEntity.getErrorMessage()); dataCenterLogFse.setValue(CmnConst.DATA_SOURCE, dataCollectFse == null ? null : dataCollectFse.getString(CmnConst.DATA_SOURCE)); dataCenterLogFse.setValue(CmnConst.MIN_ID, batchExecuteEntity.getMinId()); dataCenterLogFse.setValue(CmnConst.MAX_ID, batchExecuteEntity.getMaxId()); Calendar finalCalendar = Calendar.getInstance(); dataCenterLogFse.setValue(CmnConst.SINGLE_DURATION, finalCalendar.getTimeInMillis() - startCalendar.getTimeInMillis()); if (sourceLogFse != null) { dataCenterLogFse.setValue(CmnConst.SOURCE_UUID, sourceLogFse.getUUID()); dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, sourceLogFse.getString(CmnConst.CONFIG_UUID)); } else { dataCenterLogFse.setValue(CmnConst.CONFIG_UUID, dataCollectFse == null ? null : dataCollectFse.getUUID()); } DataTableEntity dte = batchExecuteEntity.getDte(); JSONObject paramObj = journalManagerService.getRangeInfo(dte, dataCollectFse.getString(CmnConst.TIME_FIELD)); dataCenterLogFse.setValue(CmnConst.STATISTICS_START_TIME, paramObj.get(CmnConst.STATISTICS_START_TIME)); dataCenterLogFse.setValue(CmnConst.STATISTICS_FINAL_TIME, paramObj.get(CmnConst.STATISTICS_FINAL_TIME)); baseDao.saveFieldSetEntity(dataCenterLogFse); }; } /** * 拼接过滤条件 * * @param autoFilter * @param timeFilter * @return */ private String joinFilter(StringBuilder autoFilter, StringBuilder timeFilter, boolean isOr) { StringBuilder filterSb = new StringBuilder(128); if (autoFilter.length() > 0) { filterSb.append("(").append(autoFilter).append(")"); } if (timeFilter.length() > 0) { if (filterSb.length() > 0) { filterSb.append(" ").append(isOr ? "or" : "and").append(" "); } filterSb.append("(").append(timeFilter).append(")"); } return filterSb.toString(); } }