package com.product.data.service; import com.product.common.lang.StringUtils; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.config.CmnConst; import com.product.data.config.ErrorCode; import com.product.data.entity.DatabaseEntity; import com.product.data.entity.SyncExecuteRecordEntity; import com.product.data.entity.SyncFieldConfigEntity; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @Author cheng * @Date 2022/2/9 20:10 * @Desc 数据同步Service */ @Service("syncDataManager") public class SyncDataManagerService extends AbstractBaseService { @Autowired SyncDataProcessService syncDataProcessService; /** * 执行同步数据任务 * * @param task_uuid 同步任务配置uuid * @throws BaseException */ public void executeDataSyncTask(String task_uuid) throws BaseException { try { if (StringUtils.isEmpty(task_uuid)) { return; } //同步任务配置 FieldSetEntity fse = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER_SUB, task_uuid, true); if (fse == null) { throw new BaseException(ErrorCode.SYNC_DATA_EXECUTE_TASK_TARGET_NOT_EXIST); } //根据目标表做同步锁 同一个表只能有一个任务在执行 此锁也可以阻塞同一个任务同时在执行 synchronized (fse.getString("target_table").intern()) { //连接配置表uuid String master_uuid = fse.getString("master_uuid"); if (StringUtils.isEmpty(master_uuid)) { // 连接配置获取错误 throw new BaseException(ErrorCode.SYNC_DATA_CONNECTION_FIND_FAIL); } //连接配置 FieldSetEntity connectionConfig = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, master_uuid, false); //初始连接配置 DatabaseEntity dbe = new DatabaseEntity(connectionConfig); SyncFieldConfigEntity syncFieldConfigEntity = new SyncFieldConfigEntity(fse, getBaseDao().getPKField(fse.getString("target_table")), dbe.getDbType()); SyncExecuteService syncExecuteService = new SyncExecuteService(dbe, syncFieldConfigEntity, getPrevExecuteRecord(syncFieldConfigEntity.getTaskUuid(), null), 100000, 5000); syncExecuteService.executeSync(); SyncExecuteRecordEntity executeRecord = syncExecuteService.getExecuteRecord(); String insertQueryFilter = executeRecord.getInsertQueryFilter(); String updateQueryFilter = executeRecord.getUpdateQueryFilter(); System.out.println(insertQueryFilter); System.out.println(updateQueryFilter); syncDataProcessService.executeSyncAfter(syncFieldConfigEntity.getTargetTable(), insertQueryFilter, updateQueryFilter); // syncExecuteService.historyDataUpdate(); } } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); throw new BaseException(ErrorCode.EXECUTE_SYNC_DATA_TASK_FAIL); } } /** * 获取上一次的执行记录 * * @param syncTaskUuid * @return * @throws BaseException */ public SyncExecuteRecordEntity getPrevExecuteRecord(String syncTaskUuid, String excludedUuid) throws BaseException { if (StringUtils.isEmpty(syncTaskUuid)) { return null; } StringBuilder sql = new StringBuilder(); Object[] parmas = new Object[]{syncTaskUuid}; sql.append("SELECT * FROM product_sys_data_sync_manager_log where sync_task_uuid=? and "); if (!StringUtils.isEmpty(excludedUuid)) { sql.append(" (excludedUuid!=? ) and "); parmas = new Object[]{syncTaskUuid, excludedUuid}; } sql.append("(increase_number>0 or update_number>0) and created_utc_datetime is not null order by created_utc_datetime desc limit 1"); //查询上一次执行有增加数量或新增数量的数据 FieldSetEntity fse = getBaseDao().getFieldSetEntityBySQL(sql.toString(), parmas, false); if (fse == null || fse.getUUID() == null) { return null; } return new SyncExecuteRecordEntity(fse); } }