動態

詳情 返回 返回

Java延遲隊列 - 動態 詳情

應用開發過程中,我們常常需要用到延時任務的地方,最近在工作遇到了一個需求,用UDP發送報文,發送後30s後要是還沒有收到回報報文,就對對於報文進行重發。

類似於訂單超時未支付取消訂單一樣,可以有很多解決方法,我這裏採用其中一種,java的延時隊列來實現。

用這篇筆記簡易記錄一下實現過程。

什麼是DelayQueue

DelayQueue 是按照元素的延時時間排序的隊列。元素必須實現 Delayed 接口,該接口定義了一個 getDelay 方法,用於返回元素的剩餘延時時間。

Delayed接口繼承了Comparable接口,所以延時隊列中的元素對象也必須要實現compareTo方法。

延時隊列在內部使用了一個優先級隊列(PriorityQueue)來實現,確保隊頭元素始終是剩餘延時時間最小的元素。

實現過程

1. 創建報文內容類

/**
 * 報文內容類
 * 報文類別號和流水號能確定唯一一條報文
 */
@Data
public class MessageInnerProtocol {
    /**
     * 報文類別號
     */
    private Integer infoClass;
    /**
     * 流水號
     */
    private Integer serialNo;
    
    // 省略其他字段
    // ......
}

2. 創建延時隊列元素對象

@Getter
public class MessageDelayed implements Delayed {

    /**
     * 報文內容
     */
    private final MessageInnerProtocol message;

    /**
     * 計時開始時間
     */
    private final long startTime;
    /**
     * 超時時間
     */
    private static final long EXPIRE_TIME = 30 * 1000;

    /**
     * 構造函數
     * @param message 報文內容
     */
    public MessageDelayed(MessageInnerProtocol message) {
        this.message = message;
        this.startTime = System.currentTimeMillis();
    }


    @Override
    public long getDelay(TimeUnit unit) {
        long elapsedTime = System.currentTimeMillis() - startTime;
        return unit.convert(System.currentTimeMillis() - elapsedTime, TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if(this == o){
            return 0;
        }
        // 根據剩餘時間來進行排序
        long diff = getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        if(diff == 0){
            return 0;
        }else if(diff < 0){
            return -1;
        }else {
            return 1;
        }
    }
}

3 報文回報狀態枚舉類

/**
 * 回報報文狀態
 */
public enum ResultMessageStatusEnums {
    /**
     * 等待報文回報中
     */
    WAITING,
    /**
     * 成功收到報文回報,且校驗成功
     */
    SUCCESS,
    /**
     * 收到回報報文,但是校驗發生錯誤
     */
    FAIL;
}

4 創建線程不停的處理超時報文

public class MessageTimeoutDeal {

    private static final int KNOWN_CAPACITY = 16;

    /**
     * 延時隊列存儲報文
     */
    public static final DelayQueue<MessageDelayed> RESULT_MESSAGE_DELAY_QUEUE = new DelayQueue<>();

    /**
     * <infoClass, <serialNo, ResultMessageStatusEnums>>
     * 用保報文類別和流水號,報文狀態來對報文進行處理
     */
    public static final Map<Integer, Map<Integer, ResultMessageStatusEnums>> RESULT_MESSAGE_MAP = new ConcurrentHashMap<>(KNOWN_CAPACITY);


    public static void main(String[] args) {
        // 生成報文
        MessageInnerProtocol messageInnerProtocol = new MessageInnerProtocol();
        // 設置報文內容
        messageInnerProtocol.setSerialNo(0x243);
        // 流水號建議用 AtomicInteger, 測試簡易寫一下
        messageInnerProtocol.setSerialNo(25);
        Map<Integer, ResultMessageStatusEnums> resultMessageMap =
                RESULT_MESSAGE_MAP.computeIfAbsent(messageInnerProtocol.getInfoClass(), k -> new ConcurrentHashMap<>(KNOWN_CAPACITY));
        // 報文狀態放入map中
        resultMessageMap.put(messageInnerProtocol.getSerialNo(), ResultMessageStatusEnums.WAITING);
        // 將報文加入延時隊列
        RESULT_MESSAGE_DELAY_QUEUE.add(new MessageDelayed(messageInnerProtocol));


        // 創建線程去處理延時隊列中的任務
        new Thread(() -> {
            try {
                while (true){
                    // 從延時隊列中獲取任務
                    MessageDelayed messageDelayed = RESULT_MESSAGE_DELAY_QUEUE.take();
                    MessageInnerProtocol message = messageDelayed.getMessage();
                    Map<Integer, ResultMessageStatusEnums> resultMessageStatusEnumsMap =
                            RESULT_MESSAGE_MAP.get(message.getInfoClass());
                    // 獲取到對應報文的回報狀態
                    ResultMessageStatusEnums resultMessageStatusEnums = resultMessageStatusEnumsMap.get(message.getSerialNo());
                    switch (resultMessageStatusEnums){
                        case WAITING:
                        case FAIL:
                            // 沒有收到回報報文或校驗和失敗
                            // 需要重發或其他流程
                        case SUCCESS:
                            resultMessageStatusEnumsMap.remove(message.getSerialNo());
                            break;
                        default:
                            break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }).start();
    }
}

測試就跳過了,需要報文處理類,對不同報文進行處理,包括回報報文,修改RESULT_MESSAGE_MAP中報文的狀態。

如果需要處理大量的延遲任務, 可以使用netty的時間輪

Add a new 評論

Some HTML is okay.