cheng
2024-01-28 31016f01ec27432295e77d1720b19cd5fd37ce72
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -17,6 +17,7 @@
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -52,11 +53,12 @@
    /**
     * 查询
     *
     * @param sourceDbe
     * @param sourceTable
     * @param filter
     * @param params
     * @param uniqueField   主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据
     * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据
     * @param minID
     */
    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
@@ -78,7 +80,7 @@
            String tempPartMinID = minID;
            int partSize = partCount * QUERY_PAGE_SIZE;
            int count = ceilPage(totalCount, partSize);
            for (int i = 1; i  <= count; i++) {
            for (int i = 1; i <= count; i++) {
                rangeSql.setLength(0);
                if (!StringUtils.isEmpty(filter)) {
                    rangeSql.append(filter).append(" and ");
@@ -149,6 +151,7 @@
    /**
     * 从队列中获取
     *
     * @param tableName
     * @return
     */
@@ -165,6 +168,7 @@
    /**
     * 判定是否查询完毕
     *
     * @param tableName
     * @return
     */
@@ -176,6 +180,7 @@
    /**
     * 判定插入队列(查询完成后放入的队列)是否为空
     *
     * @param tableName
     * @return
     */
@@ -186,6 +191,7 @@
    /**
     * 关闭查询线程
     *
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
@@ -202,6 +208,7 @@
    /**
     * 清理
     *
     * @param tableName
     */
    public void clear(String tableName) {
@@ -213,6 +220,7 @@
    /**
     * 提取错误日志,只能提取一次,提取后会直接清空
     *
     * @param tableName
     * @return
     */
@@ -230,6 +238,7 @@
    /**
     * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程
     *
     * @param totalCount
     * @return
     */
@@ -253,6 +262,7 @@
    /**
     * 获取sql:查询范围内的最大id值
     *
     * @param uniqueField
     * @param sourceTable
     * @param filter
@@ -292,6 +302,7 @@
    /**
     * 放入队列
     *
     * @param tableName
     * @param dte
     */
@@ -302,13 +313,30 @@
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
            if (tableName.endsWith("BAK20230823")) {
                //检查dte中的source_info 和 pre_master_key 是否为空
                for (int i = 0; i < dte.getRows(); i++) {
                    String sourceInfo = dte.getString(i, "source_info");
                    String preMasterKey = dte.getString(i, "pre_master_key");
                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                        dte.setFieldValue(i, "source_info", "ch-kt");
                        String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
                    }
                }
            }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
            while (queryQueue.size() >= 10) {
                SpringMVCContextHolder.getSystemLogger().error("DA-队列已满-" + tableName + "-当前剩余队列数:" + queryQueue.size());
            }
        }
    }
    /**
     * 查询开始,向存活map中添加1
     *
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
@@ -320,6 +348,7 @@
    /**
     * 查询结束,向存活map中减少1
     *
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
@@ -337,6 +366,7 @@
    /**
     * 允许执行查询(避免队列中等待插入的太多,导致内存溢出)
     *
     * @param tableName
     * @return
     */
@@ -348,6 +378,7 @@
    /**
     * 插入错误日志
     *
     * @param tableName
     * @param error
     */
@@ -369,6 +400,7 @@
    /**
     * 获取页数,向上取整
     *
     * @param count
     * @param size
     * @return