package com.product.data.center.service;
|
|
import cn.hutool.core.util.NumberUtil;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.entity.BatchConfigEntity;
|
import com.product.data.center.entity.BatchExecuteEntity;
|
import com.product.data.center.entity.QueryDataConfigEntity;
|
import com.product.data.center.service.ide.IBatchService;
|
import com.product.datasource.dao.Dao;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.util.Calendar;
|
|
|
@Service
|
public class BatchService extends AbstractBaseService implements IBatchService {
|
@Autowired
|
private JournalManagerService journalManagerService;
|
|
/**
|
* 批处理执行
|
*
|
* @param batchConEntity
|
*/
|
public void executeBatch(BatchConfigEntity batchConEntity, QueryDataAfterProcessing queryDataAfterProcessing) throws BaseException {
|
Dao dao = null;
|
if (batchConEntity.getSourceDbe() == null) {
|
throw new BaseException(ErrorCode.BATCH_EXECUTE_ERROR_DATASOURCE_EMPTY);
|
}
|
if (batchConEntity.getQueryEntity() == null) {
|
throw new BaseException(ErrorCode.BATCH_EXECUTE_ERROR_DATASOURCE_EMPTY);
|
}
|
try {
|
dao = batchConEntity.getSourceDbe().getDao();
|
QueryDataConfigEntity queryDataConfig = batchConEntity.getQueryEntity();
|
int totalPage;
|
try {
|
//获取查询数据总跳数SQL
|
String queryCountSql = queryDataConfig.getQueryTotalCountSql();
|
//查询总条数
|
FieldSetEntity fs = dao.getOne(queryCountSql, queryDataConfig.getParams());
|
int totalCount = fs.getInteger("total_count");
|
//计算出总页数
|
totalPage = (int) Math.ceil(NumberUtil.div(totalCount, batchConEntity.getBatchDisposeSize()));
|
} catch (Exception e) {
|
throw e;
|
}
|
DataTableEntity dt;
|
int minId;
|
int maxId = -1;
|
Calendar startC;
|
StringBuilder curFilter = new StringBuilder(128);
|
String preFilter = batchConEntity.getQueryEntity().getQueryFilter();
|
for (int i = 1; i <= totalPage; i++) {
|
startC = Calendar.getInstance();
|
//查询每页数据
|
if (i > 1) {
|
curFilter.setLength(0);
|
curFilter.append(preFilter);
|
if (!StringUtils.isEmpty(preFilter)) {
|
curFilter.append(" and ");
|
}
|
curFilter.append(queryDataConfig.getAutoIncrementPrimaryField()).append(">").append(maxId);
|
batchConEntity.getQueryEntity().setQueryFilter(curFilter.toString());
|
}
|
minId = -1;
|
maxId = -1;
|
dt = dao.getList(queryDataConfig.getQuerySql(1, batchConEntity.getBatchDisposeSize()), queryDataConfig.getParams());
|
try {
|
if (batchConEntity.getDataDispose() != null) {
|
//数据处理
|
dt = batchConEntity.getDataDispose().invokeMethod(dt);
|
}
|
//本批次最大ID
|
minId = dt.getInt(0, queryDataConfig.getAutoIncrementPrimaryField());
|
maxId = dt.getInt(dt.getRows() - 1, queryDataConfig.getAutoIncrementPrimaryField());
|
queryDataAfterProcessing.queryDataAfterProcessing(dt, minId, maxId, batchConEntity.getSourceDbe(), batchConEntity.getTargetDbe(), startC);
|
} catch (Exception e) {
|
//调用错误处理
|
if (totalPage > 1) {
|
if (minId == -1 || maxId == -1) {
|
// 没有正确获取到最小最大的ID值,说明查询过程当中出现了问题,那么就不能继续执行,否则就无法通过错误日志进行重新处理(本质原因是无法确认对应的范围)
|
throw e;
|
}
|
batchConEntity.getBatchError().method(new BatchExecuteEntity(dt, maxId, minId, i, journalManagerService.getStackTrace(e)));
|
} else {
|
// 若是处理的数据只有1页,那么一旦报错就无需进行之后的数据处理(因为没有),直接进入整体报错的判定范围
|
throw e;
|
}
|
}
|
}
|
} catch (BaseException e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw e;
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw e;
|
} finally {
|
if (dao != null) {
|
dao.closeConnection();
|
}
|
}
|
//
|
}
|
}
|