From 31016f01ec27432295e77d1720b19cd5fd37ce72 Mon Sep 17 00:00:00 2001
From: cheng <1821349743@qq.com>
Date: 星期日, 28 一月 2024 14:29:00 +0800
Subject: [PATCH] 归档限制每个表的队列数量,oracle 更改创建连接池

---
 product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java |  728 ++++++++++++++++++++++++++++---------------------------
 1 files changed, 366 insertions(+), 362 deletions(-)

diff --git a/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java b/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
index 42972e2..8cccab4 100644
--- a/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
+++ b/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -34,381 +34,385 @@
  */
 @Service
 public class DataArchivingQueue extends AbstractBaseService {
-	// 鏌ヨ闃熷垪map锛歮ap<琛ㄥ悕锛宒te闃熷垪>
-	private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
-	// 鏌ヨ瀛樻椿map锛氭鍦ㄦ墽琛屾煡璇㈢殑琛紝瀵瑰簲鐨勫�间负鎵ц鐨勭嚎绋嬫暟锛屼负0鏍囪瘑宸茬粡鎵ц瀹屾垚
-	private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
-	// 鏌ヨ绾跨▼map
-	private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
-	// 閿欒鏃ュ織map
-	private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
-	// 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟
-	private static final int QUERY_THREAD_COUNT = 3;
-	// 鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆★紙鍗曡〃鏌ヨ鏈�澶ф壒娆�=鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆� + 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟锛�
-	private static final int QUERY_MAX_BATCH_COUNT = 4;
-	// 鏌ヨ姣忛〉澶у皬
-	private static final int QUERY_PAGE_SIZE = 50000;
-	// 鎻掑叆姣忛〉澶у皬
-	public static final int INSERT_PAGE_SIZE = 5000;
+    // 鏌ヨ闃熷垪map锛歮ap<琛ㄥ悕锛宒te闃熷垪>
+    private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
+    // 鏌ヨ瀛樻椿map锛氭鍦ㄦ墽琛屾煡璇㈢殑琛紝瀵瑰簲鐨勫�间负鎵ц鐨勭嚎绋嬫暟锛屼负0鏍囪瘑宸茬粡鎵ц瀹屾垚
+    private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
+    // 鏌ヨ绾跨▼map
+    private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
+    // 閿欒鏃ュ織map
+    private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
+    // 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟
+    private static final int QUERY_THREAD_COUNT = 3;
+    // 鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆★紙鍗曡〃鏌ヨ鏈�澶ф壒娆�=鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆� + 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟锛�
+    private static final int QUERY_MAX_BATCH_COUNT = 4;
+    // 鏌ヨ姣忛〉澶у皬
+    private static final int QUERY_PAGE_SIZE = 50000;
+    // 鎻掑叆姣忛〉澶у皬
+    public static final int INSERT_PAGE_SIZE = 5000;
 
-	/**
-	 * 鏌ヨ
-	 *
-	 * @param sourceDbe
-	 * @param sourceTable
-	 * @param filter
-	 * @param params
-	 * @param uniqueField 涓婚敭锛屼笉浠呬細鐢ㄤ簬璇嗗埆杩樹細鐢ㄤ簬鎺掑簭闃叉oracle鍒嗛〉鑾峰彇鍒伴噸澶嶆暟鎹�
-	 * @param minID
-	 */
-	public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
-		Dao sourceDao = null;
-		try {
-			StringBuilder countSql = new StringBuilder(128);
-			countSql.append("select count(*) count_value from ").append(sourceTable);
-			if (!StringUtils.isEmpty(filter)) {
-				countSql.append(" where ").append(filter);
-			}
-			sourceDao = sourceDbe.newDao();
-			FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
-			int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
-			int partCount = getPartCount(totalCount);
-			shutdownQueryThread(sourceTable);
-			ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
-			queryThreadMap.put(sourceTable, executorService);
-			StringBuilder rangeSql = new StringBuilder(128);
-			String tempPartMinID = minID;
-			int partSize = partCount * QUERY_PAGE_SIZE;
-			int count = ceilPage(totalCount, partSize);
-			for (int i = 1; i <= count; i++) {
-				rangeSql.setLength(0);
-				if (!StringUtils.isEmpty(filter)) {
-					rangeSql.append(filter).append(" and ");
-				}
-				rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
-				DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
-				if (DataTableEntity.isEmpty(rangeDte)) {
-					continue;
-				}
-				FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
-				String curPartMaxID = rangeFse.getString("max_id");
-				String curPartMinID = tempPartMinID;
-				executorService.submit(() -> {
-					String threadInfo = String.valueOf(Thread.currentThread().getId());
-					Dao threadSourceDao = null;
-					String thisPartMinID = curPartMinID;
-					try {
-						threadSourceDao = sourceDbe.newDao();
-						startQuery(sourceTable, threadInfo);
-						int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
-						StringBuilder tempFilter = new StringBuilder(128);
-						for (int j = 0; j < totalPage; j++) {
-							while (!allowQuery(sourceTable)) {
-								Thread.sleep(RandomUtil.randomInt(800, 1200));
-							}
-							tempFilter.setLength(0);
-							tempFilter.append(uniqueField);
-							WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
-							if (minID.equals(thisPartMinID)) {
-								tempFilter.append(">=");
-							} else {
-								tempFilter.append(">");
-							}
-							tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
-							if (!StringUtils.isEmpty(filter)) {
-								tempFilter.append(" and ").append(filter);
-							}
-							WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage锛�" + (j + 1) + "-pageSize锛�" + QUERY_PAGE_SIZE + "-filter锛�" + tempFilter);
-							DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
-							if (!DataTableEntity.isEmpty(allDte)) {
-								add(sourceTable, allDte);
-								thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
-							} else {
-								break;
-							}
-						}
-					} catch (Exception e) {
-						appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
-						SpringMVCContextHolder.getSystemLogger().error(e);
-						clear(sourceTable);
-					} finally {
-						if (threadSourceDao != null) {
-							threadSourceDao.closeConnection();
-						}
-						finalQuery(sourceTable, threadInfo);
-					}
-				});
-				tempPartMinID = curPartMaxID;
-			}
-		} catch (Exception e) {
-			throw e;
-		} finally {
-			if (sourceDao != null) {
-				sourceDao.closeConnection();
-			}
-		}
-	}
+    /**
+     * 鏌ヨ
+     *
+     * @param sourceDbe
+     * @param sourceTable
+     * @param filter
+     * @param params
+     * @param uniqueField 涓婚敭锛屼笉浠呬細鐢ㄤ簬璇嗗埆杩樹細鐢ㄤ簬鎺掑簭闃叉oracle鍒嗛〉鑾峰彇鍒伴噸澶嶆暟鎹�
+     * @param minID
+     */
+    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
+        Dao sourceDao = null;
+        try {
+            StringBuilder countSql = new StringBuilder(128);
+            countSql.append("select count(*) count_value from ").append(sourceTable);
+            if (!StringUtils.isEmpty(filter)) {
+                countSql.append(" where ").append(filter);
+            }
+            sourceDao = sourceDbe.newDao();
+            FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
+            int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
+            int partCount = getPartCount(totalCount);
+            shutdownQueryThread(sourceTable);
+            ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
+            queryThreadMap.put(sourceTable, executorService);
+            StringBuilder rangeSql = new StringBuilder(128);
+            String tempPartMinID = minID;
+            int partSize = partCount * QUERY_PAGE_SIZE;
+            int count = ceilPage(totalCount, partSize);
+            for (int i = 1; i <= count; i++) {
+                rangeSql.setLength(0);
+                if (!StringUtils.isEmpty(filter)) {
+                    rangeSql.append(filter).append(" and ");
+                }
+                rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
+                DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
+                if (DataTableEntity.isEmpty(rangeDte)) {
+                    continue;
+                }
+                FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
+                String curPartMaxID = rangeFse.getString("max_id");
+                String curPartMinID = tempPartMinID;
+                executorService.submit(() -> {
+                    String threadInfo = String.valueOf(Thread.currentThread().getId());
+                    Dao threadSourceDao = null;
+                    String thisPartMinID = curPartMinID;
+                    try {
+                        threadSourceDao = sourceDbe.newDao();
+                        startQuery(sourceTable, threadInfo);
+                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
+                        StringBuilder tempFilter = new StringBuilder(128);
+                        for (int j = 0; j < totalPage; j++) {
+                            while (!allowQuery(sourceTable)) {
+                                Thread.sleep(RandomUtil.randomInt(800, 1200));
+                            }
+                            tempFilter.setLength(0);
+                            tempFilter.append(uniqueField);
+                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
+                            if (minID.equals(thisPartMinID)) {
+                                tempFilter.append(">=");
+                            } else {
+                                tempFilter.append(">");
+                            }
+                            tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
+                            if (!StringUtils.isEmpty(filter)) {
+                                tempFilter.append(" and ").append(filter);
+                            }
+                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage锛�" + (j + 1) + "-pageSize锛�" + QUERY_PAGE_SIZE + "-filter锛�" + tempFilter);
+                            DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
+                            if (!DataTableEntity.isEmpty(allDte)) {
+                                add(sourceTable, allDte);
+                                thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
+                            } else {
+                                break;
+                            }
+                        }
+                    } catch (Exception e) {
+                        appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
+                        SpringMVCContextHolder.getSystemLogger().error(e);
+                        clear(sourceTable);
+                    } finally {
+                        if (threadSourceDao != null) {
+                            threadSourceDao.closeConnection();
+                        }
+                        finalQuery(sourceTable, threadInfo);
+                    }
+                });
+                tempPartMinID = curPartMaxID;
+            }
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            if (sourceDao != null) {
+                sourceDao.closeConnection();
+            }
+        }
+    }
 
-	/**
-	 * 浠庨槦鍒椾腑鑾峰彇
-	 *
-	 * @param tableName
-	 * @return
-	 */
-	public DataTableEntity get(String tableName) {
-		WriteUtil.append("DA-浠庨槦鍒椾腑鑾峰彇-琛ㄥ悕锛�" + tableName);
-		synchronized (tableName.intern()) {
-			LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
-			if (queryQueue == null) {
-				return null;
-			}
-			return queryQueue.poll();
-		}
-	}
+    /**
+     * 浠庨槦鍒椾腑鑾峰彇
+     *
+     * @param tableName
+     * @return
+     */
+    public DataTableEntity get(String tableName) {
+        WriteUtil.append("DA-浠庨槦鍒椾腑鑾峰彇-琛ㄥ悕锛�" + tableName);
+        synchronized (tableName.intern()) {
+            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
+            if (queryQueue == null) {
+                return null;
+            }
+            return queryQueue.poll();
+        }
+    }
 
-	/**
-	 * 鍒ゅ畾鏄惁鏌ヨ瀹屾瘯
-	 *
-	 * @param tableName
-	 * @return
-	 */
-	public boolean checkQueryFinish(String tableName) {
-		WriteUtil.append("DA-鍒ゅ畾鏄惁鏌ヨ瀹屾瘯");
-		Set<String> set = existsQueryMap.get(tableName);
-		return set == null || set.isEmpty();
-	}
+    /**
+     * 鍒ゅ畾鏄惁鏌ヨ瀹屾瘯
+     *
+     * @param tableName
+     * @return
+     */
+    public boolean checkQueryFinish(String tableName) {
+        WriteUtil.append("DA-鍒ゅ畾鏄惁鏌ヨ瀹屾瘯");
+        Set<String> set = existsQueryMap.get(tableName);
+        return set == null || set.isEmpty();
+    }
 
-	/**
-	 * 鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖
-	 *
-	 * @param tableName
-	 * @return
-	 */
-	public boolean checkInsertQueueEmpty(String tableName) {
-		WriteUtil.append("DA-鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖");
-		return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
-	}
+    /**
+     * 鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖
+     *
+     * @param tableName
+     * @return
+     */
+    public boolean checkInsertQueueEmpty(String tableName) {
+        WriteUtil.append("DA-鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖");
+        return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
+    }
 
-	/**
-	 * 鍏抽棴鏌ヨ绾跨▼
-	 *
-	 * @param tableName
-	 */
-	public void shutdownQueryThread(String tableName) {
-		synchronized (tableName.intern()) {
-			ExecutorService executorService = queryThreadMap.get(tableName);
-			if (executorService != null) {
-				if (!executorService.isShutdown()) {
-					executorService.shutdown();
-				}
-				queryThreadMap.remove(tableName);
-			}
-		}
-	}
+    /**
+     * 鍏抽棴鏌ヨ绾跨▼
+     *
+     * @param tableName
+     */
+    public void shutdownQueryThread(String tableName) {
+        synchronized (tableName.intern()) {
+            ExecutorService executorService = queryThreadMap.get(tableName);
+            if (executorService != null) {
+                if (!executorService.isShutdown()) {
+                    executorService.shutdown();
+                }
+                queryThreadMap.remove(tableName);
+            }
+        }
+    }
 
-	/**
-	 * 娓呯悊
-	 *
-	 * @param tableName
-	 */
-	public void clear(String tableName) {
-		synchronized (tableName.intern()) {
-			queryMap.remove(tableName);
-			shutdownQueryThread(tableName);
-		}
-	}
+    /**
+     * 娓呯悊
+     *
+     * @param tableName
+     */
+    public void clear(String tableName) {
+        synchronized (tableName.intern()) {
+            queryMap.remove(tableName);
+            shutdownQueryThread(tableName);
+        }
+    }
 
-	/**
-	 * 鎻愬彇閿欒鏃ュ織锛屽彧鑳芥彁鍙栦竴娆★紝鎻愬彇鍚庝細鐩存帴娓呯┖
-	 *
-	 * @param tableName
-	 * @return
-	 */
-	public String getErrorLog(String tableName) {
-		synchronized (tableName.intern()) {
-			if (errorLogMap == null) {
-				return null;
-			} else {
-				StringBuilder result = errorLogMap.get(tableName);
-				errorLogMap.remove(tableName);
-				return result == null ? null : result.toString();
-			}
-		}
-	}
+    /**
+     * 鎻愬彇閿欒鏃ュ織锛屽彧鑳芥彁鍙栦竴娆★紝鎻愬彇鍚庝細鐩存帴娓呯┖
+     *
+     * @param tableName
+     * @return
+     */
+    public String getErrorLog(String tableName) {
+        synchronized (tableName.intern()) {
+            if (errorLogMap == null) {
+                return null;
+            } else {
+                StringBuilder result = errorLogMap.get(tableName);
+                errorLogMap.remove(tableName);
+                return result == null ? null : result.toString();
+            }
+        }
+    }
 
-	/**
-	 * 鑾峰彇鍒嗘鐨勬暟閲忥紝鏈�澶�16锛屾渶灏忎负0锛屼负0琛ㄧず鐢ㄤ笉涓婃墍鏈夌殑绾跨▼
-	 *
-	 * @param totalCount
-	 * @return
-	 */
-	private int getPartCount(int totalCount) {
-		int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
-		int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
-		if (partCount >= 16) {
-			return 16;
-		} else if (partCount >= 8) {
-			return 8;
-		} else if (partCount >= 4) {
-			return 4;
-		} else if (partCount >= 2) {
-			return 2;
-		} else if (partCount >= 1) {
-			return 1;
-		} else {
-			return 0;
-		}
-	}
+    /**
+     * 鑾峰彇鍒嗘鐨勬暟閲忥紝鏈�澶�16锛屾渶灏忎负0锛屼负0琛ㄧず鐢ㄤ笉涓婃墍鏈夌殑绾跨▼
+     *
+     * @param totalCount
+     * @return
+     */
+    private int getPartCount(int totalCount) {
+        int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
+        int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
+        if (partCount >= 16) {
+            return 16;
+        } else if (partCount >= 8) {
+            return 8;
+        } else if (partCount >= 4) {
+            return 4;
+        } else if (partCount >= 2) {
+            return 2;
+        } else if (partCount >= 1) {
+            return 1;
+        } else {
+            return 0;
+        }
+    }
 
-	/**
-	 * 鑾峰彇sql锛氭煡璇㈣寖鍥村唴鐨勬渶澶d鍊�
-	 *
-	 * @param uniqueField
-	 * @param sourceTable
-	 * @param filter
-	 * @param dbType
-	 * @param pageIndex
-	 * @param pageSize
-	 * @return
-	 */
-	private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
-		int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
-		int finalIndex = startIndex + pageSize;
-		StringBuilder sql = new StringBuilder(128);
-		if (DataBaseType.MYSQL.getValue() == dbType) {
-			sql.append("select max(").append(uniqueField).append(") max_id from (");
-			sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
-			if (!StringUtils.isEmpty(filter)) {
-				sql.append("\n    where ").append(filter);
-			}
-			sql.append("\n    order by ").append(uniqueField);
-			sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
-			sql.append("\n) t");
-		} else if (DataBaseType.ORACLE.getValue() == dbType) {
-			sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
-			sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
-			sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
-			if (!StringUtils.isEmpty(filter)) {
-				sql.append("\n        WHERE ").append(filter);
-			}
-			sql.append("\n        ORDER BY ").append(uniqueField);
-			sql.append("\n    ) T1");
-			sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
-			sql.append("\n) T2");
-			sql.append("\nWHERE R>").append(startIndex);
-		}
-		return sql.toString();
-	}
+    /**
+     * 鑾峰彇sql锛氭煡璇㈣寖鍥村唴鐨勬渶澶d鍊�
+     *
+     * @param uniqueField
+     * @param sourceTable
+     * @param filter
+     * @param dbType
+     * @param pageIndex
+     * @param pageSize
+     * @return
+     */
+    private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
+        int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
+        int finalIndex = startIndex + pageSize;
+        StringBuilder sql = new StringBuilder(128);
+        if (DataBaseType.MYSQL.getValue() == dbType) {
+            sql.append("select max(").append(uniqueField).append(") max_id from (");
+            sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
+            if (!StringUtils.isEmpty(filter)) {
+                sql.append("\n    where ").append(filter);
+            }
+            sql.append("\n    order by ").append(uniqueField);
+            sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
+            sql.append("\n) t");
+        } else if (DataBaseType.ORACLE.getValue() == dbType) {
+            sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
+            sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
+            sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
+            if (!StringUtils.isEmpty(filter)) {
+                sql.append("\n        WHERE ").append(filter);
+            }
+            sql.append("\n        ORDER BY ").append(uniqueField);
+            sql.append("\n    ) T1");
+            sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
+            sql.append("\n) T2");
+            sql.append("\nWHERE R>").append(startIndex);
+        }
+        return sql.toString();
+    }
 
-	/**
-	 * 鏀惧叆闃熷垪
-	 *
-	 * @param tableName
-	 * @param dte
-	 */
-	private void add(String tableName, DataTableEntity dte) {
-		synchronized (tableName.intern()) {
-			LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
-			if (queryQueue == null) {
-				queryQueue = new LinkedBlockingQueue<>();
-				queryMap.put(tableName, queryQueue);
-			}
-			if (tableName.endsWith("BAK20230823")) {
-				//妫�鏌te涓殑source_info 鍜� pre_master_key 鏄惁涓虹┖
-				for (int i = 0; i < dte.getRows(); i++) {
-					String sourceInfo = dte.getString(i, "source_info");
-					String preMasterKey = dte.getString(i, "pre_master_key");
-					if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
-						dte.setFieldValue(i, "source_info", "ch-kt");
-						String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
-						dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
-					}
-				}
-			}
-			queryQueue.add(dte);
-			WriteUtil.append("DA-鎴愬姛鏀惧叆闃熷垪-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size());
-		}
-	}
+    /**
+     * 鏀惧叆闃熷垪
+     *
+     * @param tableName
+     * @param dte
+     */
+    private void add(String tableName, DataTableEntity dte) {
+        synchronized (tableName.intern()) {
+            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
+            if (queryQueue == null) {
+                queryQueue = new LinkedBlockingQueue<>();
+                queryMap.put(tableName, queryQueue);
+            }
+            if (tableName.endsWith("BAK20230823")) {
+                //妫�鏌te涓殑source_info 鍜� pre_master_key 鏄惁涓虹┖
+                for (int i = 0; i < dte.getRows(); i++) {
+                    String sourceInfo = dte.getString(i, "source_info");
+                    String preMasterKey = dte.getString(i, "pre_master_key");
+                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
+                        dte.setFieldValue(i, "source_info", "ch-kt");
+                        String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
+                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
+                    }
+                }
+            }
 
-	/**
-	 * 鏌ヨ寮�濮嬶紝鍚戝瓨娲籱ap涓坊鍔�1
-	 *
-	 * @param tableName
-	 */
-	private void startQuery(String tableName, String threadInfo) {
-		synchronized (tableName.intern()) {
-			Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
-			set.add(threadInfo);
-		}
-	}
+            queryQueue.add(dte);
+            WriteUtil.append("DA-鎴愬姛鏀惧叆闃熷垪-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size());
+            while (queryQueue.size() >= 10) {
+                SpringMVCContextHolder.getSystemLogger().error("DA-闃熷垪宸叉弧-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size());
+            }
+        }
+    }
 
-	/**
-	 * 鏌ヨ缁撴潫锛屽悜瀛樻椿map涓噺灏�1
-	 *
-	 * @param tableName
-	 */
-	private void finalQuery(String tableName, String threadInfo) {
-		synchronized (tableName.intern()) {
-			Set<String> set = existsQueryMap.get(tableName);
-			if (set == null || !set.contains(threadInfo)) {
-				throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
-			}
-			set.remove(threadInfo);
-			if (set.isEmpty()) {
-				existsQueryMap.remove(tableName);
-			}
-		}
-	}
+    /**
+     * 鏌ヨ寮�濮嬶紝鍚戝瓨娲籱ap涓坊鍔�1
+     *
+     * @param tableName
+     */
+    private void startQuery(String tableName, String threadInfo) {
+        synchronized (tableName.intern()) {
+            Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
+            set.add(threadInfo);
+        }
+    }
 
-	/**
-	 * 鍏佽鎵ц鏌ヨ锛堥伩鍏嶉槦鍒椾腑绛夊緟鎻掑叆鐨勫お澶氾紝瀵艰嚧鍐呭瓨婧㈠嚭锛�
-	 *
-	 * @param tableName
-	 * @return
-	 */
-	private boolean allowQuery(String tableName) {
-		synchronized (tableName.intern()) {
-			return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
-		}
-	}
+    /**
+     * 鏌ヨ缁撴潫锛屽悜瀛樻椿map涓噺灏�1
+     *
+     * @param tableName
+     */
+    private void finalQuery(String tableName, String threadInfo) {
+        synchronized (tableName.intern()) {
+            Set<String> set = existsQueryMap.get(tableName);
+            if (set == null || !set.contains(threadInfo)) {
+                throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
+            }
+            set.remove(threadInfo);
+            if (set.isEmpty()) {
+                existsQueryMap.remove(tableName);
+            }
+        }
+    }
 
-	/**
-	 * 鎻掑叆閿欒鏃ュ織
-	 *
-	 * @param tableName
-	 * @param error
-	 */
-	private void appendErrorLog(String tableName, String error) {
-		synchronized (tableName.intern()) {
-			StringBuilder errorSb = errorLogMap.get(tableName);
-			if (errorSb == null) {
-				errorSb = new StringBuilder(128);
-				errorLogMap.put(tableName, errorSb);
-			}
-			if (errorSb.length() > 0) {
-				errorSb.append("\n");
-			}
-			if (errorSb.length() < 2000) {
-				errorSb.append(error);
-			}
-		}
-	}
+    /**
+     * 鍏佽鎵ц鏌ヨ锛堥伩鍏嶉槦鍒椾腑绛夊緟鎻掑叆鐨勫お澶氾紝瀵艰嚧鍐呭瓨婧㈠嚭锛�
+     *
+     * @param tableName
+     * @return
+     */
+    private boolean allowQuery(String tableName) {
+        synchronized (tableName.intern()) {
+            return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
+        }
+    }
 
-	/**
-	 * 鑾峰彇椤垫暟锛屽悜涓婂彇鏁�
-	 *
-	 * @param count
-	 * @param size
-	 * @return
-	 */
-	private int ceilPage(int count, int size) {
-		if (size == 0) {
-			if (count == 0) {
-				return 0;
-			} else {
-				throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
-			}
-		}
-		return count / size + (count % size == 0 ? 0 : 1);
-	}
+    /**
+     * 鎻掑叆閿欒鏃ュ織
+     *
+     * @param tableName
+     * @param error
+     */
+    private void appendErrorLog(String tableName, String error) {
+        synchronized (tableName.intern()) {
+            StringBuilder errorSb = errorLogMap.get(tableName);
+            if (errorSb == null) {
+                errorSb = new StringBuilder(128);
+                errorLogMap.put(tableName, errorSb);
+            }
+            if (errorSb.length() > 0) {
+                errorSb.append("\n");
+            }
+            if (errorSb.length() < 2000) {
+                errorSb.append(error);
+            }
+        }
+    }
+
+    /**
+     * 鑾峰彇椤垫暟锛屽悜涓婂彇鏁�
+     *
+     * @param count
+     * @param size
+     * @return
+     */
+    private int ceilPage(int count, int size) {
+        if (size == 0) {
+            if (count == 0) {
+                return 0;
+            } else {
+                throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
+            }
+        }
+        return count / size + (count % size == 0 ? 0 : 1);
+    }
 }

--
Gitblit v1.9.2