许鹏程
2023-09-07 09ba53cbe50ea20ec17ea8c9e5c620fa520f5b45
commit
已修改3个文件
已删除7个文件
1223 ■■■■■ 文件已修改
.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/compiler.xml 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/encodings.xml 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/jarRepositories.xml 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/misc.xml 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/runConfigurations.xml 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/vcs.xml 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/workspace.xml 358 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java 698 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -10,3 +10,4 @@
#所有目录下的target文件夹
**/target/
.svn
.idea
.idea/compiler.xml
文件已删除
.idea/encodings.xml
文件已删除
.idea/jarRepositories.xml
文件已删除
.idea/misc.xml
文件已删除
.idea/runConfigurations.xml
文件已删除
.idea/vcs.xml
文件已删除
.idea/workspace.xml
文件已删除
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -17,6 +17,7 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,354 +34,381 @@
 */
@Service
public class DataArchivingQueue extends AbstractBaseService {
    // æŸ¥è¯¢é˜Ÿåˆ—map:map<表名,dte队列>
    private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
    // æŸ¥è¯¢å­˜æ´»map:正在执行查询的表,对应的值为执行的线程数,为0标识已经执行完成
    private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
    // æŸ¥è¯¢çº¿ç¨‹map
    private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
    // é”™è¯¯æ—¥å¿—map
    private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
    // å•表查询最大线程数
    private static final int QUERY_THREAD_COUNT = 3;
    // å•表验证查询最大批次(单表查询最大批次=单表验证查询最大批次 + å•表查询最大线程数)
    private static final int QUERY_MAX_BATCH_COUNT = 4;
    // æŸ¥è¯¢æ¯é¡µå¤§å°
    private static final int QUERY_PAGE_SIZE = 50000;
    // æ’入每页大小
    public static final int INSERT_PAGE_SIZE = 5000;
    // æŸ¥è¯¢é˜Ÿåˆ—map:map<表名,dte队列>
    private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
    // æŸ¥è¯¢å­˜æ´»map:正在执行查询的表,对应的值为执行的线程数,为0标识已经执行完成
    private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
    // æŸ¥è¯¢çº¿ç¨‹map
    private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
    // é”™è¯¯æ—¥å¿—map
    private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
    // å•表查询最大线程数
    private static final int QUERY_THREAD_COUNT = 3;
    // å•表验证查询最大批次(单表查询最大批次=单表验证查询最大批次 + å•表查询最大线程数)
    private static final int QUERY_MAX_BATCH_COUNT = 4;
    // æŸ¥è¯¢æ¯é¡µå¤§å°
    private static final int QUERY_PAGE_SIZE = 50000;
    // æ’入每页大小
    public static final int INSERT_PAGE_SIZE = 5000;
    /**
     * Counts the rows of {@code sourceTable} that match {@code filter}, splits them into
     * consecutive ranges of {@code uniqueField}, and submits one paging task per range to a
     * newly created work-stealing pool. Every non-empty page a task reads is passed to
     * {@code add}, i.e. placed on the table's queue.
     *
     * @param sourceDbe   source database entity; one Dao is opened from it for the count and
     *                    range lookups and another one inside every submitted task
     * @param sourceTable table to read; also the key for the queue, executor and error-log maps
     * @param filter      optional SQL condition, concatenated verbatim into the statements
     * @param params      parameters passed along with every query
     * @param uniqueField primary key; used for identification and for ordering, so that Oracle
     *                    paging does not return duplicate rows
     * @param minID       lower bound for {@code uniqueField}
     */
    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
        Dao sourceDao = null;
        try {
            // Count the matching rows; the total drives the partition and page calculation.
            // NOTE(review): sourceTable, filter, uniqueField and the id bounds are concatenated
            // into SQL text throughout this method — confirm they never carry untrusted input.
            StringBuilder countSql = new StringBuilder(128);
            countSql.append("select count(*) count_value from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                countSql.append(" where ").append(filter);
            }
            sourceDao = sourceDbe.newDao();
            FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
            int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
            int partCount = getPartCount(totalCount);
            // Shut down and unregister any executor still registered for this table, then
            // register a fresh one.
            shutdownQueryThread(sourceTable);
            ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
            queryThreadMap.put(sourceTable, executorService);
            StringBuilder rangeSql = new StringBuilder(128);
            String tempPartMinID = minID;
            // Rows per partition; a task reads its partition in pages of QUERY_PAGE_SIZE rows.
            int partSize = partCount * QUERY_PAGE_SIZE;
            int count = ceilPage(totalCount, partSize);
            for (int i = 1; i  <= count; i++) {
                // Find the largest id among the next partSize rows starting at tempPartMinID.
                // NOTE(review): this lookup uses ">=" on the previous partition's max id, so every
                // partition after the first re-counts that boundary row — confirm the last rows
                // of the table are not left outside all partitions.
                rangeSql.setLength(0);
                if (!StringUtils.isEmpty(filter)) {
                    rangeSql.append(filter).append(" and ");
                }
                rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
                DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
                if (DataTableEntity.isEmpty(rangeDte)) {
                    continue;
                }
                FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
                String curPartMaxID = rangeFse.getString("max_id");
                // Effectively-final copy of the lower bound for use inside the lambda.
                String curPartMinID = tempPartMinID;
                executorService.submit(() -> {
                    // The worker thread's id identifies this task in the live-query map.
                    String threadInfo = String.valueOf(Thread.currentThread().getId());
                    Dao threadSourceDao = null;
                    String thisPartMinID = curPartMinID;
                    try {
                        threadSourceDao = sourceDbe.newDao();
                        startQuery(sourceTable, threadInfo);
                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                        StringBuilder tempFilter = new StringBuilder(128);
                        for (int j = 0; j < totalPage; j++) {
                            // Wait with a randomized delay until allowQuery permits another read.
                            // NOTE(review): an interrupt during this sleep ends up in the generic
                            // catch below and the thread's interrupt flag is not restored.
                            while (!allowQuery(sourceTable)) {
                                Thread.sleep(RandomUtil.randomInt(800, 1200));
                            }
                            tempFilter.setLength(0);
                            tempFilter.append(uniqueField);
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
                            // While the lower bound is still the caller's minID it is inclusive;
                            // otherwise reading continues strictly after the last id already seen.
                            if (minID.equals(thisPartMinID)) {
                                tempFilter.append(">=");
                            } else {
                                tempFilter.append(">");
                            }
                            tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
                            if (!StringUtils.isEmpty(filter)) {
                                tempFilter.append(" and ").append(filter);
                            }
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter);
                            // Always page 1: the moving lower bound, not the page index, advances the read.
                            DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
                            if (!DataTableEntity.isEmpty(allDte)) {
                                add(sourceTable, allDte);
                                // The id of the last row read becomes the next page's lower bound.
                                thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
                            } else {
                                // An empty page ends this partition.
                                break;
                            }
                        }
                    } catch (Exception e) {
                        // Record the stack trace for the table, log it, then drop the table's
                        // queue and shut its executor down via clear().
                        appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        clear(sourceTable);
                    } finally {
                        if (threadSourceDao != null) {
                            threadSourceDao.closeConnection();
                        }
                        // Unregister this thread; finalQuery throws if the thread was never registered.
                        finalQuery(sourceTable, threadInfo);
                    }
                });
                // The next partition's range lookup starts from this partition's max id.
                tempPartMinID = curPartMaxID;
            }
        } catch (Exception e) {
            // Rethrown unchanged; the finally block below still closes the Dao.
            throw e;
        } finally {
            if (sourceDao != null) {
                sourceDao.closeConnection();
            }
        }
    }
    /**
     * Counts the rows of {@code sourceTable} that match {@code filter}, splits them into
     * consecutive ranges of {@code uniqueField}, and submits one paging task per range to a
     * newly created work-stealing pool. Every non-empty page a task reads is passed to
     * {@code add}, i.e. placed on the table's queue.
     *
     * @param sourceDbe   source database entity; one Dao is opened from it for the count and
     *                    range lookups and another one inside every submitted task
     * @param sourceTable table to read; also the key for the queue, executor and error-log maps
     * @param filter      optional SQL condition, concatenated verbatim into the statements
     * @param params      parameters passed along with every query
     * @param uniqueField primary key; used for identification and for ordering, so that Oracle
     *                    paging does not return duplicate rows
     * @param minID       lower bound for {@code uniqueField}
     */
    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
        Dao sourceDao = null;
        try {
            // Count the matching rows; the total drives the partition and page calculation.
            // NOTE(review): sourceTable, filter, uniqueField and the id bounds are concatenated
            // into SQL text throughout this method — confirm they never carry untrusted input.
            StringBuilder countSql = new StringBuilder(128);
            countSql.append("select count(*) count_value from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                countSql.append(" where ").append(filter);
            }
            sourceDao = sourceDbe.newDao();
            FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
            int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
            int partCount = getPartCount(totalCount);
            // Shut down and unregister any executor still registered for this table, then
            // register a fresh one.
            shutdownQueryThread(sourceTable);
            ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
            queryThreadMap.put(sourceTable, executorService);
            StringBuilder rangeSql = new StringBuilder(128);
            String tempPartMinID = minID;
            // Rows per partition; a task reads its partition in pages of QUERY_PAGE_SIZE rows.
            int partSize = partCount * QUERY_PAGE_SIZE;
            int count = ceilPage(totalCount, partSize);
            for (int i = 1; i <= count; i++) {
                // Find the largest id among the next partSize rows starting at tempPartMinID.
                // NOTE(review): this lookup uses ">=" on the previous partition's max id, so every
                // partition after the first re-counts that boundary row — confirm the last rows
                // of the table are not left outside all partitions.
                rangeSql.setLength(0);
                if (!StringUtils.isEmpty(filter)) {
                    rangeSql.append(filter).append(" and ");
                }
                rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
                DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
                if (DataTableEntity.isEmpty(rangeDte)) {
                    continue;
                }
                FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
                String curPartMaxID = rangeFse.getString("max_id");
                // Effectively-final copy of the lower bound for use inside the lambda.
                String curPartMinID = tempPartMinID;
                executorService.submit(() -> {
                    // The worker thread's id identifies this task in the live-query map.
                    String threadInfo = String.valueOf(Thread.currentThread().getId());
                    Dao threadSourceDao = null;
                    String thisPartMinID = curPartMinID;
                    try {
                        threadSourceDao = sourceDbe.newDao();
                        startQuery(sourceTable, threadInfo);
                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                        StringBuilder tempFilter = new StringBuilder(128);
                        for (int j = 0; j < totalPage; j++) {
                            // Wait with a randomized delay until allowQuery permits another read.
                            // NOTE(review): an interrupt during this sleep ends up in the generic
                            // catch below and the thread's interrupt flag is not restored.
                            while (!allowQuery(sourceTable)) {
                                Thread.sleep(RandomUtil.randomInt(800, 1200));
                            }
                            tempFilter.setLength(0);
                            tempFilter.append(uniqueField);
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
                            // While the lower bound is still the caller's minID it is inclusive;
                            // otherwise reading continues strictly after the last id already seen.
                            if (minID.equals(thisPartMinID)) {
                                tempFilter.append(">=");
                            } else {
                                tempFilter.append(">");
                            }
                            tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
                            if (!StringUtils.isEmpty(filter)) {
                                tempFilter.append(" and ").append(filter);
                            }
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter);
                            // Always page 1: the moving lower bound, not the page index, advances the read.
                            DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
                            if (!DataTableEntity.isEmpty(allDte)) {
                                add(sourceTable, allDte);
                                // The id of the last row read becomes the next page's lower bound.
                                thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
                            } else {
                                // An empty page ends this partition.
                                break;
                            }
                        }
                    } catch (Exception e) {
                        // Record the stack trace for the table, log it, then drop the table's
                        // queue and shut its executor down via clear().
                        appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        clear(sourceTable);
                    } finally {
                        if (threadSourceDao != null) {
                            threadSourceDao.closeConnection();
                        }
                        // Unregister this thread; finalQuery throws if the thread was never registered.
                        finalQuery(sourceTable, threadInfo);
                    }
                });
                // The next partition's range lookup starts from this partition's max id.
                tempPartMinID = curPartMaxID;
            }
        } catch (Exception e) {
            // Rethrown unchanged; the finally block below still closes the Dao.
            throw e;
        } finally {
            if (sourceDao != null) {
                sourceDao.closeConnection();
            }
        }
    }
    /**
     * ä»Žé˜Ÿåˆ—中获取
     * @param tableName
     * @return
     */
    public DataTableEntity get(String tableName) {
        WriteUtil.append("DA-从队列中获取-表名:" + tableName);
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                return null;
            }
            return queryQueue.poll();
        }
    }
    /**
     * ä»Žé˜Ÿåˆ—中获取
     *
     * @param tableName
     * @return
     */
    public DataTableEntity get(String tableName) {
        WriteUtil.append("DA-从队列中获取-表名:" + tableName);
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                return null;
            }
            return queryQueue.poll();
        }
    }
    /**
     * åˆ¤å®šæ˜¯å¦æŸ¥è¯¢å®Œæ¯•
     * @param tableName
     * @return
     */
    public boolean checkQueryFinish(String tableName) {
        WriteUtil.append("DA-判定是否查询完毕");
        Set<String> set = existsQueryMap.get(tableName);
        return set == null || set.isEmpty();
    }
    /**
     * åˆ¤å®šæ˜¯å¦æŸ¥è¯¢å®Œæ¯•
     *
     * @param tableName
     * @return
     */
    public boolean checkQueryFinish(String tableName) {
        WriteUtil.append("DA-判定是否查询完毕");
        Set<String> set = existsQueryMap.get(tableName);
        return set == null || set.isEmpty();
    }
    /**
     * åˆ¤å®šæ’入队列(查询完成后放入的队列)是否为空
     * @param tableName
     * @return
     */
    public boolean checkInsertQueueEmpty(String tableName) {
        WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空");
        return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
    }
    /**
     * åˆ¤å®šæ’入队列(查询完成后放入的队列)是否为空
     *
     * @param tableName
     * @return
     */
    public boolean checkInsertQueueEmpty(String tableName) {
        WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空");
        return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
    }
    /**
     * å…³é—­æŸ¥è¯¢çº¿ç¨‹
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
        synchronized (tableName.intern()) {
            ExecutorService executorService = queryThreadMap.get(tableName);
            if (executorService != null) {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                }
                queryThreadMap.remove(tableName);
            }
        }
    }
    /**
     * å…³é—­æŸ¥è¯¢çº¿ç¨‹
     *
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
        synchronized (tableName.intern()) {
            ExecutorService executorService = queryThreadMap.get(tableName);
            if (executorService != null) {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                }
                queryThreadMap.remove(tableName);
            }
        }
    }
    /**
     * æ¸…理
     * @param tableName
     */
    public void clear(String tableName) {
        synchronized (tableName.intern()) {
            queryMap.remove(tableName);
            shutdownQueryThread(tableName);
        }
    }
    /**
     * æ¸…理
     *
     * @param tableName
     */
    public void clear(String tableName) {
        synchronized (tableName.intern()) {
            queryMap.remove(tableName);
            shutdownQueryThread(tableName);
        }
    }
    /**
     * æå–错误日志,只能提取一次,提取后会直接清空
     * @param tableName
     * @return
     */
    public String getErrorLog(String tableName) {
        synchronized (tableName.intern()) {
            if (errorLogMap == null) {
                return null;
            } else {
                StringBuilder result = errorLogMap.get(tableName);
                errorLogMap.remove(tableName);
                return result == null ? null : result.toString();
            }
        }
    }
    /**
     * æå–错误日志,只能提取一次,提取后会直接清空
     *
     * @param tableName
     * @return
     */
    public String getErrorLog(String tableName) {
        synchronized (tableName.intern()) {
            if (errorLogMap == null) {
                return null;
            } else {
                StringBuilder result = errorLogMap.get(tableName);
                errorLogMap.remove(tableName);
                return result == null ? null : result.toString();
            }
        }
    }
    /**
     * èŽ·å–åˆ†æ®µçš„æ•°é‡ï¼Œæœ€å¤š16,最小为0,为0表示用不上所有的线程
     * @param totalCount
     * @return
     */
    private int getPartCount(int totalCount) {
        int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
        int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
        if (partCount >= 16) {
            return 16;
        } else if (partCount >= 8) {
            return 8;
        } else if (partCount >= 4) {
            return 4;
        } else if (partCount >= 2) {
            return 2;
        } else if (partCount >= 1) {
            return 1;
        } else {
            return 0;
        }
    }
    /**
     * èŽ·å–åˆ†æ®µçš„æ•°é‡ï¼Œæœ€å¤š16,最小为0,为0表示用不上所有的线程
     *
     * @param totalCount
     * @return
     */
    private int getPartCount(int totalCount) {
        int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
        int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
        if (partCount >= 16) {
            return 16;
        } else if (partCount >= 8) {
            return 8;
        } else if (partCount >= 4) {
            return 4;
        } else if (partCount >= 2) {
            return 2;
        } else if (partCount >= 1) {
            return 1;
        } else {
            return 0;
        }
    }
    /**
     * èŽ·å–sql:查询范围内的最大id值
     * @param uniqueField
     * @param sourceTable
     * @param filter
     * @param dbType
     * @param pageIndex
     * @param pageSize
     * @return
     */
    private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
        int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
        int finalIndex = startIndex + pageSize;
        StringBuilder sql = new StringBuilder(128);
        if (DataBaseType.MYSQL.getValue() == dbType) {
            sql.append("select max(").append(uniqueField).append(") max_id from (");
            sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n    where ").append(filter);
            }
            sql.append("\n    order by ").append(uniqueField);
            sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
            sql.append("\n) t");
        } else if (DataBaseType.ORACLE.getValue() == dbType) {
            sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
            sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
            sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n        WHERE ").append(filter);
            }
            sql.append("\n        ORDER BY ").append(uniqueField);
            sql.append("\n    ) T1");
            sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
            sql.append("\n) T2");
            sql.append("\nWHERE R>").append(startIndex);
        }
        return sql.toString();
    }
    /**
     * èŽ·å–sql:查询范围内的最大id值
     *
     * @param uniqueField
     * @param sourceTable
     * @param filter
     * @param dbType
     * @param pageIndex
     * @param pageSize
     * @return
     */
    private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
        int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
        int finalIndex = startIndex + pageSize;
        StringBuilder sql = new StringBuilder(128);
        if (DataBaseType.MYSQL.getValue() == dbType) {
            sql.append("select max(").append(uniqueField).append(") max_id from (");
            sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n    where ").append(filter);
            }
            sql.append("\n    order by ").append(uniqueField);
            sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
            sql.append("\n) t");
        } else if (DataBaseType.ORACLE.getValue() == dbType) {
            sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
            sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
            sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n        WHERE ").append(filter);
            }
            sql.append("\n        ORDER BY ").append(uniqueField);
            sql.append("\n    ) T1");
            sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
            sql.append("\n) T2");
            sql.append("\nWHERE R>").append(startIndex);
        }
        return sql.toString();
    }
    /**
     * æ”¾å…¥é˜Ÿåˆ—
     * @param tableName
     * @param dte
     */
    private void add(String tableName, DataTableEntity dte) {
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
        }
    }
    /**
     * æ”¾å…¥é˜Ÿåˆ—
     *
     * @param tableName
     * @param dte
     */
    private void add(String tableName, DataTableEntity dte) {
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
            if (tableName.endsWith("BAK20230823")) {
                //检查dte中的source_info å’Œ pre_master_key æ˜¯å¦ä¸ºç©º
                for (int i = 0; i < dte.getRows(); i++) {
                    String sourceInfo = dte.getString(i, "source_info");
                    String preMasterKey = dte.getString(i, "pre_master_key");
                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                        dte.setFieldValue(i, "source_info", "ch-kt");
                        String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
                    }
                }
            }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
        }
    }
    /**
     * æŸ¥è¯¢å¼€å§‹ï¼Œå‘存活map中添加1
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
            set.add(threadInfo);
        }
    }
    /**
     * æŸ¥è¯¢å¼€å§‹ï¼Œå‘存活map中添加1
     *
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
            set.add(threadInfo);
        }
    }
    /**
     * æŸ¥è¯¢ç»“束,向存活map中减少1
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.get(tableName);
            if (set == null || !set.contains(threadInfo)) {
                throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
            }
            set.remove(threadInfo);
            if (set.isEmpty()) {
                existsQueryMap.remove(tableName);
            }
        }
    }
    /**
     * æŸ¥è¯¢ç»“束,向存活map中减少1
     *
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.get(tableName);
            if (set == null || !set.contains(threadInfo)) {
                throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
            }
            set.remove(threadInfo);
            if (set.isEmpty()) {
                existsQueryMap.remove(tableName);
            }
        }
    }
    /**
     * å…è®¸æ‰§è¡ŒæŸ¥è¯¢ï¼ˆé¿å…é˜Ÿåˆ—中等待插入的太多,导致内存溢出)
     * @param tableName
     * @return
     */
    private boolean allowQuery(String tableName) {
        synchronized (tableName.intern()) {
            return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
        }
    }
    /**
     * å…è®¸æ‰§è¡ŒæŸ¥è¯¢ï¼ˆé¿å…é˜Ÿåˆ—中等待插入的太多,导致内存溢出)
     *
     * @param tableName
     * @return
     */
    private boolean allowQuery(String tableName) {
        synchronized (tableName.intern()) {
            return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
        }
    }
    /**
     * æ’入错误日志
     * @param tableName
     * @param error
     */
    private void appendErrorLog(String tableName, String error) {
        synchronized (tableName.intern()) {
            StringBuilder errorSb = errorLogMap.get(tableName);
            if (errorSb == null) {
                errorSb = new StringBuilder(128);
                errorLogMap.put(tableName, errorSb);
            }
            if (errorSb.length() > 0) {
                errorSb.append("\n");
            }
            if (errorSb.length() < 2000) {
                errorSb.append(error);
            }
        }
    }
    /**
     * æ’入错误日志
     *
     * @param tableName
     * @param error
     */
    private void appendErrorLog(String tableName, String error) {
        synchronized (tableName.intern()) {
            StringBuilder errorSb = errorLogMap.get(tableName);
            if (errorSb == null) {
                errorSb = new StringBuilder(128);
                errorLogMap.put(tableName, errorSb);
            }
            if (errorSb.length() > 0) {
                errorSb.append("\n");
            }
            if (errorSb.length() < 2000) {
                errorSb.append(error);
            }
        }
    }
    /**
     * èŽ·å–é¡µæ•°ï¼Œå‘ä¸Šå–æ•´
     * @param count
     * @param size
     * @return
     */
    private int ceilPage(int count, int size) {
        if (size == 0) {
            if (count == 0) {
                return 0;
            } else {
                throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
            }
        }
        return count / size + (count % size == 0 ? 0 : 1);
    }
    /**
     * èŽ·å–é¡µæ•°ï¼Œå‘ä¸Šå–æ•´
     *
     * @param count
     * @param size
     * @return
     */
    private int ceilPage(int count, int size) {
        if (size == 0) {
            if (count == 0) {
                return 0;
            } else {
                throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
            }
        }
        return count / size + (count % size == 0 ? 0 : 1);
    }
}
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java
@@ -4,6 +4,7 @@
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
@@ -42,6 +43,10 @@
import java.sql.SQLException;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
@@ -104,8 +109,8 @@
        Set<String> detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail");
        HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL);
        Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
        HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
//        Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
//        HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
        //主库数据源配置
@@ -117,7 +122,7 @@
        try {
            Connection connection = dao.getConnection();
            connection.setAutoCommit(false);
            HistoryEntity[] historyEntities = {trackingData, keypData, detailData, productSnData};
            HistoryEntity[] historyEntities = {trackingData, keypData, detailData};
            insertMasterTableData(dao, historyEntities);
            insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities);
            connection.commit();
@@ -164,7 +169,13 @@
     */
    public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            DataTableEntity masterDataTable = historyEntity.getMasterDataTable();
            if (DataTableEntity.isEmpty(masterDataTable)) {
                continue;
            }
            Object[] objects = masterDataTable.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).toArray();
            //查询主库数据是否存在
            DataTableEntity list = dao.getList(historyEntity.getTableName(),
@@ -200,6 +211,9 @@
    public void insertSubTableData(Map<String, Dao> groupDao, Map<String, List<FieldSetEntity>> groupByCollectId,
                                   Map<String, List<FieldSetEntity>> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            Map<String, List<FieldSetEntity>> groupData = historyEntity.getGroupData();
            if (groupData == null || groupData.isEmpty()) {
                continue;
@@ -248,6 +262,9 @@
    }
    public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) {
        if (DataTableEntity.isEmpty(dt)) {
            return null;
        }
        HistoryEntity historyEntity = new HistoryEntity();
        historyEntity.setMoNumberField("mo_number");
        if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) {
@@ -304,33 +321,24 @@
    }
    public DataTableEntity getData(Dao dao, Set<String> tableSet, String filterFieldName, String
            serialNumber, ErrorCode[] errorCodes) {
            serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException {
        if (CollectionUtil.isEmpty(tableSet)) {
            throw new BaseException(errorCodes[0]);
        }
        String[] tableArray = tableSet.toArray(new String[]{});
        StringBuilder sql = new StringBuilder();
        sql.append("with ");
        List<Object> params = new ArrayList<>();
        for (int i = 0; i < tableArray.length; i++) {
            String takcingTable = tableArray[i];
            if (i > 0) {
                sql.append(",\n\t");
            }
            sql.append("`").append(takcingTable).append("` as (select a.*,'").append(takcingTable).append("' as '~table_name~' ").append(" from ").append(takcingTable).append(" a where `").append(filterFieldName).append("`=? )");
            params.add(serialNumber);
        CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
        //多线程查询单张表,等待所有线程查询完毕
        for (String tableName : tableArray) {
            objectCompletionService.submit(() -> dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + "a where " + filterFieldName + " = ?", new Object[]{serialNumber}));
        }
        sql.append("\nselect * from (");
        DataTableEntity data = new DataTableEntity();
        Future<DataTableEntity> take = objectCompletionService.take();
        for (int i = 0; i < tableArray.length; i++) {
            if (i > 0) {
                sql.append("\n\tunion all");
            }
            sql.append("\n\tselect * from `").append(tableArray[i]).append("`");
            DataTableEntity dataTableEntity = take.get();
            BaseUtil.dataTableMerge(data, dataTableEntity);
        }
        sql.append("\n) a");
        DataTableEntity data = dao.getList(sql.toString(), params.toArray());
        if (DataTableEntity.isEmpty(data)) {
        if (DataTableEntity.isEmpty(data) && !"product_sn".equals(filterFieldName)) {
            throw new BaseException(errorCodes[1]);
        }
        return data;