ThreadPoolExecutor以BlockingQueue存儲待執行任務,包括SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue,今天的目的是源碼角度深入研究SynchronousQueue。
之後計劃是繼續研究LinkedBlockingQueue和ArrayBlockingQueue,搬開所有絆腳石之後再開始線程池。
基本概念#BlockingQueue
BlockingQueue是SynchronousQueue的爹,他們的祖先是Queue,所以他們都會遵從Queue的一些基本邏輯:比如按順序存入數據、按順序(FIFO或者LIFO)取出數據,都是從隊首(head)獲取數據,FIFO隊列新數據從隊尾入隊、LIFO隊列隊列新數據入隊首。
對於BlockingQueue,我們還是認真看一下他的javaDoc:
BlockingQueue是一個特殊的Queue:當獲取數據的時候阻塞等待隊列非空,存入數據的時候阻塞等待隊列可用(有界隊列未滿)。
BlockingQueue的方法(獲取數據或者存入數據)不能立即成功、但是將來某一時間點可能會成功的情況下,有四種不同的處理方式:一種是拋出異常,第二種是返回特殊值(空值或者false),第三種是無限期阻塞當前線程知道成功,第四種是阻塞等待設定的時間。
具體如下表:
| Throws exception | Special value | Blocks | Times out | |
| Insert | add add(e) | offer offer(e) | put put(e) | offer(Object, long, TimeUnit) offer(e, time, unit) |
| Remove | #remove remove() | #poll poll() | #take take() | #poll(long, TimeUnit) poll(time, unit) |
| Examine | #element element() | #peek peek() | not applicable | not applicable |
BlockingQueue不接受空值null,嘗試寫入null會拋出異常,因為BlockingQueue用null表示操作失敗。
BlockingQueue可以是有界的也可以是無界的,有界隊列維護剩餘容量屬性remainingCapacity,超出該屬性後會阻塞寫入操作。
無界隊列沒有容量限制,始終維護remainingCapacity為Integer.MAX_VALUE
BlockingQueue是線程安全的,BlockingQueue的實現類的方法都是原子操作、或者通過其他併發控制方式實現線程安全性。
基本概念SynchronousQueue
SynchronousQueue是一個每次寫入數據都必須等待其他線程獲取數據(反之亦然)的BlockingQueue。SynchronousQueue沒有容量的概念、一條數據都不能存儲。你不能對SynchronousQueue隊列執行peek操作因為只有執行remove操作才能獲取到數據,只有其他線程要remove數據的時候你才能插入數據,你也不能進行迭代因為他根本就沒有數據。隊列的隊首數據就是第一個寫入數據的線程嘗試寫入的數據,如果沒有寫入線程則隊列中就沒有數據可獲取,poll()方法會返回null。對於集合類的其他方法、比如contains,SynchronousQueue的返回等同於空集合。
SynchronousQueue不允許空(null)元素。
通過構造參數設置fairness為true提供公平隊列,公平隊列遵從FIFO(先進先出)。
基本概念 Transferer
Transferer是SynchronousQueue的內部類,他的JavaDoc也很長:
Transferer擴展實現了W. N. Scherer III和M. L.Scott在"Nonblocking Concurrent Objects with Condition Synchronization"中描述的雙棧(dual stack)或者雙隊列(dual queue)算法。
後進先出(Lifo)棧用來實現非公平模式,先進先出(Fifo)隊列用來實現公平模式。兩者的性能是一樣的,一般情況下Fifo支持大吞吐量、Lifo maintains higher thread locality(抱歉,沒搞懂什麼意思)。
dual queue(或dual stack)在任一時間要麼持有數據(寫入操作提供的)、要麼持有請求(獲取數據的請求),向正好持有數據的隊列請求數據、或者向持有請求的隊列寫入數據被稱之為"fulfill"。最有趣的特性是對隊列的任何操作都能知道隊列當時處於什麼狀態,因此操作不必要上鎖。
queue和stack都擴展自虛擬類Transferer,Transferer定義了一個方法transfer,可以同時實現put和take功能。把put和take統一在一個transfer方法中的原因是dual數據結構使得put和take操作是對稱操作,所以兩個方法的大部分代碼都可以被合併。
好了,有關JavaDoc描述的特性就到這裏了,SynchronousQueue的JavaDoc很不容易理解,代碼也是。
開始分析源碼:
- 構造函數:決定SynchronousQueue底層數據結構是用Queue還是Stack
- 由於SynchronousQueue是比較特殊的:不存儲數據的(JavaDoc提到過),所以需要明確的是相關集合方法的返回也相應比較特殊,比如size=0,isEmpty=true等等,這部分源碼就不看了,特別簡單
- 隊列存、取數據的方法,最終調用的都是Transferer的tranfer方法,所以我們主要就看這個方法
SynchronousQueue構造方法
提供一個有參構造方法接收一個布爾量fair,我們前面説過,Queue是公平的、Stack是非公平的,所以fair=true的話創建TransferQueue,否則創建TransferStack。
TransferStack#SNode
TransferStack(TransferQueue也一樣)的源碼雖然不多,但是必須首先了解清楚他的數據結構,否則不太容易讀懂。
節點SNode:也就是存儲到棧內的內容,注意我這裏沒有説存儲在棧內的數據而是説內容,是因為TransferStack的特殊性導致説數據容易引起誤解:棧內有兩種類型的節點,一種是“data”,可以理解為“生產者”放到棧內等得消費者消費的數據,另一種是“request”,可以理解為消費者的消費請求,也就是説請求和數據都會入棧,都屬於“節點”。
TransferStack通過內部類SNode定義節點,主要屬性:
static final class SNode {
volatile SNode next; // next node in stack
volatile SNode match; // the node matched to this
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
next:下一節點。
match:當前節點的匹配節點,比如一個請求數據的Request節點入棧後,正好有一個data節點入棧,他們兩個如果匹配成功的話,match就是對方節點。
waiter:如果當前節點即使在自旋等待後仍然沒有被匹配,比如一個請求線程發送獲取數據的請求後,該請求會以請求節點(Request節點)入棧,始終沒有數據送進來,則當前節點的waiters就記錄為當前線程,之後當前線程自己掛起,等待匹配。這個等待匹配的過程是被動的,只能被另外一個data線程送進來的data節點匹配,匹配之後data線程通過Request節點的waiters獲取到其對應的線程後喚醒該線程。
item:data類的節點,記錄送進來的待消費的數據,Request類的節點,item為null。
mode:當前節點的mode,有三個mode:data mode表示當前節點是數據節點(生產者發來的),Request mode表示當前節點是請求節點(消費者發來的),還有一個比較特殊的mode是:匹配中的數據節點或匹配中的請求節點,這個mode後面分析tranfer代碼的時候再説。
head:頭節點,棧結構嘛,入棧節點始終是頭節點,也只有頭節點具有正常出棧的權限。
SNode提供了幾個原子性的操作:
- casNext:cas方式替換當前節點的下一節點
- tryCancel;這個實現比較特殊:當前節點的match如果為null的話則將match指向自己。用這種方式表示該節點被calcel
- casHead:cas的方式修改頭節點,其實就是入棧或出棧操作
TransferStack#transfer
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
//如果e為null的話就是REQUEST操作,否則就是DATA操作
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
//取頭節點(首節點)h
SNode h = head;
//空棧,或者首節點mode與當前操作的mode相同,説明當前節點與首節點不可能匹配了
if (h == null || h.mode == mode) { // empty or same-mode
//時間到,等不了了
if (timed && nanos <= 0) { // can't wait
//首節點被calcel了
if (h != null && h.isCancelled())
//首節點出棧
casHead(h, h.next); // pop cancelled node
//既然等不及了,就返回null
else
return null;
//否則,當前節點入棧,如果入棧成功,s就是首節點了
} else if (casHead(h, s = snode(s, e, h, mode))) {
//調用awaitFulfill阻塞等待匹配節點
SNode m = awaitFulfill(s, timed, nanos);
//阻塞等待調用結果如果是s的話,説明s被取消了
if (m == s) { // wait was cancelled
clean(s);
return null;
}
//否則就是阻塞等待後匹配成功了,那麼判斷如果頭節點不空並且下一節點是s的話
//説明除了等來一個匹配節點之外,沒有其他節點加入,那麼這一對兒匹配節點都出棧
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
//成功匹配,可以返回了
return (E) ((mode == REQUEST) ? m.item : s.item);
}
//否則,存在頭節點並且當前節點mode不同可以匹配,並且頭節點尚未被其他線程匹配
} else if (!isFulfilling(h.mode)) { // try to fulfill
//如果頭節點已經被取消,則出棧
if (h.isCancelled()) // already cancelled
casHead(h, h.next); // pop and retry
//否則當前節點以FULLFILLING模式入棧,s變為首節點
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
//一直循環直到成功匹配或棧內的等待節點突然消失
for (;;) { // loop until matched or waiters disappear
//m為s的下一節點
SNode m = s.next; // m is s's match
//m空,説明以前的等待節點突然消失,比如等待節點超時取消
if (m == null) { // all waiters are gone
//清空棧
casHead(s, null); // pop fulfill node
//清空s,重新進入主循環
s = null; // use new node next time
break; // restart main loop
}
//否則,可以開始匹配了,mn為m的下一節點,為出棧做好準備
SNode mn = m.next;
//如果m和s能匹配成功,則m和s都出棧,返回結果
if (m.tryMatch(s)) {
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
//匹配沒有成功,這種情況應該是m超時取消掉了,則m出棧
} else // lost match
s.casNext(m, mn); // help unlink
}
}
//否則,棧首節點處於匹配中FULLFILLING的狀態(其他線程正在匹配但是尚未完成)
//這種情況下,新來的節點不入棧,先協助完成棧首節點的匹配
} else { // help a fulfiller
SNode m = h.next; // m is h's match
//首節點的下一節點為空(被取消了),則清空棧
if (m == null) // waiter is gone
casHead(h, null); // pop fulfilling node
else {
//否則,去匹配首節點h和他的下一節點m,如果匹配成功了則h和m出棧
//這種情況下是不需要返回,因為是協助其他線程完成匹配,自己的匹配任務尚未開始呢...,其他線程如果獲得執行權之後,會發現已經有人幫助他完成匹配了,所以會很快返回結果
SNode mn = m.next;
if (m.tryMatch(h)) // help match
casHead(h, mn); // pop both h and m
else // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
TransferStack#awaitFulfill
awaitFulfill的作用是通過自旋、或者阻塞當前線程來等待節點被匹配。
3個參數:
SNode s:等待匹配的節點。
booean timed:true則表示限時等待。
long nanos:限時等待時長。
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
//計算等待時長
final long deadline = timed ? System.nanoTime() + nanos : 0L;
//獲取當前線程
Thread w = Thread.currentThread();
//計算自旋時長
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
//自旋開始
for (;;) {
//如果當前線程被中斷,則calcel掉當前節點:將s的match指向自己
if (w.isInterrupted())
s.tryCancel();
//自旋過程中完成匹配,則直接返回匹配節點
SNode m = s.match;
if (m != null)
return m;
//如果是顯示匹配並且匹配超時,則cancel掉s節點
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}
//自旋時長未到則繼續自旋
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
//完成自旋後記錄當前線程
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
//阻塞當前線程
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
TransferStack#clean
clean方法在transfer方法中調用,如果當前節點在等待匹配的過程中已經被cancel掉的話。
代碼不貼出了,基本邏輯就是首先清掉s及s關聯的線程(s=null,s.waiter=null),然後清查並清理掉被cancel掉的head節點(從head到s之後的第一個未被cancel掉的節點逐個檢查),直到確保棧的head節點正常(未被calcel)。
然後從head開始、到s之後的第一個未被cancel掉的節點逐個檢查,如果有節點被標記為cancel則該節點出棧。
執行完成之後,不止是s節點被清理,棧內從head節點開始直到s節點的下一個未被cancel掉的節點之間的節點,如果被cancel掉的話,全部會被清理出棧。
小結
基於TransferStack的SynchronousQueue的源碼就分析完成了,感覺不對照代碼逐行説明的話,就很不容易説清楚TransferStack的transfer、awaitFulfill方法的代碼邏輯,所以就採用在源碼中逐行註釋的方式來説明了。
篇幅原因,TransferQueue下次再説!
Thanks a lot!
上一篇 Runable和Callable的區別?你必須要搞清楚Thread以及FutureTask!
下一篇 BlockQueue - 基於TransferQueue的SynchronousQueue