package com.product.data.center.service;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.beust.jcommander.internal.Lists;
|
import com.google.common.collect.Maps;
|
import com.product.common.lang.StringUtils;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.center.config.CmnConst;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.datasource.dao.Dao;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.util.BaseUtil;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
|
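/**
 * Request-history special processing service: reads its settings from the data dictionary,
 * pages rows out of the sub database by auto-increment key, and inserts the rows that are
 * missing from the month-partitioned tables of the report database.
 *
 * @author 作者[夜丶光]
 * @version 1.0.00 2025-08-04 10:48
 */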
|
@Service
|
public class SpDealService extends AbstractBaseService {
|
@Autowired
|
private BaseDao baseDao;
|
@Autowired
|
private DataArchivingService dataArchivingService;
|
@Autowired
|
private JournalManagerService journalManagerService;
|
|
/*========================请求历史-start========================*/
|
// 正在处理标识
|
private static boolean processingFlag = false;
|
// 停止标识
|
private static boolean stopFlag = false;
|
// 数据字典名称-请求历史
|
private static final String DICT_NAME_REQUEST_HISTORY = "请求历史特殊处理";
|
// 页大小
|
private static int pageSize = 0;
|
// 子库操作的表集合,需要从数据库中读取
|
private static List<String> operateTableList = Lists.newArrayList();
|
// 整机子库ip
|
private static String subDbIp = "";
|
// 子库端口
|
private static int subDbPort = 0;
|
// 报表库ip
|
private static String reportDbIp = "";
|
// 报表库端口
|
private static int reportDbPort = 0;
|
|
/**
|
* 通常定时任务触发-停止执行
|
*/
|
public void stopExecute() {
|
SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-停止]");
|
stopFlag = true;
|
}
|
|
/**
|
* 通常定时任务触发-请求历史特殊处理-测试方法
|
*/
|
public void testRequestHistorySpDeal() {
|
requestHistorySpDeal(true);
|
}
|
|
/**
|
* 通常定时任务触发-请求历史特殊处理
|
*/
|
public void requestHistorySpDeal() {
|
requestHistorySpDeal(false);
|
}
|
|
/**
|
* 请求历史特殊处理
|
*/
|
private void requestHistorySpDeal(boolean testFlag) {
|
SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-开始]");
|
if (processingFlag) {
|
SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-正在进行-跳过]");
|
return;
|
}
|
synchronized ("requestHistorySpDeal") {
|
DataBaseEntity subDbe = null;
|
DataBaseEntity reportDbe = null;
|
DataTableEntity waitInsertDte = null;
|
try {
|
if (processingFlag) {
|
return;
|
}
|
processingFlag = true;
|
|
// 获取特殊处理信息
|
JSONObject spDealInfoObj = getSpDealInfo();
|
if (spDealInfoObj.isEmpty()) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_SP_DEAL_INFO);
|
}
|
String tableFormat = spDealInfoObj.getString("tableFormat");
|
|
// 获取子库和报表库连接信息
|
DataTableEntity dbLinkDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "ip=? OR (ip=? AND port=?)", new Object[]{subDbIp, reportDbIp, reportDbPort});
|
FieldSetEntity subDbFse = null;
|
FieldSetEntity reportDbFse = null;
|
for (int i = 0; i < dbLinkDte.getRows(); i++) {
|
FieldSetEntity singleFse = dbLinkDte.getFieldSetEntity(i);
|
if (subDbIp.equals(singleFse.getString("ip")) && !StringUtils.isEmpty(singleFse.getString("port")) && subDbPort == singleFse.getInteger("port")) {
|
subDbFse = singleFse;
|
} else if (reportDbIp.equals(singleFse.getString("ip")) && !StringUtils.isEmpty(singleFse.getString("port")) && reportDbPort == singleFse.getInteger("port")) {
|
reportDbFse = singleFse;
|
}
|
}
|
if (subDbFse == null || reportDbFse == null) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_DB_LINK_INFO);
|
}
|
|
// 获取子库对应表的信息:表名,主键字段,时间字段
|
JSONObject subDbTableInfoObj = getSubDbTableInfo(subDbFse.getUUID());
|
if (subDbTableInfoObj.isEmpty()) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_INFO);
|
}
|
|
// 获取连接
|
subDbe = new DataBaseEntity(subDbFse);
|
reportDbe = new DataBaseEntity(reportDbFse);
|
|
// 按主键排序,分页提取子库数据,对比报表库数据,不存在则插入
|
if (!stopFlag) {
|
int curMaxMasterKeyValue = 0;
|
outer: for (String tableNameStr : operateTableList) {
|
SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-数据处理-开始]-" + tableNameStr);
|
int index = 0;
|
JSONObject singleSubDbTableInfoObj = subDbTableInfoObj.getJSONObject(tableNameStr);
|
// 采集配置表id
|
String sourceInfo = singleSubDbTableInfoObj.getString(CmnConst.ID);
|
String tableName = String.format(tableFormat, tableNameStr);
|
String reportTableNamePrefix = String.format("da_%s", tableNameStr);
|
String orderBy = singleSubDbTableInfoObj.getString("auto_field");
|
DataArchivingService.DataArchivingServiceImpl createTableService = getDDLService(tableName, subDbe, reportDbe);
|
String masterKeyFieldName = singleSubDbTableInfoObj.getString("auto_field");
|
long initId = spDealInfoObj.getJSONObject(tableNameStr) == null ? 0 : spDealInfoObj.getJSONObject(tableNameStr).getIntValue("dealt_max_id");
|
long curId = initId;
|
int count;
|
int totalCount = 0;
|
// 统计页数,每多少页记录一次最大id
|
int statisticsPage = 10;
|
if (!testFlag) {
|
do {
|
index++;
|
if (stopFlag) {
|
break outer;
|
}
|
DataTableEntity singlePageDataDte;
|
try {
|
singlePageDataDte = subDbe.getDao().getList(tableName, null, masterKeyFieldName + ">?", new Object[]{curId}, orderBy, 1, pageSize);
|
} catch (Exception e) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_WITH_SUB_DB_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_WITH_SUB_DB_DATA.getText() + ":" + e.getMessage());
|
}
|
count = singlePageDataDte.getRows();
|
if (count > 0) {
|
curMaxMasterKeyValue = singlePageDataDte.getFieldSetEntity(singlePageDataDte.getRows() - 1).getInteger(singleSubDbTableInfoObj.getString("auto_field"));
|
curId = curMaxMasterKeyValue;
|
Map<String, Map<String, FieldSetEntity>> monthMap = Maps.newHashMap();
|
dealSubDbData(singlePageDataDte, singleSubDbTableInfoObj, monthMap);
|
|
// 检查是否所有表都存在,若是不存在,那么创建
|
checkTableIfNoThenCreate(monthMap.keySet(), reportTableNamePrefix, createTableService);
|
|
String querySql = joinReportDbQuerySql(monthMap, reportTableNamePrefix, sourceInfo);
|
DataTableEntity reportDbExistsDte = reportDbe.getDao().getList(querySql);
|
for (int i = 0; i < reportDbExistsDte.getRows(); i++) {
|
FieldSetEntity reportDbExistsFse = reportDbExistsDte.getFieldSetEntity(i);
|
for (Map<String, FieldSetEntity> singleMonthMap : monthMap.values()) {
|
singleMonthMap.remove(reportDbExistsFse.getString("pre_master_key"));
|
}
|
}
|
for (Map.Entry<String, Map<String, FieldSetEntity>> entry : monthMap.entrySet()) {
|
String insertTableName = String.format("%s_%s", reportTableNamePrefix, entry.getKey());
|
waitInsertDte = getSingleMonthInsertDte(entry.getValue(), insertTableName, sourceInfo);
|
reportDbe.getDao().addBatch(waitInsertDte);
|
totalCount += waitInsertDte.getRows();
|
}
|
if (index % statisticsPage == 0) {
|
// 更新当前操作的表统计日志信息,每指定页数更新一次
|
updateOperateTableInfo(tableNameStr, initId, curMaxMasterKeyValue, totalCount);
|
}
|
}
|
} while (count > 0);
|
if (index % statisticsPage != 0) {
|
// 更新当前操作的表统计日志信息,若是不是倍数,那么需要额外执行一次
|
updateOperateTableInfo(tableNameStr, initId, curMaxMasterKeyValue, totalCount);
|
}
|
}
|
}
|
}
|
} catch (Exception e) {
|
recordErrorLog(waitInsertDte, e);
|
throw e;
|
} finally {
|
stop(subDbe, reportDbe);
|
SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-结束]");
|
}
|
}
|
}
|
|
/**
|
* 记录错误日志
|
* @param waitInsertDte 等待插入的数据dte
|
* @param e 错误
|
*/
|
private void recordErrorLog(DataTableEntity waitInsertDte, Exception e) {
|
String errorInfo = journalManagerService.getStackTrace(e);
|
String groupUUID = UUID.randomUUID().toString();
|
Date curTime = new Date();
|
if (!DataTableEntity.isEmpty(waitInsertDte)) {
|
// 记录当前报错的信息
|
DataTableEntity logDte = new DataTableEntity();
|
for (int i = 0; i < waitInsertDte.getRows(); i++) {
|
FieldSetEntity singlePageDataFse = waitInsertDte.getFieldSetEntity(i);
|
FieldSetEntity logFse = new FieldSetEntity("product_sys_sp_deal_log");
|
logFse.setValue("content", BaseUtil.fieldSetEntityToJson(singlePageDataFse).toJSONString());
|
logFse.setValue("error", errorInfo);
|
logFse.setValue("group_uuid", groupUUID);
|
logFse.setValue("created_utc_datetime", curTime);
|
logDte.addFieldSetEntity(logFse);
|
}
|
baseDao.add(logDte);
|
} else {
|
FieldSetEntity logFse = new FieldSetEntity("product_sys_sp_deal_log");
|
logFse.setValue("content", "非数据处理阶段出错");
|
logFse.setValue("error", errorInfo);
|
logFse.setValue("group_uuid", groupUUID);
|
logFse.setValue("created_utc_datetime", curTime);
|
baseDao.add(logFse);
|
}
|
}
|
|
/**
|
* 获取单月插入dte
|
* @param singleMonthMap 单月数据map
|
* @param insertTableName 报表库表名
|
* @param sourceInfo 采集配置表id
|
* @return 单批次中单月(因为按月分表)待插入的dte
|
*/
|
private DataTableEntity getSingleMonthInsertDte(Map<String, FieldSetEntity> singleMonthMap, String insertTableName, String sourceInfo) {
|
DataTableEntity waitInsertDte = new DataTableEntity();
|
for (Map.Entry<String, FieldSetEntity> entry : singleMonthMap.entrySet()) {
|
FieldSetEntity waitInsertFse = entry.getValue();
|
waitInsertFse.setTableName(insertTableName);
|
waitInsertFse.setValue("pre_master_key", entry.getKey());
|
waitInsertFse.setValue("source_info", sourceInfo);
|
waitInsertDte.addFieldSetEntity(waitInsertFse);
|
}
|
return waitInsertDte;
|
}
|
|
/**
|
* 获取创建表的service
|
* @param tableName 表名,子库历史表表名
|
* @param subDbe 子库数据库连接实例
|
* @param reportDbe 报表数据库连接实例
|
* @return 能够执行创建表DDL语句的service
|
*/
|
private DataArchivingService.DataArchivingServiceImpl getDDLService(String tableName, DataBaseEntity subDbe, DataBaseEntity reportDbe) {
|
Dao sourceDao = subDbe.getDao();
|
Dao targetDao = reportDbe.getDao();
|
String sourceDbName = subDbe.getDbName();
|
String targetDbName = reportDbe.getDbName();
|
return dataArchivingService.new DataArchivingServiceImpl(sourceDao, targetDao, tableName, "requestHistory", sourceDbName, targetDbName);
|
}
|
|
/**
|
* 检查是否所有表都存在,若是不存在,那么创建
|
* @param tableTailSet 表尾缀集合,时间
|
* @param reportTableNamePrefix 报表库表名前缀
|
* @param createTableService 创建表的service
|
*/
|
private void checkTableIfNoThenCreate(Set<String> tableTailSet, String reportTableNamePrefix, DataArchivingService.DataArchivingServiceImpl createTableService) {
|
for (String tableTail : tableTailSet) {
|
createTableService.createTable(reportTableNamePrefix, tableTail, null);
|
}
|
}
|
|
/**
|
* 更新当前操作的表的统计日志信息
|
* @param tableName 表名,子库正式表表名
|
* @param initId 本次执行前,采集库数据字典中参数里对应表的最大执行id
|
* @param curMaxMasterKeyValue 从子库中获取到的本批次数据最大的id
|
* @param operateCount 插入报表库数据的总条数
|
*/
|
private void updateOperateTableInfo(String tableName, long initId, int curMaxMasterKeyValue, int operateCount) {
|
String maxIdDictLabel = String.format("%s_dealt_max_id", tableName);
|
String latestTimeDictLabel = String.format("%s_latest_time", tableName);
|
String latestOperateCountDictLabel = String.format("%s_latest_operate_count", tableName);
|
DataTableEntity dictDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DICT, "dict_name=? AND dict_label IN (?,?,?)", new Object[]{DICT_NAME_REQUEST_HISTORY, maxIdDictLabel, latestTimeDictLabel, latestOperateCountDictLabel});
|
FieldSetEntity maxIdDictFse = null;
|
FieldSetEntity latestTimeDictFse = null;
|
FieldSetEntity latestOperateCountDictFse = null;
|
if (!DataTableEntity.isEmpty(dictDte)) {
|
for (int i = 0; i < dictDte.getRows(); i++) {
|
FieldSetEntity dictFse = dictDte.getFieldSetEntity(i);
|
if (maxIdDictLabel.equals(dictFse.getString("dict_label"))) {
|
maxIdDictFse = dictFse;
|
} else if (latestTimeDictLabel.equals(dictFse.getString("dict_label"))) {
|
latestTimeDictFse = dictFse;
|
} else if (latestOperateCountDictLabel.equals(dictFse.getString("dict_label"))) {
|
latestOperateCountDictFse = dictFse;
|
}
|
}
|
}
|
|
FieldSetEntity newDictFse = new FieldSetEntity(CmnConst.PRODUCT_SYS_DICT);
|
newDictFse.setValue("dict_name", DICT_NAME_REQUEST_HISTORY);
|
newDictFse.setValue("is_used", 1);
|
newDictFse.setValue("sequence", 1);
|
newDictFse.setValue("client_type", "Web");
|
newDictFse.setValue("created_utc_datetime", new Date());
|
newDictFse.setValue("created_by", 1);
|
|
// 最大id
|
if (initId < curMaxMasterKeyValue) {
|
saveDictFse(maxIdDictFse, newDictFse, maxIdDictLabel, curMaxMasterKeyValue);
|
}
|
// 最近执行时间
|
saveDictFse(latestTimeDictFse, newDictFse, latestTimeDictLabel, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
|
// 最近一次执行总数
|
saveDictFse(latestOperateCountDictFse, newDictFse, latestOperateCountDictLabel, operateCount);
|
}
|
|
/**
|
* 保存请求历史在数据字典里面的参数fse
|
* @param saveDictFse 需要保存dict的fse
|
* @param modelDictFse 模板fse
|
* @param dictLabel 标签
|
* @param dictValue 值
|
*/
|
private void saveDictFse(FieldSetEntity saveDictFse, FieldSetEntity modelDictFse, String dictLabel, Object dictValue) {
|
if (saveDictFse == null) {
|
saveDictFse = modelDictFse.clones();
|
saveDictFse.setValue("dict_label", dictLabel);
|
} else {
|
saveDictFse.setValue("updated_utc_datetime", new Date());
|
saveDictFse.setValue("updated_by", 1);
|
}
|
saveDictFse.setValue("dict_value", dictValue);
|
baseDao.saveFieldSetEntity(saveDictFse);
|
}
|
|
/**
|
* 拼接报表库查询语句
|
* @param monthMap 月份map
|
* @param reportTableNamePrefix 报表库表名前缀
|
* @param sourceInfo 采集表id
|
* @return 报表库查询语句
|
*/
|
private String joinReportDbQuerySql(Map<String, Map<String, FieldSetEntity>> monthMap, String reportTableNamePrefix, String sourceInfo) {
|
StringBuilder sql = new StringBuilder(128);
|
String model = "SELECT * FROM %s_%s WHERE source_info='%s' AND %s";
|
for (Map.Entry<String, Map<String, FieldSetEntity>> entry : monthMap.entrySet()) {
|
String month = entry.getKey();
|
Set<String> masterKeySet = entry.getValue().keySet();
|
if (sql.length() > 0) {
|
sql.append("\nUNION ALL\n");
|
}
|
sql.append(String.format(model, reportTableNamePrefix, month, sourceInfo, BaseUtil.buildQuestionMarkFilter("pre_master_key", masterKeySet.toArray(), true)));
|
}
|
return sql.toString();
|
}
|
|
/**
|
* 获取子库对应表的信息:表名,主键字段,时间字段等
|
* @param subDbUuid 子库连接配置表uuid
|
* @return JSONObject 子库对应表的信息
|
*/
|
private JSONObject getSubDbTableInfo(String subDbUuid) {
|
String filter = String.format("data_source=? AND %s", BaseUtil.buildQuestionMarkFilter("source_table", operateTableList.size(), true));
|
List<String> paramList = Lists.newArrayList();
|
paramList.add(subDbUuid);
|
paramList.addAll(operateTableList);
|
DataTableEntity tableDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, filter, paramList.toArray());
|
JSONObject resultObj = new JSONObject();
|
for (int i = 0; i < tableDte.getRows(); i++) {
|
FieldSetEntity singleFse = tableDte.getFieldSetEntity(i);
|
String tableName = singleFse.getString("source_table").toLowerCase(Locale.ROOT);
|
resultObj.computeIfAbsent(tableName, s -> BaseUtil.fieldSetEntityToJson(singleFse));
|
}
|
return resultObj;
|
}
|
|
/**
|
* 获取特殊处理信息 t_wip_detail_dealt_max_id
|
* @return JSONObject 特殊处理信息
|
*/
|
private JSONObject getSpDealInfo() {
|
DataTableEntity dictDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DICT, "dict_name=?", new Object[]{DICT_NAME_REQUEST_HISTORY});
|
JSONObject resultObj = new JSONObject();
|
for (int i = 0; i < dictDte.getRows(); i++) {
|
FieldSetEntity singleFse = dictDte.getFieldSetEntity(i);
|
String dictLabel = singleFse.getString("dict_label");
|
String dictValue = singleFse.getString("dict_value");
|
if ("tableFormat".equals(dictLabel)) {
|
if (StringUtils.isEmpty(dictValue)) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库历史表格式");
|
}
|
resultObj.put("tableFormat", singleFse.getString("dict_value"));
|
} else if ("dealTables".equals(dictLabel)) {
|
if (StringUtils.isEmpty(dictValue)) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "需要处理的表名");
|
}
|
operateTableList = Arrays.asList(dictValue.split(","));
|
} else if ("pageSize".equals(dictLabel)) {
|
if (StringUtils.isEmpty(dictValue)) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "单页处理页大小");
|
}
|
pageSize = Integer.parseInt(dictValue);
|
if (pageSize <= 0) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "正确的单页处理页大小");
|
}
|
} else if ("subDbIp".equals(dictLabel)) {
|
if (StringUtils.isEmpty(dictValue)) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库地址");
|
}
|
subDbIp = dictValue;
|
} else if ("subDbPort".equals(dictLabel)) {
|
if (StringUtils.isEmpty(dictValue)) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库端口");
|
}
|
subDbPort = Integer.parseInt(dictValue);
|
} else if ("reportDbIp".equals(dictLabel)) {
|
if (StringUtils.isEmpty(dictValue)) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "报表库地址");
|
}
|
reportDbIp = dictValue;
|
} else if ("reportDbPort".equals(dictLabel)) {
|
if (StringUtils.isEmpty(dictValue)) {
|
throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "报表库端口");
|
}
|
reportDbPort = Integer.parseInt(dictValue);
|
} else {
|
String dictLabelLowerCase = dictLabel.toLowerCase(Locale.ROOT);
|
for (String tableName : operateTableList) {
|
String tablenNameLowerCase = tableName.toLowerCase(Locale.ROOT);
|
if (dictLabelLowerCase.startsWith(tablenNameLowerCase)) {
|
JSONObject singleObj = (JSONObject) resultObj.computeIfAbsent(tablenNameLowerCase, s -> new JSONObject());
|
singleObj.put(dictLabelLowerCase.replace(String.format("%s_", tablenNameLowerCase), ""), singleFse.getValue("dict_value"));
|
}
|
}
|
}
|
}
|
return resultObj;
|
}
|
|
/**
|
* 处理子库中查询到的数据,按照月份和主键分别置于不同的容器中
|
* @param singlePageDataDte 子库数据
|
* @param singleSubDbTableInfoObj 表信息
|
* @param monthMap 容器:Map<yyyyMM, <masterKey, Fse>>
|
*/
|
private void dealSubDbData(DataTableEntity singlePageDataDte, JSONObject singleSubDbTableInfoObj, Map<String, Map<String, FieldSetEntity>> monthMap) {
|
String masterKeyFieldName = singleSubDbTableInfoObj.getString("auto_field");
|
String timeFieldName = singleSubDbTableInfoObj.getString("time_field");
|
for (int i = 0; i < singlePageDataDte.getRows(); i++) {
|
FieldSetEntity singlePageDataFse = singlePageDataDte.getFieldSetEntity(i);
|
String timeStr = singlePageDataFse.getDate(timeFieldName, "yyyyMM");
|
String masterValue = singlePageDataFse.getString(masterKeyFieldName);
|
monthMap.computeIfAbsent(timeStr, s -> Maps.newHashMap()).put(masterValue, singlePageDataFse);
|
}
|
}
|
|
/**
|
* 停止
|
*/
|
private void stop(DataBaseEntity... dbeArr) {
|
if (dbeArr != null) {
|
for (DataBaseEntity dbe : dbeArr) {
|
if (dbe != null && dbe.getDao() != null) {
|
dbe.getDao().closeConnection();
|
}
|
}
|
}
|
processingFlag = false;
|
stopFlag = false;
|
}
|
/*========================请求历史-final========================*/
|
}
|