package com.product.data.sync.util; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.util.CallBack; import com.product.util.CallBackValue; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class ThreadSelectManager { private Map pageResultSet = new HashMap<>(); /** * 开始查询页数 */ private int startPage; private ExecutorService executorService; private Connection sourceConnection; private CallBackValue getQuerySql; private int totalPage = 0; private CallBack setColumnNames; private CallBackValue getColumnNames; public ThreadSelectManager(BatchData batchData) { this.getQuerySql = (o) -> batchData.getQuerySql((int) o); this.setColumnNames = (o) -> { batchData.setColumnNames((String[]) o); }; this.getColumnNames = (o) -> batchData.getColumnNames(); this.sourceConnection = ConnectionManager.getConnection(); this.totalPage = batchData.getTotalPage(); this.currentResult = 1; } /** * 当前以获取第几页 */ private int currentResult = -1; private boolean openLog = true; /** * 开始查询 * * @return */ public boolean startQuery() { //不能重复开始查询 if (this.pageResultSet.size() <= 0 && this.executorService == null) { this.executorService = Executors.newFixedThreadPool(1); this.queryData(); return true; } return false; } /** * 获取下一页数据 * * @return */ public CustomResultSet getNext() { if (this.currentResult == -1) { return null; } int page = this.currentResult; if (this.pageResultSet.get(page) != null) { this.currentResult++; return this.pageResultSet.get(page); } else { if (!this.executorService.isTerminated()) { try { outPutLog("等待线程中查询数据....", 1); Thread.currentThread().sleep(1500); return this.getNext(); } catch (Exception e) { outPutLog("等待线程中查询数据,出错....", 2); SpringMVCContextHolder.getSystemLogger().error(e); return null; } } else { this.currentResult++; outPutLog("查询数据线程已结束,没有 " + page + " 数据", 1); return null; } } } public static void main(String[] args) { int total = 3313; int i1 = total / 3; String[] ints = new String[3]; for (int i = 1; i <= 3; i++) { } } /** * 查询数据 */ private void queryData() { outPutLog("开始线程查询数据,需要查询:" + totalPage + " 页数据", 1); // for (int i = 0; i < 3; i++) { this.executorService.execute(() -> { try { int currentPage = 1; while (totalPage > 0) { outPutLog("开始查询页数:" + currentPage, 1); String querySql = getQuerySql.method(currentPage); outPutLog("查询sql:\n\t" + querySql, 1); PreparedStatement pst = sourceConnection.prepareStatement(querySql); ResultSet resultSet = pst.executeQuery(); if (resultSet != null) { String[] columnNames = this.getColumnNames.method(null); CustomResultSet customResultSet = new CustomResultSet(resultSet, columnNames); if (columnNames == null || columnNames.length <= 0) { this.setColumnNames.method(customResultSet.getColumnNames()); } this.pageResultSet.put(currentPage, customResultSet); } currentPage++; this.totalPage--; } } catch (Exception e) { e.printStackTrace(); outPutLog("线程线程查询数据,出错....", 2); SpringMVCContextHolder.getSystemLogger().error(e); } }); // } this.executorService.shutdown(); } /** * 输出日志 * * @param msg 消息 * @param type 类型 1 info 2 error */ private void outPutLog(String msg, int type) { if (this.openLog) { if (type == 1) { SpringMVCContextHolder.getSystemLogger().info(msg); } else if (type == 2) { SpringMVCContextHolder.getSystemLogger().error(msg); } } } }