package com.product.kt.test.service;
|
|
import cn.hutool.core.collection.ListUtil;
|
import cn.hutool.core.thread.ThreadUtil;
|
import com.product.common.lang.StringUtils;
|
import com.product.core.entity.DataTableEntity;
|
import com.product.core.entity.FieldSetEntity;
|
import com.product.core.spring.context.SpringMVCContextHolder;
|
import com.product.datasource.config.DataBaseType;
|
import com.product.datasource.dao.Dao;
|
import com.product.datasource.entity.DataBaseEntity;
|
import com.product.datasource.entity.UpdateFilterEntity;
|
import com.product.datasource.service.RedisService;
|
import redis.clients.jedis.Jedis;
|
|
import java.util.*;
|
import java.util.concurrent.*;
|
import java.util.stream.Collectors;
|
|
/**
|
* @Author cheng
|
* @Date 2022/12/21 17:57
|
* @Desc
|
*/
|
public class InitTProductSn {
|
|
|
private static String[] sourceUuid = {"10.13.1.36", "10.13.1.40", "10.13.1.41", "10.13.1.42"};
|
|
|
private Dao dao;
|
|
private DataBaseEntity dbe;
|
|
private static RedisService redisService;
|
|
private final int pageSize = 100000;
|
|
private ExecutorService executorService = Executors.newFixedThreadPool(1);
|
|
private Queue<DataTableEntity> queue = new LinkedBlockingQueue();
|
|
private String ip;
|
|
public void initDao(String ip) {
|
DataBaseEntity dbe = new DataBaseEntity(DataBaseType.ORACLE.getValue());
|
dbe.setUserName("CHKTADMIN");
|
dbe.setPassWord("CHKTADMIN");
|
dbe.setPort("1521");
|
dbe.setServerName("ORCL");
|
dbe.setIp(ip);
|
this.dbe = dbe;
|
this.dao = dbe.getDao();
|
}
|
|
public synchronized void getRedisBase() {
|
if (redisService == null) {
|
DataBaseEntity dbe = new DataBaseEntity(5);
|
dbe.setIp("127.0.0.1");
|
dbe.setPort("6379");
|
dbe.setDbInstance("0");
|
this.redisService = new RedisService(dbe);
|
}
|
}
|
|
public static void main(String[] args) throws InterruptedException {
|
ExecutorService executorService = Executors.newFixedThreadPool(sourceUuid.length);
|
for (String ip : sourceUuid) {
|
executorService.submit(() -> {
|
InitTProductSn initTProductSn = new InitTProductSn();
|
initTProductSn.main(ip);
|
});
|
}
|
executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
|
}
|
|
private boolean selectFlag = false;
|
|
public void main(String ip) {
|
this.ip = ip;
|
this.initDao(ip);
|
this.getRedisBase();
|
String maxValue = null;
|
DataTableEntity data;
|
|
executorService.submit(() -> {
|
updateData();
|
});
|
this.executorService.shutdown();
|
int i = 1;
|
try {
|
do {
|
while (queue.size() >= 10) {
|
// print("已查询到10批次等待更新处理.....");
|
// print("更新线程状态:" + executorService.isTerminated());
|
ThreadUtil.sleep(3000);
|
}
|
print("开始查询 第 " + i + " 次,最大值为:" + maxValue);
|
data = getData(maxValue);
|
if (DataTableEntity.isEmpty(data)) {
|
break;
|
}
|
print("第 " + i + "次查询,总条数:" + data.getRows());
|
queue.add(data);
|
maxValue = data.getData().stream().max(Comparator.comparing(item -> item.getString("product_sn"))).get().getString("product_sn");
|
i++;
|
} while (!DataTableEntity.isEmpty(data));
|
} catch (Exception e) {
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
e.printStackTrace();
|
}
|
this.selectFlag = true;
|
this.dao.closeConnection();
|
}
|
|
public void updateData() {
|
Dao dao = null;
|
Long time = System.currentTimeMillis();
|
int updateCount = 1;
|
while (!selectFlag) {
|
try {
|
|
DataTableEntity data = queue.poll();
|
if (DataTableEntity.isEmpty(data)) {
|
// print("更新线程等待查询数据...");
|
ThreadUtil.sleep(3000);
|
continue;
|
}
|
if (dao == null || (System.currentTimeMillis() - time > 1000 * 60 * 10)) {
|
if (dao != null) {
|
dao.closeConnection();
|
} else {
|
dao = dbe.newDao();
|
}
|
}
|
int rows = data.getRows();
|
List<List<FieldSetEntity>> split = ListUtil.split(data.getData(), 1000);
|
for (List<FieldSetEntity> fieldSetEntities : split) {
|
data = new DataTableEntity();
|
|
data.setData(fieldSetEntities.stream().filter(item -> {
|
item.setValue("row_id", getRowId(item.getString("product_sn"), item.getInteger("row_id")));
|
item.setValue("pre_master_key", "");
|
return true;
|
}).collect(Collectors.toList()));
|
if (this.ip.equals(sourceUuid[0])) {
|
data.getMeta().setFields(new Object[]{"product_sn", "row_id", "pre_master_key"});
|
} else {
|
data.getMeta().setFields(new Object[]{"product_sn", "row_id", "update_date", "pre_master_key"});
|
}
|
data.getMeta().setTableName(new Object[]{"T_PM_PRODUCT_SN"});
|
dao.updateBatch(data, new UpdateFilterEntity(" PRODUCT_SN =? ", new String[]{"product_sn"}), true);
|
}
|
print("更新完成,更新条数:" + rows + ", 更新次数:" + updateCount++);
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
}
|
}
|
|
if (dao != null) {
|
dao.closeConnection();
|
}
|
print("更新线程结束----------------------------------------------------------------------------------------------------------------------------------------------------------------");
|
}
|
|
|
private static int rowId = 19144409;
|
|
private static boolean initFlag = false;
|
|
private static synchronized void initRow() {
|
if (!initFlag) {
|
redisService.set("T_PM_PRODUCT_SN", rowId, false);
|
initFlag = true;
|
}
|
}
|
|
public static int getRowId(String uniqueValue, Integer row) {
|
if (!initFlag) {
|
initRow();
|
}
|
synchronized (uniqueValue.intern()) {
|
Jedis jedis = redisService.getJedis();
|
try {
|
String o = jedis.get(uniqueValue);
|
if (StringUtils.isEmpty(o)) {
|
row = Integer.valueOf(o);
|
} else {
|
if (row == null || row.intValue() <= 0) {
|
row = jedis.incr("T_PM_PRODUCT_SN").intValue();
|
}
|
jedis.set(uniqueValue, row.toString());
|
}
|
return row;
|
} catch (Exception e) {
|
e.printStackTrace();
|
SpringMVCContextHolder.getSystemLogger().error(e);
|
} finally {
|
jedis.close();
|
}
|
return -1;
|
}
|
}
|
|
// public static int getRowId(String uniqueValue, Integer row) {
|
// synchronized (uniqueValue.intern()) {
|
// Num n = (Num) redisService.get(uniqueValue, false);
|
// if (row != null && row > 0) {
|
// if (n == null) {
|
// n = new Num();
|
// n.num = row;
|
// n.count = 1;
|
// } else {
|
// int num = n.num;
|
// if (num != row.intValue()) {
|
// } else {
|
// row = num;
|
// }
|
// n.num++;
|
// }
|
// redisService.set(uniqueValue, n, false);
|
// return row;
|
// }
|
// if (n != null) {
|
// int num =n.num;
|
// if (n.count >= 2) {
|
// map.remove(uniqueValue);
|
// } else {
|
// jsonObject.put("count", jsonObject.getInt("count") + 1);
|
// }
|
// return num;
|
// } else {
|
// jsonObject = new JSONObject();
|
// jsonObject.put("num", rowId);
|
// jsonObject.put("count", 1);
|
// map.put(uniqueValue, jsonObject);
|
// return rowId++;
|
// }
|
// }
|
// }
|
|
// private String ThreadName = Thread.currentThread().getName();
|
|
private void print(String msg) {
|
// msg = "[" + ip + "] [" + DateUtils.getDateTime() + "] [" + ThreadName + "] " + msg;
|
SpringMVCContextHolder.getSystemLogger().info(msg);
|
}
|
|
|
public DataTableEntity getData(String maxValue) {
|
StringBuilder sql = new StringBuilder();
|
sql.append(" SELECT SN.ROW_ID,SN.PRODUCT_SN,NVL(TRACK.IN_STATION_TIME,TRACK_BACK.IN_STATION_TIME) UPDATE_DATE,'' PRE_MASTER_KEY ");
|
sql.append(" FROM (SELECT * ");
|
sql.append(" FROM ( SELECT PRODUCT_SN,ROW_ID ");
|
sql.append(" FROM T_PM_PRODUCT_SN ");
|
sql.append(" ORDER BY PRODUCT_SN ) SN ");
|
|
|
sql.append(" WHERE ROWNUM<= ").append(this.pageSize);
|
if (!StringUtils.isEmpty(maxValue)) {
|
sql.append(" AND PRODUCT_SN> '").append(maxValue).append("'");
|
}
|
sql.append(" ) SN ");
|
sql.append(" LEFT JOIN T_WIP_TRACKING TRACK ON SN.PRODUCT_SN=TRACK.SERIAL_NUMBER ");
|
sql.append(" LEFT JOIN T_WIP_TRACKING_BAK20220421 TRACK_BACK ON SN.PRODUCT_SN=TRACK_BACK.SERIAL_NUMBER ");
|
return dao.getList(sql.toString());
|
}
|
|
}
|