package com.product.data.center.service; import cn.hutool.core.util.NumberUtil; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.center.config.ErrorCode; import com.product.data.center.entity.BatchConfigEntity; import com.product.data.center.entity.BatchExecuteEntity; import com.product.data.center.entity.QueryDataConfigEntity; import com.product.data.center.service.ide.IBatchService; import com.product.datasource.dao.Dao; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.Calendar; @Service public class BatchService extends AbstractBaseService implements IBatchService { @Autowired private JournalManagerService journalManagerService; /** * 批处理执行 * * @param batchConEntity */ public void executeBatch(BatchConfigEntity batchConEntity, QueryDataAfterProcessing queryDataAfterProcessing) throws BaseException { Dao dao = null; if (batchConEntity.getSourceDbe() == null) { throw new BaseException(ErrorCode.BATCH_EXECUTE_ERROR_DATASOURCE_EMPTY); } if (batchConEntity.getQueryEntity() == null) { throw new BaseException(ErrorCode.BATCH_EXECUTE_ERROR_DATASOURCE_EMPTY); } try { dao = batchConEntity.getSourceDbe().getDao(); QueryDataConfigEntity queryDataConfig = batchConEntity.getQueryEntity(); int totalPage; try { //获取查询数据总跳数SQL String queryCountSql = queryDataConfig.getQueryTotalCountSql(); //查询总条数 FieldSetEntity fs = dao.getOne(queryCountSql, queryDataConfig.getParams()); int totalCount = fs.getInteger("total_count"); //计算出总页数 totalPage = (int) Math.ceil(NumberUtil.div(totalCount, batchConEntity.getBatchDisposeSize())); } catch (Exception e) { throw e; } DataTableEntity dt; int minId; int maxId = -1; Calendar startC; StringBuilder curFilter = new StringBuilder(128); String preFilter = batchConEntity.getQueryEntity().getQueryFilter(); for (int i = 1; i <= totalPage; i++) { startC = Calendar.getInstance(); //查询每页数据 if (i > 1) { curFilter.setLength(0); curFilter.append(preFilter); if (!StringUtils.isEmpty(preFilter)) { curFilter.append(" and "); } curFilter.append(queryDataConfig.getAutoIncrementPrimaryField()).append(">").append(maxId); batchConEntity.getQueryEntity().setQueryFilter(curFilter.toString()); } minId = -1; maxId = -1; dt = dao.getList(queryDataConfig.getQuerySql(1, batchConEntity.getBatchDisposeSize()), queryDataConfig.getParams()); try { if (batchConEntity.getDataDispose() != null) { //数据处理 dt = batchConEntity.getDataDispose().invokeMethod(dt); } //本批次最大ID minId = dt.getInt(0, queryDataConfig.getAutoIncrementPrimaryField()); maxId = dt.getInt(dt.getRows() - 1, queryDataConfig.getAutoIncrementPrimaryField()); queryDataAfterProcessing.queryDataAfterProcessing(dt, minId, maxId, batchConEntity.getSourceDbe(), batchConEntity.getTargetDbe(), startC); } catch (Exception e) { //调用错误处理 if (totalPage > 1) { if (minId == -1 || maxId == -1) { // 没有正确获取到最小最大的ID值,说明查询过程当中出现了问题,那么就不能继续执行,否则就无法通过错误日志进行重新处理(本质原因是无法确认对应的范围) throw e; } batchConEntity.getBatchError().method(new BatchExecuteEntity(dt, maxId, minId, i, journalManagerService.getStackTrace(e))); } else { // 若是处理的数据只有1页,那么一旦报错就无需进行之后的数据处理(因为没有),直接进入整体报错的判定范围 throw e; } } } } catch (BaseException e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); throw e; } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); throw e; } finally { if (dao != null) { dao.closeConnection(); } } // } }