1821349743@qq.com
2023-02-20 93dd9bc3f16b0f626761ec624f2dc78037568897
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
package com.product.data.service;
 
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.product.common.lang.StringUtils;
import com.product.core.connection.DataSourceManager;
import com.product.core.entity.DataTableEntity;
import com.product.core.entity.FieldSetEntity;
import com.product.core.exception.BaseException;
import com.product.core.service.support.AbstractBaseService;
import com.product.core.spring.context.SpringMVCContextHolder;
import com.product.core.transfer.Transactional;
import com.product.data.config.CmnConst;
import com.product.data.config.ErrorCode;
import com.product.data.service.impl.ISyncDataProcessService;
import com.product.data.utli.OrgDataMap;
import com.product.module.sys.entity.SystemUser;
import com.product.tool.table.service.DataModelService;
import com.product.util.BaseUtil;
import com.product.util.CallBack;
import org.springframework.stereotype.Service;
 
import javax.annotation.Resource;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
 
/**
 * @Author cheng
 * @Date 2022/2/13 14:01
 * @Desc 同步数据后处理
 */
@Service
public class SyncDataProcessService extends AbstractBaseService implements ISyncDataProcessService {
 
    @Resource
    DataModelService dataModelService;
 
    /**
     * 查询数同步详情
     *
     * @param fse
     * @return
     */
    @Override
    public FieldSetEntity findDataProcessInfo(FieldSetEntity fse) throws BaseException {
        String uuid = fse.getUUID();
        if (StringUtils.isEmpty(uuid)) {
            return null;
        }
        fse = getBaseDao().getFieldSetEntity(fse.getTableName(), uuid, false);
        String tableType = fse.getString(CmnConst.TABLE_TYPE);
        if ("0".equals(tableType)) {
            // 新建表 查询表字段、表索引
            String processTable = getProcessTable(fse);
            fse.addSubDataTable(getBaseDao().listTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD,
                    "table_uuid  =(select uuid from product_sys_datamodel_table where table_name=?) order by field_id", new Object[]{processTable}));
            fse.addSubDataTable(getBaseDao().listTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX,
                    "table_uuid  =(select uuid from product_sys_datamodel_table where table_name=?) order by id", new Object[]{processTable}));
        }
        return fse;
    }
 
    /**
     * 保存数据处理配置详情
     *
     * @param fse
     * @throws BaseException
     */
    @Transactional
    @Override
    public void saveDataProcessInfo(FieldSetEntity fse) throws BaseException {
        SystemUser currentUser = SpringMVCContextHolder.getCurrentUser();
        if (currentUser == null || currentUser.getUserType() != 1) {
            return;
        }
        boolean truncate_table = fse.getBoolean("truncate_table");
        String createdTableStatement = null;
        final String processTable = getProcessTable(fse);
        FieldSetEntity tableInfo = getTableInfo(fse, truncate_table);
        //调用数据建模中的处理
        fse.getSubData().clear();
        getBaseDao().saveFieldSetEntity(fse);
        DruidDataSource druidDataSource = null;
        if (truncate_table) {
            //获取创建表语句
            FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL("show create table " + getProcessTable(fse), null, false);
            getBaseDao().delete(tableInfo.getTableName(), new Object[]{tableInfo.getString(CmnConst.TABLE_UUID)});
            tableInfo.remove(CmnConst.TABLE_UUID);
            createdTableStatement = fs.getString("Create Table");
            if (createdTableStatement == null || createdTableStatement.length() <= 0) {
                throw new BaseException(new NullPointerException());
            }
            //清空表
            druidDataSource = (DruidDataSource) DataSourceManager.getInstance().getDataSource("default");
            try (DruidPooledConnection connection = druidDataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("DROP TABLE IF EXISTS `" + processTable + "`")) {
                preparedStatement.execute();
            } catch (SQLException e) {
                throw new BaseException(e);
            }
        }
        try {
            dataModelService.dataModelOperation(tableInfo);
        } catch (Exception e) {
            if (truncate_table) {
                //之前删除了表 但是执行创建表过程中出错再恢复原表
                try (DruidPooledConnection connection = druidDataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("DROP TABLE IF EXISTS `" + processTable + "`")) {
                    preparedStatement.execute();
                    preparedStatement.execute(createdTableStatement);
                } catch (SQLException e1) {
//                    throw new BaseException("")
                }
            }
            throw e;
        }
    }
 
    private DataTableEntity getDefaultField() throws BaseException {
        DataTableEntity dt = new DataTableEntity();
        FieldSetEntity d = new FieldSetEntity();
        d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "id");
        d.setValue("field_show_name", "自增主键");
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "pk");
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 11);
        d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 1);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "自增主键");
        dt.addFieldSetEntity(d);
        d = new FieldSetEntity();
        d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "uuid");
        d.setValue("field_show_name", "唯一标识");
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "string");
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 80);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
        d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 1);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "唯一标识");
        dt.addFieldSetEntity(d);
 
        d = new FieldSetEntity();
        d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "dept_code");
        d.setValue("field_show_name", "部门编码");
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "string");
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 80);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
        d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 0);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "部门编码 管理 prodcut_sys_org_level 表 org_level_code 字段");
        dt.addFieldSetEntity(d);
        d = new FieldSetEntity();
        d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "organization_type");
        d.setValue("field_show_name", "机构类型");
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "int");
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 4);
        d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 0);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
        d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "机构类型");
        dt.addFieldSetEntity(d);
        return dt;
    }
 
    @Override
    public String efficacySql(String runSql) throws BaseException {
        AtomicReference<String> pkFieldName = new AtomicReference<>("id");
        DataTableEntity defaultField = getDefaultField();
        List<String> defaultFieldList = defaultField.getData().stream().map((f) -> f.getString(CmnConst.FIELD_NAME).toLowerCase()).collect(Collectors.toList());
        Object[] objects = dataModelService.detectionSql(runSql, defaultField, f -> {
            if (f[0] != null && !StringUtils.isEmpty(f[0].getString(CmnConst.FIELD_NAME))) {
 
                f[0].setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 0);
                String fieldName = f[0].getString(CmnConst.FIELD_NAME);
                if (!"id".equalsIgnoreCase(fieldName) && defaultFieldList.contains(fieldName.toLowerCase())) {
                    // 不能包含默认值字段
                    throw new BaseException(ErrorCode.RESULT_COLUMN_EXISTS_SAME_FILED.getValue(), ErrorCode.RESULT_COLUMN_EXISTS_SAME_FILED.getText().replace("{{fieldName}}", fieldName));
                } else if (defaultFieldList.contains("id") && pkFieldName.get().equalsIgnoreCase(fieldName)) {
                    // 包含id 字段 将默认值id字段名称 修改
                    int i = "id".equals(pkFieldName.get()) ? 0 : Integer.valueOf(pkFieldName.get().split("_")[1]);
                    while ("id".equals(pkFieldName.get()) || pkFieldName.get().equalsIgnoreCase(fieldName)) {
                        pkFieldName.set(pkFieldName.get() + "_" + i);
                        i++;
                    }
                }
            }
        });
        DataTableEntity dt = (DataTableEntity) objects[0];
        Map<String, Object> other = (Map<String, Object>) objects[1];
        if (!"id".equals(pkFieldName.get())) {
            //主键字段名称改变
            dt.setFieldValue(0, CmnConst.FIELD_NAME, pkFieldName.get());
        }
        return BaseUtil.success(dt, other);
    }
 
    /**
     * 获取数据处理目标表表名
     *
     * @param fse
     * @return
     * @throws BaseException
     */
    private String getProcessTable(FieldSetEntity fse) throws BaseException {
        String tableType = fse.getString(CmnConst.TABLE_TYPE);
        String basic_table = fse.getString(CmnConst.BASIC_TABLE);
        //如果是新建表 拼接前缀
        if ("0".equals(tableType)) {
            return fse.getString("table_classify") + basic_table;
        }
        return basic_table;
    }
 
    /**
     * 获取原表信息
     *
     * @param fse
     * @return
     */
    private FieldSetEntity getTableInfo(FieldSetEntity fse, boolean insert) throws BaseException {
 
        String basic_table = getProcessTable(fse);
        String uuid = fse.getUUID();
        String descContent = fse.getString("desc_content");
        TreeMap<String, DataTableEntity> subData = (TreeMap<String, DataTableEntity>) ((TreeMap<String, DataTableEntity>) fse.getSubData()).clone();
        if (insert) {
            //新增
            fse = new FieldSetEntity();
            fse.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE);
            fse.setTableName(CmnConst.TABLE_MODULE_TABLE);
            // 表名
            fse.setValue(CmnConst.TABLE_NAME, basic_table);
            // 表类型
            fse.setValue(CmnConst.TABLE_TYPE, 1);
            fse.setValue("sequence", 1);
            fse.setValue(CmnConst.TABLE_DESCRIPTION, descContent);
            if (!StringUtils.isEmpty(uuid)) {
                FieldSetEntity temp = BaseUtil.getSingleInfoByCache("所有表信息", new String[]{basic_table});
                if (temp == null || StringUtils.isEmpty(temp.getUUID()) || !basic_table.equals(temp.getString(CmnConst.TABLE_NAME))) {
                    //表不存在
                    throw new BaseException(ErrorCode.SAVE_DATA_PROCESS_TABLE_NOT_EXISTS);
                }
                fse.setValue(CmnConst.TABLE_UUID, temp.getUUID());
                subData.forEach((k, v) -> {
                    for (int i = 0; i < v.getData().size(); i++) {
                        FieldSetEntity data = v.getData().get(i);
                        if (!StringUtils.isEmpty(data.getUUID())) {
                            v.removeFieldSetEntity(i);
                            i--;
                            continue;
                        }
                        data.setValue("uuid", null);
                        if (data.getSubData() != null) {
                            data.getSubData().clear();
                        }
 
                    }
                });
            }
        } else {
            fse = BaseUtil.getSingleInfoByCache("所有表信息", new String[]{basic_table});
            if (fse == null || StringUtils.isEmpty(fse.getUUID()) || !basic_table.equals(fse.getString(CmnConst.TABLE_NAME))) {
                //表不存在
                throw new BaseException(ErrorCode.SAVE_DATA_PROCESS_TABLE_NOT_EXISTS);
            }
            // 克隆避免影响缓存中的数据
            fse = fse.clones();
        }
        //将表字段、表索引添加到子表
        fse.setSubData(subData);
        return fse;
    }
 
    /**
     * 执行数据同步之后调用
     *
     * @param tableName   目标表表名
     * @param queryFilter 查询数据条件
     */
    public synchronized void executeSyncAfter(String tableName, String... queryFilter) {
        if (StringUtils.isEmpty(tableName)) {
            return;
        }
        DataTableEntity dt = getBaseDao().listTable("product_sys_database_sync_processing_config", "concat(',',relevance_table,',') like concat('%,',?,',%') and concat(',',ifnull(execute_table,''),',') not like concat('%,',?,',%') and (parent_uuid is null or parent_uuid ='')",
                new Object[]{tableName, tableName}, new Object[]{CmnConst.UUID, "execute_table"});
        if (!DataTableEntity.isEmpty(dt)) {
            for (int i = 0; i < dt.getRows(); i++) {
                // 修改为执行表名 2022年2月21日09:53:29
                String execute_table = dt.getString(i, "execute_table");
                if (!StringUtils.isEmpty(execute_table)) {
                    execute_table += ",";
                } else if (execute_table == null) {
                    execute_table = "";
                }
                execute_table += tableName;
                dt.setFieldValue(i, "execute_table", execute_table.trim());
                //todo 暂不支持数据处理增量查询因此将条件保存到表中无用
//                if (queryFilter != null && queryFilter.length > 0) {
//                    DataTableEntity subDataTable = new DataTableEntity();
//                    for (String filter : queryFilter) {
//                        if (StringUtils.isEmpty(filter)) continue;
//                        FieldSetEntity fse = new FieldSetEntity();
//                        fse.setTableName("product_sys_database_sync_processing_config_sub");
//                        fse.setValue("table_name", tableName);
//                        fse.setValue("filter", filter);
//                        subDataTable.addFieldSetEntity(fse);
//                    }
//                    if (!DataTableEntity.isEmpty(subDataTable)) {
//                        dt.getFieldSetEntity(i).addSubDataTable(subDataTable);
//                    }
//                }
//                getBaseDao().saveFieldSetEntity(dt.getFieldSetEntity(i));
            }
            //todo 如需保存子表先确认该方法是否能保存子表 ,写注释时还没有支持批量保存子表 2022年2月15日19:53:54 cheng
            getBaseDao().update(dt);
            examiningProcessTask(tableName);
        }
    }
 
 
    /**
     * 条件替换
     * 替换规则  str = "`field`>0 and `field1` > 6"  alias="a" 替换为 "a.`field`>0 and a.`field1` > 6"
     * 以mysql 字段分隔符  `fieldName` 替换
     *
     * @param str   条件字符串
     * @param alias 表别名
     * @return
     * @throws BaseException
     */
    public static String filterFieldTableAlias(String str, String alias) throws BaseException {
        Pattern p = Pattern.compile("(`)([\\w]+)(`)");
        Matcher m = p.matcher(str);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            String group = m.group(0);
            m.appendReplacement(sb, " " + alias + "." + group);
        }
        m.appendTail(sb);
        return sb.toString();
    }
 
    /**
     * 检测处理任务是否运行
     * todo 只有有一个在检测或运行 避免gc,因进入方法后不能控制 后续考虑优化 cheng
     *
     * @param tableName
     */
    public void examiningProcessTask(String tableName) {
 
        // 查询出 tableName 包含在relevance_table 字段中的配置数据
        DataTableEntity dt = getBaseDao().listTable("product_sys_database_sync_processing_config", "concat(',',relevance_table,',') like concat('%,',?,',%') and (parent_uuid is null  or parent_uuid ='')",
                new Object[]{tableName});
        if (DataTableEntity.isEmpty(dt)) {
            return;
        }
        for (int i = 0; i < dt.getRows(); i++) {
            String relevance_table = dt.getString(i, "relevance_table");
            String execute_table = dt.getString(i, "execute_table");
            if (StringUtils.isEmpty(relevance_table) || StringUtils.isEmpty(execute_table)) {
                dt.removeFieldSetEntity(i);
                i--;
                continue;
            } else {
                String[] relevanceTable = relevance_table.split(",");
                String[] executeTable = execute_table.split(",");
                Arrays.sort(relevanceTable);
                Arrays.sort(executeTable);
                //两个数组不相同代表已执行表和关联表不一致 不满足执行条件
                if (!Arrays.equals(relevanceTable, executeTable)) {
                    dt.removeFieldSetEntity(i);
                    i--;
                    continue;
                }
            }
        }
        //组织机构处理map picc独有
        OrgDataMap orgDataMap = new OrgDataMap(getBaseDao());
        //线程池线程数
        int threadCount = 5;
        if (dt.getRows() < 5) {
            //要处理任务的数量小于5个就使用任务数量作为线程池数量
            threadCount = dt.getRows();
        }
        //获取固定线程数量的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity fse = dt.getFieldSetEntity(i);
            //关联表名多个逗号分隔
            String relevance_table = fse.getString("relevance_table");
            //目标表(要插入数据的表)
            //获取真实的表名 cheng 2022年3月1日14:47:46
            String basic_table = getProcessTable(fse);
            if (StringUtils.isEmpty(relevance_table.trim())) {
                continue;
            }
            String bts[] = relevance_table.split(",");
            //对应别名
            Map<String, String> aliasMap = new HashMap<>();
            if (!StringUtils.isEmpty(fse.getString("table_alias"))) {
                String alias[] = fse.getString("table_alias").split(",");
                for (int k = 0; k < bts.length; k++) {
                    if (alias.length > k) {
                        if (alias[k] == null) {
                            continue;
                        }
                        aliasMap.put(bts[k], alias[k]);
                    }
                }
            }
            //todo 暂不支持增量处理数据
//            DataTableEntity sdt = getBaseDao().listTable("product_sys_database_sync_processing_config_sub", "main_uuid =? ", new String[]{fse.getUUID()});
            StringBuilder filter = new StringBuilder();
            Map<String, String> tableFilter = new HashMap<>();
            //todo 暂不支持增量处理数据
//            if (!DataTableEntity.isEmpty(sdt)) {
//                for (int j = 0; j < sdt.getRows(); j++) {
//
//                    String table_name = sdt.getString(j, "table_name");
//                    String table_filter = sdt.getString(j, "filter");
//                    if (StringUtils.isEmpty(table_name) || StringUtils.isEmpty(table_filter)) {
//                        continue;
//                    }
//                    if (filter.length() > 0) {
//                        filter.append(" OR ");
//                    }
//                    String alias = aliasMap.get(table_name);
//                    if (StringUtils.isEmpty(alias)) {
//                        alias = table_name;
//                    }
//                    String f = "";
//                    if (!StringUtils.isEmpty(tableFilter.get(table_name))) {
//                        f = tableFilter.get(table_name) + " or ";
//                    }
//                    f += " ( " + table_filter + " ) ";
//                    tableFilter.put(table_name, f);
//                    filter.append(" ( ").append(filterFieldTableAlias(table_filter, alias)).append(" ) ");
//                }
//            }
            //默认条件 1=1
            if (filter.length() == 0) {
                filter.append(" 1=1 ");
            }
            String templateSql = fse.getString("template_sql");
            if (StringUtils.isEmpty(templateSql)) {
                continue;
            }
            //替换条件
            templateSql = templateSql.replaceAll("\\{#sync_data_process_data_filter#}", filter.toString());
            //主表表名
            String mainTableName = null;
            //获取分页表达式位置 替换为分页 并获取要分页的主表表名
            int startIndex = templateSql.indexOf("[[");
            if (startIndex != -1 && templateSql.indexOf("]]") > startIndex) {
                mainTableName = templateSql.substring(startIndex + 2, templateSql.indexOf("]]") - "_limit".length());
                //将[[mainTableName_limit]] 替换为 分页变量
                templateSql = templateSql.replace("[[" + mainTableName + "_limit" + "]]", "limit {#start_size#},{#end_size#} ");
            } else {
                //没有找到表达式未知 在sql末尾加上分页变量
                templateSql += " LIMIT {#start_size#},{#end_size#}";
            }
            int totalNumber = -1;
            if (mainTableName != null) {
                totalNumber = getTableTotalNumber(mainTableName, tableFilter.get(mainTableName));
                if (totalNumber <= 0) {
                    continue;
                }
            }
            // 同步锁 同一个目标表不能同时处理
            synchronized (basic_table.intern()) {
                int finalTotalNumber = totalNumber;
                String finalTemplateSql = templateSql;
                int finalThreadCount = threadCount;
                //提交到线程池处理
                executorService.submit(() -> {
                    SyncDataAfterProcessExecuteService syncDataAfterProcessExecuteService =
                            //默认调用此方法就是清空basic_table表数据 如后期需要优化再修改
                            new SyncDataAfterProcessExecuteService(basic_table, finalTotalNumber, finalTemplateSql, fse.getUUID(), true);
                    //todo 添加部门编码默认值字段 FirstCallBack 中写入value,组织机构类型默认值 picc独有
                    syncDataAfterProcessExecuteService.addDefaultValue("dept_code", null);
                    syncDataAfterProcessExecuteService.addDefaultValue("organization_type", 4);
                    syncDataAfterProcessExecuteService.executeProcess(getFirstCallBack(orgDataMap));
                    //将当前数据处理执行次数归零
                    getBaseDao().executeUpdate("UPDATE product_sys_database_sync_processing_config SET execute_table=null where  uuid=?", new Object[]{fse.getUUID()});
                    getBaseDao().executeUpdate("DELETE FROM product_sys_database_sync_processing_config_sub where main_uuid=?", new Object[]{fse.getUUID()});
                    //继续处理parent_uuid = fse.getUUID() 的数据
                    // 100000/ finalThreadCount 考虑到多线程查询 所以做此限制 在线程中每次查询数量不能太多
                    this.lastProcessData(fse.getUUID(), 100000 / finalThreadCount);
                });
            }
 
        }
        //关闭线程池提交任务
        executorService.shutdown();
        long time = System.currentTimeMillis();
        //等待线程池任务执行完毕
        while (!executorService.isTerminated()) {
            //每隔30秒打印一次
            if (System.currentTimeMillis() - time > 30000) {
                SpringMVCContextHolder.getSystemLogger().info("数据同步处理中....");
                time = System.currentTimeMillis();
            }
        }
        SpringMVCContextHolder.getSystemLogger().info("数据同步处理完成");
    }
 
    /**
     * 数据处理下一任务(parent_uuid)
     *
     * @param parentUuid
     * @param pageSize
     */
    public void lastProcessData(String parentUuid, int pageSize) {
        DataTableEntity dt = getBaseDao().listTable("product_sys_database_sync_processing_config", "parent_uuid=?", new Object[]{parentUuid});
        for (int i = 0; i < dt.getRows(); i++) {
            FieldSetEntity ff = dt.getFieldSetEntity(i);
            if (ff != null && ff.getUUID() != null) {
                //替换条件
                String templateSql = ff.getString("template_sql");
                templateSql = templateSql.replaceAll("\\{#sync_data_process_data_filter#}", "1=1");
                //主表表名
                String mainTableName = null;
                //获取分页表达式位置 替换为分页 并获取要分页的主表表名
                int startIndex = templateSql.indexOf("[[");
                if (startIndex != -1 && templateSql.indexOf("]]") > startIndex) {
                    mainTableName = templateSql.substring(startIndex + 2, templateSql.indexOf("]]") - "_limit".length());
                    //将[[mainTableName_limit]] 替换为 分页变量
                    templateSql = templateSql.replace("[[" + mainTableName + "_limit" + "]]", "limit {#start_size#},{#end_size#} ");
                } else {
                    //没有找到表达式未知 在sql末尾加上分页变量
                    templateSql += " LIMIT {#start_size#},{#end_size#}";
                }
                int totalNumber = -1;
                if (mainTableName != null) {
                    totalNumber = getTableTotalNumber(mainTableName, "1=1");
                    if (totalNumber <= 0) {
                        continue;
                    }
                }
                SyncDataAfterProcessExecuteService syncDataAfterProcessExecuteService =
                        //默认调用此方法就是清空basic_table表数据 如后期需要优化再修改
                        new SyncDataAfterProcessExecuteService(getProcessTable(ff), totalNumber,
                                templateSql, ff.getUUID(), ff.getBoolean("delete_data"));
                syncDataAfterProcessExecuteService.setPageSize(pageSize);
                syncDataAfterProcessExecuteService.executeProcess(null);
                this.lastProcessData(ff.getUUID(), pageSize);
            }
        }
    }
 
    /**
     * 第一次处理关联表数据 回调方法 (picc独有 需要处理dept_code org_level_code)
     *
     * @param finalOrgDataMap
     * @return
     */
    private CallBack<Map<String, Object>> getFirstCallBack(OrgDataMap finalOrgDataMap) {
        CallBack<Map<String, Object>> callBack = (obj) -> {
            Map<String, Object> map = obj[0];
            String user_id = null;
            if (map.get("user_id") != null) {
                user_id = String.valueOf(map.get("user_id"));
            }
            if (map.get(CmnConst.ORG_LEVEL_CODE) != null && map.containsKey(CmnConst.USER_ID) && map.containsKey("dept_code")) {
                String code = (String) map.get(CmnConst.ORG_LEVEL_CODE);
                if (finalOrgDataMap.isCompany(code)) {
                    if (user_id != null) {
                        String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id);
                        if (!StringUtils.isEmpty(deptCode)) {
                            map.put("dept_code", deptCode);
                        }
                    }
                } else if (finalOrgDataMap.isDept(code)) {
                    String companyCode = finalOrgDataMap.getCompanyCode(code);
                    if (StringUtils.isEmpty(companyCode)) {
                        companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id);
                    }
                    if (!StringUtils.isEmpty(companyCode)) {
                        map.put(CmnConst.ORG_LEVEL_CODE, companyCode);
                        map.put("dept_code", code);
                    }
                }
            }
            if (!StringUtils.isEmpty(user_id) && (StringUtils.isEmpty(map.get("dept_code")) || StringUtils.isEmpty(map.get(CmnConst.ORG_LEVEL_CODE)))) {
                String companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id);
                String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id);
                if (map.containsKey(CmnConst.ORG_LEVEL_CODE) && !StringUtils.isEmpty(companyCode)) {
                    map.put(CmnConst.ORG_LEVEL_CODE, companyCode);
                }
                if (map.containsKey("dept_code") && !StringUtils.isEmpty(deptCode)) {
                    map.put("dept_code", deptCode);
                }
            }
        };
        return callBack;
    }
 
    /**
     * 获取表数据总数
     *
     * @param tableName 查询表名
     * @param filter    查询条件
     * @return
     */
    private int getTableTotalNumber(String tableName, String filter) throws BaseException {
        FieldSetEntity fse = getBaseDao().getFieldSetEntityByFilter(tableName, new String[]{" ifnull(COUNT(1),0) as totalNumber "}, filter, new Object[]{}, false, null);
 
        if (fse != null && fse.getObject("totalNumber") != null) {
            return fse.getInteger("totalNumber").intValue();
        }
        return -1;
    }
 
}