cheng
2024-01-28 31016f01ec27432295e77d1720b19cd5fd37ce72
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -34,381 +34,385 @@
 */
@Service
public class DataArchivingQueue extends AbstractBaseService {
   // 查询队列map:map<表名,dte队列>
   private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
   // 查询存活map:正在执行查询的表,对应的值为执行的线程数,为0标识已经执行完成
   private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
   // 查询线程map
   private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
   // 错误日志map
   private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
   // 单表查询最大线程数
   private static final int QUERY_THREAD_COUNT = 3;
   // 单表验证查询最大批次(单表查询最大批次=单表验证查询最大批次 + 单表查询最大线程数)
   private static final int QUERY_MAX_BATCH_COUNT = 4;
   // 查询每页大小
   private static final int QUERY_PAGE_SIZE = 50000;
   // 插入每页大小
   public static final int INSERT_PAGE_SIZE = 5000;
    // 查询队列map:map<表名,dte队列>
    private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
    // 查询存活map:正在执行查询的表,对应的值为执行的线程数,为0标识已经执行完成
    private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
    // 查询线程map
    private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
    // 错误日志map
    private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
    // 单表查询最大线程数
    private static final int QUERY_THREAD_COUNT = 3;
    // 单表验证查询最大批次(单表查询最大批次=单表验证查询最大批次 + 单表查询最大线程数)
    private static final int QUERY_MAX_BATCH_COUNT = 4;
    // 查询每页大小
    private static final int QUERY_PAGE_SIZE = 50000;
    // 插入每页大小
    public static final int INSERT_PAGE_SIZE = 5000;
   /**
    * 查询
    *
    * @param sourceDbe
    * @param sourceTable
    * @param filter
    * @param params
    * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据
    * @param minID
    */
   public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
      Dao sourceDao = null;
      try {
         StringBuilder countSql = new StringBuilder(128);
         countSql.append("select count(*) count_value from ").append(sourceTable);
         if (!StringUtils.isEmpty(filter)) {
            countSql.append(" where ").append(filter);
         }
         sourceDao = sourceDbe.newDao();
         FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
         int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
         int partCount = getPartCount(totalCount);
         shutdownQueryThread(sourceTable);
         ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
         queryThreadMap.put(sourceTable, executorService);
         StringBuilder rangeSql = new StringBuilder(128);
         String tempPartMinID = minID;
         int partSize = partCount * QUERY_PAGE_SIZE;
         int count = ceilPage(totalCount, partSize);
         for (int i = 1; i <= count; i++) {
            rangeSql.setLength(0);
            if (!StringUtils.isEmpty(filter)) {
               rangeSql.append(filter).append(" and ");
            }
            rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
            DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
            if (DataTableEntity.isEmpty(rangeDte)) {
               continue;
            }
            FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
            String curPartMaxID = rangeFse.getString("max_id");
            String curPartMinID = tempPartMinID;
            executorService.submit(() -> {
               String threadInfo = String.valueOf(Thread.currentThread().getId());
               Dao threadSourceDao = null;
               String thisPartMinID = curPartMinID;
               try {
                  threadSourceDao = sourceDbe.newDao();
                  startQuery(sourceTable, threadInfo);
                  int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                  StringBuilder tempFilter = new StringBuilder(128);
                  for (int j = 0; j < totalPage; j++) {
                     while (!allowQuery(sourceTable)) {
                        Thread.sleep(RandomUtil.randomInt(800, 1200));
                     }
                     tempFilter.setLength(0);
                     tempFilter.append(uniqueField);
                     WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
                     if (minID.equals(thisPartMinID)) {
                        tempFilter.append(">=");
                     } else {
                        tempFilter.append(">");
                     }
                     tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
                     if (!StringUtils.isEmpty(filter)) {
                        tempFilter.append(" and ").append(filter);
                     }
                     WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter);
                     DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
                     if (!DataTableEntity.isEmpty(allDte)) {
                        add(sourceTable, allDte);
                        thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
                     } else {
                        break;
                     }
                  }
               } catch (Exception e) {
                  appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
                  SpringMVCContextHolder.getSystemLogger().error(e);
                  clear(sourceTable);
               } finally {
                  if (threadSourceDao != null) {
                     threadSourceDao.closeConnection();
                  }
                  finalQuery(sourceTable, threadInfo);
               }
            });
            tempPartMinID = curPartMaxID;
         }
      } catch (Exception e) {
         throw e;
      } finally {
         if (sourceDao != null) {
            sourceDao.closeConnection();
         }
      }
   }
    /**
     * 查询
     *
     * @param sourceDbe
     * @param sourceTable
     * @param filter
     * @param params
     * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据
     * @param minID
     */
    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
        Dao sourceDao = null;
        try {
            StringBuilder countSql = new StringBuilder(128);
            countSql.append("select count(*) count_value from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                countSql.append(" where ").append(filter);
            }
            sourceDao = sourceDbe.newDao();
            FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
            int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
            int partCount = getPartCount(totalCount);
            shutdownQueryThread(sourceTable);
            ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
            queryThreadMap.put(sourceTable, executorService);
            StringBuilder rangeSql = new StringBuilder(128);
            String tempPartMinID = minID;
            int partSize = partCount * QUERY_PAGE_SIZE;
            int count = ceilPage(totalCount, partSize);
            for (int i = 1; i <= count; i++) {
                rangeSql.setLength(0);
                if (!StringUtils.isEmpty(filter)) {
                    rangeSql.append(filter).append(" and ");
                }
                rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
                DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
                if (DataTableEntity.isEmpty(rangeDte)) {
                    continue;
                }
                FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
                String curPartMaxID = rangeFse.getString("max_id");
                String curPartMinID = tempPartMinID;
                executorService.submit(() -> {
                    String threadInfo = String.valueOf(Thread.currentThread().getId());
                    Dao threadSourceDao = null;
                    String thisPartMinID = curPartMinID;
                    try {
                        threadSourceDao = sourceDbe.newDao();
                        startQuery(sourceTable, threadInfo);
                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                        StringBuilder tempFilter = new StringBuilder(128);
                        for (int j = 0; j < totalPage; j++) {
                            while (!allowQuery(sourceTable)) {
                                Thread.sleep(RandomUtil.randomInt(800, 1200));
                            }
                            tempFilter.setLength(0);
                            tempFilter.append(uniqueField);
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
                            if (minID.equals(thisPartMinID)) {
                                tempFilter.append(">=");
                            } else {
                                tempFilter.append(">");
                            }
                            tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
                            if (!StringUtils.isEmpty(filter)) {
                                tempFilter.append(" and ").append(filter);
                            }
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter);
                            DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
                            if (!DataTableEntity.isEmpty(allDte)) {
                                add(sourceTable, allDte);
                                thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
                            } else {
                                break;
                            }
                        }
                    } catch (Exception e) {
                        appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        clear(sourceTable);
                    } finally {
                        if (threadSourceDao != null) {
                            threadSourceDao.closeConnection();
                        }
                        finalQuery(sourceTable, threadInfo);
                    }
                });
                tempPartMinID = curPartMaxID;
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (sourceDao != null) {
                sourceDao.closeConnection();
            }
        }
    }
   /**
    * 从队列中获取
    *
    * @param tableName
    * @return
    */
   public DataTableEntity get(String tableName) {
      WriteUtil.append("DA-从队列中获取-表名:" + tableName);
      synchronized (tableName.intern()) {
         LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
         if (queryQueue == null) {
            return null;
         }
         return queryQueue.poll();
      }
   }
    /**
     * 从队列中获取
     *
     * @param tableName
     * @return
     */
    public DataTableEntity get(String tableName) {
        WriteUtil.append("DA-从队列中获取-表名:" + tableName);
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                return null;
            }
            return queryQueue.poll();
        }
    }
   /**
    * 判定是否查询完毕
    *
    * @param tableName
    * @return
    */
   public boolean checkQueryFinish(String tableName) {
      WriteUtil.append("DA-判定是否查询完毕");
      Set<String> set = existsQueryMap.get(tableName);
      return set == null || set.isEmpty();
   }
    /**
     * 判定是否查询完毕
     *
     * @param tableName
     * @return
     */
    public boolean checkQueryFinish(String tableName) {
        WriteUtil.append("DA-判定是否查询完毕");
        Set<String> set = existsQueryMap.get(tableName);
        return set == null || set.isEmpty();
    }
   /**
    * 判定插入队列(查询完成后放入的队列)是否为空
    *
    * @param tableName
    * @return
    */
   public boolean checkInsertQueueEmpty(String tableName) {
      WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空");
      return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
   }
    /**
     * 判定插入队列(查询完成后放入的队列)是否为空
     *
     * @param tableName
     * @return
     */
    public boolean checkInsertQueueEmpty(String tableName) {
        WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空");
        return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
    }
   /**
    * 关闭查询线程
    *
    * @param tableName
    */
   public void shutdownQueryThread(String tableName) {
      synchronized (tableName.intern()) {
         ExecutorService executorService = queryThreadMap.get(tableName);
         if (executorService != null) {
            if (!executorService.isShutdown()) {
               executorService.shutdown();
            }
            queryThreadMap.remove(tableName);
         }
      }
   }
    /**
     * 关闭查询线程
     *
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
        synchronized (tableName.intern()) {
            ExecutorService executorService = queryThreadMap.get(tableName);
            if (executorService != null) {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                }
                queryThreadMap.remove(tableName);
            }
        }
    }
   /**
    * 清理
    *
    * @param tableName
    */
   public void clear(String tableName) {
      synchronized (tableName.intern()) {
         queryMap.remove(tableName);
         shutdownQueryThread(tableName);
      }
   }
    /**
     * 清理
     *
     * @param tableName
     */
    public void clear(String tableName) {
        synchronized (tableName.intern()) {
            queryMap.remove(tableName);
            shutdownQueryThread(tableName);
        }
    }
   /**
    * 提取错误日志,只能提取一次,提取后会直接清空
    *
    * @param tableName
    * @return
    */
   public String getErrorLog(String tableName) {
      synchronized (tableName.intern()) {
         if (errorLogMap == null) {
            return null;
         } else {
            StringBuilder result = errorLogMap.get(tableName);
            errorLogMap.remove(tableName);
            return result == null ? null : result.toString();
         }
      }
   }
    /**
     * 提取错误日志,只能提取一次,提取后会直接清空
     *
     * @param tableName
     * @return
     */
    public String getErrorLog(String tableName) {
        synchronized (tableName.intern()) {
            if (errorLogMap == null) {
                return null;
            } else {
                StringBuilder result = errorLogMap.get(tableName);
                errorLogMap.remove(tableName);
                return result == null ? null : result.toString();
            }
        }
    }
   /**
    * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程
    *
    * @param totalCount
    * @return
    */
   private int getPartCount(int totalCount) {
      int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
      int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
      if (partCount >= 16) {
         return 16;
      } else if (partCount >= 8) {
         return 8;
      } else if (partCount >= 4) {
         return 4;
      } else if (partCount >= 2) {
         return 2;
      } else if (partCount >= 1) {
         return 1;
      } else {
         return 0;
      }
   }
    /**
     * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程
     *
     * @param totalCount
     * @return
     */
    private int getPartCount(int totalCount) {
        int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
        int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
        if (partCount >= 16) {
            return 16;
        } else if (partCount >= 8) {
            return 8;
        } else if (partCount >= 4) {
            return 4;
        } else if (partCount >= 2) {
            return 2;
        } else if (partCount >= 1) {
            return 1;
        } else {
            return 0;
        }
    }
   /**
    * 获取sql:查询范围内的最大id值
    *
    * @param uniqueField
    * @param sourceTable
    * @param filter
    * @param dbType
    * @param pageIndex
    * @param pageSize
    * @return
    */
   private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
      int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
      int finalIndex = startIndex + pageSize;
      StringBuilder sql = new StringBuilder(128);
      if (DataBaseType.MYSQL.getValue() == dbType) {
         sql.append("select max(").append(uniqueField).append(") max_id from (");
         sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
         if (!StringUtils.isEmpty(filter)) {
            sql.append("\n    where ").append(filter);
         }
         sql.append("\n    order by ").append(uniqueField);
         sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
         sql.append("\n) t");
      } else if (DataBaseType.ORACLE.getValue() == dbType) {
         sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
         sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
         sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
         if (!StringUtils.isEmpty(filter)) {
            sql.append("\n        WHERE ").append(filter);
         }
         sql.append("\n        ORDER BY ").append(uniqueField);
         sql.append("\n    ) T1");
         sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
         sql.append("\n) T2");
         sql.append("\nWHERE R>").append(startIndex);
      }
      return sql.toString();
   }
    /**
     * 获取sql:查询范围内的最大id值
     *
     * @param uniqueField
     * @param sourceTable
     * @param filter
     * @param dbType
     * @param pageIndex
     * @param pageSize
     * @return
     */
    private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
        int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
        int finalIndex = startIndex + pageSize;
        StringBuilder sql = new StringBuilder(128);
        if (DataBaseType.MYSQL.getValue() == dbType) {
            sql.append("select max(").append(uniqueField).append(") max_id from (");
            sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n    where ").append(filter);
            }
            sql.append("\n    order by ").append(uniqueField);
            sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
            sql.append("\n) t");
        } else if (DataBaseType.ORACLE.getValue() == dbType) {
            sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
            sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
            sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n        WHERE ").append(filter);
            }
            sql.append("\n        ORDER BY ").append(uniqueField);
            sql.append("\n    ) T1");
            sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
            sql.append("\n) T2");
            sql.append("\nWHERE R>").append(startIndex);
        }
        return sql.toString();
    }
   /**
    * 放入队列
    *
    * @param tableName
    * @param dte
    */
   private void add(String tableName, DataTableEntity dte) {
      synchronized (tableName.intern()) {
         LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
         if (queryQueue == null) {
            queryQueue = new LinkedBlockingQueue<>();
            queryMap.put(tableName, queryQueue);
         }
         if (tableName.endsWith("BAK20230823")) {
            //检查dte中的source_info 和 pre_master_key 是否为空
            for (int i = 0; i < dte.getRows(); i++) {
               String sourceInfo = dte.getString(i, "source_info");
               String preMasterKey = dte.getString(i, "pre_master_key");
               if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                  dte.setFieldValue(i, "source_info", "ch-kt");
                  String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                  dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
               }
            }
         }
         queryQueue.add(dte);
         WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
      }
   }
    /**
     * 放入队列
     *
     * @param tableName
     * @param dte
     */
    private void add(String tableName, DataTableEntity dte) {
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
            if (tableName.endsWith("BAK20230823")) {
                //检查dte中的source_info 和 pre_master_key 是否为空
                for (int i = 0; i < dte.getRows(); i++) {
                    String sourceInfo = dte.getString(i, "source_info");
                    String preMasterKey = dte.getString(i, "pre_master_key");
                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                        dte.setFieldValue(i, "source_info", "ch-kt");
                        String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
                    }
                }
            }
   /**
    * 查询开始,向存活map中添加1
    *
    * @param tableName
    */
   private void startQuery(String tableName, String threadInfo) {
      synchronized (tableName.intern()) {
         Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
         set.add(threadInfo);
      }
   }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
            while (queryQueue.size() >= 10) {
                SpringMVCContextHolder.getSystemLogger().error("DA-队列已满-" + tableName + "-当前剩余队列数:" + queryQueue.size());
            }
        }
    }
   /**
    * 查询结束,向存活map中减少1
    *
    * @param tableName
    */
   private void finalQuery(String tableName, String threadInfo) {
      synchronized (tableName.intern()) {
         Set<String> set = existsQueryMap.get(tableName);
         if (set == null || !set.contains(threadInfo)) {
            throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
         }
         set.remove(threadInfo);
         if (set.isEmpty()) {
            existsQueryMap.remove(tableName);
         }
      }
   }
    /**
     * 查询开始,向存活map中添加1
     *
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
            set.add(threadInfo);
        }
    }
   /**
    * 允许执行查询(避免队列中等待插入的太多,导致内存溢出)
    *
    * @param tableName
    * @return
    */
   private boolean allowQuery(String tableName) {
      synchronized (tableName.intern()) {
         return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
      }
   }
    /**
     * 查询结束,向存活map中减少1
     *
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.get(tableName);
            if (set == null || !set.contains(threadInfo)) {
                throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
            }
            set.remove(threadInfo);
            if (set.isEmpty()) {
                existsQueryMap.remove(tableName);
            }
        }
    }
   /**
    * 插入错误日志
    *
    * @param tableName
    * @param error
    */
   private void appendErrorLog(String tableName, String error) {
      synchronized (tableName.intern()) {
         StringBuilder errorSb = errorLogMap.get(tableName);
         if (errorSb == null) {
            errorSb = new StringBuilder(128);
            errorLogMap.put(tableName, errorSb);
         }
         if (errorSb.length() > 0) {
            errorSb.append("\n");
         }
         if (errorSb.length() < 2000) {
            errorSb.append(error);
         }
      }
   }
    /**
     * 允许执行查询(避免队列中等待插入的太多,导致内存溢出)
     *
     * @param tableName
     * @return
     */
    private boolean allowQuery(String tableName) {
        synchronized (tableName.intern()) {
            return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
        }
    }
   /**
    * 获取页数,向上取整
    *
    * @param count
    * @param size
    * @return
    */
   private int ceilPage(int count, int size) {
      if (size == 0) {
         if (count == 0) {
            return 0;
         } else {
            throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
         }
      }
      return count / size + (count % size == 0 ? 0 : 1);
   }
    /**
     * 插入错误日志
     *
     * @param tableName
     * @param error
     */
    private void appendErrorLog(String tableName, String error) {
        synchronized (tableName.intern()) {
            StringBuilder errorSb = errorLogMap.get(tableName);
            if (errorSb == null) {
                errorSb = new StringBuilder(128);
                errorLogMap.put(tableName, errorSb);
            }
            if (errorSb.length() > 0) {
                errorSb.append("\n");
            }
            if (errorSb.length() < 2000) {
                errorSb.append(error);
            }
        }
    }
    /**
     * 获取页数,向上取整
     *
     * @param count
     * @param size
     * @return
     */
    private int ceilPage(int count, int size) {
        if (size == 0) {
            if (count == 0) {
                return 0;
            } else {
                throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
            }
        }
        return count / size + (count % size == 0 ? 0 : 1);
    }
}