1821349743@qq.com
2023-02-20 93dd9bc3f16b0f626761ec624f2dc78037568897
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
package com.product.data.service;
 
import cn.hutool.core.util.IdUtil;
import com.google.common.collect.Lists;
import com.product.common.lang.DateUtils;
import com.product.common.lang.ExceptionUtils;
import com.product.common.lang.StringUtils;
import com.product.core.entity.FieldSetEntity;
import com.product.core.exception.BaseException;
import com.product.core.spring.context.SpringMVCContextHolder;
import com.product.data.config.CmnConst;
import com.product.data.config.DatabaseType;
import com.product.data.config.ErrorCode;
import com.product.data.connection.ConnectionManager;
import com.product.data.entity.QueryResultEntity;
import com.product.data.utli.CommonUtils;
import com.product.data.utli.QueryDataService;
import com.product.util.CallBack;
import com.product.util.CallBackValue;
import com.product.util.SystemParamReplace;
 
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
 
/**
 * @Author cheng
 * @Date 2022/2/13 17:25
 * @Desc 同步数据之后处理执行Service
 */
public class SyncDataAfterProcessExecuteService {
    /**
     * 目标表名
     */
    private String targetTable;
    /**
     * 总条数
     */
    private int totalNumber;
    /**
     * 查询模板sql
     */
    private String queryTemplateSql;
    /**
     * 查询模板sql
     */
    private String insertTemplateSql;
    /**
     * 处理任务主表uuid
     */
    private String processUuid;
    /**
     * 是否清空表数据
     */
    private boolean isClearTableData;
    /**
     * 系统连接
     */
    private Connection systemConnection = ConnectionManager.getSystemConnection();
    /**
     * 默认值map
     */
    private Map<String, Object> defaultValue = new HashMap<>();
    /**
     * 当前页
     */
    private int currentPage = 1;
    /**
     * 总页数
     */
    private int totalPage = -1;
    /**
     * 每页查询条数
     */
    private int pageSize = 100000;
    /**
     * 每批次提交条数
     */
    private int batchSize = 5000;
    /**
     * 查询数据service
     */
    private QueryDataService queryDataService;
 
    private Map<String, Map<Object, Object>> systemFieldByCache;
 
    private void initDefaultValue() {
        defaultValue.put(CmnConst.CREATED_BY, 1);
        defaultValue.put(CmnConst.CREATED_UTC_DATETIME, DateUtils.getDateTime());
        defaultValue.put("org_level_uuid", "00000000-0000-0000-0000-000000000000");
        CallBackValue<String> getUuidMethod = (o) -> IdUtil.randomUUID();
        defaultValue.put(CmnConst.UUID, getUuidMethod);
        String[] fields = defaultValue.keySet().toArray(new String[]{});
        for (String field : fields) {
            if (!systemFieldByCache.containsKey(field)) {
                this.defaultValue.remove(field);
            }
        }
    }
 
    /**
     * 添加默认值
     * 字段名称必选在目标表中
     *
     * @param fieldName
     * @param value
     */
    public void addDefaultValue(String fieldName, Object value) {
        if (this.systemFieldByCache.containsKey(fieldName)) {
            this.defaultValue.put(fieldName, value);
        }
    }
 
    public void setPageSize(int pageSize) {
        this.pageSize = pageSize;
    }
 
    /**
     * 初始构造
     *
     * @param targetTable      目标表名
     * @param totalNumber      要查询数据的总条数
     * @param queryTemplateSql 查询模板sql
     * @param processUuid      同步任务后处理 uuid
     * @param isClearTableData 是否清空表数据
     */
    public SyncDataAfterProcessExecuteService(String targetTable, int totalNumber, String queryTemplateSql, String processUuid, boolean isClearTableData) throws BaseException {
        try {
            this.targetTable = targetTable;
            this.totalNumber = totalNumber;
            this.queryTemplateSql = queryTemplateSql;
            this.processUuid = processUuid;
            this.isClearTableData = isClearTableData;
            //初始化查询Service
            this.queryDataService = new QueryDataService(this.systemConnection);
//            this.initInsertSql();
            this.systemConnection.setAutoCommit(false);
            if (this.totalNumber > 0) {
                this.totalPage = (int) Math.ceil(totalNumber / Double.valueOf(this.pageSize));
            }
            //获取目标表系统字段
            this.systemFieldByCache = CommonUtils.getSystemFieldByCache(targetTable);
            //初始化默认值
            this.initDefaultValue();
        } catch (BaseException e) {
            e.printStackTrace();
            this.outPutError("初始化SyncDataAfterProcessExecuteService构造失败", e);
            throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_SERVICE_INIT_FAIL);
        } catch (Exception e) {
            e.printStackTrace();
            this.outPutError("初始化SyncDataAfterProcessExecuteService构造失败", e);
            throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_SERVICE_INIT_FAIL);
        }
    }
 
    private List<String> valueFields;
 
    private void initInsertSql() throws BaseException {
        if (StringUtils.isEmpty(this.targetTable)) {
            throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_TARGET_TABLE_EMPTY);
        }
 
        StringBuilder sql = new StringBuilder(" INSERT INTO `");
        sql.append(this.targetTable).append("` ( ");
        String values = "";
        this.valueFields = new ArrayList<>();
        for (int i = 0; i < this.queryFields.size(); i++) {
            String queryField = this.queryFields.get(i);
            this.valueFields.add(queryField);
            if (this.defaultValue.containsKey(queryField)) {
                this.defaultValue.remove(queryField);
            }
            sql.append(CommonUtils.getFieldName(queryField, DatabaseType.MySql));
            values += "?";
            if (i + 1 < this.queryFields.size()) {
                sql.append(",");
                values += ",";
            } else if (this.defaultValue.size() > 0) {
                String[] fields = this.defaultValue.keySet().toArray(new String[]{});
                sql.append(",");
                values += ",";
                for (int j = 0; j < fields.length; j++) {
                    this.valueFields.add(fields[j]);
                    sql.append(CommonUtils.getFieldName(fields[j], DatabaseType.MySql));
                    values += "?";
                    if (j + 1 < fields.length) {
                        sql.append(",");
                        values += ",";
                    }
                }
            }
 
        }
        sql.append(" ) VALUES (").append(values).append(") ");
        this.insertTemplateSql = sql.toString();
    }
 
    /**
     * 查询字段
     */
    private List<String> queryFields;
 
    private FieldSetEntity pageParam;
 
    /**
     * 查询下一页数据
     *
     * @return
     */
    private QueryResultEntity queryNextPageData() throws BaseException {
        long startTime = System.currentTimeMillis();
        if (pageParam == null) {
            pageParam = new FieldSetEntity();
            pageParam.setTableName("temp");
            pageParam.setValue("end_size", this.pageSize);
        }
        pageParam.setValue("start_size", (this.currentPage - 1) * this.pageSize);
//        String sql = String.format(this.queryTemplateSql, , this.pageSize);
        String sql = SystemParamReplace.formParamsReplace(this.queryTemplateSql, pageParam);
        this.outPutMsg("开始查询第 " + this.currentPage + " 页数据,sql:\n" + sql);
        QueryResultEntity result = this.queryDataService.getResult(sql, queryFields);
        //当查询字段为空 并且查询结果不为空时获取第一条查询结果的所有字段 避免多次获取字段信息
        if (this.queryFields == null || this.queryFields.size() <= 0) {
            if (result == null || result.getColumnNames() == null || result.getColumnNames().size() <= 0) {
                //没有获取到查询的字段信息
                throw new BaseException(ErrorCode.SYNC_DATA_AFTER_PROCESS_QUERY_RESULT_FAIL_EMPTY);
            } else {
                this.queryFields = result.getColumnNames();
                initInsertSql();
            }
        }
        this.outPutMsg("查询完成,本次查询到 " + result.getSize() + " 条数据,耗时:" + ((System.currentTimeMillis() - startTime) / 1000) + " 秒");
        this.currentPage++;
        return result;
    }
 
    private PreparedStatement pst;
 
    private int notCommitNumber = 0;
 
    private int commitNumber = 0;
 
    private int errorNumber = 0;
 
    public void executeProcess(CallBack<Map<String, Object>> callBack) throws BaseException {
        try {
            if (this.isClearTableData) {
                PreparedStatement pst = this.systemConnection.prepareStatement("truncate table " + this.targetTable);
                pst.execute();
                pst.close();
                this.outPutMsg("执行清空表成功:" + this.targetTable);
            }
            execute(callBack);
        } catch (BaseException e) {
            e.printStackTrace();
            this.outPutError("执行同步数据后处理错误", e);
            throw e;
        } catch (Exception e) {
            e.printStackTrace();
            this.outPutError("执行同步数据后处理错误", e);
            throw new BaseException(e);
        } finally {
            //释放资源
            close();
            //将对象都置null 因为处理完成后外部调用方法可能会递归处理其他造成资源浪费
            this.valueFields = null;
            this.queryDataService = null;
            this.pst = null;
            this.defaultValue = null;
            this.queryFields = null;
            this.insertTemplateSql = null;
            this.queryTemplateSql = null;
        }
    }
 
    private void execute(CallBack<Map<String, Object>> callBack) throws BaseException, SQLException {
 
        //查询数据
        QueryResultEntity result = this.queryNextPageData();
        if (this.currentPage - 1 == 1) {
            //查询的第一页初始化预编译对象
            this.pst = this.systemConnection.prepareStatement(this.insertTemplateSql);
        }
        if (result != null && result.getSize() > 0) {
            //查询到数据
            for (int i = 0; i < result.getResult().size(); i++) {
                Map<String, Object> valuesMap = result.getResult().get(i);
                if (this.defaultValue.size() > 0) {
                    this.defaultValue.forEach((k, v) -> {
 
                        if (v instanceof CallBackValue) {
                            v = ((CallBackValue) v).method(k);
                        }
                        valuesMap.put(k, v);
                    });
                }
                if (callBack != null) {
                    callBack.method(valuesMap);
                }
                for (int j = 0; j < this.valueFields.size(); j++) {
                    this.pst.setObject(j + 1, valuesMap.get(this.valueFields.get(j)));
                }
                try {
                    this.pst.addBatch();
                    notCommitNumber++;
                } catch (Exception e) {
                    this.outPutError("批处理增加失败", e);
                    errorNumber++;
                }
                if (notCommitNumber > 0 && notCommitNumber % this.batchSize == 0) {
                    commit();
                }
            }
        }
        //this.currentPage - 1 是因为在调用查询下一页数据方法结束后 +1
        if (this.totalPage != -1 && this.currentPage - 1 < this.totalPage) {
            //有总页数 且当前页数小于总页数
            //进入递归 清空当前查询的结果集 否则页数过多后占用过多的资源
            if (result != null) {
                result.clear();
            }
            this.execute(callBack);
        } else if (result != null && result.getSize() == this.pageSize) {
            //没有指定总页数,查询到数据条数=每页查询条数 可能还有下一页
            //进入递归 清空当前查询的结果集 否则页数过多后占用过多的资源
            if (result != null) {
                result.clear();
            }
            this.execute(callBack);
        } else {
            // 结束查询执行方法完成
            commit();
            this.outPutMsg("同步任务后处理方法执行完成,processUuid:" + this.processUuid);
            this.outPutMsg("本次成功插入数据:" + this.commitNumber + "条");
        }
    }
 
    /**
     * 提交
     */
    private void commit() {
        if (this.notCommitNumber > 0) {
            try {
                this.pst.executeBatch();
                this.systemConnection.commit();
                this.pst.clearBatch();
            } catch (Exception e) {
                errorNumber += this.notCommitNumber;
                this.notCommitNumber = 0;
                this.outPutError("批处理提交失败", e);
            } finally {
                this.commitNumber += this.notCommitNumber;
                this.notCommitNumber = 0;
            }
        }
    }
 
    private void close() {
        try {
            if (this.pst != null && !this.pst.isClosed()) {
                pst.close();
            }
            if (this.systemConnection != null && !this.systemConnection.isClosed()) {
                this.systemConnection.close();
            }
        } catch (Exception e) {
 
        }
    }
 
 
    /**
     * 输出错误日志
     *
     * @param error
     * @param e
     */
    private void outPutError(String error, Exception e) {
        if (e != null) {
            String stackTraceAsString = ExceptionUtils.getStackTraceAsString(e);
            error += (!StringUtils.isEmpty(error) ? ":\n" : "") + stackTraceAsString;
        }
 
        this.outPutError(error);
    }
 
    private void outPutError(String error) {
        SpringMVCContextHolder.getSystemLogger().error(error);
    }
 
    private void outPutMsg(String msg) {
        SpringMVCContextHolder.getSystemLogger().info(msg);
    }
 
 
}