package com.product.data.sync.util;
|
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.util.CallBack;
|
import com.product.util.CallBackValue;
|
|
import java.sql.Connection;
|
import java.sql.PreparedStatement;
|
import java.sql.ResultSet;
|
import java.util.HashMap;
|
import java.util.Map;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
|
class ThreadSelectManager {
|
|
private Map<Integer, CustomResultSet> pageResultSet = new HashMap<>();
|
/**
|
* 开始查询页数
|
*/
|
private int startPage;
|
|
private ExecutorService executorService;
|
|
private Connection sourceConnection;
|
|
private CallBackValue<String> getQuerySql;
|
|
private int totalPage = 0;
|
|
private CallBack<Object> setColumnNames;
|
|
private CallBackValue<String[]> getColumnNames;
|
|
|
public ThreadSelectManager(BatchData batchData) {
|
this.getQuerySql = (o) -> batchData.getQuerySql((int) o);
|
this.setColumnNames = (o) -> {
|
batchData.setColumnNames((String[]) o);
|
};
|
this.getColumnNames = (o) -> batchData.getColumnNames();
|
this.sourceConnection = ConnectionManager.getConnection();
|
this.totalPage = batchData.getTotalPage();
|
this.currentResult = 1;
|
}
|
|
/**
|
* 当前以获取第几页
|
*/
|
private int currentResult = -1;
|
|
private boolean openLog = true;
|
|
/**
|
* 开始查询
|
*
|
* @return
|
*/
|
public boolean startQuery() {
|
//不能重复开始查询
|
if (this.pageResultSet.size() <= 0 && this.executorService == null) {
|
this.executorService = Executors.newFixedThreadPool(1);
|
this.queryData();
|
return true;
|
}
|
return false;
|
}
|
|
/**
|
* 获取下一页数据
|
*
|
* @return
|
*/
|
public CustomResultSet getNext() {
|
if (this.currentResult == -1) {
|
return null;
|
}
|
int page = this.currentResult;
|
if (this.pageResultSet.get(page) != null) {
|
this.currentResult++;
|
return this.pageResultSet.get(page);
|
} else {
|
if (!this.executorService.isTerminated()) {
|
try {
|
outPutLog("等待线程中查询数据....", 1);
|
Thread.currentThread().sleep(1500);
|
return this.getNext();
|
} catch (Exception e) {
|
outPutLog("等待线程中查询数据,出错....", 2);
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
return null;
|
}
|
} else {
|
this.currentResult++;
|
outPutLog("查询数据线程已结束,没有 " + page + " 数据", 1);
|
return null;
|
}
|
}
|
}
|
|
public static void main(String[] args) {
|
int total = 3313;
|
int i1 = total / 3;
|
String[] ints = new String[3];
|
for (int i = 1; i <= 3; i++) {
|
}
|
}
|
|
/**
|
* 查询数据
|
*/
|
private void queryData() {
|
outPutLog("开始线程查询数据,需要查询:" + totalPage + " 页数据", 1);
|
// for (int i = 0; i < 3; i++) {
|
this.executorService.execute(() -> {
|
try {
|
int currentPage = 1;
|
while (totalPage > 0) {
|
outPutLog("开始查询页数:" + currentPage, 1);
|
String querySql = getQuerySql.method(currentPage);
|
outPutLog("查询sql:\n\t" + querySql, 1);
|
PreparedStatement pst = sourceConnection.prepareStatement(querySql);
|
ResultSet resultSet = pst.executeQuery();
|
if (resultSet != null) {
|
String[] columnNames = this.getColumnNames.method(null);
|
CustomResultSet customResultSet = new CustomResultSet(resultSet, columnNames);
|
if (columnNames == null || columnNames.length <= 0) {
|
this.setColumnNames.method(customResultSet.getColumnNames());
|
}
|
this.pageResultSet.put(currentPage, customResultSet);
|
}
|
currentPage++;
|
this.totalPage--;
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
outPutLog("线程线程查询数据,出错....", 2);
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
});
|
// }
|
this.executorService.shutdown();
|
}
|
|
/**
|
* 输出日志
|
*
|
* @param msg 消息
|
* @param type 类型 1 info 2 error
|
*/
|
private void outPutLog(String msg, int type) {
|
if (this.openLog) {
|
if (type == 1) {
|
SpringMVCContextHolder.getSystemLogger().info(msg);
|
} else if (type == 2) {
|
SpringMVCContextHolder.getSystemLogger().error(msg);
|
}
|
}
|
}
|
|
|
}
|