| | |
| | | */ |
| | | @Service |
| | | public class DataArchivingQueue extends AbstractBaseService { |
| | | // 查询队列map:map<表名,dte队列> |
| | | private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>(); |
| | | // 查询存活map:正在执行查询的表,对应的值为执行的线程数,为0标识已经执行完成 |
| | | private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap(); |
| | | // 查询线程map |
| | | private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap(); |
| | | // 错误日志map |
| | | private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap(); |
| | | // 单表查询最大线程数 |
| | | private static final int QUERY_THREAD_COUNT = 3; |
| | | // 单表验证查询最大批次(单表查询最大批次=单表验证查询最大批次 + 单表查询最大线程数) |
| | | private static final int QUERY_MAX_BATCH_COUNT = 4; |
| | | // 查询每页大小 |
| | | private static final int QUERY_PAGE_SIZE = 50000; |
| | | // 插入每页大小 |
| | | public static final int INSERT_PAGE_SIZE = 5000; |
| | | // 查询队列map:map<表名,dte队列> |
| | | private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>(); |
| | | // 查询存活map:正在执行查询的表,对应的值为执行的线程数,为0标识已经执行完成 |
| | | private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap(); |
| | | // 查询线程map |
| | | private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap(); |
| | | // 错误日志map |
| | | private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap(); |
| | | // 单表查询最大线程数 |
| | | private static final int QUERY_THREAD_COUNT = 3; |
| | | // 单表验证查询最大批次(单表查询最大批次=单表验证查询最大批次 + 单表查询最大线程数) |
| | | private static final int QUERY_MAX_BATCH_COUNT = 4; |
| | | // 查询每页大小 |
| | | private static final int QUERY_PAGE_SIZE = 50000; |
| | | // 插入每页大小 |
| | | public static final int INSERT_PAGE_SIZE = 5000; |
| | | |
| | | /** |
| | | * 查询 |
| | | * |
| | | * @param sourceDbe |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param params |
| | | * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据 |
| | | * @param minID |
| | | */ |
| | | public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) { |
| | | Dao sourceDao = null; |
| | | try { |
| | | StringBuilder countSql = new StringBuilder(128); |
| | | countSql.append("select count(*) count_value from ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | countSql.append(" where ").append(filter); |
| | | } |
| | | sourceDao = sourceDbe.newDao(); |
| | | FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params); |
| | | int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value"); |
| | | int partCount = getPartCount(totalCount); |
| | | shutdownQueryThread(sourceTable); |
| | | ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT); |
| | | queryThreadMap.put(sourceTable, executorService); |
| | | StringBuilder rangeSql = new StringBuilder(128); |
| | | String tempPartMinID = minID; |
| | | int partSize = partCount * QUERY_PAGE_SIZE; |
| | | int count = ceilPage(totalCount, partSize); |
| | | for (int i = 1; i <= count; i++) { |
| | | rangeSql.setLength(0); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | rangeSql.append(filter).append(" and "); |
| | | } |
| | | rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'"); |
| | | DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params); |
| | | if (DataTableEntity.isEmpty(rangeDte)) { |
| | | continue; |
| | | } |
| | | FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0); |
| | | String curPartMaxID = rangeFse.getString("max_id"); |
| | | String curPartMinID = tempPartMinID; |
| | | executorService.submit(() -> { |
| | | String threadInfo = String.valueOf(Thread.currentThread().getId()); |
| | | Dao threadSourceDao = null; |
| | | String thisPartMinID = curPartMinID; |
| | | try { |
| | | threadSourceDao = sourceDbe.newDao(); |
| | | startQuery(sourceTable, threadInfo); |
| | | int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE); |
| | | StringBuilder tempFilter = new StringBuilder(128); |
| | | for (int j = 0; j < totalPage; j++) { |
| | | while (!allowQuery(sourceTable)) { |
| | | Thread.sleep(RandomUtil.randomInt(800, 1200)); |
| | | } |
| | | tempFilter.setLength(0); |
| | | tempFilter.append(uniqueField); |
| | | WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID); |
| | | if (minID.equals(thisPartMinID)) { |
| | | tempFilter.append(">="); |
| | | } else { |
| | | tempFilter.append(">"); |
| | | } |
| | | tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'"); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | tempFilter.append(" and ").append(filter); |
| | | } |
| | | WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter); |
| | | DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE); |
| | | if (!DataTableEntity.isEmpty(allDte)) { |
| | | add(sourceTable, allDte); |
| | | thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField); |
| | | } else { |
| | | break; |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim()); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | clear(sourceTable); |
| | | } finally { |
| | | if (threadSourceDao != null) { |
| | | threadSourceDao.closeConnection(); |
| | | } |
| | | finalQuery(sourceTable, threadInfo); |
| | | } |
| | | }); |
| | | tempPartMinID = curPartMaxID; |
| | | } |
| | | } catch (Exception e) { |
| | | throw e; |
| | | } finally { |
| | | if (sourceDao != null) { |
| | | sourceDao.closeConnection(); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * 查询 |
| | | * |
| | | * @param sourceDbe |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param params |
| | | * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据 |
| | | * @param minID |
| | | */ |
| | | public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) { |
| | | Dao sourceDao = null; |
| | | try { |
| | | StringBuilder countSql = new StringBuilder(128); |
| | | countSql.append("select count(*) count_value from ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | countSql.append(" where ").append(filter); |
| | | } |
| | | sourceDao = sourceDbe.newDao(); |
| | | FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params); |
| | | int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value"); |
| | | int partCount = getPartCount(totalCount); |
| | | shutdownQueryThread(sourceTable); |
| | | ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT); |
| | | queryThreadMap.put(sourceTable, executorService); |
| | | StringBuilder rangeSql = new StringBuilder(128); |
| | | String tempPartMinID = minID; |
| | | int partSize = partCount * QUERY_PAGE_SIZE; |
| | | int count = ceilPage(totalCount, partSize); |
| | | for (int i = 1; i <= count; i++) { |
| | | rangeSql.setLength(0); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | rangeSql.append(filter).append(" and "); |
| | | } |
| | | rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'"); |
| | | DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params); |
| | | if (DataTableEntity.isEmpty(rangeDte)) { |
| | | continue; |
| | | } |
| | | FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0); |
| | | String curPartMaxID = rangeFse.getString("max_id"); |
| | | String curPartMinID = tempPartMinID; |
| | | executorService.submit(() -> { |
| | | String threadInfo = String.valueOf(Thread.currentThread().getId()); |
| | | Dao threadSourceDao = null; |
| | | String thisPartMinID = curPartMinID; |
| | | try { |
| | | threadSourceDao = sourceDbe.newDao(); |
| | | startQuery(sourceTable, threadInfo); |
| | | int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE); |
| | | StringBuilder tempFilter = new StringBuilder(128); |
| | | for (int j = 0; j < totalPage; j++) { |
| | | while (!allowQuery(sourceTable)) { |
| | | Thread.sleep(RandomUtil.randomInt(800, 1200)); |
| | | } |
| | | tempFilter.setLength(0); |
| | | tempFilter.append(uniqueField); |
| | | WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID); |
| | | if (minID.equals(thisPartMinID)) { |
| | | tempFilter.append(">="); |
| | | } else { |
| | | tempFilter.append(">"); |
| | | } |
| | | tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'"); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | tempFilter.append(" and ").append(filter); |
| | | } |
| | | WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter); |
| | | DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE); |
| | | if (!DataTableEntity.isEmpty(allDte)) { |
| | | add(sourceTable, allDte); |
| | | thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField); |
| | | } else { |
| | | break; |
| | | } |
| | | } |
| | | } catch (Exception e) { |
| | | appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim()); |
| | | SpringMVCContextHolder.getSystemLogger().error(e); |
| | | clear(sourceTable); |
| | | } finally { |
| | | if (threadSourceDao != null) { |
| | | threadSourceDao.closeConnection(); |
| | | } |
| | | finalQuery(sourceTable, threadInfo); |
| | | } |
| | | }); |
| | | tempPartMinID = curPartMaxID; |
| | | } |
| | | } catch (Exception e) { |
| | | throw e; |
| | | } finally { |
| | | if (sourceDao != null) { |
| | | sourceDao.closeConnection(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 从队列中获取 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public DataTableEntity get(String tableName) { |
| | | WriteUtil.append("DA-从队列中获取-表名:" + tableName); |
| | | synchronized (tableName.intern()) { |
| | | LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); |
| | | if (queryQueue == null) { |
| | | return null; |
| | | } |
| | | return queryQueue.poll(); |
| | | } |
| | | } |
| | | /** |
| | | * 从队列中获取 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public DataTableEntity get(String tableName) { |
| | | WriteUtil.append("DA-从队列中获取-表名:" + tableName); |
| | | synchronized (tableName.intern()) { |
| | | LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); |
| | | if (queryQueue == null) { |
| | | return null; |
| | | } |
| | | return queryQueue.poll(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 判定是否查询完毕 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public boolean checkQueryFinish(String tableName) { |
| | | WriteUtil.append("DA-判定是否查询完毕"); |
| | | Set<String> set = existsQueryMap.get(tableName); |
| | | return set == null || set.isEmpty(); |
| | | } |
| | | /** |
| | | * 判定是否查询完毕 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public boolean checkQueryFinish(String tableName) { |
| | | WriteUtil.append("DA-判定是否查询完毕"); |
| | | Set<String> set = existsQueryMap.get(tableName); |
| | | return set == null || set.isEmpty(); |
| | | } |
| | | |
| | | /** |
| | | * 判定插入队列(查询完成后放入的队列)是否为空 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public boolean checkInsertQueueEmpty(String tableName) { |
| | | WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空"); |
| | | return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty(); |
| | | } |
| | | /** |
| | | * 判定插入队列(查询完成后放入的队列)是否为空 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public boolean checkInsertQueueEmpty(String tableName) { |
| | | WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空"); |
| | | return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty(); |
| | | } |
| | | |
| | | /** |
| | | * 关闭查询线程 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | public void shutdownQueryThread(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | ExecutorService executorService = queryThreadMap.get(tableName); |
| | | if (executorService != null) { |
| | | if (!executorService.isShutdown()) { |
| | | executorService.shutdown(); |
| | | } |
| | | queryThreadMap.remove(tableName); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * 关闭查询线程 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | public void shutdownQueryThread(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | ExecutorService executorService = queryThreadMap.get(tableName); |
| | | if (executorService != null) { |
| | | if (!executorService.isShutdown()) { |
| | | executorService.shutdown(); |
| | | } |
| | | queryThreadMap.remove(tableName); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 清理 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | public void clear(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | queryMap.remove(tableName); |
| | | shutdownQueryThread(tableName); |
| | | } |
| | | } |
| | | /** |
| | | * 清理 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | public void clear(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | queryMap.remove(tableName); |
| | | shutdownQueryThread(tableName); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 提取错误日志,只能提取一次,提取后会直接清空 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public String getErrorLog(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | if (errorLogMap == null) { |
| | | return null; |
| | | } else { |
| | | StringBuilder result = errorLogMap.get(tableName); |
| | | errorLogMap.remove(tableName); |
| | | return result == null ? null : result.toString(); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * 提取错误日志,只能提取一次,提取后会直接清空 |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | public String getErrorLog(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | if (errorLogMap == null) { |
| | | return null; |
| | | } else { |
| | | StringBuilder result = errorLogMap.get(tableName); |
| | | errorLogMap.remove(tableName); |
| | | return result == null ? null : result.toString(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程 |
| | | * |
| | | * @param totalCount |
| | | * @return |
| | | */ |
| | | private int getPartCount(int totalCount) { |
| | | int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE; |
| | | int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1); |
| | | if (partCount >= 16) { |
| | | return 16; |
| | | } else if (partCount >= 8) { |
| | | return 8; |
| | | } else if (partCount >= 4) { |
| | | return 4; |
| | | } else if (partCount >= 2) { |
| | | return 2; |
| | | } else if (partCount >= 1) { |
| | | return 1; |
| | | } else { |
| | | return 0; |
| | | } |
| | | } |
| | | /** |
| | | * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程 |
| | | * |
| | | * @param totalCount |
| | | * @return |
| | | */ |
| | | private int getPartCount(int totalCount) { |
| | | int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE; |
| | | int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1); |
| | | if (partCount >= 16) { |
| | | return 16; |
| | | } else if (partCount >= 8) { |
| | | return 8; |
| | | } else if (partCount >= 4) { |
| | | return 4; |
| | | } else if (partCount >= 2) { |
| | | return 2; |
| | | } else if (partCount >= 1) { |
| | | return 1; |
| | | } else { |
| | | return 0; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取sql:查询范围内的最大id值 |
| | | * |
| | | * @param uniqueField |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param dbType |
| | | * @param pageIndex |
| | | * @param pageSize |
| | | * @return |
| | | */ |
| | | private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) { |
| | | int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize; |
| | | int finalIndex = startIndex + pageSize; |
| | | StringBuilder sql = new StringBuilder(128); |
| | | if (DataBaseType.MYSQL.getValue() == dbType) { |
| | | sql.append("select max(").append(uniqueField).append(") max_id from ("); |
| | | sql.append("\n select ").append(uniqueField).append(" from ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | sql.append("\n where ").append(filter); |
| | | } |
| | | sql.append("\n order by ").append(uniqueField); |
| | | sql.append("\n limit ").append(startIndex).append(",").append(pageSize); |
| | | sql.append("\n) t"); |
| | | } else if (DataBaseType.ORACLE.getValue() == dbType) { |
| | | sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM ("); |
| | | sql.append("\n SELECT ").append(uniqueField).append(",ROWNUM R FROM ("); |
| | | sql.append("\n SELECT ").append(uniqueField).append(" FROM ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | sql.append("\n WHERE ").append(filter); |
| | | } |
| | | sql.append("\n ORDER BY ").append(uniqueField); |
| | | sql.append("\n ) T1"); |
| | | sql.append("\n WHERE ROWNUM<=").append(finalIndex); |
| | | sql.append("\n) T2"); |
| | | sql.append("\nWHERE R>").append(startIndex); |
| | | } |
| | | return sql.toString(); |
| | | } |
| | | /** |
| | | * 获取sql:查询范围内的最大id值 |
| | | * |
| | | * @param uniqueField |
| | | * @param sourceTable |
| | | * @param filter |
| | | * @param dbType |
| | | * @param pageIndex |
| | | * @param pageSize |
| | | * @return |
| | | */ |
| | | private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) { |
| | | int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize; |
| | | int finalIndex = startIndex + pageSize; |
| | | StringBuilder sql = new StringBuilder(128); |
| | | if (DataBaseType.MYSQL.getValue() == dbType) { |
| | | sql.append("select max(").append(uniqueField).append(") max_id from ("); |
| | | sql.append("\n select ").append(uniqueField).append(" from ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | sql.append("\n where ").append(filter); |
| | | } |
| | | sql.append("\n order by ").append(uniqueField); |
| | | sql.append("\n limit ").append(startIndex).append(",").append(pageSize); |
| | | sql.append("\n) t"); |
| | | } else if (DataBaseType.ORACLE.getValue() == dbType) { |
| | | sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM ("); |
| | | sql.append("\n SELECT ").append(uniqueField).append(",ROWNUM R FROM ("); |
| | | sql.append("\n SELECT ").append(uniqueField).append(" FROM ").append(sourceTable); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | sql.append("\n WHERE ").append(filter); |
| | | } |
| | | sql.append("\n ORDER BY ").append(uniqueField); |
| | | sql.append("\n ) T1"); |
| | | sql.append("\n WHERE ROWNUM<=").append(finalIndex); |
| | | sql.append("\n) T2"); |
| | | sql.append("\nWHERE R>").append(startIndex); |
| | | } |
| | | return sql.toString(); |
| | | } |
| | | |
| | | /** |
| | | * 放入队列 |
| | | * |
| | | * @param tableName |
| | | * @param dte |
| | | */ |
| | | private void add(String tableName, DataTableEntity dte) { |
| | | synchronized (tableName.intern()) { |
| | | LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); |
| | | if (queryQueue == null) { |
| | | queryQueue = new LinkedBlockingQueue<>(); |
| | | queryMap.put(tableName, queryQueue); |
| | | } |
| | | if (tableName.endsWith("BAK20230823")) { |
| | | //检查dte中的source_info 和 pre_master_key 是否为空 |
| | | for (int i = 0; i < dte.getRows(); i++) { |
| | | String sourceInfo = dte.getString(i, "source_info"); |
| | | String preMasterKey = dte.getString(i, "pre_master_key"); |
| | | if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) { |
| | | dte.setFieldValue(i, "source_info", "ch-kt"); |
| | | String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id"; |
| | | dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName)); |
| | | } |
| | | } |
| | | } |
| | | queryQueue.add(dte); |
| | | WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size()); |
| | | } |
| | | } |
| | | /** |
| | | * 放入队列 |
| | | * |
| | | * @param tableName |
| | | * @param dte |
| | | */ |
| | | private void add(String tableName, DataTableEntity dte) { |
| | | synchronized (tableName.intern()) { |
| | | LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName); |
| | | if (queryQueue == null) { |
| | | queryQueue = new LinkedBlockingQueue<>(); |
| | | queryMap.put(tableName, queryQueue); |
| | | } |
| | | if (tableName.endsWith("BAK20230823")) { |
| | | //检查dte中的source_info 和 pre_master_key 是否为空 |
| | | for (int i = 0; i < dte.getRows(); i++) { |
| | | String sourceInfo = dte.getString(i, "source_info"); |
| | | String preMasterKey = dte.getString(i, "pre_master_key"); |
| | | if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) { |
| | | dte.setFieldValue(i, "source_info", "ch-kt"); |
| | | String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id"; |
| | | dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 查询开始,向存活map中添加1 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | private void startQuery(String tableName, String threadInfo) { |
| | | synchronized (tableName.intern()) { |
| | | Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet()); |
| | | set.add(threadInfo); |
| | | } |
| | | } |
| | | queryQueue.add(dte); |
| | | WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size()); |
| | | while (queryQueue.size() >= 10) { |
| | | SpringMVCContextHolder.getSystemLogger().error("DA-队列已满-" + tableName + "-当前剩余队列数:" + queryQueue.size()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 查询结束,向存活map中减少1 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | private void finalQuery(String tableName, String threadInfo) { |
| | | synchronized (tableName.intern()) { |
| | | Set<String> set = existsQueryMap.get(tableName); |
| | | if (set == null || !set.contains(threadInfo)) { |
| | | throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName); |
| | | } |
| | | set.remove(threadInfo); |
| | | if (set.isEmpty()) { |
| | | existsQueryMap.remove(tableName); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * 查询开始,向存活map中添加1 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | private void startQuery(String tableName, String threadInfo) { |
| | | synchronized (tableName.intern()) { |
| | | Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet()); |
| | | set.add(threadInfo); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 允许执行查询(避免队列中等待插入的太多,导致内存溢出) |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | private boolean allowQuery(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT; |
| | | } |
| | | } |
| | | /** |
| | | * 查询结束,向存活map中减少1 |
| | | * |
| | | * @param tableName |
| | | */ |
| | | private void finalQuery(String tableName, String threadInfo) { |
| | | synchronized (tableName.intern()) { |
| | | Set<String> set = existsQueryMap.get(tableName); |
| | | if (set == null || !set.contains(threadInfo)) { |
| | | throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName); |
| | | } |
| | | set.remove(threadInfo); |
| | | if (set.isEmpty()) { |
| | | existsQueryMap.remove(tableName); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 插入错误日志 |
| | | * |
| | | * @param tableName |
| | | * @param error |
| | | */ |
| | | private void appendErrorLog(String tableName, String error) { |
| | | synchronized (tableName.intern()) { |
| | | StringBuilder errorSb = errorLogMap.get(tableName); |
| | | if (errorSb == null) { |
| | | errorSb = new StringBuilder(128); |
| | | errorLogMap.put(tableName, errorSb); |
| | | } |
| | | if (errorSb.length() > 0) { |
| | | errorSb.append("\n"); |
| | | } |
| | | if (errorSb.length() < 2000) { |
| | | errorSb.append(error); |
| | | } |
| | | } |
| | | } |
| | | /** |
| | | * 允许执行查询(避免队列中等待插入的太多,导致内存溢出) |
| | | * |
| | | * @param tableName |
| | | * @return |
| | | */ |
| | | private boolean allowQuery(String tableName) { |
| | | synchronized (tableName.intern()) { |
| | | return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取页数,向上取整 |
| | | * |
| | | * @param count |
| | | * @param size |
| | | * @return |
| | | */ |
| | | private int ceilPage(int count, int size) { |
| | | if (size == 0) { |
| | | if (count == 0) { |
| | | return 0; |
| | | } else { |
| | | throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR); |
| | | } |
| | | } |
| | | return count / size + (count % size == 0 ? 0 : 1); |
| | | } |
| | | /** |
| | | * 插入错误日志 |
| | | * |
| | | * @param tableName |
| | | * @param error |
| | | */ |
| | | private void appendErrorLog(String tableName, String error) { |
| | | synchronized (tableName.intern()) { |
| | | StringBuilder errorSb = errorLogMap.get(tableName); |
| | | if (errorSb == null) { |
| | | errorSb = new StringBuilder(128); |
| | | errorLogMap.put(tableName, errorSb); |
| | | } |
| | | if (errorSb.length() > 0) { |
| | | errorSb.append("\n"); |
| | | } |
| | | if (errorSb.length() < 2000) { |
| | | errorSb.append(error); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 获取页数,向上取整 |
| | | * |
| | | * @param count |
| | | * @param size |
| | | * @return |
| | | */ |
| | | private int ceilPage(int count, int size) { |
| | | if (size == 0) { |
| | | if (count == 0) { |
| | | return 0; |
| | | } else { |
| | | throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR); |
| | | } |
| | | } |
| | | return count / size + (count % size == 0 ? 0 : 1); |
| | | } |
| | | } |