许鹏程
2023-09-07 09ba53cbe50ea20ec17ea8c9e5c620fa520f5b45
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -17,6 +17,7 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,6 +53,7 @@
    /**
     * 查询
    *
     * @param sourceDbe
     * @param sourceTable
     * @param filter
@@ -149,6 +151,7 @@
    /**
     * 从队列中获取
    *
     * @param tableName
     * @return
     */
@@ -165,6 +168,7 @@
    /**
     * 判定是否查询完毕
    *
     * @param tableName
     * @return
     */
@@ -176,6 +180,7 @@
    /**
     * 判定插入队列(查询完成后放入的队列)是否为空
    *
     * @param tableName
     * @return
     */
@@ -186,6 +191,7 @@
    /**
     * 关闭查询线程
    *
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
@@ -202,6 +208,7 @@
    /**
     * 清理
    *
     * @param tableName
     */
    public void clear(String tableName) {
@@ -213,6 +220,7 @@
    /**
     * 提取错误日志,只能提取一次,提取后会直接清空
    *
     * @param tableName
     * @return
     */
@@ -230,6 +238,7 @@
    /**
     * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程
    *
     * @param totalCount
     * @return
     */
@@ -253,6 +262,7 @@
    /**
     * 获取sql:查询范围内的最大id值
    *
     * @param uniqueField
     * @param sourceTable
     * @param filter
@@ -292,6 +302,7 @@
    /**
     * 放入队列
    *
     * @param tableName
     * @param dte
     */
@@ -302,6 +313,18 @@
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
         if (tableName.endsWith("BAK20230823")) {
            //检查dte中的source_info 和 pre_master_key 是否为空
            for (int i = 0; i < dte.getRows(); i++) {
               String sourceInfo = dte.getString(i, "source_info");
               String preMasterKey = dte.getString(i, "pre_master_key");
               if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                  dte.setFieldValue(i, "source_info", "ch-kt");
                  String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                  dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
               }
            }
         }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
        }
@@ -309,6 +332,7 @@
    /**
     * 查询开始,向存活map中添加1
    *
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
@@ -320,6 +344,7 @@
    /**
     * 查询结束,向存活map中减少1
    *
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
@@ -337,6 +362,7 @@
    /**
     * 允许执行查询(避免队列中等待插入的太多,导致内存溢出)
    *
     * @param tableName
     * @return
     */
@@ -348,6 +374,7 @@
    /**
     * 插入错误日志
    *
     * @param tableName
     * @param error
     */
@@ -369,6 +396,7 @@
    /**
     * 获取页数,向上取整
    *
     * @param count
     * @param size
     * @return