cheng
2024-01-28 31016f01ec27432295e77d1720b19cd5fd37ce72
归档限制每个表的队列数量,oracle 更改创建连接池
已修改2个文件
793 ■■■■ 文件已修改
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java 728 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-datasource/src/main/java/com/product/datasource/connection/ConnectionManager.java 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -34,381 +34,385 @@
 */
@Service
public class DataArchivingQueue extends AbstractBaseService {
    // Query queue map: map<table name, queue of queried DataTableEntity batches>
    private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
    // Live-query map: tables that currently have queries running. The value is the set of
    // thread identifiers registered by startQuery; a missing or empty set means "finished".
    private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
    // Query thread map: map<table name, executor running that table's query tasks>
    private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
    // Error log map: map<table name, accumulated error text>
    private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
    // Maximum number of query threads per table
    private static final int QUERY_THREAD_COUNT = 3;
    // Queue-size threshold checked before a query page is allowed to run
    // (effective maximum of queued batches per table = this value + QUERY_THREAD_COUNT)
    private static final int QUERY_MAX_BATCH_COUNT = 4;
    // Page size used when querying
    private static final int QUERY_PAGE_SIZE = 50000;
    // Page size used when inserting
    public static final int INSERT_PAGE_SIZE = 5000;
    // Query queue map: map<table name, queue of queried DataTableEntity batches>
    private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
    // Live-query map: tables that currently have queries running. The value is the set of
    // thread identifiers registered by startQuery; a missing or empty set means "finished".
    private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
    // Query thread map: map<table name, executor running that table's query tasks>
    private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
    // Error log map: map<table name, accumulated error text>
    private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
    // Maximum number of query threads per table
    private static final int QUERY_THREAD_COUNT = 3;
    // Queue-size threshold checked before a query page is allowed to run
    // (effective maximum of queued batches per table = this value + QUERY_THREAD_COUNT)
    private static final int QUERY_MAX_BATCH_COUNT = 4;
    // Page size used when querying
    private static final int QUERY_PAGE_SIZE = 50000;
    // Page size used when inserting
    public static final int INSERT_PAGE_SIZE = 5000;
    /**
     * 查询
     *
     * @param sourceDbe
     * @param sourceTable
     * @param filter
     * @param params
     * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据
     * @param minID
     */
    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
        Dao sourceDao = null;
        try {
            StringBuilder countSql = new StringBuilder(128);
            countSql.append("select count(*) count_value from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                countSql.append(" where ").append(filter);
            }
            sourceDao = sourceDbe.newDao();
            FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
            int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
            int partCount = getPartCount(totalCount);
            shutdownQueryThread(sourceTable);
            ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
            queryThreadMap.put(sourceTable, executorService);
            StringBuilder rangeSql = new StringBuilder(128);
            String tempPartMinID = minID;
            int partSize = partCount * QUERY_PAGE_SIZE;
            int count = ceilPage(totalCount, partSize);
            for (int i = 1; i <= count; i++) {
                rangeSql.setLength(0);
                if (!StringUtils.isEmpty(filter)) {
                    rangeSql.append(filter).append(" and ");
                }
                rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
                DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
                if (DataTableEntity.isEmpty(rangeDte)) {
                    continue;
                }
                FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
                String curPartMaxID = rangeFse.getString("max_id");
                String curPartMinID = tempPartMinID;
                executorService.submit(() -> {
                    String threadInfo = String.valueOf(Thread.currentThread().getId());
                    Dao threadSourceDao = null;
                    String thisPartMinID = curPartMinID;
                    try {
                        threadSourceDao = sourceDbe.newDao();
                        startQuery(sourceTable, threadInfo);
                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                        StringBuilder tempFilter = new StringBuilder(128);
                        for (int j = 0; j < totalPage; j++) {
                            while (!allowQuery(sourceTable)) {
                                Thread.sleep(RandomUtil.randomInt(800, 1200));
                            }
                            tempFilter.setLength(0);
                            tempFilter.append(uniqueField);
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
                            if (minID.equals(thisPartMinID)) {
                                tempFilter.append(">=");
                            } else {
                                tempFilter.append(">");
                            }
                            tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
                            if (!StringUtils.isEmpty(filter)) {
                                tempFilter.append(" and ").append(filter);
                            }
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter);
                            DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
                            if (!DataTableEntity.isEmpty(allDte)) {
                                add(sourceTable, allDte);
                                thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
                            } else {
                                break;
                            }
                        }
                    } catch (Exception e) {
                        appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        clear(sourceTable);
                    } finally {
                        if (threadSourceDao != null) {
                            threadSourceDao.closeConnection();
                        }
                        finalQuery(sourceTable, threadInfo);
                    }
                });
                tempPartMinID = curPartMaxID;
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (sourceDao != null) {
                sourceDao.closeConnection();
            }
        }
    }
    /**
     * 查询
     *
     * @param sourceDbe
     * @param sourceTable
     * @param filter
     * @param params
     * @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据
     * @param minID
     */
    public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
        Dao sourceDao = null;
        try {
            StringBuilder countSql = new StringBuilder(128);
            countSql.append("select count(*) count_value from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                countSql.append(" where ").append(filter);
            }
            sourceDao = sourceDbe.newDao();
            FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
            int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
            int partCount = getPartCount(totalCount);
            shutdownQueryThread(sourceTable);
            ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
            queryThreadMap.put(sourceTable, executorService);
            StringBuilder rangeSql = new StringBuilder(128);
            String tempPartMinID = minID;
            int partSize = partCount * QUERY_PAGE_SIZE;
            int count = ceilPage(totalCount, partSize);
            for (int i = 1; i <= count; i++) {
                rangeSql.setLength(0);
                if (!StringUtils.isEmpty(filter)) {
                    rangeSql.append(filter).append(" and ");
                }
                rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
                DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
                if (DataTableEntity.isEmpty(rangeDte)) {
                    continue;
                }
                FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
                String curPartMaxID = rangeFse.getString("max_id");
                String curPartMinID = tempPartMinID;
                executorService.submit(() -> {
                    String threadInfo = String.valueOf(Thread.currentThread().getId());
                    Dao threadSourceDao = null;
                    String thisPartMinID = curPartMinID;
                    try {
                        threadSourceDao = sourceDbe.newDao();
                        startQuery(sourceTable, threadInfo);
                        int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
                        StringBuilder tempFilter = new StringBuilder(128);
                        for (int j = 0; j < totalPage; j++) {
                            while (!allowQuery(sourceTable)) {
                                Thread.sleep(RandomUtil.randomInt(800, 1200));
                            }
                            tempFilter.setLength(0);
                            tempFilter.append(uniqueField);
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
                            if (minID.equals(thisPartMinID)) {
                                tempFilter.append(">=");
                            } else {
                                tempFilter.append(">");
                            }
                            tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
                            if (!StringUtils.isEmpty(filter)) {
                                tempFilter.append(" and ").append(filter);
                            }
                            WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter);
                            DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
                            if (!DataTableEntity.isEmpty(allDte)) {
                                add(sourceTable, allDte);
                                thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
                            } else {
                                break;
                            }
                        }
                    } catch (Exception e) {
                        appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
                        SpringMVCContextHolder.getSystemLogger().error(e);
                        clear(sourceTable);
                    } finally {
                        if (threadSourceDao != null) {
                            threadSourceDao.closeConnection();
                        }
                        finalQuery(sourceTable, threadInfo);
                    }
                });
                tempPartMinID = curPartMaxID;
            }
        } catch (Exception e) {
            throw e;
        } finally {
            if (sourceDao != null) {
                sourceDao.closeConnection();
            }
        }
    }
    /**
     * 从队列中获取
     *
     * @param tableName
     * @return
     */
    public DataTableEntity get(String tableName) {
        WriteUtil.append("DA-从队列中获取-表名:" + tableName);
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                return null;
            }
            return queryQueue.poll();
        }
    }
    /**
     * 从队列中获取
     *
     * @param tableName
     * @return
     */
    public DataTableEntity get(String tableName) {
        WriteUtil.append("DA-从队列中获取-表名:" + tableName);
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                return null;
            }
            return queryQueue.poll();
        }
    }
    /**
     * 判定是否查询完毕
     *
     * @param tableName
     * @return
     */
    public boolean checkQueryFinish(String tableName) {
        WriteUtil.append("DA-判定是否查询完毕");
        Set<String> set = existsQueryMap.get(tableName);
        return set == null || set.isEmpty();
    }
    /**
     * 判定是否查询完毕
     *
     * @param tableName
     * @return
     */
    public boolean checkQueryFinish(String tableName) {
        WriteUtil.append("DA-判定是否查询完毕");
        Set<String> set = existsQueryMap.get(tableName);
        return set == null || set.isEmpty();
    }
    /**
     * 判定插入队列(查询完成后放入的队列)是否为空
     *
     * @param tableName
     * @return
     */
    public boolean checkInsertQueueEmpty(String tableName) {
        WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空");
        return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
    }
    /**
     * 判定插入队列(查询完成后放入的队列)是否为空
     *
     * @param tableName
     * @return
     */
    public boolean checkInsertQueueEmpty(String tableName) {
        WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空");
        return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
    }
    /**
     * 关闭查询线程
     *
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
        synchronized (tableName.intern()) {
            ExecutorService executorService = queryThreadMap.get(tableName);
            if (executorService != null) {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                }
                queryThreadMap.remove(tableName);
            }
        }
    }
    /**
     * 关闭查询线程
     *
     * @param tableName
     */
    public void shutdownQueryThread(String tableName) {
        synchronized (tableName.intern()) {
            ExecutorService executorService = queryThreadMap.get(tableName);
            if (executorService != null) {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                }
                queryThreadMap.remove(tableName);
            }
        }
    }
    /**
     * 清理
     *
     * @param tableName
     */
    public void clear(String tableName) {
        synchronized (tableName.intern()) {
            queryMap.remove(tableName);
            shutdownQueryThread(tableName);
        }
    }
    /**
     * 清理
     *
     * @param tableName
     */
    public void clear(String tableName) {
        synchronized (tableName.intern()) {
            queryMap.remove(tableName);
            shutdownQueryThread(tableName);
        }
    }
    /**
     * 提取错误日志,只能提取一次,提取后会直接清空
     *
     * @param tableName
     * @return
     */
    public String getErrorLog(String tableName) {
        synchronized (tableName.intern()) {
            if (errorLogMap == null) {
                return null;
            } else {
                StringBuilder result = errorLogMap.get(tableName);
                errorLogMap.remove(tableName);
                return result == null ? null : result.toString();
            }
        }
    }
    /**
     * 提取错误日志,只能提取一次,提取后会直接清空
     *
     * @param tableName
     * @return
     */
    public String getErrorLog(String tableName) {
        synchronized (tableName.intern()) {
            if (errorLogMap == null) {
                return null;
            } else {
                StringBuilder result = errorLogMap.get(tableName);
                errorLogMap.remove(tableName);
                return result == null ? null : result.toString();
            }
        }
    }
    /**
     * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程
     *
     * @param totalCount
     * @return
     */
    private int getPartCount(int totalCount) {
        int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
        int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
        if (partCount >= 16) {
            return 16;
        } else if (partCount >= 8) {
            return 8;
        } else if (partCount >= 4) {
            return 4;
        } else if (partCount >= 2) {
            return 2;
        } else if (partCount >= 1) {
            return 1;
        } else {
            return 0;
        }
    }
    /**
     * 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程
     *
     * @param totalCount
     * @return
     */
    private int getPartCount(int totalCount) {
        int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
        int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
        if (partCount >= 16) {
            return 16;
        } else if (partCount >= 8) {
            return 8;
        } else if (partCount >= 4) {
            return 4;
        } else if (partCount >= 2) {
            return 2;
        } else if (partCount >= 1) {
            return 1;
        } else {
            return 0;
        }
    }
    /**
     * 获取sql:查询范围内的最大id值
     *
     * @param uniqueField
     * @param sourceTable
     * @param filter
     * @param dbType
     * @param pageIndex
     * @param pageSize
     * @return
     */
    private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
        int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
        int finalIndex = startIndex + pageSize;
        StringBuilder sql = new StringBuilder(128);
        if (DataBaseType.MYSQL.getValue() == dbType) {
            sql.append("select max(").append(uniqueField).append(") max_id from (");
            sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n    where ").append(filter);
            }
            sql.append("\n    order by ").append(uniqueField);
            sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
            sql.append("\n) t");
        } else if (DataBaseType.ORACLE.getValue() == dbType) {
            sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
            sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
            sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n        WHERE ").append(filter);
            }
            sql.append("\n        ORDER BY ").append(uniqueField);
            sql.append("\n    ) T1");
            sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
            sql.append("\n) T2");
            sql.append("\nWHERE R>").append(startIndex);
        }
        return sql.toString();
    }
    /**
     * 获取sql:查询范围内的最大id值
     *
     * @param uniqueField
     * @param sourceTable
     * @param filter
     * @param dbType
     * @param pageIndex
     * @param pageSize
     * @return
     */
    private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
        int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
        int finalIndex = startIndex + pageSize;
        StringBuilder sql = new StringBuilder(128);
        if (DataBaseType.MYSQL.getValue() == dbType) {
            sql.append("select max(").append(uniqueField).append(") max_id from (");
            sql.append("\n    select ").append(uniqueField).append(" from ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n    where ").append(filter);
            }
            sql.append("\n    order by ").append(uniqueField);
            sql.append("\n    limit ").append(startIndex).append(",").append(pageSize);
            sql.append("\n) t");
        } else if (DataBaseType.ORACLE.getValue() == dbType) {
            sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
            sql.append("\n    SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
            sql.append("\n        SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
            if (!StringUtils.isEmpty(filter)) {
                sql.append("\n        WHERE ").append(filter);
            }
            sql.append("\n        ORDER BY ").append(uniqueField);
            sql.append("\n    ) T1");
            sql.append("\n    WHERE ROWNUM<=").append(finalIndex);
            sql.append("\n) T2");
            sql.append("\nWHERE R>").append(startIndex);
        }
        return sql.toString();
    }
    /**
     * 放入队列
     *
     * @param tableName
     * @param dte
     */
    private void add(String tableName, DataTableEntity dte) {
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
            if (tableName.endsWith("BAK20230823")) {
                //检查dte中的source_info 和 pre_master_key 是否为空
                for (int i = 0; i < dte.getRows(); i++) {
                    String sourceInfo = dte.getString(i, "source_info");
                    String preMasterKey = dte.getString(i, "pre_master_key");
                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                        dte.setFieldValue(i, "source_info", "ch-kt");
                        String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
                    }
                }
            }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
        }
    }
    /**
     * 放入队列
     *
     * @param tableName
     * @param dte
     */
    private void add(String tableName, DataTableEntity dte) {
        synchronized (tableName.intern()) {
            LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
            if (queryQueue == null) {
                queryQueue = new LinkedBlockingQueue<>();
                queryMap.put(tableName, queryQueue);
            }
            if (tableName.endsWith("BAK20230823")) {
                //检查dte中的source_info 和 pre_master_key 是否为空
                for (int i = 0; i < dte.getRows(); i++) {
                    String sourceInfo = dte.getString(i, "source_info");
                    String preMasterKey = dte.getString(i, "pre_master_key");
                    if (StringUtils.isEmpty(sourceInfo) || StringUtils.isEmpty(preMasterKey)) {
                        dte.setFieldValue(i, "source_info", "ch-kt");
                        String idFiledName = tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_detail") ? "wip_detail_id" : tableName.toLowerCase(Locale.ROOT).startsWith("t_wip_product_keyp") ? "pk_id" : "wip_id";
                        dte.setFieldValue(i, "pre_master_key", dte.getString(i, idFiledName));
                    }
                }
            }
    /**
     * 查询开始,向存活map中添加1
     *
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
            set.add(threadInfo);
        }
    }
            queryQueue.add(dte);
            WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
            while (queryQueue.size() >= 10) {
                SpringMVCContextHolder.getSystemLogger().error("DA-队列已满-" + tableName + "-当前剩余队列数:" + queryQueue.size());
            }
        }
    }
    /**
     * 查询结束,向存活map中减少1
     *
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.get(tableName);
            if (set == null || !set.contains(threadInfo)) {
                throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
            }
            set.remove(threadInfo);
            if (set.isEmpty()) {
                existsQueryMap.remove(tableName);
            }
        }
    }
    /**
     * 查询开始,向存活map中添加1
     *
     * @param tableName
     */
    private void startQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
            set.add(threadInfo);
        }
    }
    /**
     * 允许执行查询(避免队列中等待插入的太多,导致内存溢出)
     *
     * @param tableName
     * @return
     */
    private boolean allowQuery(String tableName) {
        synchronized (tableName.intern()) {
            return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
        }
    }
    /**
     * 查询结束,向存活map中减少1
     *
     * @param tableName
     */
    private void finalQuery(String tableName, String threadInfo) {
        synchronized (tableName.intern()) {
            Set<String> set = existsQueryMap.get(tableName);
            if (set == null || !set.contains(threadInfo)) {
                throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
            }
            set.remove(threadInfo);
            if (set.isEmpty()) {
                existsQueryMap.remove(tableName);
            }
        }
    }
    /**
     * 插入错误日志
     *
     * @param tableName
     * @param error
     */
    private void appendErrorLog(String tableName, String error) {
        synchronized (tableName.intern()) {
            StringBuilder errorSb = errorLogMap.get(tableName);
            if (errorSb == null) {
                errorSb = new StringBuilder(128);
                errorLogMap.put(tableName, errorSb);
            }
            if (errorSb.length() > 0) {
                errorSb.append("\n");
            }
            if (errorSb.length() < 2000) {
                errorSb.append(error);
            }
        }
    }
    /**
     * 允许执行查询(避免队列中等待插入的太多,导致内存溢出)
     *
     * @param tableName
     * @return
     */
    private boolean allowQuery(String tableName) {
        synchronized (tableName.intern()) {
            return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
        }
    }
    /**
     * 获取页数,向上取整
     *
     * @param count
     * @param size
     * @return
     */
    private int ceilPage(int count, int size) {
        if (size == 0) {
            if (count == 0) {
                return 0;
            } else {
                throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
            }
        }
        return count / size + (count % size == 0 ? 0 : 1);
    }
    /**
     * 插入错误日志
     *
     * @param tableName
     * @param error
     */
    private void appendErrorLog(String tableName, String error) {
        synchronized (tableName.intern()) {
            StringBuilder errorSb = errorLogMap.get(tableName);
            if (errorSb == null) {
                errorSb = new StringBuilder(128);
                errorLogMap.put(tableName, errorSb);
            }
            if (errorSb.length() > 0) {
                errorSb.append("\n");
            }
            if (errorSb.length() < 2000) {
                errorSb.append(error);
            }
        }
    }
    /**
     * 获取页数,向上取整
     *
     * @param count
     * @param size
     * @return
     */
    private int ceilPage(int count, int size) {
        if (size == 0) {
            if (count == 0) {
                return 0;
            } else {
                throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
            }
        }
        return count / size + (count % size == 0 ? 0 : 1);
    }
}
product-server-datasource/src/main/java/com/product/datasource/connection/ConnectionManager.java
@@ -1,5 +1,9 @@
package com.product.datasource.connection;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.NumberUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.google.common.collect.Maps;
import com.product.common.lang.StringUtils;
import com.product.core.config.Global;
import com.product.core.exception.BaseException;
@@ -9,7 +13,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.sql.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @Author cheng
@@ -19,6 +26,8 @@
public class ConnectionManager {
    static Logger log = LoggerFactory.getLogger(ConnectionManager.class);
    private static final Map<String, DruidDataSource> DB_DRUID_DATA_SOURCE_MAP = new ConcurrentHashMap<>();
    public static void main(String[] args) {
        DataBaseEntity DataBaseEntity = new DataBaseEntity(DataBaseType.MYSQL.getValue());
@@ -179,16 +188,72 @@
     */
    private static Connection getOracleConnection(DataBaseEntity dbe) {
        String url;
        String[] params = {dbe.getIp(), dbe.getPort(), null};
        if (!StringUtils.isEmpty(dbe.getSid())) {
            url = String.format("jdbc:oracle:thin:@%s:%s:%s", dbe.getIp(), dbe.getPort(), dbe.getSid());
            params[2] = dbe.getSid();
        } else if (!StringUtils.isEmpty(dbe.getServerName())) {
            url = String.format("jdbc:oracle:thin:@//%s:%s/%s", dbe.getIp(), dbe.getPort(), dbe.getServerName());
            params[2] = dbe.getServerName();
        } else {
            throw new BaseException(ErrorCode.GET_ORACLE_SID_SERVERNAME_EMPTY);
        }
        Boolean enabling = Global.getPropertyToBoolean("data.system.oracle.connection-pool.enabling", "false");
        if (enabling) {
            DruidDataSource druidDataSource = DB_DRUID_DATA_SOURCE_MAP.get(ArrayUtil.join(params, ","));
            if (druidDataSource == null || druidDataSource.isClosed()) {
                druidDataSource = new DruidDataSource();
                druidDataSource.setUrl(url);
                druidDataSource.setUsername(dbe.getUserName());
                druidDataSource.setPassword(dbe.getPassWord());
                // 初始化时建立物理连接的个数
                druidDataSource.setInitialSize(getProperty("data.system.oracle.connection-pool.initial-size", 10));
                // 最大活动连接数
                druidDataSource.setMaxActive(getProperty("data.system.oracle.connection-pool.max-active", 100));
                // 最小空闲连接数
                druidDataSource.setMinIdle(getProperty("data.system.oracle.connection-pool.min-idle", 20));
                // 校验查询语句
                druidDataSource.setValidationQuery(dbe.getDataBaseType().getValidationQuery());
                // 当连接空闲时是否测试连接有效性
                druidDataSource.setTestWhileIdle(true);
                // 两次空闲连接清除之间的时间间隔
                druidDataSource.setTimeBetweenEvictionRunsMillis(60000);
                druidDataSource.setMaxWait(60 * 1000);
                druidDataSource.setPoolPreparedStatements(true);
                druidDataSource.setMaxPoolPreparedStatementPerConnectionSize(100);
                druidDataSource.setRemoveAbandoned(true);
                //半小时强制归还连接
                druidDataSource.setRemoveAbandonedTimeout(60 * 30);
                DB_DRUID_DATA_SOURCE_MAP.put(ArrayUtil.join(params, ","), druidDataSource);
                try {
                    //获获取连接
                    Connection connection = druidDataSource.getConnection();
                    //测试连接
                    if (!connectionValidity(connection, dbe.getDataBaseType())) {
                        throw new BaseException(ErrorCode.GET_CONNECTION_FAIL);
                    }
                    return connection;
                } catch (BaseException e) {
                    throw e;
                } catch (Exception e) {
                    e.printStackTrace();
                    log.error("获取链接失败", e);
                    log.error(url);
                    throw new BaseException(ErrorCode.GET_CONNECTION_FAIL);
                }
            }
        }
        return getConnection(url, dbe);
    }
    private static int getProperty(String key, int defaultValue) {
        String systemConfig = Global.getSystemConfig(key, defaultValue + "");
        if (!NumberUtil.isNumber(systemConfig)) {
            return defaultValue;
        }
        return NumberUtil.parseInt(systemConfig);
    }
    /**
     * 获取Informix 连接
     *