From cc9ebffc57e6343745cb1eadc47360d6107936bc Mon Sep 17 00:00:00 2001 From: xpc <1821349743@qq.com> Date: 星期二, 30 一月 2024 19:04:26 +0800 Subject: [PATCH] commit --- product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java | 745 +++++++++++++++++++++++++++++---------------------------- 1 files changed, 380 insertions(+), 365 deletions(-) diff --git a/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java b/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java index 8cccab4..a2f6daa 100644 --- a/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java +++ b/product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java @@ -1,5 +1,6 @@ package com.product.data.center.service; +import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.RandomUtil; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -34,385 +35,399 @@ */ @Service public class DataArchivingQueue extends AbstractBaseService { - // 鏌ヨ闃熷垪map锛歮ap<琛ㄥ悕锛宒te闃熷垪> - private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>(); - // 鏌ヨ瀛樻椿map锛氭鍦ㄦ墽琛屾煡璇㈢殑琛紝瀵瑰簲鐨勫�间负鎵ц鐨勭嚎绋嬫暟锛屼负0鏍囪瘑宸茬粡鎵ц瀹屾垚 - private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap(); - // 鏌ヨ绾跨▼map - private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap(); - // 閿欒鏃ュ織map - private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap(); - // 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟 - private static final int QUERY_THREAD_COUNT = 3; - // 鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆★紙鍗曡〃鏌ヨ鏈�澶ф壒娆�=鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆� + 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟锛� - private static final int QUERY_MAX_BATCH_COUNT = 4; - // 鏌ヨ姣忛〉澶у皬 - private static final int QUERY_PAGE_SIZE = 50000; - // 鎻掑叆姣忛〉澶у皬 - public static final int INSERT_PAGE_SIZE = 5000; + // 鏌ヨ闃熷垪map锛歮ap<琛ㄥ悕锛宒te闃熷垪> + private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>(); + // 鏌ヨ瀛樻椿map锛氭鍦ㄦ墽琛屾煡璇㈢殑琛紝瀵瑰簲鐨勫�间负鎵ц鐨勭嚎绋嬫暟锛屼负0鏍囪瘑宸茬粡鎵ц瀹屾垚 + private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap(); + // 鏌ヨ绾跨▼map + private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap(); + // 閿欒鏃ュ織map + private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap(); + // 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟 + private static final int QUERY_THREAD_COUNT = 3; + // 鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆★紙鍗曡〃鏌ヨ鏈�澶ф壒娆�=鍗曡〃楠岃瘉鏌ヨ鏈�澶ф壒娆� + 鍗曡〃鏌ヨ鏈�澶х嚎绋嬫暟锛� + private static final int QUERY_MAX_BATCH_COUNT = 4; + // 鏌ヨ姣忛〉澶у皬 + private static final int QUERY_PAGE_SIZE = 50000; + // 鎻掑叆姣忛〉澶у皬 + public static final int INSERT_PAGE_SIZE = 5000; - /** - * 鏌ヨ - * - * @param sourceDbe - * @param sourceTable - * @param filter - * @param params - * @param uniqueField 涓婚敭锛屼笉浠呬細鐢ㄤ簬璇嗗埆杩樹細鐢ㄤ簬鎺掑簭闃叉oracle鍒嗛〉鑾峰彇鍒伴噸澶嶆暟鎹� - * @param minID - */ - public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) { - Dao sourceDao = null; - try { - StringBuilder countSql = new StringBuilder(128); - countSql.append("select count(*) count_value from ").append(sourceTable); - if (!StringUtils.isEmpty(filter)) { - countSql.append(" where ").append(filter); - } - sourceDao = sourceDbe.newDao(); - FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params); - int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value"); - int partCount = getPartCount(totalCount); - shutdownQueryThread(sourceTable); - ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT); - queryThreadMap.put(sourceTable, executorService); - StringBuilder rangeSql = new StringBuilder(128); - String tempPartMinID = minID; - int partSize = partCount * QUERY_PAGE_SIZE; - int count = ceilPage(totalCount, partSize); - for (int i = 1; i <= count; i++) { - rangeSql.setLength(0); - if (!StringUtils.isEmpty(filter)) { - rangeSql.append(filter).append(" and "); - } - rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'"); - DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params); - if (DataTableEntity.isEmpty(rangeDte)) { - continue; - } - FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0); - String curPartMaxID = rangeFse.getString("max_id"); - String curPartMinID = tempPartMinID; - executorService.submit(() -> { - String threadInfo = String.valueOf(Thread.currentThread().getId()); - Dao threadSourceDao = null; - String thisPartMinID = curPartMinID; - try { - threadSourceDao = sourceDbe.newDao(); - startQuery(sourceTable, threadInfo); - int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE); - StringBuilder tempFilter = new StringBuilder(128); - for (int j = 0; j < totalPage; j++) { - while (!allowQuery(sourceTable)) { - Thread.sleep(RandomUtil.randomInt(800, 1200)); - } - tempFilter.setLength(0); - tempFilter.append(uniqueField); - WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID); - if (minID.equals(thisPartMinID)) { - tempFilter.append(">="); - } else { - tempFilter.append(">"); - } - tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'"); - if (!StringUtils.isEmpty(filter)) { - tempFilter.append(" and ").append(filter); - } - WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage锛�" + (j + 1) + "-pageSize锛�" + QUERY_PAGE_SIZE + "-filter锛�" + tempFilter); - DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE); - if (!DataTableEntity.isEmpty(allDte)) { - add(sourceTable, allDte); - thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField); - } else { - break; - } - } - } catch (Exception e) { - appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim()); - SpringMVCContextHolder.getSystemLogger().error(e); - clear(sourceTable); - } finally { - if (threadSourceDao != null) { - threadSourceDao.closeConnection(); - } - finalQuery(sourceTable, threadInfo); - } - }); - tempPartMinID = curPartMaxID; - } - } catch (Exception e) { - throw e; - } finally { - if (sourceDao != null) { - sourceDao.closeConnection(); - } - } - } + /** + * 鏌ヨ + * + * @param sourceDbe + * @param sourceTable + * @param filter + * @param params + * @param uniqueField 涓婚敭锛屼笉浠呬細鐢ㄤ簬璇嗗埆杩樹細鐢ㄤ簬鎺掑簭闃叉oracle鍒嗛〉鑾峰彇鍒伴噸澶嶆暟鎹� + * @param minID + */ + public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) { + Dao sourceDao = null; + try { + StringBuilder countSql = new StringBuilder(128); + countSql.append("select count(*) count_value from ").append(sourceTable); + if (!StringUtils.isEmpty(filter)) { + countSql.append(" where ").append(filter); + } + sourceDao = sourceDbe.newDao(); + FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params); + int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value"); + int partCount = getPartCount(totalCount); + shutdownQueryThread(sourceTable); + ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT); + queryThreadMap.put(sourceTable, executorService); + StringBuilder rangeSql = new StringBuilder(128); + String tempPartMinID = minID; + int partSize = partCount * QUERY_PAGE_SIZE; + int count = ceilPage(totalCount, partSize); + for (int i = 1; i <= count; i++) { + rangeSql.setLength(0); + if (!StringUtils.isEmpty(filter)) { + rangeSql.append(filter).append(" and "); + } + rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'"); + DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params); + if (DataTableEntity.isEmpty(rangeDte)) { + continue; + } + FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0); + String curPartMaxID = rangeFse.getString("max_id"); + String curPartMinID = tempPartMinID; + executorService.submit(() -> { + String threadInfo = String.valueOf(Thread.currentThread().getId()); + Dao threadSourceDao = null; + String thisPartMinID = curPartMinID; + try { + threadSourceDao = sourceDbe.newDao(); + startQuery(sourceTable, threadInfo); + int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE); + StringBuilder tempFilter = new StringBuilder(128); + for (int j = 0; j < totalPage; j++) { + //鏌ヨ鏃舵鏌ラ槦鍒楁暟閲忥紝濡傛灉瓒呰繃鏈�澶у�煎垯绛夊緟 + checkQueueCount(sourceTable); + while (!allowQuery(sourceTable)) { + Thread.sleep(RandomUtil.randomInt(800, 1200)); + } + tempFilter.setLength(0); + tempFilter.append(uniqueField); + WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID); + if (minID.equals(thisPartMinID)) { + tempFilter.append(">="); + } else { + tempFilter.append(">"); + } + tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'"); + if (!StringUtils.isEmpty(filter)) { + tempFilter.append(" and ").append(filter); + } + WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage锛�" + (j + 1) + "-pageSize锛�" + QUERY_PAGE_SIZE + "-filter锛�" + tempFilter); + DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE); + if (!DataTableEntity.isEmpty(allDte)) { + add(sourceTable, allDte); + thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField); + } else { + break; + } + } + } catch (Exception e) { + appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim()); + SpringMVCContextHolder.getSystemLogger().error(e); + clear(sourceTable); + } finally { + if (threadSourceDao != null) { + threadSourceDao.closeConnection(); + } + finalQuery(sourceTable, threadInfo); + } + }); + tempPartMinID = curPartMaxID; + } + } catch (Exception e) { + throw e; + } finally { + if (sourceDao != null) { + sourceDao.closeConnection(); + } + } + } - /** - * 浠庨槦鍒椾腑鑾峰彇 - * - * @param tableName - * @return - */ - public DataTableEntity get(String tableName) { - WriteUtil.append("DA-浠庨槦鍒椾腑鑾峰彇-琛ㄥ悕锛�" + tableName); - synchronized (tableName.intern()) { - LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); - if (queryQueue == null) { - return null; - } - return queryQueue.poll(); - } - } + /** + * 浠庨槦鍒椾腑鑾峰彇 + * + * @param tableName + * @return + */ + public DataTableEntity get(String tableName) { + WriteUtil.append("DA-浠庨槦鍒椾腑鑾峰彇-琛ㄥ悕锛�" + tableName); + synchronized (tableName.intern()) { + LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); + if (queryQueue == null) { + return null; + } + return queryQueue.poll(); + } + } - /** - * 鍒ゅ畾鏄惁鏌ヨ瀹屾瘯 - * - * @param tableName - * @return - */ - public boolean checkQueryFinish(String tableName) { - WriteUtil.append("DA-鍒ゅ畾鏄惁鏌ヨ瀹屾瘯"); - Set<String> set = existsQueryMap.get(tableName); - return set == null || set.isEmpty(); - } + /** + * 鍒ゅ畾鏄惁鏌ヨ瀹屾瘯 + * + * @param tableName + * @return + */ + public boolean checkQueryFinish(String tableName) { + WriteUtil.append("DA-鍒ゅ畾鏄惁鏌ヨ瀹屾瘯"); + Set<String> set = existsQueryMap.get(tableName); + return set == null || set.isEmpty(); + } - /** - * 鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖ - * - * @param tableName - * @return - */ - public boolean checkInsertQueueEmpty(String tableName) { - WriteUtil.append("DA-鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖"); - return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty(); - } + /** + * 鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖ + * + * @param tableName + * @return + */ + public boolean checkInsertQueueEmpty(String tableName) { + WriteUtil.append("DA-鍒ゅ畾鎻掑叆闃熷垪锛堟煡璇㈠畬鎴愬悗鏀惧叆鐨勯槦鍒楋級鏄惁涓虹┖"); + return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty(); + } - /** - * 鍏抽棴鏌ヨ绾跨▼ - * - * @param tableName - */ - public void shutdownQueryThread(String tableName) { - synchronized (tableName.intern()) { - ExecutorService executorService = queryThreadMap.get(tableName); - if (executorService != null) { - if (!executorService.isShutdown()) { - executorService.shutdown(); - } - queryThreadMap.remove(tableName); - } - } - } + /** + * 鍏抽棴鏌ヨ绾跨▼ + * + * @param tableName + */ + public void shutdownQueryThread(String tableName) { + synchronized (tableName.intern()) { + ExecutorService executorService = queryThreadMap.get(tableName); + if (executorService != null) { + if (!executorService.isShutdown()) { + executorService.shutdown(); + } + queryThreadMap.remove(tableName); + } + } + } - /** - * 娓呯悊 - * - * @param tableName - */ - public void clear(String tableName) { - synchronized (tableName.intern()) { - queryMap.remove(tableName); - shutdownQueryThread(tableName); - } - } + /** + * 娓呯悊 + * + * @param tableName + */ + public void clear(String tableName) { + synchronized (tableName.intern()) { + queryMap.remove(tableName); + shutdownQueryThread(tableName); + } + } - /** - * 鎻愬彇閿欒鏃ュ織锛屽彧鑳芥彁鍙栦竴娆★紝鎻愬彇鍚庝細鐩存帴娓呯┖ - * - * @param tableName - * @return - */ - public String getErrorLog(String tableName) { - synchronized (tableName.intern()) { - if (errorLogMap == null) { - return null; - } else { - StringBuilder result = errorLogMap.get(tableName); - errorLogMap.remove(tableName); - return result == null ? null : result.toString(); - } - } - } + /** + * 鎻愬彇閿欒鏃ュ織锛屽彧鑳芥彁鍙栦竴娆★紝鎻愬彇鍚庝細鐩存帴娓呯┖ + * + * @param tableName + * @return + */ + public String getErrorLog(String tableName) { + synchronized (tableName.intern()) { + if (errorLogMap == null) { + return null; + } else { + StringBuilder result = errorLogMap.get(tableName); + errorLogMap.remove(tableName); + return result == null ? null : result.toString(); + } + } + } - /** - * 鑾峰彇鍒嗘鐨勬暟閲忥紝鏈�澶�16锛屾渶灏忎负0锛屼负0琛ㄧず鐢ㄤ笉涓婃墍鏈夌殑绾跨▼ - * - * @param totalCount - * @return - */ - private int getPartCount(int totalCount) { - int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE; - int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1); - if (partCount >= 16) { - return 16; - } else if (partCount >= 8) { - return 8; - } else if (partCount >= 4) { - return 4; - } else if (partCount >= 2) { - return 2; - } else if (partCount >= 1) { - return 1; - } else { - return 0; - } - } + /** + * 鑾峰彇鍒嗘鐨勬暟閲忥紝鏈�澶�16锛屾渶灏忎负0锛屼负0琛ㄧず鐢ㄤ笉涓婃墍鏈夌殑绾跨▼ + * + * @param totalCount + * @return + */ + private int getPartCount(int totalCount) { + int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE; + int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1); + if (partCount >= 16) { + return 16; + } else if (partCount >= 8) { + return 8; + } else if (partCount >= 4) { + return 4; + } else if (partCount >= 2) { + return 2; + } else if (partCount >= 1) { + return 1; + } else { + return 0; + } + } - /** - * 鑾峰彇sql锛氭煡璇㈣寖鍥村唴鐨勬渶澶d鍊� - * - * @param uniqueField - * @param sourceTable - * @param filter - * @param dbType - * @param pageIndex - * @param pageSize - * @return - */ - private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) { - int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize; - int finalIndex = startIndex + pageSize; - StringBuilder sql = new StringBuilder(128); - if (DataBaseType.MYSQL.getValue() == dbType) { - sql.append("select max(").append(uniqueField).append(") max_id from ("); - sql.append("\n select ").append(uniqueField).append(" from ").append(sourceTable); - if (!StringUtils.isEmpty(filter)) { - sql.append("\n where ").append(filter); - } - sql.append("\n order by ").append(uniqueField); - sql.append("\n limit ").append(startIndex).append(",").append(pageSize); - sql.append("\n) t"); - } else if (DataBaseType.ORACLE.getValue() == dbType) { - sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM ("); - sql.append("\n SELECT ").append(uniqueField).append(",ROWNUM R FROM ("); - sql.append("\n SELECT ").append(uniqueField).append(" FROM ").append(sourceTable); - if (!StringUtils.isEmpty(filter)) { - sql.append("\n WHERE ").append(filter); - } - sql.append("\n ORDER BY ").append(uniqueField); - sql.append("\n ) T1"); - sql.append("\n WHERE ROWNUM<=").append(finalIndex); - sql.append("\n) T2"); - sql.append("\nWHERE R>").append(startIndex); - } - return sql.toString(); - } + /** + * 鑾峰彇sql锛氭煡璇㈣寖鍥村唴鐨勬渶澶d鍊� + * + * @param uniqueField + * @param sourceTable + * @param filter + * @param dbType + * @param pageIndex + * @param pageSize + * @return + */ + private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) { + int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize; + int finalIndex = startIndex + pageSize; + StringBuilder sql = new StringBuilder(128); + if (DataBaseType.MYSQL.getValue() == dbType) { + sql.append("select max(").append(uniqueField).append(") max_id from ("); + sql.append("\n select ").append(uniqueField).append(" from ").append(sourceTable); + if (!StringUtils.isEmpty(filter)) { + sql.append("\n where ").append(filter); + } + sql.append("\n order by ").append(uniqueField); + sql.append("\n limit ").append(startIndex).append(",").append(pageSize); + sql.append("\n) t"); + } else if (DataBaseType.ORACLE.getValue() == dbType) { + sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM ("); + sql.append("\n SELECT ").append(uniqueField).append(",ROWNUM R FROM ("); + sql.append("\n SELECT ").append(uniqueField).append(" FROM ").append(sourceTable); + if (!StringUtils.isEmpty(filter)) { + sql.append("\n WHERE ").append(filter); + } + sql.append("\n ORDER BY ").append(uniqueField); + sql.append("\n ) T1"); + sql.append("\n WHERE ROWNUM<=").append(finalIndex); + sql.append("\n) T2"); + sql.append("\nWHERE R>").append(startIndex); + } + return sql.toString(); + } - /** - * 鏀惧叆闃熷垪 - * - * @param tableName - * @param dte - */ - private void add(String tableName, DataTableEntity dte) { - synchronized (tableName.intern()) { - LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); - if (queryQueue == null) { - queryQueue = new LinkedBlockingQueue<>(); - queryMap.put(tableName, queryQueue); - } - if (tableName.endsWith("BAK20230823")) { - //妫�鏌te涓殑source_info 鍜� pre_master_key 鏄惁涓虹┖ - for (int i = 0; i < dte.getRows(); i++) { - String sourceInfo = dte.getString(i, "source_info"); - String preMasterKey = dte.getString(i, "pre_master_key"); - if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) { - dte.setFieldValue(i, "source_info", "ch-kt"); - String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id"; - dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName)); - } - } - } + /** + * 妫�鏌ラ槦鍒楁暟閲� + */ + private void checkQueueCount(String tableName) { + //鍒濆鏃堕棿 + long startTime = System.currentTimeMillis(); + while (queryMap.get(tableName) != null && queryMap.get(tableName).size() >= 10) { + //涓庡垵濮嬫椂闂存瘮杈冿紝濡傛灉瓒呰繃10鍒嗛挓鍒欐墦鍗� + if (System.currentTimeMillis() - startTime > 600000) { + WriteUtil.append("DA-闃熷垪鏁伴噺瓒呰繃10锛屽綋鍓嶆暟閲忥細" + queryMap.get(tableName).size()); + startTime = System.currentTimeMillis(); + } + ThreadUtil.sleep(10000); + } + } - queryQueue.add(dte); - WriteUtil.append("DA-鎴愬姛鏀惧叆闃熷垪-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size()); - while (queryQueue.size() >= 10) { - SpringMVCContextHolder.getSystemLogger().error("DA-闃熷垪宸叉弧-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size()); - } - } - } + /** + * 鏀惧叆闃熷垪 + * + * @param tableName + * @param dte + */ + private void add(String tableName, DataTableEntity dte) { + synchronized (tableName.intern()) { + LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); + if (queryQueue == null) { + queryQueue = new LinkedBlockingQueue<>(); + queryMap.put(tableName, queryQueue); + } + if (tableName.endsWith("BAK20230823")) { + //妫�鏌te涓殑source_info 鍜� pre_master_key 鏄惁涓虹┖ + for (int i = 0; i < dte.getRows(); i++) { + String sourceInfo = dte.getString(i, "source_info"); + String preMasterKey = dte.getString(i, "pre_master_key"); + if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) { + dte.setFieldValue(i, "source_info", "ch-kt"); + String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id"; + dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName)); + } + } + } + queryQueue.add(dte); + WriteUtil.append("DA-鎴愬姛鏀惧叆闃熷垪-" + tableName + "-褰撳墠鍓╀綑闃熷垪鏁帮細" + queryQueue.size()); + } + } - /** - * 鏌ヨ寮�濮嬶紝鍚戝瓨娲籱ap涓坊鍔�1 - * - * @param tableName - */ - private void startQuery(String tableName, String threadInfo) { - synchronized (tableName.intern()) { - Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet()); - set.add(threadInfo); - } - } + /** + * 鏌ヨ寮�濮嬶紝鍚戝瓨娲籱ap涓坊鍔�1 + * + * @param tableName + */ + private void startQuery(String tableName, String threadInfo) { + synchronized (tableName.intern()) { + Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet()); + set.add(threadInfo); + } + } - /** - * 鏌ヨ缁撴潫锛屽悜瀛樻椿map涓噺灏�1 - * - * @param tableName - */ - private void finalQuery(String tableName, String threadInfo) { - synchronized (tableName.intern()) { - Set<String> set = existsQueryMap.get(tableName); - if (set == null || !set.contains(threadInfo)) { - throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName); - } - set.remove(threadInfo); - if (set.isEmpty()) { - existsQueryMap.remove(tableName); - } - } - } + /** + * 鏌ヨ缁撴潫锛屽悜瀛樻椿map涓噺灏�1 + * + * @param tableName + */ + private void finalQuery(String tableName, String threadInfo) { + synchronized (tableName.intern()) { + Set<String> set = existsQueryMap.get(tableName); + if (set == null || !set.contains(threadInfo)) { + throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName); + } + set.remove(threadInfo); + if (set.isEmpty()) { + existsQueryMap.remove(tableName); + } + } + } - /** - * 鍏佽鎵ц鏌ヨ锛堥伩鍏嶉槦鍒椾腑绛夊緟鎻掑叆鐨勫お澶氾紝瀵艰嚧鍐呭瓨婧㈠嚭锛� - * - * @param tableName - * @return - */ - private boolean allowQuery(String tableName) { - synchronized (tableName.intern()) { - return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT; - } - } + /** + * 鍏佽鎵ц鏌ヨ锛堥伩鍏嶉槦鍒椾腑绛夊緟鎻掑叆鐨勫お澶氾紝瀵艰嚧鍐呭瓨婧㈠嚭锛� + * + * @param tableName + * @return + */ + private boolean allowQuery(String tableName) { + synchronized (tableName.intern()) { + return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT; + } + } - /** - * 鎻掑叆閿欒鏃ュ織 - * - * @param tableName - * @param error - */ - private void appendErrorLog(String tableName, String error) { - synchronized (tableName.intern()) { - StringBuilder errorSb = errorLogMap.get(tableName); - if (errorSb == null) { - errorSb = new StringBuilder(128); - errorLogMap.put(tableName, errorSb); - } - if (errorSb.length() > 0) { - errorSb.append("\n"); - } - if (errorSb.length() < 2000) { - errorSb.append(error); - } - } - } + /** + * 鎻掑叆閿欒鏃ュ織 + * + * @param tableName + * @param error + */ + private void appendErrorLog(String tableName, String error) { + synchronized (tableName.intern()) { + StringBuilder errorSb = errorLogMap.get(tableName); + if (errorSb == null) { + errorSb = new StringBuilder(128); + errorLogMap.put(tableName, errorSb); + } + if (errorSb.length() > 0) { + errorSb.append("\n"); + } + if (errorSb.length() < 2000) { + errorSb.append(error); + } + } + } - /** - * 鑾峰彇椤垫暟锛屽悜涓婂彇鏁� - * - * @param count - * @param size - * @return - */ - private int ceilPage(int count, int size) { - if (size == 0) { - if (count == 0) { - return 0; - } else { - throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR); - } - } - return count / size + (count % size == 0 ? 0 : 1); - } + /** + * 鑾峰彇椤垫暟锛屽悜涓婂彇鏁� + * + * @param count + * @param size + * @return + */ + private int ceilPage(int count, int size) { + if (size == 0) { + if (count == 0) { + return 0; + } else { + throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR); + } + } + return count / size + (count % size == 0 ? 0 : 1); + } } -- Gitblit v1.9.2