许鹏程
2023-09-07 09ba53cbe50ea20ec17ea8c9e5c620fa520f5b45
commit
已修改3个文件
已删除7个文件
555 ■■■■ 文件已修改
.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/compiler.xml 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/encodings.xml 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/jarRepositories.xml 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/misc.xml 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/runConfigurations.xml 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/vcs.xml 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/workspace.xml 358 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -10,3 +10,4 @@
#所有目录下的target文件夹
**/target/
.svn
.idea
.idea/compiler.xml
ÎļþÒÑɾ³ý
.idea/encodings.xml
ÎļþÒÑɾ³ý
.idea/jarRepositories.xml
ÎļþÒÑɾ³ý
.idea/misc.xml
ÎļþÒÑɾ³ý
.idea/runConfigurations.xml
ÎļþÒÑɾ³ý
.idea/vcs.xml
ÎļþÒÑɾ³ý
.idea/workspace.xml
ÎļþÒÑɾ³ý
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -17,6 +17,7 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +53,7 @@
    /**
     * æŸ¥è¯¢
     *
     * @param sourceDbe
     * @param sourceTable
     * @param filter
@@ -149,6 +151,7 @@
    /**
     * ä»Žé˜Ÿåˆ—中获取
     *
     * @param tableName
     * @return
     */
@@ -165,6 +168,7 @@
    /**
     * åˆ¤å®šæ˜¯å¦æŸ¥è¯¢å®Œæ¯•
     *
     * @param tableName
     * @return
     */
@@ -176,6 +180,7 @@
    /**
     * åˆ¤å®šæ’入队列(查询完成后放入的队列)是否为空
     *
     * @param tableName
     * @return
     */
@@ -186,6 +191,7 @@
    /**
     * å…³é—­æŸ¥è¯¢çº¿ç¨‹
     *
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
@@ -202,6 +208,7 @@
    /**
     * æ¸…理
     *
     * @param tableName
     */
    public void clear(String tableName) {
@@ -213,6 +220,7 @@
    /**
     * æå–错误日志,只能提取一次,提取后会直接清空
     *
     * @param tableName
     * @return
     */
@@ -230,6 +238,7 @@
    /**
     * èŽ·å–åˆ†æ®µçš„æ•°é‡ï¼Œæœ€å¤š16,最小为0,为0表示用不上所有的线程
     *
     * @param totalCount
     * @return
     */
@@ -253,6 +262,7 @@
    /**
     * èŽ·å–sql:查询范围内的最大id值
     *
     * @param uniqueField
     * @param sourceTable
     * @param filter
@@ -292,6 +302,7 @@
    /**
     * æ”¾å…¥é˜Ÿåˆ—
     *
     * @param tableName
     * @param dte
     */
@@ -302,6 +313,18 @@
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
            if (tableName.endsWith("BAK20230823")) {
                //检查dte中的source_info å’Œ pre_master_key æ˜¯å¦ä¸ºç©º
                for (int i = 0; i < dte.getRows(); i++) {
                    String sourceInfo = dte.getString(i, "source_info");
                    String preMasterKey = dte.getString(i, "pre_master_key");
                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                        dte.setFieldValue(i, "source_info", "ch-kt");
                        String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
                    }
                }
            }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
        }
@@ -309,6 +332,7 @@
    /**
     * æŸ¥è¯¢å¼€å§‹ï¼Œå‘存活map中添加1
     *
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
@@ -320,6 +344,7 @@
    /**
     * æŸ¥è¯¢ç»“束,向存活map中减少1
     *
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
@@ -337,6 +362,7 @@
    /**
     * å…è®¸æ‰§è¡ŒæŸ¥è¯¢ï¼ˆé¿å…é˜Ÿåˆ—中等待插入的太多,导致内存溢出)
     *
     * @param tableName
     * @return
     */
@@ -348,6 +374,7 @@
    /**
     * æ’入错误日志
     *
     * @param tableName
     * @param error
     */
@@ -369,6 +396,7 @@
    /**
     * èŽ·å–é¡µæ•°ï¼Œå‘ä¸Šå–æ•´
     *
     * @param count
     * @param size
     * @return
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java
@@ -4,6 +4,7 @@
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
@@ -42,6 +43,10 @@
import java.sql.SQLException;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
@@ -104,8 +109,8 @@
        Set<String> detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail");
        HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL);
        Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
        HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
//        Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
//        HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
        //主库数据源配置
@@ -117,7 +122,7 @@
        try {
            Connection connection = dao.getConnection();
            connection.setAutoCommit(false);
            HistoryEntity[] historyEntities = {trackingData, keypData, detailData, productSnData};
            HistoryEntity[] historyEntities = {trackingData, keypData, detailData};
            insertMasterTableData(dao, historyEntities);
            insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities);
            connection.commit();
@@ -164,7 +169,13 @@
     */
    public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            DataTableEntity masterDataTable = historyEntity.getMasterDataTable();
            if (DataTableEntity.isEmpty(masterDataTable)) {
                continue;
            }
            Object[] objects = masterDataTable.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).toArray();
            //查询主库数据是否存在
            DataTableEntity list = dao.getList(historyEntity.getTableName(),
@@ -200,6 +211,9 @@
    public void insertSubTableData(Map<String, Dao> groupDao, Map<String, List<FieldSetEntity>> groupByCollectId,
                                   Map<String, List<FieldSetEntity>> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception {
        for (HistoryEntity historyEntity : historyEntities) {
            if (historyEntity == null) {
                continue;
            }
            Map<String, List<FieldSetEntity>> groupData = historyEntity.getGroupData();
            if (groupData == null || groupData.isEmpty()) {
                continue;
@@ -248,6 +262,9 @@
    }
    public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) {
        if (DataTableEntity.isEmpty(dt)) {
            return null;
        }
        HistoryEntity historyEntity = new HistoryEntity();
        historyEntity.setMoNumberField("mo_number");
        if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) {
@@ -304,33 +321,24 @@
    }
    public DataTableEntity getData(Dao dao, Set<String> tableSet, String filterFieldName, String
            serialNumber, ErrorCode[] errorCodes) {
            serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException {
        if (CollectionUtil.isEmpty(tableSet)) {
            throw new BaseException(errorCodes[0]);
        }
        String[] tableArray = tableSet.toArray(new String[]{});
        StringBuilder sql = new StringBuilder();
        sql.append("with ");
        List<Object> params = new ArrayList<>();
        CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
        //多线程查询单张表,等待所有线程查询完毕
        for (String tableName : tableArray) {
            objectCompletionService.submit(() -> dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + "a where " + filterFieldName + " = ?", new Object[]{serialNumber}));
        }
        DataTableEntity data = new DataTableEntity();
        Future<DataTableEntity> take = objectCompletionService.take();
        for (int i = 0; i < tableArray.length; i++) {
            String takcingTable = tableArray[i];
            if (i > 0) {
                sql.append(",\n\t");
            DataTableEntity dataTableEntity = take.get();
            BaseUtil.dataTableMerge(data, dataTableEntity);
            }
            sql.append("`").append(takcingTable).append("` as (select a.*,'").append(takcingTable).append("' as '~table_name~' ").append(" from ").append(takcingTable).append(" a where `").append(filterFieldName).append("`=? )");
            params.add(serialNumber);
        }
        sql.append("\nselect * from (");
        for (int i = 0; i < tableArray.length; i++) {
            if (i > 0) {
                sql.append("\n\tunion all");
            }
            sql.append("\n\tselect * from `").append(tableArray[i]).append("`");
        }
        sql.append("\n) a");
        DataTableEntity data = dao.getList(sql.toString(), params.toArray());
        if (DataTableEntity.isEmpty(data)) {
        if (DataTableEntity.isEmpty(data) && !"product_sn".equals(filterFieldName)) {
            throw new BaseException(errorCodes[1]);
        }
        return data;