package com.product.kt.test.service; import cn.hutool.core.collection.ListUtil; import cn.hutool.core.thread.ThreadUtil; import com.product.common.lang.StringUtils; import com.product.core.entity.DataTableEntity; import com.product.core.entity.FieldSetEntity; import com.product.core.spring.context.SpringMVCContextHolder; import com.product.datasource.config.DataBaseType; import com.product.datasource.dao.Dao; import com.product.datasource.entity.DataBaseEntity; import com.product.datasource.entity.UpdateFilterEntity; import com.product.datasource.service.RedisService; import redis.clients.jedis.Jedis; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; /** * @Author cheng * @Date 2022/12/21 17:57 * @Desc */ public class InitTProductSn { private static String[] sourceUuid = {"10.13.1.36", "10.13.1.40", "10.13.1.41", "10.13.1.42"}; private Dao dao; private DataBaseEntity dbe; private static RedisService redisService; private final int pageSize = 100000; private ExecutorService executorService = Executors.newFixedThreadPool(1); private Queue queue = new LinkedBlockingQueue(); private String ip; public void initDao(String ip) { DataBaseEntity dbe = new DataBaseEntity(DataBaseType.ORACLE.getValue()); dbe.setUserName("CHKTADMIN"); dbe.setPassWord("CHKTADMIN"); dbe.setPort("1521"); dbe.setServerName("ORCL"); dbe.setIp(ip); this.dbe = dbe; this.dao = dbe.getDao(); } public synchronized void getRedisBase() { if (redisService == null) { DataBaseEntity dbe = new DataBaseEntity(5); dbe.setIp("127.0.0.1"); dbe.setPort("6379"); dbe.setDbInstance("0"); this.redisService = new RedisService(dbe); } } public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(sourceUuid.length); for (String ip : sourceUuid) { executorService.submit(() -> { InitTProductSn initTProductSn = new InitTProductSn(); initTProductSn.main(ip); }); } executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS); } private boolean selectFlag = false; public void main(String ip) { this.ip = ip; this.initDao(ip); this.getRedisBase(); String maxValue = null; DataTableEntity data; executorService.submit(() -> { updateData(); }); this.executorService.shutdown(); int i = 1; try { do { while (queue.size() >= 10) { // print("已查询到10批次等待更新处理....."); // print("更新线程状态:" + executorService.isTerminated()); ThreadUtil.sleep(3000); } print("开始查询 第 " + i + " 次,最大值为:" + maxValue); data = getData(maxValue); if (DataTableEntity.isEmpty(data)) { break; } print("第 " + i + "次查询,总条数:" + data.getRows()); queue.add(data); maxValue = data.getData().stream().max(Comparator.comparing(item -> item.getString("product_sn"))).get().getString("product_sn"); i++; } while (!DataTableEntity.isEmpty(data)); } catch (Exception e) { SpringMVCContextHolder.getSystemLogger().error(e); e.printStackTrace(); } this.selectFlag = true; this.dao.closeConnection(); } public void updateData() { Dao dao = null; Long time = System.currentTimeMillis(); int updateCount = 1; while (!selectFlag) { try { DataTableEntity data = queue.poll(); if (DataTableEntity.isEmpty(data)) { // print("更新线程等待查询数据..."); ThreadUtil.sleep(3000); continue; } if (dao == null || (System.currentTimeMillis() - time > 1000 * 60 * 10)) { if (dao != null) { dao.closeConnection(); } else { dao = dbe.newDao(); } } int rows = data.getRows(); List> split = ListUtil.split(data.getData(), 1000); for (List fieldSetEntities : split) { data = new DataTableEntity(); data.setData(fieldSetEntities.stream().filter(item -> { item.setValue("row_id", getRowId(item.getString("product_sn"), item.getInteger("row_id"))); item.setValue("pre_master_key", ""); return true; }).collect(Collectors.toList())); if (this.ip.equals(sourceUuid[0])) { data.getMeta().setFields(new Object[]{"product_sn", "row_id", "pre_master_key"}); } else { data.getMeta().setFields(new Object[]{"product_sn", "row_id", "update_date", "pre_master_key"}); } data.getMeta().setTableName(new Object[]{"T_PM_PRODUCT_SN"}); dao.updateBatch(data, new UpdateFilterEntity(" PRODUCT_SN =? ", new String[]{"product_sn"}), true); } print("更新完成,更新条数:" + rows + ", 更新次数:" + updateCount++); } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } } if (dao != null) { dao.closeConnection(); } print("更新线程结束----------------------------------------------------------------------------------------------------------------------------------------------------------------"); } private static int rowId = 19144409; private static boolean initFlag = false; private static synchronized void initRow() { if (!initFlag) { redisService.set("T_PM_PRODUCT_SN", rowId, false); initFlag = true; } } public static int getRowId(String uniqueValue, Integer row) { if (!initFlag) { initRow(); } synchronized (uniqueValue.intern()) { Jedis jedis = redisService.getJedis(); try { String o = jedis.get(uniqueValue); if (StringUtils.isEmpty(o)) { row = Integer.valueOf(o); } else { if (row == null || row.intValue() <= 0) { row = jedis.incr("T_PM_PRODUCT_SN").intValue(); } jedis.set(uniqueValue, row.toString()); } return row; } catch (Exception e) { e.printStackTrace(); SpringMVCContextHolder.getSystemLogger().error(e); } finally { jedis.close(); } return -1; } } // public static int getRowId(String uniqueValue, Integer row) { // synchronized (uniqueValue.intern()) { // Num n = (Num) redisService.get(uniqueValue, false); // if (row != null && row > 0) { // if (n == null) { // n = new Num(); // n.num = row; // n.count = 1; // } else { // int num = n.num; // if (num != row.intValue()) { // } else { // row = num; // } // n.num++; // } // redisService.set(uniqueValue, n, false); // return row; // } // if (n != null) { // int num =n.num; // if (n.count >= 2) { // map.remove(uniqueValue); // } else { // jsonObject.put("count", jsonObject.getInt("count") + 1); // } // return num; // } else { // jsonObject = new JSONObject(); // jsonObject.put("num", rowId); // jsonObject.put("count", 1); // map.put(uniqueValue, jsonObject); // return rowId++; // } // } // } // private String ThreadName = Thread.currentThread().getName(); private void print(String msg) { // msg = "[" + ip + "] [" + DateUtils.getDateTime() + "] [" + ThreadName + "] " + msg; SpringMVCContextHolder.getSystemLogger().info(msg); } public DataTableEntity getData(String maxValue) { StringBuilder sql = new StringBuilder(); sql.append(" SELECT SN.ROW_ID,SN.PRODUCT_SN,NVL(TRACK.IN_STATION_TIME,TRACK_BACK.IN_STATION_TIME) UPDATE_DATE,'' PRE_MASTER_KEY "); sql.append(" FROM (SELECT * "); sql.append(" FROM ( SELECT PRODUCT_SN,ROW_ID "); sql.append(" FROM T_PM_PRODUCT_SN "); sql.append(" ORDER BY PRODUCT_SN ) SN "); sql.append(" WHERE ROWNUM<= ").append(this.pageSize); if (!StringUtils.isEmpty(maxValue)) { sql.append(" AND PRODUCT_SN> '").append(maxValue).append("'"); } sql.append(" ) SN "); sql.append(" LEFT JOIN T_WIP_TRACKING TRACK ON SN.PRODUCT_SN=TRACK.SERIAL_NUMBER "); sql.append(" LEFT JOIN T_WIP_TRACKING_BAK20220421 TRACK_BACK ON SN.PRODUCT_SN=TRACK_BACK.SERIAL_NUMBER "); return dao.getList(sql.toString()); } }