1821349743@qq.com
2023-02-20 93dd9bc3f16b0f626761ec624f2dc78037568897
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.product.data.service;
 
import com.product.common.lang.StringUtils;
import com.product.core.entity.FieldSetEntity;
import com.product.core.exception.BaseException;
import com.product.core.service.support.AbstractBaseService;
import com.product.core.spring.context.SpringMVCContextHolder;
import com.product.data.config.CmnConst;
import com.product.data.config.ErrorCode;
import com.product.data.entity.DatabaseEntity;
import com.product.data.entity.SyncExecuteRecordEntity;
import com.product.data.entity.SyncFieldConfigEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
/**
 * @Author cheng
 * @Date 2022/2/9 20:10
 * @Desc 数据同步Service
 */
@Service("syncDataManager")
public class SyncDataManagerService extends AbstractBaseService {
 
    @Autowired
    SyncDataProcessService syncDataProcessService;
 
    /**
     * 执行同步数据任务
     *
     * @param task_uuid 同步任务配置uuid
     * @throws BaseException
     */
    public void executeDataSyncTask(String task_uuid) throws BaseException {
        try {
            if (StringUtils.isEmpty(task_uuid)) {
                return;
            }
            //同步任务配置
            FieldSetEntity fse = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER_SUB, task_uuid, true);
            if (fse == null) {
                throw new BaseException(ErrorCode.SYNC_DATA_EXECUTE_TASK_TARGET_NOT_EXIST);
            }
            //根据目标表做同步锁 同一个表只能有一个任务在执行 此锁也可以阻塞同一个任务同时在执行
            synchronized (fse.getString("target_table").intern()) {
                //连接配置表uuid
                String master_uuid = fse.getString("master_uuid");
                if (StringUtils.isEmpty(master_uuid)) {
                    // 连接配置获取错误
                    throw new BaseException(ErrorCode.SYNC_DATA_CONNECTION_FIND_FAIL);
                }
                //连接配置
                FieldSetEntity connectionConfig = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, master_uuid, false);
                //初始连接配置
                DatabaseEntity dbe = new DatabaseEntity(connectionConfig);
                SyncFieldConfigEntity syncFieldConfigEntity = new SyncFieldConfigEntity(fse, getBaseDao().getPKField(fse.getString("target_table")), dbe.getDbType());
                SyncExecuteService syncExecuteService = new SyncExecuteService(dbe, syncFieldConfigEntity, getPrevExecuteRecord(syncFieldConfigEntity.getTaskUuid(), null), 100000, 5000);
                syncExecuteService.executeSync();
                SyncExecuteRecordEntity executeRecord = syncExecuteService.getExecuteRecord();
                String insertQueryFilter = executeRecord.getInsertQueryFilter();
                String updateQueryFilter = executeRecord.getUpdateQueryFilter();
                System.out.println(insertQueryFilter);
                System.out.println(updateQueryFilter);
                syncDataProcessService.executeSyncAfter(syncFieldConfigEntity.getTargetTable(), insertQueryFilter, updateQueryFilter);
//                syncExecuteService.historyDataUpdate();
            }
        } catch (Exception e) {
            e.printStackTrace();
            SpringMVCContextHolder.getSystemLogger().error(e);
            throw new BaseException(ErrorCode.EXECUTE_SYNC_DATA_TASK_FAIL);
        }
    }
 
 
    /**
     * 获取上一次的执行记录
     *
     * @param syncTaskUuid
     * @return
     * @throws BaseException
     */
    public SyncExecuteRecordEntity getPrevExecuteRecord(String syncTaskUuid, String excludedUuid) throws BaseException {
 
        if (StringUtils.isEmpty(syncTaskUuid)) {
            return null;
        }
        StringBuilder sql = new StringBuilder();
        Object[] parmas = new Object[]{syncTaskUuid};
        sql.append("SELECT * FROM product_sys_data_sync_manager_log where sync_task_uuid=? and ");
        if (!StringUtils.isEmpty(excludedUuid)) {
            sql.append(" (excludedUuid!=? ) and ");
            parmas = new Object[]{syncTaskUuid, excludedUuid};
        }
        sql.append("(increase_number>0 or update_number>0) and  created_utc_datetime is not null order by created_utc_datetime desc limit 1");
        //查询上一次执行有增加数量或新增数量的数据
        FieldSetEntity fse = getBaseDao().getFieldSetEntityBySQL(sql.toString(), parmas, false);
        if (fse == null || fse.getUUID() == null) {
            return null;
        }
        return new SyncExecuteRecordEntity(fse);
    }
 
 
}