package com.product.data.center.service; import cn.hutool.core.date.DateUtil; import com.alibaba.fastjson.JSONArray; import com.product.common.utils.StringUtils; import com.product.core.dao.BaseDao; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.data.center.config.CmnConst; import com.product.data.center.config.ErrorCode; import com.product.datasource.dao.Dao; import com.product.datasource.entity.DataBaseEntity; import com.product.util.BaseUtil; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.text.DateFormat; import java.text.DecimalFormat; import java.util.*; /** * @Author cheng * @Date 2022/11/21 16:17 * @Desc 采集提取数据验证 */ @Service public class DataValidationService { @Resource private BaseDao baseDao; @Value("${data.system.name}") private String DATA_SYSTEM_NAME; /** * 默认查询近一周验证统计结果 * * @return */ public JSONArray getValidationData() { StringBuilder sql = new StringBuilder(); sql.append("\n WITH all_data AS ( SELECT * FROM `product_sys_mes_data_validation` WHERE created_utc_datetime>=DATE_SUB( DATE_FORMAT(NOW(),'%Y-%m-%d'), INTERVAL 7 DAY ) ) "); sql.append("\n SELECT "); sql.append("\n create_data_time t1, "); sql.append("\n DATE_FORMAT( create_data_time, '%Y-%m-%d' ) create_data_time, "); sql.append("\n extract_table_name validation_table_name, "); sql.append("\n concat( "); sql.append("\n min( sub_min_value ), "); sql.append("\n '-', "); sql.append("\n max( sub_max_value )) sub_min_max, "); sql.append("\n concat( "); sql.append("\n min( master_min_value ), "); sql.append("\n '-', "); sql.append("\n max( master_max_value )) master_min_max, "); sql.append("\n sum( sub_hours_total_count ) sub_hours_total_count, "); sql.append("\n sum( master_hours_total_count ) master_hours_total_count, "); sql.append("\n 1 type, "); sql.append("\n NULL parent_code, "); sql.append("\n concat( DATE_FORMAT( create_data_time, '%Y-%m-%d' ), '-', extract_table_name ) `code`, "); sql.append("\n CASE "); sql.append("\n "); sql.append("\n WHEN sum( sub_hours_total_count )= sum( master_hours_total_count ) "); sql.append("\n AND min( sub_min_value )= min( master_min_value ) "); sql.append("\n AND max( sub_max_value ) = max( master_max_value ) THEN "); sql.append("\n 'success' ELSE 'error' "); sql.append("\n END AS result "); sql.append("\n FROM "); sql.append("\n all_data "); sql.append("\n GROUP BY "); sql.append("\n create_data_time, "); sql.append("\n extract_table_name UNION ALL "); sql.append("\n SELECT "); sql.append("\n create_data_time t1, "); sql.append("\n concat( DATE_FORMAT( create_data_time, '%Y-%m-%d' ), ' ', ifnull(( SELECT sync_name FROM product_sys_data_sync_manager WHERE uuid = data_source ), '未知数据源' ) ) create_data_time, "); sql.append("\n extract_table_name validation_table_name, "); sql.append("\n concat( "); sql.append("\n min( sub_min_value ), "); sql.append("\n '-', "); sql.append("\n max( sub_max_value )) sub_min_max, "); sql.append("\n concat( "); sql.append("\n min( master_min_value ), "); sql.append("\n '-', "); sql.append("\n max( master_max_value )) master_min_max, "); sql.append("\n sum( sub_hours_total_count ) sub_hours_total_count, "); sql.append("\n sum( master_hours_total_count ) master_hours_total_count, "); sql.append("\n 2 type, "); sql.append("\n concat( DATE_FORMAT( create_data_time, '%Y-%m-%d' ), '-', extract_table_name ) parent_code, "); sql.append("\n concat( DATE_FORMAT( create_data_time, '%Y-%m-%d' ), '-', extract_table_name, '-', data_source ) `code`, "); sql.append("\n CASE "); sql.append("\n "); sql.append("\n WHEN sum( sub_hours_total_count )= sum( master_hours_total_count ) "); sql.append("\n AND min( sub_min_value )= min( master_min_value ) "); sql.append("\n AND max( sub_max_value ) = max( master_max_value ) THEN "); sql.append("\n 'success' ELSE 'error' "); sql.append("\n END AS result "); sql.append("\n FROM "); sql.append("\n all_data "); sql.append("\n GROUP BY "); sql.append("\n create_data_time, "); sql.append("\n extract_table_name UNION ALL "); sql.append("\n SELECT "); sql.append("\n * "); sql.append("\n FROM "); sql.append("\n ( "); sql.append("\n SELECT "); sql.append("\n create_data_time t1, "); sql.append("\n hours create_data_time, "); sql.append("\n extract_table_name validation_table_name, "); sql.append("\n concat( min(sub_min_value), '-', max(sub_max_value) ) sub_min_max, "); sql.append("\n concat( min(master_min_value), '-', max(master_max_value) ) master_min_max, "); sql.append("\n sum(sub_hours_total_count) sub_hours_total_count, "); sql.append("\n sum(master_hours_total_count) master_hours_total_count, "); sql.append("\n 3 type, "); sql.append("\n concat( DATE_FORMAT( create_data_time, '%Y-%m-%d' ), '-', extract_table_name, '-', data_source ) parent_code, "); sql.append("\n concat( DATE_FORMAT( create_data_time, '%Y-%m-%d' ), '-', extract_table_name, '-', data_source, '-', hours ) `code`, "); sql.append("\n CASE "); sql.append("\n "); sql.append("\n WHEN sum(sub_hours_total_count) = sum(master_hours_total_count) "); sql.append("\n AND min(sub_min_value) = min(master_min_value) "); sql.append("\n AND max(sub_max_value) = max(master_max_value) THEN "); sql.append("\n 'success' ELSE 'error' "); sql.append("\n END AS result "); sql.append("\n FROM "); sql.append("\n all_data "); sql.append("\n GROUP BY create_data_time,extract_table_name,data_source,hours "); sql.append("\n order by hours "); sql.append("\n ) a "); DataTableEntity dt = baseDao.listTable(sql.toString(), new Object[]{}); if (DataTableEntity.isEmpty(dt)) { return null; } return BaseUtil.dataTableToTreeData(dt, "code", "parent_code", null, false); } /** * 验证数据入口 */ public void verificationEntryPoint() { StringBuilder sql = new StringBuilder(); sql.append("\n SELECT "); sql.append("\n collect.id collect_id, "); sql.append("\n collect.auto_field collect_auto_field, "); sql.append("\n collect.data_source collect_data_source , "); sql.append("\n collect.time_field collect_time_field, "); sql.append("\n UPPER(collect.source_table) source_table, "); sql.append("\n extract.extract_target_source, "); sql.append("\n extract.extract_target_table, "); sql.append("\n extract.collect_source_field, "); sql.append("\n extract.extract_unique_field "); sql.append("\n FROM "); sql.append("\n product_sys_data_collect collect "); sql.append("\n JOIN product_sys_data_extract_config extract "); sql.append("\n ON "); sql.append("\n UPPER(extract.extract_source_table)=UPPER(collect.source_table) "); sql.append("\n AND collect.is_used =1 AND extract.is_used=1 AND extract.extract_prefix_key='DC' "); sql.append("\n ORDER BY source_table "); //查询启用的采集和提取 DataTableEntity dt = baseDao.listTable(sql.toString(), new Object[]{}); if (DataTableEntity.isEmpty(dt)) { return; } Map groupTable = new HashMap<>(); for (int i = 0; i < dt.getRows(); i++) { String extractTargetSource = dt.getString(i, "source_table") + "!:!" + dt.getString(i, "collect_data_source"); ValidationEntity verificationEntity = groupTable.get(extractTargetSource); if (verificationEntity == null) { verificationEntity = new ValidationEntity(); groupTable.put(extractTargetSource, verificationEntity); verificationEntity.setTargetDataSource(dt.getString(i, "extract_target_source")); verificationEntity.setCollectTableName(dt.getString(i, "source_table")); verificationEntity.setTargetDataTable(dt.getString(i, "extract_target_table")); verificationEntity.setAutoField(dt.getString(i, "collect_auto_field")); verificationEntity.setTimeField(dt.getString(i, "collect_time_field")); verificationEntity.setCollectSourceField(dt.getString(i, "collect_source_field")); verificationEntity.setPreMasterField(dt.getString(i, "extract_unique_field")); } verificationEntity.addCollectDataSource(dt.getString(i, "collect_data_source")); verificationEntity.addCollectId(dt.getString(i, "collect_id")); } for (ValidationEntity validation : groupTable.values()) { //循环提取 try { verifyValidation(validation); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } } } private void verifyValidation(ValidationEntity validationEntity) { List collectDataSourceList = validationEntity.getCollectDataSource(); if (collectDataSourceList == null || collectDataSourceList.isEmpty()) { return; } String sql = replacePlaceholder(getTemplateSql(true), new String[]{validationEntity.getTimeField(), validationEntity.getPreMasterField(), validationEntity.getCollectSourceField(), validationEntity.getCollectTableName()}, null); String targetDataSource = validationEntity.getTargetDataSource(); DataBaseEntity dbe = new DataBaseEntity(targetDataSource); Dao targetDao = dbe.getDao(); DataTableEntity list = targetDao.getList(sql); targetDao.closeConnection(); Map collect = new HashMap<>(); List collectIds = validationEntity.getCollectIds(); for (int i = 0; i < list.getRows(); i++) { FieldSetEntity item = list.getFieldSetEntity(i); item.setTableName("product_sys_mes_data_validation"); String collectId = item.getString(validationEntity.getCollectSourceField()); // if(StringUtils.isEmpty(collectId)) Map> tableOfField = item.getMeta().getTableOfField(); if (tableOfField.containsKey("")) { tableOfField.put(item.getTableName(), tableOfField.remove("")); } item.setValue(validationEntity.getCollectSourceField(), collectId); collect.put(collectId + ":" + item.getString("hours"), item); } DecimalFormat decimalFormat = new DecimalFormat("00"); for (String collectId : collectIds) { for (int i = 0; i < 24; i++) { String format = decimalFormat.format(i); String key = collectId + ":" + format; if (!collect.containsKey(key)) { FieldSetEntity fs = new FieldSetEntity(); this.setDefaultValue(validationEntity.getCollectTableName(), collectId, fs, new Date()); fs.setValue(validationEntity.getCollectSourceField(), collectId); fs.setValue("collect_id", collectId); fs.setValue("hours", format); collect.put(key, fs); } key = DATA_SYSTEM_NAME + ":" + format; if (!collect.containsKey(key)) { FieldSetEntity fs = new FieldSetEntity(); this.setDefaultValue(validationEntity.getCollectTableName(), DATA_SYSTEM_NAME, fs, new Date()); fs.setValue(validationEntity.getCollectSourceField(), DATA_SYSTEM_NAME); fs.setValue("collect_id", DATA_SYSTEM_NAME); fs.setValue("hours", format); collect.put(key, fs); } } } // list.getData().stream().collect(Collectors.groupingBy(item -> { // String collectId = item.getString(validationEntity.getCollectSourceField()); // if (StringUtils.isEmpty(collectId)) { // // } // return collectId + ":" + item.getString("hours"); //// return // })); DataTableEntity result = new DataTableEntity(); Object[] fields = new Object[]{"hours_total_count", "min_value", "max_value"}; for (FieldSetEntity fs : collect.values()) { this.replaceFieldName("master", fields, fs, null, true); } for (int i = 0; i < collectIds.size(); i++) { String collectId = collectIds.get(i); String collectDataSource = collectDataSourceList.get(i); dbe = new DataBaseEntity(collectDataSource); Dao dao = dbe.getDao(); try { sql = replacePlaceholder(getTemplateSql(false), new String[]{validationEntity.getTimeField(), validationEntity.getAutoField(), validationEntity.getCollectSourceField(), validationEntity.getCollectTableName()}, null); DataTableEntity dt = dao.getList(sql); if (!DataTableEntity.isEmpty(dt)) { for (int j = 0; j < dt.getRows(); j++) { FieldSetEntity fse = dt.getFieldSetEntity(j); fse.setTableName("product_sys_mes_data_validation"); Map> tableOfField = fse.getMeta().getTableOfField(); if (tableOfField.containsKey("")) { tableOfField.put(fse.getTableName(), tableOfField.remove("")); } String hours = dt.getString(j, "hours"); String sourceInfo = dt.getString(j, validationEntity.getCollectSourceField()); if (StringUtils.isEmpty(sourceInfo)) { sourceInfo = collectId; } FieldSetEntity fs = collect.get(sourceInfo + ":" + hours); fs.setValue("collect_id", sourceInfo); this.setDefaultValue(validationEntity.getCollectTableName(), collectId, fs, new Date()); fse.setTableName(fs.getTableName()); this.replaceFieldName("sub", fields, fse, null, true); fse.append(fs); fse.setValue("data_source", dbe.getUuid()); result.addFieldSetEntity(fse); collect.remove(sourceInfo + ":" + hours); } } } catch (Exception e) { throw e; } finally { dao.closeConnection(); } } if (!DataTableEntity.isEmpty(result)) { baseDao.add(result); } } private static String getStartTime() { Calendar todayStart = Calendar.getInstance(); todayStart.add(Calendar.DATE, -1); todayStart.set(Calendar.SECOND, 0); todayStart.set(Calendar.MINUTE, 0); todayStart.set(Calendar.HOUR_OF_DAY, 0); todayStart.set(Calendar.MILLISECOND, 0); return DateUtil.format(todayStart.getTime(), DateFormat.getDateTimeInstance()); } private static String getEndTime() { Calendar todayEnd = Calendar.getInstance(); todayEnd.add(Calendar.DATE, -1); todayEnd.set(Calendar.SECOND, 59); todayEnd.set(Calendar.MINUTE, 59); todayEnd.set(Calendar.HOUR_OF_DAY, 23); todayEnd.set(Calendar.MILLISECOND, 999); return DateUtil.format(todayEnd.getTime(), DateFormat.getDateTimeInstance()); } private StringBuilder getTemplateSql(boolean isMain) { StringBuilder sql = new StringBuilder(); sql.append("\n SELECT data.*,data.D_H_HOURS HOURS FROM ("); sql.append("\n SELECT "); sql.append("\n COUNT( {1} ) HOURS_TOTAL_COUNT, "); sql.append("\n TO_CHAR( {0}, 'hh24' ) D_H_HOURS, "); sql.append("\n {2}, "); sql.append("\n MIN( {1} ) MIN_VALUE, "); sql.append("\n MAX( {1} ) MAX_VALUE "); sql.append("\n FROM "); sql.append("\n {3} "); sql.append("\n WHERE "); sql.append("\n {0}>=to_date('" + getStartTime() + "','yyyy-mm-dd HH24:MI:SS') and {0}<=TO_DATE('" + getEndTime() + "','yyyy-mm-dd HH24:MI:SS') "); if (isMain) { sql.append("\n and {2} like 'ch-kt%' "); } sql.append("\n GROUP BY "); sql.append("\n {2}, "); sql.append("\n TRUNC( {0} ), "); sql.append("\n TO_CHAR( {0}, 'hh24' ) ) data "); // sql.append("\n ) data ON h.C_M_HOUR = data.D_H_HOURS "); return sql; } /** * 验证数据单个 * * @param validationEntity 验证数据信息 */ // private void verifyValidation(ValidationEntity validationEntity) { // List collectDataSourceList = validationEntity.getCollectDataSource(); // if (collectDataSourceList == null || collectDataSourceList.isEmpty()) { // return; // } // String extractTargetSource = validationEntity.getTargetDataSource(); // for (int i = 0; i < collectDataSourceList.size(); i++) { // //采集来源数据源 // String collectDataSource = collectDataSourceList.get(i); // try { // DataBaseEntity dbe = new DataBaseEntity(collectDataSource); // Dao collectTargetDao = dbe.getDao(); // StringBuilder sqlTemplate = new StringBuilder(268); // sqlTemplate.append("\n WITH T__T AS ( "); // // {0} timeField {1} autoField {2} 查询表名 // sqlTemplate.append("\n SELECT TO_CHAR(").append("{0}").append(",'hh24') AS D_H_HOURS ,"); // sqlTemplate.append("\n ").append("{1}").append(",").append("{0}"); // sqlTemplate.append("\n FROM ").append("{2}"); // // - 1 代表执行日期的前一天 // sqlTemplate.append("\n WHERE TRUNC(").append("{0}").append(") = TRUNC(sysdate - 1 ) ) "); // // --- WITH 结束 // sqlTemplate.append("\n SELECT "); // sqlTemplate.append("\n COUNT( {1} ) HOURS_TOTAL_COUNT,"); // sqlTemplate.append("\n D_H_HOURS HOURS, "); // sqlTemplate.append("\n MIN( {1} ) MIN_VALUE, "); // sqlTemplate.append("\n MAX( {1} ) MAX_VALUE "); // sqlTemplate.append("\n FROM "); // sqlTemplate.append("\n T__T"); // sqlTemplate.append("\n GROUP BY TRUNC( {0} ), D_H_HOURS "); // sqlTemplate.append("\n ORDER BY D_H_HOURS "); // DataTableEntity list = collectTargetDao.getList(replacePlaceholder(sqlTemplate, // new String[]{validationEntity.getTimeField(), validationEntity.getAutoField(), // validationEntity.getCollectTableName()}, null)); // collectTargetDao.closeConnection(); // if (DataTableEntity.isEmpty(list)) { // return; // } else { // final String yesterdayDate = DateUtil.format(DateUtil.offset(new Date(), DateField.HOUR_OF_DAY, -1), "yyyy-MM-dd"); // final Object[] fields = list.getMeta().getFields(); // //根据小时分组 // Map map = list.getData().stream().collect(Collectors.toMap(item -> item.getString("hours"), item -> { // replaceFieldName("sub", fields, item, null, true); // setDefaultValue(validationEntity, item, yesterdayDate); // return item; // })); // dbe = new DataBaseEntity(extractTargetSource); // Dao extractTargetDao = dbe.getDao(); // DataTableEntity result = new DataTableEntity(); // list = extractTargetDao.getList(replacePlaceholder(sqlTemplate, // getExtractTargetTableField(validationEntity.getCollectTableName(), validationEntity.getTargetDataTable(), // validationEntity.getAutoField(), validationEntity.getTimeField()), " AND " + // validationEntity.getCollectSourceField() + " =?"), new Object[]{validationEntity.getCollectId()}); // if (DataTableEntity.isEmpty(list)) { // for (FieldSetEntity value : map.values()) { // replaceFieldName("master", fields, value, 0, false); // result.addFieldSetEntity(value); // } // return; // } // for (int j = 0; j < list.getRows(); j++) { // String hours = list.getString(i, "hours"); // FieldSetEntity fs = list.getFieldSetEntity(i); // FieldSetEntity fse = map.get(hours); // replaceFieldName("master", fields, fs, null, true); // if (!FieldSetEntity.isEmpty(fse)) { // fse.append(fs); // map.remove(hours); // result.addFieldSetEntity(fse); // } else { // replaceFieldName("sub", fields, fs, 0, true); // setDefaultValue(validationEntity, fs, yesterdayDate); // result.addFieldSetEntity(fs); // } // } // if (!map.isEmpty()) { // for (FieldSetEntity value : map.values()) { // replaceFieldName("master", fields, value, 0, false); // result.addFieldSetEntity(value); // } // } // if (DataTableEntity.isEmpty(result)) { // baseDao.add(result); // } // } // } catch (Exception e) { // e.printStackTrace(); // SpringMVCContextHolder.getSystemLogger().error(e); // } // } // // } /** * 设置默认值 * * @param collectTableName * @param collectId * @param fs * @param yesterdayDate */ private void setDefaultValue(String collectTableName, String collectId, FieldSetEntity fs, Date yesterdayDate) { fs.setTableName("product_sys_mes_data_validation"); fs.setValue("create_data_time", getStartTime()); fs.setValue(CmnConst.CREATED_UTC_DATETIME, yesterdayDate); // fs.setValue("collect_id", collectId); fs.setValue("extract_table_name", collectTableName); } /** * 替换字段名 * * @param prefix 给字段加前缀 * @param fields 所有字段名 * @param fse * @param defaultValue * @param deleteField */ private void replaceFieldName(String prefix, Object[] fields, FieldSetEntity fse, Object defaultValue, boolean deleteField) { for (Object field : fields) { String fieldName = field.toString(); if ("hours".equals(fieldName)) continue; fse.setValue(prefix + "_" + fieldName, defaultValue == null ? fse.getObject(fieldName) : defaultValue); if (deleteField) { fse.remove(fieldName); } } } /** * 获取提取映射的目标字段 * * @param collectSourceTable 采集来源表名 * @param extractTargetTable 提取目标表名 * @param autoField 提取自增字段 * @param timeField 提取时间字段 * @return */ private String[] getExtractTargetTableField(String collectSourceTable, String extractTargetTable, String autoField, String timeField) { StringBuilder sql = new StringBuilder(); sql.append("\nSELECT source_redis_field,target_table_field "); sql.append("\nFROM `product_sys_data_extract_config_sub` "); sql.append("\nwhere parent_uuid =( SELECT"); sql.append("\nuuid FROM `product_sys_data_extract_config` "); sql.append("\nWHERE extract_source_table = ? ) AND ( source_redis_field = ? or source_redis_field = ? ) limit 2"); DataTableEntity dt = baseDao.listTable(sql.toString(), new Object[]{collectSourceTable, autoField, timeField}); if (DataTableEntity.isEmpty(dt) || dt.getRows() != 2) { throw new BaseException(ErrorCode.GET_EXTRACT_MAPPING_FIELD_FAIL); } String[] result = new String[3]; result[2] = extractTargetTable; for (int i = 0; i < 2; i++) { String sourceRedisField = dt.getString(i, "source_redis_field"); String targetTableField = dt.getString(i, "target_table_field"); if (autoField.equals(sourceRedisField)) { // result[1] = targetTableField; } else { result[0] = targetTableField; } } return result; } /** * 根据参数替换 * * @param template * @param params * @param filter * @return */ private String replacePlaceholder(StringBuilder template, String[] params, String filter) { String sql = template.toString(); if (params != null) { for (int i = 0; i < params.length; i++) { sql = sql.replaceAll("\\{" + i + "}", params[i]); } } return sql.replace("{-1}", filter == null ? " 1=1 " : filter); // return sql; } private class ValidationEntity { //采集唯一标识 private List collectIds; //采集自增字段 private String autoField; //采集时间字段 private String timeField; //采集数据源 private List collectDataSource; //采集表名 private String collectTableName; //提取目标表数据源 private String targetDataSource; //提取目标数据表 private String targetDataTable; //采集id配置字段 private String collectSourceField; private String preMasterField; public String getPreMasterField() { return preMasterField; } public void setPreMasterField(String preMasterField) { this.preMasterField = preMasterField; } public List getCollectIds() { return collectIds; } public void setCollectIds(List collectIds) { this.collectIds = collectIds; } public String getAutoField() { return autoField; } public void setAutoField(String autoField) { this.autoField = autoField; } public String getTimeField() { return timeField; } public void setTimeField(String timeField) { this.timeField = timeField; } public List getCollectDataSource() { return collectDataSource; } public void addCollectId(String collectId) { if (StringUtils.isEmpty(collectId)) { return; } if (this.collectIds == null) { this.collectIds = new ArrayList<>(); } this.collectIds.add(collectId); } public void addCollectDataSource(String collectDataSource) { if (StringUtils.isEmpty(collectDataSource)) { return; } if (this.collectDataSource == null) { this.collectDataSource = new ArrayList<>(); } this.collectDataSource.add(collectDataSource); } public void setCollectDataSource(List collectDataSource) { this.collectDataSource = collectDataSource; } public String getCollectTableName() { return collectTableName; } public void setCollectTableName(String collectTableName) { this.collectTableName = collectTableName; } public String getTargetDataSource() { return targetDataSource; } public void setTargetDataSource(String targetDataSource) { this.targetDataSource = targetDataSource; } public String getTargetDataTable() { return targetDataTable; } public void setTargetDataTable(String targetDataTable) { this.targetDataTable = targetDataTable; } public String getCollectSourceField() { return collectSourceField; } public void setCollectSourceField(String collectSourceField) { this.collectSourceField = collectSourceField; } } }