動態

詳情 返回 返回

時間輪在 Netty , Kafka 中的設計與實現 - 動態 詳情

本文基於 Netty 4.1.112.Final , Kafka 3.9.0 版本進行討論

在業務開發的場景中,我們經常會遇到很多定時任務的需求。比如,生成業務報表,週期性對賬,同步數據,訂單支付超時處理等。針對業務場景中定時任務邏輯複雜,執行時間長的特點,市面上已經有很多成熟的任務調度中間件可供我們選擇。比如:ElasticJob , XXL-JOB , PowerJob 等等。

而在中間件的場景中,同樣也存在很多定時任務的需求。比如,網絡連接的心跳檢測,網絡請求超時或失敗的重試機制,網絡連接斷開之後的重連機制。和業務場景不同的是,這些中間件場景的定時任務特點是邏輯簡單,執行時間非常短,而且對時間精度的要求比較低。比如,心跳檢測以及失敗重試這些定時任務,其實晚執行個幾十毫秒或者 100 毫秒也無所謂。

於是針對中間件場景中的這些定時任務特點:

  1. 海量任務
  2. 任務邏輯簡單
  3. 執行時間短
  4. 對任務調度的及時性沒有那麼高的要求

各大中間件設計了時間輪來調度具有上述 4 種特徵的定時任務,而本文主要討論的就是時間輪的設計與實現。但在這之前我們需要搞清楚時間輪這個設計需求是怎麼產生的,我們先從 JDK 中的任務調度組件開始聊起,看看 JDK 中的這些任務調度組件為什麼不能滿足中間件的場景。

image

1. JDK 中的任務調度組件

説到定時任務,我們第一時間就能想到的調度組件就是 JDK 中的 Timer,為什麼這麼説呢,因為筆者剛參加工作時的第一個任務就是用 Timer 實現的,當時對 Java 一無所知,完全零基礎。主管交給我一個定時任務的需求,兩眼抹黑。於是帶着清澈而又稚嫩的眼神到網上一頓搜索,找到了這個 Timer,如獲至寶。

1.1 Timer

public class Timer {
    // 優先隊列,按照任務的 ExecutionTime,由近到遠組織
    private final TaskQueue queue = new TaskQueue();
    // 延時任務的調度線程
    private final TimerThread thread = new TimerThread(queue);
}

Timer 中有兩個核心組件,一個是用於調度延時任務的 TimerThread ,另一個是 TaskQueue,用於組織延時任務。它是一個優先級隊列,其底層是一個數組實現的小根堆。

class TaskQueue {
    // 數組實現的小根堆
    private TimerTask[] queue = new TimerTask[128];

    // 向小根堆的堆底添加TimerTask 
    void add(TimerTask task) {
        if (size + 1 == queue.length)
            queue = Arrays.copyOf(queue, 2*queue.length);

        queue[++size] = task;
        // 向上調整
        fixUp(size);
    }

    // 獲取堆頂任務
    TimerTask getMin() {
        return queue[1];
    }
    
    // 刪除堆頂任務
    void removeMin() {
        queue[1] = queue[size];
        queue[size--] = null;
        //向下調整堆
        fixDown(1);
    }
}

TaskQueue 會將所有延時任務按照它們的 ExecutionTime ,由近到遠的組織在小根堆中,堆頂永遠存放的是 ExecutionTime 最近的延時任務。

TimerThread 會不斷的從 TaskQueue 中獲取堆頂任務,如果堆頂任務的 ExecutionTime 已經達到 —— executionTime <= currentTime , 則執行任務。如果該任務是一個週期性任務,則將任務重新放入到 TaskQueue 中。

如果堆頂任務的 ExecutionTime 還沒有到達,那麼 TimerThread 就會等待 executionTime - currentTime 的時間,一直到堆頂任務的執行時間到達,TimerThread 被重新喚醒執行堆頂任務。

    private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                // 堆頂任務的執行時間是否到達
                boolean taskFired;
                synchronized(queue) {
                    long currentTime, executionTime;
                    // 獲取堆頂延時任務
                    task = queue.getMin();
                    synchronized(task.lock) {
                        // 當前時間
                        currentTime = System.currentTimeMillis();
                        // 堆頂任務的執行時間
                        executionTime = task.nextExecutionTime;
                        // 是否到達堆頂任務的執行時間
                        if (taskFired = (executionTime<=currentTime)) {
                            if (task.period == 0) { // Non-repeating, remove
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else { // Repeating task, reschedule
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    // 如果堆頂任務的執行時間還未到達,那麼 TimerThread 就會在這裏等待
                    if (!taskFired)
                        queue.wait(executionTime - currentTime);
                }
                // 如果堆頂任務的執行時間已經到達,則立即執行
                if (taskFired)  // Task fired; run it, holding no locks
                    task.run();
            } catch(InterruptedException e) {
            }
        }
    }

根據以上 Timer 的核心實現,我們可以總結出 Timer 在應對中間件場景的延時任務時,有以下四種不足:

  1. 首先用於組織延時任務的 TaskQueue 本質上是一個小根堆。對於堆這種數據結構來説,添加,刪除一個延時任務時,堆都要向上,向下調整以便滿足小根堆的特性。單次操作的時間複雜度為 O(logn)。顯然在面對海量定時任務的添加,刪除時,性能上還是差點意思。
  2. Timer 調度框架中只有一個 TimerThread 線程來負責延時任務的調度,執行。在面對海量任務的時候,通常會顯得力不從心。
  3. 另外一個嚴重問題是,當延時任務在執行的過程中出現異常時, Timer 並不會捕獲,會導致 TimerThread 終止。這樣一來,TaskQueue 中的其他延時任務將永遠不會得到執行。
  4. Timer 依賴於系統的絕對時間,如果系統時間本身不準確,那麼延時任務的調度就可能會出問題。

1.2 DelayQueue

DelayQueue 是 JDK 提供的一個延時隊列,我們可以利用它來延時獲取隊列中的元素,它的實現其實和 Timer 中的 TaskQueue 很類似,其底層都是一個優先級隊列。

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
    
    //基於小根堆實現的優先級隊列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
}

本質上還是一個數組實現的小根堆。添加,刪除任務的時間複雜度仍然是 O(logn)

public class PriorityQueue<E> {
    // 數組實現的小根堆
    transient Object[] queue; 
}

DelayQueue 中的元素必須實現 Delayed 接口中的 getDelay , compareTo 方法。

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

public interface Comparable<T> {
   public int compareTo(T o);
}

其中 getDelay 方法用於獲取任務還有多久到期。返回值如果小於等於 0 ,則表示該任務已經到期了。

compareTo 方法用於調整任務在 DelayQueue 中的位置,DelayQueue 是一個小根堆,每次向 DelayQueue 添加新的任務時,先是把任務放到 DelayQueue 的末尾,然後依次向上調整,直到任務的過期時間大於等於其 parent 。 這樣就可以保證 DelayQueue 的小根堆特性 —— 堆頂元素永遠是過期時間最近的任務。

我們可以通過 take() 方法從 DelayQueue 獲取到期的堆頂任務,如果堆頂任務還沒到期,那麼就會在 DelayQueue 上阻塞等待,直到堆頂任務到期為止。

    public E take() throws InterruptedException {
        try {
            for (;;) {
                // 獲取 DelayQueue 堆頂任務
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                    // 獲取堆頂任務還有多久到期
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0L)
                        // 堆頂任務到期,則從 DelayQueue 中取出
                        return q.poll();
                    else {
                        try {
                            // 等待堆頂任務到期
                            available.awaitNanos(delay);
                        }
                    }
                }
            }
        } finally {
                    ...........
        }
    }

從 DelayQueue 的實現上可以看出,相比於 Timer ,DelayQueue 只是一個延時任務的管理隊列,而 Timer 卻是一個完整的任務調度組件。我們需要在 DelayQueue 的基礎之上,額外地實現任務調度功能。

但其底層的核心數據結構仍然是一個小根堆。和 Timer 一樣,添加刪除延時任務的時間複雜度都是 O(logn)。同樣無法滿足海量延時任務的調度。

1.3 ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor 是多線程版本的 Timer ,它是在 DelayQueue 的基礎上增加了多線程調度延時任務的能力。ScheduledThreadPoolExecutor 中負責組織管理延時任務的是 DelayedWorkQueue,它也是一個小根堆實現的優先級隊列,延時任務 ScheduledFutureTask 按照到期時間由近及遠的組織在 DelayedWorkQueue 中。DelayedWorkQueue 的第一個元素是到期時間最近的 ScheduledFutureTask。

業務線程可以通過 schedule , scheduleAtFixedRate , scheduleWithFixedDelay 方法將延時任務 ScheduledFutureTask 添加到 DelayedWorkQueue 中。

image

ScheduledThreadPoolExecutor 負責調度延時任務的是一個線程池,裏邊包含了多個 worker 調度線程,每個 worker 線程負責從 DelayedWorkQueue 中獲取已經到期的 ScheduledFutureTask,然後執行。如果 DelayedWorkQueue 中沒有任務到期,那麼 worker 線程就會在 DelayedWorkQueue 上阻塞等待,直到有到期的任務出現。

雖然 ScheduledThreadPoolExecutor 提供了多線程的調度能力,在一定程度上保證了延時任務調度的及時性,但是其底層仍然是依賴 DelayedWorkQueue 來管理延時任務,在面對海量延時任務的添加,刪除時,時間複雜度依然還是 O(logn) 。那麼有沒有一種數據結構可以將這個時間複雜度降低為 O(1) 呢 ? 這就是本文我們要討論的重點內容 —— 時間輪的設計與實現。

2. Netty 時間輪的設計原理

時間輪的設計靈感來自於我們日常生活中用的鐘表,鐘錶有秒針,分針,時針,共三個指針,60 個刻度。秒針每走一個刻度就是一秒,秒針走完一個時鐘週期(60s),分針走一個刻度就是一分鐘,當分針走完一個時鐘週期(60m),時針走一個刻度就是一個小時。

image

比如我們要在今天的 10 點 10 分 0 秒這個時刻去開一個重要的會議,那麼當鐘錶的秒針指向 0 這個刻度,分針指向 10 這個刻度,時針指向 10 這個刻度的時候,鬧鐘就會響起,提醒我們去執行開會這個延時任務。

如果我們能把鐘錶裏的刻度抽象成一個數據結構,用這個數據結構來存放對應刻度的延時任務的話,那麼當鐘錶的時針,分針,秒針指向某個刻度的時候,我們就去執行這個刻度對應的延時任務,這樣一來,一種新的延時任務調度思路就出來了,這也是時間輪的設計理念。

image

如上圖所示,Netty 將鐘錶的刻度抽象成了一個 HashedWheelBucket 的數據結構,鐘錶的錶盤被抽象成一個 HashedWheelBucket 類型的環形數組,鐘錶中有 60 個刻度,而 Netty 的時間輪 HashedWheelTimer 一共有 512 個刻度。

public class HashedWheelTimer implements Timer {
    // 數組大小默認為 512
    private final HashedWheelBucket[] wheel;
    // HashedWheelTimer 的時鐘精度,也就是時鐘間隔,多久轉動一次,默認 100ms, 最小值為 1ms
    private final long tickDuration;
}

鐘錶中一共有三個指針,分別是秒針,分針,時針。而 HashedWheelTimer 中只有一個 tick 指針,tick 每隔 tickDuration (100ms) 走一個刻度,也就是説 Netty 時間輪的時鐘精度就是 100 ms , 定時任務的調度延時有時會在 100ms 左右。如果你接受不了這麼大的調度誤差,那麼可以將 tickDuration 適當調小一些,但最小不能低於 1ms 。

什麼意思呢 ?比如現在我們需要在 5ms 之後執行一個延時任務,那麼時間輪可能在 8ms 之後才會調度這個任務,也可能在 65ms 之後調度,也有可能在 108ms 之後調度,這就使得定時任務的執行有了大約 100ms 左右的延時。

具體延時多少,取決於我們在什麼時刻將這個定時任務添加到時間輪中。關於這一點,筆者後面會在介紹時間輪具體實現細節的時候詳細討論,這裏點到為止,本小節我們還是主要聚焦於時間輪的設計原理。

對於鐘錶的秒針來説,它的 tickDuration 就是 1s , 走完一個時鐘週期就是 60s 。 對於分針來説,它的 tickDuration 就是 1m , 走完一個時鐘週期就是 60m。對於時針來説,它的 tickDuration 就是 1h , 走完一個時鐘週期就是 12h。

由於 HashedWheelTimer 中的 tickDuration 是 100ms , 有 512 個刻度 (HashedWheelBucket) , 所以時間輪中的 tick 指針走完一個時鐘週期需要 51200ms 。

HashedWheelBucket 是一個具有頭尾指針的雙向鏈表,鏈表中存儲的元素類型為 HashedWheelTimeout 用於封裝定時任務。HashedWheelBucket 中的 head 指向雙向鏈表中的第一個 HashedWheelTimeout , tail 指向雙向鏈表中的最後一個 HashedWheelTimeout。

    private static final class HashedWheelBucket {
        // Used for the linked-list datastructure
        private HashedWheelTimeout head;// 指向雙向鏈表中的第一個 timeout
        private HashedWheelTimeout tail;// 指向雙向鏈表中的最後一個 timeout
    }

image

HashedWheelTimeout 用於封裝時間輪中的延時任務,提交到時間輪中的延時任務必須實現 TimerTask 接口。

// 延時任務
public interface TimerTask {
    void run(Timeout timeout) throws Exception;
}

private static final class HashedWheelTimeout implements Timeout, Runnable {
        // 延時任務所屬的時間輪
        private final HashedWheelTimer timer;
        // 延時任務
        private final TimerTask task;
        // 延時任務的 deadline ,該時間是一個絕對時間,以時間輪的啓動時間 startTime 為起點
        private final long deadline;
        // 延時任務所屬的 bucket
        HashedWheelBucket bucket;
        // 指向其在 bucket 的下一個延時任務
        HashedWheelTimeout next;
        // 指向其在 bucket 的前一個延時任務
        HashedWheelTimeout prev;


        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }
}

HashedWheelTimeout 中有一個重要的屬性 deadline ,它規定了延時任務 TimerTask 的到期時間。deadline 是一個絕對時間值,它以時間輪的啓動時間 startTime 為起點,表示從 startTime 這個時間點開始,到 deadline 這個時間點到期。

// 計算延時任務到期的絕對時間戳
// 時間輪中的時間戳均以時間輪的啓動時間 startTime 為起點
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

image

Netty 時間輪中的時間座標系全部是以時間輪的啓動時間點 startTime 為基準的,當時間輪啓動之後,會將那一刻的時間戳設置到 startTime 中。

public class HashedWheelTimer implements Timer {
    // 時間輪的啓動時間戳(納秒)
    private volatile long startTime;
}

時間輪中的 tick 指針也是一個絕對值,當時間輪啓動之後,tick 指向 0 ,每隔 100ms (tickDuration),tick 向前轉動一下。但需要注意的是 tick 的值是隻增不減的,只要時間輪在運行,那麼 tick 的值就會一直遞增上去。比如,當 tick 轉動完一個時鐘週期(51200ms)之後,tick 的值是 512 而不是重新指向 0 。

tick 與 HashedWheelBucket 之間的映射關係通過 ticks & mask 計算得出。mask 為 HashedWheelBucket 的個數減 1 ,所以這就要求時間輪中 HashedWheelBucket 的個數必須是 2 的次冪。

在時間輪中,屬於同一個 HashedWheelBucket 中的延時任務 HashedWheelTimeouts ,它們的到期時間 deadline 都在同一時間範圍內 —— [ tick , tick + 1) * tickDuration

比如,在時間輪剛剛啓動之後,tick 指向 0 ,那麼 wheel[0] 指向的 HashedWheelBucket 裏存放的 HashedWheelTimeouts,它們的到期時間均在 [ 0 , 100) ms 之內。

假如我們在 tick = 0 這個時刻,向時間輪中添加了一個延時 5ms 執行的 HashedWheelTimeout,那麼它就會被放入 wheel[0] 中。如果添加的是一個延時 101ms 執行的 HashedWheelTimeout,那麼它就會被放入 wheel[1] 中。同樣的道理,如果添加的是一個延時 360ms 執行的 HashedWheelTimeout,那麼它就會被放入 wheel[3] 中。

image

當時間過了 100ms 之後,Netty 就會將 HashedWheelBucket0 中的延時任務拉出來執行,執行完之後,tick 的值加 1 ,從 0 轉動到 1 。在經過 100 ms 之後,Netty 就會將 HashedWheelBucket1 中的延時任務拉出來執行,執行完之後,tick 的值加 1 ,從 1 轉動到 2 ,如此往復循環。這就是整個時間輪的運轉邏輯。

但從這個過程中我們可以看出,延時任務的調度存在 tickDuration(100ms)左右的延遲。比如,在 tick = 0 這個時刻,添加到 HashedWheelBucket0 中的延時任務,我們本來是期望這些延時任務分別在 5ms , 10ms , 95ms 之後執行,但時間輪的真正調度時間卻在 100ms 之後。這就導致了任務調度產生了 100ms 左右的延遲。

如果你接受不了 100ms 的延遲,那麼可以在創建時間輪的時候,將 tickDuration 的值調低,但不能低於 1ms 。tickDuration 的值越小,時間輪的精度越高,性能開銷也就越大。tickDuration 的值越大,時間輪的精度也就越低,性能開銷越小。

    public HashedWheelTimer(long tickDuration, TimeUnit unit) {
        
    }

但在中間件的場景中,往往對延時任務調度的及時性沒有那麼高的要求,同時為了兼顧時間輪的精度與性能,tickDuration 默認設置為100ms 是剛好合適的,通常不需要調整。

另外在默認情況下,只有一個線程 workerThread 負責推動時間輪的轉動,以及延時任務的執行。

public class HashedWheelTimer implements Timer {
    // HashedWheelTimer 的 worker 線程,由它來驅動時間輪的轉動,延時任務的執行
    private final Thread workerThread;
}

從上面的過程可以看出,只有當前 tick 對應的 HashedWheelBucket 中的延時任務全部被執行完畢的時候,tick 才會向前推動。所以為了保證任務調度的及時性,時間輪中的延時任務執行時間不能太長,只適合邏輯簡單,執行時間短的延時任務。

但畢竟在默認情況下就只有這一個 workerThread,既負責延時任務的調度,又負責延時任務的執行,對於有海量併發延時任務的場景,還是顯得力不從心。為了應對這種情況,我們可以在創建時間輪的時候,指定一個專門用於執行延時任務的 Executor。

這樣一來,時間輪中的延時任務調度還是由單線程 workerThread 負責,到期的延時任務由線程池 Executor 來負責執行,近一步提升延時任務調度的及時性。但事實上,在大部分場景中,有一個 workerThread 就夠了,並不需要額外的指定 Executor。大家可以根據實際情況,自由裁定。

public class HashedWheelTimer implements Timer {
    // 負責執行延時任務,用於應對大量的併發延時任務場景
    // 默認為單線程 workerThread
    private final Executor taskExecutor;
    // 在構造函數中可以設置 taskExecutor
    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts, Executor taskExecutor) 
}

另外還有一個問題就是,上圖時間輪中的延時任務,它們的延時時間都在同一時鐘週期內。Netty 時間輪中的一個時鐘週期是 51200ms 。

也就是説,在 tick = 0 這個時刻,只要延時任務的延時時間在 51200ms 之內,那麼當 tick 轉動完 512 個刻度之後(一個時鐘週期),這 512 個刻度對應的 HashedWheelBucket 中的延時任務全部會被執行到。

如果我們在 tick = 0 這個時刻,添加一個延時任務,但它的延時時間超過了一個時鐘週期,比如在 51250ms 之後執行。 那麼這個延時任務也會被添加到 HashedWheelBucket0 中。

image

當時間過了 100ms 之後,workerThread 就會執行 HashedWheelBucket0 中的延時任務。但此時只能執行延時 5ms , 10ms 的任務,不能執行延時 51250ms 的任務,因為它需要等到下一個時鐘週期才能執行。

那麼 workerThread 在執行延時任務的時候如何才能知道,哪些任務是本次時鐘週期內可以執行的,哪些任務是要等到下一次或者下下次時鐘週期才能執行的呢 ?

在延時任務模型 HashedWheelTimeout 中有一個字段 —— remainingRounds,用於記錄延時任務還剩多少時鐘週期可以執行。

private static final class HashedWheelTimeout implements Timeout, Runnable {
    // 執行該延時任務需要經過多少時鐘週期
    long remainingRounds;
}

本次時鐘週期內可以執行的延時任務,它的 remainingRounds = 0 ,workerThread 在遇到 remainingRounds = 0 的 HashedWheelTimeout 就會執行。

下一個時鐘週期才能執行的延時任務,它的 remainingRounds = 1 ,依次類推。當 workerThread 遇到 remainingRounds > 0 的 HashedWheelTimeout 就會直接跳過,並將 remainingRounds 減 1 。

比如,上圖中 HashedWheelBucket0 中的這幾個延時任務,其中延時 5ms , 10ms 的 HashedWheelTimeout 它們的 remainingRounds = 0, 表示在本次時鐘週期內就可以馬上執行。

image

延時 51250ms 的 HashedWheelTimeout 它的 remainingRounds = 1 , 表示在下一個時鐘週期才能執行。

好了,現在整個時間輪的設計原理筆者就為大家介紹完了,那麼讓我們再次回到本小節開頭的問題,在面對海量延時任務的添加與取消時,時間輪如何將這個時間複雜度降低為 O(1) 呢 ?

首先,時間輪的核心數據結構就是一個 HashedWheelBucket 類型的環形數組 wheel , 數組長度默認為 512 。wheel 數組用於組織管理時間輪中的所有延時任務。

    // 數組大小默認為 512
    private final HashedWheelBucket[] wheel;

與之前介紹的延時隊列 DelayedWorkQueue 不同的是,環形數組 wheel 會按照延時時間的不同,將延時任務分散到 512 個 HashedWheelBucket 中管理。每個 HashedWheelBucket 負責管理到期時間範圍在 [ tick , tick + 1) * tickDuration 之間的任務。

而 DelayedWorkQueue 則是用一個優先級隊列來管理所有的延時任務,為了維護小根堆的特性,每次在向 DelayedWorkQueue 添加或者刪除延時任務的時間複雜度為 O(logn)

當我們向時間輪添加一個延時任務時,Netty 首先會根據延時任務的 deadline 以及 tickDuration 計算出該延時任務最終會停留在哪一個 tick 上。注意,延時任務中的 deadline 是一個絕對值而不是相對值,是以時間輪啓動時間 startTime 為基準的一個絕對時間戳。tick 也是一個絕對值而不是相對值,是以時間輪剛剛啓動時 tick = 0 為基準的絕對值,只增不減。

比如,前面這個延時 51250ms 的任務,它最終會停留在 tick = 512 上。但由於時間輪是一個環形數組,所以 tick 512 與數組長度 512 取餘得到所屬 HashedWheelBucket 在 wheel 數組中的 index = 0。

// 計算延時任務,最終會停留在哪一個 tick 上
long calculated = timeout.deadline / tickDuration;

// 獲取 calculated 對應的 HashedWheelBucket
int stopIndex = (int) (calculated & mask);
HashedWheelBucket bucket = wheel[stopIndex];

// 將延時任務添加到對應的 HashedWheelBucket 中
bucket.addTimeout(timeout);

然後將延時任務添加到 HashedWheelBucket 的末尾,前面我們已經提過,HashedWheelBucket 是一個雙向鏈表,向鏈表末尾添加一個元素的時間複雜度為 O(1)

private static final class HashedWheelBucket {

        public void addTimeout(HashedWheelTimeout timeout) {
            assert timeout.bucket == null;
            timeout.bucket = this;
            if (head == null) {
                head = tail = timeout;
            } else {
                tail.next = timeout;
                timeout.prev = tail;
                tail = timeout;
            }
        }      
}

延時任務的取消邏輯也很簡單,就是將這個延時任務從其所屬的 HashedWheelBucket 中刪除即可。從一個雙向鏈表中刪除某個指定的元素時間複雜度也是 O(1)

  public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
            HashedWheelTimeout next = timeout.next;
            // timeout 不是第一個元素
            if (timeout.prev != null) { 
                timeout.prev.next = next;
            }
            // timeout 不是最後一個元素
            if (timeout.next != null) {
                timeout.next.prev = timeout.prev;
            }

            if (timeout == head) {
                // Bucket 中只有一個任務,直接將頭尾指針置空
                if (timeout == tail) {
                    tail = null;
                    head = null;
                } else {
                    // 待刪除的任務是第一個任務,head 指針向後移動
                    head = next;
                }
            } else if (timeout == tail) {
                // 待刪除的任務是最後一個任務,tail 指針向前移動
                tail = timeout.prev;
            }
            // null out prev, next and bucket to allow for GC.
            timeout.prev = null;
            timeout.next = null;
            timeout.bucket = null;
            return next;
        }

從以上過程我們可以看出,時間輪在面對海量延時任務的添加,取消的時候,所需的時間複雜度都是 O(1),聊完了延時任務的管理,現在我們在來看一下延時任務的調度與執行。

Netty 只靠一個單線程 workThread 來推動時間輪一個 tick 一個 tick 地向前轉動,當時間輪轉動到某一個 tick 時,workThread 會等待一個 tickDuration (默認 100ms)的時間,隨後 workThread 會將該 tick 對應的 HashedWheelBucket 中 remainingRounds = 0 的延時任務全都拉取下來挨個執行。

當執行完 HashedWheelBucket 中的延時任務之後,tick 向前推進一格(tick++),workThread 繼續睡眠等待一個 tickDuration,然後重複上述過程。

這裏我們可以看出,延時任務的調度與執行在默認情況下全部都是由一個單線程 workThread 來執行。如果時間輪中的延時任務邏輯複雜,執行時間長,那麼就會影響整個時間輪的調度,tick 的轉動就會出現延時,所以時間輪雖然可以處理海量的延時任務,但是這些延時任務的邏輯必須要簡單,執行時間要短。當然了,我們也可以在創建時間輪的時候指定一個專門執行延時任務的線程池來加快任務的執行。

由於延時任務的調度通常會有一個 tickDuration 左右的延時。比如,任務的調度可能會晚幾毫秒或者幾十毫秒,也有可能晚一個 tickDuration 左右。所以時間輪只能處理那些對任務調度的及時性要求沒那麼高的場景

3. Netty 時間輪相關設計模型的實現

image

3.1 HashedWheelTimer

Netty 使用一個叫做 HashedWheelTimer 的結構來描述時間輪,其中包含了第二小節中介紹的所有重要屬性以及核心結構。其中最核心的就是 wheel 環形數組,它相當於鐘錶的錶盤,錶盤中的每一個刻度用 HashedWheelBucket 結構描述。

private final HashedWheelBucket[] wheel;

時間輪中究竟包含多少個刻度,是由構造參數 ticksPerWheel 決定的,默認為 512 。Netty 會根據延時時間的不同將所有提交到時間輪的延時任務分散到 512 個 HashedWheelBucket 中組織管理。定位延時任務所在的 HashedWheelBucket 以及向 HashedWheelBucket 中添加,取消延時任務的時間複雜度均為 O(1)

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts, Executor taskExecutor) 

如果時間輪中需要調度的延時任務非常多,那麼每個 HashedWheelBucket 中就可能包含大量的延時任務,這就導致時間輪的調度發生延遲。針對這種情況,我們可以適當增加 ticksPerWheel 的個數,讓更多的 HashedWheelBucket 來分攤延時任務。但 ticksPerWheel 必須是 2 的次冪。

    private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        // ticksPerWheel 必須是 2 的次冪,默認為 512
        ticksPerWheel = MathUtil.findNextPositivePowerOfTwo(ticksPerWheel);
        // 創建時間輪中的 hash 槽
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i ++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

這樣一來,當我們向時間輪添加延時的任務的時候,就可以通過 & 運算來代替 % 運算去尋找延時任務對應的 HashedWheelBucket。

mask = wheel.length - 1;

第二小節我們已經介紹過了,在向時間輪添加延時任務時,我們需要首先定位到這個延時任務最終會停留在哪一個 tick 上,時間輪中的 tick 是一個絕對值,它不會按照時鐘週期的結束而自動歸 0 ,而是一直會往上遞增。

calculated 也是一個絕對值,表示延時任務最終會停留在哪一個 tick 上,隨後通過 calculated & mask 定位到對應的 HashedWheelBucket,時間複雜度為 O(1)

// 計算延時任務,最終會停留在哪一個 tick 上
long calculated = timeout.deadline / tickDuration;

// 獲取 calculated 對應的 HashedWheelBucket
int stopIndex = (int) (calculated & mask);
HashedWheelBucket bucket = wheel[stopIndex];

tickDuration 表示時間輪中的時鐘精度,也就是 tick 指針多久轉動一次,默認為 100ms,我們可以通過構造參數 tickDuration 進行指定,但最小不能低於 1ms。

private final long tickDuration;

tickDuration 的值越小,時間輪的精度越高,性能開銷也就越大。tickDuration 的值越大,時間輪的精度也就越低,性能開銷越小。

現在時間輪的基本骨架就有了,而時間輪的運轉靠的就是 workerThread ,由它來驅動時鐘 tick 一下一下的轉動,並執行對應 HashedWheelBucket 中的延時任務。

private final Worker worker = new Worker();
private final Thread workerThread;

由於在默認情況下,Netty 時間輪中就只有這一個單線程 workerThread 來負責延時任務的調度與執行,在面對海量併發任務的時候,難免顯得有點力不從心。執行任務的時間過長,就會導致 tick 的轉動產生很大的延時。於是 Netty 又在 4.1.69.Final 中引入了一個 taskExecutor,來專門負責執行延時任務。

 private final Executor taskExecutor;

我們可以通過構造參數 taskExecutor 來指定自定義的線程池,默認情況下為 ImmediateExecutor ,其本質還是由 workerThread 來執行延時任務。

public final class ImmediateExecutor implements Executor {
    @Override
    public void execute(Runnable command) {
        ObjectUtil.checkNotNull(command, "command").run();
    }
}

workerThread 負責從對應 tick 的 HashedWheelBucket 中拉取延時任務,然後將延時任務丟給 taskExecutor 來執行。這在一定程度上提高了延時任務的消費速度,不至於拖慢 workerThread 從而影響到整個時間輪的運轉。

時間輪中待執行延時任務的最大個數受到參數 maxPendingTimeouts 限制,默認為 -1 。當 maxPendingTimeouts 的值小於等於 0 的時候,表示 Netty 不會對時間輪中的延時任務個數進行限制。

// 時間輪中待執行延時任務的最大個數
private final long maxPendingTimeouts;
// 時間輪當前待執行的延時任務個數
private final AtomicLong pendingTimeouts = new AtomicLong(0);

當時間輪中的延時任務個數超過了 maxPendingTimeouts 的限制時,再向時間輪添加延時任務就會得到 RejectedExecutionException 異常。

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {

        // 待執行的延時任務計數 + 1
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        // maxPendingTimeouts 默認為 -1 。表示不對時間輪的延時任務個數進行限制
        // 如果達到限制,則不能繼續向時間輪添加任務
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }
                    ,,,,,,,,
    }

另外時間輪 HashedWheelTimer 在 JVM 進程中的實例個數會受到 INSTANCE_COUNT_LIMIT 的限制,默認為 64 。

    // 系統當前時間輪實例的個數
    private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();
    // 默認允許的 HashedWheelTimer 最大實例個數
    private static final int INSTANCE_COUNT_LIMIT = 64;

如果當前 JVM 進程中的 HashedWheelTimer 實例個數超過了 64 ,那麼 Netty 就會打印 Error 日誌進行警告。

    private static void reportTooManyInstances() {
        if (logger.isErrorEnabled()) {
            String resourceType = simpleClassName(HashedWheelTimer.class);
            logger.error("You are creating too many " + resourceType + " instances. " +
                    resourceType + " is a shared resource that must be reused across the JVM, " +
                    "so that only a few instances are created.");
        }
    }

從上面的警告信息我們可以看出,時間輪是一種共享的資源,既然是一種系統資源,那麼就和內存資源一樣(ByteBuf)都存在資源泄露的風險。當我們使用完時間輪但忘記調用它的 stop 方法進行關閉的時候,就發生了資源泄露。

和 ByteBuf 一樣,在 HashedWheelTimer 中也有一個 ResourceLeakTracker 用於跟蹤探測資源泄露的發生,如果發生資源泄露,Netty 就會以 Error 日誌的形式打印出泄露的位置。

class SimpleLeakAwareByteBuf extends WrappedByteBuf {
   final ResourceLeakTracker<ByteBuf> leak;
}

public class HashedWheelTimer implements Timer {
    private final ResourceLeakTracker<HashedWheelTimer> leak;
}
關於 ResourceLeakTracker 的實現原理,感興趣的讀者可以回看下筆者之前的文章 《Netty 如何自動探測內存泄露的發生》

我們在創建 HashedWheelTimer 的時候可以通過構造參數 leakDetection 來開啓,關閉時間輪的資源泄露探測。leakDetection 默認為 true , 表示無條件開啓資源泄露的探測。如果設置為 false , 那麼只有當 workerThread 不是守護線程的時候才會開啓資源泄露探測。

workerThread 默認情況下並不是一個守護線程。
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

3.2 延時任務的添加

image

如上圖所示,當我們向時間輪添加一個延時任務時,並不是大家想象的那樣,直接將延時任務添加到時間輪中,而是首先添加到一個叫做 timeouts 的 MpscQueue 中。

    // 多線程在向 HashedWheelTimer 添加延時任務的時候,首先會將任務添加到 timeouts 中,而不是直接添加到時間輪裏
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();

為什麼會這麼設計呢 ? 時間輪是一個單線程驅動的模型,內部只有一個 workerThread 來推動 tick 的轉動,並從對應 HashedWheelBucket 中拉取延時任務。所以時間輪採用的是無鎖化的設計,workerThread 在訪問內部任何數據結構的時候都不會加鎖。

而向時間輪添加延時任務的操作卻是多線程執行的,如果任務被直接添加到時間輪中,那麼就破壞了無鎖化的設計,workerThread 在訪問內部相關數據結構的時候就必須加鎖了。

所以為了避免加鎖的開銷,Netty 引入了一個 MpscQueue 作為中轉,業務多線程首先會將延時任務添加到 MpscQueue 中。等到下一個 tick , workerThread 調度延時任務的時候,會統一將 MpscQueue 中的延時任務轉移到時間輪中。保證了 workerThread 單線程的無鎖化運行。

另外 Netty 時間輪採用了懶啓動的設計,只有第一次向時間輪添加延時任務的時候才會啓動。因為時間輪一旦啓動之後,workerThread 就開始運行,推動 tick 一下一下的向前推進。如果時間輪剛被創建出來就啓動,此時內部又沒有任何延時任務,這就導致了 tick 不必要的空轉。

當時間輪啓動之後,就會根據延時任務 TimerTask 的延時時間 delay 計算到期時間 deadline , 然後將 TimerTask 封裝成 HashedWheelTimeout 添加到 MpscQueue 中。

private static final class HashedWheelTimeout implements Timeout, Runnable {
        // 延時任務所屬的時間輪
        private final HashedWheelTimer timer;
        // 延時任務
        private final TimerTask task;
        // 延時任務的 deadline ,該時間是一個絕對時間,以時間輪的啓動時間 startTime 為起點
        private final long deadline;

        HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
            this.timer = timer;
            this.task = task;
            this.deadline = deadline;
        }
}

之前我們提到過,HashedWheelTimeout 中最重要的一個屬性就是延時任務的到期時間 deadline , deadline 是一個絕對時間戳,Netty 時間輪中的時間座標系全部是以時間輪的啓動時間點 startTime 為基準的,deadline 表示從 startTime 這個時間點開始,到 deadline 這個時間點到期。

image

為什麼這麼設計呢 ? 這是因為我們需要把時間輪的啓動時間也考慮進延時時間的計算當中。比如,我們向時間輪中添加一個延時 5ms 執行的任務,其中時間輪啓動花了 2ms , 那麼這個延時任務就應該在時間輪啓動後 3ms 開始執行。所以在計算延時任務到期時間戳 deadline 的時候需要減去時間輪的啓動時間。後續時間輪的時間座標軸均以 startTime 為基準。

long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

下面是延時任務完整的添加邏輯,整個時間複雜度為 O(1)

public class HashedWheelTimer implements Timer {
    // 時間輪的啓動時間戳(納秒)
    private volatile long startTime;

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        // 懶啓動時間輪,worker 線程會等待 100ms 後執行
        start();
        // 計算延時任務到期的時間戳
        // 時間戳的參考座標系均以時間輪的啓動時間 startTime 為起點
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }
}

3.3 延時任務的取消

延時任務的取消和添加一樣,它們都是在 workerThread 之外進行操作的,所以當業務線程取消一個延時任務時,也是先將這個被取消的延時任務放到一個 MpscQueue 中暫存。

private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();

等到下一個 tick 到來的時候,workerThread 會統一處理 cancelledTimeouts 集合中已經被取消的延時任務。

        private void processCancelledTasks() {
            for (;;) {
                // workerThread 不斷的從 cancelledTimeouts 中拉取被取消的延時任務
                HashedWheelTimeout timeout = cancelledTimeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                try {
                    // 將延時任務從 HashedWheelBucket 中刪除,時間複雜度 O(1)
                    timeout.remove();
                } catch (Throwable t) {

                }
            }
        }

延時任務 HashedWheelTimeout 的狀態一共有三個,初始為 ST_INIT,任務被取消之後會更新為 ST_CANCELLED,任務準備執行的時候會更新為 ST_EXPIRED。

private static final class HashedWheelTimeout implements Timeout, Runnable {
        private static final int ST_INIT = 0;
        private static final int ST_CANCELLED = 1;
        private static final int ST_EXPIRED = 2;

        private volatile int state = ST_INIT;
        // 延時任務所屬的時間輪
        private final HashedWheelTimer timer;

        @Override
        public boolean cancel() {
            // 更新任務狀態為 ST_CANCELLED
            if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
                return false;
            }

            timer.cancelledTimeouts.add(this);
            return true;
        }
}

3.4 時間輪的啓動

之前我們提過,Netty 時間輪採用了懶啓動的設計,當我們首次向時間輪添加延時任務的時候才會啓動。時間輪有三種狀態,剛被創建出來的時候狀態為 WORKER_STATE_INIT,啓動之後狀態為 WORKER_STATE_STARTED,關閉之後狀態為 WORKER_STATE_SHUTDOWN。

public class HashedWheelTimer implements Timer {
    // 初始狀態
    public static final int WORKER_STATE_INIT = 0;
    // 啓動狀態
    public static final int WORKER_STATE_STARTED = 1;
    // 關閉狀態
    public static final int WORKER_STATE_SHUTDOWN = 2;
    // 時間輪的狀態,初始為 WORKER_STATE_INIT
    private volatile int workerState; 
    // 原子更新時間輪狀態的 Updater
    private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
    // 監聽時間輪的啓動事件
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    // 時間輪的啓動時間戳(納秒)
    private volatile long startTime;

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    // 啓動 workerThread , 它是一個非守護線程
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // 等待 workerThread 的啓動,workerThread 啓動之後會設置 startTime,並執行 startTimeInitialized.countdown
        while (startTime == 0) {
            try {
                // 等待時間輪的啓動
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
              
            }
        }
    }
}

時間輪中有一個重要的屬性 startTime,初始狀態下為 0 ,啓動之後,workerThread 會將啓動那一刻的時間戳設置到 startTime 中,這個 startTime 非常重要,因為後續時間輪中的時間座標系均是以 startTime 為基準的。時間輪啓動的一項重要工作就是設置這個 startTime。

private final class Worker implements Runnable {
       @Override
        public void run() {
              // 設置時間輪的啓動時間戳
              startTime = System.nanoTime();
              // 通知正在等待時間輪啓動的業務線程
              startTimeInitialized.countDown();

                        ..........
        }
}

3.5 時間輪的運轉

image

時間輪會按照一定的到期 deadline 時間範圍將所有的延時任務分別打散到 512 個 HashedWheelBucket 中,比如,我們在 tick = 0 這個時刻向時間輪添加延時任務,如果這個任務的 deadline 在 [ 0 , 100 )ms 內,那麼它將會被添加到 HashedWheelBucket0,中,如果 deadline 在 [ 100 , 200 )ms 內,那麼就會被添加到 HashedWheelBucket1 中。同樣的道理,如果 deadline 在 [ 200, 300 )ms 內,它將會被添加到 HashedWheelBucket2 中,以此類推。

假設當前 tick = 2 , 那麼就表示 HashedWheelBucket2 中的延時任務馬上要被 workerThread 調度執行,那麼具體在什麼時間執行呢 ?

時間輪中的時間紀元是 tick = 0 ,也就是從 0ms 開始, HashedWheelBucket2 中所有的延時任務,它們的 deadline 都在 [ 200, 300 )ms 以內。那麼當 tick 從 0 轉動到 2 的時候,就表示時間已經過去了 200ms。

但此時還不能馬上就開始執行 HashedWheelBucket2 中的任務,因為它們的延時時間可能是 210ms , 250ms 也可能是 299ms ,如果在 tick = 2 也就是 200ms 的這個時間點就馬上執行,那麼這些任務就被提前執行了。

所以我們需要等到 300ms 也就是 tick = 3 這個時刻才能執行 HashedWheelBucket2 中的延時任務,注意這裏 tick = 3 指的是具體真實的時間已經到了 300ms 這個時間點,而時間輪中的 tick 還是指向 2 ,並沒有向前推進

也就是説,延時 210ms , 250ms , 299ms 的任務,需要等到 300ms 之後才能得到執行,這裏我們也可以看出,時間輪的精度是 tickDuration (默認 100ms),延時任務的調度通常會晚一個 100ms 左右。

這裏提到 "100ms 左右" 的意思是,時間輪中的延時任務可能會被晚調度 5ms ,也可能晚調度 9ms ,也可能是幾十毫秒,也有可能是 105ms , 108ms , 111ms 。具體這個調度延遲的不確定性是如何產生的,我們放在下一個小節在討論,這裏大家有這個概念就可以了。

因此 workerThread 在調度延時任務的時候,通常會首先等到 next tick 的時間點來臨才會開始執行當前 tick 對應的 HashedWheelBucket。

        private long waitForNextTick() {
            // 獲取 tick + 1 對應的時間戳 deadline,後續 workerThread 會 sleep 直到 deadline 到達
            long deadline = tickDuration * (tick + 1);

            for (;;) {
                // 時間輪時間軸中的當前時間戳
                final long currentTime = System.nanoTime() - startTime;
                // 這裏需要保證 workerThread 至少要 sleep 1ms ,防止其被頻繁的喚醒
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
                // 如果 deadline 已過期,那麼直接返回 currentTime
                // tick bucket 中延時任務的 deadline 小於等於 currentTime 的就會被執行
                if (sleepTimeMs <= 0) {
                    // currentTime 溢出
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }

                try {
                    // sleep 等待到 deadline 時間點,然後執行當前 tick bucket 中的延時任務(timeout.deadline <= deadline)
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    // 時間輪被其他線程關閉,中斷 worker 線程
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

時間輪的精度是由 tickDuration 決定的,這個值我們可以調節,默認為 100ms , 但最小不能低於 1ms 。tickDuration 越小,時間輪的精度越高,同時 workerThread 的繁忙程度也越高 。如果 tickDuration 設置的過小,那麼 workerThread 在這裏就會被頻繁的喚醒。

所以為了防止 workerThread 被頻繁的喚醒,我們需要保證至少要 sleep 1ms 。

long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

如果 sleepTimeMs <= 0 則説明當前時間點 currentTime 已經過了 tick + 1 對應的時間戳 deadline , 這樣就不用在這裏等待了,直接返回 currentTime。

只要當前 tick 對應的 HashedWheelBucket 中的延時任務到期時間小於等於 currentTime (延時任務以過期),workerThread 會將會執行它們。

如果 sleepTimeMs > 0 則説明當前時間還沒有到達 tick + 1 這個時間點,那麼 workerThread 就會在這裏睡眠等待。

當時間到達 tick + 1 這個時間點之後,workerThread 就會從這裏喚醒,轉去執行當前 tick 對應的 HashedWheelBucket 裏的延時任務。

int idx = (int) (tick & mask);
HashedWheelBucket bucket = wheel[idx];

但 HashedWheelBucket 裏面此時可能還是空的,沒有任何延時任務。因為當業務線程在向時間輪添加延時任務的時候,首先是要將任務添加到一個叫做 timeouts 的 MpscQueue 中。也就是説延時任務首先會在 timeouts 中緩存,並不會直接添加到對應的 HashedWheelBucket 中,

那麼 workerThread 在被喚醒之後,首先要做的就是從 timeouts 中將延時任務轉移到時間輪對應的 HashedWheelBucket 中。

        private void transferTimeoutsToBuckets() {
            // 每個 tick 最多隻能從 timeouts 中轉移 10 萬個延時任務到時間輪中
            // 防止極端情況下 worker 線程在這裏不停地拉取任務,執行任務
            // 剩下的任務等到下一個 tick 在進行轉移
            for (int i = 0; i < 100000; i++) {
                // 拉取待執行的延時任務
                HashedWheelTimeout timeout = timeouts.poll();
                // 跳過已被取消的任務
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    continue;
                }
                // 計算延時任務最終會停留在哪個 tick 上
                // 這裏的 tick 和 calculated 是一個絕對值,從 0 開始增加,只增不減
                long calculated = timeout.deadline / tickDuration;
                // 時間輪從當前 tick 開始轉動到 calculated 需要經過多少個時鐘週期
                timeout.remainingRounds = (calculated - tick) / wheel.length;
                // calculated < 當前 tick , 則表示延時任務 timeout 已經過期了
                // 那麼就將過期的 timeout 放在當前 tick 中執行
                final long ticks = Math.max(calculated, tick);
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }

當任務轉移完成之後,workerThread 開始處理當前 tick 對應的HashedWheelBucket,將 HashedWheelBucket 中的延時任務挨個拉取出來執行。當所有到期的延時任務被執行完之後,tick 向前推進一格,開啓新一輪的循環。

image

private static final class HashedWheelBucket {

       public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;
            // process all timeouts
            while (timeout != null) { // bucket 不為空
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) { // 屬於當前時鐘週期
                    next = remove(timeout); // 從 bucket 中刪除
                    if (timeout.deadline <= deadline) {
                        // 執行延時任務
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds --;
                }
                timeout = next;
            }
        }
}

時間輪中的延時任務默認情況下是由 workerThread 執行的,但如果我們在創建時間輪的時候指定了專門的 taskExecutor , 那麼延時任務就會由這個 taskExecutor 負責執行,workerThread 只負責調度,大大減輕了 workerThread 的負荷。

private static final class HashedWheelTimeout implements Timeout, Runnable { 
        // 時間輪
        private final HashedWheelTimer timer;

        public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                // 時間輪的 taskExecutor 負責執行延時任務,默認為 workerThread
                timer.taskExecutor.execute(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown while submit " + TimerTask.class.getSimpleName()
                            + " for execution.", t);
                }
            }
        }
}

下面是時間輪運轉的完整邏輯流程:

    private final class Worker implements Runnable {
        @Override
        public void run() {
          
            do {
                // workerThread 這裏會等待到下一個 tick 的時間點
                final long deadline = waitForNextTick();
                // deadline < 0 表示 currentTime 溢出
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    // 將 cancelledTimeouts 中已經取消的 task 從對應 bucket 中刪除
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    // 將 timeouts 中收集的延時任務添加到時間輪中
                    transferTimeoutsToBuckets();
                    // 執行當前 tick 對應 bucket 中的所有延時任務
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

                      ..............

    }

3.6 調度延遲的不確定性是如何產生的

前面我們提到過,時間輪只適合那種對延時任務的調度及時性要求沒那麼高的場景,Netty 時間輪的精度為一個 tickDuration,默認為 100ms 。延時任務的調度通常會晚 100ms 左右。

比如,現在我們向時間輪添加一個延時 5ms 之後執行的任務,那麼這個延時任務可能會在 8ms 之後執行,也可能是 65ms 之後執行,也有可能在 108ms 之後執行。總之,時間輪調度的延遲範圍會在 100ms 左右。那為什麼會出現這種不確定性呢 ?

這其中主要有兩個原因,首先第一個原因就是時間輪的延時任務太多,延時任務的邏輯比較複雜,執行時間略長,導致了 workerThread 的阻塞,從而造成了任務調度的延遲。減緩這種情況的一個措施就是在創建時間輪的時候,我們可以指定一個自定義的 taskExecutor 來專門負責延時任務的執行,減輕 workerThread 的負荷。或者增大 HashedWheelBucket 的個數,儘量的分散延時任務,不要讓它們集中在某一個 HashedWheelBucket 中。

第二個原因是要看我們究竟在哪一個時間點向時間輪添加延時任務。不同的添加時機,也會造成調度的不確定性。這可能有點難以理解,我們來看一個具體的例子。

image

比如,我們在下圖時間軸中 1ms 這個時刻向時間輪添加一個延時 5ms 執行的任務。當前時間輪如上圖所示,tick 指向 0 。

image

延時 5ms 的任務會被添加到 HashedWheelBucket0 中,此時 workerThread 還在 sleep 等待 next tick 也就是 100ms 這個時間點的到來。

我們在 1ms 這個時刻添加的這個延時任務本來應該在時間軸中的 6ms 這個時間點執行,但是現在 workerThread 還在睡眠,需要等到 100ms 這個時間點才能被喚醒去執行 HashedWheelBucket0 中的延時任務。這就產生了 90 ms 的調度延時。

但如果我們在時間軸的 94ms 位置處添加這個 5ms 的延時任務,那麼這個延時任務本應該在時間軸的 99ms 這個時間點被執行,但由於 workerThread 在 100ms 這個時間點才會被喚醒,所以產生了 1ms 的調度延時。

如果非常不幸,我們恰好卡在了時間軸 95ms 這個時間點添加這個 5ms 的延時任務,此時要注意,這個延時任務會被放在 HashedWheelBucket1 中而不是 HashedWheelBucket0。

而 HashedWheelBucket1 中的延時任務,workerThread 需要等到時間軸 200ms 這個時間點才會去執行,這樣一來,本應該在 100ms 這個時間點執行的延時任務,時間輪卻在 200ms 這個時間點來調度,這就產生了 100ms 的調度延時。如果在算上 CPU 調度 workerThread 的時間,那麼這個延遲可能就在 105ms 或者 108ms 左右。這裏大家可以對照上小節的內容,仔細想想是不是這麼回事。

3.7 時間輪的關閉

時間輪定義了三種狀態,在剛被創建出來的時候狀態為 WORKER_STATE_INIT,啓動之後狀態為 WORKER_STATE_STARTED。

public class HashedWheelTimer implements Timer {
    // 初始狀態
    public static final int WORKER_STATE_INIT = 0;
    // 啓動狀態
    public static final int WORKER_STATE_STARTED = 1;
    // 關閉狀態
    public static final int WORKER_STATE_SHUTDOWN = 2;
    // 時間輪的狀態,初始為 WORKER_STATE_INIT
    private volatile int workerState; 
}

當時間輪要關閉的時候,我們就需要將 workerState 更新為 WORKER_STATE_SHUTDOWN。當 workerThread 檢測到時間輪的狀態不是 WORKER_STATE_STARTED 的時候,就會退出 do ... while 循環,停止時間輪的轉動。

隨後 workerThread 會將當前時間輪中所有 HashedWheelBuckets 中遺留的(未來得及執行,未取消)的延時任務以及 timeouts 隊列中緩存的待執行延時任務(未取消)統統轉移到 unprocessedTimeouts 集合中。並將已經取消的延時任務從對應 HashedWheelBucket 中刪除。

private final class Worker implements Runnable {

        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

        @Override
        public void run() {
            do {

                      ........ 時間輪運轉邏輯 .......

            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);


            for (HashedWheelBucket bucket: wheel) {
                // 將 bucket 中還沒來得及執行並且沒有被取消的任務轉移到 unprocessedTimeouts
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            // 將 timeouts 中緩存的待執行任務(沒有被取消)轉移到 unprocessedTimeouts
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            // 將 cancelledTimeouts 中的延時任務從對應 bucket 中刪除
            processCancelledTasks();
        }
}

最後 Netty 會將 unprocessedTimeouts 集合中收集到的那些還未來得及執行的延時任務全部取消,然後將這些延時任務返回給業務線程,由業務線程自行處理。

時間輪關閉的完整流程總結如下:

  1. 在延時任務中不能執行時間輪關閉的操作。
  2. 原子更新時間輪的狀態為 WORKER_STATE_SHUTDOWN
  3. 中斷 workerThread,並等待 workerThread 結束。
  4. 取消所有還未來得及調度的延時任務,並返回給業務線程。
public class HashedWheelTimer implements Timer {
    @Override
    public Set<Timeout> stop() {
        // 在延時任務中不能執行停止時間輪的操作
        if (Thread.currentThread() == workerThread) {
            throw new IllegalStateException(
                    HashedWheelTimer.class.getSimpleName() +
                            ".stop() cannot be called from " +
                            TimerTask.class.getSimpleName());
        }
        // 停止時間輪
        if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
            // cas 更新狀態失敗,這裏時間輪的狀態可能會是兩種:
            // 1. WORKER_STATE_INIT 還未啓動
            // 2. WORKER_STATE_SHUTDOWN 已經停止
            if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
                // 如果時間輪還未啓動,那麼直接停止就好了
                if (leak != null) {
                    boolean closed = leak.close(this);
                    assert closed;
                }
            }
            // 時間輪沒有啓動,那麼肯定也就沒有延時任務,這裏直接返回一個空集合就行
            return Collections.emptySet();
        }

        try {
            boolean interrupted = false;
            while (workerThread.isAlive()) {
                // 中斷 workerThread,使其從 sleep 中喚醒,執行時間輪關閉的邏輯
                // 如果 workerThread 在運行,那麼此時時間輪已經是 WORKER_STATE_SHUTDOWN 狀態
                // workerThread 會退出 do while 循環轉去執行時間輪的關閉邏輯
                workerThread.interrupt();
                try {
                    // 等待 workerThread 的結束
                    workerThread.join(100);
                } catch (InterruptedException ignored) {
                    // 當前線程被中斷
                    interrupted = true;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        } finally {
            if (leak != null) {
                // 關閉資源泄露探測
                boolean closed = leak.close(this);
                assert closed;
            }
        }
        // 獲取還未來得及處理的延時任務
        Set<Timeout> unprocessed = worker.unprocessedTimeouts();
        Set<Timeout> cancelled = new HashSet<Timeout>(unprocessed.size());
        // 將還未來得及處理的任務全部取消,然後返回
        for (Timeout timeout : unprocessed) {
            if (timeout.cancel()) {
                cancelled.add(timeout);
            }
        }
        return cancelled;
    }
}

4. 單層時間輪的設計缺陷

Netty 採用的是單層時間輪的設計,時鐘間隔 tickDuration 為 100ms , 一共 512 個刻度。

image

workerThread 每隔 100ms 將時鐘 tick 向前推進一格,並執行對應 bucket 中的延時任務。但如果長時間沒有任務過期的話,那麼時鐘 tick 就會一直空轉,造成不必要的性能損耗。

比如,我們在 tick = 0 這個時刻向時間輪分別添加延時 100ms , 延時 51100ms 的定時任務,它們會被添加到 HashedWheelBucket1, HashedWheelBucket511 中,剩下的 HashedWheelBucket 全部都是空的。

當 100ms 的延時任務被執行完之後,時鐘 tick 將不得不一直空轉,直到空轉到 511 這個刻度,才會去執行 51100ms 的延時任務。當這兩個延時任務被執行完之後,如果不在向時間輪添加新的任務,那麼時間輪將會一直空轉下去。

除了會有時鐘 tick 空轉的現象之外,單層時間輪還無法高效應對延時時間跨度比較大的定時任務。比如,定時任務的延時時間橫跨多個時鐘週期。Netty 時間輪的時鐘週期為 51200ms ,假設我們在 tick = 1 這個時刻向時間輪添加三個延時任務,它們的延時時間分別為:51200ms , 2 51200ms , 3 51200ms 。

這三個延時任務都會被添加到 HashedWheelBucket1 中,但它們的 remainingRounds 都是大於 0 的。

private static final class HashedWheelTimeout {
    // 執行該延時任務需要經過多少時鐘週期
    long remainingRounds;
}

雖然在當前時鐘週期內,HashedWheelBucket1 中沒有任何可以執行的延時任務,但 workerThread 仍然需要將 HashedWheelBucket1 中的延時任務全部遍歷一遍。如果像這樣的延時任務非常多,那麼 workerThread 遍歷 HashedWheelBucket 的操作也會產生不必要的性能開銷。

綜上所述,單層時間輪的設計缺陷總結下來主要有兩個方面:

  1. 在長時間沒有到期任務的情況下,單層時間輪會有時鐘 tick 空轉的現象。
  2. 面對海量延時時間跨度比較大的定時任務場景,除了時鐘 tick 空轉之外,還會產生不必要的 HashedWheelBucket 遍歷開銷。

而 Kafka 中的多層時間輪設計則完美地解決了上面的這兩個缺陷,那怎麼解決的呢 ? 下面就讓我們帶着這兩個疑問來深入探究一下 Kafka 多層時間輪的設計與實現。

5. Kafka 多層時間輪的設計

如下圖所示,Kafka 時間輪的設計模型可以説是和 Netty 時間輪的設計非常相似。

image

時間輪的錶盤都是一個環形數組,Kafka 中的刻度為 20 ,Netty 中的刻度為 512 。

// Kafka
public class TimingWheel {
    private final TimerTaskList[] buckets;
}

// Netty
public class HashedWheelTimer implements Timer {
    private final HashedWheelBucket[] wheel;
}

Netty 使用的是 HashedWheelBucket 結構來描述時間輪中的刻度,它是一個帶有頭尾指針的雙向鏈表。

image

    private static final class HashedWheelBucket {
        private HashedWheelTimeout head;
        private HashedWheelTimeout tail;
    }

Kafka 則使用 TimerTaskList 結構來描述時間輪中的刻度,它是一個帶有頭結點的雙向循環鏈表。

image

class TimerTaskList implements Delayed {
    // 頭結點
    private final TimerTaskEntry root;

    TimerTaskList(
        AtomicInteger taskCounter,
        Time time
    ) {
        this.root = new TimerTaskEntry(null, -1L);
        this.root.next = root;
        this.root.prev = root;
    }
}

時間輪的延時任務在 Netty 中是用 HashedWheelTimeout 結構來表示。

private static final class HashedWheelTimeout implements Timeout, Runnable {
        private final TimerTask task;
        private final long deadline;
        HashedWheelTimeout next;
        HashedWheelTimeout prev;
}

而在 Kafka 中則是用 TimerTaskEntry 結構來表示,可以看出兩種結構是非常相似的,只不過是名字變了一下而已。

public class TimerTaskEntry {
    public final TimerTask timerTask;
    public final long expirationMs;
    TimerTaskEntry next;
    TimerTaskEntry prev;
}

Netty 時間輪中的時鐘間隔是用 tickDuration 來表示,默認為 100ms 。Kafka 中的時鐘間隔是用 tickMs 來表示,默認為 1ms 。

// Netty
private final long tickDuration;
// Kafka
private final long tickMs;

Netty 中的時鐘週期是 51200ms , Kafka 中的時鐘週期是 20ms 。Netty 時間輪的轉動是一個 tick 一個 tick 地向前轉動,時間每經過一個 tickDuration,時鐘 tick++ 。tick 是一個絕對值,從 0 開始遞增,只增不減。

// Netty
private long tick;
// Kafka
private long currentTimeMs;

而 Kafka 時鐘是由 currentTimeMs 來表示的,與 Netty 不同的是,currentTimeMs 是系統當前時間的一個時間戳,單位為毫秒。時間每經過 tickMs,currentTimeMs 就增加 tickMs。Kafka 時間輪的轉動是通過更新 currentTimeMs 時間戳來驅動的。

以上就是 Kafka 與 Netty 在時間輪的設計上比較相似的地方,那麼不同的地方在哪裏呢 ?

我們都知道 Kafka 採用的是多層時間輪的設計,主要是為了應對海量延時時間跨度比較大的定時任務場景。 Kafka 中的時間輪是按需添加的,在初始的時候只有一個時間輪(第一層),如下圖所示:

image

第一層時間輪的精度,也就是時鐘間隔 tickMs 為 1ms , 一共 20 個刻度,時鐘週期 interval 為 20ms 。和 Netty 一樣,時間輪中每個 bucket (TimerTaskList) 負責存放到期時間在相同範圍內的延時任務 TimerTaskEntry。

比如我們以時間輪的初始狀態為例, Kafka 第一層時間輪的環形數組 buckets[0] 中,存放的是到期時間在 [ 0 , 1 ) ms 內的延時任務,buckets[1] 中存放的是到期時間在 [ 1 , 2 ) ms 內的延時任務,buckets[19] 中存放的是到期時間在 [ 19 , 20) ms 內的延時任務。

假設現在我們要向時間輪中添加一個延時 28ms 執行的定時任務。很明顯,它已經超過了第一層時間輪的時鐘週期 —— 20ms,所以我們應該將這個定時任務添加到第二層時間輪中。

前面我們剛剛提過,Kafka 的多層時間輪是按需創建的,初始的時候只有一層時間輪,這時我們就需要創建第二層時間輪,每一層時間輪 TimingWheel 中有一個字段 overflowWheel 用於指向其上層時間輪。

public class TimingWheel {
    // 上層時間輪
    private volatile TimingWheel overflowWheel = null;
}

image

第二層時間輪的時鐘間隔 tickMs 就是第一層時間輪的時鐘週期 interval 為 20ms , 時鐘刻度仍然是 20 , 所以第二層時間輪的時鐘週期 interval 為 400ms。這個延時 28ms 執行的定時任務將會添加到第二層時間輪的 buckets[1] 中。

假設現在我們又向時間輪中添加一個延時 450ms 執行的定時任務,然而第二層時間輪的時鐘週期卻是 400ms , 無法滿足這個延時時間,所以我們就需要創建第三層時間輪。將這個延時 450ms 的定時任務添加到第三層時間輪中。

image

同樣的道理,第三層時間輪的時鐘間隔 tickMs 又是第二層時間輪的時鐘週期 interval 為 400ms , 時鐘刻度仍然是 20 , 所以第三層時間輪的時鐘週期 interval 為 8000ms。這個延時 450ms 執行的定時任務將會添加到第三層時間輪的 buckets[1] 中。

image

Kafka 並沒有限制時間輪的層數,每當最高層時間輪的時鐘週期 interval 無法滿足延時任務的延時時間 delayMs 時,Kafka 就會按需創建一個更高層的時間輪,該時間輪的時鐘間隔 tickMs 就是其下一層時間輪的時鐘週期 interval。

Kafka 的多層時間輪設計與我們日常生活中的鐘表非常的相似,筆者以三層時間輪進行類比説明:

image

第一層時間輪可以看做是隻有秒針的一個錶盤,秒錶的時鐘間隔 tickMs 就是 1s , 時鐘週期 interval 是 60s。

第二層時間輪可以看做是隻有分針的一個錶盤,時鐘間隔 tickMs 是 60s , 正好是秒錶的時鐘週期。分表的時鐘週期 interval 是 60m 。

第三層時間輪可以看做是隻有時針的一個錶盤,時鐘間隔 tickMs 是 60m,正好是分表的時鐘週期。時表的時鐘週期 interval 是 12h 。

秒針每走一個刻度就是一秒,秒針走完一個時鐘週期,分針走一個刻度就是一分鐘,當分針走完一個時鐘週期,時針走一個刻度就是一個小時。

同樣的道理, Kafka 第一層時間輪走一個刻度,就是 1ms , 走完一個時鐘週期就是 20ms 。隨後第二層時間輪走一個刻度,當第二層時間輪走完一個時鐘週期(400ms) 之後,第三層時間輪走一個刻度。第三層時間輪走完一個時鐘週期就是 8000ms 。

但是這樣的設計只是解決了延時任務跨度比較大的問題,卻無法解決 Netty 時間輪中存在的空推進現象。假設,我們現在一共有三層時間輪,但是卻沒有一個延時任務。這種情況下,每一層時間輪也還是會一下一下的向前推進,但是卻執行不到任何延時任務。產生了更加嚴重的空推進問題。

那如何解決呢 ? 我們是不是應該換一種思路去想,如果時間輪中沒有任何延時任務我們就不推進,彷彿時間靜止了一樣,因為在這種情況下繼續推進時間輪沒有任何意義。

如果時間輪中有延時任務,那我們也不會一個刻度一個刻度地推進,而是等到延時任務到期之後,才去推進時間輪。這樣一來,空推進的問題不就解決了嗎 ?

設計思路確定了,那麼如何實現呢 ? 核心問題就是有沒有一種數據結構,可以將延時任務按照過期時間由近及遠地組織起來,如果沒有任何任務過期,時間輪就阻塞在這個數據結構上,如果有任務過期,時間輪就從這個數據結構上喚醒,然後執行延時任務,並將時間輪的 currentTimeMs 直接推進到任務的過期時間點。

這種數據結構不就是筆者在本文 1.2 小節中介紹的 DelayQueue 嗎 ?但是如果我們用 DelayQueue 來組織時間輪中的所有延時任務的話,那麼又會遇到同樣的問題,就是面對海量延時任務的添加,取消時,DelayQueue 的時間複雜度太高 —— O(logn)

我們可以進一步折中設計一下,仔細觀察一下時間輪的設計特點,每一個 bucket (TimerTaskList) 存放的是過期時間在相同範圍內的任務。

image

我們還是以時間輪的初始狀態為例進行説明,每個 bucket 都有一個固定的過期時間範圍 —— [ startTime , endTime ) , 延時任務的過期時間只要在這個範圍之內,那麼他們都會被組織在同一個 bucket 中。

延時任務有過期時間的概念,而 bucket 是用來組織延時任務的,那麼我們反過來思考一下,bucket 自然也就具備了過期時間的概念。在 Kafka 中,bucket 的過期時間就是對應的 startTime。

class TimerTaskList implements Delayed {
    // 過期時間
    private final AtomicLong expiration;
}

當時間到達 startTime 這個時刻之後,Kafka 就會將對應 bucket 中的延時任務挨個拿出來進行處理,這就是多層時間輪整體的一個運轉思路。

既然 bucket 有過期時間的概念,那麼我們何不用 DelayQueue 只保存 bucket ,而時間輪中 bucket 的數量是很少的。比如,單層時間輪只有 20 個 bucket,雙層時間輪也才 40 個 bucket,三層時間輪也只有 60 個 bucket。

用 DelayQueue 來組織時間輪中所有的 bucket , 時間複雜度 O(logn) 就可以忽略不計了,因為 bucket 的數量太少了。

這樣一來,當時間輪中沒有任何到期的 bucket 時,時間輪將會一直阻塞在 DelayQueue 上,不會向前推進,這樣就避免了無意義的空推進現象。當 DelayQueue 中有 bucket 到期時,時間輪被喚醒,將 currentTimeMs 推進到過期的時間點 expiration , 然後將到期 bucket 中的延時任務挨個拿下來處理。

筆者還是以之前的例子進行説明,假設我們現在有三層時間輪,初始狀態下,這三層時間輪的 currentTimeMs 均指向 0ms 這個時刻,時間輪中有兩個延時任務,分別在 28ms , 450ms 之後執行。

image

其中延時 28ms 之後執行的任務被存放在第二層時間輪的 bucket[1] 中,它的到期時間是 20ms 。延時 450ms 之後執行的任務被存放在第三層時間輪的 bucket[1] 中,它的到期時間是 400ms。

在 20ms 之內,時間輪就一直在 DelayQueue 上阻塞,不會向前空推進。當時間達到 20ms 這個時刻之後,第二層時間輪 bucket[1] 到期,時間輪從 DelayQueue 上喚醒。

時間輪的推進首先會從第一層時間輪開始,將第一層時間輪的 currentTimeMs 向前推進到 20ms 這個時刻,接着將第二層時間輪的 currentTimeMs 也向前推進到 20ms 這個時刻。

第三層時間輪的 currentTimeMs 保持不變,仍然停留在 0ms 這個時刻。因為第三層時間輪的時鐘間隔 tickMs 是 400ms , 20ms 還不足以讓第三層時間輪向前推進一格。

時間輪向前推進完畢之後,接下來 Kafka 就會處理到期的 bucket,就是第二層時間輪的 bucket[1] ,其中只有一個任務,它是延時 28ms 之後執行。但現在才過去 20ms ,還有 8ms 才能執行這個延時任務。所以 Kafka 會先將這個任務從第二層時間輪的 bucket[1] 中摘下,重新按照延時 8ms 執行的延時任務插入到第一層時間輪的 bucket[8] 中,然後將 bucket[8] 放入到 DelayQueue 中。

image

此時 DelayQueue 中有兩個 bucket,一個是第一層時間輪的 bucket[8],8ms 之後到期,另一個是之前第三層時間輪的 bucket[1],380ms 之後到期(因為時間已經過了 20ms)。

還是同樣的推進邏輯,當時間過了 8ms 之後,第一層時間輪的 bucket[8] 到期,時間輪從 DelayQueue 中喚醒,還是從第一層時間輪開始推進,不過此時經過上一輪的推進,第一層時間輪的 currentTimeMs 現在停留在 20 ms 這個時刻,所以首先需要將第一層的 currentTimeMs 向前推進到 28ms 。

第二層時間輪當前的 currentTimeMs 也是停留在 20 ms 這個時刻,而第二層時間輪的時鐘間隔是 20ms , 但此時只是過了 8ms ,所以第二層的 currentTimeMs 不會向前推進,仍然是 20ms 。同理第三層時間輪的 currentTimeMs 也不會向前推進,仍然停留在 0ms 這個時刻。

當新一輪的推進工作完成之後,Kafka 就會着手處理過期的 bucket[8](第一層),此時 bucket[8] 中的延時任務也已經到期,所以 Kafka 會將 bucket[8] 中的所有延時任務摘下挨個執行。此時第一層時間輪 currentTimeMs 指向 28ms 這個時刻,一開始加入時間輪的這個延時 28ms 的任務剛好被執行。

當時間來到了 400ms 這個時刻,第三層時間輪的 bucket[1] 到期,Kafka 再一次從 DelayQueue 中喚醒,首先將第一層時間輪的 currentTimeMs 推進到 400ms 這個時刻,然後將第二層時間輪的 currentTimeMs 也推進到 400ms 這個時刻。

由於 400ms 剛好滿足第三層時間輪的時鐘間隔 tickMs(400ms),所以第三層時間輪的 currentTimeMs 也推進到 400ms 這個位置。但 bucket[1] 中的延時任務是要延時 450ms 之後執行,現在時間才過了 400ms ,還有 50ms 才能執行該任務。

所以需要將該任務近一步降級到低層時間輪中,延時任務從高層時間輪降級到低層時間輪的邏輯是:

  1. 首先嚐試將延時任務降級到第一層時間輪中,但第一層時間輪的時鐘週期是 20ms ,無法滿足延時 50ms 的任務。接着嘗試降級到第二層時間輪中。
  2. 第二層時間輪的時鐘週期是 400ms , 可以滿足延時 50ms 的任務,於是將該任務重新添加到第二層時間輪的 bucket[2] 中。然後將 bucket[2] 添加到 DelayQueue 中。

image

此時 DelayQueue 中就只包含一個 bucket 了,這個 bucket 就是第二層時間輪的 bucket[2],它在 40ms 之後過期。此時三層時間輪的 currentTimeMs 都是指向 400ms 這個時刻。

當時間在一次經過 40ms 之後,bucket[2] 到期,第一層時間輪的 currentTimeMs 從 400ms 推進到 440ms ,第二層時間輪的 currentTimeMs 也從 400ms 推進到 440ms,指向 bucket[2] 。由於 40ms 不滿足第三層時間輪的時鐘間隔(400ms),所以第三層的 currentTimeMs 仍然停留在 400ms 。

然而 bucket[2] 中的任務是要在 50ms 之後才能執行,但現在才過了 40ms ,還有 10ms 才能執行該任務。所以需要近一步降級,而第一層時間輪的時鐘週期(20ms)剛好能滿足這個延時時間(10ms), 所以該任務會被重新插入到第一層時間輪的 bucket[10] 中。隨後 bucket[10] 被加入到 DelayQueue 中。

當時間在一次經過 10ms 之後,DelayQueue 中的 bucket[10](第一層)到期,第一層時間輪的 currentTimeMs 從 440ms 推進到 450ms 這個時刻,指向 bucket[10] 。由於 10ms 不滿足第二層時間輪的時鐘間隔(20ms),所以第二層的 currentTimeMs 仍然停留在 440ms。同理第三層的 currentTimeMs 仍然停留在 400ms 。

當時間輪向前推進的工作結束之後,Kafka 就開始處理過期的 bucket[10],其中需要延時 10ms 執行的任務也已經過期,所以 Kafka 會將 bucket[10] 中的所有延時任務挨個摘下執行。那麼一開始加入時間輪的這個需要延時 450ms 之後執行的任務就剛好被執行了。

那麼到現在為止,筆者在第 4 小節最後提出的那兩個問題就完美的解決了,總結一下就是 Kafka 通過引入多層時間輪的設計解決了海量延時時間跨度比較大的定時任務場景。通過引入 DelayQueue 解決了時間輪空推進的問題。

6. Kafka 多層時間輪相關設計模型的實現

經過上一小節的介紹,現在我們已經對 Kafka 多層時間輪的設計要點以及眾多模型概念非常熟悉了,那麼在此基礎之上,在回頭來看源碼實現就很清晰明瞭了,下面筆者先從 Kafka 的時間輪模型 SystemTimer ,TimingWheel 開始介紹。

6.1 時間輪的創建

SystemTimer 用於組織管理多層時間輪,核心屬性如下:

public class SystemTimer implements Timer {
    // 用於執行延時任務的 Executor
    private final ExecutorService taskExecutor;
    // 組織時間輪中所有的 TimerTaskList
    private final DelayQueue<TimerTaskList> delayQueue;
    // 時間輪中管理的延時任務個數
    private final AtomicInteger taskCounter;
    // 指向第一層時間輪,初始狀態下只有一層時間輪,後續按需創建
    private final TimingWheel timingWheel;
}

taskExecutor 用於執行多層時間輪中的延時任務,它是一個單線程的 FixedThreadPool。

delayQueue 用於組織管理多層時間輪中的所有 TimerTaskList,按照 TimerTaskList 的到期時間 expiration ,由近及遠的排列。主要用於防止時間輪空推進的現象。

taskCounter 用於統計多層時間輪中總共管理的延時任務個數。

timingWheel 用於指向第一層時間輪,在初始狀態下,只有一層時間輪,後續會根據延時任務的時間跨度按需創建多層時間輪。

public class SystemTimer implements Timer {

    public SystemTimer(String executorName) {
        this(executorName, 1, 20, Time.SYSTEM.hiResClockMs());
    }

    public SystemTimer(
        String executorName,
        long tickMs,
        int wheelSize,
        long startMs
    ) {
        this.taskExecutor = Executors.newFixedThreadPool(1,
            runnable -> KafkaThread.nonDaemon(SYSTEM_TIMER_THREAD_PREFIX + executorName, runnable));
        this.delayQueue = new DelayQueue<>();
        this.taskCounter = new AtomicInteger(0);
        this.timingWheel = new TimingWheel(
            tickMs, // 1ms
            wheelSize, // 20
            startMs, // 時間輪創建那一刻的時間戳
            taskCounter,
            delayQueue
        );
    }
}

TimingWheel 則是真正的時間輪模型,其核心結構如下圖所示:

image

public class TimingWheel {
    // 時間輪的時鐘間隔,第一層時間輪的 tickMs 為 1ms
    private final long tickMs;
    // 時間輪的刻度,默認為 20 
    private final int wheelSize;
    // 延時任務個數在多層時間輪中的全局計數
    private final AtomicInteger taskCounter;
    // 多層時間輪全局只有一個 DelayQueue 實例,組織管理所有 TimerTaskList
    private final DelayQueue<TimerTaskList> queue;
    // 時間輪的時鐘週期,同時也是其上一層時間輪的時鐘間隔 tickMs
    // 第一層時間輪的 interval 為 20ms
    private final long interval;
    // 時間輪的錶盤,環形數組
    private final TimerTaskList[] buckets;
    // 時間輪的指針,初始為創建時間輪時候的時間戳,它是一個絕對值,只增不減
    private long currentTimeMs;
    // 用於指向其上一層時間輪,按需創建
    private volatile TimingWheel overflowWheel = null;
}

Kafka 時間輪中的指針是 currentTimeMs,它是一個絕對時間戳,初始為時間輪創建時候的時間戳 —— Time.SYSTEM.hiResClockMs(), 單位為毫秒。由於時間輪是根據 tickMs 來一下一下的轉動,所以 currentTimeMs 必須是 tickMs 的整數倍。

    TimingWheel(
        long tickMs,
        int wheelSize,
        long startMs,
        AtomicInteger taskCounter,
        DelayQueue<TimerTaskList> queue
    ) {
        this.tickMs = tickMs;
        this.wheelSize = wheelSize;
        this.taskCounter = taskCounter;
        this.queue = queue;
        this.buckets = new TimerTaskList[wheelSize];
        this.interval = tickMs * wheelSize;
        // currentTimeMs 必須是 tickMs 的整數倍
        this.currentTimeMs = startMs - (startMs % tickMs);

        for (int i = 0; i < buckets.length; i++) {
            buckets[i] = new TimerTaskList(taskCounter);
        }
    }

TimerTaskList 結構來描述時間輪中的刻度,它是一個帶有頭結點的雙向循環鏈表。

image

class TimerTaskList implements Delayed {
    // 時間工具類
    private final Time time;
    // 全局延時任務個數統計
    private final AtomicInteger taskCounter;
    // TimerTaskList 的過期時間
    private final AtomicLong expiration;
    // 雙向循環鏈表的頭結點
    private final TimerTaskEntry root;

    TimerTaskList(
        AtomicInteger taskCounter,
        Time time
    ) {
        this.time = time;
        this.taskCounter = taskCounter;
        // 初始狀態下,一個空的 TimerTaskList,它的 expiration 為 -1
        // 表示未使用,不會加入到 DelayQueue
        this.expiration = new AtomicLong(-1L);
        this.root = new TimerTaskEntry(null, -1L);
        this.root.next = root;
        this.root.prev = root;
    }
}

6.2 延時任務的添加

image

由於 Kafka 多層時間輪的設計,所以在延時任務添加這一塊會比 Netty 的單層時間輪更加複雜一點,因為會涉及到延時任務的升級。另外當 DelayQueue 中的 TimerTaskList 到期的時候,如果 TimerTaskList 中的延時任務還沒到期,也會涉及到延時任務的降級,那麼在降級的過程中延時任務會被添加到低層時間輪中。

public class SystemTimer implements Timer {

    public void add(TimerTask timerTask) {
        readLock.lock();
        try {
            // 將延時任務 TimerTask 封裝成 TimerTaskEntry 添加到時間輪中
            addTimerTaskEntry(new TimerTaskEntry(timerTask, timerTask.delayMs + Time.SYSTEM.hiResClockMs()));
        } finally {
            readLock.unlock();
        }
    }

    private void addTimerTaskEntry(TimerTaskEntry timerTaskEntry) {
        // 嘗試向時間輪中添加延時任務
        // 返回 false 表示延時任務已經被取消或者到期
        // 返回 true 表示延時任務添加成功
        if (!timingWheel.add(timerTaskEntry)) {
            // Already expired or cancelled
            if (!timerTaskEntry.cancelled()) {
                // 如果延時任務到期,則立馬執行延時任務
                taskExecutor.submit(timerTaskEntry.timerTask);
            }
        }
    }
}

無論是在升級還是降級的過程中,如果發現延時任務已經到期,那麼 Kafka 就會立即執行延時任務,延時任務的執行由單線程的 FixedThreadPool 負責。

無論是延時任務的添加還是延時任務從高層時間輪降級到低層時間輪,Kafka 首先都是從第一層時間輪開始嘗試添加延時任務。

image

第一層時間輪的時鐘間隔 tickMs 是 1ms , 一共 20 個 TimerTaskList,時鐘週期為 20ms。每個 TimerTaskList 都有一個延時任務的到期時間範圍 [ startTime , endTime ) 。

我們以 currentTimeMs = 0ms 這個時刻為例,TimerTaskList0 表示的延時任務到期時間範圍是 [ 0ms , 1ms) ,過期時間在這個範圍內的延時任務都會被添加到 TimerTaskList0 中。同理,TimerTaskList1 表示的時間範圍為 [ 1ms , 2ms) , TimerTaskList19 表示的時間範圍為 [ 19ms , 20ms)

每個 TimerTaskList 也有一個自己的過期時間戳 expiration。TimerTaskList 在初始狀態下,也就是沒有任何延時任務的時候,它的 expiration 是 -1 。

class TimerTaskList implements Delayed {
  // 過期時間戳
  private final AtomicLong expiration;
}

當向 TimerTaskList 第一次添加延時任務的時候,expiration 將會被設置為 startTime 。隨後會被加入到 DelayQueue 中。

當 currentTimeMs 到達某個 TimerTaskList 的 startTime 的時候,那麼該 TimerTaskList 就過期了,Kafka 就可以處理該 TimerTaskList 中的延時任務。

比如,當第一層時間輪的 currentTimeMs 到達 0ms 這個時刻時,TimerTaskList0 過期,TimerTaskList0 中存放的是過期時間範圍在 [ 0ms , 1ms) 之內的延時任務。例如,延時 0.5ms ,0.6ms , .... , 0.9ms 的任務將會在 currentTimeMs = 0ms 這個時刻執行。從這裏我們可以看出 Kafka 時間輪的時鐘精度就是 1ms 。

所以判斷一個延時任務是否過期的條件就是看該任務的過期時間 expiration 是否小於當前時間輪的 currentTimeMs + tickMs,如果小於,則表示該延時任務已經過期,需要被立即執行。

當延時任務的過期時間 expiration 小於當前時間輪的時鐘週期 —— currentTimeMs + interval 的時候,就表示當前時間輪可以滿足該延時任務的時間跨度,所以該延時任務就會被添加到當前時間輪中,這裏的邏輯就和 Netty 的單層時間輪一樣了。

// Kafka
long virtualId = expiration / tickMs;
int bucketId = (int) (virtualId % (long) wheelSize);
TimerTaskList bucket = buckets[bucketId];
bucket.add(timerTaskEntry);

// Netty
long calculated = timeout.deadline / tickDuration;            
final long ticks = Math.max(calculated, tick);
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
bucket.addTimeout(timeout);

比如第一層時間輪中,在 1.5ms 這個時刻到期的延時任務就會被添加到 TimerTaskList1 中,TimerTaskList1 的時間範圍為 [ 1ms , 2ms),TimerTaskList1 的過期時間為 virtualId * tickMs = 1ms

在 19.6ms 這個時刻到期的延時任務就會被添加到 TimerTaskList19 中,TimerTaskList19 的時間範圍為 [ 19ms , 20ms),TimerTaskList19 的過期時間為 virtualId * tickMs = 19ms

當延時任務被添加到對應的 TimerTaskList 之後,我們就需要設置 TimerTaskList 的過期時間 expiration。

class TimerTaskList implements Delayed {
    public boolean setExpiration(long expirationMs) {
        return expiration.getAndSet(expirationMs) != expirationMs;
    }
}

TimerTaskList 的過期時間 expiration 初始為 -1 , 當第一次向 TimerTaskList 添加延時任務的時候 setExpiration 方法返回 true ,後續再向該 TimerTaskList 添加延時任務時,TimerTaskList 的 expiration 就保持不變了, setExpiration 方法返回 false 。

setExpiration 方法返回 true 的時候,表示我們第一次向 TimerTaskList 添加延時任務,那麼這個 TimerTaskList 就會被加入到 DelayQueue 中。

如果延時任務的過期時間 expiration 大於等於當前時間輪的時鐘週期 —— currentTimeMs + interval 的時候,就表示當前時間輪無法滿足該延時任務的時間跨度。這時候 Kafka 就會按需創建上一層時間輪,隨後嘗試向上一層時間輪添加延時任務。

    private synchronized void addOverflowWheel() {
        if (overflowWheel == null) {
            overflowWheel = new TimingWheel(
                interval, // 高層時間輪的 tickMs 恰好是其低一層時間輪的 interval
                wheelSize,
                currentTimeMs,
                taskCounter,
                queue
            );
        }
    }

這裏我們可以看到,上一層時間輪的時鐘間隔 tickMs 恰好是其下一層時間輪的時鐘週期 interval。

image

下面是向 Kafka 多層時間輪添加延時任務的完整邏輯:

public class TimingWheel {
    public boolean add(TimerTaskEntry timerTaskEntry) {
        // 獲取延時任務到期時間戳
        long expiration = timerTaskEntry.expirationMs;

        if (timerTaskEntry.cancelled()) {
            // 延時任務被取消則返回 false
            return false;
        } else if (expiration < currentTimeMs + tickMs) { 
            // 表示延時任務已經到期     
            return false;
        } else if (expiration < currentTimeMs + interval) {
            // 當前時間輪的時鐘週期可以滿足延時任務的時間跨度
            // 那麼就將延時任務添加到當前時間輪中
            // 計算延時任務應該被添加到哪個 TimerTaskList 中
            long virtualId = expiration / tickMs;
            int bucketId = (int) (virtualId % (long) wheelSize);
            TimerTaskList bucket = buckets[bucketId];
            bucket.add(timerTaskEntry);
            // 設置 TimerTaskList 的過期時間
            if (bucket.setExpiration(virtualId * tickMs)) {
                // 將 TimerTaskList 添加到 DelayQueue 中
                queue.offer(bucket);
            }
            // 延時任務添加成功返回 true
            return true;
        } else {
            // 當前時間輪的時鐘週期無法滿足延時任務的時間跨度
            // 那麼就將延時任務升級到上一層時間輪中
            if (overflowWheel == null) addOverflowWheel(); // 按需添加多層時間輪
            return overflowWheel.add(timerTaskEntry);
        }
    }
}

6.3 多層時間輪的運轉

Netty 是通過一個 workerThread 每隔 tickDuration(100ms)將時鐘 tick 向前推進一格。Kafka 中也有一個工作線程 —— Reaper 來推動多層時間輪的運轉。

    private static final long WORK_TIMEOUT_MS = 200L;

    class Reaper extends ShutdownableThread {
        @Override
        public void doWork() {
            try {
                timer.advanceClock(WORK_TIMEOUT_MS);
            } catch (InterruptedException ex) {
                // Ignore.
            }
        }
    }

而 Kafka 為了解決時間輪空推進的問題,只有 TimerTaskList 到期的時候 Reaper 線程才會向前推進多層時間輪,如果沒有 TimerTaskList 到期,Kafka 是不會向前推進的,彷彿時間靜止了一樣。

延時任務會按照過期時間的不同被組織在不同的 TimerTaskList 中, 每個 TimerTaskList 都有一個過期時間範圍 —— [ startTime , endTime) , 只要過期時間在這個範圍內的延時任務,那麼它就會被添加到該 TimerTaskList 中。TimerTaskList 自身的過期時間被設置為 startTime。關於這一點,筆者已經在前面的內容中反覆強調過了。

image

當沒有任何 TimerTaskList 到期的情況下,Reaper 線程就會一直在 delayQueue 上阻塞等待,直到有到期的 TimerTaskList 出現,Reaper 線程從 delayQueue 上被喚醒,開始處理 TimerTaskList 中的延時任務,並向前推進多層時間輪。

TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);

當某一個 TimerTaskList 到期之後,説明現在時間已經來到了該 TimerTaskList 對應的 startTime , 那麼 Kafka 就會先從第一層時間輪開始,嘗試將多層時間輪的 currentTimeMs 推進到 startTime。

但在推進之前,首先需要判斷 startTime 與當前 currentTimeMs 的時間間隔是否滿足當前時間輪的 tickMs(時鐘間隔),如果不滿足,時間輪就不能向前推進,因為還不夠一個時鐘間隔。

比如上圖中展示的延時任務。當時間達到 20ms 這個時刻之後,第二層時間輪 bucket[1] 到期,時間輪從 DelayQueue 上喚醒。時間輪的推進首先會從第一層時間輪開始,因為 20ms 已經達到了第一層時間輪的時鐘間隔(1ms), 所以 Kafka 會將第一層時間輪的 currentTimeMs 向前推進到 20ms 這個時刻。

接着開始嘗試推進第二層時間輪,由於 20ms 仍然滿足第二層時間輪的時鐘間隔(20ms),所以也將第二層時間輪的 currentTimeMs 向前推進到 20ms 這個時刻。

後面緊跟着就會嘗試推進第三層時間輪。但此時 20ms 已經無法滿足第三層時間輪的時鐘間隔(400ms)了,所以第三層時間輪的 currentTimeMs 仍然停留在 0ms 這個時刻,不會向前推進。

public class TimingWheel {
    // 從第一層時間輪開始,嘗試將各層時間輪的 currentTimeMs 推進到過期 TimerTaskList 的 startTime(timeMs)
    public void advanceClock(long timeMs) {
        // 時間跨度是否滿足當前時間輪的時鐘間隔
        if (timeMs >= currentTimeMs + tickMs) {
            // 將時間輪的時間推進到過期 TimerTaskList 的 startTime 位置
            // 必須確保 currentTimeMs 是時鐘間隔 tickMs 的整數倍
            currentTimeMs = timeMs - (timeMs % tickMs);
            // 嘗試推進更高層時間輪
            if (overflowWheel != null) overflowWheel.advanceClock(currentTimeMs);
        }
    }
}

當多層時間輪的推進工作結束之後,Reaper 線程就會着手處理到期 TimerTaskList 中的延時任務,那麼 Kafka 這裏該如何處理這些延時任務呢 ?

由於是多層時間輪的設計,所以 TimerTaskList 到期,並不代表其中的延時任務到期。比如,上圖第二層時間輪中的 TimerTaskList1,它在 20ms 這個時刻到期,但是其中的延時任務卻是在 28ms 這個時刻到期。

所以就要將 TimerTaskList 中所有的延時任務挨個進行降級,降級的過程上一小節筆者已經詳細介紹過了,Kafka 首先會嘗試將延時任務降級到第一層時間輪,如果第一層時間輪的時鐘週期無法滿足延時跨度,那麼在嘗試向第二層時間輪降級,這樣循環往復,直到可以找到某一層時間輪的時鐘週期能夠滿足該延時跨度。

比如,第一層時間輪的 currentTimeMs 現在已經推進到了 20ms 這個時刻,指向 TimerTaskList0 , 而延時 28ms 的任務降級到第一層時間輪之後,currentTimeMs 只需要等待 8ms 之後就可以轉動到 28ms 的位置,而第一層時間輪的時鐘週期是 20ms ,所以可以滿足這個 8ms 的延時跨度,於是該延時任務就會被降級到第一層時間輪的 TimerTaskList8 中。

image

同理,當時間經過 400ms 之後,第三層時間輪的 TimerTaskList1 到期,但是其中的延時任務卻是在 450ms 這個時刻才能到期,於是首先嚐試向第一層時間輪降級。

第一層時間輪的 currentTimeMs 現在已經推進到了 400ms 這個時刻,仍然指向 TimerTaskList0,如果延時 450ms 的任務添加到第一層時間輪之後,currentTimeMs 需要等待 50ms 之後才可以轉動到 450ms 的位置,但是第一層時間輪的時鐘週期是 20ms ,無法滿足這個 50ms 的延時跨度。

於是該延時任務就會近一步嘗試向第二層時間輪降級,第二層時間輪的 currentTimeMs 也已經推進到了 400ms 這個時刻,等待 50ms 之後就可以轉動到 450ms 的位置,而第二層時間輪的時鐘週期是 400ms ,可以滿足這個 50ms 的延時跨度,於是該延時任務最終會被降級到第二層時間輪的 TimerTaskList2中。

image

下面就是 Kafka 針對以上過程的源碼實現,過程梳理清晰了,源碼就變得簡單明瞭了:

public class SystemTimer implements Timer {

    public boolean advanceClock(long timeoutMs) throws InterruptedException {
        // 等待到期的 TimerTaskList
        TimerTaskList bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (bucket != null) {
            writeLock.lock();
            try {
                while (bucket != null) {
                    // 嘗試將多層時間輪的 currentTimeMs 向前推進到 TimerTaskList 的過期時間
                    timingWheel.advanceClock(bucket.getExpiration());
                    // 將過期 TimerTaskList 中的延時任務挨個降級到低層時間輪中
                    bucket.flush(this::addTimerTaskEntry);
                    bucket = delayQueue.poll();
                }
            } finally {
                writeLock.unlock();
            }
            return true;
        } else {
            // 沒有到期的任務
            return false;
        }
    }
}

總結

本文我們主要討論了定時任務調度相關的主題,筆者一開始介紹了 JDK 的三種調度組件:Timer,DelayQueue,ScheduledThreadPoolExecutor。但他們的共同特點都是採用了小根堆這種數據結構來組織管理定時任務,然而在面對海量定時任務的添加與取消時,這種設計的時間複雜度會比較高 —— O(logn)

隨後筆者介紹了 Netty 的單層時間輪 HashedWheelTimer,它將海量定時任務的添加與取消操作的時間複雜度降低到了 O(1) , 但無法避免時間輪的空推進現象以及無法應對海量延時跨度比較大的定時任務場景。

最後筆者詳細討論了 Kafka 時間輪的設計與實現,Kafka 通過引入 DelayQueue 以及多層時間輪,巧妙地解決了時間輪的空推進現象和海量延時任務時間跨度大的管理問題。好了,今天的內容到這裏就全部結束了,我們下篇文章見~~~

user avatar journey_64224c9377fd5 頭像 eolink 頭像 ahahan 頭像 tssc 頭像 dalideshoushudao 頭像 cbuc 頭像 jianghushinian 頭像 chaochenyinshi 頭像 wuliaodeliema 頭像 emanjusaka 頭像 gvison 頭像 changqingdezi 頭像
點贊 44 用戶, 點贊了這篇動態!
點贊

Add a new 評論

Some HTML is okay.