| | |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.util.Locale; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | |
| | | */ |
| | | @Service |
| | | public class DataArchivingQueue extends AbstractBaseService { |
| | | // æ¥è¯¢éåmapï¼map<表åï¼dteéå> |
| | | private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>(); |
| | | // æ¥è¯¢åæ´»mapï¼æ£å¨æ§è¡æ¥è¯¢ç表ï¼å¯¹åºçå¼ä¸ºæ§è¡ççº¿ç¨æ°ï¼ä¸º0æ è¯å·²ç»æ§è¡å®æ |
| | | private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap(); |
| | | // æ¥è¯¢çº¿ç¨map |
| | | private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap(); |
| | | // é误æ¥å¿map |
| | | private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap(); |
| | | // å表æ¥è¯¢æå¤§çº¿ç¨æ° |
| | | private static final int QUERY_THREAD_COUNT = 3; |
| | | // å表éªè¯æ¥è¯¢æå¤§æ¹æ¬¡ï¼å表æ¥è¯¢æå¤§æ¹æ¬¡=å表éªè¯æ¥è¯¢æå¤§æ¹æ¬¡ + å表æ¥è¯¢æå¤§çº¿ç¨æ°ï¼ |
| | | private static final int QUERY_MAX_BATCH_COUNT = 4; |
| | | // æ¥è¯¢æ¯é¡µå¤§å° |
| | | private static final int QUERY_PAGE_SIZE = 50000; |
| | | // æå
¥æ¯é¡µå¤§å° |
| | | public static final int INSERT_PAGE_SIZE = 5000; |
| | | // æ¥è¯¢éåmapï¼map<表åï¼dteéå> |
| | | private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>(); |
| | | // æ¥è¯¢åæ´»mapï¼æ£å¨æ§è¡æ¥è¯¢ç表ï¼å¯¹åºçå¼ä¸ºæ§è¡ççº¿ç¨æ°ï¼ä¸º0æ è¯å·²ç»æ§è¡å®æ |
| | | private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap(); |
| | | // æ¥è¯¢çº¿ç¨map |
| | | private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap(); |
| | | // é误æ¥å¿map |
| | | private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap(); |
| | | // å表æ¥è¯¢æå¤§çº¿ç¨æ° |
| | | private static final int QUERY_THREAD_COUNT = 3; |
| | | // å表éªè¯æ¥è¯¢æå¤§æ¹æ¬¡ï¼å表æ¥è¯¢æå¤§æ¹æ¬¡=å表éªè¯æ¥è¯¢æå¤§æ¹æ¬¡ + å表æ¥è¯¢æå¤§çº¿ç¨æ°ï¼ |
| | | private static final int QUERY_MAX_BATCH_COUNT = 4; |
| | | // æ¥è¯¢æ¯é¡µå¤§å° |
| | | private static final int QUERY_PAGE_SIZE = 50000; |
| | | // æå
¥æ¯é¡µå¤§å° |
| | | public static final int INSERT_PAGE_SIZE = 5000; |
| | | |
| | | /** |
| | | * æ¥è¯¢ |
| | | * @param sourceDbe |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param params |
| | | * @param uniqueField 主é®ï¼ä¸ä»
ä¼ç¨äºè¯å«è¿ä¼ç¨äºæåºé²æ¢oracleå页è·åå°é夿°æ® |
| | | * @param minID |
| | | */ |
| | | public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) { |
| | | Dao sourceDao = null; |
| | | try { |
| | | StringBuilder countSql = new StringBuilder(128); |
| | | countSql.append("select count(*) count_value from ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | countSql.append(" where ").append(filter); |
| | | } |
| | | sourceDao = sourceDbe.newDao(); |
| | | FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params); |
| | | int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value"); |
| | | int partCount = getPartCount(totalCount); |
| | | shutdownQueryThread(sourceTable); |
| | | ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT); |
| | | queryThreadMap.put(sourceTable, executorService); |
| | | StringBuilder rangeSql = new StringBuilder(128); |
| | | String tempPartMinID = minID; |
| | | int partSize = partCount * QUERY_PAGE_SIZE; |
| | | int count = ceilPage(totalCount, partSize); |
| | | for (int i = 1; i <= count; i++) { |
| | | rangeSql.setLength(0); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | rangeSql.append(filter).append(" and "); |
| | | } |
| | | rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'"); |
| | | DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params); |
| | | if (DataTableEntity.isEmpty(rangeDte)) { |
| | | continue; |
| | | } |
| | | FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0); |
| | | String curPartMaxID = rangeFse.getString("max_id"); |
| | | String curPartMinID = tempPartMinID; |
| | | executorService.submit(() -> { |
| | | String threadInfo = String.valueOf(Thread.currentThread().getId()); |
| | | Dao threadSourceDao = null; |
| | | String thisPartMinID = curPartMinID; |
| | | try { |
| | | threadSourceDao = sourceDbe.newDao(); |
| | | startQuery(sourceTable, threadInfo); |
| | | int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE); |
| | | StringBuilder tempFilter = new StringBuilder(128); |
| | | for (int j = 0; j < totalPage; j++) { |
| | | while (!allowQuery(sourceTable)) { |
| | | Thread.sleep(RandomUtil.randomInt(800, 1200)); |
| | | } |
| | | tempFilter.setLength(0); |
| | | tempFilter.append(uniqueField); |
| | | WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID); |
| | | if (minID.equals(thisPartMinID)) { |
| | | tempFilter.append(">="); |
| | | } else { |
| | | tempFilter.append(">"); |
| | | } |
| | | tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'"); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | tempFilter.append(" and ").append(filter); |
| | | } |
| | | WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPageï¼" + (j + 1) + "-pageSizeï¼" + QUERY_PAGE_SIZE + "-filterï¼" + tempFilter); |
| | | DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE); |
| | | if (!DataTableEntity.isEmpty(allDte)) { |
| | | add(sourceTable, allDte); |
| | | thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField); |
| | | } else { |
| | | break; |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim()); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | clear(sourceTable); |
| | | } finally { |
| | | if (threadSourceDao != null) { |
| | | threadSourceDao.closeConnection(); |
| | | } |
| | | finalQuery(sourceTable, threadInfo); |
| | | } |
| | | }); |
| | | tempPartMinID = curPartMaxID; |
| | | } |
| | | } catch (Exception e) { |
| | | throw e; |
| | | } finally { |
| | | if (sourceDao != null) { |
| | | sourceDao.closeConnection(); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * æ¥è¯¢ |
| | | * |
| | | * @param sourceDbe |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param params |
| | | * @param uniqueField 主é®ï¼ä¸ä»
ä¼ç¨äºè¯å«è¿ä¼ç¨äºæåºé²æ¢oracleå页è·åå°é夿°æ® |
| | | * @param minID |
| | | */ |
| | | public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) { |
| | | Dao sourceDao = null; |
| | | try { |
| | | StringBuilder countSql = new StringBuilder(128); |
| | | countSql.append("select count(*) count_value from ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | countSql.append(" where ").append(filter); |
| | | } |
| | | sourceDao = sourceDbe.newDao(); |
| | | FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params); |
| | | int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value"); |
| | | int partCount = getPartCount(totalCount); |
| | | shutdownQueryThread(sourceTable); |
| | | ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT); |
| | | queryThreadMap.put(sourceTable, executorService); |
| | | StringBuilder rangeSql = new StringBuilder(128); |
| | | String tempPartMinID = minID; |
| | | int partSize = partCount * QUERY_PAGE_SIZE; |
| | | int count = ceilPage(totalCount, partSize); |
| | | for (int i = 1; i <= count; i++) { |
| | | rangeSql.setLength(0); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | rangeSql.append(filter).append(" and "); |
| | | } |
| | | rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'"); |
| | | DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params); |
| | | if (DataTableEntity.isEmpty(rangeDte)) { |
| | | continue; |
| | | } |
| | | FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0); |
| | | String curPartMaxID = rangeFse.getString("max_id"); |
| | | String curPartMinID = tempPartMinID; |
| | | executorService.submit(() -> { |
| | | String threadInfo = String.valueOf(Thread.currentThread().getId()); |
| | | Dao threadSourceDao = null; |
| | | String thisPartMinID = curPartMinID; |
| | | try { |
| | | threadSourceDao = sourceDbe.newDao(); |
| | | startQuery(sourceTable, threadInfo); |
| | | int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE); |
| | | StringBuilder tempFilter = new StringBuilder(128); |
| | | for (int j = 0; j < totalPage; j++) { |
| | | while (!allowQuery(sourceTable)) { |
| | | Thread.sleep(RandomUtil.randomInt(800, 1200)); |
| | | } |
| | | tempFilter.setLength(0); |
| | | tempFilter.append(uniqueField); |
| | | WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID); |
| | | if (minID.equals(thisPartMinID)) { |
| | | tempFilter.append(">="); |
| | | } else { |
| | | tempFilter.append(">"); |
| | | } |
| | | tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'"); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | tempFilter.append(" and ").append(filter); |
| | | } |
| | | WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPageï¼" + (j + 1) + "-pageSizeï¼" + QUERY_PAGE_SIZE + "-filterï¼" + tempFilter); |
| | | DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE); |
| | | if (!DataTableEntity.isEmpty(allDte)) { |
| | | add(sourceTable, allDte); |
| | | thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField); |
| | | } else { |
| | | break; |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim()); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | clear(sourceTable); |
| | | } finally { |
| | | if (threadSourceDao != null) { |
| | | threadSourceDao.closeConnection(); |
| | | } |
| | | finalQuery(sourceTable, threadInfo); |
| | | } |
| | | }); |
| | | tempPartMinID = curPartMaxID; |
| | | } |
| | | } catch (Exception e) { |
| | | throw e; |
| | | } finally { |
| | | if (sourceDao != null) { |
| | | sourceDao.closeConnection(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * ä»éåä¸è·å |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public DataTableEntity get(String tableName) { |
| | | WriteUtil.append("DA-ä»éåä¸è·å-表åï¼" + tableName); |
| | | synchronized (tableName.intern()) { |
| | | LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); |
| | | if (queryQueue == null) { |
| | | return null; |
| | | } |
| | | return queryQueue.poll(); |
| | | } |
| | | } |
| | | /** |
| | | * ä»éåä¸è·å |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public DataTableEntity get(String tableName) { |
| | | WriteUtil.append("DA-ä»éåä¸è·å-表åï¼" + tableName); |
| | | synchronized (tableName.intern()) { |
| | | LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); |
| | | if (queryQueue == null) { |
| | | return null; |
| | | } |
| | | return queryQueue.poll(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * å¤å®æ¯å¦æ¥è¯¢å®æ¯ |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public boolean checkQueryFinish(String tableName) { |
| | | WriteUtil.append("DA-å¤å®æ¯å¦æ¥è¯¢å®æ¯"); |
| | | Set<String> set = existsQueryMap.get(tableName); |
| | | return set == null || set.isEmpty(); |
| | | } |
| | | /** |
| | | * å¤å®æ¯å¦æ¥è¯¢å®æ¯ |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public boolean checkQueryFinish(String tableName) { |
| | | WriteUtil.append("DA-å¤å®æ¯å¦æ¥è¯¢å®æ¯"); |
| | | Set<String> set = existsQueryMap.get(tableName); |
| | | return set == null || set.isEmpty(); |
| | | } |
| | | |
| | | /** |
| | | * å¤å®æå
¥éåï¼æ¥è¯¢å®æåæ¾å
¥çéåï¼æ¯å¦ä¸ºç©º |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public boolean checkInsertQueueEmpty(String tableName) { |
| | | WriteUtil.append("DA-å¤å®æå
¥éåï¼æ¥è¯¢å®æåæ¾å
¥çéåï¼æ¯å¦ä¸ºç©º"); |
| | | return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty(); |
| | | } |
| | | /** |
| | | * å¤å®æå
¥éåï¼æ¥è¯¢å®æåæ¾å
¥çéåï¼æ¯å¦ä¸ºç©º |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public boolean checkInsertQueueEmpty(String tableName) { |
| | | WriteUtil.append("DA-å¤å®æå
¥éåï¼æ¥è¯¢å®æåæ¾å
¥çéåï¼æ¯å¦ä¸ºç©º"); |
| | | return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty(); |
| | | } |
| | | |
| | | /** |
| | | * å
³éæ¥è¯¢çº¿ç¨ |
| | | * @param tableName |
| | | */ |
| | | public void shutdownQueryThread(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | ExecutorService executorService = queryThreadMap.get(tableName); |
| | | if (executorService != null) { |
| | | if (!executorService.isShutdown()) { |
| | | executorService.shutdown(); |
| | | } |
| | | queryThreadMap.remove(tableName); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * å
³éæ¥è¯¢çº¿ç¨ |
| | | * |
| | | * @param tableName |
| | | */ |
| | | public void shutdownQueryThread(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | ExecutorService executorService = queryThreadMap.get(tableName); |
| | | if (executorService != null) { |
| | | if (!executorService.isShutdown()) { |
| | | executorService.shutdown(); |
| | | } |
| | | queryThreadMap.remove(tableName); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * æ¸
ç |
| | | * @param tableName |
| | | */ |
| | | public void clear(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | queryMap.remove(tableName); |
| | | shutdownQueryThread(tableName); |
| | | } |
| | | } |
| | | /** |
| | | * æ¸
ç |
| | | * |
| | | * @param tableName |
| | | */ |
| | | public void clear(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | queryMap.remove(tableName); |
| | | shutdownQueryThread(tableName); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * æåé误æ¥å¿ï¼åªè½æå䏿¬¡ï¼æååä¼ç´æ¥æ¸
空 |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public String getErrorLog(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | if (errorLogMap == null) { |
| | | return null; |
| | | } else { |
| | | StringBuilder result = errorLogMap.get(tableName); |
| | | errorLogMap.remove(tableName); |
| | | return result == null ? null : result.toString(); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * æåé误æ¥å¿ï¼åªè½æå䏿¬¡ï¼æååä¼ç´æ¥æ¸
空 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public String getErrorLog(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | if (errorLogMap == null) { |
| | | return null; |
| | | } else { |
| | | StringBuilder result = errorLogMap.get(tableName); |
| | | errorLogMap.remove(tableName); |
| | | return result == null ? null : result.toString(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * è·ååæ®µçæ°éï¼æå¤16ï¼æå°ä¸º0ï¼ä¸º0表示ç¨ä¸ä¸ææççº¿ç¨ |
| | | * @param totalCount |
| | | * @return |
| | | */ |
| | | private int getPartCount(int totalCount) { |
| | | int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE; |
| | | int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1); |
| | | if (partCount >= 16) { |
| | | return 16; |
| | | } else if (partCount >= 8) { |
| | | return 8; |
| | | } else if (partCount >= 4) { |
| | | return 4; |
| | | } else if (partCount >= 2) { |
| | | return 2; |
| | | } else if (partCount >= 1) { |
| | | return 1; |
| | | } else { |
| | | return 0; |
| | | } |
| | | } |
| | | /** |
| | | * è·ååæ®µçæ°éï¼æå¤16ï¼æå°ä¸º0ï¼ä¸º0表示ç¨ä¸ä¸ææççº¿ç¨ |
| | | * |
| | | * @param totalCount |
| | | * @return |
| | | */ |
| | | private int getPartCount(int totalCount) { |
| | | int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE; |
| | | int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1); |
| | | if (partCount >= 16) { |
| | | return 16; |
| | | } else if (partCount >= 8) { |
| | | return 8; |
| | | } else if (partCount >= 4) { |
| | | return 4; |
| | | } else if (partCount >= 2) { |
| | | return 2; |
| | | } else if (partCount >= 1) { |
| | | return 1; |
| | | } else { |
| | | return 0; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * è·åsqlï¼æ¥è¯¢èå´å
çæå¤§idå¼ |
| | | * @param uniqueField |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param dbType |
| | | * @param pageIndex |
| | | * @param pageSize |
| | | * @return |
| | | */ |
| | | private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) { |
| | | int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize; |
| | | int finalIndex = startIndex + pageSize; |
| | | StringBuilder sql = new StringBuilder(128); |
| | | if (DataBaseType.MYSQL.getValue() == dbType) { |
| | | sql.append("select max(").append(uniqueField).append(") max_id from ("); |
| | | sql.append("\n select ").append(uniqueField).append(" from ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | sql.append("\n where ").append(filter); |
| | | } |
| | | sql.append("\n order by ").append(uniqueField); |
| | | sql.append("\n limit ").append(startIndex).append(",").append(pageSize); |
| | | sql.append("\n) t"); |
| | | } else if (DataBaseType.ORACLE.getValue() == dbType) { |
| | | sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM ("); |
| | | sql.append("\n SELECT ").append(uniqueField).append(",ROWNUM R FROM ("); |
| | | sql.append("\n SELECT ").append(uniqueField).append(" FROM ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | sql.append("\n WHERE ").append(filter); |
| | | } |
| | | sql.append("\n ORDER BY ").append(uniqueField); |
| | | sql.append("\n ) T1"); |
| | | sql.append("\n WHERE ROWNUM<=").append(finalIndex); |
| | | sql.append("\n) T2"); |
| | | sql.append("\nWHERE R>").append(startIndex); |
| | | } |
| | | return sql.toString(); |
| | | } |
| | | /** |
| | | * è·åsqlï¼æ¥è¯¢èå´å
çæå¤§idå¼ |
| | | * |
| | | * @param uniqueField |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param dbType |
| | | * @param pageIndex |
| | | * @param pageSize |
| | | * @return |
| | | */ |
| | | private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) { |
| | | int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize; |
| | | int finalIndex = startIndex + pageSize; |
| | | StringBuilder sql = new StringBuilder(128); |
| | | if (DataBaseType.MYSQL.getValue() == dbType) { |
| | | sql.append("select max(").append(uniqueField).append(") max_id from ("); |
| | | sql.append("\n select ").append(uniqueField).append(" from ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | sql.append("\n where ").append(filter); |
| | | } |
| | | sql.append("\n order by ").append(uniqueField); |
| | | sql.append("\n limit ").append(startIndex).append(",").append(pageSize); |
| | | sql.append("\n) t"); |
| | | } else if (DataBaseType.ORACLE.getValue() == dbType) { |
| | | sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM ("); |
| | | sql.append("\n SELECT ").append(uniqueField).append(",ROWNUM R FROM ("); |
| | | sql.append("\n SELECT ").append(uniqueField).append(" FROM ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | sql.append("\n WHERE ").append(filter); |
| | | } |
| | | sql.append("\n ORDER BY ").append(uniqueField); |
| | | sql.append("\n ) T1"); |
| | | sql.append("\n WHERE ROWNUM<=").append(finalIndex); |
| | | sql.append("\n) T2"); |
| | | sql.append("\nWHERE R>").append(startIndex); |
| | | } |
| | | return sql.toString(); |
| | | } |
| | | |
| | | /** |
| | | * æ¾å
¥éå |
| | | * @param tableName |
| | | * @param dte |
| | | */ |
| | | private void add(String tableName, DataTableEntity dte) { |
| | | synchronized (tableName.intern()) { |
| | | LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); |
| | | if (queryQueue == null) { |
| | | queryQueue = new LinkedBlockingQueue<>(); |
| | | queryMap.put(tableName, queryQueue); |
| | | } |
| | | queryQueue.add(dte); |
| | | WriteUtil.append("DA-æåæ¾å
¥éå-" + tableName + "-å½åå©ä½éåæ°ï¼" + queryQueue.size()); |
| | | } |
| | | } |
| | | /** |
| | | * æ¾å
¥éå |
| | | * |
| | | * @param tableName |
| | | * @param dte |
| | | */ |
| | | private void add(String tableName, DataTableEntity dte) { |
| | | synchronized (tableName.intern()) { |
| | | LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); |
| | | if (queryQueue == null) { |
| | | queryQueue = new LinkedBlockingQueue<>(); |
| | | queryMap.put(tableName, queryQueue); |
| | | } |
| | | if (tableName.endsWith("BAK20230823")) { |
| | | //æ£æ¥dteä¸çsource_info å pre_master_key æ¯å¦ä¸ºç©º |
| | | for (int i = 0; i < dte.getRows(); i++) { |
| | | String sourceInfo = dte.getString(i, "source_info"); |
| | | String preMasterKey = dte.getString(i, "pre_master_key"); |
| | | if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) { |
| | | dte.setFieldValue(i, "source_info", "ch-kt"); |
| | | String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id"; |
| | | dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName)); |
| | | } |
| | | } |
| | | } |
| | | queryQueue.add(dte); |
| | | WriteUtil.append("DA-æåæ¾å
¥éå-" + tableName + "-å½åå©ä½éåæ°ï¼" + queryQueue.size()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * æ¥è¯¢å¼å§ï¼ååæ´»map䏿·»å 1 |
| | | * @param tableName |
| | | */ |
| | | private void startQuery(String tableName, String threadInfo) { |
| | | synchronized (tableName.intern()) { |
| | | Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet()); |
| | | set.add(threadInfo); |
| | | } |
| | | } |
| | | /** |
| | | * æ¥è¯¢å¼å§ï¼ååæ´»map䏿·»å 1 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | private void startQuery(String tableName, String threadInfo) { |
| | | synchronized (tableName.intern()) { |
| | | Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet()); |
| | | set.add(threadInfo); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * æ¥è¯¢ç»æï¼ååæ´»mapä¸åå°1 |
| | | * @param tableName |
| | | */ |
| | | private void finalQuery(String tableName, String threadInfo) { |
| | | synchronized (tableName.intern()) { |
| | | Set<String> set = existsQueryMap.get(tableName); |
| | | if (set == null || !set.contains(threadInfo)) { |
| | | throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName); |
| | | } |
| | | set.remove(threadInfo); |
| | | if (set.isEmpty()) { |
| | | existsQueryMap.remove(tableName); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * æ¥è¯¢ç»æï¼ååæ´»mapä¸åå°1 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | private void finalQuery(String tableName, String threadInfo) { |
| | | synchronized (tableName.intern()) { |
| | | Set<String> set = existsQueryMap.get(tableName); |
| | | if (set == null || !set.contains(threadInfo)) { |
| | | throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName); |
| | | } |
| | | set.remove(threadInfo); |
| | | if (set.isEmpty()) { |
| | | existsQueryMap.remove(tableName); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * å
许æ§è¡æ¥è¯¢ï¼é¿å
éåä¸çå¾
æå
¥ç太å¤ï¼å¯¼è´å
åæº¢åºï¼ |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | private boolean allowQuery(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT; |
| | | } |
| | | } |
| | | /** |
| | | * å
许æ§è¡æ¥è¯¢ï¼é¿å
éåä¸çå¾
æå
¥ç太å¤ï¼å¯¼è´å
åæº¢åºï¼ |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | private boolean allowQuery(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * æå
¥é误æ¥å¿ |
| | | * @param tableName |
| | | * @param error |
| | | */ |
| | | private void appendErrorLog(String tableName, String error) { |
| | | synchronized (tableName.intern()) { |
| | | StringBuilder errorSb = errorLogMap.get(tableName); |
| | | if (errorSb == null) { |
| | | errorSb = new StringBuilder(128); |
| | | errorLogMap.put(tableName, errorSb); |
| | | } |
| | | if (errorSb.length() > 0) { |
| | | errorSb.append("\n"); |
| | | } |
| | | if (errorSb.length() < 2000) { |
| | | errorSb.append(error); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * æå
¥é误æ¥å¿ |
| | | * |
| | | * @param tableName |
| | | * @param error |
| | | */ |
| | | private void appendErrorLog(String tableName, String error) { |
| | | synchronized (tableName.intern()) { |
| | | StringBuilder errorSb = errorLogMap.get(tableName); |
| | | if (errorSb == null) { |
| | | errorSb = new StringBuilder(128); |
| | | errorLogMap.put(tableName, errorSb); |
| | | } |
| | | if (errorSb.length() > 0) { |
| | | errorSb.append("\n"); |
| | | } |
| | | if (errorSb.length() < 2000) { |
| | | errorSb.append(error); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * è·å页æ°ï¼åä¸åæ´ |
| | | * @param count |
| | | * @param size |
| | | * @return |
| | | */ |
| | | private int ceilPage(int count, int size) { |
| | | if (size == 0) { |
| | | if (count == 0) { |
| | | return 0; |
| | | } else { |
| | | throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR); |
| | | } |
| | | } |
| | | return count / size + (count % size == 0 ? 0 : 1); |
| | | } |
| | | /** |
| | | * è·å页æ°ï¼åä¸åæ´ |
| | | * |
| | | * @param count |
| | | * @param size |
| | | * @return |
| | | */ |
| | | private int ceilPage(int count, int size) { |
| | | if (size == 0) { |
| | | if (count == 0) { |
| | | return 0; |
| | | } else { |
| | | throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR); |
| | | } |
| | | } |
| | | return count / size + (count % size == 0 ? 0 : 1); |
| | | } |
| | | } |