condition 介紹及demo

 Condition是在java 1.5中才出現的,它用來替代傳統的Object的wait()、notify()實現線程間的協作,相比使用Object的wait()、notify(),使用Condition的await()、signal()這種方式實現線程間協作更加安全和高效。因此通常來説比較推薦使用Condition,阻塞隊列實際上是使用了Condition來模擬線程間協作。

  • Condition是個接口,基本的方法就是await()和signal()方法;
  • Condition依賴於Lock接口,生成一個Condition的基本代碼是lock.newCondition() 
  •  調用Condition的await()和signal()方法,都必須在lock保護之內,就是説必須在lock.lock()和lock.unlock之間才可以使用

  Conditon中的await()對應Object的wait();

  Condition中的signal()對應Object的notify();

Condition中的signalAll()對應Object的notifyAll()。

下面是demo:  


[java]  view plain  copy


1. package thread;  
2.   
3. import java.util.concurrent.locks.Condition;  
4. import java.util.concurrent.locks.Lock;  
5. import java.util.concurrent.locks.ReentrantLock;  
6. /**
7.  * 
8.  * @author zhangliang
9.  *
10.  * 2016年4月8日 下午5:48:54
11.  */  
12. public class ConTest {  
13.       
14. final Lock lock = new ReentrantLock();  
15. final Condition condition = lock.newCondition();  
16.   
17. public static void main(String[] args) {  
18. // TODO Auto-generated method stub  
19. new ConTest();  
20. new Producer();  
21. new Consumer();  
22.                 
23.           
24.         consumer.start();   
25.         producer.start();  
26.     }  
27.       
28. class Consumer extends Thread{  
29.            
30. @Override  
31. public void run() {  
32.                 consume();  
33.             }  
34.                 
35. private void consume() {  
36.                                
37. try {  
38.                            lock.lock();  
39. "我在等一個新信號"+this.currentThread().getName());  
40.                         condition.await();  
41.                           
42. catch (InterruptedException e) {  
43. // TODO Auto-generated catch block  
44.                         e.printStackTrace();  
45. finally{  
46. "拿到一個信號"+this.currentThread().getName());  
47.                         lock.unlock();  
48.                     }  
49.                   
50.             }  
51.         }  
52.        
53. class Producer extends Thread{  
54.            
55. @Override  
56. public void run() {  
57.                 produce();  
58.             }  
59.                 
60. private void produce() {                   
61. try {  
62.                            lock.lock();  
63. "我拿到鎖"+this.currentThread().getName());  
64.                             condition.signalAll();                             
65. "我發出了一個信號:"+this.currentThread().getName());  
66. finally{  
67.                         lock.unlock();  
68.                     }  
69.                 }  
70.      }  
71.           
72. }


運行結果:

Java conn表達式 java中condition_System

Condition的執行方式,是當在線程Consumer中調用await方法後,線程Consumer將釋放鎖,並且將自己沉睡,等待喚醒,線程Producer獲取到鎖後,開始做事,完畢後,調用Condition的signalall方法,喚醒線程Consumer,線程Consumer恢復執行。

以上説明Condition是一個多線程間協調通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),只有當該條件具備( signal 或者 signalAll方法被帶調用)時 ,這些等待線程才會被喚醒,從而重新爭奪鎖。

Condition實現生產者、消費者模式:


[java]  view plain  copy


1. package thread;  
2.   
3. import java.util.PriorityQueue;  
4. import java.util.concurrent.locks.Condition;  
5. import java.util.concurrent.locks.Lock;  
6. import java.util.concurrent.locks.ReentrantLock;  
7.   
8. public class ConTest2 {  
9. private int queueSize = 10;  
10. private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);  
11. private Lock lock = new ReentrantLock();  
12. private Condition notFull = lock.newCondition();  
13. private Condition notEmpty = lock.newCondition();  
14.            
15. public static void main(String[] args) throws InterruptedException  {  
16. new ConTest2();  
17. new Producer();  
18. new Consumer();                
19.             producer.start();  
20.             consumer.start();  
21. 0);  
22.             producer.interrupt();  
23.             consumer.interrupt();  
24.         }  
25.             
26. class Consumer extends Thread{              
27. @Override  
28. public void run() {  
29.                 consume();  
30.             }  
31. volatile boolean flag=true;    
32. private void consume() {  
33. while(flag){  
34.                     lock.lock();  
35. try {  
36. while(queue.isEmpty()){  
37. try {  
38. "隊列空,等待數據");  
39.                                 notEmpty.await();  
40. catch (InterruptedException e) {                              
41. false;  
42.                             }  
43.                         }  
44. //每次移走隊首元素  
45.                         notFull.signal();  
46. "從隊列取走一個元素,隊列剩餘"+queue.size()+"個元素");  
47. finally{  
48.                         lock.unlock();  
49.                     }  
50.                 }  
51.             }  
52.         }  
53.             
54. class Producer extends Thread{              
55. @Override  
56. public void run() {  
57.                 produce();  
58.             }  
59. volatile boolean flag=true;    
60. private void produce() {  
61. while(flag){  
62.                     lock.lock();  
63. try {  
64. while(queue.size() == queueSize){  
65. try {  
66. "隊列滿,等待有空餘空間");  
67.                                 notFull.await();  
68. catch (InterruptedException e) {  
69.                                   
70. false;  
71.                             }  
72.                         }  
73. 1);        //每次插入一個元素  
74.                         notEmpty.signal();  
75. "向隊列取中插入一個元素,隊列剩餘空間:"+(queueSize-queue.size()));  
76. finally{  
77.                         lock.unlock();  
78.                     }  
79.                 }  
80.             }  
81.         }  
82.     }

運行結果如下:

Java conn表達式 java中condition_等待隊列_02

condition實現分析:




Java conn表達式 java中condition_等待隊列_03



  • Condition接口包含了多種await方式和兩個通知方法
  • ConditionObject實現了Condition接口,是AbstractQueuedSynchronizer的內部類
  • Reentrantlock的newCondition方法返回與某個lock實例相關的Condition對象


[java]  view plain  copy

1. public abstract class AbstractQueuedLongSynchronizer  
2. extends AbstractOwnableSynchronizer  
3. implements java.io.Serializable {


[java]  view plain  copy


    1. <span style="font-size:18px;">結合上面的類圖,我們看到condition實現是依賴於aqs,而aqs是個抽象類。裏面定義了同步器的基本框架,實現了基本的結構功能。只留有狀態條件的維護由具體同步器根據具體場景來定製,如常見的 ReentrantLock 、 RetrantReadWriteLock和CountDownLatch 等等,AQS內容太多,儘量只簡明梳理condition相關流程,不太深入理解底層源碼。</span>



    下面結合上面demo來分析流程。


    reentrantLock.newCondition() 返回的是Condition的一個實現,該類在AbstractQueuedSynchronizer(AQS)中被實現,叫做newCondition()



    [java]  view plain  copy

    1. public Condition newCondition() {  
    2. return sync.newCondition();  
    3. }

    我們看一下這個await的方法,它是AQS的方法,


    publicfinalvoid await() throws InterruptedException {
     
    02
    if (Thread.interrupted())
     
    03
    thrownew InterruptedException();
     
    04
    Node node = addConditionWaiter(); //將當前線程包裝下後,
     
    05
    //添加到Condition自己維護的一個鏈表中。
     
    06
    int savedState = fullyRelease(node);//釋放當前線程佔有的鎖,從demo中看到,
     
    07
    //調用await前,當前線程是佔有鎖的
     
    08
     
     
    09
    int interruptMode = 0;
     
    10
    while (!isOnSyncQueue(node)) {//釋放完畢後,遍歷AQS的隊列,看當前節點是否在隊列中,
     
    11
    //不在 説明它還沒有競爭鎖的資格,所以繼續將自己沉睡。
     
    12
    //直到它被加入到隊列中,聰明的你可能猜到了,
     
    13
    //沒有錯,在singal的時候加入不就可以了?
     
    14
    LockSupport.park(this);
     
    15
    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
     
    16
    break;
     
    17
    }
     
    18
    //被喚醒後,重新開始正式競爭鎖,同樣,如果競爭不到還是會將自己沉睡,等待喚醒重新開始競爭。
     
    19
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
     
    20
    interruptMode = REINTERRUPT;
     
    21
    if (node.nextWaiter != null)
     
    22
    unlinkCancelledWaiters();
     
    23
    if (interruptMode != 0)
     
    24
    reportInterruptAfterWait(interruptMode);
     
    25
    }

    回到上面的demo,鎖被釋放後,線程Consumer開始沉睡,這個時候線程因為線程Consumer沉睡時,會喚醒AQS隊列中的頭結點,所所以線程Producer會開始競爭鎖,並獲取到,執行完後線程Producer會調用signal方法,“發出”signal信號,signal方法如下:

    1
    publicfinalvoid signal() {
     
    2
    if (!isHeldExclusively())
     
    3
    thrownew IllegalMonitorStateException();
     
    4
    Node first = firstWaiter; //firstWaiter為condition自己維護的一個鏈表的頭結點,
     
    5
    //取出第一個節點後開始喚醒操作
     
    6
    if (first != null)
     
    7
    doSignal(first);
     
    8
    }

    説明下,其實Condition內部維護了等待隊列的頭結點和尾節點,該隊列的作用是存放等待signal信號的線程,該線程被封裝為Node節點後存放於此。


    Java conn表達式 java中condition_java_04

    而Condition自己也維護了一個隊列,該隊列的作用是維護一個等待signal信號的隊列,兩個隊列的作用是不同,事實上,每個線程也僅僅會同時存在以上兩個隊列中的一個,流程是這樣的:

    Java conn表達式 java中condition_System_05


    注意:

    1.線程producer調用signal方法,這個時候Condition的等待隊列中只有線程Consumer一個節點,於是它被取出來,並被加入到AQS的等待隊列中。  注意,這個時候,線程Consumer 並沒有被喚醒。

    2.Sync是AQS的抽象子類,實現可重入和互斥的大部分功能。在Sync的子類中有FairSync和NonfairSync兩種代表公平鎖策略和非公平鎖策略。Sync lock方法留給子類去實現,NonfairSync的實現:



    [java]  view plain  copy

    1. final void lock() {  
    2. if (compareAndSetState(0, 1))  
    3.                setExclusiveOwnerThread(Thread.currentThread());  
    4. else  
    5. 1);  
    6.        }

    其中如果一開始獲取鎖成功,是直接設置當前線程。

    否則執行acquire(1),也就是進入aqs等待隊列。這裏不展開細節。

    可以這樣理解,整個協作過程是靠結點在AQS的等待隊列和Condition的等待隊列中來回移動實現的,每個隊列的意義不同,Condition作為一個條件類,很好的自己維護了一個等待信號的隊列,並在適時的時候將結點加入到AQS的等待隊列中來實現的喚醒操作

    本文先整理到這裏吧。

    後記:

    梳理本文的過程比較痛苦,為什麼呢?因為我沒有吃透這一塊,發現牽扯的很多,腦子很亂,有廣度又有深度,感覺沒法梳理,決定一點一點去啃,從淺入深的去梳理,從鎖,同步,阻塞隊列,併發容器開始,到依賴的底層aqs\原子變量,再到更底層的volatile、cas。其中aqs是其中的關鍵,很多j.u.c的包是圍繞它實現的。目標就是會用,熟悉原理,讀懂源碼,寫出demo,關鍵地方梳理出流程圖,加油!