Stories

Detail Return Return

BlockingQueue - 基於TransferStack的SynchronousQueue - Stories Detail

ThreadPoolExecutor以BlockingQueue存儲待執行任務,包括SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue,今天的目的是源碼角度深入研究SynchronousQueue。

之後計劃是繼續研究LinkedBlockingQueue和ArrayBlockingQueue,搬開所有絆腳石之後再開始線程池。

基本概念#BlockingQueue

BlockingQueue是SynchronousQueue的爹,他們的祖先是Queue,所以他們都會遵從Queue的一些基本邏輯:比如按順序存入數據、按順序(FIFO或者LIFO)取出數據,都是從隊首(head)獲取數據,FIFO隊列新數據從隊尾入隊、LIFO隊列隊列新數據入隊首。

對於BlockingQueue,我們還是認真看一下他的javaDoc:

BlockingQueue是一個特殊的Queue:當獲取數據的時候阻塞等待隊列非空,存入數據的時候阻塞等待隊列可用(有界隊列未滿)。
BlockingQueue的方法(獲取數據或者存入數據)不能立即成功、但是將來某一時間點可能會成功的情況下,有四種不同的處理方式:一種是拋出異常,第二種是返回特殊值(空值或者false),第三種是無限期阻塞當前線程知道成功,第四種是阻塞等待設定的時間。
具體如下表:
Throws exception Special value Blocks Times out
Insert add add(e) offer offer(e) put put(e) offer(Object, long, TimeUnit) offer(e, time, unit)
Remove #remove remove() #poll poll() #take take() #poll(long, TimeUnit) poll(time, unit)
Examine #element element() #peek peek() not applicable not applicable
BlockingQueue不接受空值null,嘗試寫入null會拋出異常,因為BlockingQueue用null表示操作失敗。
BlockingQueue可以是有界的也可以是無界的,有界隊列維護剩餘容量屬性remainingCapacity,超出該屬性後會阻塞寫入操作。
無界隊列沒有容量限制,始終維護remainingCapacity為Integer.MAX_VALUE
BlockingQueue是線程安全的,BlockingQueue的實現類的方法都是原子操作、或者通過其他併發控制方式實現線程安全性。

基本概念SynchronousQueue

SynchronousQueue是一個每次寫入數據都必須等待其他線程獲取數據(反之亦然)的BlockingQueue。SynchronousQueue沒有容量的概念、一條數據都不能存儲。你不能對SynchronousQueue隊列執行peek操作因為只有執行remove操作才能獲取到數據,只有其他線程要remove數據的時候你才能插入數據,你也不能進行迭代因為他根本就沒有數據。隊列的隊首數據就是第一個寫入數據的線程嘗試寫入的數據,如果沒有寫入線程則隊列中就沒有數據可獲取,poll()方法會返回null。對於集合類的其他方法、比如contains,SynchronousQueue的返回等同於空集合。
SynchronousQueue不允許空(null)元素。
通過構造參數設置fairness為true提供公平隊列,公平隊列遵從FIFO(先進先出)。

基本概念 Transferer

Transferer是SynchronousQueue的內部類,他的JavaDoc也很長:

Transferer擴展實現了W. N. Scherer III和M. L.Scott在"Nonblocking Concurrent Objects with Condition Synchronization"中描述的雙棧(dual stack)或者雙隊列(dual queue)算法。
後進先出(Lifo)棧用來實現非公平模式,先進先出(Fifo)隊列用來實現公平模式。兩者的性能是一樣的,一般情況下Fifo支持大吞吐量、Lifo maintains higher thread locality(抱歉,沒搞懂什麼意思)。
dual queue(或dual stack)在任一時間要麼持有數據(寫入操作提供的)、要麼持有請求(獲取數據的請求),向正好持有數據的隊列請求數據、或者向持有請求的隊列寫入數據被稱之為"fulfill"。最有趣的特性是對隊列的任何操作都能知道隊列當時處於什麼狀態,因此操作不必要上鎖。
queue和stack都擴展自虛擬類Transferer,Transferer定義了一個方法transfer,可以同時實現put和take功能。把put和take統一在一個transfer方法中的原因是dual數據結構使得put和take操作是對稱操作,所以兩個方法的大部分代碼都可以被合併。

好了,有關JavaDoc描述的特性就到這裏了,SynchronousQueue的JavaDoc很不容易理解,代碼也是。

開始分析源碼:

  1. 構造函數:決定SynchronousQueue底層數據結構是用Queue還是Stack
  2. 由於SynchronousQueue是比較特殊的:不存儲數據的(JavaDoc提到過),所以需要明確的是相關集合方法的返回也相應比較特殊,比如size=0,isEmpty=true等等,這部分源碼就不看了,特別簡單
  3. 隊列存、取數據的方法,最終調用的都是Transferer的tranfer方法,所以我們主要就看這個方法

SynchronousQueue構造方法

提供一個有參構造方法接收一個布爾量fair,我們前面説過,Queue是公平的、Stack是非公平的,所以fair=true的話創建TransferQueue,否則創建TransferStack。

TransferStack#SNode

TransferStack(TransferQueue也一樣)的源碼雖然不多,但是必須首先了解清楚他的數據結構,否則不太容易讀懂。

節點SNode:也就是存儲到棧內的內容,注意我這裏沒有説存儲在棧內的數據而是説內容,是因為TransferStack的特殊性導致説數據容易引起誤解:棧內有兩種類型的節點,一種是“data”,可以理解為“生產者”放到棧內等得消費者消費的數據,另一種是“request”,可以理解為消費者的消費請求,也就是説請求和數據都會入棧,都屬於“節點”。

TransferStack通過內部類SNode定義節點,主要屬性:

static final class SNode {
            volatile SNode next;        // next node in stack
            volatile SNode match;       // the node matched to this
            volatile Thread waiter;     // to control park/unpark
            Object item;                // data; or null for REQUESTs
            int mode;

next:下一節點。
match:當前節點的匹配節點,比如一個請求數據的Request節點入棧後,正好有一個data節點入棧,他們兩個如果匹配成功的話,match就是對方節點。
waiter:如果當前節點即使在自旋等待後仍然沒有被匹配,比如一個請求線程發送獲取數據的請求後,該請求會以請求節點(Request節點)入棧,始終沒有數據送進來,則當前節點的waiters就記錄為當前線程,之後當前線程自己掛起,等待匹配。這個等待匹配的過程是被動的,只能被另外一個data線程送進來的data節點匹配,匹配之後data線程通過Request節點的waiters獲取到其對應的線程後喚醒該線程。
item:data類的節點,記錄送進來的待消費的數據,Request類的節點,item為null。
mode:當前節點的mode,有三個mode:data mode表示當前節點是數據節點(生產者發來的),Request mode表示當前節點是請求節點(消費者發來的),還有一個比較特殊的mode是:匹配中的數據節點或匹配中的請求節點,這個mode後面分析tranfer代碼的時候再説。
head:頭節點,棧結構嘛,入棧節點始終是頭節點,也只有頭節點具有正常出棧的權限。

SNode提供了幾個原子性的操作:

  1. casNext:cas方式替換當前節點的下一節點
  2. tryCancel;這個實現比較特殊:當前節點的match如果為null的話則將match指向自己。用這種方式表示該節點被calcel
  3. casHead:cas的方式修改頭節點,其實就是入棧或出棧操作

TransferStack#transfer

E transfer(E e, boolean timed, long nanos) {
      SNode s = null; // constructed/reused as needed
      //如果e為null的話就是REQUEST操作,否則就是DATA操作
      int mode = (e == null) ? REQUEST : DATA;
      for (;;) {
            //取頭節點(首節點)h
           SNode h = head;
           //空棧,或者首節點mode與當前操作的mode相同,説明當前節點與首節點不可能匹配了
           if (h == null || h.mode == mode) {  // empty or same-mode
               //時間到,等不了了
               if (timed && nanos <= 0) {      // can't wait
                   //首節點被calcel了
                   if (h != null && h.isCancelled())
                       //首節點出棧
                       casHead(h, h.next);     // pop cancelled node
                   //既然等不及了,就返回null
                   else
                       return null;
               //否則,當前節點入棧,如果入棧成功,s就是首節點了
               } else if (casHead(h, s = snode(s, e, h, mode))) {
                   //調用awaitFulfill阻塞等待匹配節點
                   SNode m = awaitFulfill(s, timed, nanos);
                   //阻塞等待調用結果如果是s的話,説明s被取消了
                   if (m == s) {               // wait was cancelled
                       clean(s);
                       return null;
                   }
                   //否則就是阻塞等待後匹配成功了,那麼判斷如果頭節點不空並且下一節點是s的話
                   //説明除了等來一個匹配節點之外,沒有其他節點加入,那麼這一對兒匹配節點都出棧
                   if ((h = head) != null && h.next == s)
                       casHead(h, s.next);     // help s's fulfiller
                   //成功匹配,可以返回了
                   return (E) ((mode == REQUEST) ? m.item : s.item);
               }
           //否則,存在頭節點並且當前節點mode不同可以匹配,並且頭節點尚未被其他線程匹配
           } else if (!isFulfilling(h.mode)) { // try to fulfill
               //如果頭節點已經被取消,則出棧
               if (h.isCancelled())            // already cancelled
                   casHead(h, h.next);         // pop and retry
               //否則當前節點以FULLFILLING模式入棧,s變為首節點
               else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                   //一直循環直到成功匹配或棧內的等待節點突然消失
                   for (;;) { // loop until matched or waiters disappear
                       //m為s的下一節點
                       SNode m = s.next;       // m is s's match
                       //m空,説明以前的等待節點突然消失,比如等待節點超時取消
                       if (m == null) {        // all waiters are gone
                           //清空棧
                           casHead(s, null);   // pop fulfill node
                           //清空s,重新進入主循環
                           s = null;           // use new node next time
                           break;              // restart main loop
                       }
                       //否則,可以開始匹配了,mn為m的下一節點,為出棧做好準備
                       SNode mn = m.next;
                       //如果m和s能匹配成功,則m和s都出棧,返回結果
                       if (m.tryMatch(s)) {
                           casHead(s, mn);     // pop both s and m
                           return (E) ((mode == REQUEST) ? m.item : s.item);
                       //匹配沒有成功,這種情況應該是m超時取消掉了,則m出棧
                       } else                  // lost match
                           s.casNext(m, mn);   // help unlink
                   }
               }
           //否則,棧首節點處於匹配中FULLFILLING的狀態(其他線程正在匹配但是尚未完成)
           //這種情況下,新來的節點不入棧,先協助完成棧首節點的匹配
           } else {                            // help a fulfiller
                    SNode m = h.next;               // m is h's match
                    //首節點的下一節點為空(被取消了),則清空棧
                    if (m == null)                  // waiter is gone
                        casHead(h, null);           // pop fulfilling node
                    else {
                        //否則,去匹配首節點h和他的下一節點m,如果匹配成功了則h和m出棧
                        //這種情況下是不需要返回,因為是協助其他線程完成匹配,自己的匹配任務尚未開始呢...,其他線程如果獲得執行權之後,會發現已經有人幫助他完成匹配了,所以會很快返回結果
                        SNode mn = m.next;
                        if (m.tryMatch(h))          // help match
                            casHead(h, mn);         // pop both h and m
                        else                        // lost match
                            h.casNext(m, mn);       // help unlink
                    }
                }
            }
        }

TransferStack#awaitFulfill

awaitFulfill的作用是通過自旋、或者阻塞當前線程來等待節點被匹配。

3個參數:
SNode s:等待匹配的節點。
booean timed:true則表示限時等待。
long nanos:限時等待時長。

SNode awaitFulfill(SNode s, boolean timed, long nanos) {
            //計算等待時長
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            //獲取當前線程
            Thread w = Thread.currentThread();
            //計算自旋時長
            int spins = (shouldSpin(s) ?
                         (timed ? maxTimedSpins : maxUntimedSpins) : 0);
            //自旋開始
            for (;;) {
                //如果當前線程被中斷,則calcel掉當前節點:將s的match指向自己
                if (w.isInterrupted())
                    s.tryCancel();
                //自旋過程中完成匹配,則直接返回匹配節點
                SNode m = s.match;
                if (m != null)
                    return m;
                //如果是顯示匹配並且匹配超時,則cancel掉s節點
                if (timed) {
                    nanos = deadline - System.nanoTime();
                    if (nanos <= 0L) {
                        s.tryCancel();
                        continue;
                    }
                }
                //自旋時長未到則繼續自旋
                if (spins > 0)
                    spins = shouldSpin(s) ? (spins-1) : 0;
                //完成自旋後記錄當前線程
                else if (s.waiter == null)
                    s.waiter = w; // establish waiter so can park next iter
                //阻塞當前線程
                else if (!timed)
                    LockSupport.park(this);
                else if (nanos > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanos);
            }
        }

TransferStack#clean

clean方法在transfer方法中調用,如果當前節點在等待匹配的過程中已經被cancel掉的話。

代碼不貼出了,基本邏輯就是首先清掉s及s關聯的線程(s=null,s.waiter=null),然後清查並清理掉被cancel掉的head節點(從head到s之後的第一個未被cancel掉的節點逐個檢查),直到確保棧的head節點正常(未被calcel)。

然後從head開始、到s之後的第一個未被cancel掉的節點逐個檢查,如果有節點被標記為cancel則該節點出棧。

執行完成之後,不止是s節點被清理,棧內從head節點開始直到s節點的下一個未被cancel掉的節點之間的節點,如果被cancel掉的話,全部會被清理出棧。

小結

基於TransferStack的SynchronousQueue的源碼就分析完成了,感覺不對照代碼逐行説明的話,就很不容易説清楚TransferStack的transfer、awaitFulfill方法的代碼邏輯,所以就採用在源碼中逐行註釋的方式來説明了。

篇幅原因,TransferQueue下次再説!

Thanks a lot!

上一篇 Runable和Callable的區別?你必須要搞清楚Thread以及FutureTask!
下一篇 BlockQueue - 基於TransferQueue的SynchronousQueue

Add a new Comments

Some HTML is okay.