一、引 言
為什麼進行源碼角度的深度解析?
大家在項目中到處都在使用線程池做一些性能接口層次的優化,原先串行的多個遠程調用,因為rt過高,通過線程池批量異步優化,從而降低rt。還有像RocketMQ中broker啓動時,同時通過ScheduledThreadPoolExecutor線程池執行其他組件的定時任務,每隔一段時間處理相關的任務。線程池廣泛的應用在外面各種實際開發場景中,我們很多同學可能在項目裏只是簡單的copy了一些前人的代碼參數並不知道其中的含義,從而導致生產級別的bug。所以本篇文章,旨在幫助還不熟悉或者想要熟悉線程池的同學,分享我自己在學習線程池源碼上的一些內容來更簡單、快速的掌握線程池。
二、為什麼使用線程池?
併發編程中,對於常見的操作系統,線程都是執行任務的基本單元,如果每次執行任務時都創建新的線程,任務執行完畢又進行銷燬,會出現以下的問題:
- 資源開銷:比如在Linux系統中,頻繁的創建和銷燬線程,一個是頻繁的進行一個系統調用,另外是一些內存和CPU資源調度的佔用。雖然有一些寫時複製的策略防止lwp的創建時的內存佔用,但是實際寫入還是會申請系統內存的,何況一些頁表等本身就有內存佔用。
- 性能瓶頸:線程的創建需要系統調用,如果只是簡單的計算任務,可能耗時還沒創建的rt高,這裏反而降低了系統的吞吐量。
- 缺乏資源管理:無限制的創建線程會導致內存溢出,java.lang.OutOfMemoryError: unable to create native thread,這裏主要因為Java的線程其實Linux中是lwp線程,需要通過JNI進行系統調用創建,每個線程默認需要1MB的棧空間,很容易導致無休止的創建線程導致內存溢出,另外就是頻繁的系統調用,導致的上下文切換,佔用了過多的CPU,反而起到了相反的作用。
- 功能受限:手動管理線程難以實現更高級的功能,如定時任務、週期任務、任務管理、併發任務數的控制等。
通過上面的問題,我們其實可以清晰的感知到這些問題都是歸攏到資源沒有得到合理的分配和控制導致的,線程池出現的核心宗旨其實就是對資源的合理分配和控制。除了線程池,其實更多的也接觸過數據庫連接池、netty的對象池等池化技術,這些池化思想其實都是為了更好的降低資源的消耗以及更好的進行資源管理。
三、JDK線程池的架構設計
3.1 JUC併發包下的Executor框架的uml類圖
- Executor:任務執行的頂層接口,主要是分離任務提交與執行邏輯,支持同步/異步執行,遵循Java內存模型的 happen-before規則。
- ExecutorService:繼承Executor接口,提供了更完善的生命週期管理能力,通過Future對象提供任務取消、狀態查詢、結果獲取能力實現了任務監控。
- AbstractExecutorService:常見的設計模式為了簡化線程池的開發,通常通過父類進行一些基礎的默認實現讓子類繼承。
- ScheduledExecutorService:ExecutorService的擴展接口,支持延遲執行和週期性任務調度。
- ThreadPoolExecutor:是ExecutorService接口最核心和最常用的實現類,它提供了高度可配置的線程池,允許我們精細控制線程池的各種行為。
- ScheduledThreadPoolExecutor:是ScheduledExecutorService接口的實現類,它繼承自ThreadPoolExecutor,專門用於處理定時和週期任務。
- Executors:一個靜態工廠模式的工具類,提供了一系列靜態方法來創建各種常見配置的線程池,newFixedThreadPool(), newCachedThreadPool(),等,簡化了創建線程池的使用但是會帶來一些問題,很多開發規範裏都不建議大家直接使用。JDK內置的線程池如果我們不熟悉裏面的參數很有可能導致出乎自己意料的結果,池大小設置、阻塞隊列選擇等等都是有考究的,這一點後續會進行一些詳細説明。生產環境中建議謹慎使用或直接使用ThreadPoolExecutor構造函數自定義。
3.2 ThreadPoolExecutor的參數解析
- corePoolSize 核心線程數:
-
- 線程池中還未退出的alive的核心線程數量。
- 雖然線程處於空閒狀態(其實是阻塞在阻塞隊列中),除非顯示設置了allowCoreThreadTimeOut=true,否則這些線程不會從自己的run方法中退出被回收。
- 添加新任務時,如果當前工作線程小於coreSize,此時即使存在空閒的core線程,線程池也會通過addWorker方法創建一個新的線程。
*
- maximumPoolSize 最大線程數:
-
- 線程池可以創建的最大線程數。
- 如果是有界隊列,當隊列滿時,仍然有任務進來,此時線程池會創建小於最大線程數的線程來完成任務,空閒。
- 如果是無界隊列,那麼永遠不會出現第二點的情況,除了內存異常,否則會一直保持核心線程數,多餘的任務會一直往隊列中加入。
<!---->
- keepAliveTime 線程空閒存活時間
-
- 線程數超過corePoolSize後創建的線程我們理解為非核心線程,對於這類線程,他的回收機制在於我們設置的keepAliveTime,線程會限期阻塞在隊列中獲取任務,如果超時未獲取就會進行清理並退出。
- 另外如果設置allowCoreThreadTimeOut=true,所謂的核心線程在空閒時間達到keepAliveTime時也會被回收。
<!---->
- unit 時間單位
-
- keepAliveTime參數的時間單位,TimeUnit中時分秒等。
*
- keepAliveTime參數的時間單位,TimeUnit中時分秒等。
- workQueue 任務隊列
-
- 阻塞隊列,核心線程數滿時,新加入的任務,會先添加到阻塞隊列中等待線程獲取任務並執行。
- 常用的BlockingQueue實現有:
-
- ArrayBlockingQueue:數組實現的先進先出原則的有界阻塞隊列,構造方法必須指定容量。
- LinkedBlockingQueue:鏈表實現的阻塞隊列,構造傳入容量則有界,未傳則是無界隊列,此時設置的最大線程數其實就不會有作用了。
- SynchronousQueue:一個不存儲元素的阻塞隊列。每個put操作必須等待一個take操作,反之亦然。它相當於一個傳遞通道,非常適合傳遞性需求,吞吐量高,但要求maximumPoolSize足夠大。
- PriorityBlockingQueue:二叉堆實現的優先級阻塞隊列,構造時可自行調整排序行為(小頂堆或大頂堆)。
- DelayQueue:支持延時的無界阻塞隊列,主要用於週期性的任務,我們可以直接通過它來實現一些簡單的延遲任務需求,複雜的週期性任務建議使用ScheduledThreadPoolExecutor。
<!---->
- threadFactory 線程工廠
-
- 用於創建新線程的工廠。通過自定義ThreadFactory,我們可以為線程池中的線程設置更有意義的名稱、設置守護線程狀態、設置線程優先級、指定UncaughtExceptionHandler等。
- Executors.defaultThreadFactory()是默認實現。
<!---->
- handler 拒絕策略
-
- ThreadPoolExecutor.AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。
- JDK內置了四種拒絕策略:
-
- ThreadPoolExecutor.AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。
- ThreadPoolExecutor.CallerRunsPolicy:提交任務的線程,直接執行任務。變相的背壓機制,可以降低任務往線程中加入。
- ThreadPoolExecutor.DiscardPolicy:直接丟棄被拒絕的任務,不做任何通知,需容忍數據丟失。
- ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列中最舊的任務,然後重試提交當前任務,需容忍數據丟失。
- 實現RejectedExecutionHandler接口自定義拒絕策略,在實際生產應用中推薦使用,可以做一些打印觀察日誌的操作,告警、兜底的相關處理等。
3.3 運行機制詳解
新任務通過execute()方法提交給ThreadPoolExecutor時,其處理流程如下:
判斷核心線程數:如果當前運行的線程數小於corePoolSize,則創建新線程(即使有空閒的核心線程)來執行任務。
嘗試入隊:如果當前運行的線程數大於或等於corePoolSize,則嘗試將任務添加到workQueue中。
- 如果workQueue.offer()成功(隊列未滿),任務入隊等待執行。
嘗試創建非核心線程:如果workQueue.offer()失敗(隊列已滿):
- 判斷當前運行的線程數是否小於maximumPoolSize;
- 如果是,則創建新的非核心線程來執行任務。
執行拒絕策略:
如果當前運行的線程數也達到了maximumPoolSize(即核心線程和非核心線程都已用盡,且隊列也滿了),則執行RejectedExecutionHandler所定義的拒絕策略。
參考網絡中的經典執行圖:
這個圖能很好的表明運行原理,但是忽略了很多細節,比如所謂的緩衝執行是在什麼條件下去走的呢?直接執行又是什麼邏輯下執行呢?最後的任務拒絕又是怎麼回事?帶着這些疑問點,我們直接來進行一個源碼級別的分析:
execute核心流程的源碼分析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//線程池狀態 高3位表示線程狀態 低29位代表線程數量
int c = ctl.get();
//判斷當前線程池線程數量是否小於核心線程數
if (workerCountOf(c) < corePoolSize) {
//作為核心線程數進行線程的創建,並且創建成功線程會將command的任務執行--》對應圖上的直接執行
if (addWorker(command, true))
return;
c = ctl.get();
}
//創建核心線程失敗或者當前線程數量超過核心線程數
//當前線程池是否還在運行狀態,嘗試將任務添加到阻塞隊列 --》對應圖上的緩衝執行
//BlockingQueue隊列的頂級抽象定義了offer不是進行阻塞添加而是立即返回,添加失敗直接返回false,區別於put
if (isRunning(c) && workQueue.offer(command)) {
//重新獲取線程池標誌位
int recheck = ctl.get();
//如果線程此時不在運行狀態中,那麼將任務刪除
if (! isRunning(recheck) && remove(command))
//刪除任務成功,走拒絕策略拒絕掉當前任務
reject(command);
else if (workerCountOf(recheck) == 0)
//如果線程池中的工作線程都沒有的時候,這裏需要創建一個線程去執行添加到隊列中的任務
//防止因為併發的原因工作線程都被終止掉了,此時任務在阻塞隊列裏等着,缺沒有工作線程
addWorker(null, false);
}
//到這裏那就是添加隊列失敗,或者線程池狀態異常,但是這裏仍然嘗試進行創建一個worker
//如果創建失敗,也是走拒絕策略拒絕當前任務
else if (!addWorker(command, false))
reject(command);
}
接下來我們仔細看看addWorker這個方法具體是在做什麼:
//核心邏輯其實就是在無限循環創建一個worker,創建失敗直接返回,創建成功,則將worker執行
// 因為worker有thread的成員變量,最終添加worker成功,會啓動線程的start方法
//start方法最終會回調到外層的runWorker方法,改方法會不停的從阻塞隊列裏以阻塞的take方式
//獲取任務,除非達到能被終止的條件,此時當前線程會終止
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//不停的重試添加worker的計數,只有添加成功的才會進行後續的worker啓動
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//重試期間,如果其他線程導致線程池狀態不一致了。重新回到第一個循環進行check判斷
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//這裏加鎖一個是workers.add時需要加鎖,另外是防止其他線程已經在嘗試修改線程池狀態了
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//將worker的引用添加到workers的hashSet中
workers.add(w);
int s = workers.size();
//更新線程池此時最大的線程數
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果添加成功,就啓動worker中的線程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
//這裏添加失敗的話,需要把線程池的count數進行--,並且要把worker引用從hashSer中移除
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
3.4 線程池的生命週期
在介紹運行機制原理的源碼分析時,其實是有提到線程池狀態這個概念的。介紹這個狀態其實也是讓大家更方便的去管理線程池,比如我們關閉線程池時,怎麼去優雅的關閉,使用不同的方法可能會有不同的效果,我們需要根據自己的業務場景去酌情分析、權衡使用。
//線程池的狀態和計數採用一個Integer變量設置的
//這裏之所以用一個變量來儲存狀態和數量,其實很有講究的,因為我們在上面的運行原理上可以看到
//源碼中有大量的進行狀態以及數量的判斷,如果分開採用變量的記錄的話,在維護二者一致性方面
//可能就需要加鎖的維護成本了,而且計算中都是位移運算也是非常高效的
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//線程池的大小由ctl低29位表示,現成狀態由ctl高3位表示
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 線程池的狀態通過簡單的位移就能計算出來,狀態只能從低到高流轉,不能逆向
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
// 這裏是獲取線程狀態以及獲取線程數量的簡單高效的位移方法
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
接下來結合源碼詳細介紹下線程池的5種狀態以及分別有什麼不同的表現行為?
先説下結論:
RUNNING 這個就是線程池運行中狀態,我們可以添加任務也可以處理阻塞隊列任務
SHUTDOWN 不能添加新的任務,但是會將阻塞隊列中任務執行完畢
STOP 不能添加新的任務,執行中的線程也會被打斷,也不會處理阻塞隊列的任務
TIDYING 所有線程都被終止,並且workCount=0時會被置為的狀態
TERMINATED 調用完鈎子方法terminated()被置為的狀態
shutdown狀態源碼分析:
//線程池關閉
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//循環cas設置線程池狀態,直到成功或狀態已經state>=SHUTDOWN
advanceRunState(SHUTDOWN);
//這個是真正得出結論的地方
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//打斷空閒的線程,如何判斷線程是否空閒還是運行?
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//worker的線程沒有被打斷過,並且能獲取到worker的aqs獨佔鎖
if (!t.isInterrupted() && w.tryLock()) {
try {
//打斷當前線程,如果線程在阻塞隊列中阻塞,此時會被中斷
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
STOP狀態分析
//循環cas修改線程池狀態為stop。打斷所有線程,取出阻塞隊列的所有任務
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//檢查線程的權限
checkShutdownAccess();
//將狀態case為stop
advanceRunState(STOP);
//打斷所有worker不管是不是正在執行任務
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
//這裏獲取鎖之後。打斷了所有的線程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
TIDYING、TERMINATED 狀態分析
//這個方法在每個線程退出時都會進行調用,如果是運行中、或者狀態大於等於TIDYING或者shutdown但是隊列不為空都
//直接返回,如果不滿足以上條件,並且線程數不為0的話,打斷一個空閒線程
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
//此時到這裏,狀態要麼為STOP。要麼是shutdown並且隊列為空了
// 獲取一個鎖,嘗試cas修改狀態為TIDYING
//調用terminated()的鈎子方法,
//修改線程池為終態TERMINATED,並且喚醒阻塞在termination隊列上的線程
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
四、JDK內置線程池的問題
java.util.concurrent.Executors工廠類提供了一些靜態方法,方便我們快速創建幾種預設配置的線程池:
- Executors.newFixedThreadPool(int nThreads):
-
- 創建一個固定大小的線程池。corePoolSize和maximumPoolSize都等於nThreads。
- keepAliveTime為0L(因為線程數不會超過corePoolSize,所以此參數無效,除非allowCoreThreadTimeOut為true)。
- 使用無界的LinkedBlockingQueue作為工作隊列。
- 問題:由於使用無界隊列,當任務提交速度遠大於處理速度時,隊列會持續增長,可能導致內存溢出(OOM)。此時maximumPoolSize參數實際上是無效的,線程數永遠不會超過nThreads。
<!---->
- Executors.newSingleThreadExecutor():
-
- 創建一個只有一個工作線程的線程池。corePoolSize和maximumPoolSize都為1。
- 同樣使用無界的LinkedBlockingQueue。
- 保證所有任務按照提交順序(FIFO)執行。
- 問題:與newFixedThreadPool類似,無界隊列可能導致OOM。
<!---->
- Executors.newCachedThreadPool():
-
- 創建一個可緩存的線程池。
- corePoolSize為0。
- maximumPoolSize為Integer.MAX\_VALUE (幾乎是無界的)。
- keepAliveTime為60秒。
- 使用SynchronousQueue作為工作隊列。這種隊列不存儲元素,任務提交後必須有空閒線程立即接收,否則會創建新線程(如果未達到maximumPoolSize)。
- 問題:如果任務提交速度過快,會創建大量線程(理論上可達Integer.MAX\_VALUE個),可能耗盡系統資源,導致OOM以及頻繁的上下文切換。
<!---->
- Executors.newSingleThreadScheduledExecutor()、Executors.newScheduledThreadPool(int corePoolSize):
-
- 創建用於調度任務的線程池。
- 內部使用ScheduledThreadPoolExecutor實現,其任務隊列是DelayedWorkQueue (一種特殊的PriorityQueue)。
- newSingleThreadScheduledExecutor的corePoolSize為1,maximumPoolSize為Integer.MAX\_VALUE(但由於隊列是DelayedWorkQueue,通常不會無限增長線程,除非有大量同時到期的任務且處理不過來)。
- newScheduledThreadPool可以指定corePoolSize。
- 問題:雖然DelayedWorkQueue本身是無界的,但ScheduledThreadPoolExecutor在任務執行邏輯上與普通ThreadPoolExecutor有所不同。主要風險仍然是如果corePoolSize設置不當,且大量任務同時到期並執行緩慢,可能導致任務積壓。
!
某一線互聯網Java開發手冊
五、線程池中的問題與最佳實踐
5.1 invokeAll 超時機制無效?
ExecutorService.invokeAll(Collection\<? extends Callable<T>> tasks, long timeout, TimeUnit unit)方法會提交一組Callable任務,並等待所有任務完成,或者直到超時。如果超時發生,它會嘗試取消(中斷)所有尚未完成的任務,然後返回一個List<Future>。
失效場景分析:
- 任務不響應中斷(最常見):任務內部捕獲 InterruptedException 後靜默處理,或執行不檢查中斷狀態的阻塞操作(如循環計算):
<!---->
Callable<String> task = () -> {
while (true) {
//缺少此檢查將導致超時失效
if (Thread.interrupted()) break;
// 耗時計算...
}
return "done";
};
- 使用非響應中斷的API:任務調用了不響應 interrupt() 的第三方庫或JNI代碼(如某些IO操作)
<!---->
Callable<Integer> task = () -> {
Files.copy(in, path); // 某些NIO操作不響應中斷
return 1;
};
- 任務依賴外部資源阻塞:任務因外部資源(如數據庫連接、網絡請求)阻塞且未設置超時。
<!---->
Callable<Result> task = () -> {
//未設查詢超時時間
return jdbcTemplate.query("SELECT * FROM large_table");
};
- 線程池配置缺陷:核心線程數過大或隊列無界,導致 invokeAll 超時前任務無法全部啓動,任務堆積在隊列,invokeAll 超時後仍有大量任務未執行。
<!---->
new ThreadPoolExecutor(
100, 100, // 核心線程數過大
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>() // 無界隊列
);
invokeAll超時失效demo:
import java.util.*;
import java.util.concurrent.*;
public class InvokeAllTimeoutDemo {
// 模擬耗時任務(可配置是否響應中斷)
static class Task implements Callable<String> {
private final int id;
private final long durationMs;
private final boolean respectInterrupt;
Task(int id, long durationMs, boolean respectInterrupt) {
this.id = id;
this.durationMs = durationMs;
this.respectInterrupt = respectInterrupt;
}
@Override
public String call() throws Exception {
System.out.printf("Task %d started%n", id);
long start = System.currentTimeMillis();
// 模擬工作(檢查中斷狀態)
while (System.currentTimeMillis() - start < durationMs) {
if (respectInterrupt && Thread.interrupted()) {
throw new InterruptedException("Task " + id + " interrupted");
}
// 不響應中斷的任務會繼續執行
}
System.out.printf("Task %d completed%n", id);
return "Result-" + id;
}
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(2);
List<Callable<String>> tasks = Arrays.asList(
new Task(1, 2000, true), // 2秒,響應中斷
new Task(2, 10000, false) // 10秒,不響應中斷
);
System.out.println("Invoking with 3s timeout...");
try {
//設置3秒超時
List<Future<String>> futures = executor.invokeAll(tasks, 3, TimeUnit.SECONDS);
for (Future<String> f : futures) {
// 明確處理取消狀態
if (f.isCancelled()) {
System.out.println("Task was cancelled");
} else {
try {
System.out.println("Result: " + f.get(100, TimeUnit.MILLISECONDS));
} catch (TimeoutException | ExecutionException e) {
System.out.println("Task failed: " + e.getCause());
}
}
}
} finally {
executor.shutdownNow();
System.out.println("Executor shutdown");
}
}
}
當我們使用invokeAll(tasks, timeout) 提交多個任務時,如果出現某個任務對中斷不響應或者響應不及時,那我們即使設置了超時時間,不響應中斷的任務2仍在後台運行(即使調用了 shutdownNow())
5.2 submit()的異常消失了?
使用ExecutorService.submit()提交任務時,任務執行過程中如果拋出未捕獲的異常(無論是受檢異常還是運行時異常),這個異常會被Future的包裝類如FutureTask重寫的run()方法捕獲並封裝在返回的Future包裝對象的成員變量中。
- 不顯示調用Future.get(),該異常我們就無法感知,好像沒有發生過一樣。線程池的工作線程本身通常會有一個默認的未捕獲異常處理器,可能會打印堆棧到控制枱,但你的主業務邏輯不會知道。
- 顯示調用Future.get(),拋出聲明式的ExecutionException,其cause屬性才是原始的任務異常。
- 如果調用Future.get(long timeout, TimeUnit unit)超時,向外拋出聲明式的TimeoutException。此時任務可能仍在後台執行,可能錯過了內部的異常。
submit()異常消失demo:
public class ThreadPoolExceptionDemo {
public static void main(String[] args) {
// 創建單線程線程池(便於觀察異常)
ExecutorService executor = Executors.newSingleThreadExecutor();
// 場景1:Callable拋出異常(通過Future.get()捕獲)
Future<String> future1 = executor.submit(() -> {
System.out.println("[Callable] 開始執行");
Thread.sleep(100);
throw new RuntimeException("Callable故意拋出的異常");
});
try {
System.out.println("Callable結果: " + future1.get());
} catch (ExecutionException e) {
System.err.println("捕獲到Callable異常: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 場景2:Runnable拋出異常(同樣通過Future.get()捕獲)
Future<?> future2 = executor.submit(() -> {
System.out.println("[Runnable] 開始執行");
throw new IllegalArgumentException("Runnable故意拋出的異常");
});
try {
future2.get(); // Runnable成功時返回null
} catch (ExecutionException e) {
System.err.println("捕獲到Runnable異常: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 場景3:未處理的任務異常(需設置異常處理器)
executor.submit(() -> {
System.out.println("[未捕獲的任務] 開始執行");
throw new IllegalStateException("這個異常會被默認處理器處理");
});
executor.shutdown();
}
}
5.3 異常處理實踐
- Callable/Runnable catch處理異常:
-
- 不要捕獲Throwable或Exception然後靜默處理(只打日誌) 。如果確實需要捕獲,請考慮是否應該重新拋出(包裝成業務允許的受檢異常或運行時異常)。
- 禁止靜默處理 InterruptedException:
- 在JDK的JUC底層源碼中,我們可以看到很多聲明瞭InterruptedException的方法,基本上都是對這類方法catch異常,要麼繼續往外拋出,或者處理完相關資源後,重置中斷狀態,絕對不要靜默處理。
- 如果方法沒有聲明InterruptedException如Runnable.run(),在catch InterruptedException後最好調用Thread.currentThread().interrupt()來恢復中斷標記。
- 正確處理中斷:callable在耗時的loop任務處理中,如果出現了中斷異常,因為Java代碼中中斷只是一種協作方式,其並沒真的終止線程,所以一般都是需要我們進行一箇中斷標誌的傳遞,如線程池中的shutdownNow()就依賴次機制處理。
<!---->
- submit()執行的任務,謹慎處理Future:
-
- 使用帶過期時間的future.get(long timeOut)獲取結果,並要對該方法進行try cache防止其他異常拋出。
- 多個任務並行處理時,如果有下個請求依賴上個請求,務必使用get()讓主線程等待這一結果執行完成後,流轉到下一個異步任務。
<!---->
- 實現線程Thread的UncaughtExceptionHandler屬性,在自定義的TheadFactory中通過set方法賦值:execute()方法執行時,對於沒有捕獲的異常使用線程組的兜底統一處理機制。
<!---->
//自定義當前線程組創建線程的統一異常處理,類似於controller的統一異常處理機制
ThreadFactory myThreadFactory = new ThreadFactory() {
private final AtomicInteger atomicInteger = new AtomicInteger(0);
private final String threadNamePrefix = "myThreadFactory-";
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r,threadNamePrefix + atomicInteger.getAndIncrement());
t.setUncaughtExceptionHandler((thread, throwable) -> {
//異常的統一處理,日誌打印、兜底處理、監控、資源釋放等
System.err.println("線程[" + thread.getName() + "]異常: " + throwable);});
return t;
}
};
//構造方法時使用自定義的線程工廠
ExecutorService executor = new ThreadPoolExecutor(
corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory,
handler
);
- 使用自定義線程池時建議重寫鈎子方法afterExecute(Runnable r, Throwable t) :這個hook方法是用來解決當前任務線程發生的異常,默認是空實現,我們可以重寫他,比如進行兜底的線程繼續執行,打印日誌記錄,以及同步失敗使用兜底異步處理等等方式。還要注意釋放應用中的資源,比如文件鎖的佔用等,最好手動釋放掉,避免底層操作系統線程對這類資源釋放失敗導致長期佔用,最後只能重啓Java進程的尷尬地步。
<!---->
public class MyThreadPoolExecutor extends ThreadPoolExecutor {
public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
//需要特別注意任務是否為submit提交,如果是execute提交的任務,那這裏很直接的知道任務是否發生異常以及後續去怎麼處理
if(r instanceof Future){
if(((Future<?>) r).isDone() || ((Future<?>) r).isCancelled()){
//繼續使用主線程完成任務,一般不建議,最好使用兜底方式:例如異步發消息,由後續的消費組統一處理異常的任務
}
}else if( t != null){
//execute異常處理
}
}
}
//FutureTask 把run方法進行了重寫,並且catch住了異常,所以説afterExecute的t 如果是submit提交的方式
//那麼t基本上就是null
public void run() {
//....
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
//...
}
afterExecute可以借鑑的示例:
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import org.slf4j.*;
public class RobustThreadPool extends ThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(RobustThreadPool.class);
private final AtomicLong failureCounter = new AtomicLong();
private final RetryPolicy retryPolicy; // 重試策略
private final ThreadLocal<Long> startTime = new ThreadLocal<>();
public RobustThreadPool(int corePoolSize, int maxPoolSize,
BlockingQueue<Runnable> workQueue,
RetryPolicy retryPolicy) {
super(corePoolSize, maxPoolSize, 60L, TimeUnit.SECONDS, workQueue);
this.retryPolicy = retryPolicy;
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
logger.debug("開始執行任務: {}", r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
// 1. 異常分類處理
if(r instanceof Future){
if(((Future<?>) r).isDone()){
//錯誤記錄以及異常處理
failureCounter.incrementAndGet();
handleFailure(r, t, costTime);
}
}else if( t != null){
//execute異常處理
failureCounter.incrementAndGet();
handleFailure(r, t, costTime);
}
// 2. 資源清理
cleanThreadLocals();
}
private void handleFailure(Runnable r, Throwable t) {
// 1. 異常類型識別
if (t instanceof OutOfMemoryError) {
logger.error("JVM內存不足,終止任務: {}", t.getMessage());
System.exit(1); // 嚴重錯誤直接終止
}
// 2. 可重試異常處理
else if (isRetryable(t)) {
int retryCount = retryPolicy.getCurrentRetryCount(r);
if (retryCount < retryPolicy.getMaxRetries()) {
logger.warn("任務第{}次失敗,準備重試...",
retryCount + 1, t);
retryPolicy.retry(r, this);
} else {
logger.error("任務超過最大重試次數({}),轉入死信隊列",
retryPolicy.getMaxRetries(), t);
DeadLetterQueue.add(r, t);
}
}
// 3. 不可重試異常
else {
logger.error("不可恢復任務失敗", t);
Metrics.recordFailure(t.getClass()); // 上報監控
}
}
private boolean isRetryable(Throwable t) {
return t instanceof IOException ||
t instanceof TimeoutException ||
(t.getCause() != null && isRetryable(t.getCause()));
}
private void cleanThreadLocals() {
// 清理可能的內存泄漏
try {
ThreadLocal<?>[] holders = { /* 其他ThreadLocal */};
for (ThreadLocal<?> holder : holders) {
holder.remove();
}
} catch (Exception e) {
logger.warn("清理ThreadLocal失敗", e);
}
}
// 重試策略嵌套類
public static class RetryPolicy {
private final int maxRetries;
private final long retryDelayMs;
private final Map<Runnable, AtomicInteger> retryMap = new ConcurrentHashMap<>();
public RetryPolicy(int maxRetries, long retryDelayMs) {
this.maxRetries = maxRetries;
this.retryDelayMs = retryDelayMs;
}
public void retry(Runnable task, Executor executor) {
retryMap.computeIfAbsent(task, k -> new AtomicInteger()).incrementAndGet();
if (retryDelayMs > 0) {
executor.execute(() -> {
try {
Thread.sleep(retryDelayMs);
} catch (InterruptedException ignored) {}
executor.execute(task);
});
} else {
executor.execute(task);
}
}
public int getCurrentRetryCount(Runnable task) {
return retryMap.getOrDefault(task, new AtomicInteger()).get();
}
public int getMaxRetries() {
return maxRetries;
}
}
}
異常處理小結:要特別注意使用future.get()方法時,我們一定要注意設置超時時間,防止主線程無限期的阻塞避免邊緣的業務查詢影響了主業務造成得不償失的效果,另外我們需要注意一個點就是submit()方法的提交任務時,afterExecute(Runnable r, Throwable t)中的t恆為null,如果是execute方法提交的任務,那麼就是直接獲取的任務執行的異常,對於submit提交的任務異常其被封裝到了Futrure 包裝對象中,一般需要我們再次判斷任務時執行完畢還是異常或被取消了,如果發生了異常,Future.get()會拋出封裝的ExecutionException異常,當然還可能是取消異常以及中斷異常。invokeAll和invokeAny我們需要對返回的Future結果檢查可能拋出的異常,對於callable 前面一再強調了要對InterruptedException不要靜默處理,因為線程的中斷標記只是一個協作方式,他並沒有停止當前線程的運行,我們需要根據自身的場景對發生的中斷進行快速響應以及傳遞中斷標誌。
5.4 拒絕策略實踐
先帶大家回顧一下策略是如何觸發執行的流程:
//添加任務,當不滿足條件時會執行拒絕方法reject
public void execute(Runnable command) {
//...
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
//這裏就是拒絕的入口。handler是有構造方法傳入
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//....
//指定拒絕策略
this.handler = handler;
}
AbortPolicy:默認的拒絕策略,簡單粗暴,當execute中添加woker失敗時,直接在當前線程拋出異常。
public static class AbortPolicy implements RejectedExecutionHandler {
//直接拋出RejectedExecutionException
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
優點:快速失敗,立即暴露系統過載問題、避免任務靜默丟失,便於監控系統捕獲
缺點:需要調用方顯式處理異常,增加代碼複雜度,可能中斷主業務流程
適用場景:適用於那些對任務丟失非常敏感,配合熔斷機制使用的快速失敗場景
CallerRunsPolicy:提交任務的線程,直接執行任務
public static class CallerRunsPolicy implements RejectedExecutionHandler {
//直接在提交任務的線程中執行任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
優點:任務都會被執行,不會丟任務,並且由於主線程執行任務,天然的流量控制,避免了大量的任務進入線程池。
缺點:調用線程可能被阻塞,導致上游服務雪崩。不適合高併發場景(可能拖垮整個調用鏈)。
適用場景:適用於處理能力不高,並且資源過載能夠平滑過渡,同時不丟失任務的場景。如:低併發、高可靠性的後台任務(如日誌歸檔)、允許同步執行的批處理系統。
DiscardPolicy:直接丟棄被拒絕的任務,不做任何通知。
public static class DiscardPolicy implements RejectedExecutionHandler {
//空實現
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
優點:實現簡單,無額外性能開銷。避免異常傳播影響主流程
缺點:數據靜默丟失,可能會掩蓋系統容量問題
適用場景:邊緣業務的監控上報數據,統計類的uv、pv統計任務
DiscardOldestPolicy:丟棄隊列中最舊的任務,然後重試提交當前任務。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
//丟棄隊列中最舊的任務,重試當前任務
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
優點:優先保證新任務執行,避免隊列堆積導致內存溢出。
缺點:可能丟失關鍵舊任務、任務執行順序無法保證。
適用場景:適用於可容忍部分數據丟失,並且實時性要求高於歷史數據的場景,比如:行情推送。
通過上線的介紹,我們可以看到JDK內置策略基本上只使用於簡單處理的場景,在生產實踐中一般推薦我們自定義拒絕策略,進行相關的業務處理。
1. 自定義RejectedExecutionHandler:
/**
* 帶監控統計的拒絕策略處理器
*/
public class MetricsRejectedExecutionHandler implements RejectedExecutionHandler {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MetricsRejectedExecutionHandler.class);
// 統計被拒絕的任務數量
private final AtomicLong rejectedCount = new AtomicLong(0);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 1. 採集線程池關鍵指標
int poolSize = executor.getPoolSize();
int activeThreads = executor.getActiveCount();
int corePoolSize = executor.getCorePoolSize();
int maxPoolSize = executor.getMaximumPoolSize();
int queueSize = executor.getQueue().size();
long completedTasks = executor.getCompletedTaskCount();
// 2. 遞增拒絕計數器
long totalRejected = rejectedCount.incrementAndGet();
// 3. 輸出警告日誌(包含完整指標)
logger.warn("""
任務被拒絕執行!線程池狀態:
|- 活躍線程數/當前線程數: {}/{}
|- 核心/最大線程數: {}/{}
|- 隊列大小: {}
|- 已完成任務數: {}
|- 歷史拒絕總數: {}
|- 被拒絕任務: {}
""",
activeThreads, poolSize,
corePoolSize, maxPoolSize,
queueSize,
completedTasks,
totalRejected,
r.getClass().getName());
// 4. 可選:降級處理(如存入數據庫等待重試)
// fallbackToDatabase(r);
// 5. 拋出RejectedExecutionException(保持默認行為)
throw new RejectedExecutionException("Task " + r.toString() + " rejected");
}
// 獲取累計拒絕次數(用於監控)
public long getRejectedCount() {
return rejectedCount.get();
}
}
- 記錄日誌並告警:所有的異常處理中,最常見簡單的方式無外乎,先記錄個日誌,然後有告警系統的進行相關的某書、某信以及短信等的告警信息推送,方便開發人員以及運維人員的及時發現問題並介入處理。
- 兜底處理機制:一般常見的就是通過異步的方式提交到MQ,然後統一進行兜底處理。
- 帶超時和重試的拒絕:可以嘗試等待一小段時間,或者重試幾次提交,如果仍然失敗,再執行最終的拒絕邏輯(如告警、持久化或拋異常)。
- 動態調整策略:根據系統的負載或任務類型,動態的執行兜底策略機制,就如前面寫的源碼示例方式。
2. 根據自身業務場景選擇合適的拒絕策略:
- 核心業務,不容丟失:如果任務非常重要,不能丟失,可以考慮:
-
- CallerRunsPolicy:調用線程承擔任務執行壓力,是否可支撐;
- 自定義策略:嘗試持久化到MQ或DB,然後由專門的消費組補償任務處理;
- AbortPolicy:如果希望系統快速失敗並由上層進行重試或熔斷。
*
- 非核心業務,可容忍部分丟失:
-
- DiscardOldestPolicy:新任務更重要時,如行情推送;
- DiscardPolicy:邊緣業務場景,比如一些pv統計等,丟失了無所謂;
- 及時的進行監控查看,瞭解任務的丟失情況。
3. 結合線程池參數綜合考慮:
- 拒絕策略的選擇也與線程池的隊列類型(有界/無界)、隊列容量、maximumPoolSize等參數密切相關。
- 如果使用無界隊列LinkedBlockingQueue的無參構造,只有機器內存不夠時才會進行拒絕策略,不過這種極端場景已經不是影響線程池本身,內存不夠可能導致Java進程被操作系統直接kill可能。
- 如果使用有界隊列,需要權衡隊列的大小,核心場景甚至可以動態追蹤阻塞隊列大小,以及動態調整隊列大小來保證核心業務的正常流轉。
- 充分測試和監控:無論選擇哪種策略,都必須在壓測環境中充分測試其行為,並在線上環境建立完善的監控體系,監控線程池的各項指標(活躍線程數、隊列長度、任務完成數、任務拒絕數等)。當拒絕發生時,應有相應的告警通知。
拒絕策略小結:
策略的選擇跟我們大多數的系統設計哲學是保持一致的,都是在應對不同的場景中,做出一定的trade off。最好的策略需要根據業務場景、系統容忍度、資源等方面的綜合考量,一個黃金的實踐原則:拒絕事件做好監控告警、根據業務SLA定義策略,是否可丟失,快速失敗等,定期的進行壓力測試,驗證策略的有效性。
5.5 池隔離實踐
核心思想:根據任務的資源類型 、優先級和業務特性 ,劃分多個獨立的線程池,避免不同性質的任務相互干擾。
1. 隔離維度:
- 資源類型:CPU密集型 vs I/O密集型任務
- 執行時長:短時任務(毫秒級) vs 長時任務(分鐘級)
- 實時性要求:高實時性 vs 可延遲(最終一致即可)
- 業務重要性:支付交易(高優先級) vs 日誌清理(低優先級)
- 是否依賴外部資源:例如,訪問特定數據庫、調用特定第三方API的任務可以歸為一類。
2. 不同業務場景線程池獨立使用:在不同的業務場景下,為自己的特定業務,創建獨立的線程池。
- 線程命名:通過ThreadFactory為每個線程池及其線程設置有意義的名稱,例如netty-io-compress-pool-%d,excel-export-pool-%d, 主要方便區別不同的業務場景以及問題排查。
- 參數調優:不同的業務場景設置不同的參數。
-
- corePoolSize, maximumPoolSize:CPU密集型的計算任務可以設置小點減少上下文的切換,I/O密集型可以較大,在io阻塞等待期間,多去處理其他任務。
- 阻塞隊列blockQueue:選擇合適的隊列類型,以及設置合理的隊列大小。
- RejectedExecutionHandler:有內置的四種的策略以及自定義策略選擇,一般建議做好日誌、監控以及兜底的處理。
*
3. 自定義Executor避免線程池共用
// 創建CPU密集型任務線程池(線程數=CPU核心數)
ExecutorService cpuIntensiveExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(), // 核心線程數=CPU核心數
Runtime.getRuntime().availableProcessors(), // 最大線程數=CPU核心數
30L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new ThreadFactoryBuilder()
.setNameFormat("cpu-pool-%d")
.setPriority(Thread.MAX_PRIORITY) // 提高優先級
.build(),
new ThreadPoolExecutor.AbortPolicy() // 直接拒絕
);
// 使用示例
CompletableFuture.supplyAsync(() -> {
// 矩陣計算等CPU密集型任務
double[][] result = matrixMultiply(largeMatrixA, largeMatrixB);
return result;
}, cpuIntensiveExecutor)
.thenAccept(result -> {
System.out.println("計算結果維度: " + result.length + "x" + result[0].length);
});
線程池隔離小結:
專池專用的本質是通過物理隔離實現:
- 資源保障 :關鍵業務獨佔線程資源
- 故障隔離 :避免級聯雪崩
- 性能優化 :針對任務類型最大化吞吐量
最終呈現的效果是像專業廚房的分區(切配區/炒菜區/麪點區)一樣,讓每個線程池專注處理同類任務,提升整體效率和可靠性。
六、總結
線程池是Java併發編程的核心組件,通過複用線程減少資源開銷,提升系統吞吐量。其核心設計包括線程複用機制 、任務隊列和拒絕策略 ,通過ThreadPoolExecutor的參數(核心線程數、最大線程數、隊列容量等)實現靈活的資源控制。線程池的生命週期由RUNNING、SHUTDOWN等狀態管理,確保任務有序執行或終止。
內置線程池(如Executors.newCachedThreadPool)雖便捷,但存在內存溢出或無界隊列堆積的風險,需謹慎選擇。invokeAll的超時失效和submit提交任務的異常消失是常見陷阱需通過正確處理中斷和檢查Future.get()規避。
最佳實踐包括:
- 異常處理:通過afterExecute來對發生的異常進行兜底處理,任務細粒度的try catch或UncaughtExceptionH捕獲異常處理防止線程崩潰退出;
- 拒絕策略:根據業務選擇拒絕策略或自定義降級邏輯,生產級應用建議儘量自定義處理;
- 線程隔離 :按任務類型(CPU/I/O)或優先級劃分線程池,避免資源競爭。
合理使用線程池能顯著提升性能,但需結合業務場景精細調參,確保穩定性和可維護性,希望這篇文章能給大家帶來一些生產實踐上的指導,減少一些因為不熟悉線程池相關原理生產誤用導致的一些問題。
往期回顧
1. 基於瀏覽器擴展 API Mock 工具開發探索|得物技術
2. 破解gh-ost變更導致MySQL表膨脹之謎|得物技術
3. MySQL單表為何別超2000萬行?揭秘B+樹與16KB頁的生死博弈|得物技術
4. 0基礎帶你精通Java對象序列化--以Hessian為例|得物技術
5. 前端日誌回撈系統的性能優化實踐|得物技術
文 /捨得
關注得物技術,每週更新技術乾貨
要是覺得文章對你有幫助的話,歡迎評論轉發點贊~
未經得物技術許可嚴禁轉載,否則依法追究法律責任。