package com.product.data.center.service;
|
|
import cn.hutool.core.collection.CollectionUtil;
|
import cn.hutool.core.date.DateTime;
|
import cn.hutool.core.date.DateUtil;
|
import cn.hutool.core.thread.ThreadUtil;
|
import cn.hutool.core.util.NumberUtil;
|
import cn.hutool.http.HttpRequest;
|
import cn.hutool.http.HttpResponse;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.google.common.collect.Sets;
|
import com.product.common.utils.spring.SpringUtils;
|
import com.product.core.config.CoreConst;
|
import com.product.core.config.Global;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.exception.BaseException;
|
import com.product.core.service.support.AbstractBaseService;
|
import com.product.core.sign.SignUtil;
|
import com.product.core.spring.context.SpringBeanUtil;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.core.transfer.Transactional;
|
import com.product.core.util.JsonUtil;
|
import com.product.data.center.config.CmnConst;
|
import com.product.data.center.config.ErrorCode;
|
import com.product.data.center.entity.HistoryEntity;
|
import com.product.data.center.service.ide.IMesExternalService;
|
import com.product.data.center.utils.QuerySqlParseUtil;
|
import com.product.datasource.dao.Dao;
|
import com.product.datasource.dao.impl.OracleDaoImpl;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.quartz.service.IRemoteService;
|
import com.product.util.BaseUtil;
|
import oracle.jdbc.internal.OracleTypes;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Service;
|
|
import javax.servlet.http.HttpServletRequest;
|
import java.sql.CallableStatement;
|
import java.sql.Connection;
|
import java.sql.SQLException;
|
import java.time.format.DateTimeFormatter;
|
import java.util.*;
|
import java.util.concurrent.*;
|
import java.util.stream.Collectors;
|
|
/**
|
* @Author cheng
|
* @Date 2022/12/16 13:20
|
* @Desc MES外部接口
|
*/
|
@Service
|
public class MesExternalService extends AbstractBaseService implements IMesExternalService, IRemoteService, com.product.data.service.impl.IRemoteService {
|
|
@Value("${data.system.name}")
|
private String dataSystemName;
|
|
private CommonService commonService = null;
|
|
private FieldSetEntity collectLogCache = null;
|
|
public CommonService getCommonService() {
|
if (this.commonService == null) {
|
this.commonService = SpringBeanUtil.getBean(CommonService.class);
|
}
|
return commonService;
|
}
|
|
public static void main(String[] args) {
|
Date parse = DateUtil.parse("2023-10-23 18:22:50", "yyyy-MM-dd HH:mm:ss");
|
Date parse1 = DateUtil.parse("2023-10-23 18:24:11", "yyyy-MM-dd HH:mm:ss");
|
Date parse2 = DateUtil.parse("2023-10-23 18:24:21", "yyyy-MM-dd HH:mm:ss");
|
|
Set<Date> set = Sets.newHashSet(parse, parse1, parse2);
|
|
List<Date> sets = CollectionUtil.toList(set.toArray(new Date[0]));
|
|
Optional<Date> max = sets.stream().max(Comparator.comparing((a) -> a.getTime()));
|
System.out.println(max.get());
|
|
}
|
|
|
public void splitTableData() {
|
// FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
|
// if (FieldSetEntity.isEmpty(reportDbConfig)) {
|
// throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
|
// }
|
// DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
|
// Dao reportDao = dbe.getDao();
|
// Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), "da_t_wip_tracking");
|
//
|
// for (String tableName : trackingTableSet) {
|
// //获取年份从表名最后一个 下划线开始截取
|
// String year = tableName.substring(tableName.lastIndexOf("_") + 1);
|
// if (!StringUtils.equalsAny(year, "2023") || year.length() > 4) {
|
// continue;
|
// }
|
// //获取表前缀
|
// String tablePrefix = tableName.substring(0, tableName.lastIndexOf("_"));
|
// ExecutorService executorService = Executors.newFixedThreadPool(12);
|
// for (int i = 1; i <= 12; i++) {
|
// final int finalI = i;
|
// executorService.submit(() -> {
|
// Dao currentDao = dbe.newDao();
|
// //获取当前月份 以MM格式化
|
// String month = String.format("%02d", finalI);
|
// //检查月份对应的表是否存在
|
// String monthTableName = tablePrefix + "_" + year + month;
|
// Set<String> allTableName = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), tablePrefix + "_" + year + month);
|
// if (allTableName.size() == 0 || !allTableName.contains(monthTableName)) {
|
// //根据原始表结构创建新表
|
// String sql = "create table " + monthTableName + " like " + tableName;
|
// currentDao.executeSql(sql);
|
// SpringMVCContextHolder.getSystemLogger().info("创建表:" + monthTableName);
|
// }
|
// String sql = "INSERT INTO " + monthTableName + " SELECT * FROM " + tableName + " WHERE MONTH(update_date)=" + finalI;
|
// currentDao.executeSql(sql);
|
// currentDao.closeConnection();
|
// });
|
//
|
// }
|
// executorService.shutdown();
|
// while (true) {
|
// try {
|
// if (executorService.awaitTermination(5, TimeUnit.SECONDS)) break;
|
// Thread.sleep(5000);
|
// SpringMVCContextHolder.getSystemLogger().info("线程等待中...");
|
// } catch (InterruptedException e) {
|
// e.printStackTrace();
|
// }
|
//
|
// }
|
// }
|
// reportDao.closeConnection();
|
}
|
|
/**
|
* 获取历史数据
|
*/
|
public void getHistoryData(FieldSetEntity fse) throws BaseException, ExecutionException, InterruptedException {
|
|
//机号
|
String serialNumber = fse.getString("serial_number");
|
SpringMVCContextHolder.getSystemLogger().info("准备回写机号数据:" + serialNumber);
|
if (StringUtils.isEmpty(serialNumber)) {
|
throw new BaseException(ErrorCode.SERIAL_NUMBER_IS_NULL);
|
}
|
|
FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1 limit 1", null, false);
|
if (FieldSetEntity.isEmpty(fs)) {
|
throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
|
}
|
FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
|
if (FieldSetEntity.isEmpty(reportDbConfig)) {
|
throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
|
}
|
DataTableEntity dt = getBaseDao().listTable(CmnConst.PRODUCT_SYS_DATA_COLLECT, "upper(source_table) in ('T_WIP_TRACKING','T_WIP_PRODUCT_KEYP','T_WIP_DETAIL','T_PM_PRODUCT_SN') and data_source in (select uuid from product_sys_data_sync_manager)", new Object[]{}, new Object[]{"uuid,id,data_source,source_table"});
|
|
Map<String, List<FieldSetEntity>> groupByCollectId = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("id")));
|
Map<String, List<FieldSetEntity>> groupBySourceTable = dt.getData().stream().collect(Collectors.groupingBy(item -> item.getString("source_table")));
|
|
DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
|
Dao reportDao = dbe.getDao();
|
String reportDbName = dbe.getDbName();
|
Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_tracking");
|
HistoryEntity trackingData = historyBeforeDispose(getData(reportDao, trackingTableSet, "serial_number",
|
serialNumber, new ErrorCode[]{ErrorCode.TRACKING_TABLE_NOT_EXISTS, ErrorCode.NOT_FOUND_SERIAL_NUMBER}), CmnConst.T_WIP_TRACKING);
|
|
Set<String> keypTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_product_keyp");
|
HistoryEntity keypData = historyBeforeDispose(getData(reportDao, keypTableSet, "pk_product_sn", serialNumber, new ErrorCode[]{ErrorCode.KEYP_TABLE_NOT_EXISTS, ErrorCode.KEYP_DATA_NOT_FOUND}), CmnConst.T_WIP_PRODUCT_KEYP);
|
|
Set<String> detailTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_wip_detail");
|
HistoryEntity detailData = historyBeforeDispose(getData(reportDao, detailTableSet, "serial_number", serialNumber, new ErrorCode[]{ErrorCode.DETAIL_TABLE_NOT_EXISTS, ErrorCode.DETAIL_DATA_NOT_FOUND}), CmnConst.T_WIP_DETAIL);
|
|
// Set<String> productSnTableSet = QuerySqlParseUtil.getAllTableName(reportDao, reportDbName, "da_t_pm_product_sn");
|
// HistoryEntity productSnData = historyBeforeDispose(getData(reportDao, productSnTableSet, "product_sn", serialNumber, new ErrorCode[]{ErrorCode.PRODUCT_SN_TABLE_NOT_EXISTS, ErrorCode.PRODUCT_SN_DATA_NOT_FOUND}), CmnConst.T_PM_PRODUCT_SN);
|
|
|
//主库数据源配置
|
String masterDataSource = fs.getString("data_source");
|
dbe = new DataBaseEntity(masterDataSource);
|
Map<String, Dao> groupDao = new HashMap<>();
|
Dao dao = dbe.getDao();
|
boolean success = false;
|
try {
|
Connection connection = dao.getConnection();
|
connection.setAutoCommit(false);
|
HistoryEntity[] historyEntities = {trackingData, keypData, detailData};
|
insertMasterTableData(dao, historyEntities);
|
insertSubTableData(groupDao, groupByCollectId, groupBySourceTable, historyEntities);
|
connection.commit();
|
batchCommit(groupDao);
|
success = true;
|
SpringMVCContextHolder.getSystemLogger().info("回写机号:" + serialNumber + "成功");
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error("回写机号:" + serialNumber + "失败");
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
throw new BaseException(ErrorCode.INSERT_DATA_FAIL);
|
} finally {
|
try {
|
if (!success && !dao.getConnection().getAutoCommit()) {
|
dao.getConnection().rollback();
|
}
|
dao.closeConnection();
|
for (Dao value : groupDao.values()) {
|
try (Connection connection = value.getConnection()) {
|
if (!connection.getAutoCommit()) {
|
connection.rollback();
|
}
|
}
|
value.closeConnection();
|
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
}
|
|
public void batchCommit(Map<String, Dao> groupDao) throws SQLException {
|
for (Dao value : groupDao.values()) {
|
value.getConnection().commit();
|
}
|
}
|
|
|
/**
|
* 插入数据到主库
|
*
|
* @param dao
|
* @param historyEntities
|
*/
|
public void insertMasterTableData(Dao dao, HistoryEntity[] historyEntities) {
|
for (HistoryEntity historyEntity : historyEntities) {
|
if (historyEntity == null) {
|
continue;
|
}
|
DataTableEntity masterDataTable = historyEntity.getMasterDataTable();
|
if (DataTableEntity.isEmpty(masterDataTable)) {
|
continue;
|
}
|
Object[] objects = masterDataTable.getData().stream().map(item -> {
|
String primaryValue = item.getString(historyEntity.getPrimaryField());
|
//判断是否为数值且包含小数点 小数点后面是否全是0
|
if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
|
indexOf(".") + 1).matches("^0*$")) {
|
//返回整数字符串
|
return primaryValue.substring(0, primaryValue.indexOf("."));
|
}
|
return primaryValue;
|
}).toArray();
|
//查询主库数据是否存在
|
DataTableEntity list = dao.getList(historyEntity.getTableName(),
|
BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), objects.length, true),
|
new String[]{historyEntity.getPrimaryField()}, objects);
|
List<String> existsPrimaryValues = null;
|
if (!DataTableEntity.isEmpty(list)) {
|
existsPrimaryValues = list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
|
}
|
for (int i = 0; i < masterDataTable.getRows(); i++) {
|
String primaryValue = masterDataTable.getString(i, historyEntity.getPrimaryField());
|
if (existsPrimaryValues != null && existsPrimaryValues.contains(primaryValue)) {
|
//数据存在跳过该数据
|
continue;
|
}
|
FieldSetEntity fieldSetEntity = masterDataTable.getFieldSetEntity(i);
|
fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
|
fieldSetEntity.remove("~table_name~");
|
dao.add(fieldSetEntity);
|
}
|
}
|
}
|
|
|
/**
|
* 插入子库表数据
|
*
|
* @param groupDao 子库dao
|
* @param groupByCollectId 采集配置按采集id分组
|
* @param groupBySourceTable 采集配置按表名分组
|
* @param historyEntities 历史数据
|
*/
|
public void insertSubTableData(Map<String, Dao> groupDao, Map<String, List<FieldSetEntity>> groupByCollectId,
|
Map<String, List<FieldSetEntity>> groupBySourceTable, HistoryEntity[] historyEntities) throws Exception {
|
for (HistoryEntity historyEntity : historyEntities) {
|
if (historyEntity == null) {
|
continue;
|
}
|
Map<String, List<FieldSetEntity>> groupData = historyEntity.getGroupData();
|
if (groupData == null || groupData.isEmpty()) {
|
continue;
|
}
|
for (Map.Entry<String, List<FieldSetEntity>> entry : groupData.entrySet()) {
|
List<Dao> daoList = new ArrayList<>();
|
if ("ch-kt".equals(entry.getKey())) {
|
List<FieldSetEntity> fieldSetEntityList = groupBySourceTable.get(historyEntity.getTableName().toLowerCase());
|
Set<String> dataSourceSet = fieldSetEntityList.stream().map(item -> item.getString("data_source")).collect(Collectors.toSet());
|
for (String sourceUuid : dataSourceSet) {
|
daoList.add(getDao(groupDao, sourceUuid));
|
}
|
} else {
|
daoList.add(getDao(groupDao, groupByCollectId.get(entry.getKey()).get(0).getString("data_source")));
|
}
|
for (Dao dao : daoList) {
|
List<FieldSetEntity> value = entry.getValue();
|
//查询已存在的数据
|
DataTableEntity list = dao.getList(historyEntity.getTableName(),
|
BaseUtil.buildQuestionMarkFilter(historyEntity.getPrimaryField(), value.size(), true),
|
value.stream().map(item -> {
|
String primaryValue = item.getString(historyEntity.getPrimaryField());
|
//判断是否为数值且包含小数点 小数点后面是否全是0
|
if (NumberUtil.isNumber(primaryValue) && primaryValue.contains(".") && primaryValue.substring(primaryValue.
|
indexOf(".") + 1).matches("^0*$")) {
|
//返回整数字符串
|
return primaryValue.substring(0, primaryValue.indexOf("."));
|
}
|
return primaryValue;
|
}).toArray());
|
List<String> existIds = DataTableEntity.isEmpty(list) ? null : list.getData().stream().map(item -> item.getString(historyEntity.getPrimaryField())).collect(Collectors.toList());
|
for (FieldSetEntity fieldSetEntity : value) {
|
String primaryValue = fieldSetEntity.getString(historyEntity.getPrimaryField());
|
if (existIds != null && existIds.contains(NumberUtil.parseNumber(primaryValue).toString())) {
|
continue;
|
}
|
fieldSetEntity.getMeta().setTableName(new Object[]{historyEntity.getTableName()});
|
fieldSetEntity.remove("~table_name~");
|
dao.add(fieldSetEntity);
|
}
|
}
|
}
|
}
|
}
|
|
public Dao getDao(Map<String, Dao> groupDao, String sourceUuid) throws Exception {
|
Dao dao = groupDao.get(sourceUuid);
|
if (null == dao) {
|
DataBaseEntity dbe = new DataBaseEntity(sourceUuid);
|
dao = dbe.getDao();
|
dao.getConnection().setAutoCommit(false);
|
groupDao.put(sourceUuid, dao);
|
}
|
return dao;
|
}
|
|
public HistoryEntity historyBeforeDispose(DataTableEntity dt, String targetTableName) {
|
if (DataTableEntity.isEmpty(dt)) {
|
return null;
|
}
|
HistoryEntity historyEntity = new HistoryEntity();
|
historyEntity.setMoNumberField("mo_number");
|
if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName)) {
|
historyEntity.setPrimaryField("wip_id");
|
historyEntity.setTimeField("update_date");
|
} else if (CmnConst.T_WIP_PRODUCT_KEYP.equalsIgnoreCase(targetTableName)) {
|
historyEntity.setPrimaryField("pk_id");
|
historyEntity.setTimeField("pk_loadtime");
|
|
historyEntity.setMoNumberField("pk_mo");
|
} else if (CmnConst.T_WIP_DETAIL.equalsIgnoreCase(targetTableName)) {
|
historyEntity.setPrimaryField("wip_detail_id");
|
historyEntity.setTimeField("update_date");
|
} else if (CmnConst.T_PM_PRODUCT_SN.equalsIgnoreCase(targetTableName)) {
|
historyEntity.setPrimaryField("row_id");
|
historyEntity.setTimeField("update_date");
|
} else {
|
return null;
|
}
|
historyEntity.setTableName(targetTableName.toUpperCase());
|
Date now = new Date();
|
for (int i = 0; i < dt.getRows(); i++) {
|
Map<Object, Object> values = dt.getFieldSetEntity(i).getValues();
|
//遍历map中的value是否为数字,如果是数字判断小数点后面是否有值,如果没有值则转换为整数
|
for (Map.Entry<Object, Object> entry : values.entrySet()) {
|
if (entry.getValue() instanceof Number) {
|
Number number = (Number) entry.getValue();
|
if (number.doubleValue() == number.intValue()) {
|
entry.setValue(number.intValue());
|
}
|
}
|
}
|
}
|
if (CmnConst.T_WIP_TRACKING.equalsIgnoreCase(targetTableName) && dt.getRows() > 1) {
|
List<FieldSetEntity> data = dt.getData();
|
FieldSetEntity newData = null;
|
for (int i = 0; i < data.size(); i++) {
|
FieldSetEntity fieldSetEntity = data.get(i);
|
if ("ch-kt".equals(fieldSetEntity.getValue("pre_master_key"))) {
|
data.remove(i);
|
dt.removeFieldSetEntity(i);
|
break;
|
} else if (newData == null) {
|
newData = fieldSetEntity;
|
continue;
|
}
|
|
Date date = fieldSetEntity.getDate(historyEntity.getTimeField());
|
if (date.getTime() > newData.getDate(historyEntity.getTimeField()).getTime()) {
|
newData = fieldSetEntity;
|
}
|
}
|
if (newData != null) {
|
dt = new DataTableEntity();
|
dt.addFieldSetEntity(newData);
|
}
|
}
|
DataTableEntity subData = dt.clones();
|
DataTableEntity masterData = dt.clones();
|
for (int i = 0; i < subData.getRows(); i++) {
|
FieldSetEntity fs = subData.getFieldSetEntity(i);
|
String preMasterKey = fs.getString("pre_master_key");
|
if (historyEntity.getPrimaryField().equals(preMasterKey)) {
|
preMasterKey = null;
|
|
}
|
if ("ch-kt".equals(fs.getString("source_info")) && StringUtils.isEmpty(preMasterKey)) {
|
preMasterKey = fs.getString(historyEntity.getPrimaryField());
|
masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField()));
|
}
|
fs.setValue(historyEntity.getPrimaryField(), preMasterKey);
|
fs.setValue("pre_master_key", null);
|
masterData.setFieldValue(i, historyEntity.getTimeField(), now);
|
fs.setValue(historyEntity.getTimeField(), now);
|
fs.remove("~table_name~");
|
masterData.getFieldSetEntity(i).remove("~table_name~");
|
if (StringUtils.isEmpty(preMasterKey)) {
|
masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField()));
|
}
|
if (StringUtils.isEmpty(historyEntity.getPrimaryField())) {
|
subData.removeFieldSetEntity(i);
|
masterData.removeFieldSetEntity(i);
|
i--;
|
}
|
}
|
historyEntity.setArchivedDataTable(dt);
|
historyEntity.setMasterDataTable(masterData);
|
historyEntity.setSubDataTable(subData);
|
return historyEntity;
|
}
|
|
public DataTableEntity getData(Dao dao, Set<String> tableSet, String filterFieldName, String
|
serialNumber, ErrorCode[] errorCodes) throws InterruptedException, ExecutionException {
|
if (CollectionUtil.isEmpty(tableSet)) {
|
throw new BaseException(errorCodes[0]);
|
}
|
String[] tableArray = tableSet.toArray(new String[]{});
|
StringBuilder sql = new StringBuilder();
|
for (String tableName : tableArray) {
|
if (sql.length() > 0) {
|
sql.append(" union all ");
|
}
|
sql.append("select a.*,'" + tableName + "' as '~table_name~' from " + tableName + " a where " + filterFieldName + " = ?");
|
}
|
DataTableEntity data = dao.getList(sql.toString(), Arrays.stream(tableArray).map((t) -> serialNumber).toArray());
|
// CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
|
// //多线程查询单张表,等待所有线程查询完毕
|
// for (String tableName : tableArray) {
|
// objectCompletionService.submit(() -> {
|
// DataTableEntity list = dao.getList("select a.*,'" + tableName + "' as '~table_name~' from " + tableName + " a where " + filterFieldName + " = ?", new Object[]{serialNumber});
|
// return list;
|
// });
|
// }
|
// DataTableEntity data = new DataTableEntity();
|
// for (int i = 0; i < tableArray.length; i++) {
|
// DataTableEntity dataTableEntity = objectCompletionService.take().get();
|
// BaseUtil.dataTableMerge(data, dataTableEntity);
|
// }
|
if (DataTableEntity.isEmpty(data) && !"product_sn".equals(filterFieldName)) {
|
throw new BaseException(errorCodes[1]);
|
}
|
return data;
|
}
|
|
|
/**
|
* 初始化制令单
|
*
|
* @param moNumbers
|
* @throws BaseException
|
*/
|
public void initMoBase(String[] moNumbers) throws BaseException {
|
FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1 limit 1", null, false);
|
if (FieldSetEntity.isEmpty(fs)) {
|
throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
|
}
|
|
String[] targetDataSource = fs.getString("target_data_source").split(",");
|
|
Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
|
StringBuilder errorMsg = new StringBuilder();
|
for (Dao dao : targetDao) {
|
for (String moNumber : moNumbers) {
|
synchronized (moNumber.intern()) {
|
try (Connection connection = dao.getConnection();
|
CallableStatement callableStatement = connection.prepareCall(
|
"{CALL SMT_T_PM_MO_BASE(?,?,?,?,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,?,null,null" +
|
",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null)}")) {
|
callableStatement.setInt(1, 5);
|
callableStatement.registerOutParameter(2, OracleTypes.SMALLINT);
|
callableStatement.registerOutParameter(3, OracleTypes.VARCHAR);
|
callableStatement.setString(4, moNumber);
|
callableStatement.setString(5, "1");
|
callableStatement.execute();
|
//执行后返回的错误码
|
int errorCode = callableStatement.getInt(2);
|
//执行后返回的错误信息
|
String errorText = callableStatement.getString(3);
|
if (errorCode != 0) {
|
//错误的
|
errorMsg.append("制令单号:").append(moNumber);
|
errorMsg.append("\nerrorCode:").append(errorCode);
|
errorMsg.append("\nerrorText:").append(errorText);
|
}
|
} catch (Exception e) {
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
e.printStackTrace();
|
errorMsg.append("制令单号:").append(moNumber);
|
errorMsg.append("\n执行时未知错误:").append(e.getMessage());
|
}
|
}
|
}
|
}
|
if (errorMsg.length() > 0) {
|
throw new BaseException(ErrorCode.SUB_MO_BASE_INI_FAIL.getValue(), ErrorCode.SUB_MO_BASE_INI_FAIL.getText() + "。\n" + errorMsg);
|
}
|
}
|
|
/**
|
* @param moNumbers 制令单号,多个逗号分隔
|
* @param type 操作类型 1 创建制令单 2 更新制令单中的指定字段
|
* @throws BaseException
|
*/
|
public void updateMoBase(String[] moNumbers, int type) throws BaseException {
|
FieldSetEntity fs = getBaseDao().getFieldSetEntityByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MES, "LENGTH(data_source)>1 limit 1", null, false);
|
if (FieldSetEntity.isEmpty(fs)) {
|
throw new BaseException(ErrorCode.MO_NUMBER_SYNC_DATASOURCE_ERROR);
|
}
|
String dataSource = fs.getString("data_source");
|
String[] targetDataSource = fs.getString("target_data_source").split(",");
|
StringBuilder errorMessage = new StringBuilder();
|
DataBaseEntity dbe = new DataBaseEntity(dataSource);
|
for (int i = 0; i < moNumbers.length; i++) {
|
String moNumber = moNumbers[i];
|
try {
|
synchronized (moNumber.intern()) {
|
Dao dao = dbe.getDao();
|
FieldSetEntity one = dao.getOne("T_PM_MO_BASE", "MO_NUMBER=?", new Object[]{moNumber});
|
if (FieldSetEntity.isEmpty(one)) {
|
throw new BaseException(ErrorCode.MO_NUMBER_RECORD_SELECT_EMPTY);
|
}
|
one.setTableName("T_PM_MO_BASE");
|
dao.closeConnection();
|
Dao[] targetDao = Arrays.stream(targetDataSource).map(item -> new DataBaseEntity(item).getDao()).toArray(Dao[]::new);
|
StringBuilder errorMsg = new StringBuilder();
|
try {
|
if (1 == type) {
|
createMoBase(errorMsg, one, targetDao);
|
} else if (2 == type) {
|
updateMoBase(errorMsg, one, targetDao);
|
}
|
if (errorMsg.length() > 0) {
|
//有错误
|
throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMsg);
|
}
|
} finally {
|
for (Dao dao1 : targetDao) {
|
dao1.closeConnection();
|
}
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
errorMessage.append(e.getMessage()).append("\n");
|
}
|
|
}
|
if (errorMessage.length() > 0) {
|
throw new BaseException(ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getValue(), ErrorCode.SUB_MO_BASE_CREATE_OR_UPDATE_FAIL.getText() + "。\n" + errorMessage);
|
}
|
}
|
|
/**
|
* 创建制令单
|
*
|
* @param errorMsg
|
* @param moBase
|
* @param subDao
|
*/
|
private void createMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
|
String moNumber = moBase.getString("mo_number");
|
for (int i = 0; i < subDao.length; i++) {
|
Dao dao = subDao[i];
|
try {
|
//检查工单表是否存在
|
FieldSetEntity projectBase = dao.getOne("T_PM_PROJECT_BASE", "PROJECT_ID = ? ", new Object[]{moNumber});
|
if (FieldSetEntity.isEmpty(projectBase)) {
|
throw new BaseException(ErrorCode.SUB_PROJECT_BASE_CAN_NOT_EMPTY);
|
}
|
//检查制令单表是否已存在
|
FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
|
if (!FieldSetEntity.isEmpty(fs)) {
|
//进行更新操作
|
updateMoBase(errorMsg, moBase, new Dao[]{dao});
|
continue;
|
}
|
//将 T_PM_MO_BASE.PROJECT_ID 更改为子库中对应 T_PM_PROJECT_BASE.PROJECT_BASE_ID
|
moBase.setValue("project_id", projectBase.getString("project_base_id"));
|
// moBase.setValue("row_id", -1);
|
dao.add(moBase);
|
} catch (BaseException e) {
|
errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
|
append("]").append(e.getMessageInfo());
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
} catch (Exception e) {
|
errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("创建制令单失败,未知的错误");
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
}
|
|
/**
|
* 更新制令单
|
*
|
* @param errorMsg
|
* @param moBase
|
* @param subDao
|
*/
|
private void updateMoBase(StringBuilder errorMsg, FieldSetEntity moBase, Dao[] subDao) {
|
String moNumber = moBase.getString("mo_number");
|
for (int i = 0; i < subDao.length; i++) {
|
Dao dao = subDao[i];
|
try {
|
//检查制令单表是否已存在
|
FieldSetEntity fs = dao.getOne(moBase.getTableName(), "mo_number = ? ", new Object[]{moNumber});
|
if (FieldSetEntity.isEmpty(fs)) {
|
//进行创建操作
|
createMoBase(errorMsg, moBase, new Dao[]{dao});
|
continue;
|
}
|
// 当子库中制令单 input_qty 字段为 0 或者 为 null 时 更新多个字段,否则只更新target_qty 字段
|
String[] updateField = StringUtils.isEmpty(fs.getString("input_qty")) || "0".equals(fs.getString("input_qty")) ?
|
new String[]{"target_qty", "owner", "mo_create_date", "mo_schedule_date", "mo_due_date", "areaid", "technicsid", "close_flag", "default_group", "end_group"}
|
: new String[]{"target_qty"};
|
StringBuilder sql = new StringBuilder();
|
sql.append(" UPDATE T_PM_MO_BASE SET ");
|
Object[] params = new Object[updateField.length + 1];
|
for (int k = 0; k < updateField.length; k++) {
|
if (k > 0) {
|
sql.append(",");
|
}
|
params[k] = moBase.getObject(updateField[k]);
|
sql.append(updateField[k].toUpperCase()).append(" = ? ");
|
}
|
params[updateField.length] = moNumber;
|
sql.append(" WHERE MO_NUMBER = ? ");
|
dao.executeSql(sql.toString(), params);
|
} catch (BaseException e) {
|
errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).
|
append("]").append(e.getMessageInfo());
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
} catch (Exception e) {
|
errorMsg.append("制令单号:[ " + moNumber + " ]IP:[").append(getIp(dao)).append("]").append("更新制令单失败,未知的错误");
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
}
|
|
private String getIp(Dao dao) {
|
return ((OracleDaoImpl) dao).getDataBaseEntity().getIp();
|
}
|
|
/**
|
* 采集配置保存
|
*
|
* @param request
|
* @throws BaseException
|
*/
|
@Override
|
@Transactional
|
public void saveCollectConfig(HttpServletRequest request) throws BaseException {
|
//服务名称
|
String serverName = request.getHeader("server-name");
|
if (StringUtils.isEmpty(serverName)) {
|
throw new BaseException(ErrorCode.SERVER_NAME_CAN_NOT_EMPTY);
|
}
|
FieldSetEntity fse = BaseUtil.getFieldSetEntity(request, CmnConst.PRODUCT_SYS_DATA_COLLECT);
|
//采集来源
|
String sourceInfo = fse.getString("id");
|
if (StringUtils.isEmpty(sourceInfo) || StringUtils.contains(sourceInfo, dataSystemName)) {
|
throw new BaseException(ErrorCode.COLLECT_SOURCE_VALUE);
|
}
|
commonSave(fse);
|
}
|
|
/**
|
* 提取配置保存
|
*
|
* @param fse
|
* @throws BaseException
|
*/
|
@Override
|
@Transactional
|
public void saveExtractConfig(FieldSetEntity fse) throws BaseException {
|
String dataSource = fse.getString(CmnConst.TABLE_SYNC_MANAGER);
|
if (!StringUtils.isEmpty(dataSource)) {
|
FieldSetEntity fs = JsonUtil.pareseJsonToFieldSetEntity(dataSource);
|
if (!FieldSetEntity.isEmpty(fs)) {
|
commonSave(fs);
|
}
|
}
|
commonSave(fse);
|
getCommonService().saveDelRecordConfig(2, fse.getUUID());
|
}
|
|
/**
|
* 归档配置保存
|
*
|
* @param fse
|
* @throws BaseException
|
*/
|
@Override
|
@Transactional
|
public void saveArchiveConfig(FieldSetEntity fse) throws BaseException {
|
commonSave(fse);
|
}
|
|
/**
|
* @param fse
|
* @throws BaseException
|
*/
|
@Override
|
public void saveCollectLog(FieldSetEntity fse) throws BaseException {
|
this.commonSave(fse);
|
}
|
|
/**
|
* @param fse
|
* @throws BaseException
|
*/
|
@Override
|
public void saveExtractLog(FieldSetEntity fse) throws BaseException {
|
this.commonSave(fse);
|
}
|
|
/**
|
* 数据源保存会调用该方法
|
*
|
* @param fse
|
* @throws BaseException
|
*/
|
@Override
|
public void saveSyncConnectionConfig(FieldSetEntity fse) throws BaseException {
|
//子服务保存数据库连接配置后调用该方法,传输数据到主服务
|
//TODO
|
}
|
|
/**
|
* 定时任务生成日志会调用该方法
|
*
|
* @param fse
|
* @throws BaseException
|
*/
|
@Override
|
public void saveTimeLog(FieldSetEntity fse) throws BaseException {
|
//主服务提取日志保存后调用该方法传输到子服务
|
//TODO
|
|
}
|
|
private void commonSave(FieldSetEntity fse) throws BaseException {
|
String uuid = fse.getUUID();
|
FieldSetEntity fs = getBaseDao().getFieldSetEntity(fse.getTableName(), new String[]{CmnConst.UUID}, uuid, false);
|
Map<String, DataTableEntity> subData = fse.getSubData();
|
if (!CollectionUtil.isEmpty(subData)) {
|
DataTableEntity addDt = new DataTableEntity();
|
DataTableEntity updateDt = new DataTableEntity();
|
for (Map.Entry<String, DataTableEntity> entry : subData.entrySet()) {
|
String tableName = entry.getKey();
|
DataTableEntity value = entry.getValue();
|
if (DataTableEntity.isEmpty(value)) {
|
continue;
|
}
|
Object[] uuids = value.getUuids();
|
DataTableEntity dt = getBaseDao().listTable(tableName, BaseUtil.buildQuestionMarkFilter(CmnConst.UUID, uuids.length, true), uuids, new String[]{CmnConst.UUID});
|
for (int i = 0; i < value.getRows(); i++) {
|
uuid = value.getString(i, CmnConst.UUID);
|
List<FieldSetEntity> fieldSetEntity = dt.getFieldSetEntity(uuid);
|
if (CollectionUtil.isEmpty(fieldSetEntity)) {
|
addDt.addFieldSetEntity(value.getFieldSetEntity(i));
|
value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "add");
|
} else {
|
addDt.addFieldSetEntity(updateDt.getFieldSetEntity(i));
|
value.setFieldValue(i, CoreConst.SYSTEM_DATA_OPERATE_TYPE, "update");
|
}
|
}
|
if (!DataTableEntity.isEmpty(addDt)) {
|
getBaseDao().add(addDt);
|
}
|
if (!DataTableEntity.isEmpty(updateDt)) {
|
getBaseDao().update(updateDt);
|
}
|
}
|
}
|
if (FieldSetEntity.isEmpty(fs)) {
|
getBaseDao().add(fse, false);
|
} else {
|
getBaseDao().update(fse, false);
|
}
|
}
|
|
/**
|
* 调用远程主服务器采集保存
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
public FieldSetEntity remoteSaveCollectConfig(FieldSetEntity fse) throws BaseException {
|
if ("ch-kt".equals(dataSystemName)) {
|
return fse;
|
}
|
FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
|
if (FieldSetEntity.isEmpty(fs)) {
|
throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
|
}
|
//服务域名的端口
|
String ipPort = fs.getString("server_url");
|
String serverName = fs.getString("server_name");
|
String serverUrl = ipPort + CmnConst.SAVE_COLLECT_URL;
|
FieldSetEntity res = doPost(serverUrl, serverName, fse);
|
return res;
|
}
|
|
/**
|
* 主服务提取保存日志后调用该方法传入到子服务保存
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
public void remoteSaveExtractLog(FieldSetEntity fse) throws BaseException {
|
try {
|
if ("ch-kt".equals(dataSystemName)) {
|
//主服务采集日志uuid
|
String preStepUuid = fse.getString(CmnConst.PRE_STEP_UUID);
|
StringBuilder sql = new StringBuilder();
|
sql.append("\nSELECT ");
|
sql.append("\nserver.* ");
|
sql.append("\nFROM ").append(CmnConst.PRODUCT_SYS_DATA_COLLECT).append(" collect ");
|
sql.append("\nJOIN ").append(CmnConst.PRODUCT_SYS_DATA_CENTER_LOG).append(" log ");
|
sql.append("\nON log.config_uuid=collect.uuid and log.type=1 ");
|
sql.append("\nJOIN ").append(CmnConst.PRODUCT_MES_SERVER).append(" server ");
|
sql.append("\nON collect.id like concat(server.server_name,'%') ");
|
sql.append("\nWHERE log.uuid=? and collect.id not like concat(?,'%') limit 1");
|
FieldSetEntity fs = getBaseDao().getFieldSetBySQL(sql.toString(), new Object[]{preStepUuid, dataSystemName}, false);
|
if (FieldSetEntity.isEmpty(fs)) {
|
return;
|
}
|
String ipPort = fs.getString("server_url");
|
String serverName = fs.getString("server_name");
|
String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_LOG_URL;
|
doPostAsync(serverUrl, serverName, fse);
|
}
|
} catch (Exception e) {
|
//捕获异常为了使采集定时任务正常运行
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 调用远程主服务器保存
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
public void remoteSaveExtractConfig(FieldSetEntity fse) throws BaseException {
|
DataTableEntity dt = getRemoteSubServer();
|
if (DataTableEntity.isEmpty(dt)) {
|
return;
|
}
|
FieldSetEntity extractTargetSource = getBaseDao().getFieldSetEntity(CmnConst.TABLE_SYNC_MANAGER, fse.getString("extract_target_source"), false);
|
if (!FieldSetEntity.isEmpty(extractTargetSource)) {
|
extractTargetSource.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, CmnConst.TABLE_SYNC_MANAGER);
|
fse.setValue(CmnConst.TABLE_SYNC_MANAGER, BaseUtil.fieldSetEntityToJson(extractTargetSource));
|
}
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity fs = dt.getFieldSetEntity(i);
|
//服务域名的端口
|
String ipPort = fs.getString("server_url");
|
String serverName = fs.getString("server_name");
|
String serverUrl = ipPort + CmnConst.SAVE_EXTRACT_URL;
|
doPost(serverUrl, serverName, fse);
|
}
|
}
|
|
/**
|
* 调用远程主服务器保存
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
public void remoteSaveArchiveConfig(FieldSetEntity fse) throws BaseException {
|
DataTableEntity dt = getRemoteSubServer();
|
if (DataTableEntity.isEmpty(dt)) {
|
return;
|
}
|
for (int i = 0; i < dt.getRows(); i++) {
|
FieldSetEntity fs = dt.getFieldSetEntity(i);
|
//服务域名的端口
|
String ipPort = fs.getString("server_url");
|
String serverName = fs.getString("server_name");
|
String serverUrl = ipPort + CmnConst.SAVE_ARCHIVE_URL;
|
doPost(serverUrl, serverName, fse);
|
}
|
}
|
|
/**
|
* 调用远程主服务器保存
|
*
|
* @param fse
|
* @return
|
* @throws BaseException
|
*/
|
public void remoteSaveCollectLog(FieldSetEntity fse) throws BaseException {
|
if ("ch-kt".equals(dataSystemName) || FieldSetEntity.isEmpty(fse)) {
|
return;
|
}
|
FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=0", null, false);
|
if (FieldSetEntity.isEmpty(fs)) {
|
throw new BaseException(ErrorCode.REMOTE_SERVER_CONFIG_EMPTY);
|
}
|
//服务域名的端口
|
String ipPort = fs.getString("server_url");
|
String serverName = fs.getString("server_name");
|
String serverUrl = ipPort + CmnConst.SAVE_COLLECT_LOG_URL;
|
doPost(serverUrl, serverName, fse);
|
}
|
|
|
/**
|
* 获取子服务配置
|
*
|
* @return
|
* @throws BaseException
|
*/
|
private DataTableEntity getRemoteSubServer() throws BaseException {
|
return getBaseDao().listTable(CmnConst.PRODUCT_MES_SERVER, "server_type!=0");
|
}
|
|
// @Async
|
void doPostAsync(String url, String serverName, FieldSetEntity fse) throws BaseException {
|
doPost(url, serverName, fse);
|
}
|
|
public String getDataSystemName() {
|
return dataSystemName;
|
}
|
|
public boolean remoteRehandle(FieldSetEntity fse) {
|
String type = fse.getString(CmnConst.TYPE);//类型
|
String logUuid;
|
if ("2".equals(type)) {
|
logUuid = fse.getString(CmnConst.PRE_STEP_UUID);
|
} else if ("1".equals(type)) {
|
logUuid = fse.getUUID();
|
} else {
|
return false;
|
}
|
FieldSetEntity fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_COLLECT, "uuid =(select config_uuid FROM product_sys_data_center_log where uuid=?)", new Object[]{logUuid}, false);
|
if (!FieldSetEntity.isEmpty(fs)) {
|
//采集id
|
String collectId = fs.getString(CmnConst.ID);
|
if (collectId.indexOf(dataSystemName) == -1) {
|
fs = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_MES_SERVER, "server_type=1 and ? like concat(server_name,'%')", new Object[]{collectId}, false);
|
if (!FieldSetEntity.isEmpty(fs)) {
|
//服务域名的端口
|
String ipPort = fs.getString("server_url");
|
String serverName = fs.getString("server_name");
|
String serverUrl = ipPort + CmnConst.REHANDLE_ERROR_URL;
|
doPost(serverUrl, serverName, fse);
|
//标记日志成功
|
JournalManagerService journalManagerService = SpringUtils.getBean(JournalManagerService.class);
|
journalManagerService.writeBackReDealResult(fse, true);
|
}
|
return true;
|
}
|
}
|
return false;
|
}
|
|
/**
|
* post 请求
|
*
|
* @param url
|
* @param serverName
|
* @param fse
|
* @throws BaseException
|
*/
|
private FieldSetEntity doPost(String url, String serverName, FieldSetEntity fse) throws BaseException {
|
fse.setValue(CoreConst.SYSTEM_TABLE_NAME_LABEL, fse.getTableName());
|
JSONObject requestBody = BaseUtil.fieldSetEntityToJson(fse);
|
String requestData = requestBody.toJSONString();
|
//签名
|
String signature = SignUtil.getHmacSHA1(requestData, Global.getSystemConfig("signature.key", ""));
|
try (HttpResponse response = HttpRequest.post(url)
|
.contentType("application/x-www-form-urlencoded")
|
.header(CoreConst.SYSTEM_LANGUAGE_CODE_, "zh-CN") //语言编码
|
.header(CoreConst.SYSTEM_CLIENT_TYPE_, "Web") //客户端类型
|
.header(CoreConst.SYSTEM_CLIENT_VERSION_, "1.0.0") //客户端版本
|
.header("server-name", serverName) //系统名称
|
.header("signature", signature) //签名
|
.body("formData=" + requestData).execute()) {
|
if (response.getStatus() == 200) {
|
//请求成功
|
String res = response.body();
|
if (!StringUtils.isEmpty(res)) {
|
JSONObject resBody = JSON.parseObject(res);
|
if (resBody != null) {
|
String code = resBody.getString(CoreConst.API_RETURN_KEY_CODE);
|
if (!CoreConst.API_RETURN_VALUE_CODE_200.equals(code)) {
|
//服务内部抛出的错误
|
throw new BaseException("调用接口失败,", resBody.getString(CoreConst.API_RETURN_KEY_MSG));
|
}
|
String formData = resBody.getString(CoreConst.API_RETURN_KEY_DATA);
|
if (StringUtils.isEmpty(formData)) {
|
return null;
|
}
|
FieldSetEntity result = JsonUtil.pareseJsonToFieldSetEntity(formData);
|
return result;
|
}
|
}
|
}
|
throw new BaseException(ErrorCode.OPEN_API_REQUEST_FAIL);
|
}
|
|
}
|
|
/**
|
* 获取采集日志
|
*
|
* @return
|
*/
|
public FieldSetEntity getCollectLog() {
|
collectLogCache = null;
|
if (collectLogCache == null) {
|
loadCollectLogCache();
|
}
|
|
return collectLogCache;
|
}
|
|
public void loadCollectLogCache() {
|
StringBuilder sql = new StringBuilder();
|
sql.append("\n with no_extract as ( ");
|
sql.append("\n SELECT ");
|
sql.append("\n a.* ");
|
sql.append("\n FROM ");
|
sql.append("\n product_sys_data_center_log A ");
|
sql.append("\n LEFT JOIN product_sys_data_center_log b ON a.uuid = b.pre_step_uuid ");
|
sql.append("\n WHERE ");
|
sql.append("\n a.type = 1 ");
|
sql.append("\n and a.result=1 and (a.deal_flag is null or a.deal_flag =0) ");
|
sql.append("\n AND ( ");
|
sql.append("\n b.pre_step_uuid IS NULL ");
|
sql.append("\n OR ");
|
sql.append("\n ( ");
|
sql.append("\n b.pre_step_uuid=a.uuid ");
|
sql.append("\n AND ");
|
sql.append("\n (b.result=0 and (b.deal_result = 0 or b.deal_flag is null)) ");
|
sql.append("\n ) ");
|
sql.append("\n ) ");
|
sql.append("\n ), ");
|
sql.append("\n last_run_log as ( ");
|
sql.append("\n select max(created_utc_datetime) last_time,config_uuid from product_sys_data_center_log where type=1 and result=1 and deal_flag=0 GROUP BY config_uuid ");
|
sql.append("\n ) ");
|
sql.append("\n ");
|
sql.append("\n SELECT ifnull(COUNT(no_extract.uuid),0) unextracted_batch ,ifnull(sum(no_extract.count),0) unextracted_total_count,b.`name`,(select last_time from last_run_log where b.uuid=last_run_log.config_uuid) last_success_time FROM product_sys_data_collect b ");
|
sql.append("\n left join no_extract on no_extract.config_uuid=b.uuid ");
|
sql.append("\n GROUP BY b.uuid ");
|
sql.append("\n order by b.`name` ");
|
|
FieldSetEntity fse = new FieldSetEntity();
|
fse.setTableName("temp");
|
fse.setValue("load_log_time", DateTime.now().toString());
|
DataTableEntity dataTableEntity = getBaseDao().listTable(sql.toString(), new Object[]{});
|
if (!DataTableEntity.isEmpty(dataTableEntity)) {
|
fse.setValue("list", BaseUtil.dataTableEntityToJson(dataTableEntity, f -> {
|
JSONObject jsonObject = f[0];
|
Date lastSuccessTime = jsonObject.getDate("last_success_time");
|
if (lastSuccessTime != null) {
|
jsonObject.put("last_success_time", DateUtil.format(lastSuccessTime, "yyyy-MM-dd HH:mm:ss"));
|
}
|
}));
|
} else {
|
fse.setValue("list", "[]");
|
}
|
this.collectLogCache = fse;
|
}
|
}
|