许鹏程
2023-06-30 3bbfaa3d7d416afbd154576453c8ee9e7e2f8899
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//package com.product.kt.test.service;
//
//import java.util.ArrayList;
//import java.util.List;
//import java.util.Map;
//import java.util.concurrent.ExecutionException;
//import java.util.concurrent.Future;
//
//import com.product.kt.test.config.SyncConfig;
//import com.product.util.BaseUtil;
//
//import net.minidev.json.JSONObject;
//
//import org.slf4j.Logger;
//import org.slf4j.LoggerFactory;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Service;
//
//import com.product.core.entity.FieldSetEntity;
//
//
//@Service
//public class ToolsThread {
//
//    private static final Logger LOG = LoggerFactory.getLogger(ToolsThread.class);
//
//    @Autowired
//    SyncToolsHandler syncToolsHandler;
//
//    @Autowired
//    ReadToolsService readToolsService;
//
//    //同步表
//    public static Map<String, JSONObject> readTableMap = SyncConfig.SYNC_TABLE_MAP;
//    //每次读取数据量大小
//    public static int pageSize = SyncConfig.PAGE_SIZE;
//
//    public void receiveBookJobRun() {
//        if (!readTableMap.isEmpty()) {
//
//            // 入库开始时间
//            Long inserOrUpdateBegin = System.currentTimeMillis();
//            LOG.info("数据更新开始时间:" + inserOrUpdateBegin);
//
//            //遍历同步表
//            readTableMap.forEach((tableName, specialField) -> {
//                //获取数据总条数
//                int countSize = readToolsService.getToolCount(tableName);
//
//                String timeField = null;
//                if (!specialField.isEmpty() && !BaseUtil.strIsNull(specialField.getAsString("time_field"))) {
//                    timeField=specialField.getAsString("time_field");
//                }
//
//                //获取总页数
//                int pageCount = countSize / pageSize;
//
//                LOG.info("数据操作表:" + tableName + ",  数据总条数:" + countSize + ", 总页数:" + pageCount);
////                LOG.info("数据操作表:"+tableName+", 排序字段:"+orderField+", 数据总条数:"+countSize+", 总页数:"+pageCount);
//
//                //遍历页数并获取每页数据
//                for (int j = 0; j < pageCount; j++) {
//                    //获取分页下标
////                    int index = j * pageSize;
//
//                    //获取数据
//                    List<FieldSetEntity> readDatas = readToolsService.getToolListByAutoId(j+1, pageSize, tableName,timeField);
//
//                    LOG.info("读取表" + tableName + "数据第" + (j + 1) + "页共" + readDatas.size() + "条数据");
//
//                    // 接收集合各段的 执行的返回结果
//                    List<Future<String>> futureList = new ArrayList<Future<String>>();
//
//                    //判断是否读取到数据
//                    if (readDatas != null) {
//                        // 将集合切分的段数(2*CPU的核心数)
//                        int threadSum = 2 * Runtime.getRuntime().availableProcessors();
//                        int listSize = readDatas.size();
//                        int listStart, listEnd;
//                        // 当总条数不足threadSum条时 用总条数 当做线程切分值
//                        if (threadSum > listSize) {
//                            threadSum = listSize;
//                        }
//
//                        //将list切分
//                        for (int k = 0; k < threadSum; k++) {
//                            // 计算切割 开始和结束
//                            listStart = listSize / threadSum * k;
//                            listEnd = listSize / threadSum * (k + 1);
//                            // 最后一段线程会 出现与其他线程不等的情况
//                            if (k == threadSum - 1) {
//                                listEnd = listSize;
//                            }
//                            // 数据切断
//                            List<FieldSetEntity> readToolList = readDatas.subList(listStart, listEnd);
//
//                            // 每段数据集合并行入库
//                            futureList.add(syncToolsHandler.syncTools(readToolList, k, tableName, timeField));
//                        }
//                        // 对各个线程段结果进行解析
//                        for (Future<String> future : futureList) {
//                            if (null != future) {
//                                try {
//                                    String str = future.get().toString();
//                                    LOG.info("current thread id =" + Thread.currentThread().getName() + ",result=" + str);
//
//                                } catch (ExecutionException | InterruptedException e) {
//
//                                    LOG.info("线程运行异常!");
//                                }
//
//                            } else {
//                                LOG.info("线程运行异常!");
//                            }
//                        }
//                    }
//                }
//            });
//            Long inserOrUpdateEnd = System.currentTimeMillis();
//            LOG.info("数据更新结束时间:" + inserOrUpdateEnd + "。此次更新数据花费时间为:" + (inserOrUpdateEnd - inserOrUpdateBegin));
//        }
//    }
//}