博客 / 詳情

返回

ForkJoinPool在生產環境中使用遇到的一個問題

1、背景

在我們的項目中有這麼一個場景,需要消費kafka中的消息,並生成對應的工單數據。早些時候程序運行的好好的,但是有一天,我們升級了容器的配置,結果導致部分消息無法消費。而消費者的代碼是使用CompletableFuture.runAsync(() -> {while (true){ ..... }}) 來實現的。
即:

  1. 需要消費Kafka topic的個數: 7個,每個線程消費一個topic
  2. 消費方式:使用線程池異步消費
  3. 消費池:默認的 ForkJoin 線程池???,並且沒有做任何配置
  4. 是否會釋放線程池中的核心線程: 不會釋放
  5. 沒出問題時容器配置: 2核4G
  6. 出問題時容器配置:4核8G,影響的結果:只有3個topic的數據可以消費。

2、容器2核4G可以正常消費

容器2核4G可以正常消費

即:此時程序會啓動7個線程來進行消費。

3、容器4核8G只有部分可以消費

容器4核8G只有部分可以消費

即:此時程序會啓動3個線程來進行消費。

4、問題原因分析

1、通過上面的背景我們可以知道,是因為升級了容器的配置,才導致我們消費kafka中的消息失敗了。
2、針對kafka中的每個topic,我們都會使用一個單獨的線程來消費,並且不會釋放這個線程。\
3、而線程的啓動方式是通過CompletableFuture.runAsync()方法來啓動的,那麼通過這種方式啓動的線程,是每個任務一個啓動一個線程,還是隻啓動固定的線程呢?.

通過以上分析,那麼問題肯定是出現在線程池身上,那麼我們默認使用的是什麼線程池呢?查看CompletableFuture.runAsync()的源碼可知,有一定的機率是ForkJoinPool。那麼我們一起看下源碼。

5、源碼分析

源碼分析

1、確認使用什麼線程池

public static CompletableFuture<Void> runAsync(Runnable runnable) {
   return asyncRunStage(asyncPool, runnable);
}
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

通過上述源碼可知,我們可能使用的ForkJoin線程池,也可能使用的是ThreadPerTaskExecutor線程池。

  1. ThreadPerTaskExecutor 這個是每個任務,一個線程。
  2. ForkJoinPool 那麼就需要確定啓動了多少個線程。

2、確認是否使用 ForkJoin 線程池

需要確定 useCommonPool 字段是如何賦值的。

private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

通過上面代碼可知,是否使用ForkJoin線程池,是由 ForkJoinPool.getCommonPoolParallelism()的值確定的。(即並行度是否大於1,大於則使用ForkJoin線程池)

public static int getCommonPoolParallelism() {
    return commonParallelism;
}

3、commonParallelism 的賦值

在這裏插入圖片描述
1、從上圖中可知parallelism的設置有2種方式

  • 通過Jvm的啓動參數java.util.concurrent.ForkJoinPool.common.parallelism進行設置,且這個值最大為 MAX_CAP即32727。
  • 若沒有通過Jvm的參數配置,則有2種情況,若cpu的核數<=1,則返回1,否則返回cpu的核數-1

2、commonParallelism的取值

common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
int par = common.config & SMASK; // report 1 even if threads disabled
commonParallelism = par > 0 ? par : 1;

SMASK 的值是 65535。
common.config 的值就是 (parallelism & SMASK) | 0的值,即最大為65535,若parallelism的值為0,則返回0。\
int par = common.config & SMASK ,即最大為 65535
commonParallelism = par > 0 ? par : 1 的值就為 parallelism的值或1

6、結論

線程池選擇
結論:\
由上面的知識點,我們可以得出,當我們的容器是2核4G時,程序選擇的線程池是ThreadPerTaskExecutor,當我們的容器是4核8G時,程序選擇的線程池是ForkJoinPool

user avatar markerhub 頭像 xiaoweiyu 頭像 prepared 頭像 edagarli 頭像 huzilachadedanche 頭像 yaoyaolx_wiki 頭像 async_wait 頭像 wodingshangniliao 頭像 qiehxb8 頭像 dadegongjian 頭像 jellyfishmix 頭像 lianhuatongzina 頭像
12 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.