package com.product.data.center.service;
|
|
import cn.hutool.core.util.RandomUtil;
|
import com.google.common.collect.Maps;
|
import com.google.common.collect.Sets;
|
import com.product.common.utils.spring.SpringUtils;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.utils.WriteUtil;
|
import com.product.datasource.config.DataBaseType;
|
import com.product.datasource.dao.Dao;
|
import com.product.datasource.entity.DataBaseEntity;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.stereotype.Service;
|
|
import java.util.Map;
|
import java.util.Set;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
/**
|
* Copyright © 6c
|
*
|
* @Date 2022年11月27日 8:55
|
* @Author 6c
|
* @Description
|
*/
|
@Service
|
public class DataArchivingQueue extends AbstractBaseService {
|
// 查询队列map:map<表名,dte队列>
|
private static Map<String, LinkedBlockingQueue<DataTableEntity>> queryMap = new ConcurrentHashMap<>();
|
// 查询存活map:正在执行查询的表,对应的值为执行的线程数,为0标识已经执行完成
|
private static Map<String, Set<String>> existsQueryMap = Maps.newHashMap();
|
// 查询线程map
|
private static Map<String, ExecutorService> queryThreadMap = Maps.newHashMap();
|
// 错误日志map
|
private static Map<String, StringBuilder> errorLogMap = Maps.newHashMap();
|
// 单表查询最大线程数
|
private static final int QUERY_THREAD_COUNT = 3;
|
// 单表验证查询最大批次(单表查询最大批次=单表验证查询最大批次 + 单表查询最大线程数)
|
private static final int QUERY_MAX_BATCH_COUNT = 4;
|
// 查询每页大小
|
private static final int QUERY_PAGE_SIZE = 50000;
|
// 插入每页大小
|
public static final int INSERT_PAGE_SIZE = 5000;
|
|
/**
|
* 查询
|
* @param sourceDbe
|
* @param sourceTable
|
* @param filter
|
* @param params
|
* @param uniqueField 主键,不仅会用于识别还会用于排序防止oracle分页获取到重复数据
|
* @param minID
|
*/
|
public void query(DataBaseEntity sourceDbe, String sourceTable, String filter, Object[] params, String uniqueField, String minID) {
|
Dao sourceDao = null;
|
try {
|
StringBuilder countSql = new StringBuilder(128);
|
countSql.append("select count(*) count_value from ").append(sourceTable);
|
if (!StringUtils.isEmpty(filter)) {
|
countSql.append(" where ").append(filter);
|
}
|
sourceDao = sourceDbe.newDao();
|
FieldSetEntity countFse = sourceDao.getOne(countSql.toString(), params);
|
int totalCount = StringUtils.isEmpty(countFse.getString("count_value")) ? 0 : countFse.getInteger("count_value");
|
int partCount = getPartCount(totalCount);
|
shutdownQueryThread(sourceTable);
|
ExecutorService executorService = Executors.newWorkStealingPool(QUERY_THREAD_COUNT);
|
queryThreadMap.put(sourceTable, executorService);
|
StringBuilder rangeSql = new StringBuilder(128);
|
String tempPartMinID = minID;
|
int partSize = partCount * QUERY_PAGE_SIZE;
|
int count = ceilPage(totalCount, partSize);
|
for (int i = 1; i <= count; i++) {
|
rangeSql.setLength(0);
|
if (!StringUtils.isEmpty(filter)) {
|
rangeSql.append(filter).append(" and ");
|
}
|
rangeSql.append(uniqueField).append(">='").append(tempPartMinID).append("'");
|
DataTableEntity rangeDte = sourceDao.getList(getSql(uniqueField, sourceTable, rangeSql.toString(), sourceDbe.getDbType().getValue(), 1, partSize), params);
|
if (DataTableEntity.isEmpty(rangeDte)) {
|
continue;
|
}
|
FieldSetEntity rangeFse = rangeDte.getFieldSetEntity(0);
|
String curPartMaxID = rangeFse.getString("max_id");
|
String curPartMinID = tempPartMinID;
|
executorService.submit(() -> {
|
String threadInfo = String.valueOf(Thread.currentThread().getId());
|
Dao threadSourceDao = null;
|
String thisPartMinID = curPartMinID;
|
try {
|
threadSourceDao = sourceDbe.newDao();
|
startQuery(sourceTable, threadInfo);
|
int totalPage = ceilPage(partSize, QUERY_PAGE_SIZE);
|
StringBuilder tempFilter = new StringBuilder(128);
|
for (int j = 0; j < totalPage; j++) {
|
while (!allowQuery(sourceTable)) {
|
Thread.sleep(RandomUtil.randomInt(800, 1200));
|
}
|
tempFilter.setLength(0);
|
tempFilter.append(uniqueField);
|
WriteUtil.append("DA-threadInfo:" + threadInfo + "-thisPartMinID:" + thisPartMinID);
|
if (minID.equals(thisPartMinID)) {
|
tempFilter.append(">=");
|
} else {
|
tempFilter.append(">");
|
}
|
tempFilter.append("'").append(thisPartMinID).append("'").append(" and ").append(uniqueField).append("<='").append(curPartMaxID).append("'");
|
if (!StringUtils.isEmpty(filter)) {
|
tempFilter.append(" and ").append(filter);
|
}
|
WriteUtil.append("DA-threadInfo:" + threadInfo + "-currentPage:" + (j + 1) + "-pageSize:" + QUERY_PAGE_SIZE + "-filter:" + tempFilter);
|
DataTableEntity allDte = threadSourceDao.getList(sourceTable, tempFilter.toString(), params, uniqueField, 1, QUERY_PAGE_SIZE);
|
if (!DataTableEntity.isEmpty(allDte)) {
|
add(sourceTable, allDte);
|
thisPartMinID = allDte.getFieldSetEntity(allDte.getRows() - 1).getString(uniqueField);
|
} else {
|
break;
|
}
|
}
|
} catch (Exception e) {
|
appendErrorLog(sourceTable, SpringUtils.getBean(JournalManagerService.class).getStackTrace(e).trim());
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
clear(sourceTable);
|
} finally {
|
if (threadSourceDao != null) {
|
threadSourceDao.closeConnection();
|
}
|
finalQuery(sourceTable, threadInfo);
|
}
|
});
|
tempPartMinID = curPartMaxID;
|
}
|
} catch (Exception e) {
|
throw e;
|
} finally {
|
if (sourceDao != null) {
|
sourceDao.closeConnection();
|
}
|
}
|
}
|
|
/**
|
* 从队列中获取
|
* @param tableName
|
* @return
|
*/
|
public DataTableEntity get(String tableName) {
|
WriteUtil.append("DA-从队列中获取-表名:" + tableName);
|
synchronized (tableName.intern()) {
|
LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
|
if (queryQueue == null) {
|
return null;
|
}
|
return queryQueue.poll();
|
}
|
}
|
|
/**
|
* 判定是否查询完毕
|
* @param tableName
|
* @return
|
*/
|
public boolean checkQueryFinish(String tableName) {
|
WriteUtil.append("DA-判定是否查询完毕");
|
Set<String> set = existsQueryMap.get(tableName);
|
return set == null || set.isEmpty();
|
}
|
|
/**
|
* 判定插入队列(查询完成后放入的队列)是否为空
|
* @param tableName
|
* @return
|
*/
|
public boolean checkInsertQueueEmpty(String tableName) {
|
WriteUtil.append("DA-判定插入队列(查询完成后放入的队列)是否为空");
|
return queryMap == null || queryMap.get(tableName) == null || queryMap.get(tableName).isEmpty();
|
}
|
|
/**
|
* 关闭查询线程
|
* @param tableName
|
*/
|
public void shutdownQueryThread(String tableName) {
|
synchronized (tableName.intern()) {
|
ExecutorService executorService = queryThreadMap.get(tableName);
|
if (executorService != null) {
|
if (!executorService.isShutdown()) {
|
executorService.shutdown();
|
}
|
queryThreadMap.remove(tableName);
|
}
|
}
|
}
|
|
/**
|
* 清理
|
* @param tableName
|
*/
|
public void clear(String tableName) {
|
synchronized (tableName.intern()) {
|
queryMap.remove(tableName);
|
shutdownQueryThread(tableName);
|
}
|
}
|
|
/**
|
* 提取错误日志,只能提取一次,提取后会直接清空
|
* @param tableName
|
* @return
|
*/
|
public String getErrorLog(String tableName) {
|
synchronized (tableName.intern()) {
|
if (errorLogMap == null) {
|
return null;
|
} else {
|
StringBuilder result = errorLogMap.get(tableName);
|
errorLogMap.remove(tableName);
|
return result == null ? null : result.toString();
|
}
|
}
|
}
|
|
/**
|
* 获取分段的数量,最多16,最小为0,为0表示用不上所有的线程
|
* @param totalCount
|
* @return
|
*/
|
private int getPartCount(int totalCount) {
|
int num = QUERY_THREAD_COUNT * QUERY_PAGE_SIZE;
|
int partCount = totalCount / num + (totalCount % num == 0 ? 0 : 1);
|
if (partCount >= 16) {
|
return 16;
|
} else if (partCount >= 8) {
|
return 8;
|
} else if (partCount >= 4) {
|
return 4;
|
} else if (partCount >= 2) {
|
return 2;
|
} else if (partCount >= 1) {
|
return 1;
|
} else {
|
return 0;
|
}
|
}
|
|
/**
|
* 获取sql:查询范围内的最大id值
|
* @param uniqueField
|
* @param sourceTable
|
* @param filter
|
* @param dbType
|
* @param pageIndex
|
* @param pageSize
|
* @return
|
*/
|
private String getSql(String uniqueField, String sourceTable, String filter, int dbType, int pageIndex, int pageSize) {
|
int startIndex = (Math.max(pageIndex, 1) - 1) * pageSize;
|
int finalIndex = startIndex + pageSize;
|
StringBuilder sql = new StringBuilder(128);
|
if (DataBaseType.MYSQL.getValue() == dbType) {
|
sql.append("select max(").append(uniqueField).append(") max_id from (");
|
sql.append("\n select ").append(uniqueField).append(" from ").append(sourceTable);
|
if (!StringUtils.isEmpty(filter)) {
|
sql.append("\n where ").append(filter);
|
}
|
sql.append("\n order by ").append(uniqueField);
|
sql.append("\n limit ").append(startIndex).append(",").append(pageSize);
|
sql.append("\n) t");
|
} else if (DataBaseType.ORACLE.getValue() == dbType) {
|
sql.append("SELECT MAX(").append(uniqueField).append(") max_id FROM (");
|
sql.append("\n SELECT ").append(uniqueField).append(",ROWNUM R FROM (");
|
sql.append("\n SELECT ").append(uniqueField).append(" FROM ").append(sourceTable);
|
if (!StringUtils.isEmpty(filter)) {
|
sql.append("\n WHERE ").append(filter);
|
}
|
sql.append("\n ORDER BY ").append(uniqueField);
|
sql.append("\n ) T1");
|
sql.append("\n WHERE ROWNUM<=").append(finalIndex);
|
sql.append("\n) T2");
|
sql.append("\nWHERE R>").append(startIndex);
|
}
|
return sql.toString();
|
}
|
|
/**
|
* 放入队列
|
* @param tableName
|
* @param dte
|
*/
|
private void add(String tableName, DataTableEntity dte) {
|
synchronized (tableName.intern()) {
|
LinkedBlockingQueue<DataTableEntity> queryQueue = queryMap.get(tableName);
|
if (queryQueue == null) {
|
queryQueue = new LinkedBlockingQueue<>();
|
queryMap.put(tableName, queryQueue);
|
}
|
queryQueue.add(dte);
|
WriteUtil.append("DA-成功放入队列-" + tableName + "-当前剩余队列数:" + queryQueue.size());
|
}
|
}
|
|
/**
|
* 查询开始,向存活map中添加1
|
* @param tableName
|
*/
|
private void startQuery(String tableName, String threadInfo) {
|
synchronized (tableName.intern()) {
|
Set<String> set = existsQueryMap.computeIfAbsent(tableName, k -> Sets.newLinkedHashSet());
|
set.add(threadInfo);
|
}
|
}
|
|
/**
|
* 查询结束,向存活map中减少1
|
* @param tableName
|
*/
|
private void finalQuery(String tableName, String threadInfo) {
|
synchronized (tableName.intern()) {
|
Set<String> set = existsQueryMap.get(tableName);
|
if (set == null || !set.contains(threadInfo)) {
|
throw new BaseException(ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getValue(), ErrorCode.DATA_ARCHIVE_QUERY_THREAD_COUNT_ERROR.getText() + " table_name: " + tableName);
|
}
|
set.remove(threadInfo);
|
if (set.isEmpty()) {
|
existsQueryMap.remove(tableName);
|
}
|
}
|
}
|
|
/**
|
* 允许执行查询(避免队列中等待插入的太多,导致内存溢出)
|
* @param tableName
|
* @return
|
*/
|
private boolean allowQuery(String tableName) {
|
synchronized (tableName.intern()) {
|
return queryMap.get(tableName) == null || queryMap.get(tableName).size() <= QUERY_MAX_BATCH_COUNT;
|
}
|
}
|
|
/**
|
* 插入错误日志
|
* @param tableName
|
* @param error
|
*/
|
private void appendErrorLog(String tableName, String error) {
|
synchronized (tableName.intern()) {
|
StringBuilder errorSb = errorLogMap.get(tableName);
|
if (errorSb == null) {
|
errorSb = new StringBuilder(128);
|
errorLogMap.put(tableName, errorSb);
|
}
|
if (errorSb.length() > 0) {
|
errorSb.append("\n");
|
}
|
if (errorSb.length() < 2000) {
|
errorSb.append(error);
|
}
|
}
|
}
|
|
/**
|
* 获取页数,向上取整
|
* @param count
|
* @param size
|
* @return
|
*/
|
private int ceilPage(int count, int size) {
|
if (size == 0) {
|
if (count == 0) {
|
return 0;
|
} else {
|
throw new BaseException(ErrorCode.ARCHIVE_PAGE_CALCULATE_ERROR);
|
}
|
}
|
return count / size + (count % size == 0 ? 0 : 1);
|
}
|
}
|