package com.product.data.sync.service;
|
|
import com.google.common.base.Joiner;
|
import com.product.admin.service.OrganizationCacheService;
|
import com.product.core.cache.DataPoolCacheImpl;
|
import com.product.core.dao.BaseDao;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.core.transfer.Transactional;
|
import com.product.data.sync.config.CmnConst;
|
import com.product.data.sync.service.ide.IViewDataProcessService;
|
import com.product.data.sync.util.BatchData;
|
import com.product.util.SystemParamReplace;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import java.util.*;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.TimeUnit;
|
|
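/**
 * Synchronizes data selected by view SQL into physical (entity) tables.
 * The methods of this class are invoked after the external data-synchronization
 * scheduled task has finished.
 *
 * @author Administrator
 */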
|
@Service
|
public class ViewDataProcessService implements IViewDataProcessService {
|
@Autowired
|
public BaseDao baseDao;
|
|
|
/**
|
* 1、从远程数据同步到本地库
|
* 2、查找数据处理表product_sys_database_sync_processing_config中关联表字段含有同步表的且父uuid字段等空的处理配置信息
|
* ,此步为把相关表进行关联查询,处理部门、公司、人员信息,此步为增量处理
|
* 3、再用父uuid字段等空这条记录去找下一步处理语,依次是生成人员信息、部门信息、公司信息。此步为全量处理
|
* product_sys_database_sync_processing_config中的所有配置处理方式都是从view sql中查询出来插入到实体表中
|
* 视图sql查询出来的字段与实体表中的字段名必须一致
|
*
|
* @param aimTable
|
*/
|
@Override
|
@Transactional
|
public void copyViewDataToTable(String aimTable) {
|
OrgDataMap orgDataMap = null;
|
if (StringUtils.isEmpty(aimTable.trim())) {
|
return;
|
}
|
ExecutorService executorService = null;
|
DataTableEntity dt = baseDao.listTable("product_sys_database_sync_processing_config", "concat(',',relevance_table,',') like concat('%,',?,',%') and (parent_uuid is null or parent_uuid ='')", new Object[]{aimTable});
|
if (!dt.isEmpty()) {
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity fs = dt.getFieldSetEntity(i);
|
String basic_table = fs.getString("relevance_table");
|
if (!StringUtils.isEmpty(basic_table.trim())) {
|
String bts[] = basic_table.split(",");
|
//对应别名
|
Map<String, String> aliash = new HashMap<String, String>();
|
if (!StringUtils.isEmpty(fs.getString("table_alias"))) {
|
String alias[] = fs.getString("table_alias").split(",");
|
for (int k = 0; k < bts.length; k++) {
|
if (alias.length > k) {
|
if (alias[k] == null) {
|
continue;
|
}
|
aliash.put(bts[k], alias[k]);
|
}
|
}
|
}
|
Integer num = fs.getInteger("execute_count");
|
if (bts.length == num.intValue() && !StringUtils.isEmpty(fs.getString("template_sql"))) {//相等说明三个表都更新完
|
DataTableEntity sdt = baseDao.listTable("product_sys_database_sync_processing_config_sub", "main_uuid =? ", new String[]{fs.getUUID()});
|
FieldSetEntity table_filter = new FieldSetEntity();
|
table_filter.setTableName("temp");
|
StringBuilder b = new StringBuilder();
|
for (int j = 0; j < sdt.getRows(); j++) {
|
FieldSetEntity fss = sdt.getFieldSetEntity(j);
|
String pkf = baseDao.getPKField(fss.getString("table_name"));//获取表的pk字段
|
if (pkf != null) {
|
if (!StringUtils.isEmpty(aliash.get(fss.getString("table_name")))) {
|
pkf = aliash.get(fss.getString("table_name")) + "." + pkf;
|
}
|
if (!StringUtils.isEmpty(fss.getString("update_ids"))) {//有修改了数据
|
if (b.length() > 0) {
|
b.append(" or ");
|
}
|
b.append(pkf).append(" in (").append(fss.getString("update_ids")).append(")");
|
}
|
if (!StringUtils.isEmpty(fss.getString("add_scope_id"))) {//有新增数据
|
if (b.length() > 0) {
|
b.append(" or ");
|
}
|
String scope[] = fss.getString("add_scope_id").split("~");
|
b.append(pkf).append(" between ").append(scope[0]).append(" and ").append(scope[1]);
|
}
|
} else {
|
//没有主键字段,是否抛错
|
SpringMVCContextHolder.getSystemLogger().error("【" + fss.getString("table_name") + "】表没有配置PK字段。");
|
}
|
}
|
//生成一个总的条件表达式,如果没有,则continue;
|
if (b.length() > 0) {
|
table_filter.setValue("sync_data_process_data_filter", b.toString());
|
} else {
|
// continue;
|
table_filter.setValue("sync_data_process_data_filter", "1=1");
|
}
|
//开始替换View语句中的条件表达式
|
String sql = SystemParamReplace.formParamsReplace(fs.getString("template_sql"), table_filter);
|
if (executorService == null) {
|
executorService = Executors.newFixedThreadPool(2);
|
}
|
if (orgDataMap == null) {
|
orgDataMap = new OrgDataMap();
|
}
|
OrgDataMap finalOrgDataMap = orgDataMap;
|
executorService.execute(() -> {
|
BatchData batchData = new BatchData(sql, fs.getString("basic_table"), 50000, 2000);
|
batchData.getDefaultValueMap().put("organization_type", 4);
|
batchData.getDefaultValueMap().put("dept_code", null);
|
|
fs.setValue("execute_count", 0);
|
baseDao.update(fs);
|
//开始处理数据生成人员,从同步表到报表基础表
|
batchData.batchImprovedCoverage(fs.getBoolean("delete_data"), (obj) -> {
|
Map<String, Object> map = (Map<String, Object>) obj[0];
|
String user_id = null;
|
if (map.get("user_id") != null) {
|
user_id = String.valueOf(map.get("user_id"));
|
}
|
if (map.get(CmnConst.ORG_LEVEL_CODE) != null && map.containsKey(CmnConst.USER_ID) && map.containsKey("dept_code")) {
|
String code = (String) map.get(CmnConst.ORG_LEVEL_CODE);
|
if (finalOrgDataMap.isCompany(code)) {
|
if (user_id != null) {
|
String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id);
|
if (!StringUtils.isEmpty(deptCode)) {
|
map.put("dept_code", deptCode);
|
}
|
}
|
} else if (finalOrgDataMap.isDept(code)) {
|
String companyCode = finalOrgDataMap.getCompanyCode(code);
|
if (StringUtils.isEmpty(companyCode)) {
|
companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id);
|
}
|
if (!StringUtils.isEmpty(companyCode)) {
|
map.put(CmnConst.ORG_LEVEL_CODE, companyCode);
|
map.put("dept_code", code);
|
}
|
}
|
}
|
});
|
batchData.closeConnection();
|
//处理数据生成人员数据,从同步表到报表基础表,再生成部门、公司数据,报表基础表内容处理
|
lastProcessData(fs.getUUID());
|
baseDao.delete("product_sys_database_sync_processing_config_sub", "main_uuid=?", new Object[]{fs.getUUID()});
|
});
|
|
}
|
}
|
}
|
if (executorService != null) {
|
executorService.shutdown();
|
try {
|
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
SpringMVCContextHolder.getSystemLogger().error("同步处理数据完成!!!");
|
}
|
}
|
|
|
public class OrgDataMap {
|
|
private Map<String, FieldSetEntity> companyMap = new HashMap<>();
|
|
private Map<String, FieldSetEntity> deptMap = new HashMap<>();
|
|
private Map<String, String> staffDeptMap = new HashMap<>();
|
|
private Map<String, String> staffCompanyMap = new HashMap<>();
|
|
public OrgDataMap() {
|
//初始化组织机构表数据
|
DataPoolCacheImpl dataPoolCache = DataPoolCacheImpl.getInstance();
|
// DataTableEntity companyDt = dataPoolCache.getCacheData("所有公司信息");
|
DataTableEntity companyDt = OrganizationCacheService.getOrgDataStatic("1");
|
readRecord(companyDt, this.companyMap);
|
// DataTableEntity deptDt = dataPoolCache.getCacheData("公司-部门");
|
DataTableEntity deptDt = OrganizationCacheService.getOrgDataStatic("2");
|
readRecord(deptDt, this.deptMap);
|
DataTableEntity dt = baseDao.listTable("SELECT user_id,l.org_level_code dept_code,l1.org_level_code company_code FROM `product_sys_staffs` s join product_sys_org_levels l on s.dept_uuid=l.uuid \n" +
|
"join product_sys_org_levels l1 on s.org_level_uuid=l1.uuid ", new Object[]{});
|
if (!DataTableEntity.isEmpty(dt)) {
|
DataTableEntity clones = dt.clones();
|
for (int i = 0; i < clones.getRows(); i++) {
|
this.staffDeptMap.put(clones.getString(i, "user_id"), clones.getString(i, "dept_code"));
|
this.staffCompanyMap.put(clones.getString(i, "user_id"), clones.getString(i, "company_code"));
|
}
|
}
|
}
|
|
/**
|
* 根据userid找部门code
|
*
|
* @param userId
|
* @return
|
*/
|
public String getDeptCodeByUser(String userId) {
|
return this.staffDeptMap.get(userId);
|
}
|
|
/**
|
* 根据userid找部门code
|
*
|
* @param userId
|
* @return
|
*/
|
public String getCompanyCodeByUser(String userId) {
|
return this.staffCompanyMap.get(userId);
|
}
|
|
/**
|
* 获取公司编码根据部门编码
|
*
|
* @param deptCode
|
* @return
|
*/
|
public String getCompanyCode(String deptCode) {
|
if (isExist(deptCode, this.deptMap)) {
|
FieldSetEntity fs = this.deptMap.get(deptCode);
|
String parentCode = fs.getString("org_level_code_parent");
|
String[] code = parentCode.split("-");
|
for (int i = 0; i < code.length; i++) {
|
parentCode = "";
|
for (int j = 0; j < code.length - i; j++) {
|
parentCode = (j > 0 ? parentCode + "-" : parentCode) + code[j];
|
}
|
if (isExist(parentCode, this.companyMap)) {
|
return this.companyMap.get(parentCode).getString(CmnConst.ORG_LEVEL_CODE);
|
}
|
}
|
}
|
return null;
|
}
|
|
|
/**
|
* 判断是否为部门
|
*
|
* @param code
|
* @return
|
*/
|
public boolean isDept(String code) {
|
return isExist(code, this.deptMap);
|
}
|
|
/**
|
* 判断是否为公司
|
*
|
* @param code
|
* @return
|
*/
|
public boolean isCompany(String code) {
|
return isExist(code, this.companyMap);
|
}
|
|
/**
|
* 是否存在
|
*
|
* @param code 键
|
* @param map 是否存在此map
|
* @return
|
*/
|
private boolean isExist(String code, Map<String, FieldSetEntity> map) {
|
return code != null ? map.get(code) != null : false;
|
}
|
|
|
}
|
|
|
private void readRecord(DataTableEntity dt, Map<String, FieldSetEntity> map) {
|
if (DataTableEntity.isEmpty(dt) || map == null) return;
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity fs = dt.getFieldSetEntity(i);
|
String code = fs.getString(CmnConst.ORG_LEVEL_CODE);
|
map.put(code, fs.clones());
|
}
|
}
|
|
/**
|
* 二次处理数据根据已处理的基础表
|
*/
|
|
public void firstProcessData() {
|
DataTableEntity dt = baseDao.listTable("product_sys_database_sync_processing_config", " (parent_uuid is null or parent_uuid ='')", new Object[]{});
|
OrgDataMap orgDataMap = new OrgDataMap();
|
FieldSetEntity table_filter = new FieldSetEntity();
|
table_filter.setTableName("temp");
|
table_filter.setValue("sync_data_process_data_filter", "1=1");
|
ExecutorService executorService = Executors.newFixedThreadPool(10);
|
for (int i = 0; i < dt.getRows(); i++) {
|
int finalI = i;
|
executorService.execute(() -> {
|
FieldSetEntity fs = dt.getFieldSetEntity(finalI);
|
String sql = SystemParamReplace.formParamsReplace(fs.getString("template_sql"), table_filter);
|
BatchData batchData = new BatchData(sql, fs.getString("basic_table"), 100000, 2000);
|
batchData.getDefaultValueMap().put("organization_type", 4);
|
batchData.getDefaultValueMap().put("dept_code", null);
|
OrgDataMap finalOrgDataMap = orgDataMap;
|
//开始处理数据生成人员,从同步表到报表基础表
|
batchData.batchImprovedCoverage(fs.getBoolean("delete_data"), (obj) -> {
|
Map<String, Object> map = (Map<String, Object>) obj[0];
|
String user_id = null;
|
if (map.get("user_id") != null) {
|
user_id = String.valueOf(map.get("user_id"));
|
}
|
if (map.get(CmnConst.ORG_LEVEL_CODE) != null && map.containsKey(CmnConst.USER_ID) && map.containsKey("dept_code")) {
|
String code = (String) map.get(CmnConst.ORG_LEVEL_CODE);
|
if (finalOrgDataMap.isCompany(code)) {
|
if (user_id != null) {
|
String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id);
|
if (StringUtils.isEmpty(deptCode)) {
|
map.put("dept_code", deptCode);
|
}
|
}
|
} else if (finalOrgDataMap.isDept(code)) {
|
String companyCode = finalOrgDataMap.getCompanyCode(code);
|
if (StringUtils.isEmpty(companyCode)) {
|
companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id);
|
}
|
if (!StringUtils.isEmpty(companyCode)) {
|
map.put(CmnConst.ORG_LEVEL_CODE, companyCode);
|
map.put("dept_code", code);
|
}
|
}
|
}
|
});
|
batchData.closeConnection();
|
//处理数据生成人员数据,从同步表到报表基础表,再生成部门、公司数据,报表基础表内容处理
|
lastProcessData(fs.getUUID());
|
});
|
|
}
|
executorService.shutdown();
|
while (!executorService.isTerminated()) {
|
|
}
|
SpringMVCContextHolder.getSystemLogger().info("结束任务firstProcessData!!!!!!!!!!!!");
|
}
|
|
/**
|
* 根据 product_sys_database_sync_processing_config.uuid 执行数据处理
|
*/
|
|
public void firstProcessData(String parent_uuid) {
|
FieldSetEntity fs = baseDao.getFieldSetByFilter("product_sys_database_sync_processing_config", "parent_uuid=?", new Object[]{parent_uuid}, false);
|
if (fs != null) {
|
firstProcessData(fs);
|
}
|
}
|
|
/**
|
* 根据 product_sys_database_sync_processing_config表信息 执行数据处理
|
*/
|
|
public void firstProcessData(FieldSetEntity fs) {
|
if (fs == null) {
|
return;
|
}
|
OrgDataMap orgDataMap = new OrgDataMap();
|
BatchData batchData = new BatchData(fs.getString("template_sql"), fs.getString("basic_table"), 100000, 2000);
|
batchData.getDefaultValueMap().put("organization_type", 4);
|
if (orgDataMap == null) {
|
orgDataMap = new OrgDataMap();
|
}
|
OrgDataMap finalOrgDataMap = orgDataMap;
|
batchData.getDefaultValueMap().put("dept_code", null);
|
batchData.batchImprovedCoverage(true, (obj) -> {
|
Map<String, Object> map = (Map<String, Object>) obj[0];
|
String user_id = null;
|
if (map.get("user_id") != null) {
|
user_id = String.valueOf(map.get("user_id"));
|
}
|
|
if (map.get(CmnConst.ORG_LEVEL_CODE) != null && map.containsKey(CmnConst.USER_ID) && map.containsKey("dept_code")) {
|
String code = (String) map.get(CmnConst.ORG_LEVEL_CODE);
|
if (finalOrgDataMap.isCompany(code)) {
|
if (user_id != null) {
|
String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id);
|
if (StringUtils.isEmpty(deptCode)) {
|
map.put("dept_code", deptCode);
|
}
|
}
|
} else if (finalOrgDataMap.isDept(code)) {
|
|
String companyCode = finalOrgDataMap.getCompanyCode(code);
|
if (StringUtils.isEmpty(companyCode)) {
|
companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id);
|
}
|
if (!StringUtils.isEmpty(companyCode)) {
|
map.put(CmnConst.ORG_LEVEL_CODE, companyCode);
|
map.put("dept_code", code);
|
}
|
}
|
}
|
});
|
batchData.closeConnection();
|
lastProcessData(fs.getUUID());
|
baseDao.update(fs);
|
baseDao.delete("product_sys_database_sync_processing_config_sub", "main_uuid=?", new Object[]{fs.getUUID()});
|
}
|
|
public void lastProcessData(String parentUuid) {
|
DataTableEntity dt = baseDao.listTable("product_sys_database_sync_processing_config", "parent_uuid=?", new Object[]{parentUuid});
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity ff = dt.getFieldSetEntity(i);
|
if (ff != null && ff.getUUID() != null) {
|
this.executeDataProcess(ff.getString("template_sql"), ff.getString("basic_table"), ff.getBoolean("delete_data"));
|
this.lastProcessData(ff.getUUID());
|
}
|
}
|
}
|
|
|
private void executeDataProcess(String sql, String basic_table, boolean deleteData) {
|
BatchData batchData = new BatchData(sql, basic_table, 50000, 2000);
|
batchData.batchImprovedCoverage(deleteData, null);
|
batchData.closeConnection();
|
}
|
|
/**
|
* 更新同步记录
|
*
|
* @param tableName 同步目标表
|
* @param add_ids 同步修改或新增的数据ids
|
* @param updateIds 更新数据的ids 逗号分隔
|
*/
|
@Override
|
@Transactional
|
public void updateSyncRecord(String tableName, Integer[] add_ids, String updateIds) throws BaseException {
|
DataTableEntity dt = baseDao.listTable("product_sys_database_sync_processing_config", "concat(',',relevance_table,',') like concat('%,',?,',%') and (parent_uuid is null or parent_uuid ='')", new Object[]{tableName});
|
if (!DataTableEntity.isEmpty(dt)) {
|
for (int i = 0; i < dt.getRows(); i++) {
|
if (!StringUtils.isEmpty(updateIds) || (add_ids != null && add_ids.length > 0)) {
|
FieldSetEntity fs = new FieldSetEntity();
|
fs.setTableName("product_sys_database_sync_processing_config_sub");
|
fs.setValue(CmnConst.TABLE_NAME, tableName);
|
if (add_ids != null && add_ids.length > 0) {
|
List<String> idScope = summaryRanges(add_ids);
|
fs.setValue("add_scope_id", Joiner.on(",").join(idScope.toArray()));
|
}
|
fs.setValue("update_ids", updateIds);
|
|
DataTableEntity dataTableEntity = new DataTableEntity();
|
dataTableEntity.addFieldSetEntity(fs);
|
dt.getFieldSetEntity(i).addSubDataTable(dataTableEntity);
|
}
|
//执行次数+1
|
Integer execute_count = dt.getInt(i, "execute_count");
|
if (execute_count == null) {
|
execute_count = 1;
|
} else {
|
execute_count++;
|
}
|
dt.setFieldValue(i, "execute_count", execute_count);
|
baseDao.update(dt.getFieldSetEntity(i));
|
}
|
}
|
this.copyViewDataToTable(tableName);
|
}
|
|
/**
|
* 获取数组中数字的范围
|
*
|
* @param nums 数组
|
* @return
|
*/
|
public static List<String> summaryRanges(Integer[] nums) {
|
Arrays.sort(nums);
|
List<String> list = new ArrayList<>();
|
if (nums.length == 0) {
|
return list;
|
}
|
int begin = Integer.MIN_VALUE;
|
int end = Integer.MIN_VALUE;
|
for (int i = 0; i < nums.length; i++) {
|
if (i == 0) {
|
begin = nums[i];
|
} else if (nums[i - 1] < nums[i] - 1) {
|
list.add(begin + "~" + end);
|
begin = nums[i];
|
}
|
end = nums[i];
|
if (i == nums.length - 1) {
|
list.add(begin + "~" + end);
|
}
|
}
|
return list;
|
}
|
}
|