xpc
2024-01-30 cc9ebffc57e6343745cb1eadc47360d6107936bc
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -1,5 +1,6 @@
package com.product.data.center.service;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -103,6 +104,8 @@
                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                        StringBuilder tempFilter = new StringBuilder(128);
                        for (int j = 0; j < totalPage; j++) {
                     //查询时检查队列数量,如果超过最大值则等待
                     checkQueueCount(sourceTable);
                            while (!allowQuery(sourceTable)) {
                                Thread.sleep(RandomUtil.randomInt(800, 1200));
                            }
@@ -301,6 +304,22 @@
    }
    /**
    * 检查队列数量
    */
   private void checkQueueCount(String tableName) {
      //初始时间
      long startTime = System.currentTimeMillis();
      while (queryMap.get(tableName) != null && queryMap.get(tableName).size() >= 10) {
         //与初始时间比较,如果超过10分钟则打印
         if (System.currentTimeMillis() - startTime > 600000) {
            WriteUtil.append("DA-队列数量超过10,当前数量:" + queryMap.get(tableName).size());
            startTime = System.currentTimeMillis();
         }
         ThreadUtil.sleep(10000);
      }
   }
   /**
     * 放入队列
     *
     * @param tableName
@@ -325,12 +344,8 @@
                    }
                }
            }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
            while (queryQueue.size() >= 10) {
                SpringMVCContextHolder.getSystemLogger().error("DA-队列已满-" + tableName + "-当前剩余队列数:" + queryQueue.size());
            }
        }
    }