cheng
2023-10-25 07bd010b2bbd939ade44c7b1a299fa911f8742e8
commit
已修改1个文件
144 ■■■■ 文件已修改
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java 144 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/MesExternalService.java
@@ -84,60 +84,57 @@
    public void splitTableData() {
        FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
        if (FieldSetEntity.isEmpty(reportDbConfig)) {
            throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
        }
        DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
        Dao reportDao = dbe.getDao();
        Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), "da_t_wip_tracking");
        for (String tableName : trackingTableSet) {
            //获取年份从表名最后一个 下划线开始截取
            String year = tableName.substring(tableName.lastIndexOf("_") + 1);
            if (StringUtils.equalsAny(year, "2017", "2018") || year.length() > 4) {
                continue;
            }
            //获取表前缀
            String tablePrefix = tableName.substring(0, tableName.lastIndexOf("_"));
            ExecutorService executorService = Executors.newFixedThreadPool(12);
            for (int i = 1; i <= 12; i++) {
                if ("2023".equals(year) && i > 9) {
                    break;
                }
                final int finalI = i;
                executorService.submit(() -> {
                    Dao currentDao = dbe.newDao();
                    //获取当前月份 以MM格式化
                    String month = String.format("%02d", finalI);
                    //检查月份对应的表是否存在
                    String monthTableName = tablePrefix + "_" + year + month;
                    Set<String> allTableName = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), tablePrefix + "_" + year + month);
                    if (allTableName.size() == 0 || !allTableName.contains(monthTableName)) {
                        //根据原始表结构创建新表
                        String sql = "create table " + monthTableName + " like " + tableName;
                        currentDao.executeSql(sql);
                        SpringMVCContextHolder.getSystemLogger().info("创建表:" + monthTableName);
                    }
                    String sql = "INSERT INTO " + monthTableName + " SELECT * FROM " + tableName + " WHERE MONTH(update_date)=" + finalI;
                    currentDao.executeSql(sql);
                    currentDao.closeConnection();
                });
            }
            executorService.shutdown();
            while (true) {
                try {
                    if (executorService.awaitTermination(5, TimeUnit.SECONDS)) break;
                    Thread.sleep(5000);
                    SpringMVCContextHolder.getSystemLogger().info("线程等待中...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        reportDao.closeConnection();
//        FieldSetEntity reportDbConfig = getBaseDao().getFieldSetByFilter(CmnConst.PRODUCT_SYS_DATA_SYNC_MANAGER, "SYNC_NAME=?", new Object[]{"报表数据库"}, false);
//        if (FieldSetEntity.isEmpty(reportDbConfig)) {
//            throw new BaseException(ErrorCode.REPORT_DB_CONFIG_FAIL);
//        }
//        DataBaseEntity dbe = new DataBaseEntity(reportDbConfig);
//        Dao reportDao = dbe.getDao();
//        Set<String> trackingTableSet = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), "da_t_wip_tracking");
//
//        for (String tableName : trackingTableSet) {
//            //获取年份从表名最后一个 下划线开始截取
//            String year = tableName.substring(tableName.lastIndexOf("_") + 1);
//            if (!StringUtils.equalsAny(year, "2023") || year.length() > 4) {
//                continue;
//            }
//            //获取表前缀
//            String tablePrefix = tableName.substring(0, tableName.lastIndexOf("_"));
//            ExecutorService executorService = Executors.newFixedThreadPool(12);
//            for (int i = 1; i <= 12; i++) {
//                final int finalI = i;
//                executorService.submit(() -> {
//                    Dao currentDao = dbe.newDao();
//                    //获取当前月份 以MM格式化
//                    String month = String.format("%02d", finalI);
//                    //检查月份对应的表是否存在
//                    String monthTableName = tablePrefix + "_" + year + month;
//                    Set<String> allTableName = QuerySqlParseUtil.getAllTableName(reportDao, dbe.getDbName(), tablePrefix + "_" + year + month);
//                    if (allTableName.size() == 0 || !allTableName.contains(monthTableName)) {
//                        //根据原始表结构创建新表
//                        String sql = "create table " + monthTableName + " like " + tableName;
//                        currentDao.executeSql(sql);
//                        SpringMVCContextHolder.getSystemLogger().info("创建表:" + monthTableName);
//                    }
//                    String sql = "INSERT INTO " + monthTableName + " SELECT * FROM " + tableName + " WHERE MONTH(update_date)=" + finalI;
//                    currentDao.executeSql(sql);
//                    currentDao.closeConnection();
//                });
//
//            }
//            executorService.shutdown();
//            while (true) {
//                try {
//                    if (executorService.awaitTermination(5, TimeUnit.SECONDS)) break;
//                    Thread.sleep(5000);
//                    SpringMVCContextHolder.getSystemLogger().info("线程等待中...");
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//
//            }
//        }
//        reportDao.closeConnection();
    }
    /**
@@ -410,9 +407,6 @@
                dt = new DataTableEntity();
                dt.addFieldSetEntity(newData);
            }
            //在list中取出时间最近的数据
            Optional<FieldSetEntity> max = data.stream().max(Comparator.comparing((a) -> a.getDate(historyEntity.getTimeField()).getTime()));
        }
        DataTableEntity subData = dt.clones();
        DataTableEntity masterData = dt.clones();
@@ -421,17 +415,21 @@
            String preMasterKey = fs.getString("pre_master_key");
            if (historyEntity.getPrimaryField().equals(preMasterKey)) {
                preMasterKey = null;
            }
            if ("ch-kt".equals(fs.getString("source_info")) && StringUtils.isEmpty(preMasterKey)) {
                preMasterKey = fs.getString(historyEntity.getPrimaryField());
                masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField()));
            }
            fs.setValue(historyEntity.getPrimaryField(), preMasterKey);
            fs.setValue("pre_master_key", preMasterKey);
            fs.setValue("pre_master_key", null);
            masterData.setFieldValue(i, historyEntity.getTimeField(), now);
            fs.setValue(historyEntity.getTimeField(), now);
            fs.remove("~table_name~");
            masterData.getFieldSetEntity(i).remove("~table_name~");
            masterData.setFieldValue(i, "pre_master_key", null);
            if (StringUtils.isEmpty(preMasterKey)) {
                masterData.setFieldValue(i, "pre_master_key", masterData.getObject(i, historyEntity.getPrimaryField()));
            }
            if (StringUtils.isEmpty(historyEntity.getPrimaryField())) {
                subData.removeFieldSetEntity(i);
                masterData.removeFieldSetEntity(i);
@@ -450,15 +448,29 @@
            throw new BaseException(errorCodes[0]);
        }
        String[] tableArray = tableSet.toArray(new String[]{});
        CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
        //多线程查询单张表,等待所有线程查询完毕
        StringBuilder sql = new StringBuilder();
        for (String tableName : tableArray) {
            objectCompletionService.submit(() -> dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?", new Object[]{serialNumber}));
            if (sql.length() > 0) {
                sql.append(" union all ");
        }
        DataTableEntity data = new DataTableEntity();
        for (int i = 0; i < tableArray.length; i++) {
            DataTableEntity dataTableEntity = objectCompletionService.take().get();
            BaseUtil.dataTableMerge(data, dataTableEntity);
            sql.append("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?");
        }
        DataTableEntity data = dao.getList(sql.toString(), Arrays.stream(tableArray).map((t) -> serialNumber).toArray());
//        CompletionService<DataTableEntity> objectCompletionService = ThreadUtil.newCompletionService();
//        //多线程查询单张表,等待所有线程查询完毕
//        for (String tableName : tableArray) {
//            objectCompletionService.submit(() -> {
//                DataTableEntity list = dao.getList("select a.*,'" + tableName + "' as '~table_name~'  from " + tableName + " a where " + filterFieldName + " = ?", new Object[]{serialNumber});
//                return list;
//            });
//        }
//        DataTableEntity data = new DataTableEntity();
//        for (int i = 0; i < tableArray.length; i++) {
//            DataTableEntity dataTableEntity = objectCompletionService.take().get();
//            BaseUtil.dataTableMerge(data, dataTableEntity);
//        }
        if (DataTableEntity.isEmpty(data) && !"product_sn".equals(filterFieldName)) {
            throw new BaseException(errorCodes[1]);
        }
        return data;
    }