package com.product.data.center.service;

import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.RandomUtil;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.product.common.utils.spring.SpringUtils;
import com.product.core.entity.DataTableEntity;
import com.product.core.entity.FieldSetEntity;
import com.product.core.exception.BaseException;
import com.product.core.service.support.AbstractBaseService;
import com.product.core.spring.context.SpringMVCContextHolder;
import com.product.data.center.config.ErrorCode;
import com.product.data.center.utils.WriteUtil;
import com.product.datasource.config.DataBaseType;
import com.product.datasource.dao.Dao;
import com.product.datasource.entity.DataBaseEntity;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;

import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Copyright © 6c
 *
 * @Date 2022-11-27 08:55
 * @Author 6c
 * @Description data archiving query queue
 */
@Service
public class DataArchivingQueue extends AbstractBaseService {

    // query queue map: table name -> queue of DataTableEntity batches waiting to be inserted
    private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
    // live query map: tables currently being queried; the value holds the ids of the running query
    // threads, and an empty or absent entry means the query has finished
    private static Map<String, Set<String>> existsQueryMap = Maps.newConcurrentMap();
    // query thread pool map: table name -> executor running that table's query threads
    private static Map<String, ExecutorService> queryThreadMap = Maps.newConcurrentMap();
    // error log map: table name -> accumulated error text
    private static Map<String, StringBuilder> errorLogMap = Maps.newConcurrentMap();
    // maximum number of query threads per table
    private static final int QUERY_THREAD_COUNT = 3;
    // maximum number of queued batches checked before a thread may query again
    // (effective per-table maximum = QUERY_MAX_BATCH_COUNT + QUERY_THREAD_COUNT)
    private static final int QUERY_MAX_BATCH_COUNT = 4;
    // query page size (rows fetched per batch)
    private static final int QUERY_PAGE_SIZE = 50000;
    // insert page size (rows written per insert chunk)
    public static final int INSERT_PAGE_SIZE = 5000;
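
    /*
     * Sizing note (illustrative, derived from the constants above): allowQuery() lets a thread start a
     * new page query only while the table's queue holds at most QUERY_MAX_BATCH_COUNT (4) batches, so
     * with QUERY_THREAD_COUNT (3) threads each able to add one more batch, roughly
     * QUERY_MAX_BATCH_COUNT + QUERY_THREAD_COUNT = 7 batches of QUERY_PAGE_SIZE (50,000) rows, i.e.
     * about 350,000 rows, can be buffered per table at any one time.
     */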

    /**
     * Query: split the table into primary-key ranges and fetch them page by page on a thread pool,
     * putting each page into the per-table queue.
     *
     * @param sourceDbe   source database
     * @param sourceTable source table name
     * @param filter      optional filter clause
     * @param params      filter parameters
     * @param uniqueField primary key; used both to identify rows and to order them, so that Oracle
     *                    pagination does not return duplicate rows
     * @param minID       minimum primary key value to start from
     */
    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
        Dao sourceDao = null;
        try {
            StringBuilder countSql = new StringBuilder(128);
            countSql.append("select count(*) count_value from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                countSql.append(" where ").append(filter);
            }
            sourceDao = sourceDbe.newDao();
            FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
            int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
            int partCount = getPartCount(totalCount);
            shutdownQueryThread(sourceTable);
            ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
            queryThreadMap.put(sourceTable, executorService);
            StringBuilder rangeSql = new StringBuilder(128);
            String tempPartMinID = minID;
            int partSize = partCount * QUERY_PAGE_SIZE;
            int count = ceilPage(totalCount, partSize);
            for (int i = 1; i <= count; i++) {
                rangeSql.setLength(0);
                if (!StringUtils.isEmpty(filter)) {
                    rangeSql.append(filter).append(" and ");
                }
                rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
                // determine the maximum id of this range, used as the upper bound of the part
                DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
                if (DataTableEntity.isEmpty(rangeDte)) {
                    continue;
                }
                FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
                String curPartMaxID = rangeFse.getString("max_id");
                String curPartMinID = tempPartMinID;
                executorService.submit(() -> {
                    String threadInfo = String.valueOf(Thread.currentThread().getId());
                    Dao threadSourceDao = null;
                    String thisPartMinID = curPartMinID;
                    try {
                        threadSourceDao = sourceDbe.newDao();
                        startQuery(sourceTable, threadInfo);
                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                        StringBuilder tempFilter = new StringBuilder(128);
                        for (int j = 0; j < totalPage; j++) {
                            // check the queue size before querying and wait if it exceeds the maximum
                            checkQueueCount(sourceTable);
                            while (!allowQuery(sourceTable)) {
                                Thread.sleep(RandomUtil.randomInt(800, 1200));
                            }
                            tempFilter.setLength(0);
                            tempFilter.append(uniqueField);
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
                            if (minID.equals(thisPartMinID)) {
                                tempFilter.append(">=");
                            } else {
                                tempFilter.append(">");
                            }
                            tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
                            if (!StringUtils.isEmpty(filter)) {
                                tempFilter.append(" and ").append(filter);
                            }
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter);
                            DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
                            if (!DataTableEntity.isEmpty(allDte)) {
                                add(sourceTable, allDte);
                                thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
                            } else {
                                break;
                            }
                        }
                    } catch (Exception e) {
                        appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        clear(sourceTable);
                    } finally {
                        if (threadSourceDao != null) {
                            threadSourceDao.closeConnection();
                        }
                        finalQuery(sourceTable, threadInfo);
                    }
                });
                tempPartMinID = curPartMaxID;
            }
        } finally {
            if (sourceDao != null) {
                sourceDao.closeConnection();
            }
        }
    }
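
    /*
     * Worked example of the sizing above (illustrative values, not taken from a real run): for
     * totalCount = 1,000,000 rows, getPartCount() yields ceil(1,000,000 / (3 * 50,000)) = 7, clamped
     * down to 4, so partSize = 4 * 50,000 = 200,000 and the outer loop submits
     * ceil(1,000,000 / 200,000) = 5 range tasks; each task then pages its range with
     * ceil(200,000 / 50,000) = 4 queries of QUERY_PAGE_SIZE rows ordered by uniqueField.
     */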

    /**
     * Take a batch from the queue
     *
     * @param tableName
     * @return the next batch, or null if the queue is missing or empty
     */
    public DataTableEntity get(String tableName) {
        WriteUtil.append("DA-take from queue-table:" + tableName);
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                return null;
            }
            return queryQueue.poll();
        }
    }

    /**
     * Check whether the query has finished
     *
     * @param tableName
     * @return
     */
    public boolean checkQueryFinish(String tableName) {
        WriteUtil.append("DA-check whether the query has finished");
        Set<String> set = existsQueryMap.get(tableName);
        return set == null || set.isEmpty();
    }

    /**
     * Check whether the insert queue (the queue filled after querying) is empty
     *
     * @param tableName
     * @return
     */
    public boolean checkInsertQueueEmpty(String tableName) {
        WriteUtil.append("DA-check whether the insert queue (filled after querying) is empty");
        return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
    }

    /**
     * Shut down the query threads of a table
     *
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
        synchronized (tableName.intern()) {
            ExecutorService executorService = queryThreadMap.get(tableName);
            if (executorService != null) {
                if (!executorService.isShutdown()) {
                    executorService.shutdownNow();
                }
                queryThreadMap.remove(tableName);
            }
        }
    }

    /**
     * Clear all state held for a table
     *
     * @param tableName
     */
    public void clear(String tableName) {
        synchronized (tableName.intern()) {
            queryMap.remove(tableName);
            shutdownQueryThread(tableName);
        }
    }

    /**
     * Take the error log; it can only be taken once, because the entry is removed as soon as it is read
     *
     * @param tableName
     * @return
     */
    public String getErrorLog(String tableName) {
        synchronized (tableName.intern()) {
            if (errorLogMap == null) {
                return null;
            } else {
                StringBuilder result = errorLogMap.get(tableName);
                errorLogMap.remove(tableName);
                return result == null ? null : result.toString();
            }
        }
    }

    /**
     * Get the number of parts: at most 16, at least 0; 0 means not all threads would be used
     *
     * @param totalCount
     * @return
     */
    private int getPartCount(int totalCount) {
        int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
        int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
        if (partCount >= 16) {
            return 16;
        } else if (partCount >= 8) {
            return 8;
        } else if (partCount >= 4) {
            return 4;
        } else if (partCount >= 2) {
            return 2;
        } else if (partCount >= 1) {
            return 1;
        } else {
            return 0;
        }
    }

    /**
     * Build the SQL that returns the maximum id value within one page of the given range
     *
     * @param uniqueField
     * @param sourceTable
     * @param filter
     * @param dbType
     * @param pageIndex
     * @param pageSize
     * @return
     */
    private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
        int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
        int finalIndex = startIndex + pageSize;
        StringBuilder sql = new StringBuilder(128);
        if (DataBaseType.MYSQL.getValue() == dbType) {
            sql.append("select max(").append(uniqueField).append(") max_id from (");
            sql.append("\n select ").append(uniqueField).append(" from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n where ").append(filter);
            }
            sql.append("\n order by ").append(uniqueField);
            sql.append("\n limit ").append(startIndex).append(",").append(pageSize);
            sql.append("\n) t");
        } else if (DataBaseType.ORACLE.getValue() == dbType) {
            sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
            sql.append("\n SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
            sql.append("\n SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n WHERE ").append(filter);
            }
            sql.append("\n ORDER BY ").append(uniqueField);
            sql.append("\n ) T1");
            sql.append("\n WHERE ROWNUM<=").append(finalIndex);
            sql.append("\n) T2");
            sql.append("\nWHERE R>").append(startIndex);
        }
        return sql.toString();
    }
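
    /*
     * Illustrative output of getSql() (sample values, not produced anywhere in the original code): for
     * uniqueField "wip_detail_id", sourceTable "t_wip_detail", no filter, pageIndex 1 and
     * pageSize 200000 on Oracle, the generated statement is roughly
     *
     *   SELECT MAX(wip_detail_id) max_id FROM (
     *     SELECT wip_detail_id, ROWNUM R FROM (
     *       SELECT wip_detail_id FROM t_wip_detail ORDER BY wip_detail_id
     *     ) T1 WHERE ROWNUM <= 200000
     *   ) T2 WHERE R > 0
     *
     * i.e. the largest id inside the first 200,000-row window, which query() then uses as the upper
     * bound (curPartMaxID) of that range; the MySQL branch computes the same value with LIMIT offset,size.
     */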

    /**
     * Check the queue size and wait while it is too large
     */
    private void checkQueueCount(String tableName) {
        // start time of the wait
        long startTime = System.currentTimeMillis();
        while (queryMap.get(tableName) != null && queryMap.get(tableName).size() >= 10) {
            // log once every 10 minutes while still waiting
            if (System.currentTimeMillis() - startTime > 600000) {
                WriteUtil.append("DA-queue size exceeds 10, current size:" + queryMap.get(tableName).size());
                startTime = System.currentTimeMillis();
            }
            ThreadUtil.sleep(10000);
        }
    }

    /**
     * Put a batch into the queue
     *
     * @param tableName
     * @param dte
     */
    private void add(String tableName, DataTableEntity dte) {
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
            if (tableName.endsWith("BAK20230823")) {
                // make sure source_info and pre_master_key are filled in for the backup table
                for (int i = 0; i < dte.getRows(); i++) {
                    String sourceInfo = dte.getString(i, "source_info");
                    String preMasterKey = dte.getString(i, "pre_master_key");
                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                        dte.setFieldValue(i, "source_info", "ch-kt");
                        String idFieldName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id"
                                : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFieldName));
                    }
                }
            }
            queryQueue.add(dte);
            WriteUtil.append("DA-added to queue-" + tableName + "-current queue size:" + queryQueue.size());
        }
    }

    /**
     * Query start: register the thread in the live query map
     *
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
            set.add(threadInfo);
        }
    }

    /**
     * Query end: remove the thread from the live query map
     *
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.get(tableName);
            if (set == null || !set.contains(threadInfo)) {
                throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(),
                        ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
            }
            set.remove(threadInfo);
            if (set.isEmpty()) {
                existsQueryMap.remove(tableName);
            }
        }
    }

    /**
     * Whether querying is allowed (keeps the queue of batches waiting to be inserted from growing
     * large enough to exhaust memory)
     *
     * @param tableName
     * @return
     */
    private boolean allowQuery(String tableName) {
        synchronized (tableName.intern()) {
            return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
        }
    }

    /**
     * Append to the error log
     *
     * @param tableName
     * @param error
     */
    private void appendErrorLog(String tableName, String error) {
        synchronized (tableName.intern()) {
            StringBuilder errorSb = errorLogMap.get(tableName);
            if (errorSb == null) {
                errorSb = new StringBuilder(128);
                errorLogMap.put(tableName, errorSb);
            }
            if (errorSb.length() > 0) {
                errorSb.append("\n");
            }
            if (errorSb.length() < 2000) {
                errorSb.append(error);
            }
        }
    }

    /**
     * Number of pages, rounded up
     *
     * @param count
     * @param size
     * @return
     */
    private int ceilPage(int count, int size) {
        if (size == 0) {
            if (count == 0) {
                return 0;
            } else {
                throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
            }
        }
        return count / size + (count % size == 0 ? 0 : 1);
    }
}
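
/*
 * Minimal usage sketch (not part of the original service): it assumes the intended workflow is
 * query() on the producer side and get()/checkQueryFinish()/checkInsertQueueEmpty() on the consumer
 * side, followed by getErrorLog() and clear(). The table name, unique field and minID below are
 * placeholder values, and the actual archive insert step is elided.
 */
class DataArchivingQueueUsageSketch {

    void archive(DataArchivingQueue queue, DataBaseEntity sourceDbe) {
        String table = "t_wip_detail";
        // start the partitioned query threads for the table (placeholder key field and start id)
        queue.query(sourceDbe, table, null, new Object[0], "wip_detail_id", "0");
        // drain the queue until every query thread has finished and nothing is left to consume
        while (!(queue.checkQueryFinish(table) && queue.checkInsertQueueEmpty(table))) {
            DataTableEntity batch = queue.get(table);
            if (batch == null) {
                ThreadUtil.sleep(1000);
                continue;
            }
            // ... insert `batch` into the target database in chunks of DataArchivingQueue.INSERT_PAGE_SIZE ...
        }
        // surface any error recorded by the query threads, then release the per-table state
        String errorLog = queue.getErrorLog(table);
        if (errorLog != null) {
            WriteUtil.append("DA-archive error for " + table + ": " + errorLog);
        }
        queue.clear(table);
    }
}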