package com.product.data.service;
|
|
import com.alibaba.druid.pool.DruidDataSource;
|
import com.alibaba.druid.pool.DruidPooledConnection;
|
import com.product.common.lang.StringUtils;
|
import com.product.core.connection.DataSourceManager;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.core.transfer.Transactional;
|
import com.product.data.config.CmnConst;
|
import com.product.data.config.ErrorCode;
|
import com.product.data.service.impl.ISyncDataProcessService;
|
import com.product.data.utli.OrgDataMap;
|
import com.product.module.sys.entity.SystemUser;
|
import com.product.tool.table.service.DataModelService;
|
import com.product.util.BaseUtil;
|
import com.product.util.CallBack;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.Resource;
|
import java.sql.PreparedStatement;
|
import java.sql.SQLException;
|
import java.util.*;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.regex.Matcher;
|
import java.util.regex.Pattern;
|
import java.util.stream.Collectors;
|
|
/**
|
* @Author cheng
|
* @Date 2022/2/13 14:01
|
* @Desc 同步数据后处理
|
*/
|
@Service
|
public class SyncDataProcessService extends AbstractBaseService implements ISyncDataProcessService {
|
|
@Resource
|
DataModelService dataModelService;
|
|
/**
|
* 查询数同步详情
|
*
|
* @param fse
|
* @return
|
*/
|
@Override
|
public FieldSetEntity findDataProcessInfo(FieldSetEntity fse) throws BaseException {
|
String uuid = fse.getUUID();
|
if (StringUtils.isEmpty(uuid)) {
|
return null;
|
}
|
fse = getBaseDao().getFieldSetEntity(fse.getTableName(), uuid, false);
|
String tableType = fse.getString(CmnConst.TABLE_TYPE);
|
if ("0".equals(tableType)) {
|
// 新建表 查询表字段、表索引
|
String processTable = getProcessTable(fse);
|
fse.addSubDataTable(getBaseDao().listTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD,
|
"table_uuid =(select uuid from product_sys_datamodel_table where table_name=?) order by field_id", new Object[]{processTable}));
|
fse.addSubDataTable(getBaseDao().listTable(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE_INDEX,
|
"table_uuid =(select uuid from product_sys_datamodel_table where table_name=?) order by id", new Object[]{processTable}));
|
}
|
return fse;
|
}
|
|
/**
|
* 保存数据处理配置详情
|
*
|
* @param fse
|
* @throws BaseException
|
*/
|
@Transactional
|
@Override
|
public void saveDataProcessInfo(FieldSetEntity fse) throws BaseException {
|
SystemUser currentUser = SpringMVCContextHolder.getCurrentUser();
|
if (currentUser == null || currentUser.getUserType() != 1) {
|
return;
|
}
|
boolean truncate_table = fse.getBoolean("truncate_table");
|
String createdTableStatement = null;
|
final String processTable = getProcessTable(fse);
|
FieldSetEntity tableInfo = getTableInfo(fse, truncate_table);
|
//调用数据建模中的处理
|
fse.getSubData().clear();
|
getBaseDao().saveFieldSetEntity(fse);
|
DruidDataSource druidDataSource = null;
|
if (truncate_table) {
|
//获取创建表语句
|
FieldSetEntity fs = getBaseDao().getFieldSetEntityBySQL("show create table " + getProcessTable(fse), null, false);
|
getBaseDao().delete(tableInfo.getTableName(), new Object[]{tableInfo.getString(CmnConst.TABLE_UUID)});
|
tableInfo.remove(CmnConst.TABLE_UUID);
|
createdTableStatement = fs.getString("Create Table");
|
if (createdTableStatement == null || createdTableStatement.length() <= 0) {
|
throw new BaseException(new NullPointerException());
|
}
|
//清空表
|
druidDataSource = (DruidDataSource) DataSourceManager.getInstance().getDataSource("default");
|
try (DruidPooledConnection connection = druidDataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("DROP TABLE IF EXISTS `" + processTable + "`")) {
|
preparedStatement.execute();
|
} catch (SQLException e) {
|
throw new BaseException(e);
|
}
|
}
|
try {
|
dataModelService.dataModelOperation(tableInfo);
|
} catch (Exception e) {
|
if (truncate_table) {
|
//之前删除了表 但是执行创建表过程中出错再恢复原表
|
try (DruidPooledConnection connection = druidDataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("DROP TABLE IF EXISTS `" + processTable + "`")) {
|
preparedStatement.execute();
|
preparedStatement.execute(createdTableStatement);
|
} catch (SQLException e1) {
|
// throw new BaseException("")
|
}
|
}
|
throw e;
|
}
|
}
|
|
private DataTableEntity getDefaultField() throws BaseException {
|
DataTableEntity dt = new DataTableEntity();
|
FieldSetEntity d = new FieldSetEntity();
|
d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "id");
|
d.setValue("field_show_name", "自增主键");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "pk");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 11);
|
d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 1);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "自增主键");
|
dt.addFieldSetEntity(d);
|
d = new FieldSetEntity();
|
d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "uuid");
|
d.setValue("field_show_name", "唯一标识");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "string");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 80);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
|
d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 1);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "唯一标识");
|
dt.addFieldSetEntity(d);
|
|
d = new FieldSetEntity();
|
d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "dept_code");
|
d.setValue("field_show_name", "部门编码");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "string");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 80);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
|
d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 0);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "部门编码 管理 prodcut_sys_org_level 表 org_level_code 字段");
|
dt.addFieldSetEntity(d);
|
d = new FieldSetEntity();
|
d.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_FIELD);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_NAME, "organization_type");
|
d.setValue("field_show_name", "机构类型");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_TYPE, "int");
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_LENGTH, 4);
|
d.setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 0);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_UNIT, 0);
|
d.setValue(com.product.tool.table.config.CmnConst.FIELD_DESCRIPTION, "机构类型");
|
dt.addFieldSetEntity(d);
|
return dt;
|
}
|
|
@Override
|
public String efficacySql(String runSql) throws BaseException {
|
AtomicReference<String> pkFieldName = new AtomicReference<>("id");
|
DataTableEntity defaultField = getDefaultField();
|
List<String> defaultFieldList = defaultField.getData().stream().map((f) -> f.getString(CmnConst.FIELD_NAME).toLowerCase()).collect(Collectors.toList());
|
Object[] objects = dataModelService.detectionSql(runSql, defaultField, f -> {
|
if (f[0] != null && !StringUtils.isEmpty(f[0].getString(CmnConst.FIELD_NAME))) {
|
|
f[0].setValue(com.product.tool.table.config.CmnConst.IS_REQUIRED, 0);
|
String fieldName = f[0].getString(CmnConst.FIELD_NAME);
|
if (!"id".equalsIgnoreCase(fieldName) && defaultFieldList.contains(fieldName.toLowerCase())) {
|
// 不能包含默认值字段
|
throw new BaseException(ErrorCode.RESULT_COLUMN_EXISTS_SAME_FILED.getValue(), ErrorCode.RESULT_COLUMN_EXISTS_SAME_FILED.getText().replace("{{fieldName}}", fieldName));
|
} else if (defaultFieldList.contains("id") && pkFieldName.get().equalsIgnoreCase(fieldName)) {
|
// 包含id 字段 将默认值id字段名称 修改
|
int i = "id".equals(pkFieldName.get()) ? 0 : Integer.valueOf(pkFieldName.get().split("_")[1]);
|
while ("id".equals(pkFieldName.get()) || pkFieldName.get().equalsIgnoreCase(fieldName)) {
|
pkFieldName.set(pkFieldName.get() + "_" + i);
|
i++;
|
}
|
}
|
}
|
});
|
DataTableEntity dt = (DataTableEntity) objects[0];
|
Map<String, Object> other = (Map<String, Object>) objects[1];
|
if (!"id".equals(pkFieldName.get())) {
|
//主键字段名称改变
|
dt.setFieldValue(0, CmnConst.FIELD_NAME, pkFieldName.get());
|
}
|
return BaseUtil.success(dt, other);
|
}
|
|
/**
|
* 获取数据处理目标表表名
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
private String getProcessTable(FieldSetEntity fse) throws BaseException {
|
String tableType = fse.getString(CmnConst.TABLE_TYPE);
|
String basic_table = fse.getString(CmnConst.BASIC_TABLE);
|
//如果是新建表 拼接前缀
|
if ("0".equals(tableType)) {
|
return fse.getString("table_classify") + basic_table;
|
}
|
return basic_table;
|
}
|
|
/**
|
* 获取原表信息
|
*
|
* @param fse
|
* @return
|
*/
|
private FieldSetEntity getTableInfo(FieldSetEntity fse, boolean insert) throws BaseException {
|
|
String basic_table = getProcessTable(fse);
|
String uuid = fse.getUUID();
|
String descContent = fse.getString("desc_content");
|
TreeMap<String, DataTableEntity> subData = (TreeMap<String, DataTableEntity>) ((TreeMap<String, DataTableEntity>) fse.getSubData()).clone();
|
if (insert) {
|
//新增
|
fse = new FieldSetEntity();
|
fse.setTableName(com.product.tool.table.config.CmnConst.PRODUCT_SYS_DATAMODEL_TABLE);
|
fse.setTableName(CmnConst.TABLE_MODULE_TABLE);
|
// 表名
|
fse.setValue(CmnConst.TABLE_NAME, basic_table);
|
// 表类型
|
fse.setValue(CmnConst.TABLE_TYPE, 1);
|
fse.setValue("sequence", 1);
|
fse.setValue(CmnConst.TABLE_DESCRIPTION, descContent);
|
if (!StringUtils.isEmpty(uuid)) {
|
FieldSetEntity temp = BaseUtil.getSingleInfoByCache("所有表信息", new String[]{basic_table});
|
if (temp == null || StringUtils.isEmpty(temp.getUUID()) || !basic_table.equals(temp.getString(CmnConst.TABLE_NAME))) {
|
//表不存在
|
throw new BaseException(ErrorCode.SAVE_DATA_PROCESS_TABLE_NOT_EXISTS);
|
}
|
fse.setValue(CmnConst.TABLE_UUID, temp.getUUID());
|
subData.forEach((k, v) -> {
|
for (int i = 0; i < v.getData().size(); i++) {
|
FieldSetEntity data = v.getData().get(i);
|
if (!StringUtils.isEmpty(data.getUUID())) {
|
v.removeFieldSetEntity(i);
|
i--;
|
continue;
|
}
|
data.setValue("uuid", null);
|
if (data.getSubData() != null) {
|
data.getSubData().clear();
|
}
|
|
}
|
});
|
}
|
} else {
|
fse = BaseUtil.getSingleInfoByCache("所有表信息", new String[]{basic_table});
|
if (fse == null || StringUtils.isEmpty(fse.getUUID()) || !basic_table.equals(fse.getString(CmnConst.TABLE_NAME))) {
|
//表不存在
|
throw new BaseException(ErrorCode.SAVE_DATA_PROCESS_TABLE_NOT_EXISTS);
|
}
|
// 克隆避免影响缓存中的数据
|
fse = fse.clones();
|
}
|
//将表字段、表索引添加到子表
|
fse.setSubData(subData);
|
return fse;
|
}
|
|
/**
|
* 执行数据同步之后调用
|
*
|
* @param tableName 目标表表名
|
* @param queryFilter 查询数据条件
|
*/
|
public synchronized void executeSyncAfter(String tableName, String... queryFilter) {
|
if (StringUtils.isEmpty(tableName)) {
|
return;
|
}
|
DataTableEntity dt = getBaseDao().listTable("product_sys_database_sync_processing_config", "concat(',',relevance_table,',') like concat('%,',?,',%') and concat(',',ifnull(execute_table,''),',') not like concat('%,',?,',%') and (parent_uuid is null or parent_uuid ='')",
|
new Object[]{tableName, tableName}, new Object[]{CmnConst.UUID, "execute_table"});
|
if (!DataTableEntity.isEmpty(dt)) {
|
for (int i = 0; i < dt.getRows(); i++) {
|
// 修改为执行表名 2022年2月21日09:53:29
|
String execute_table = dt.getString(i, "execute_table");
|
if (!StringUtils.isEmpty(execute_table)) {
|
execute_table += ",";
|
} else if (execute_table == null) {
|
execute_table = "";
|
}
|
execute_table += tableName;
|
dt.setFieldValue(i, "execute_table", execute_table.trim());
|
//todo 暂不支持数据处理增量查询因此将条件保存到表中无用
|
// if (queryFilter != null && queryFilter.length > 0) {
|
// DataTableEntity subDataTable = new DataTableEntity();
|
// for (String filter : queryFilter) {
|
// if (StringUtils.isEmpty(filter)) continue;
|
// FieldSetEntity fse = new FieldSetEntity();
|
// fse.setTableName("product_sys_database_sync_processing_config_sub");
|
// fse.setValue("table_name", tableName);
|
// fse.setValue("filter", filter);
|
// subDataTable.addFieldSetEntity(fse);
|
// }
|
// if (!DataTableEntity.isEmpty(subDataTable)) {
|
// dt.getFieldSetEntity(i).addSubDataTable(subDataTable);
|
// }
|
// }
|
// getBaseDao().saveFieldSetEntity(dt.getFieldSetEntity(i));
|
}
|
//todo 如需保存子表先确认该方法是否能保存子表 ,写注释时还没有支持批量保存子表 2022年2月15日19:53:54 cheng
|
getBaseDao().update(dt);
|
examiningProcessTask(tableName);
|
}
|
}
|
|
|
/**
|
* 条件替换
|
* 替换规则 str = "`field`>0 and `field1` > 6" alias="a" 替换为 "a.`field`>0 and a.`field1` > 6"
|
* 以mysql 字段分隔符 `fieldName` 替换
|
*
|
* @param str 条件字符串
|
* @param alias 表别名
|
* @return
|
* @throws BaseException
|
*/
|
public static String filterFieldTableAlias(String str, String alias) throws BaseException {
|
Pattern p = Pattern.compile("(`)([\\w]+)(`)");
|
Matcher m = p.matcher(str);
|
StringBuffer sb = new StringBuffer();
|
while (m.find()) {
|
String group = m.group(0);
|
m.appendReplacement(sb, " " + alias + "." + group);
|
}
|
m.appendTail(sb);
|
return sb.toString();
|
}
|
|
/**
|
* 检测处理任务是否运行
|
* todo 只有有一个在检测或运行 避免gc,因进入方法后不能控制 后续考虑优化 cheng
|
*
|
* @param tableName
|
*/
|
public void examiningProcessTask(String tableName) {
|
|
// 查询出 tableName 包含在relevance_table 字段中的配置数据
|
DataTableEntity dt = getBaseDao().listTable("product_sys_database_sync_processing_config", "concat(',',relevance_table,',') like concat('%,',?,',%') and (parent_uuid is null or parent_uuid ='')",
|
new Object[]{tableName});
|
if (DataTableEntity.isEmpty(dt)) {
|
return;
|
}
|
for (int i = 0; i < dt.getRows(); i++) {
|
String relevance_table = dt.getString(i, "relevance_table");
|
String execute_table = dt.getString(i, "execute_table");
|
if (StringUtils.isEmpty(relevance_table) || StringUtils.isEmpty(execute_table)) {
|
dt.removeFieldSetEntity(i);
|
i--;
|
continue;
|
} else {
|
String[] relevanceTable = relevance_table.split(",");
|
String[] executeTable = execute_table.split(",");
|
Arrays.sort(relevanceTable);
|
Arrays.sort(executeTable);
|
//两个数组不相同代表已执行表和关联表不一致 不满足执行条件
|
if (!Arrays.equals(relevanceTable, executeTable)) {
|
dt.removeFieldSetEntity(i);
|
i--;
|
continue;
|
}
|
}
|
}
|
//组织机构处理map picc独有
|
OrgDataMap orgDataMap = new OrgDataMap(getBaseDao());
|
//线程池线程数
|
int threadCount = 5;
|
if (dt.getRows() < 5) {
|
//要处理任务的数量小于5个就使用任务数量作为线程池数量
|
threadCount = dt.getRows();
|
}
|
//获取固定线程数量的线程池
|
ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity fse = dt.getFieldSetEntity(i);
|
//关联表名多个逗号分隔
|
String relevance_table = fse.getString("relevance_table");
|
//目标表(要插入数据的表)
|
//获取真实的表名 cheng 2022年3月1日14:47:46
|
String basic_table = getProcessTable(fse);
|
if (StringUtils.isEmpty(relevance_table.trim())) {
|
continue;
|
}
|
String bts[] = relevance_table.split(",");
|
//对应别名
|
Map<String, String> aliasMap = new HashMap<>();
|
if (!StringUtils.isEmpty(fse.getString("table_alias"))) {
|
String alias[] = fse.getString("table_alias").split(",");
|
for (int k = 0; k < bts.length; k++) {
|
if (alias.length > k) {
|
if (alias[k] == null) {
|
continue;
|
}
|
aliasMap.put(bts[k], alias[k]);
|
}
|
}
|
}
|
//todo 暂不支持增量处理数据
|
// DataTableEntity sdt = getBaseDao().listTable("product_sys_database_sync_processing_config_sub", "main_uuid =? ", new String[]{fse.getUUID()});
|
StringBuilder filter = new StringBuilder();
|
Map<String, String> tableFilter = new HashMap<>();
|
//todo 暂不支持增量处理数据
|
// if (!DataTableEntity.isEmpty(sdt)) {
|
// for (int j = 0; j < sdt.getRows(); j++) {
|
//
|
// String table_name = sdt.getString(j, "table_name");
|
// String table_filter = sdt.getString(j, "filter");
|
// if (StringUtils.isEmpty(table_name) || StringUtils.isEmpty(table_filter)) {
|
// continue;
|
// }
|
// if (filter.length() > 0) {
|
// filter.append(" OR ");
|
// }
|
// String alias = aliasMap.get(table_name);
|
// if (StringUtils.isEmpty(alias)) {
|
// alias = table_name;
|
// }
|
// String f = "";
|
// if (!StringUtils.isEmpty(tableFilter.get(table_name))) {
|
// f = tableFilter.get(table_name) + " or ";
|
// }
|
// f += " ( " + table_filter + " ) ";
|
// tableFilter.put(table_name, f);
|
// filter.append(" ( ").append(filterFieldTableAlias(table_filter, alias)).append(" ) ");
|
// }
|
// }
|
//默认条件 1=1
|
if (filter.length() == 0) {
|
filter.append(" 1=1 ");
|
}
|
String templateSql = fse.getString("template_sql");
|
if (StringUtils.isEmpty(templateSql)) {
|
continue;
|
}
|
//替换条件
|
templateSql = templateSql.replaceAll("\\{#sync_data_process_data_filter#}", filter.toString());
|
//主表表名
|
String mainTableName = null;
|
//获取分页表达式位置 替换为分页 并获取要分页的主表表名
|
int startIndex = templateSql.indexOf("[[");
|
if (startIndex != -1 && templateSql.indexOf("]]") > startIndex) {
|
mainTableName = templateSql.substring(startIndex + 2, templateSql.indexOf("]]") - "_limit".length());
|
//将[[mainTableName_limit]] 替换为 分页变量
|
templateSql = templateSql.replace("[[" + mainTableName + "_limit" + "]]", "limit {#start_size#},{#end_size#} ");
|
} else {
|
//没有找到表达式未知 在sql末尾加上分页变量
|
templateSql += " LIMIT {#start_size#},{#end_size#}";
|
}
|
int totalNumber = -1;
|
if (mainTableName != null) {
|
totalNumber = getTableTotalNumber(mainTableName, tableFilter.get(mainTableName));
|
if (totalNumber <= 0) {
|
continue;
|
}
|
}
|
// 同步锁 同一个目标表不能同时处理
|
synchronized (basic_table.intern()) {
|
int finalTotalNumber = totalNumber;
|
String finalTemplateSql = templateSql;
|
int finalThreadCount = threadCount;
|
//提交到线程池处理
|
executorService.submit(() -> {
|
SyncDataAfterProcessExecuteService syncDataAfterProcessExecuteService =
|
//默认调用此方法就是清空basic_table表数据 如后期需要优化再修改
|
new SyncDataAfterProcessExecuteService(basic_table, finalTotalNumber, finalTemplateSql, fse.getUUID(), true);
|
//todo 添加部门编码默认值字段 FirstCallBack 中写入value,组织机构类型默认值 picc独有
|
syncDataAfterProcessExecuteService.addDefaultValue("dept_code", null);
|
syncDataAfterProcessExecuteService.addDefaultValue("organization_type", 4);
|
syncDataAfterProcessExecuteService.executeProcess(getFirstCallBack(orgDataMap));
|
//将当前数据处理执行次数归零
|
getBaseDao().executeUpdate("UPDATE product_sys_database_sync_processing_config SET execute_table=null where uuid=?", new Object[]{fse.getUUID()});
|
getBaseDao().executeUpdate("DELETE FROM product_sys_database_sync_processing_config_sub where main_uuid=?", new Object[]{fse.getUUID()});
|
//继续处理parent_uuid = fse.getUUID() 的数据
|
// 100000/ finalThreadCount 考虑到多线程查询 所以做此限制 在线程中每次查询数量不能太多
|
this.lastProcessData(fse.getUUID(), 100000 / finalThreadCount);
|
});
|
}
|
|
}
|
//关闭线程池提交任务
|
executorService.shutdown();
|
long time = System.currentTimeMillis();
|
//等待线程池任务执行完毕
|
while (!executorService.isTerminated()) {
|
//每隔30秒打印一次
|
if (System.currentTimeMillis() - time > 30000) {
|
SpringMVCContextHolder.getSystemLogger().info("数据同步处理中....");
|
time = System.currentTimeMillis();
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().info("数据同步处理完成");
|
}
|
|
/**
|
* 数据处理下一任务(parent_uuid)
|
*
|
* @param parentUuid
|
* @param pageSize
|
*/
|
public void lastProcessData(String parentUuid, int pageSize) {
|
DataTableEntity dt = getBaseDao().listTable("product_sys_database_sync_processing_config", "parent_uuid=?", new Object[]{parentUuid});
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity ff = dt.getFieldSetEntity(i);
|
if (ff != null && ff.getUUID() != null) {
|
//替换条件
|
String templateSql = ff.getString("template_sql");
|
templateSql = templateSql.replaceAll("\\{#sync_data_process_data_filter#}", "1=1");
|
//主表表名
|
String mainTableName = null;
|
//获取分页表达式位置 替换为分页 并获取要分页的主表表名
|
int startIndex = templateSql.indexOf("[[");
|
if (startIndex != -1 && templateSql.indexOf("]]") > startIndex) {
|
mainTableName = templateSql.substring(startIndex + 2, templateSql.indexOf("]]") - "_limit".length());
|
//将[[mainTableName_limit]] 替换为 分页变量
|
templateSql = templateSql.replace("[[" + mainTableName + "_limit" + "]]", "limit {#start_size#},{#end_size#} ");
|
} else {
|
//没有找到表达式未知 在sql末尾加上分页变量
|
templateSql += " LIMIT {#start_size#},{#end_size#}";
|
}
|
int totalNumber = -1;
|
if (mainTableName != null) {
|
totalNumber = getTableTotalNumber(mainTableName, "1=1");
|
if (totalNumber <= 0) {
|
continue;
|
}
|
}
|
SyncDataAfterProcessExecuteService syncDataAfterProcessExecuteService =
|
//默认调用此方法就是清空basic_table表数据 如后期需要优化再修改
|
new SyncDataAfterProcessExecuteService(getProcessTable(ff), totalNumber,
|
templateSql, ff.getUUID(), ff.getBoolean("delete_data"));
|
syncDataAfterProcessExecuteService.setPageSize(pageSize);
|
syncDataAfterProcessExecuteService.executeProcess(null);
|
this.lastProcessData(ff.getUUID(), pageSize);
|
}
|
}
|
}
|
|
/**
|
* 第一次处理关联表数据 回调方法 (picc独有 需要处理dept_code org_level_code)
|
*
|
* @param finalOrgDataMap
|
* @return
|
*/
|
private CallBack<Map<String, Object>> getFirstCallBack(OrgDataMap finalOrgDataMap) {
|
CallBack<Map<String, Object>> callBack = (obj) -> {
|
Map<String, Object> map = obj[0];
|
String user_id = null;
|
if (map.get("user_id") != null) {
|
user_id = String.valueOf(map.get("user_id"));
|
}
|
if (map.get(CmnConst.ORG_LEVEL_CODE) != null && map.containsKey(CmnConst.USER_ID) && map.containsKey("dept_code")) {
|
String code = (String) map.get(CmnConst.ORG_LEVEL_CODE);
|
if (finalOrgDataMap.isCompany(code)) {
|
if (user_id != null) {
|
String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id);
|
if (!StringUtils.isEmpty(deptCode)) {
|
map.put("dept_code", deptCode);
|
}
|
}
|
} else if (finalOrgDataMap.isDept(code)) {
|
String companyCode = finalOrgDataMap.getCompanyCode(code);
|
if (StringUtils.isEmpty(companyCode)) {
|
companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id);
|
}
|
if (!StringUtils.isEmpty(companyCode)) {
|
map.put(CmnConst.ORG_LEVEL_CODE, companyCode);
|
map.put("dept_code", code);
|
}
|
}
|
}
|
if (!StringUtils.isEmpty(user_id) && (StringUtils.isEmpty(map.get("dept_code")) || StringUtils.isEmpty(map.get(CmnConst.ORG_LEVEL_CODE)))) {
|
String companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id);
|
String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id);
|
if (map.containsKey(CmnConst.ORG_LEVEL_CODE) && !StringUtils.isEmpty(companyCode)) {
|
map.put(CmnConst.ORG_LEVEL_CODE, companyCode);
|
}
|
if (map.containsKey("dept_code") && !StringUtils.isEmpty(deptCode)) {
|
map.put("dept_code", deptCode);
|
}
|
}
|
};
|
return callBack;
|
}
|
|
/**
|
* 获取表数据总数
|
*
|
* @param tableName 查询表名
|
* @param filter 查询条件
|
* @return
|
*/
|
private int getTableTotalNumber(String tableName, String filter) throws BaseException {
|
FieldSetEntity fse = getBaseDao().getFieldSetEntityByFilter(tableName, new String[]{" ifnull(COUNT(1),0) as totalNumber "}, filter, new Object[]{}, false, null);
|
|
if (fse != null && fse.getObject("totalNumber") != null) {
|
return fse.getInteger("totalNumber").intValue();
|
}
|
return -1;
|
}
|
|
}
|