package com.product.data.sync.service; import com.baomidou.mybatisplus.toolkit.StringUtils; import com.google.common.base.Joiner; import com.product.admin.service.OrganizationCacheService; import com.product.core.cache.DataPoolCacheImpl; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.core.transfer.Transactional; import com.product.data.sync.config.CmnConst; import com.product.data.sync.service.ide.IViewDataProcessService; import com.product.data.sync.util.BatchData; import com.product.util.SystemParamReplace; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 把视图中的数据同步到实体表中 * 外部数据同步定时任务执行完成后调用此类的方法,参见 * * @author Administrator */ @Service public class ViewDataProcessService implements IViewDataProcessService { @Autowired public BaseDao baseDao; /** * 1、从远程数据同步到本地库 * 2、查找数据处理表product_sys_database_sync_processing_config中关联表字段含有同步表的且父uuid字段等空的处理配置信息 * ,此步为把相关表进行关联查询,处理部门、公司、人员信息,此步为增量处理 * 3、再用父uuid字段等空这条记录去找下一步处理语,依次是生成人员信息、部门信息、公司信息。此步为全量处理 * product_sys_database_sync_processing_config中的所有配置处理方式都是从view sql中查询出来插入到实体表中 * 视图sql查询出来的字段与实体表中的字段名必须一致 * * @param aimTable */ @Override @Transactional public void copyViewDataToTable(String aimTable) { OrgDataMap orgDataMap = null; if (StringUtils.isEmpty(aimTable.trim())) { return; } ExecutorService executorService = null; DataTableEntity dt = baseDao.listTable("product_sys_database_sync_processing_config", "concat(',',relevance_table,',') like concat('%,',?,',%') and (parent_uuid is null or parent_uuid ='')", new Object[]{aimTable}); if (!dt.isEmpty()) { for (int i = 0; i < dt.getRows(); i++) { FieldSetEntity fs = dt.getFieldSetEntity(i); String basic_table = fs.getString("relevance_table"); if (!StringUtils.isEmpty(basic_table.trim())) { String bts[] = basic_table.split(","); //对应别名 Map aliash = new HashMap(); if (!StringUtils.isEmpty(fs.getString("table_alias"))) { String alias[] = fs.getString("table_alias").split(","); for (int k = 0; k < bts.length; k++) { if (alias.length > k) { if (alias[k] == null) { continue; } aliash.put(bts[k], alias[k]); } } } Integer num = fs.getInteger("execute_count"); if (bts.length == num.intValue() && !StringUtils.isEmpty(fs.getString("template_sql"))) {//相等说明三个表都更新完 DataTableEntity sdt = baseDao.listTable("product_sys_database_sync_processing_config_sub", "main_uuid =? ", new String[]{fs.getUUID()}); FieldSetEntity table_filter = new FieldSetEntity(); table_filter.setTableName("temp"); StringBuilder b = new StringBuilder(); for (int j = 0; j < sdt.getRows(); j++) { FieldSetEntity fss = sdt.getFieldSetEntity(j); String pkf = baseDao.getPKField(fss.getString("table_name"));//获取表的pk字段 if (pkf != null) { if (!StringUtils.isEmpty(aliash.get(fss.getString("table_name")))) { pkf = aliash.get(fss.getString("table_name")) + "." + pkf; } if (!StringUtils.isEmpty(fss.getString("update_ids"))) {//有修改了数据 if (b.length() > 0) { b.append(" or "); } b.append(pkf).append(" in (").append(fss.getString("update_ids")).append(")"); } if (!StringUtils.isEmpty(fss.getString("add_scope_id"))) {//有新增数据 if (b.length() > 0) { b.append(" or "); } String scope[] = fss.getString("add_scope_id").split("~"); b.append(pkf).append(" between ").append(scope[0]).append(" and ").append(scope[1]); } } else { //没有主键字段,是否抛错 SpringMVCContextHolder.getSystemLogger().error("【" + fss.getString("table_name") + "】表没有配置PK字段。"); } } //生成一个总的条件表达式,如果没有,则continue; if (b.length() > 0) { table_filter.setValue("sync_data_process_data_filter", b.toString()); } else { // continue; table_filter.setValue("sync_data_process_data_filter", "1=1"); } //开始替换View语句中的条件表达式 String sql = SystemParamReplace.formParamsReplace(fs.getString("template_sql"), table_filter); if (executorService == null) { executorService = Executors.newFixedThreadPool(2); } if (orgDataMap == null) { orgDataMap = new OrgDataMap(); } OrgDataMap finalOrgDataMap = orgDataMap; executorService.execute(() -> { BatchData batchData = new BatchData(sql, fs.getString("basic_table"), 50000, 2000); batchData.getDefaultValueMap().put("organization_type", 4); batchData.getDefaultValueMap().put("dept_code", null); fs.setValue("execute_count", 0); baseDao.update(fs); //开始处理数据生成人员,从同步表到报表基础表 batchData.batchImprovedCoverage(fs.getBoolean("delete_data"), (obj) -> { Map map = (Map) obj[0]; String user_id = null; if (map.get("user_id") != null) { user_id = String.valueOf(map.get("user_id")); } if (map.get(CmnConst.ORG_LEVEL_CODE) != null && map.containsKey(CmnConst.USER_ID) && map.containsKey("dept_code")) { String code = (String) map.get(CmnConst.ORG_LEVEL_CODE); if (finalOrgDataMap.isCompany(code)) { if (user_id != null) { String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id); if (!StringUtils.isEmpty(deptCode)) { map.put("dept_code", deptCode); } } } else if (finalOrgDataMap.isDept(code)) { String companyCode = finalOrgDataMap.getCompanyCode(code); if (StringUtils.isEmpty(companyCode)) { companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id); } if (!StringUtils.isEmpty(companyCode)) { map.put(CmnConst.ORG_LEVEL_CODE, companyCode); map.put("dept_code", code); } } } }); batchData.closeConnection(); //处理数据生成人员数据,从同步表到报表基础表,再生成部门、公司数据,报表基础表内容处理 lastProcessData(fs.getUUID()); baseDao.delete("product_sys_database_sync_processing_config_sub", "main_uuid=?", new Object[]{fs.getUUID()}); }); } } } if (executorService != null) { executorService.shutdown(); try { executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } } SpringMVCContextHolder.getSystemLogger().error("同步处理数据完成!!!"); } } public class OrgDataMap { private Map companyMap = new HashMap<>(); private Map deptMap = new HashMap<>(); private Map staffDeptMap = new HashMap<>(); private Map staffCompanyMap = new HashMap<>(); public OrgDataMap() { //初始化组织机构表数据 DataPoolCacheImpl dataPoolCache = DataPoolCacheImpl.getInstance(); // DataTableEntity companyDt = dataPoolCache.getCacheData("所有公司信息"); DataTableEntity companyDt = OrganizationCacheService.getOrgData("1"); readRecord(companyDt, this.companyMap); // DataTableEntity deptDt = dataPoolCache.getCacheData("公司-部门"); DataTableEntity deptDt = OrganizationCacheService.getOrgData("2"); readRecord(deptDt, this.deptMap); DataTableEntity dt = baseDao.listTable("SELECT user_id,l.org_level_code dept_code,l1.org_level_code company_code FROM `product_sys_staffs` s join product_sys_org_levels l on s.dept_uuid=l.uuid \n" + "join product_sys_org_levels l1 on s.org_level_uuid=l1.uuid ", new Object[]{}); if (!DataTableEntity.isEmpty(dt)) { DataTableEntity clones = dt.clones(); for (int i = 0; i < clones.getRows(); i++) { this.staffDeptMap.put(clones.getString(i, "user_id"), clones.getString(i, "dept_code")); this.staffCompanyMap.put(clones.getString(i, "user_id"), clones.getString(i, "company_code")); } } } /** * 根据userid找部门code * * @param userId * @return */ public String getDeptCodeByUser(String userId) { return this.staffDeptMap.get(userId); } /** * 根据userid找部门code * * @param userId * @return */ public String getCompanyCodeByUser(String userId) { return this.staffCompanyMap.get(userId); } /** * 获取公司编码根据部门编码 * * @param deptCode * @return */ public String getCompanyCode(String deptCode) { if (isExist(deptCode, this.deptMap)) { FieldSetEntity fs = this.deptMap.get(deptCode); String parentCode = fs.getString("org_level_code_parent"); String[] code = parentCode.split("-"); for (int i = 0; i < code.length; i++) { parentCode = ""; for (int j = 0; j < code.length - i; j++) { parentCode = (j > 0 ? parentCode + "-" : parentCode) + code[j]; } if (isExist(parentCode, this.companyMap)) { return this.companyMap.get(parentCode).getString(CmnConst.ORG_LEVEL_CODE); } } } return null; } /** * 判断是否为部门 * * @param code * @return */ public boolean isDept(String code) { return isExist(code, this.deptMap); } /** * 判断是否为公司 * * @param code * @return */ public boolean isCompany(String code) { return isExist(code, this.companyMap); } /** * 是否存在 * * @param code 键 * @param map 是否存在此map * @return */ private boolean isExist(String code, Map map) { return code != null ? map.get(code) != null : false; } } private void readRecord(DataTableEntity dt, Map map) { if (DataTableEntity.isEmpty(dt) || map == null) return; for (int i = 0; i < dt.getRows(); i++) { FieldSetEntity fs = dt.getFieldSetEntity(i); String code = fs.getString(CmnConst.ORG_LEVEL_CODE); map.put(code, fs.clones()); } } /** * 二次处理数据根据已处理的基础表 */ public void firstProcessData() { DataTableEntity dt = baseDao.listTable("product_sys_database_sync_processing_config", " (parent_uuid is null or parent_uuid ='')", new Object[]{}); OrgDataMap orgDataMap = new OrgDataMap(); FieldSetEntity table_filter = new FieldSetEntity(); table_filter.setTableName("temp"); table_filter.setValue("sync_data_process_data_filter", "1=1"); ExecutorService executorService = Executors.newFixedThreadPool(10); for (int i = 0; i < dt.getRows(); i++) { int finalI = i; executorService.execute(() -> { FieldSetEntity fs = dt.getFieldSetEntity(finalI); String sql = SystemParamReplace.formParamsReplace(fs.getString("template_sql"), table_filter); BatchData batchData = new BatchData(sql, fs.getString("basic_table"), 100000, 2000); batchData.getDefaultValueMap().put("organization_type", 4); batchData.getDefaultValueMap().put("dept_code", null); OrgDataMap finalOrgDataMap = orgDataMap; //开始处理数据生成人员,从同步表到报表基础表 batchData.batchImprovedCoverage(fs.getBoolean("delete_data"), (obj) -> { Map map = (Map) obj[0]; String user_id = null; if (map.get("user_id") != null) { user_id = String.valueOf(map.get("user_id")); } if (map.get(CmnConst.ORG_LEVEL_CODE) != null && map.containsKey(CmnConst.USER_ID) && map.containsKey("dept_code")) { String code = (String) map.get(CmnConst.ORG_LEVEL_CODE); if (finalOrgDataMap.isCompany(code)) { if (user_id != null) { String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id); if (StringUtils.isEmpty(deptCode)) { map.put("dept_code", deptCode); } } } else if (finalOrgDataMap.isDept(code)) { String companyCode = finalOrgDataMap.getCompanyCode(code); if (StringUtils.isEmpty(companyCode)) { companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id); } if (!StringUtils.isEmpty(companyCode)) { map.put(CmnConst.ORG_LEVEL_CODE, companyCode); map.put("dept_code", code); } } } }); batchData.closeConnection(); //处理数据生成人员数据,从同步表到报表基础表,再生成部门、公司数据,报表基础表内容处理 lastProcessData(fs.getUUID()); }); } executorService.shutdown(); while (!executorService.isTerminated()) { } SpringMVCContextHolder.getSystemLogger().info("结束任务firstProcessData!!!!!!!!!!!!"); } /** * 根据 product_sys_database_sync_processing_config.uuid 执行数据处理 */ public void firstProcessData(String parent_uuid) { FieldSetEntity fs = baseDao.getFieldSetByFilter("product_sys_database_sync_processing_config", "parent_uuid=?", new Object[]{parent_uuid}, false); if (fs != null) { firstProcessData(fs); } } /** * 根据 product_sys_database_sync_processing_config表信息 执行数据处理 */ public void firstProcessData(FieldSetEntity fs) { if (fs == null) { return; } OrgDataMap orgDataMap = new OrgDataMap(); BatchData batchData = new BatchData(fs.getString("template_sql"), fs.getString("basic_table"), 100000, 2000); batchData.getDefaultValueMap().put("organization_type", 4); if (orgDataMap == null) { orgDataMap = new OrgDataMap(); } OrgDataMap finalOrgDataMap = orgDataMap; batchData.getDefaultValueMap().put("dept_code", null); batchData.batchImprovedCoverage(true, (obj) -> { Map map = (Map) obj[0]; String user_id = null; if (map.get("user_id") != null) { user_id = String.valueOf(map.get("user_id")); } if (map.get(CmnConst.ORG_LEVEL_CODE) != null && map.containsKey(CmnConst.USER_ID) && map.containsKey("dept_code")) { String code = (String) map.get(CmnConst.ORG_LEVEL_CODE); if (finalOrgDataMap.isCompany(code)) { if (user_id != null) { String deptCode = finalOrgDataMap.getDeptCodeByUser(user_id); if (StringUtils.isEmpty(deptCode)) { map.put("dept_code", deptCode); } } } else if (finalOrgDataMap.isDept(code)) { String companyCode = finalOrgDataMap.getCompanyCode(code); if (StringUtils.isEmpty(companyCode)) { companyCode = finalOrgDataMap.getCompanyCodeByUser(user_id); } if (!StringUtils.isEmpty(companyCode)) { map.put(CmnConst.ORG_LEVEL_CODE, companyCode); map.put("dept_code", code); } } } }); batchData.closeConnection(); lastProcessData(fs.getUUID()); baseDao.update(fs); baseDao.delete("product_sys_database_sync_processing_config_sub", "main_uuid=?", new Object[]{fs.getUUID()}); } public void lastProcessData(String parentUuid) { DataTableEntity dt = baseDao.listTable("product_sys_database_sync_processing_config", "parent_uuid=?", new Object[]{parentUuid}); for (int i = 0; i < dt.getRows(); i++) { FieldSetEntity ff = dt.getFieldSetEntity(i); if (ff != null && ff.getUUID() != null) { this.executeDataProcess(ff.getString("template_sql"), ff.getString("basic_table"), ff.getBoolean("delete_data")); this.lastProcessData(ff.getUUID()); } } } private void executeDataProcess(String sql, String basic_table, boolean deleteData) { BatchData batchData = new BatchData(sql, basic_table, 50000, 2000); batchData.batchImprovedCoverage(deleteData, null); batchData.closeConnection(); } /** * 更新同步记录 * * @param tableName 同步目标表 * @param add_ids 同步修改或新增的数据ids * @param updateIds 更新数据的ids 逗号分隔 */ @Override @Transactional public void updateSyncRecord(String tableName, Integer[] add_ids, String updateIds) throws BaseException { DataTableEntity dt = baseDao.listTable("product_sys_database_sync_processing_config", "concat(',',relevance_table,',') like concat('%,',?,',%') and (parent_uuid is null or parent_uuid ='')", new Object[]{tableName}); if (!DataTableEntity.isEmpty(dt)) { for (int i = 0; i < dt.getRows(); i++) { if (!StringUtils.isEmpty(updateIds) || (add_ids != null && add_ids.length > 0)) { FieldSetEntity fs = new FieldSetEntity(); fs.setTableName("product_sys_database_sync_processing_config_sub"); fs.setValue(CmnConst.TABLE_NAME, tableName); if (add_ids != null && add_ids.length > 0) { List idScope = summaryRanges(add_ids); fs.setValue("add_scope_id", Joiner.on(",").join(idScope.toArray())); } fs.setValue("update_ids", updateIds); DataTableEntity dataTableEntity = new DataTableEntity(); dataTableEntity.addFieldSetEntity(fs); dt.getFieldSetEntity(i).addSubDataTable(dataTableEntity); } //执行次数+1 Integer execute_count = dt.getInt(i, "execute_count"); if (execute_count == null) { execute_count = 1; } else { execute_count++; } dt.setFieldValue(i, "execute_count", execute_count); baseDao.update(dt.getFieldSetEntity(i)); } } this.copyViewDataToTable(tableName); } /** * 获取数组中数字的范围 * * @param nums 数组 * @return */ public static List summaryRanges(Integer[] nums) { Arrays.sort(nums); List list = new ArrayList<>(); if (nums.length == 0) { return list; } int begin = Integer.MIN_VALUE; int end = Integer.MIN_VALUE; for (int i = 0; i < nums.length; i++) { if (i == 0) { begin = nums[i]; } else if (nums[i - 1] < nums[i] - 1) { list.add(begin + "~" + end); begin = nums[i]; } end = nums[i]; if (i == nums.length - 1) { list.add(begin + "~" + end); } } return list; } }