6c
6 天以前 ecd282bfc3f87f34ce04ec8887169b99fe04478e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
package com.product.data.center.service;
 
import com.alibaba.fastjson.JSONObject;
import com.beust.jcommander.internal.Lists;
import com.google.common.collect.Maps;
import com.product.common.lang.StringUtils;
import com.product.core.dao.BaseDao;
import com.product.core.entity.DataTableEntity;
import com.product.core.entity.FieldSetEntity;
import com.product.core.exception.BaseException;
import com.product.core.service.support.AbstractBaseService;
import com.product.core.spring.context.SpringMVCContextHolder;
import com.product.data.center.config.CmnConst;
import com.product.data.center.config.ErrorCode;
import com.product.datasource.dao.Dao;
import com.product.datasource.entity.DataBaseEntity;
import com.product.util.BaseUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
import java.text.SimpleDateFormat;
import java.util.*;
 
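/**
 * Special-processing service for request-history data: reads rows page by page from the sub
 * database's history tables and inserts the ones missing from the report database's per-month
 * tables. All settings (table list, page size, database addresses, per-table checkpoints) are
 * read from the data dictionary.
 *
 * @author 作者[夜丶光]
 * @version 1.0.00  2025-08-04 10:48
 */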
@Service
public class SpDealService extends AbstractBaseService {
    @Autowired
    private BaseDao baseDao;
    @Autowired
    private DataArchivingService dataArchivingService;
    @Autowired
    private JournalManagerService journalManagerService;
 
    /*========================请求历史-start========================*/
    // 正在处理标识
    private static boolean processingFlag = false;
    // 停止标识
    private static boolean stopFlag = false;
    // 数据字典名称-请求历史
    private static final String DICT_NAME_REQUEST_HISTORY = "请求历史特殊处理";
    // 页大小
    private static int pageSize = 0;
    // 子库操作的表集合,需要从数据库中读取
    private static List<String> operateTableList = Lists.newArrayList();
    // 整机子库ip
    private static String subDbIp = "";
    // 子库端口
    private static int subDbPort = 0;
    // 报表库ip
    private static String reportDbIp = "";
    // 报表库端口
    private static int reportDbPort = 0;
 
    /**
     * 通常定时任务触发-停止执行
     */
    public void stopExecute() {
        SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-停止]");
        stopFlag = true;
    }
 
    /**
     * 通常定时任务触发-请求历史特殊处理-测试方法
     */
    public void testRequestHistorySpDeal() {
        requestHistorySpDeal(true);
    }
 
    /**
     * 通常定时任务触发-请求历史特殊处理
     */
    public void requestHistorySpDeal() {
        requestHistorySpDeal(false);
    }
 
    /**
     * 请求历史特殊处理
     */
    private void requestHistorySpDeal(boolean testFlag) {
        SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-开始]");
        if (processingFlag) {
            SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-正在进行-跳过]");
            return;
        }
        synchronized ("requestHistorySpDeal") {
            DataBaseEntity subDbe = null;
            DataBaseEntity reportDbe = null;
            DataTableEntity waitInsertDte = null;
            try {
                if (processingFlag) {
                    return;
                }
                processingFlag = true;
 
                // 获取特殊处理信息
                JSONObject spDealInfoObj = getSpDealInfo();
                if (spDealInfoObj.isEmpty()) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_SP_DEAL_INFO);
                }
                String tableFormat = spDealInfoObj.getString("tableFormat");
 
                // 获取子库和报表库连接信息
                DataTableEntity dbLinkDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "ip=? OR (ip=? AND port=?)", new Object[]{subDbIp, reportDbIp, reportDbPort});
                FieldSetEntity subDbFse = null;
                FieldSetEntity reportDbFse = null;
                for (int i = 0; i < dbLinkDte.getRows(); i++) {
                    FieldSetEntity singleFse = dbLinkDte.getFieldSetEntity(i);
                    if (subDbIp.equals(singleFse.getString("ip")) && !StringUtils.isEmpty(singleFse.getString("port")) && subDbPort == singleFse.getInteger("port")) {
                        subDbFse = singleFse;
                    } else if (reportDbIp.equals(singleFse.getString("ip")) && !StringUtils.isEmpty(singleFse.getString("port")) && reportDbPort == singleFse.getInteger("port")) {
                        reportDbFse = singleFse;
                    }
                }
                if (subDbFse == null || reportDbFse == null) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_DB_LINK_INFO);
                }
 
                // 获取子库对应表的信息:表名,主键字段,时间字段
                JSONObject subDbTableInfoObj = getSubDbTableInfo(subDbFse.getUUID());
                if (subDbTableInfoObj.isEmpty()) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_INFO);
                }
 
                // 获取连接
                subDbe = new DataBaseEntity(subDbFse);
                reportDbe = new DataBaseEntity(reportDbFse);
 
                // 按主键排序,分页提取子库数据,对比报表库数据,不存在则插入
                if (!stopFlag) {
                    int curMaxMasterKeyValue = 0;
                    outer: for (String tableNameStr : operateTableList) {
                        SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-数据处理-开始]-" + tableNameStr);
                        int index = 0;
                        JSONObject singleSubDbTableInfoObj = subDbTableInfoObj.getJSONObject(tableNameStr);
                        // 采集配置表id
                        String sourceInfo = singleSubDbTableInfoObj.getString(CmnConst.ID);
                        String tableName = String.format(tableFormat, tableNameStr);
                        String reportTableNamePrefix = String.format("da_%s", tableNameStr);
                        String orderBy = singleSubDbTableInfoObj.getString("auto_field");
                        DataArchivingService.DataArchivingServiceImpl createTableService = getDDLService(tableName, subDbe, reportDbe);
                        String masterKeyFieldName = singleSubDbTableInfoObj.getString("auto_field");
                        long initId = spDealInfoObj.getJSONObject(tableNameStr) == null ? 0 : spDealInfoObj.getJSONObject(tableNameStr).getIntValue("dealt_max_id");
                        long curId = initId;
                        int count;
                        int totalCount = 0;
                        // 统计页数,每多少页记录一次最大id
                        int statisticsPage = 10;
                        if (!testFlag) {
                            do {
                                index++;
                                if (stopFlag) {
                                    break outer;
                                }
                                DataTableEntity singlePageDataDte;
                                try {
                                    singlePageDataDte = subDbe.getDao().getList(tableName, null, masterKeyFieldName + ">?", new Object[]{curId}, orderBy, 1, pageSize);
                                } catch (Exception e) {
                                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_WITH_SUB_DB_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_WITH_SUB_DB_DATA.getText() + ":" + e.getMessage());
                                }
                                count = singlePageDataDte.getRows();
                                if (count > 0) {
                                    curMaxMasterKeyValue = singlePageDataDte.getFieldSetEntity(singlePageDataDte.getRows() - 1).getInteger(singleSubDbTableInfoObj.getString("auto_field"));
                                    curId = curMaxMasterKeyValue;
                                    Map<String, Map<String, FieldSetEntity>> monthMap = Maps.newHashMap();
                                    dealSubDbData(singlePageDataDte, singleSubDbTableInfoObj, monthMap);
 
                                    // 检查是否所有表都存在,若是不存在,那么创建
                                    checkTableIfNoThenCreate(monthMap.keySet(), reportTableNamePrefix, createTableService);
 
                                    String querySql = joinReportDbQuerySql(monthMap, reportTableNamePrefix, sourceInfo);
                                    DataTableEntity reportDbExistsDte = reportDbe.getDao().getList(querySql);
                                    for (int i = 0; i < reportDbExistsDte.getRows(); i++) {
                                        FieldSetEntity reportDbExistsFse = reportDbExistsDte.getFieldSetEntity(i);
                                        for (Map<String, FieldSetEntity> singleMonthMap : monthMap.values()) {
                                            singleMonthMap.remove(reportDbExistsFse.getString("pre_master_key"));
                                        }
                                    }
                                    for (Map.Entry<String, Map<String, FieldSetEntity>> entry : monthMap.entrySet()) {
                                        String insertTableName = String.format("%s_%s", reportTableNamePrefix, entry.getKey());
                                        waitInsertDte = getSingleMonthInsertDte(entry.getValue(), insertTableName, sourceInfo);
                                        reportDbe.getDao().addBatch(waitInsertDte);
                                        totalCount += waitInsertDte.getRows();
                                    }
                                    if (index % statisticsPage == 0) {
                                        // 更新当前操作的表统计日志信息,每指定页数更新一次
                                        updateOperateTableInfo(tableNameStr, initId, curMaxMasterKeyValue, totalCount);
                                    }
                                }
                            } while (count > 0);
                            if (index % statisticsPage != 0) {
                                // 更新当前操作的表统计日志信息,若是不是倍数,那么需要额外执行一次
                                updateOperateTableInfo(tableNameStr, initId, curMaxMasterKeyValue, totalCount);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                recordErrorLog(waitInsertDte, e);
                throw e;
            } finally {
                stop(subDbe, reportDbe);
                SpringMVCContextHolder.getSystemLogger().info("[请求历史特殊处理-结束]");
            }
        }
    }
 
    /**
     * 记录错误日志
     * @param waitInsertDte 等待插入的数据dte
     * @param e             错误
     */
    private void recordErrorLog(DataTableEntity waitInsertDte, Exception e) {
        String errorInfo = journalManagerService.getStackTrace(e);
        String groupUUID = UUID.randomUUID().toString();
        Date curTime = new Date();
        if (!DataTableEntity.isEmpty(waitInsertDte)) {
            // 记录当前报错的信息
            DataTableEntity logDte = new DataTableEntity();
            for (int i = 0; i < waitInsertDte.getRows(); i++) {
                FieldSetEntity singlePageDataFse = waitInsertDte.getFieldSetEntity(i);
                FieldSetEntity logFse = new FieldSetEntity("product_sys_sp_deal_log");
                logFse.setValue("content", BaseUtil.fieldSetEntityToJson(singlePageDataFse).toJSONString());
                logFse.setValue("error", errorInfo);
                logFse.setValue("group_uuid", groupUUID);
                logFse.setValue("created_utc_datetime", curTime);
                logDte.addFieldSetEntity(logFse);
            }
            baseDao.add(logDte);
        } else {
            FieldSetEntity logFse = new FieldSetEntity("product_sys_sp_deal_log");
            logFse.setValue("content", "非数据处理阶段出错");
            logFse.setValue("error", errorInfo);
            logFse.setValue("group_uuid", groupUUID);
            logFse.setValue("created_utc_datetime", curTime);
            baseDao.add(logFse);
        }
    }
 
    /**
     * 获取单月插入dte
     * @param singleMonthMap  单月数据map
     * @param insertTableName 报表库表名
     * @param sourceInfo      采集配置表id
     * @return  单批次中单月(因为按月分表)待插入的dte
     */
    private DataTableEntity getSingleMonthInsertDte(Map<String, FieldSetEntity> singleMonthMap, String insertTableName, String sourceInfo) {
        DataTableEntity waitInsertDte = new DataTableEntity();
        for (Map.Entry<String, FieldSetEntity> entry : singleMonthMap.entrySet()) {
            FieldSetEntity waitInsertFse = entry.getValue();
            waitInsertFse.setTableName(insertTableName);
            waitInsertFse.setValue("pre_master_key", entry.getKey());
            waitInsertFse.setValue("source_info", sourceInfo);
            waitInsertDte.addFieldSetEntity(waitInsertFse);
        }
        return waitInsertDte;
    }
 
    /**
     * 获取创建表的service
     * @param tableName     表名,子库历史表表名
     * @param subDbe        子库数据库连接实例
     * @param reportDbe     报表数据库连接实例
     * @return  能够执行创建表DDL语句的service
     */
    private DataArchivingService.DataArchivingServiceImpl getDDLService(String tableName, DataBaseEntity subDbe, DataBaseEntity reportDbe) {
        Dao sourceDao = subDbe.getDao();
        Dao targetDao = reportDbe.getDao();
        String sourceDbName = subDbe.getDbName();
        String targetDbName = reportDbe.getDbName();
        return dataArchivingService.new DataArchivingServiceImpl(sourceDao, targetDao, tableName, "requestHistory", sourceDbName, targetDbName);
    }
 
    /**
     * 检查是否所有表都存在,若是不存在,那么创建
     * @param tableTailSet          表尾缀集合,时间
     * @param reportTableNamePrefix 报表库表名前缀
     * @param createTableService    创建表的service
     */
    private void checkTableIfNoThenCreate(Set<String> tableTailSet, String reportTableNamePrefix, DataArchivingService.DataArchivingServiceImpl createTableService) {
        for (String tableTail : tableTailSet) {
            createTableService.createTable(reportTableNamePrefix, tableTail, null);
        }
    }
 
    /**
     * 更新当前操作的表的统计日志信息
     * @param tableName                 表名,子库正式表表名
     * @param initId                    本次执行前,采集库数据字典中参数里对应表的最大执行id
     * @param curMaxMasterKeyValue      从子库中获取到的本批次数据最大的id
     * @param operateCount              插入报表库数据的总条数
     */
    private void updateOperateTableInfo(String tableName, long initId, int curMaxMasterKeyValue, int operateCount) {
        String maxIdDictLabel = String.format("%s_dealt_max_id", tableName);
        String latestTimeDictLabel = String.format("%s_latest_time", tableName);
        String latestOperateCountDictLabel = String.format("%s_latest_operate_count", tableName);
        DataTableEntity dictDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DICT, "dict_name=? AND dict_label IN (?,?,?)", new Object[]{DICT_NAME_REQUEST_HISTORY, maxIdDictLabel, latestTimeDictLabel, latestOperateCountDictLabel});
        FieldSetEntity maxIdDictFse = null;
        FieldSetEntity latestTimeDictFse = null;
        FieldSetEntity latestOperateCountDictFse = null;
        if (!DataTableEntity.isEmpty(dictDte)) {
            for (int i = 0; i < dictDte.getRows(); i++) {
                FieldSetEntity dictFse = dictDte.getFieldSetEntity(i);
                if (maxIdDictLabel.equals(dictFse.getString("dict_label"))) {
                    maxIdDictFse = dictFse;
                } else if (latestTimeDictLabel.equals(dictFse.getString("dict_label"))) {
                    latestTimeDictFse = dictFse;
                } else if (latestOperateCountDictLabel.equals(dictFse.getString("dict_label"))) {
                    latestOperateCountDictFse = dictFse;
                }
            }
        }
 
        FieldSetEntity newDictFse = new FieldSetEntity(CmnConst.PRODUCT_SYS_DICT);
        newDictFse.setValue("dict_name", DICT_NAME_REQUEST_HISTORY);
        newDictFse.setValue("is_used", 1);
        newDictFse.setValue("sequence", 1);
        newDictFse.setValue("client_type", "Web");
        newDictFse.setValue("created_utc_datetime", new Date());
        newDictFse.setValue("created_by", 1);
 
        // 最大id
        if (initId < curMaxMasterKeyValue) {
            saveDictFse(maxIdDictFse, newDictFse, maxIdDictLabel, curMaxMasterKeyValue);
        }
        // 最近执行时间
        saveDictFse(latestTimeDictFse, newDictFse, latestTimeDictLabel, new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        // 最近一次执行总数
        saveDictFse(latestOperateCountDictFse, newDictFse, latestOperateCountDictLabel, operateCount);
    }
 
    /**
     * 保存请求历史在数据字典里面的参数fse
     * @param saveDictFse       需要保存dict的fse
     * @param modelDictFse      模板fse
     * @param dictLabel         标签
     * @param dictValue         值
     */
    private void saveDictFse(FieldSetEntity saveDictFse, FieldSetEntity modelDictFse, String dictLabel, Object dictValue) {
        if (saveDictFse == null) {
            saveDictFse = modelDictFse.clones();
            saveDictFse.setValue("dict_label", dictLabel);
        } else {
            saveDictFse.setValue("updated_utc_datetime", new Date());
            saveDictFse.setValue("updated_by", 1);
        }
        saveDictFse.setValue("dict_value", dictValue);
        baseDao.saveFieldSetEntity(saveDictFse);
    }
 
    /**
     * 拼接报表库查询语句
     * @param monthMap              月份map
     * @param reportTableNamePrefix 报表库表名前缀
     * @param sourceInfo            采集表id
     * @return  报表库查询语句
     */
    private String joinReportDbQuerySql(Map<String, Map<String, FieldSetEntity>> monthMap, String reportTableNamePrefix, String sourceInfo) {
        StringBuilder sql = new StringBuilder(128);
        String model = "SELECT * FROM %s_%s WHERE source_info='%s' AND %s";
        for (Map.Entry<String, Map<String, FieldSetEntity>> entry : monthMap.entrySet()) {
            String month = entry.getKey();
            Set<String> masterKeySet = entry.getValue().keySet();
            if (sql.length() > 0) {
                sql.append("\nUNION ALL\n");
            }
            sql.append(String.format(model, reportTableNamePrefix, month, sourceInfo, BaseUtil.buildQuestionMarkFilter("pre_master_key", masterKeySet.toArray(), true)));
        }
        return sql.toString();
    }
 
    /**
     * 获取子库对应表的信息:表名,主键字段,时间字段等
     * @param subDbUuid     子库连接配置表uuid
     * @return  JSONObject 子库对应表的信息
     */
    private JSONObject getSubDbTableInfo(String subDbUuid) {
        String filter = String.format("data_source=? AND %s", BaseUtil.buildQuestionMarkFilter("source_table", operateTableList.size(), true));
        List<String> paramList = Lists.newArrayList();
        paramList.add(subDbUuid);
        paramList.addAll(operateTableList);
        DataTableEntity tableDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, filter, paramList.toArray());
        JSONObject resultObj = new JSONObject();
        for (int i = 0; i < tableDte.getRows(); i++) {
            FieldSetEntity singleFse = tableDte.getFieldSetEntity(i);
            String tableName = singleFse.getString("source_table").toLowerCase(Locale.ROOT);
            resultObj.computeIfAbsent(tableName, s -> BaseUtil.fieldSetEntityToJson(singleFse));
        }
        return resultObj;
    }
 
    /**
     * 获取特殊处理信息 t_wip_detail_dealt_max_id
     * @return  JSONObject 特殊处理信息
     */
    private JSONObject getSpDealInfo() {
        DataTableEntity dictDte = baseDao.listTable(CmnConst.PRODUCT_SYS_DICT, "dict_name=?", new Object[]{DICT_NAME_REQUEST_HISTORY});
        JSONObject resultObj = new JSONObject();
        for (int i = 0; i < dictDte.getRows(); i++) {
            FieldSetEntity singleFse = dictDte.getFieldSetEntity(i);
            String dictLabel = singleFse.getString("dict_label");
            String dictValue = singleFse.getString("dict_value");
            if ("tableFormat".equals(dictLabel)) {
                if (StringUtils.isEmpty(dictValue)) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库历史表格式");
                }
                resultObj.put("tableFormat", singleFse.getString("dict_value"));
            } else if ("dealTables".equals(dictLabel)) {
                if (StringUtils.isEmpty(dictValue)) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "需要处理的表名");
                }
                operateTableList = Arrays.asList(dictValue.split(","));
            } else if ("pageSize".equals(dictLabel)) {
                if (StringUtils.isEmpty(dictValue)) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "单页处理页大小");
                }
                pageSize = Integer.parseInt(dictValue);
                if (pageSize <= 0) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "正确的单页处理页大小");
                }
            } else if ("subDbIp".equals(dictLabel)) {
                if (StringUtils.isEmpty(dictValue)) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库地址");
                }
                subDbIp = dictValue;
            } else if ("subDbPort".equals(dictLabel)) {
                if (StringUtils.isEmpty(dictValue)) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "子库端口");
                }
                subDbPort = Integer.parseInt(dictValue);
            } else if ("reportDbIp".equals(dictLabel)) {
                if (StringUtils.isEmpty(dictValue)) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "报表库地址");
                }
                reportDbIp = dictValue;
            } else if ("reportDbPort".equals(dictLabel)) {
                if (StringUtils.isEmpty(dictValue)) {
                    throw new BaseException(ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getValue(), ErrorCode.REQUEST_HISTORY_SP_DEAL_FAIL_NO_TABLE_DATA.getText() + "报表库端口");
                }
                reportDbPort = Integer.parseInt(dictValue);
            } else {
                String dictLabelLowerCase = dictLabel.toLowerCase(Locale.ROOT);
                for (String tableName : operateTableList) {
                    String tablenNameLowerCase = tableName.toLowerCase(Locale.ROOT);
                    if (dictLabelLowerCase.startsWith(tablenNameLowerCase)) {
                        JSONObject singleObj = (JSONObject) resultObj.computeIfAbsent(tablenNameLowerCase, s -> new JSONObject());
                        singleObj.put(dictLabelLowerCase.replace(String.format("%s_", tablenNameLowerCase), ""), singleFse.getValue("dict_value"));
                    }
                }
            }
        }
        return resultObj;
    }
 
    /**
     * 处理子库中查询到的数据,按照月份和主键分别置于不同的容器中
     * @param singlePageDataDte         子库数据
     * @param singleSubDbTableInfoObj   表信息
     * @param monthMap                  容器:Map<yyyyMM, <masterKey, Fse>>
     */
    private void dealSubDbData(DataTableEntity singlePageDataDte, JSONObject singleSubDbTableInfoObj, Map<String, Map<String, FieldSetEntity>> monthMap) {
        String masterKeyFieldName = singleSubDbTableInfoObj.getString("auto_field");
        String timeFieldName = singleSubDbTableInfoObj.getString("time_field");
        for (int i = 0; i < singlePageDataDte.getRows(); i++) {
            FieldSetEntity singlePageDataFse = singlePageDataDte.getFieldSetEntity(i);
            String timeStr = singlePageDataFse.getDate(timeFieldName, "yyyyMM");
            String masterValue = singlePageDataFse.getString(masterKeyFieldName);
            monthMap.computeIfAbsent(timeStr, s -> Maps.newHashMap()).put(masterValue, singlePageDataFse);
        }
    }
 
    /**
     * 停止
     */
    private void stop(DataBaseEntity... dbeArr) {
        if (dbeArr != null) {
            for (DataBaseEntity dbe : dbeArr) {
                if (dbe != null && dbe.getDao() != null) {
                    dbe.getDao().closeConnection();
                }
            }
        }
        processingFlag = false;
        stopFlag = false;
    }
    /*========================请求历史-final========================*/
}