package com.product.data.service;
|
|
import com.product.common.lang.StringUtils;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.data.config.CmnConst;
|
import com.product.data.config.ErrorCode;
|
import com.product.data.entity.DatabaseEntity;
|
import com.product.data.entity.SyncExecuteRecordEntity;
|
import com.product.data.entity.SyncFieldConfigEntity;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
/**
|
* @Author cheng
|
* @Date 2022/2/9 20:10
|
* @Desc 数据同步Service
|
*/
|
@Service("syncDataManager")
|
public class SyncDataManagerService extends AbstractBaseService {
|
|
@Autowired
|
SyncDataProcessService syncDataProcessService;
|
|
/**
|
* 执行同步数据任务
|
*
|
* @param task_uuid 同步任务配置uuid
|
* @throws BaseException
|
*/
|
public void executeDataSyncTask(String task_uuid) throws BaseException {
|
try {
|
if (StringUtils.isEmpty(task_uuid)) {
|
return;
|
}
|
//同步任务配置
|
FieldSetEntity fse = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER_SUB, task_uuid, true);
|
if (fse == null) {
|
throw new BaseException(ErrorCode.SYNC_DATA_EXECUTE_TASK_TARGET_NOT_EXIST);
|
}
|
//根据目标表做同步锁 同一个表只能有一个任务在执行 此锁也可以阻塞同一个任务同时在执行
|
synchronized (fse.getString("target_table").intern()) {
|
//连接配置表uuid
|
String master_uuid = fse.getString("master_uuid");
|
if (StringUtils.isEmpty(master_uuid)) {
|
// 连接配置获取错误
|
throw new BaseException(ErrorCode.SYNC_DATA_CONNECTION_FIND_FAIL);
|
}
|
//连接配置
|
FieldSetEntity connectionConfig = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, master_uuid, false);
|
//初始连接配置
|
DatabaseEntity dbe = new DatabaseEntity(connectionConfig);
|
SyncFieldConfigEntity syncFieldConfigEntity = new SyncFieldConfigEntity(fse, getBaseDao().getPKField(fse.getString("target_table")), dbe.getDbType());
|
SyncExecuteService syncExecuteService = new SyncExecuteService(dbe, syncFieldConfigEntity, getPrevExecuteRecord(syncFieldConfigEntity.getTaskUuid(), null), 100000, 5000);
|
syncExecuteService.executeSync();
|
SyncExecuteRecordEntity executeRecord = syncExecuteService.getExecuteRecord();
|
String insertQueryFilter = executeRecord.getInsertQueryFilter();
|
String updateQueryFilter = executeRecord.getUpdateQueryFilter();
|
System.out.println(insertQueryFilter);
|
System.out.println(updateQueryFilter);
|
syncDataProcessService.executeSyncAfter(syncFieldConfigEntity.getTargetTable(), insertQueryFilter, updateQueryFilter);
|
// syncExecuteService.historyDataUpdate();
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw new BaseException(ErrorCode.EXECUTE_SYNC_DATA_TASK_FAIL);
|
}
|
}
|
|
|
/**
|
* 获取上一次的执行记录
|
*
|
* @param syncTaskUuid
|
* @return
|
* @throws BaseException
|
*/
|
public SyncExecuteRecordEntity getPrevExecuteRecord(String syncTaskUuid, String excludedUuid) throws BaseException {
|
|
if (StringUtils.isEmpty(syncTaskUuid)) {
|
return null;
|
}
|
StringBuilder sql = new StringBuilder();
|
Object[] parmas = new Object[]{syncTaskUuid};
|
sql.append("SELECT * FROM product_sys_data_sync_manager_log where sync_task_uuid=? and ");
|
if (!StringUtils.isEmpty(excludedUuid)) {
|
sql.append(" (excludedUuid!=? ) and ");
|
parmas = new Object[]{syncTaskUuid, excludedUuid};
|
}
|
sql.append("(increase_number>0 or update_number>0) and created_utc_datetime is not null order by created_utc_datetime desc limit 1");
|
//查询上一次执行有增加数量或新增数量的数据
|
FieldSetEntity fse = getBaseDao().getFieldSetEntityBySQL(sql.toString(), parmas, false);
|
if (fse == null || fse.getUUID() == null) {
|
return null;
|
}
|
return new SyncExecuteRecordEntity(fse);
|
}
|
|
|
}
|