Java 中的 ForkJoinPool 線程池是什麼 怎麼工作的
Java 中的 ForkJoinPool 線程池是什麼 怎麼工作的
相比較於傳統的線程池,ForkJoinPool 線程池更適合處理大量的計算密集型任務,它的核心思想是將一個大任務拆分成多個小任務,然後將這些小任務分配給多個線程去執行,最後將這些小任務的結果合併起來,得到最終的結果。
工作竊取
值得注意的,ForkJoinPool中的每個線程都有自己的任務隊列,當線程任務隊列空了或者當前線程空閒,則會去別的線程的任務隊列獲取待執行任務,這個過程是工作竊取。
怎麼工作的
(一):傳統線程池
Java中傳統線程池(ThreadPoolExecutor)的工作流程的描述:
以 new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)) 為例,線程池的工作流程如下:
代碼用例:
public static void main1(String[] args) {
final var poolExecutor = new ThreadPoolExecutor(
2, // 核心線程數
4, // 最大線程數
60,// 線程空閒時間
TimeUnit.SECONDS, // 時間單位
new ArrayBlockingQueue<>(10), // 任務隊列
new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
);
// 定義一個Runnable
MyRunnable runnable = new MyRunnable();
// 提交任務
for (int i = 0; i < 10; i++) {
poolExecutor.submit(runnable);
}
}
static class MyRunnable implements Runnable {
@SneakyThrows
@Override
public void run() {
Thread.sleep(1000);
System.out.println("Hello, world!");
}
}
- 提交任務;
- 線程池中的線程數量小於 corePoolSize(核心線程數)時,不管有沒有空閒的線程,都會創建一個新的線程來執行任務;
- 線程池中的線程數量等於 corePoolSize 時,若沒有空閒線程,則任務會被放入任務隊列中,等待線程池中的線程調度執行;
- 核心線程數已滿 且 任務隊列已滿時,線程數量小於 maximumPoolSize(最大線程數)時,會創建新的線程來執行任務;
- 最後若是核心線程數、任務隊列、最大線程數都滿了,會根據拒絕策略來處理新任務。
圖解:
(二):ForkJoinPool 線程池
ForkJoinPool工作流程的描述:
以new ForkJoinPool(4) 和 普通的任務 為例,ForkJoinPool的工作流程如下:
代碼用例:
public static void main(String[] args) {
final var forkJoinPool = new ForkJoinPool(4);// 這個參數 “4” 的作用指定線程池的線程數量
// 定義一個Runnable
MyRunnable runnable = new MyRunnable();
// 提交任務
for (int i = 0; i < 10; i++) {
forkJoinPool.submit(runnable);
}
}
- 提交任務;(初次執行時,會做一些初始化工作,如創建任務隊列的數組)
- 線程數量小於指定的線程數量時,會創建新的線程來執行任務,且每個線程都有自己的任務隊列,區分於傳統線程池只有一個任務隊列,ForkJoinPool的線程池裏的每個線程都有自己的任務隊列;
- 每個線程從自己的任務隊列中取出任務執行;
- 每個任務執行完畢後,且自己的任務隊列為空時,會從其他線程的任務隊列中偷取任務執行,這裏被稱作為“工作竊取”;
圖解:
怎麼體現將大任務拆分成小任務
在剛剛的代碼用例中,我們提交了10個任務,但是我們的任務是一個普通的任務,沒有體現將大任務拆分成小任務,下面我們換一個任務,體現將大任務拆分成小任務。
任務要求
要求計算 1 ~ 100w 的階和,就是 "1+2+3+...+100w".
任務拆分方案:
我們可以將這個大任務拆分成多個小任務,將任務分配小到數的差值為10及以內時不再拆分,每個小任務計算區間的和,最後將這些小任務的結果合併起來,得到最終的結果。
如下
1:100w 的階和 = 1w 的階和 + 2w 的階和 + 3w 的階和 + ... + 10w 的階和
2:10w 的階和 = 1w 的階和 + 2w 的階和 + 3w 的階和 + ... + 10w 的階和
3:以此類推,直到區間的差值小於等於10時,不再拆分。
如100的階和 = 1~10 的階和 + 11~20 的階和 + ... + 91~100 的階和,其中 1~10的差值不大於10,不再拆分。
代碼用例:
public static void main(String[] args) {
// 任務要求:計算 1 ~ 100w 的階和,就是 "1+2+3+...+100w".
// 1. 創建一個 ForkJoinPool 對象
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
// 2. 提交一個任務
Integer result = forkJoinPool.invoke(new MyTask(1, 1000000));
System.out.println("result = " + result);
}
private static class MyTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10;// 拆分最小任務的閾值
private int begin;// 計算的區間的開始
private int end;// 計算的區間的結束
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if (end - begin <= THRESHOLD) {
int sum = 0;
for (int i = begin; i <= end; i++) {
sum += i;
}
return sum;
} else {
int mid = (begin + end) / 2;
MyTask left = new MyTask(begin, mid);
MyTask right = new MyTask(mid + 1, end);
left.fork();// 提交到任務隊列
right.fork();
return left.join() + right.join();// join等待結果,線程會出現空閒,會去別的任務隊列竊取任務執行
}
}
}
場景(常用於哪)
最常見、常用的場景中,其實就是在我們的Stream API中,我們使用了parallelStream()方法,底層就是使用了ForkJoinPool線程池。還有就是Java中新特性“虛擬線程”也是基於ForkJoinPool線程池實現的。