xpc
2024-01-30 cc9ebffc57e6343745cb1eadc47360d6107936bc
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -1,5 +1,6 @@
package com.product.data.center.service;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -103,6 +104,8 @@
                  int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                  StringBuilder tempFilter = new StringBuilder(128);
                  for (int j = 0; j < totalPage; j++) {
                     //查询时检查队列数量,如果超过最大值则等待
                     checkQueueCount(sourceTable);
                     while (!allowQuery(sourceTable)) {
                        Thread.sleep(RandomUtil.randomInt(800, 1200));
                     }
@@ -301,6 +304,22 @@
   }
   /**
    * 检查队列数量
    */
   private void checkQueueCount(String tableName) {
      //初始时间
      long startTime = System.currentTimeMillis();
      while (queryMap.get(tableName) != null && queryMap.get(tableName).size() >= 10) {
         //与初始时间比较,如果超过10分钟则打印
         if (System.currentTimeMillis() - startTime > 600000) {
            WriteUtil.append("DA-队列数量超过10,当前数量:" + queryMap.get(tableName).size());
            startTime = System.currentTimeMillis();
         }
         ThreadUtil.sleep(10000);
      }
   }
   /**
    * 放入队列
    *
    * @param tableName