package com.product.data.center.service; import com.alibaba.fastjson.JSONObject; import com.beust.jcommander.internal.Lists; import com.google.common.collect.Maps; import com.product.common.lang.StringUtils; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.center.config.CmnConst; import com.product.data.center.config.ErrorCode; import com.product.datasource.dao.Dao; import com.product.datasource.entity.DataBaseEntity; import com.product.util.BaseUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.*; /** * 实现功能: * * @author 作者[夜丶光] * @version 1.0.00 2025-08-04 10:48 */ @Service public class SpDealService extends AbstractBaseService { @Autowired private BaseDao baseDao; @Autowired private DataArchivingService dataArchivingService; @Autowired private JournalManagerService journalManagerService; /*========================请求历史-start========================*/ // 正在处理标识 private static boolean processingFlag = false; // 停止标识 private static boolean stopFlag = false; // 数据字典名称-请求历史 private static final String DICT_NAME_REQUEST_HISTORY = "请求历史特殊处理"; // 页大小 private static int pageSize = 0; // 子库操作的表集合,需要从数据库中读取 private static List operateTableList = Lists.newArrayList(); // 整机子库ip private static String subDbIp = ""; // 子库端口 private static int subDbPort = 0; // 报表库ip private static String reportDbIp = ""; // 报表库端口 private static int reportDbPort = 0; /** * 通常定时任务触发-停止执行 */ public void stopExecute() { SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-停止]"); stopFlag = true; } /** * 通常定时任务触发-请求历史特殊处理-测试方法 */ public void testRequestHistorySpDeal() { requestHistorySpDeal(true); } /** * 通常定时任务触发-请求历史特殊处理 */ public void requestHistorySpDeal() { requestHistorySpDeal(false); } /** * 请求历史特殊处理 */ private void requestHistorySpDeal(boolean testFlag) { SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-开始]"); if (processingFlag) { SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-正在进行-跳过]"); return; } synchronized ("requestHistorySpDeal") { DataBaseEntity subDbe = null; DataBaseEntity reportDbe = null; DataTableEntity waitInsertDte = null; try { if (processingFlag) { return; } processingFlag = true; // 获取特殊处理信息 JSONObject spDealInfoObj = getSpDealInfo(); if (spDealInfoObj.isEmpty()) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_SP_DEAL_INFO); } String tableFormat = spDealInfoObj.getString("tableFormat"); // 获取子库和报表库连接信息 DataTableEntity dbLinkDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "ip=? OR (ip=? AND port=?)", new Object[]{subDbIp, reportDbIp, reportDbPort}); FieldSetEntity subDbFse = null; FieldSetEntity reportDbFse = null; for (int i = 0; i < dbLinkDte.getRows(); i++) { FieldSetEntity singleFse = dbLinkDte.getFieldSetEntity(i); if (subDbIp.equals(singleFse.getString("ip")) && !StringUtils.isEmpty(singleFse.getString("port")) && subDbPort == singleFse.getInteger("port")) { subDbFse = singleFse; } else if (reportDbIp.equals(singleFse.getString("ip")) && !StringUtils.isEmpty(singleFse.getString("port")) && reportDbPort == singleFse.getInteger("port")) { reportDbFse = singleFse; } } if (subDbFse == null || reportDbFse == null) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_DB_LINK_INFO); } // 获取子库对应表的信息:表名,主键字段,时间字段 JSONObject subDbTableInfoObj = getSubDbTableInfo(subDbFse.getUUID()); if (subDbTableInfoObj.isEmpty()) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_INFO); } // 获取连接 subDbe = new DataBaseEntity(subDbFse); reportDbe = new DataBaseEntity(reportDbFse); // 按主键排序,分页提取子库数据,对比报表库数据,不存在则插入 if (!stopFlag) { int curMaxMasterKeyValue = 0; outer: for (String tableNameStr : operateTableList) { SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-数据处理-开始]-" + tableNameStr); int index = 0; JSONObject singleSubDbTableInfoObj = subDbTableInfoObj.getJSONObject(tableNameStr); // 采集配置表id String sourceInfo = singleSubDbTableInfoObj.getString(CmnConst.ID); String tableName = String.format(tableFormat, tableNameStr); String reportTableNamePrefix = String.format("da_%s", tableNameStr); String orderBy = singleSubDbTableInfoObj.getString("auto_field"); DataArchivingService.DataArchivingServiceImpl createTableService = getDDLService(tableName, subDbe, reportDbe); String masterKeyFieldName = singleSubDbTableInfoObj.getString("auto_field"); long initId = spDealInfoObj.getJSONObject(tableNameStr) == null ? 0 : spDealInfoObj.getJSONObject(tableNameStr).getIntValue("dealt_max_id"); long curId = initId; int count; int totalCount = 0; // 统计页数,每多少页记录一次最大id int statisticsPage = 10; if (!testFlag) { do { index++; if (stopFlag) { break outer; } DataTableEntity singlePageDataDte; try { singlePageDataDte = subDbe.getDao().getList(tableName, null, masterKeyFieldName + ">?", new Object[]{curId}, orderBy, 1, pageSize); } catch (Exception e) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_WITH_SUB_DB_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_WITH_SUB_DB_DATA.getText() + ":" + e.getMessage()); } count = singlePageDataDte.getRows(); if (count > 0) { curMaxMasterKeyValue = singlePageDataDte.getFieldSetEntity(singlePageDataDte.getRows() - 1).getInteger(singleSubDbTableInfoObj.getString("auto_field")); curId = curMaxMasterKeyValue; Map> monthMap = Maps.newHashMap(); dealSubDbData(singlePageDataDte, singleSubDbTableInfoObj, monthMap); // 检查是否所有表都存在,若是不存在,那么创建 checkTableIfNoThenCreate(monthMap.keySet(), reportTableNamePrefix, createTableService); String querySql = joinReportDbQuerySql(monthMap, reportTableNamePrefix, sourceInfo); DataTableEntity reportDbExistsDte = reportDbe.getDao().getList(querySql); for (int i = 0; i < reportDbExistsDte.getRows(); i++) { FieldSetEntity reportDbExistsFse = reportDbExistsDte.getFieldSetEntity(i); for (Map singleMonthMap : monthMap.values()) { singleMonthMap.remove(reportDbExistsFse.getString("pre_master_key")); } } for (Map.Entry> entry : monthMap.entrySet()) { String insertTableName = String.format("%s_%s", reportTableNamePrefix, entry.getKey()); waitInsertDte = getSingleMonthInsertDte(entry.getValue(), insertTableName, sourceInfo); reportDbe.getDao().addBatch(waitInsertDte); totalCount += waitInsertDte.getRows(); } if (index % statisticsPage == 0) { // 更新当前操作的表统计日志信息,每指定页数更新一次 updateOperateTableInfo(tableNameStr, initId, curMaxMasterKeyValue, totalCount); } } } while (count > 0); if (index % statisticsPage != 0) { // 更新当前操作的表统计日志信息,若是不是倍数,那么需要额外执行一次 updateOperateTableInfo(tableNameStr, initId, curMaxMasterKeyValue, totalCount); } } } } } catch (Exception e) { recordErrorLog(waitInsertDte, e); throw e; } finally { stop(subDbe, reportDbe); SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-结束]"); } } } /** * 记录错误日志 * @param waitInsertDte 等待插入的数据dte * @param e 错误 */ private void recordErrorLog(DataTableEntity waitInsertDte, Exception e) { String errorInfo = journalManagerService.getStackTrace(e); String groupUUID = UUID.randomUUID().toString(); Date curTime = new Date(); if (!DataTableEntity.isEmpty(waitInsertDte)) { // 记录当前报错的信息 DataTableEntity logDte = new DataTableEntity(); for (int i = 0; i < waitInsertDte.getRows(); i++) { FieldSetEntity singlePageDataFse = waitInsertDte.getFieldSetEntity(i); FieldSetEntity logFse = new FieldSetEntity("product_sys_sp_deal_log"); logFse.setValue("content", BaseUtil.fieldSetEntityToJson(singlePageDataFse).toJSONString()); logFse.setValue("error", errorInfo); logFse.setValue("group_uuid", groupUUID); logFse.setValue("created_utc_datetime", curTime); logDte.addFieldSetEntity(logFse); } baseDao.add(logDte); } else { FieldSetEntity logFse = new FieldSetEntity("product_sys_sp_deal_log"); logFse.setValue("content", "非数据处理阶段出错"); logFse.setValue("error", errorInfo); logFse.setValue("group_uuid", groupUUID); logFse.setValue("created_utc_datetime", curTime); baseDao.add(logFse); } } /** * 获取单月插入dte * @param singleMonthMap 单月数据map * @param insertTableName 报表库表名 * @param sourceInfo 采集配置表id * @return 单批次中单月(因为按月分表)待插入的dte */ private DataTableEntity getSingleMonthInsertDte(Map singleMonthMap, String insertTableName, String sourceInfo) { DataTableEntity waitInsertDte = new DataTableEntity(); for (Map.Entry entry : singleMonthMap.entrySet()) { FieldSetEntity waitInsertFse = entry.getValue(); waitInsertFse.setTableName(insertTableName); waitInsertFse.setValue("pre_master_key", entry.getKey()); waitInsertFse.setValue("source_info", sourceInfo); waitInsertDte.addFieldSetEntity(waitInsertFse); } return waitInsertDte; } /** * 获取创建表的service * @param tableName 表名,子库历史表表名 * @param subDbe 子库数据库连接实例 * @param reportDbe 报表数据库连接实例 * @return 能够执行创建表DDL语句的service */ private DataArchivingService.DataArchivingServiceImpl getDDLService(String tableName, DataBaseEntity subDbe, DataBaseEntity reportDbe) { Dao sourceDao = subDbe.getDao(); Dao targetDao = reportDbe.getDao(); String sourceDbName = subDbe.getDbName(); String targetDbName = reportDbe.getDbName(); return dataArchivingService.new DataArchivingServiceImpl(sourceDao, targetDao, tableName, "requestHistory", sourceDbName, targetDbName); } /** * 检查是否所有表都存在,若是不存在,那么创建 * @param tableTailSet 表尾缀集合,时间 * @param reportTableNamePrefix 报表库表名前缀 * @param createTableService 创建表的service */ private void checkTableIfNoThenCreate(Set tableTailSet, String reportTableNamePrefix, DataArchivingService.DataArchivingServiceImpl createTableService) { for (String tableTail : tableTailSet) { createTableService.createTable(reportTableNamePrefix, tableTail, null); } } /** * 更新当前操作的表的统计日志信息 * @param tableName 表名,子库正式表表名 * @param initId 本次执行前,采集库数据字典中参数里对应表的最大执行id * @param curMaxMasterKeyValue 从子库中获取到的本批次数据最大的id * @param operateCount 插入报表库数据的总条数 */ private void updateOperateTableInfo(String tableName, long initId, int curMaxMasterKeyValue, int operateCount) { String maxIdDictLabel = String.format("%s_dealt_max_id", tableName); String latestTimeDictLabel = String.format("%s_latest_time", tableName); String latestOperateCountDictLabel = String.format("%s_latest_operate_count", tableName); DataTableEntity dictDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DICT, "dict_name=? AND dict_label IN (?,?,?)", new Object[]{DICT_NAME_REQUEST_HISTORY, maxIdDictLabel, latestTimeDictLabel, latestOperateCountDictLabel}); FieldSetEntity maxIdDictFse = null; FieldSetEntity latestTimeDictFse = null; FieldSetEntity latestOperateCountDictFse = null; if (!DataTableEntity.isEmpty(dictDte)) { for (int i = 0; i < dictDte.getRows(); i++) { FieldSetEntity dictFse = dictDte.getFieldSetEntity(i); if (maxIdDictLabel.equals(dictFse.getString("dict_label"))) { maxIdDictFse = dictFse; } else if (latestTimeDictLabel.equals(dictFse.getString("dict_label"))) { latestTimeDictFse = dictFse; } else if (latestOperateCountDictLabel.equals(dictFse.getString("dict_label"))) { latestOperateCountDictFse = dictFse; } } } FieldSetEntity newDictFse = new FieldSetEntity(CmnConst.PRODUCT_SYS_DICT); newDictFse.setValue("dict_name", DICT_NAME_REQUEST_HISTORY); newDictFse.setValue("is_used", 1); newDictFse.setValue("sequence", 1); newDictFse.setValue("client_type", "Web"); newDictFse.setValue("created_utc_datetime", new Date()); newDictFse.setValue("created_by", 1); // 最大id if (initId < curMaxMasterKeyValue) { saveDictFse(maxIdDictFse, newDictFse, maxIdDictLabel, curMaxMasterKeyValue); } // 最近执行时间 saveDictFse(latestTimeDictFse, newDictFse, latestTimeDictLabel, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); // 最近一次执行总数 saveDictFse(latestOperateCountDictFse, newDictFse, latestOperateCountDictLabel, operateCount); } /** * 保存请求历史在数据字典里面的参数fse * @param saveDictFse 需要保存dict的fse * @param modelDictFse 模板fse * @param dictLabel 标签 * @param dictValue 值 */ private void saveDictFse(FieldSetEntity saveDictFse, FieldSetEntity modelDictFse, String dictLabel, Object dictValue) { if (saveDictFse == null) { saveDictFse = modelDictFse.clones(); saveDictFse.setValue("dict_label", dictLabel); } else { saveDictFse.setValue("updated_utc_datetime", new Date()); saveDictFse.setValue("updated_by", 1); } saveDictFse.setValue("dict_value", dictValue); baseDao.saveFieldSetEntity(saveDictFse); } /** * 拼接报表库查询语句 * @param monthMap 月份map * @param reportTableNamePrefix 报表库表名前缀 * @param sourceInfo 采集表id * @return 报表库查询语句 */ private String joinReportDbQuerySql(Map> monthMap, String reportTableNamePrefix, String sourceInfo) { StringBuilder sql = new StringBuilder(128); String model = "SELECT * FROM %s_%s WHERE source_info='%s' AND %s"; for (Map.Entry> entry : monthMap.entrySet()) { String month = entry.getKey(); Set masterKeySet = entry.getValue().keySet(); if (sql.length() > 0) { sql.append("\nUNION ALL\n"); } sql.append(String.format(model, reportTableNamePrefix, month, sourceInfo, BaseUtil.buildQuestionMarkFilter("pre_master_key", masterKeySet.toArray(), true))); } return sql.toString(); } /** * 获取子库对应表的信息:表名,主键字段,时间字段等 * @param subDbUuid 子库连接配置表uuid * @return JSONObject 子库对应表的信息 */ private JSONObject getSubDbTableInfo(String subDbUuid) { String filter = String.format("data_source=? AND %s", BaseUtil.buildQuestionMarkFilter("source_table", operateTableList.size(), true)); List paramList = Lists.newArrayList(); paramList.add(subDbUuid); paramList.addAll(operateTableList); DataTableEntity tableDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, filter, paramList.toArray()); JSONObject resultObj = new JSONObject(); for (int i = 0; i < tableDte.getRows(); i++) { FieldSetEntity singleFse = tableDte.getFieldSetEntity(i); String tableName = singleFse.getString("source_table").toLowerCase(Locale.ROOT); resultObj.computeIfAbsent(tableName, s -> BaseUtil.fieldSetEntityToJson(singleFse)); } return resultObj; } /** * 获取特殊处理信息 t_wip_detail_dealt_max_id * @return JSONObject 特殊处理信息 */ private JSONObject getSpDealInfo() { DataTableEntity dictDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DICT, "dict_name=?", new Object[]{DICT_NAME_REQUEST_HISTORY}); JSONObject resultObj = new JSONObject(); for (int i = 0; i < dictDte.getRows(); i++) { FieldSetEntity singleFse = dictDte.getFieldSetEntity(i); String dictLabel = singleFse.getString("dict_label"); String dictValue = singleFse.getString("dict_value"); if ("tableFormat".equals(dictLabel)) { if (StringUtils.isEmpty(dictValue)) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库历史表格式"); } resultObj.put("tableFormat", singleFse.getString("dict_value")); } else if ("dealTables".equals(dictLabel)) { if (StringUtils.isEmpty(dictValue)) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "需要处理的表名"); } operateTableList = Arrays.asList(dictValue.split(",")); } else if ("pageSize".equals(dictLabel)) { if (StringUtils.isEmpty(dictValue)) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "单页处理页大小"); } pageSize = Integer.parseInt(dictValue); if (pageSize <= 0) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "正确的单页处理页大小"); } } else if ("subDbIp".equals(dictLabel)) { if (StringUtils.isEmpty(dictValue)) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库地址"); } subDbIp = dictValue; } else if ("subDbPort".equals(dictLabel)) { if (StringUtils.isEmpty(dictValue)) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库端口"); } subDbPort = Integer.parseInt(dictValue); } else if ("reportDbIp".equals(dictLabel)) { if (StringUtils.isEmpty(dictValue)) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "报表库地址"); } reportDbIp = dictValue; } else if ("reportDbPort".equals(dictLabel)) { if (StringUtils.isEmpty(dictValue)) { throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "报表库端口"); } reportDbPort = Integer.parseInt(dictValue); } else { String dictLabelLowerCase = dictLabel.toLowerCase(Locale.ROOT); for (String tableName : operateTableList) { String tablenNameLowerCase = tableName.toLowerCase(Locale.ROOT); if (dictLabelLowerCase.startsWith(tablenNameLowerCase)) { JSONObject singleObj = (JSONObject) resultObj.computeIfAbsent(tablenNameLowerCase, s -> new JSONObject()); singleObj.put(dictLabelLowerCase.replace(String.format("%s_", tablenNameLowerCase), ""), singleFse.getValue("dict_value")); } } } } return resultObj; } /** * 处理子库中查询到的数据,按照月份和主键分别置于不同的容器中 * @param singlePageDataDte 子库数据 * @param singleSubDbTableInfoObj 表信息 * @param monthMap 容器:Map> */ private void dealSubDbData(DataTableEntity singlePageDataDte, JSONObject singleSubDbTableInfoObj, Map> monthMap) { String masterKeyFieldName = singleSubDbTableInfoObj.getString("auto_field"); String timeFieldName = singleSubDbTableInfoObj.getString("time_field"); for (int i = 0; i < singlePageDataDte.getRows(); i++) { FieldSetEntity singlePageDataFse = singlePageDataDte.getFieldSetEntity(i); String timeStr = singlePageDataFse.getDate(timeFieldName, "yyyyMM"); String masterValue = singlePageDataFse.getString(masterKeyFieldName); monthMap.computeIfAbsent(timeStr, s -> Maps.newHashMap()).put(masterValue, singlePageDataFse); } } /** * 停止 */ private void stop(DataBaseEntity... dbeArr) { if (dbeArr != null) { for (DataBaseEntity dbe : dbeArr) { if (dbe != null && dbe.getDao() != null) { dbe.getDao().closeConnection(); } } } processingFlag = false; stopFlag = false; } /*========================请求历史-final========================*/ }