许鹏程
2023-10-11 89f165d54d2ffe9d770fb7fb1e69f1f89c1c2775
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java
@@ -4,6 +4,7 @@
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
@@ -42,6 +43,10 @@
import java.sql.SQLException;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
@@ -69,7 +74,7 @@
   /**
    * 获取历史数据
    */
   public void getHistoryData(FieldSetEntity fse) throws BaseException {
   public void getHistoryData(FieldSetEntity fse) throws BaseException, ExecutionException, InterruptedException {
      //机号
      String serialNumber = fse.getString("serial_number");
@@ -104,8 +109,8 @@
      Set<String> detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail");
      HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL);
      Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
      HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
//      Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
//      HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
      //主库数据源配置
@@ -117,7 +122,7 @@
      try {
         Connection connection = dao.getConnection();
         connection.setAutoCommit(false);
         HistoryEntity[] historyEntities = {trackingData, keypData, detailData, productSnData};
         HistoryEntity[] historyEntities = {trackingData, keypData, detailData};
         insertMasterTableData(dao, historyEntities);
         insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities);
         connection.commit();
@@ -164,7 +169,13 @@
    */
   public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) {
      for (HistoryEntity historyEntity : historyEntities) {
         if (historyEntity == null) {
            continue;
         }
         DataTableEntity masterDataTable = historyEntity.getMasterDataTable();
         if (DataTableEntity.isEmpty(masterDataTable)) {
            continue;
         }
         Object[] objects = masterDataTable.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).toArray();
         //查询主库数据是否存在
         DataTableEntity list = dao.getList(historyEntity.getTableName(),
@@ -200,6 +211,9 @@
   public void insertSubTableData(Map<String, Dao> groupDao, Map<String, List<FieldSetEntity>> groupByCollectId,
                           Map<String, List<FieldSetEntity>> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception {
      for (HistoryEntity historyEntity : historyEntities) {
         if (historyEntity == null) {
            continue;
         }
         Map<String, List<FieldSetEntity>> groupData = historyEntity.getGroupData();
         if (groupData == null || groupData.isEmpty()) {
            continue;
@@ -248,6 +262,9 @@
   }
   public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) {
      if (DataTableEntity.isEmpty(dt)) {
         return null;
      }
      HistoryEntity historyEntity = new HistoryEntity();
      historyEntity.setMoNumberField("mo_number");
      if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) {
@@ -304,33 +321,24 @@
   }
   public DataTableEntity getData(Dao dao, Set<String> tableSet, String filterFieldName, String
         serialNumber, ErrorCode[] errorCodes) {
         serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException {
      if (CollectionUtil.isEmpty(tableSet)) {
         throw new BaseException(errorCodes[0]);
      }
      String[] tableArray = tableSet.toArray(new String[]{});
      StringBuilder sql = new StringBuilder();
      sql.append("with ");
      List<Object> params = new ArrayList<>();
      for (int i = 0; i < tableArray.length; i++) {
         String takcingTable = tableArray[i];
         if (i > 0) {
            sql.append(",\n\t");
         }
         sql.append("`").append(takcingTable).append("` as (select a.*,'").append(takcingTable).append("' as '~table_name~' ").append(" from ").append(takcingTable).append(" a where `").append(filterFieldName).append("`=? )");
         params.add(serialNumber);
      CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
      //多线程查询单张表,等待所有线程查询完毕
      for (String tableName : tableArray) {
         objectCompletionService.submit(() -> dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + "a where " + filterFieldName + " = ?", new Object[]{serialNumber}));
      }
      sql.append("\nselect * from (");
      DataTableEntity data = new DataTableEntity();
      Future<DataTableEntity> take = objectCompletionService.take();
      for (int i = 0; i < tableArray.length; i++) {
         if (i > 0) {
            sql.append("\n\tunion all");
         }
         sql.append("\n\tselect * from `").append(tableArray[i]).append("`");
         DataTableEntity dataTableEntity = take.get();
         BaseUtil.dataTableMerge(data, dataTableEntity);
      }
      sql.append("\n) a");
      DataTableEntity data = dao.getList(sql.toString(), params.toArray());
      if (DataTableEntity.isEmpty(data)) {
      if (DataTableEntity.isEmpty(data) && !"product_sn".equals(filterFieldName)) {
         throw new BaseException(errorCodes[1]);
      }
      return data;