package com.product.data.center.service; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.date.DateTime; import cn.hutool.core.date.DateUtil; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.NumberUtil; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Sets; import com.product.common.utils.spring.SpringUtils; import com.product.core.config.CoreConst; import com.product.core.config.Global; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.exception.BaseException; import com.product.core.service.support.AbstractBaseService; import com.product.core.sign.SignUtil; import com.product.core.spring.context.SpringBeanUtil; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.core.transfer.Transactional; import com.product.core.util.JsonUtil; import com.product.data.center.config.CmnConst; import com.product.data.center.config.ErrorCode; import com.product.data.center.entity.HistoryEntity; import com.product.data.center.service.ide.IMesExternalService; import com.product.data.center.utils.QuerySqlParseUtil; import com.product.datasource.dao.Dao; import com.product.datasource.dao.impl.OracleDaoImpl; import com.product.datasource.entity.DataBaseEntity; import com.product.quartz.service.IRemoteService; import com.product.util.BaseUtil; import oracle.jdbc.internal.OracleTypes; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.servlet.http.HttpServletRequest; import java.sql.CallableStatement; import java.sql.Connection; import java.sql.SQLException; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; /** * @Author cheng * @Date 2022/12/16 13:20 * @Desc MES外部接口 */ @Service public class MesExternalService extends AbstractBaseService implements IMesExternalService, IRemoteService, com.product.data.service.impl.IRemoteService { @Value("${data.system.name}") private String dataSystemName; private CommonService commonService = null; private FieldSetEntity collectLogCache = null; public CommonService getCommonService() { if (this.commonService == null) { this.commonService = SpringBeanUtil.getBean(CommonService.class); } return commonService; } public static void main(String[] args) { Date parse = DateUtil.parse("2023-10-23 18:22:50", "yyyy-MM-dd HH:mm:ss"); Date parse1 = DateUtil.parse("2023-10-23 18:24:11", "yyyy-MM-dd HH:mm:ss"); Date parse2 = DateUtil.parse("2023-10-23 18:24:21", "yyyy-MM-dd HH:mm:ss"); Set set = Sets.newHashSet(parse, parse1, parse2); List sets = CollectionUtil.toList(set.toArray(new Date[0])); Optional max = sets.stream().max(Comparator.comparing((a) -> a.getTime())); System.out.println(max.get()); } public void splitTableData() { // FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false); // if (FieldSetEntity.isEmpty(reportDbConfig)) { // throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL); // } // DataBaseEntity dbe = new DataBaseEntity(reportDbConfig); // Dao reportDao = dbe.getDao(); // Set trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), "da_t_wip_tracking"); // // for (String tableName : trackingTableSet) { // //获取年份从表名最后一个 下划线开始截取 // String year = tableName.substring(tableName.lastIndexOf("_") + 1); // if (!StringUtils.equalsAny(year, "2023") || year.length() > 4) { // continue; // } // //获取表前缀 // String tablePrefix = tableName.substring(0, tableName.lastIndexOf("_")); // ExecutorService executorService = Executors.newFixedThreadPool(12); // for (int i = 1; i <= 12; i++) { // final int finalI = i; // executorService.submit(() -> { // Dao currentDao = dbe.newDao(); // //获取当前月份 以MM格式化 // String month = String.format("%02d", finalI); // //检查月份对应的表是否存在 // String monthTableName = tablePrefix + "_" + year + month; // Set allTableName = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), tablePrefix + "_" + year + month); // if (allTableName.size() == 0 || !allTableName.contains(monthTableName)) { // //根据原始表结构创建新表 // String sql = "create table " + monthTableName + " like " + tableName; // currentDao.executeSql(sql); // SpringMVCContextHolder.getSystemLogger().info("创建表:" + monthTableName); // } // String sql = "INSERT INTO " + monthTableName + " SELECT * FROM " + tableName + " WHERE MONTH(update_date)=" + finalI; // currentDao.executeSql(sql); // currentDao.closeConnection(); // }); // // } // executorService.shutdown(); // while (true) { // try { // if (executorService.awaitTermination(5, TimeUnit.SECONDS)) break; // Thread.sleep(5000); // SpringMVCContextHolder.getSystemLogger().info("线程等待中..."); // } catch (InterruptedException e) { // e.printStackTrace(); // } // // } // } // reportDao.closeConnection(); } /** * 获取历史数据 */ public void getHistoryData(FieldSetEntity fse) throws BaseException, ExecutionException, InterruptedException { //机号 String serialNumber = fse.getString("serial_number"); SpringMVCContextHolder.getSystemLogger().info("准备回写机号数据:" + serialNumber); if (StringUtils.isEmpty(serialNumber)) { throw new BaseException(ErrorCode.SERIAL_NUMBER_IS_NULL); } FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1 limit 1", null, false); if (FieldSetEntity.isEmpty(fs)) { throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR); } FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false); if (FieldSetEntity.isEmpty(reportDbConfig)) { throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL); } DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "upper(source_table) in ('T_WIP_TRACKING','T_WIP_PRODUCT_KEYP','T_WIP_DETAIL','T_PM_PRODUCT_SN') and data_source in (select uuid from product_sys_data_sync_manager)", new Object[]{}, new Object[]{"uuid,id,data_source,source_table"}); Map> groupByCollectId = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("id"))); Map> groupBySourceTable = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("source_table"))); DataBaseEntity dbe = new DataBaseEntity(reportDbConfig); Dao reportDao = dbe.getDao(); String reportDbName = dbe.getDbName(); Set trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_tracking"); HistoryEntity trackingData = historyBeforeDispose(getData(reportDao, trackingTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.TRACKING_TABLE_NOT_EXISTS, ErrorCode.NOT_FOUND_SERIAL_NUMBER}), CmnConst.T_WIP_TRACKING); Set keypTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_product_keyp"); HistoryEntity keypData = historyBeforeDispose(getData(reportDao, keypTableSet, "pk_product_sn", serialNumber, new ErrorCode[]{ErrorCode.KEYP_TABLE_NOT_EXISTS, ErrorCode.KEYP_DATA_NOT_FOUND}), CmnConst.T_WIP_PRODUCT_KEYP); Set detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail"); HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL); // Set productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn"); // HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN); //主库数据源配置 String masterDataSource = fs.getString("data_source"); dbe = new DataBaseEntity(masterDataSource); Map groupDao = new HashMap<>(); Dao dao = dbe.getDao(); boolean success = false; try { Connection connection = dao.getConnection(); connection.setAutoCommit(false); HistoryEntity[] historyEntities = {trackingData, keypData, detailData}; insertMasterTableData(dao, historyEntities); insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities); connection.commit(); batchCommit(groupDao); success = true; SpringMVCContextHolder.getSystemLogger().info("回写机号:" + serialNumber + "成功"); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error("回写机号:" + serialNumber + "失败"); SpringMVCContextHolder.getSystemLogger().error(e); throw new BaseException(ErrorCode.INSERT_DATA_FAIL); } finally { try { if (!success && !dao.getConnection().getAutoCommit()) { dao.getConnection().rollback(); } dao.closeConnection(); for (Dao value : groupDao.values()) { try (Connection connection = value.getConnection()) { if (!connection.getAutoCommit()) { connection.rollback(); } } value.closeConnection(); } } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } } } public void batchCommit(Map groupDao) throws SQLException { for (Dao value : groupDao.values()) { value.getConnection().commit(); } } /** * 插入数据到主库 * * @param dao * @param historyEntities */ public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) { for (HistoryEntity historyEntity : historyEntities) { if (historyEntity == null) { continue; } DataTableEntity masterDataTable = historyEntity.getMasterDataTable(); if (DataTableEntity.isEmpty(masterDataTable)) { continue; } Object[] objects = masterDataTable.getData().stream().map(item -> { String primaryValue = item.getString(historyEntity.getPrimaryField()); //判断是否为数值且包含小数点 小数点后面是否全是0 if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue. indexOf(".") + 1).matches("^0*$")) { //返回整数字符串 return primaryValue.substring(0, primaryValue.indexOf(".")); } return primaryValue; }).toArray(); //查询主库数据是否存在 DataTableEntity list = dao.getList(historyEntity.getTableName(), BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), objects.length, true), new String[]{historyEntity.getPrimaryField()}, objects); List existsPrimaryValues = null; if (!DataTableEntity.isEmpty(list)) { existsPrimaryValues = list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList()); } for (int i = 0; i < masterDataTable.getRows(); i++) { String primaryValue = masterDataTable.getString(i, historyEntity.getPrimaryField()); if (existsPrimaryValues != null && existsPrimaryValues.contains(primaryValue)) { //数据存在跳过该数据 continue; } FieldSetEntity fieldSetEntity = masterDataTable.getFieldSetEntity(i); fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()}); fieldSetEntity.remove("~table_name~"); dao.add(fieldSetEntity); } } } /** * 插入子库表数据 * * @param groupDao 子库dao * @param groupByCollectId 采集配置按采集id分组 * @param groupBySourceTable 采集配置按表名分组 * @param historyEntities 历史数据 */ public void insertSubTableData(Map groupDao, Map> groupByCollectId, Map> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception { for (HistoryEntity historyEntity : historyEntities) { if (historyEntity == null) { continue; } Map> groupData = historyEntity.getGroupData(); if (groupData == null || groupData.isEmpty()) { continue; } for (Map.Entry> entry : groupData.entrySet()) { List daoList = new ArrayList<>(); if ("ch-kt".equals(entry.getKey())) { List fieldSetEntityList = groupBySourceTable.get(historyEntity.getTableName().toLowerCase()); Set dataSourceSet = fieldSetEntityList.stream().map(item -> item.getString("data_source")).collect(Collectors.toSet()); for (String sourceUuid : dataSourceSet) { daoList.add(getDao(groupDao, sourceUuid)); } } else { daoList.add(getDao(groupDao, groupByCollectId.get(entry.getKey()).get(0).getString("data_source"))); } for (Dao dao : daoList) { List value = entry.getValue(); //查询已存在的数据 DataTableEntity list = dao.getList(historyEntity.getTableName(), BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), value.size(), true), value.stream().map(item -> { String primaryValue = item.getString(historyEntity.getPrimaryField()); //判断是否为数值且包含小数点 小数点后面是否全是0 if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue. indexOf(".") + 1).matches("^0*$")) { //返回整数字符串 return primaryValue.substring(0, primaryValue.indexOf(".")); } return primaryValue; }).toArray()); List existIds = DataTableEntity.isEmpty(list) ? null : list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList()); for (FieldSetEntity fieldSetEntity : value) { String primaryValue = fieldSetEntity.getString(historyEntity.getPrimaryField()); if (existIds != null && existIds.contains(NumberUtil.parseNumber(primaryValue).toString())) { continue; } fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()}); fieldSetEntity.remove("~table_name~"); dao.add(fieldSetEntity); } } } } } public Dao getDao(Map groupDao, String sourceUuid) throws Exception { Dao dao = groupDao.get(sourceUuid); if (null == dao) { DataBaseEntity dbe = new DataBaseEntity(sourceUuid); dao = dbe.getDao(); dao.getConnection().setAutoCommit(false); groupDao.put(sourceUuid, dao); } return dao; } public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) { if (DataTableEntity.isEmpty(dt)) { return null; } HistoryEntity historyEntity = new HistoryEntity(); historyEntity.setMoNumberField("mo_number"); if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) { historyEntity.setPrimaryField("wip_id"); historyEntity.setTimeField("update_date"); } else if (CmnConst.T_WIP_PRODUCT_KEYP.equalsIgnoreCase(targetTableName)) { historyEntity.setPrimaryField("pk_id"); historyEntity.setTimeField("pk_loadtime"); historyEntity.setMoNumberField("pk_mo"); } else if (CmnConst.T_WIP_DETAIL.equalsIgnoreCase(targetTableName)) { historyEntity.setPrimaryField("wip_detail_id"); historyEntity.setTimeField("update_date"); } else if (CmnConst.T_PM_PRODUCT_SN.equalsIgnoreCase(targetTableName)) { historyEntity.setPrimaryField("row_id"); historyEntity.setTimeField("update_date"); } else { return null; } historyEntity.setTableName(targetTableName.toUpperCase()); Date now = new Date(); for (int i = 0; i < dt.getRows(); i++) { Map values = dt.getFieldSetEntity(i).getValues(); //遍历map中的value是否为数字,如果是数字判断小数点后面是否有值,如果没有值则转换为整数 for (Map.Entry entry : values.entrySet()) { if (entry.getValue() instanceof Number) { Number number = (Number) entry.getValue(); if (number.doubleValue() == number.intValue()) { entry.setValue(number.intValue()); } } } } if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName) && dt.getRows() > 1) { List data = dt.getData(); FieldSetEntity newData = null; for (int i = 0; i < data.size(); i++) { FieldSetEntity fieldSetEntity = data.get(i); if ("ch-kt".equals(fieldSetEntity.getValue("pre_master_key"))) { data.remove(i); dt.removeFieldSetEntity(i); break; } else if (newData == null) { newData = fieldSetEntity; continue; } Date date = fieldSetEntity.getDate(historyEntity.getTimeField()); if (date.getTime() > newData.getDate(historyEntity.getTimeField()).getTime()) { newData = fieldSetEntity; } } if (newData != null) { dt = new DataTableEntity(); dt.addFieldSetEntity(newData); } } DataTableEntity subData = dt.clones(); DataTableEntity masterData = dt.clones(); for (int i = 0; i < subData.getRows(); i++) { FieldSetEntity fs = subData.getFieldSetEntity(i); String preMasterKey = fs.getString("pre_master_key"); if (historyEntity.getPrimaryField().equals(preMasterKey)) { preMasterKey = null; } if ("ch-kt".equals(fs.getString("source_info")) && StringUtils.isEmpty(preMasterKey)) { preMasterKey = fs.getString(historyEntity.getPrimaryField()); masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField())); } fs.setValue(historyEntity.getPrimaryField(), preMasterKey); fs.setValue("pre_master_key", null); masterData.setFieldValue(i, historyEntity.getTimeField(), now); fs.setValue(historyEntity.getTimeField(), now); fs.remove("~table_name~"); masterData.getFieldSetEntity(i).remove("~table_name~"); if (StringUtils.isEmpty(preMasterKey)) { masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField())); } if (StringUtils.isEmpty(historyEntity.getPrimaryField())) { subData.removeFieldSetEntity(i); masterData.removeFieldSetEntity(i); i--; } } historyEntity.setArchivedDataTable(dt); historyEntity.setMasterDataTable(masterData); historyEntity.setSubDataTable(subData); return historyEntity; } public DataTableEntity getData(Dao dao, Set tableSet, String filterFieldName, String serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException { if (CollectionUtil.isEmpty(tableSet)) { throw new BaseException(errorCodes[0]); } String[] tableArray = tableSet.toArray(new String[]{}); StringBuilder sql = new StringBuilder(); for (String tableName : tableArray) { if (sql.length() > 0) { sql.append(" union all "); } sql.append("select a.*,'" + tableName + "' as '~table_name~' from " + tableName + " a where " + filterFieldName + " = ?"); } DataTableEntity data = dao.getList(sql.toString(), Arrays.stream(tableArray).map((t) -> serialNumber).toArray()); // CompletionService objectCompletionService = ThreadUtil.newCompletionService(); // //多线程查询单张表,等待所有线程查询完毕 // for (String tableName : tableArray) { // objectCompletionService.submit(() -> { // DataTableEntity list = dao.getList("select a.*,'" + tableName + "' as '~table_name~' from " + tableName + " a where " + filterFieldName + " = ?", new Object[]{serialNumber}); // return list; // }); // } // DataTableEntity data = new DataTableEntity(); // for (int i = 0; i < tableArray.length; i++) { // DataTableEntity dataTableEntity = objectCompletionService.take().get(); // BaseUtil.dataTableMerge(data, dataTableEntity); // } if (DataTableEntity.isEmpty(data) && !"product_sn".equals(filterFieldName)) { throw new BaseException(errorCodes[1]); } return data; } /** * 初始化制令单 * * @param moNumbers * @throws BaseException */ public void initMoBase(String[] moNumbers) throws BaseException { FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1 limit 1", null, false); if (FieldSetEntity.isEmpty(fs)) { throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR); } String[] targetDataSource = fs.getString("target_data_source").split(","); Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new); StringBuilder errorMsg = new StringBuilder(); for (Dao dao : targetDao) { for (String moNumber : moNumbers) { synchronized (moNumber.intern()) { try (Connection connection = dao.getConnection(); CallableStatement callableStatement = connection.prepareCall( "{CALL SMT_T_PM_MO_BASE(?,?,?,?,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,?,null,null" + ",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null)}")) { callableStatement.setInt(1, 5); callableStatement.registerOutParameter(2, OracleTypes.SMALLINT); callableStatement.registerOutParameter(3, OracleTypes.VARCHAR); callableStatement.setString(4, moNumber); callableStatement.setString(5, "1"); callableStatement.execute(); //执行后返回的错误码 int errorCode = callableStatement.getInt(2); //执行后返回的错误信息 String errorText = callableStatement.getString(3); if (errorCode != 0) { //错误的 errorMsg.append("制令单号:").append(moNumber); errorMsg.append("\nerrorCode:").append(errorCode); errorMsg.append("\nerrorText:").append(errorText); } } catch (Exception e) { SpringMVCContextHolder.getSystemLogger().error(e); e.printStackTrace(); errorMsg.append("制令单号:").append(moNumber); errorMsg.append("\n执行时未知错误:").append(e.getMessage()); } } } } if (errorMsg.length() > 0) { throw new BaseException(ErrorCode.SUB_MO_BASE_INI_FAIL.getValue(), ErrorCode.SUB_MO_BASE_INI_FAIL.getText() + "。\n" + errorMsg); } } /** * @param moNumbers 制令单号,多个逗号分隔 * @param type 操作类型 1 创建制令单 2 更新制令单中的指定字段 * @throws BaseException */ public void updateMoBase(String[] moNumbers, int type) throws BaseException { FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1 limit 1", null, false); if (FieldSetEntity.isEmpty(fs)) { throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR); } String dataSource = fs.getString("data_source"); String[] targetDataSource = fs.getString("target_data_source").split(","); StringBuilder errorMessage = new StringBuilder(); DataBaseEntity dbe = new DataBaseEntity(dataSource); for (int i = 0; i < moNumbers.length; i++) { String moNumber = moNumbers[i]; try { synchronized (moNumber.intern()) { Dao dao = dbe.getDao(); FieldSetEntity one = dao.getOne("T_PM_MO_BASE", "MO_NUMBER=?", new Object[]{moNumber}); if (FieldSetEntity.isEmpty(one)) { throw new BaseException(ErrorCode.MO_NUMBER_RECORD_SELECT_EMPTY); } one.setTableName("T_PM_MO_BASE"); dao.closeConnection(); Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new); StringBuilder errorMsg = new StringBuilder(); try { if (1 == type) { createMoBase(errorMsg, one, targetDao); } else if (2 == type) { updateMoBase(errorMsg, one, targetDao); } if (errorMsg.length() > 0) { //有错误 throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMsg); } } finally { for (Dao dao1 : targetDao) { dao1.closeConnection(); } } } } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); errorMessage.append(e.getMessage()).append("\n"); } } if (errorMessage.length() > 0) { throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMessage); } } /** * 创建制令单 * * @param errorMsg * @param moBase * @param subDao */ private void createMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) { String moNumber = moBase.getString("mo_number"); for (int i = 0; i < subDao.length; i++) { Dao dao = subDao[i]; try { //检查工单表是否存在 FieldSetEntity projectBase = dao.getOne("T_PM_PROJECT_BASE", "PROJECT_ID = ? ", new Object[]{moNumber}); if (FieldSetEntity.isEmpty(projectBase)) { throw new BaseException(ErrorCode.SUB_PROJECT_BASE_CAN_NOT_EMPTY); } //检查制令单表是否已存在 FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber}); if (!FieldSetEntity.isEmpty(fs)) { //进行更新操作 updateMoBase(errorMsg, moBase, new Dao[]{dao}); continue; } //将 T_PM_MO_BASE.PROJECT_ID 更改为子库中对应 T_PM_PROJECT_BASE.PROJECT_BASE_ID moBase.setValue("project_id", projectBase.getString("project_base_id")); // moBase.setValue("row_id", -1); dao.add(moBase); } catch (BaseException e) { errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)). append("]").append(e.getMessageInfo()); e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } catch (Exception e) { errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("创建制令单失败,未知的错误"); e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } } } /** * 更新制令单 * * @param errorMsg * @param moBase * @param subDao */ private void updateMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) { String moNumber = moBase.getString("mo_number"); for (int i = 0; i < subDao.length; i++) { Dao dao = subDao[i]; try { //检查制令单表是否已存在 FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber}); if (FieldSetEntity.isEmpty(fs)) { //进行创建操作 createMoBase(errorMsg, moBase, new Dao[]{dao}); continue; } // 当子库中制令单 input_qty 字段为 0 或者 为 null 时 更新多个字段,否则只更新target_qty 字段 String[] updateField = StringUtils.isEmpty(fs.getString("input_qty")) || "0".equals(fs.getString("input_qty")) ? new String[]{"target_qty", "owner", "mo_create_date", "mo_schedule_date", "mo_due_date", "areaid", "technicsid", "close_flag", "default_group", "end_group"} : new String[]{"target_qty"}; StringBuilder sql = new StringBuilder(); sql.append(" UPDATE T_PM_MO_BASE SET "); Object[] params = new Object[updateField.length + 1]; for (int k = 0; k < updateField.length; k++) { if (k > 0) { sql.append(","); } params[k] = moBase.getObject(updateField[k]); sql.append(updateField[k].toUpperCase()).append(" = ? "); } params[updateField.length] = moNumber; sql.append(" WHERE MO_NUMBER = ? "); dao.executeSql(sql.toString(), params); } catch (BaseException e) { errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)). append("]").append(e.getMessageInfo()); e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } catch (Exception e) { errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("更新制令单失败,未知的错误"); e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } } } private String getIp(Dao dao) { return ((OracleDaoImpl) dao).getDataBaseEntity().getIp(); } /** * 采集配置保存 * * @param request * @throws BaseException */ @Override @Transactional public void saveCollectConfig(HttpServletRequest request) throws BaseException { //服务名称 String serverName = request.getHeader("server-name"); if (StringUtils.isEmpty(serverName)) { throw new BaseException(ErrorCode.SERVER_NAME_CAN_NOT_EMPTY); } FieldSetEntity fse = BaseUtil.getFieldSetEntity(request, CmnConst.PRODUCT_SYS_DATA_COLLECT); //采集来源 String sourceInfo = fse.getString("id"); if (StringUtils.isEmpty(sourceInfo) || StringUtils.contains(sourceInfo, dataSystemName)) { throw new BaseException(ErrorCode.COLLECT_SOURCE_VALUE); } commonSave(fse); } /** * 提取配置保存 * * @param fse * @throws BaseException */ @Override @Transactional public void saveExtractConfig(FieldSetEntity fse) throws BaseException { String dataSource = fse.getString(CmnConst.TABLE_SYNC_MANAGER); if (!StringUtils.isEmpty(dataSource)) { FieldSetEntity fs = JsonUtil.pareseJsonToFieldSetEntity(dataSource); if (!FieldSetEntity.isEmpty(fs)) { commonSave(fs); } } commonSave(fse); getCommonService().saveDelRecordConfig(2, fse.getUUID()); } /** * 归档配置保存 * * @param fse * @throws BaseException */ @Override @Transactional public void saveArchiveConfig(FieldSetEntity fse) throws BaseException { commonSave(fse); } /** * @param fse * @throws BaseException */ @Override public void saveCollectLog(FieldSetEntity fse) throws BaseException { this.commonSave(fse); } /** * @param fse * @throws BaseException */ @Override public void saveExtractLog(FieldSetEntity fse) throws BaseException { this.commonSave(fse); } /** * 数据源保存会调用该方法 * * @param fse * @throws BaseException */ @Override public void saveSyncConnectionConfig(FieldSetEntity fse) throws BaseException { //子服务保存数据库连接配置后调用该方法,传输数据到主服务 //TODO } /** * 定时任务生成日志会调用该方法 * * @param fse * @throws BaseException */ @Override public void saveTimeLog(FieldSetEntity fse) throws BaseException { //主服务提取日志保存后调用该方法传输到子服务 //TODO } private void commonSave(FieldSetEntity fse) throws BaseException { String uuid = fse.getUUID(); FieldSetEntity fs = getBaseDao().getFieldSetEntity(fse.getTableName(), new String[]{CmnConst.UUID}, uuid, false); Map subData = fse.getSubData(); if (!CollectionUtil.isEmpty(subData)) { DataTableEntity addDt = new DataTableEntity(); DataTableEntity updateDt = new DataTableEntity(); for (Map.Entry entry : subData.entrySet()) { String tableName = entry.getKey(); DataTableEntity value = entry.getValue(); if (DataTableEntity.isEmpty(value)) { continue; } Object[] uuids = value.getUuids(); DataTableEntity dt = getBaseDao().listTable(tableName, BaseUtil.buildQuestionMarkFilter(CmnConst.UUID, uuids.length, true), uuids, new String[]{CmnConst.UUID}); for (int i = 0; i < value.getRows(); i++) { uuid = value.getString(i, CmnConst.UUID); List fieldSetEntity = dt.getFieldSetEntity(uuid); if (CollectionUtil.isEmpty(fieldSetEntity)) { addDt.addFieldSetEntity(value.getFieldSetEntity(i)); value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "add"); } else { addDt.addFieldSetEntity(updateDt.getFieldSetEntity(i)); value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "update"); } } if (!DataTableEntity.isEmpty(addDt)) { getBaseDao().add(addDt); } if (!DataTableEntity.isEmpty(updateDt)) { getBaseDao().update(updateDt); } } } if (FieldSetEntity.isEmpty(fs)) { getBaseDao().add(fse, false); } else { getBaseDao().update(fse, false); } } /** * 调用远程主服务器采集保存 * * @param fse * @return * @throws BaseException */ public FieldSetEntity remoteSaveCollectConfig(FieldSetEntity fse) throws BaseException { if ("ch-kt".equals(dataSystemName)) { return fse; } FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false); if (FieldSetEntity.isEmpty(fs)) { throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY); } //服务域名的端口 String ipPort = fs.getString("server_url"); String serverName = fs.getString("server_name"); String serverUrl = ipPort + CmnConst.SAVE_COLLECT_URL; FieldSetEntity res = doPost(serverUrl, serverName, fse); return res; } /** * 主服务提取保存日志后调用该方法传入到子服务保存 * * @param fse * @return * @throws BaseException */ public void remoteSaveExtractLog(FieldSetEntity fse) throws BaseException { try { if ("ch-kt".equals(dataSystemName)) { //主服务采集日志uuid String preStepUuid = fse.getString(CmnConst.PRE_STEP_UUID); StringBuilder sql = new StringBuilder(); sql.append("\nSELECT "); sql.append("\nserver.* "); sql.append("\nFROM ").append(CmnConst.PRODUCT_SYS_DATA_COLLECT).append(" collect "); sql.append("\nJOIN ").append(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG).append(" log "); sql.append("\nON log.config_uuid=collect.uuid and log.type=1 "); sql.append("\nJOIN ").append(CmnConst.PRODUCT_MES_SERVER).append(" server "); sql.append("\nON collect.id like concat(server.server_name,'%') "); sql.append("\nWHERE log.uuid=? and collect.id not like concat(?,'%') limit 1"); FieldSetEntity fs = getBaseDao().getFieldSetBySQL(sql.toString(), new Object[]{preStepUuid, dataSystemName}, false); if (FieldSetEntity.isEmpty(fs)) { return; } String ipPort = fs.getString("server_url"); String serverName = fs.getString("server_name"); String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_LOG_URL; doPostAsync(serverUrl, serverName, fse); } } catch (Exception e) { //捕获异常为了使采集定时任务正常运行 SpringMVCContextHolder.getSystemLogger().error(e); e.printStackTrace(); } } /** * 调用远程主服务器保存 * * @param fse * @return * @throws BaseException */ public void remoteSaveExtractConfig(FieldSetEntity fse) throws BaseException { DataTableEntity dt = getRemoteSubServer(); if (DataTableEntity.isEmpty(dt)) { return; } FieldSetEntity extractTargetSource = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, fse.getString("extract_target_source"), false); if (!FieldSetEntity.isEmpty(extractTargetSource)) { extractTargetSource.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, CmnConst.TABLE_SYNC_MANAGER); fse.setValue(CmnConst.TABLE_SYNC_MANAGER, BaseUtil.fieldSetEntityToJson(extractTargetSource)); } for (int i = 0; i < dt.getRows(); i++) { FieldSetEntity fs = dt.getFieldSetEntity(i); //服务域名的端口 String ipPort = fs.getString("server_url"); String serverName = fs.getString("server_name"); String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_URL; doPost(serverUrl, serverName, fse); } } /** * 调用远程主服务器保存 * * @param fse * @return * @throws BaseException */ public void remoteSaveArchiveConfig(FieldSetEntity fse) throws BaseException { DataTableEntity dt = getRemoteSubServer(); if (DataTableEntity.isEmpty(dt)) { return; } for (int i = 0; i < dt.getRows(); i++) { FieldSetEntity fs = dt.getFieldSetEntity(i); //服务域名的端口 String ipPort = fs.getString("server_url"); String serverName = fs.getString("server_name"); String serverUrl = ipPort + CmnConst.SAVE_ARCHIVE_URL; doPost(serverUrl, serverName, fse); } } /** * 调用远程主服务器保存 * * @param fse * @return * @throws BaseException */ public void remoteSaveCollectLog(FieldSetEntity fse) throws BaseException { if ("ch-kt".equals(dataSystemName) || FieldSetEntity.isEmpty(fse)) { return; } FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false); if (FieldSetEntity.isEmpty(fs)) { throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY); } //服务域名的端口 String ipPort = fs.getString("server_url"); String serverName = fs.getString("server_name"); String serverUrl = ipPort + CmnConst.SAVE_COLLECT_LOG_URL; doPost(serverUrl, serverName, fse); } /** * 获取子服务配置 * * @return * @throws BaseException */ private DataTableEntity getRemoteSubServer() throws BaseException { return getBaseDao().listTable(CmnConst.PRODUCT_MES_SERVER, "server_type!=0"); } // @Async void doPostAsync(String url, String serverName, FieldSetEntity fse) throws BaseException { doPost(url, serverName, fse); } public String getDataSystemName() { return dataSystemName; } public boolean remoteRehandle(FieldSetEntity fse) { String type = fse.getString(CmnConst.TYPE);//类型 String logUuid; if ("2".equals(type)) { logUuid = fse.getString(CmnConst.PRE_STEP_UUID); } else if ("1".equals(type)) { logUuid = fse.getUUID(); } else { return false; } FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_COLLECT, "uuid =(select config_uuid FROM product_sys_data_center_log where uuid=?)", new Object[]{logUuid}, false); if (!FieldSetEntity.isEmpty(fs)) { //采集id String collectId = fs.getString(CmnConst.ID); if (collectId.indexOf(dataSystemName) == -1) { fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=1 and ? like concat(server_name,'%')", new Object[]{collectId}, false); if (!FieldSetEntity.isEmpty(fs)) { //服务域名的端口 String ipPort = fs.getString("server_url"); String serverName = fs.getString("server_name"); String serverUrl = ipPort + CmnConst.REHANDLE_ERROR_URL; doPost(serverUrl, serverName, fse); //标记日志成功 JournalManagerService journalManagerService = SpringUtils.getBean(JournalManagerService.class); journalManagerService.writeBackReDealResult(fse, true); } return true; } } return false; } /** * post 请求 * * @param url * @param serverName * @param fse * @throws BaseException */ private FieldSetEntity doPost(String url, String serverName, FieldSetEntity fse) throws BaseException { fse.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, fse.getTableName()); JSONObject requestBody = BaseUtil.fieldSetEntityToJson(fse); String requestData = requestBody.toJSONString(); //签名 String signature = SignUtil.getHmacSHA1(requestData, Global.getSystemConfig("signature.key", "")); try (HttpResponse response = HttpRequest.post(url) .contentType("application/x-www-form-urlencoded") .header(CoreConst.SYSTEM_LANGUAGE_CODE_, "zh-CN") //语言编码 .header(CoreConst.SYSTEM_CLIENT_TYPE_, "Web") //客户端类型 .header(CoreConst.SYSTEM_CLIENT_VERSION_, "1.0.0") //客户端版本 .header("server-name", serverName) //系统名称 .header("signature", signature) //签名 .body("formData=" + requestData).execute()) { if (response.getStatus() == 200) { //请求成功 String res = response.body(); if (!StringUtils.isEmpty(res)) { JSONObject resBody = JSON.parseObject(res); if (resBody != null) { String code = resBody.getString(CoreConst.API_RETURN_KEY_CODE); if (!CoreConst.API_RETURN_VALUE_CODE_200.equals(code)) { //服务内部抛出的错误 throw new BaseException("调用接口失败,", resBody.getString(CoreConst.API_RETURN_KEY_MSG)); } String formData = resBody.getString(CoreConst.API_RETURN_KEY_DATA); if (StringUtils.isEmpty(formData)) { return null; } FieldSetEntity result = JsonUtil.pareseJsonToFieldSetEntity(formData); return result; } } } throw new BaseException(ErrorCode.OPEN_API_REQUEST_FAIL); } } /** * 获取采集日志 * * @return */ public FieldSetEntity getCollectLog() { collectLogCache = null; if (collectLogCache == null) { loadCollectLogCache(); } return collectLogCache; } public void loadCollectLogCache() { StringBuilder sql = new StringBuilder(); sql.append("\n with no_extract as ( "); sql.append("\n SELECT "); sql.append("\n a.* "); sql.append("\n FROM "); sql.append("\n product_sys_data_center_log A "); sql.append("\n LEFT JOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid "); sql.append("\n WHERE "); sql.append("\n a.type = 1 "); sql.append("\n and a.result=1 and (a.deal_flag is null or a.deal_flag =0) "); sql.append("\n AND ( "); sql.append("\n b.pre_step_uuid IS NULL "); sql.append("\n OR "); sql.append("\n ( "); sql.append("\n b.pre_step_uuid=a.uuid "); sql.append("\n AND "); sql.append("\n (b.result=0 and (b.deal_result = 0 or b.deal_flag is null)) "); sql.append("\n ) "); sql.append("\n ) "); sql.append("\n ), "); sql.append("\n last_run_log as ( "); sql.append("\n select max(created_utc_datetime) last_time,config_uuid from product_sys_data_center_log where type=1 and result=1 and deal_flag=0 GROUP BY config_uuid "); sql.append("\n ) "); sql.append("\n "); sql.append("\n SELECT ifnull(COUNT(no_extract.uuid),0) unextracted_batch ,ifnull(sum(no_extract.count),0) unextracted_total_count,b.`name`,(select last_time from last_run_log where b.uuid=last_run_log.config_uuid) last_success_time FROM product_sys_data_collect b "); sql.append("\n left join no_extract on no_extract.config_uuid=b.uuid "); sql.append("\n GROUP BY b.uuid "); sql.append("\n order by b.`name` "); FieldSetEntity fse = new FieldSetEntity(); fse.setTableName("temp"); fse.setValue("load_log_time", DateTime.now().toString()); DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), new Object[]{}); if (!DataTableEntity.isEmpty(dataTableEntity)) { fse.setValue("list", BaseUtil.dataTableEntityToJson(dataTableEntity, f -> { JSONObject jsonObject = f[0]; Date lastSuccessTime = jsonObject.getDate("last_success_time"); if (lastSuccessTime != null) { jsonObject.put("last_success_time", DateUtil.format(lastSuccessTime, "yyyy-MM-dd HH:mm:ss")); } })); } else { fse.setValue("list", "[]"); } this.collectLogCache = fse; } }