From cc9ebffc57e6343745cb1eadc47360d6107936bc Mon Sep 17 00:00:00 2001
From: xpc <1821349743@qq.com>
Date: 星期二, 30 一月 2024 19:04:26 +0800
Subject: [PATCH] commit

---
 product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java |  745 +++++++++++++++++++++++++++++----------------------------
 1 files changed, 380 insertions(+), 365 deletions(-)

diff --git a/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java b/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
index 8cccab4..a2f6daa 100644
--- a/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
+++ b/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -1,5 +1,6 @@
 package com.product.data.center.service;
 
+import cn.hutool.core.thread.ThreadUtil;
 import cn.hutool.core.util.RandomUtil;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -34,385 +35,399 @@
  */
 @Service
 public class DataArchivingQueue extends AbstractBaseService {
-    // 鏌ヨ闃熷垪map锛歮ap<琛ㄥ悕锛宒te闃熷垪>
-    private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
-    // 鏌ヨ瀛樻椿map锛氭鍦ㄦ墽琛屾煡璇㈢殑琛紝瀵瑰簲鐨勫�间负鎵ц鐨勭嚎绋嬫暟锛屼负0鏍囪瘑宸茬粡鎵ц瀹屾垚
-    private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
-    // 鏌ヨ绾跨▼map
-    private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
-    // 閿欒鏃ュ織map
-    private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
-    // 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟
-    private static final int QUERY_THREAD_COUNT = 3;
-    // 鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆★紙鍗曡〃鏌ヨ鏈�澶ф壒娆�=鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆� + 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟锛�
-    private static final int QUERY_MAX_BATCH_COUNT = 4;
-    // 鏌ヨ姣忛〉澶у皬
-    private static final int QUERY_PAGE_SIZE = 50000;
-    // 鎻掑叆姣忛〉澶у皬
-    public static final int INSERT_PAGE_SIZE = 5000;
+	// 鏌ヨ闃熷垪map锛歮ap<琛ㄥ悕锛宒te闃熷垪>
+	private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
+	// 鏌ヨ瀛樻椿map锛氭鍦ㄦ墽琛屾煡璇㈢殑琛紝瀵瑰簲鐨勫�间负鎵ц鐨勭嚎绋嬫暟锛屼负0鏍囪瘑宸茬粡鎵ц瀹屾垚
+	private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
+	// 鏌ヨ绾跨▼map
+	private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
+	// 閿欒鏃ュ織map
+	private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
+	// 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟
+	private static final int QUERY_THREAD_COUNT = 3;
+	// 鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆★紙鍗曡〃鏌ヨ鏈�澶ф壒娆�=鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆� + 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟锛�
+	private static final int QUERY_MAX_BATCH_COUNT = 4;
+	// 鏌ヨ姣忛〉澶у皬
+	private static final int QUERY_PAGE_SIZE = 50000;
+	// 鎻掑叆姣忛〉澶у皬
+	public static final int INSERT_PAGE_SIZE = 5000;
 
-    /**
-     * 鏌ヨ
-     *
-     * @param sourceDbe
-     * @param sourceTable
-     * @param filter
-     * @param params
-     * @param uniqueField 涓婚敭锛屼笉浠呬細鐢ㄤ簬璇嗗埆杩樹細鐢ㄤ簬鎺掑簭闃叉oracle鍒嗛〉鑾峰彇鍒伴噸澶嶆暟鎹�
-     * @param minID
-     */
-    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
-        Dao sourceDao = null;
-        try {
-            StringBuilder countSql = new StringBuilder(128);
-            countSql.append("select count(*) count_value from ").append(sourceTable);
-            if (!StringUtils.isEmpty(filter)) {
-                countSql.append(" where ").append(filter);
-            }
-            sourceDao = sourceDbe.newDao();
-            FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
-            int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
-            int partCount = getPartCount(totalCount);
-            shutdownQueryThread(sourceTable);
-            ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
-            queryThreadMap.put(sourceTable, executorService);
-            StringBuilder rangeSql = new StringBuilder(128);
-            String tempPartMinID = minID;
-            int partSize = partCount * QUERY_PAGE_SIZE;
-            int count = ceilPage(totalCount, partSize);
-            for (int i = 1; i <= count; i++) {
-                rangeSql.setLength(0);
-                if (!StringUtils.isEmpty(filter)) {
-                    rangeSql.append(filter).append(" and ");
-                }
-                rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
-                DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
-                if (DataTableEntity.isEmpty(rangeDte)) {
-                    continue;
-                }
-                FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
-                String curPartMaxID = rangeFse.getString("max_id");
-                String curPartMinID = tempPartMinID;
-                executorService.submit(() -> {
-                    String threadInfo = String.valueOf(Thread.currentThread().getId());
-                    Dao threadSourceDao = null;
-                    String thisPartMinID = curPartMinID;
-                    try {
-                        threadSourceDao = sourceDbe.newDao();
-                        startQuery(sourceTable, threadInfo);
-                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
-                        StringBuilder tempFilter = new StringBuilder(128);
-                        for (int j = 0; j < totalPage; j++) {
-                            while (!allowQuery(sourceTable)) {
-                                Thread.sleep(RandomUtil.randomInt(800, 1200));
-                            }
-                            tempFilter.setLength(0);
-                            tempFilter.append(uniqueField);
-                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
-                            if (minID.equals(thisPartMinID)) {
-                                tempFilter.append(">=");
-                            } else {
-                                tempFilter.append(">");
-                            }
-                            tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
-                            if (!StringUtils.isEmpty(filter)) {
-                                tempFilter.append(" and ").append(filter);
-                            }
-                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage锛�" + (j + 1) + "-pageSize锛�" + QUERY_PAGE_SIZE + "-filter锛�" + tempFilter);
-                            DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
-                            if (!DataTableEntity.isEmpty(allDte)) {
-                                add(sourceTable, allDte);
-                                thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
-                            } else {
-                                break;
-                            }
-                        }
-                    } catch (Exception e) {
-                        appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
-                        SpringMVCContextHolder.getSystemLogger().error(e);
-                        clear(sourceTable);
-                    } finally {
-                        if (threadSourceDao != null) {
-                            threadSourceDao.closeConnection();
-                        }
-                        finalQuery(sourceTable, threadInfo);
-                    }
-                });
-                tempPartMinID = curPartMaxID;
-            }
-        } catch (Exception e) {
-            throw e;
-        } finally {
-            if (sourceDao != null) {
-                sourceDao.closeConnection();
-            }
-        }
-    }
+	/**
+	 * 鏌ヨ
+	 *
+	 * @param sourceDbe
+	 * @param sourceTable
+	 * @param filter
+	 * @param params
+	 * @param uniqueField 涓婚敭锛屼笉浠呬細鐢ㄤ簬璇嗗埆杩樹細鐢ㄤ簬鎺掑簭闃叉oracle鍒嗛〉鑾峰彇鍒伴噸澶嶆暟鎹�
+	 * @param minID
+	 */
+	public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
+		Dao sourceDao = null;
+		try {
+			StringBuilder countSql = new StringBuilder(128);
+			countSql.append("select count(*) count_value from ").append(sourceTable);
+			if (!StringUtils.isEmpty(filter)) {
+				countSql.append(" where ").append(filter);
+			}
+			sourceDao = sourceDbe.newDao();
+			FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
+			int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
+			int partCount = getPartCount(totalCount);
+			shutdownQueryThread(sourceTable);
+			ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
+			queryThreadMap.put(sourceTable, executorService);
+			StringBuilder rangeSql = new StringBuilder(128);
+			String tempPartMinID = minID;
+			int partSize = partCount * QUERY_PAGE_SIZE;
+			int count = ceilPage(totalCount, partSize);
+			for (int i = 1; i <= count; i++) {
+				rangeSql.setLength(0);
+				if (!StringUtils.isEmpty(filter)) {
+					rangeSql.append(filter).append(" and ");
+				}
+				rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
+				DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
+				if (DataTableEntity.isEmpty(rangeDte)) {
+					continue;
+				}
+				FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
+				String curPartMaxID = rangeFse.getString("max_id");
+				String curPartMinID = tempPartMinID;
+				executorService.submit(() -> {
+					String threadInfo = String.valueOf(Thread.currentThread().getId());
+					Dao threadSourceDao = null;
+					String thisPartMinID = curPartMinID;
+					try {
+						threadSourceDao = sourceDbe.newDao();
+						startQuery(sourceTable, threadInfo);
+						int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
+						StringBuilder tempFilter = new StringBuilder(128);
+						for (int j = 0; j < totalPage; j++) {
+							//鏌ヨ鏃舵鏌ラ槦鍒楁暟閲忥紝濡傛灉瓒呰繃鏈�澶у�煎垯绛夊緟
+							checkQueueCount(sourceTable);
+							while (!allowQuery(sourceTable)) {
+								Thread.sleep(RandomUtil.randomInt(800, 1200));
+							}
+							tempFilter.setLength(0);
+							tempFilter.append(uniqueField);
+							WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
+							if (minID.equals(thisPartMinID)) {
+								tempFilter.append(">=");
+							} else {
+								tempFilter.append(">");
+							}
+							tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
+							if (!StringUtils.isEmpty(filter)) {
+								tempFilter.append(" and ").append(filter);
+							}
+							WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage锛�" + (j + 1) + "-pageSize锛�" + QUERY_PAGE_SIZE + "-filter锛�" + tempFilter);
+							DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
+							if (!DataTableEntity.isEmpty(allDte)) {
+								add(sourceTable, allDte);
+								thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
+							} else {
+								break;
+							}
+						}
+					} catch (Exception e) {
+						appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
+						SpringMVCContextHolder.getSystemLogger().error(e);
+						clear(sourceTable);
+					} finally {
+						if (threadSourceDao != null) {
+							threadSourceDao.closeConnection();
+						}
+						finalQuery(sourceTable, threadInfo);
+					}
+				});
+				tempPartMinID = curPartMaxID;
+			}
+		} catch (Exception e) {
+			throw e;
+		} finally {
+			if (sourceDao != null) {
+				sourceDao.closeConnection();
+			}
+		}
+	}
 
-    /**
-     * 浠庨槦鍒椾腑鑾峰彇
-     *
-     * @param tableName
-     * @return
-     */
-    public DataTableEntity get(String tableName) {
-        WriteUtil.append("DA-浠庨槦鍒椾腑鑾峰彇-琛ㄥ悕锛�" + tableName);
-        synchronized (tableName.intern()) {
-            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
-            if (queryQueue == null) {
-                return null;
-            }
-            return queryQueue.poll();
-        }
-    }
+	/**
+	 * 浠庨槦鍒椾腑鑾峰彇
+	 *
+	 * @param tableName
+	 * @return
+	 */
+	public DataTableEntity get(String tableName) {
+		WriteUtil.append("DA-浠庨槦鍒椾腑鑾峰彇-琛ㄥ悕锛�" + tableName);
+		synchronized (tableName.intern()) {
+			LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
+			if (queryQueue == null) {
+				return null;
+			}
+			return queryQueue.poll();
+		}
+	}
 
-    /**
-     * 鍒ゅ畾鏄惁鏌ヨ瀹屾瘯
-     *
-     * @param tableName
-     * @return
-     */
-    public boolean checkQueryFinish(String tableName) {
-        WriteUtil.append("DA-鍒ゅ畾鏄惁鏌ヨ瀹屾瘯");
-        Set<String> set = existsQueryMap.get(tableName);
-        return set == null || set.isEmpty();
-    }
+	/**
+	 * 鍒ゅ畾鏄惁鏌ヨ瀹屾瘯
+	 *
+	 * @param tableName
+	 * @return
+	 */
+	public boolean checkQueryFinish(String tableName) {
+		WriteUtil.append("DA-鍒ゅ畾鏄惁鏌ヨ瀹屾瘯");
+		Set<String> set = existsQueryMap.get(tableName);
+		return set == null || set.isEmpty();
+	}
 
-    /**
-     * 鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖
-     *
-     * @param tableName
-     * @return
-     */
-    public boolean checkInsertQueueEmpty(String tableName) {
-        WriteUtil.append("DA-鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖");
-        return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
-    }
+	/**
+	 * 鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖
+	 *
+	 * @param tableName
+	 * @return
+	 */
+	public boolean checkInsertQueueEmpty(String tableName) {
+		WriteUtil.append("DA-鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖");
+		return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
+	}
 
-    /**
-     * 鍏抽棴鏌ヨ绾跨▼
-     *
-     * @param tableName
-     */
-    public void shutdownQueryThread(String tableName) {
-        synchronized (tableName.intern()) {
-            ExecutorService executorService = queryThreadMap.get(tableName);
-            if (executorService != null) {
-                if (!executorService.isShutdown()) {
-                    executorService.shutdown();
-                }
-                queryThreadMap.remove(tableName);
-            }
-        }
-    }
+	/**
+	 * 鍏抽棴鏌ヨ绾跨▼
+	 *
+	 * @param tableName
+	 */
+	public void shutdownQueryThread(String tableName) {
+		synchronized (tableName.intern()) {
+			ExecutorService executorService = queryThreadMap.get(tableName);
+			if (executorService != null) {
+				if (!executorService.isShutdown()) {
+					executorService.shutdown();
+				}
+				queryThreadMap.remove(tableName);
+			}
+		}
+	}
 
-    /**
-     * 娓呯悊
-     *
-     * @param tableName
-     */
-    public void clear(String tableName) {
-        synchronized (tableName.intern()) {
-            queryMap.remove(tableName);
-            shutdownQueryThread(tableName);
-        }
-    }
+	/**
+	 * 娓呯悊
+	 *
+	 * @param tableName
+	 */
+	public void clear(String tableName) {
+		synchronized (tableName.intern()) {
+			queryMap.remove(tableName);
+			shutdownQueryThread(tableName);
+		}
+	}
 
-    /**
-     * 鎻愬彇閿欒鏃ュ織锛屽彧鑳芥彁鍙栦竴娆★紝鎻愬彇鍚庝細鐩存帴娓呯┖
-     *
-     * @param tableName
-     * @return
-     */
-    public String getErrorLog(String tableName) {
-        synchronized (tableName.intern()) {
-            if (errorLogMap == null) {
-                return null;
-            } else {
-                StringBuilder result = errorLogMap.get(tableName);
-                errorLogMap.remove(tableName);
-                return result == null ? null : result.toString();
-            }
-        }
-    }
+	/**
+	 * 鎻愬彇閿欒鏃ュ織锛屽彧鑳芥彁鍙栦竴娆★紝鎻愬彇鍚庝細鐩存帴娓呯┖
+	 *
+	 * @param tableName
+	 * @return
+	 */
+	public String getErrorLog(String tableName) {
+		synchronized (tableName.intern()) {
+			if (errorLogMap == null) {
+				return null;
+			} else {
+				StringBuilder result = errorLogMap.get(tableName);
+				errorLogMap.remove(tableName);
+				return result == null ? null : result.toString();
+			}
+		}
+	}
 
-    /**
-     * 鑾峰彇鍒嗘鐨勬暟閲忥紝鏈�澶�16锛屾渶灏忎负0锛屼负0琛ㄧず鐢ㄤ笉涓婃墍鏈夌殑绾跨▼
-     *
-     * @param totalCount
-     * @return
-     */
-    private int getPartCount(int totalCount) {
-        int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
-        int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
-        if (partCount >= 16) {
-            return 16;
-        } else if (partCount >= 8) {
-            return 8;
-        } else if (partCount >= 4) {
-            return 4;
-        } else if (partCount >= 2) {
-            return 2;
-        } else if (partCount >= 1) {
-            return 1;
-        } else {
-            return 0;
-        }
-    }
+	/**
+	 * 鑾峰彇鍒嗘鐨勬暟閲忥紝鏈�澶�16锛屾渶灏忎负0锛屼负0琛ㄧず鐢ㄤ笉涓婃墍鏈夌殑绾跨▼
+	 *
+	 * @param totalCount
+	 * @return
+	 */
+	private int getPartCount(int totalCount) {
+		int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
+		int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
+		if (partCount >= 16) {
+			return 16;
+		} else if (partCount >= 8) {
+			return 8;
+		} else if (partCount >= 4) {
+			return 4;
+		} else if (partCount >= 2) {
+			return 2;
+		} else if (partCount >= 1) {
+			return 1;
+		} else {
+			return 0;
+		}
+	}
 
-    /**
-     * 鑾峰彇sql锛氭煡璇㈣寖鍥村唴鐨勬渶澶d鍊�
-     *
-     * @param uniqueField
-     * @param sourceTable
-     * @param filter
-     * @param dbType
-     * @param pageIndex
-     * @param pageSize
-     * @return
-     */
-    private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
-        int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
-        int finalIndex = startIndex + pageSize;
-        StringBuilder sql = new StringBuilder(128);
-        if (DataBaseType.MYSQL.getValue() == dbType) {
-            sql.append("select max(").append(uniqueField).append(") max_id from (");
-            sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
-            if (!StringUtils.isEmpty(filter)) {
-                sql.append("\n    where ").append(filter);
-            }
-            sql.append("\n    order by ").append(uniqueField);
-            sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
-            sql.append("\n) t");
-        } else if (DataBaseType.ORACLE.getValue() == dbType) {
-            sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
-            sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
-            sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
-            if (!StringUtils.isEmpty(filter)) {
-                sql.append("\n        WHERE ").append(filter);
-            }
-            sql.append("\n        ORDER BY ").append(uniqueField);
-            sql.append("\n    ) T1");
-            sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
-            sql.append("\n) T2");
-            sql.append("\nWHERE R>").append(startIndex);
-        }
-        return sql.toString();
-    }
+	/**
+	 * 鑾峰彇sql锛氭煡璇㈣寖鍥村唴鐨勬渶澶d鍊�
+	 *
+	 * @param uniqueField
+	 * @param sourceTable
+	 * @param filter
+	 * @param dbType
+	 * @param pageIndex
+	 * @param pageSize
+	 * @return
+	 */
+	private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
+		int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
+		int finalIndex = startIndex + pageSize;
+		StringBuilder sql = new StringBuilder(128);
+		if (DataBaseType.MYSQL.getValue() == dbType) {
+			sql.append("select max(").append(uniqueField).append(") max_id from (");
+			sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
+			if (!StringUtils.isEmpty(filter)) {
+				sql.append("\n    where ").append(filter);
+			}
+			sql.append("\n    order by ").append(uniqueField);
+			sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
+			sql.append("\n) t");
+		} else if (DataBaseType.ORACLE.getValue() == dbType) {
+			sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
+			sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
+			sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
+			if (!StringUtils.isEmpty(filter)) {
+				sql.append("\n        WHERE ").append(filter);
+			}
+			sql.append("\n        ORDER BY ").append(uniqueField);
+			sql.append("\n    ) T1");
+			sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
+			sql.append("\n) T2");
+			sql.append("\nWHERE R>").append(startIndex);
+		}
+		return sql.toString();
+	}
 
-    /**
-     * 鏀惧叆闃熷垪
-     *
-     * @param tableName
-     * @param dte
-     */
-    private void add(String tableName, DataTableEntity dte) {
-        synchronized (tableName.intern()) {
-            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
-            if (queryQueue == null) {
-                queryQueue = new LinkedBlockingQueue<>();
-                queryMap.put(tableName, queryQueue);
-            }
-            if (tableName.endsWith("BAK20230823")) {
-                //妫�鏌te涓殑source_info 鍜� pre_master_key 鏄惁涓虹┖
-                for (int i = 0; i < dte.getRows(); i++) {
-                    String sourceInfo = dte.getString(i, "source_info");
-                    String preMasterKey = dte.getString(i, "pre_master_key");
-                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
-                        dte.setFieldValue(i, "source_info", "ch-kt");
-                        String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
-                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
-                    }
-                }
-            }
+	/**
+	 * 妫�鏌ラ槦鍒楁暟閲�
+	 */
+	private void checkQueueCount(String tableName) {
+		//鍒濆鏃堕棿
+		long startTime = System.currentTimeMillis();
+		while (queryMap.get(tableName) != null && queryMap.get(tableName).size() >= 10) {
+			//涓庡垵濮嬫椂闂存瘮杈冿紝濡傛灉瓒呰繃10鍒嗛挓鍒欐墦鍗�
+			if (System.currentTimeMillis() - startTime > 600000) {
+				WriteUtil.append("DA-闃熷垪鏁伴噺瓒呰繃10锛屽綋鍓嶆暟閲忥細" + queryMap.get(tableName).size());
+				startTime = System.currentTimeMillis();
+			}
+			ThreadUtil.sleep(10000);
+		}
+	}
 
-            queryQueue.add(dte);
-            WriteUtil.append("DA-鎴愬姛鏀惧叆闃熷垪-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size());
-            while (queryQueue.size() >= 10) {
-                SpringMVCContextHolder.getSystemLogger().error("DA-闃熷垪宸叉弧-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size());
-            }
-        }
-    }
+	/**
+	 * 鏀惧叆闃熷垪
+	 *
+	 * @param tableName
+	 * @param dte
+	 */
+	private void add(String tableName, DataTableEntity dte) {
+		synchronized (tableName.intern()) {
+			LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
+			if (queryQueue == null) {
+				queryQueue = new LinkedBlockingQueue<>();
+				queryMap.put(tableName, queryQueue);
+			}
+			if (tableName.endsWith("BAK20230823")) {
+				//妫�鏌te涓殑source_info 鍜� pre_master_key 鏄惁涓虹┖
+				for (int i = 0; i < dte.getRows(); i++) {
+					String sourceInfo = dte.getString(i, "source_info");
+					String preMasterKey = dte.getString(i, "pre_master_key");
+					if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
+						dte.setFieldValue(i, "source_info", "ch-kt");
+						String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
+						dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
+					}
+				}
+			}
+			queryQueue.add(dte);
+			WriteUtil.append("DA-鎴愬姛鏀惧叆闃熷垪-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size());
+		}
+	}
 
-    /**
-     * 鏌ヨ寮�濮嬶紝鍚戝瓨娲籱ap涓坊鍔�1
-     *
-     * @param tableName
-     */
-    private void startQuery(String tableName, String threadInfo) {
-        synchronized (tableName.intern()) {
-            Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
-            set.add(threadInfo);
-        }
-    }
+	/**
+	 * 鏌ヨ寮�濮嬶紝鍚戝瓨娲籱ap涓坊鍔�1
+	 *
+	 * @param tableName
+	 */
+	private void startQuery(String tableName, String threadInfo) {
+		synchronized (tableName.intern()) {
+			Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
+			set.add(threadInfo);
+		}
+	}
 
-    /**
-     * 鏌ヨ缁撴潫锛屽悜瀛樻椿map涓噺灏�1
-     *
-     * @param tableName
-     */
-    private void finalQuery(String tableName, String threadInfo) {
-        synchronized (tableName.intern()) {
-            Set<String> set = existsQueryMap.get(tableName);
-            if (set == null || !set.contains(threadInfo)) {
-                throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
-            }
-            set.remove(threadInfo);
-            if (set.isEmpty()) {
-                existsQueryMap.remove(tableName);
-            }
-        }
-    }
+	/**
+	 * 鏌ヨ缁撴潫锛屽悜瀛樻椿map涓噺灏�1
+	 *
+	 * @param tableName
+	 */
+	private void finalQuery(String tableName, String threadInfo) {
+		synchronized (tableName.intern()) {
+			Set<String> set = existsQueryMap.get(tableName);
+			if (set == null || !set.contains(threadInfo)) {
+				throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
+			}
+			set.remove(threadInfo);
+			if (set.isEmpty()) {
+				existsQueryMap.remove(tableName);
+			}
+		}
+	}
 
-    /**
-     * 鍏佽鎵ц鏌ヨ锛堥伩鍏嶉槦鍒椾腑绛夊緟鎻掑叆鐨勫お澶氾紝瀵艰嚧鍐呭瓨婧㈠嚭锛�
-     *
-     * @param tableName
-     * @return
-     */
-    private boolean allowQuery(String tableName) {
-        synchronized (tableName.intern()) {
-            return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
-        }
-    }
+	/**
+	 * 鍏佽鎵ц鏌ヨ锛堥伩鍏嶉槦鍒椾腑绛夊緟鎻掑叆鐨勫お澶氾紝瀵艰嚧鍐呭瓨婧㈠嚭锛�
+	 *
+	 * @param tableName
+	 * @return
+	 */
+	private boolean allowQuery(String tableName) {
+		synchronized (tableName.intern()) {
+			return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
+		}
+	}
 
-    /**
-     * 鎻掑叆閿欒鏃ュ織
-     *
-     * @param tableName
-     * @param error
-     */
-    private void appendErrorLog(String tableName, String error) {
-        synchronized (tableName.intern()) {
-            StringBuilder errorSb = errorLogMap.get(tableName);
-            if (errorSb == null) {
-                errorSb = new StringBuilder(128);
-                errorLogMap.put(tableName, errorSb);
-            }
-            if (errorSb.length() > 0) {
-                errorSb.append("\n");
-            }
-            if (errorSb.length() < 2000) {
-                errorSb.append(error);
-            }
-        }
-    }
+	/**
+	 * 鎻掑叆閿欒鏃ュ織
+	 *
+	 * @param tableName
+	 * @param error
+	 */
+	private void appendErrorLog(String tableName, String error) {
+		synchronized (tableName.intern()) {
+			StringBuilder errorSb = errorLogMap.get(tableName);
+			if (errorSb == null) {
+				errorSb = new StringBuilder(128);
+				errorLogMap.put(tableName, errorSb);
+			}
+			if (errorSb.length() > 0) {
+				errorSb.append("\n");
+			}
+			if (errorSb.length() < 2000) {
+				errorSb.append(error);
+			}
+		}
+	}
 
-    /**
-     * 鑾峰彇椤垫暟锛屽悜涓婂彇鏁�
-     *
-     * @param count
-     * @param size
-     * @return
-     */
-    private int ceilPage(int count, int size) {
-        if (size == 0) {
-            if (count == 0) {
-                return 0;
-            } else {
-                throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
-            }
-        }
-        return count / size + (count % size == 0 ? 0 : 1);
-    }
+	/**
+	 * 鑾峰彇椤垫暟锛屽悜涓婂彇鏁�
+	 *
+	 * @param count
+	 * @param size
+	 * @return
+	 */
+	private int ceilPage(int count, int size) {
+		if (size == 0) {
+			if (count == 0) {
+				return 0;
+			} else {
+				throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
+			}
+		}
+		return count / size + (count % size == 0 ? 0 : 1);
+	}
 }

--
Gitblit v1.9.2