//package com.product.kt.test.service;
|
//
|
//import java.util.ArrayList;
|
//import java.util.List;
|
//import java.util.Map;
|
//import java.util.concurrent.ExecutionException;
|
//import java.util.concurrent.Future;
|
//
|
//import com.product.kt.test.config.SyncConfig;
|
//import com.product.util.BaseUtil;
|
//
|
//import net.minidev.json.JSONObject;
|
//
|
//import org.slf4j.Logger;
|
//import org.slf4j.LoggerFactory;
|
//import org.springframework.beans.factory.annotation.Autowired;
|
//import org.springframework.stereotype.Service;
|
//
|
//import com.product.core.entity.FieldSetEntity;
|
//
|
//
|
//@Service
|
//public class ToolsThread {
|
//
|
// private static final Logger LOG = LoggerFactory.getLogger(ToolsThread.class);
|
//
|
// @Autowired
|
// SyncToolsHandler syncToolsHandler;
|
//
|
// @Autowired
|
// ReadToolsService readToolsService;
|
//
|
// //同步表
|
// public static Map<String, JSONObject> readTableMap = SyncConfig.SYNC_TABLE_MAP;
|
// //每次读取数据量大小
|
// public static int pageSize = SyncConfig.PAGE_SIZE;
|
//
|
// public void receiveBookJobRun() {
|
// if (!readTableMap.isEmpty()) {
|
//
|
// // 入库开始时间
|
// Long inserOrUpdateBegin = System.currentTimeMillis();
|
// LOG.info("数据更新开始时间:" + inserOrUpdateBegin);
|
//
|
// //遍历同步表
|
// readTableMap.forEach((tableName, specialField) -> {
|
// //获取数据总条数
|
// int countSize = readToolsService.getToolCount(tableName);
|
//
|
// String timeField = null;
|
// if (!specialField.isEmpty() && !BaseUtil.strIsNull(specialField.getAsString("time_field"))) {
|
// timeField=specialField.getAsString("time_field");
|
// }
|
//
|
// //获取总页数
|
// int pageCount = countSize / pageSize;
|
//
|
// LOG.info("数据操作表:" + tableName + ", 数据总条数:" + countSize + ", 总页数:" + pageCount);
|
//// LOG.info("数据操作表:"+tableName+", 排序字段:"+orderField+", 数据总条数:"+countSize+", 总页数:"+pageCount);
|
//
|
// //遍历页数并获取每页数据
|
// for (int j = 0; j < pageCount; j++) {
|
// //获取分页下标
|
//// int index = j * pageSize;
|
//
|
// //获取数据
|
// List<FieldSetEntity> readDatas = readToolsService.getToolListByAutoId(j+1, pageSize, tableName,timeField);
|
//
|
// LOG.info("读取表" + tableName + "数据第" + (j + 1) + "页共" + readDatas.size() + "条数据");
|
//
|
// // 接收集合各段的 执行的返回结果
|
// List<Future<String>> futureList = new ArrayList<Future<String>>();
|
//
|
// //判断是否读取到数据
|
// if (readDatas != null) {
|
// // 将集合切分的段数(2*CPU的核心数)
|
// int threadSum = 2 * Runtime.getRuntime().availableProcessors();
|
// int listSize = readDatas.size();
|
// int listStart, listEnd;
|
// // 当总条数不足threadSum条时 用总条数 当做线程切分值
|
// if (threadSum > listSize) {
|
// threadSum = listSize;
|
// }
|
//
|
// //将list切分
|
// for (int k = 0; k < threadSum; k++) {
|
// // 计算切割 开始和结束
|
// listStart = listSize / threadSum * k;
|
// listEnd = listSize / threadSum * (k + 1);
|
// // 最后一段线程会 出现与其他线程不等的情况
|
// if (k == threadSum - 1) {
|
// listEnd = listSize;
|
// }
|
// // 数据切断
|
// List<FieldSetEntity> readToolList = readDatas.subList(listStart, listEnd);
|
//
|
// // 每段数据集合并行入库
|
// futureList.add(syncToolsHandler.syncTools(readToolList, k, tableName, timeField));
|
// }
|
// // 对各个线程段结果进行解析
|
// for (Future<String> future : futureList) {
|
// if (null != future) {
|
// try {
|
// String str = future.get().toString();
|
// LOG.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);
|
//
|
// } catch (ExecutionException | InterruptedException e) {
|
//
|
// LOG.info("线程运行异常!");
|
// }
|
//
|
// } else {
|
// LOG.info("线程运行异常!");
|
// }
|
// }
|
// }
|
// }
|
// });
|
// Long inserOrUpdateEnd = System.currentTimeMillis();
|
// LOG.info("数据更新结束时间:" + inserOrUpdateEnd + "。此次更新数据花费时间为:" + (inserOrUpdateEnd - inserOrUpdateBegin));
|
// }
|
// }
|
//}
|