T100738
2024-04-16 eeb86aaf2f73a02600195ce2637dde6caf858a88
commit
已修改5个文件
75 ■■■■■ 文件已修改
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataExtractService.java 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataSyncService.java 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/utils/CustomLock.java 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-web/resources/LicenseKey.dat 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
product-server-data-center/src/main/java/com/product/data/center/service/DataArchivingQueue.java
@@ -202,7 +202,7 @@
            ExecutorService executorService = queryThreadMap.get(tableName);
            if (executorService != null) {
                if (!executorService.isShutdown()) {
                    executorService.shutdown();
                    executorService.shutdownNow();
                }
                queryThreadMap.remove(tableName);
            }
product-server-data-center/src/main/java/com/product/data/center/service/DataExtractService.java
@@ -533,7 +533,7 @@
                                    maybeUpdate.getData().sort((o1, o2) -> DateUtil.compare(o1.getDate(extractUpdateTimeField), o2.getDate(extractUpdateTimeField)));
                                    batchResultEntity = targetNewDao.updateBatch(maybeUpdate, updateFilterEntity, false);
                                    batchResultEntity = targetNewDao.updateBatch(maybeUpdate, updateFilterEntity, true);
                                    WriteExtractUtil.append("更新提取过滤后数据:" + extractTargetTable + ",需要更新的条数:" + maybeUpdate.getRows() + ",耗时:" + tempTestTimer2.intervalMs());
                                    targetNewDao.closeConnection();
product-server-data-center/src/main/java/com/product/data/center/service/DataSyncService.java
@@ -132,8 +132,8 @@
            if (syncInfo != null && !StringUtils.isEmpty(syncInfo.getString(CmnConst.TABLE_NAME))) {
                String tableName = syncInfo.getString(CmnConst.TABLE_NAME);
                baseDao.executeUpdate("UPDATE product_sys_data_center_log set deal_flag=?,deal_result=?,updated_utc_datetime=?" +
                                " where type=3 and UPPER(other_info) like concat('%',UPPER(?),'%')"
                        , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), "\"table_name\":\"" + tableName + "\""});
                                " where type=3 and config_uuid in (select uuid from product_sys_data_sync_mes_sub where UPPER(table_name)=UPPER(?)) and (result=0 or deal_result=0)"
                        , new Object[]{1, count == lists.size() ? 1 : 0, new Date(), tableName});
            }
        } catch (Exception e) {
            e.printStackTrace();
@@ -157,6 +157,7 @@
        String lockKey = null;
        Dao sourceDao = null;
        try {
            try {
            String tableName = jsonObject.getString(CmnConst.TABLE_NAME);
            String[] uniqueSignFieldName = (StringUtils.isEmpty(jsonObject.getString(CmnConst.UNIQUE_NAME)) ? "" : jsonObject.getString(CmnConst.UNIQUE_NAME)).split(",");
            if (ArrayUtil.isEmpty(uniqueSignFieldName)) {
@@ -172,7 +173,7 @@
                throw new BaseException(ErrorCode.SYNC_GET_CONFIG_FAIL);
            }
            lockKey = jsonObject.getString(CmnConst.TABLE_NAME);
            if (!customLock.tryLock(lockKey)) {
                if (!customLock.tryLock(lockKey, 120)) {
                syncDataLogEntity.setOtherInfo("该表数据同步已有任务在执行,跳过本次执行!");
                journalEntities.add(getLogRecord(syncDataLogEntity, null, tableName,
                        null, null, null, 0L));
@@ -466,6 +467,11 @@
            }
        }
        } finally {
            if (lockStatus && !StringUtils.isEmpty(lockKey)) {
                customLock.unLock(lockKey);
            }
        }
    }
    private static String[] getUniqueValue(String value) {
product-server-data-center/src/main/java/com/product/data/center/utils/CustomLock.java
@@ -1,11 +1,15 @@
package com.product.data.center.utils;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.ArrayUtil;
import com.product.core.spring.context.SpringMVCContextHolder;
import org.apache.commons.lang3.StringUtils;
import javax.swing.*;
import java.text.DateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -19,6 +23,7 @@
    private Map<String, Long> obj = new ConcurrentHashMap<>();
    private Map<String, Long> threadObj = new ConcurrentHashMap<>();
    private long detectionWaitTime = 1000L;
@@ -41,12 +46,64 @@
        return this.tryLock(ArrayUtil.join(key, ","));
    }
    public static void main(String[] args) {
        long timemillis = 1709654400039L;
        //转换为年月日时分秒
        String format = DateUtil.format(DateUtil.date(timemillis), "yyyy-MM-dd HH:mm:ss");
        System.out.println(format);
    }
    public boolean tryLock(String key, long timeoutMinute) {
        if (!StringUtils.isEmpty(key)) {
            key = key.toUpperCase();
            synchronized (key.intern()) {
                if (!this.obj.containsKey(key)) {
                    lock(key);
                    //获取当前线程
                    long id = Thread.currentThread().getId();
                    threadObj.put(key, id);
                    return true;
                } else if (timeoutMinute > 0) {
                    //获取锁进入开始时间
                    long startTime = obj.get(key);
                    //获取当前时间
                    long currentTime = System.currentTimeMillis();
                    //将分钟转换为毫秒
                    long timeout = timeoutMinute * 60 * 1000;
                    //如果当前时间减去开始时间大于等于超时时间
                    if (currentTime - startTime >= timeout) {
                        long threadId = threadObj.get(key);
                        //根据线程id获取线程
                        Thread[] threads = ThreadUtil.getThreads();
                        for (Thread thread : threads) {
                            if (thread.getId() == threadId) {
                                //中断线程
                                ThreadUtil.interrupt(thread, true);
                                SpringMVCContextHolder.getSystemLogger().error("线程:" + thread.getName() + "超时被中断");
                                return tryLock(key, timeoutMinute);
                            }
                        }
                        return true;
                    }
                }
            }
        }
        return false;
    }
    public boolean tryLock(String key) {
        if (!StringUtils.isEmpty(key)) {
            key = key.toUpperCase();
            synchronized (key.intern()) {
                if (!this.obj.containsKey(key)) {
                    lock(key);
                    //获取当前线程
                    long id = Thread.currentThread().getId();
                    threadObj.put(key, id);
                    return true;
                }
            }
product-server-web/resources/LicenseKey.dat
@@ -1 +1 @@

