博客 / 詳情

返回

【Java併發編程線程池】 ForkJoinPool 線程池是什麼 怎麼工作的 和傳統的ThreadPoolExecutor比較

Java 中的 ForkJoinPool 線程池是什麼 怎麼工作的

Java 中的 ForkJoinPool 線程池是什麼 怎麼工作的

相比較於傳統的線程池,ForkJoinPool 線程池更適合處理大量的計算密集型任務,它的核心思想是將一個大任務拆分成多個小任務,然後將這些小任務分配給多個線程去執行,最後將這些小任務的結果合併起來,得到最終的結果。

工作竊取

值得注意的,ForkJoinPool中的每個線程都有自己的任務隊列,當線程任務隊列空了或者當前線程空閒,則會去別的線程的任務隊列獲取待執行任務,這個過程是工作竊取。

怎麼工作的

(一):傳統線程池

Java中傳統線程池(ThreadPoolExecutor)的工作流程的描述:

以 new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) 為例,線程池的工作流程如下:

代碼用例:

public static void main1(String[] args) {
    final var poolExecutor = new ThreadPoolExecutor(
            2, // 核心線程數
            4, // 最大線程數
            60,// 線程空閒時間
            TimeUnit.SECONDS, // 時間單位
            new ArrayBlockingQueue<>(10), // 任務隊列
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
    );
    // 定義一個Runnable
    MyRunnable runnable = new MyRunnable();
    // 提交任務
    for (int i = 0; i < 10; i++) {
        poolExecutor.submit(runnable);
    }
}

static class MyRunnable implements Runnable {
    @SneakyThrows
    @Override
    public void run() {
        Thread.sleep(1000);
        System.out.println("Hello, world!");
    }
}
  1. 提交任務;
  2. 線程池中的線程數量小於 corePoolSize(核心線程數)時,不管有沒有空閒的線程,都會創建一個新的線程來執行任務;
  3. 線程池中的線程數量等於 corePoolSize 時,若沒有空閒線程,則任務會被放入任務隊列中,等待線程池中的線程調度執行;
  4. 核心線程數已滿 且 任務隊列已滿時,線程數量小於 maximumPoolSize(最大線程數)時,會創建新的線程來執行任務;
  5. 最後若是核心線程數、任務隊列、最大線程數都滿了,會根據拒絕策略來處理新任務。

圖解:
傳統ThreadPoolExecutor執行示意圖

(二):ForkJoinPool 線程池

ForkJoinPool工作流程的描述:

以new ForkJoinPool(4) 和 普通的任務 為例,ForkJoinPool的工作流程如下:

代碼用例:

public static void main(String[] args) {
    final var forkJoinPool = new ForkJoinPool(4);// 這個參數 “4” 的作用指定線程池的線程數量
    // 定義一個Runnable
    MyRunnable runnable = new MyRunnable();
    // 提交任務
    for (int i = 0; i < 10; i++) {
        forkJoinPool.submit(runnable);
    }
}
  1. 提交任務;(初次執行時,會做一些初始化工作,如創建任務隊列的數組)
  2. 線程數量小於指定的線程數量時,會創建新的線程來執行任務,且每個線程都有自己的任務隊列,區分於傳統線程池只有一個任務隊列,ForkJoinPool的線程池裏的每個線程都有自己的任務隊列;
  3. 每個線程從自己的任務隊列中取出任務執行;
  4. 每個任務執行完畢後,且自己的任務隊列為空時,會從其他線程的任務隊列中偷取任務執行,這裏被稱作為“工作竊取”;

圖解:
ForkJoinPool運行示意圖

怎麼體現將大任務拆分成小任務

在剛剛的代碼用例中,我們提交了10個任務,但是我們的任務是一個普通的任務,沒有體現將大任務拆分成小任務,下面我們換一個任務,體現將大任務拆分成小任務。

任務要求

要求計算 1 ~ 100w 的階和,就是 "1+2+3+...+100w".

任務拆分方案:

我們可以將這個大任務拆分成多個小任務,將任務分配小到數的差值為10及以內時不再拆分,每個小任務計算區間的和,最後將這些小任務的結果合併起來,得到最終的結果。
如下

1:100w 的階和 = 1w 的階和 + 2w 的階和 + 3w 的階和 + ... + 10w 的階和
2:10w 的階和 = 1w 的階和 + 2w 的階和 + 3w 的階和 + ... + 10w 的階和
3:以此類推,直到區間的差值小於等於10時,不再拆分。
如100的階和 = 1~10 的階和 + 11~20 的階和 + ... + 91~100 的階和,其中 1~10的差值不大於10,不再拆分。

代碼用例:

public static void main(String[] args) {
    // 任務要求:計算 1 ~ 100w 的階和,就是 "1+2+3+...+100w".
    // 1. 創建一個 ForkJoinPool 對象
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    // 2. 提交一個任務
    Integer result = forkJoinPool.invoke(new MyTask(1, 1000000));
    System.out.println("result = " + result);
}

private static class MyTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 10;// 拆分最小任務的閾值
    private int begin;// 計算的區間的開始
    private int end;// 計算的區間的結束
    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if (end - begin <= THRESHOLD) {
            int sum = 0;
            for (int i = begin; i <= end; i++) {
                sum += i;
            }
            return sum;
        } else {
            int mid = (begin + end) / 2;
            MyTask left = new MyTask(begin, mid);
            MyTask right = new MyTask(mid + 1, end);
            left.fork();// 提交到任務隊列
            right.fork();
            return left.join() + right.join();// join等待結果,線程會出現空閒,會去別的任務隊列竊取任務執行
        }
    }
}

場景(常用於哪)

最常見、常用的場景中,其實就是在我們的Stream API中,我們使用了parallelStream()方法,底層就是使用了ForkJoinPool線程池。還有就是Java中新特性“虛擬線程”也是基於ForkJoinPool線程池實現的。

user avatar dm2box 頭像 mokeywie 頭像 yadong_zhang 頭像 goudantiezhuerzi 頭像 91cyz 頭像 gozhuyinglong 頭像 shadowck 頭像 yexiaobai_616e2b70869ae 頭像 tangtaixian_5fc4b5d1c3eff 頭像 u_17483758 頭像 u_15701057 頭像
11 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.