概述
Condition 是一個多線程協調通信的工具類,可以讓某些線程一起等待某個條件(condition),只有滿足條件時,線程才會被喚醒。
-
在使用Lock之前,使用的最多的同步方式應該是synchronized關鍵字來實現同步方式了。配合Object的wait()、notify()系列方法可以實現等待/通知模式。
-
Condition接口也提供了類似Object的監視器方法,與Lock配合可以實現等待/通知模式,但是這兩者在使用方式以及功能特性上還是有差別的。
Object和Condition接口的一些對比。
| 對比項 | Object 監視器方法 | Condition |
|---|---|---|
| 前置條件 | 獲取對象的監視器鎖 | 調用 Lock.lock() 獲取鎖調用 Lock.newCondition() 獲取 Condition 對象 |
| 調用方法 | 直接調用如:object.wait() | 直接調用如:condition.await() |
| 等待隊列個數 | 一個 | 多個 |
| 當前線程釋放鎖並進入等待隊列 | 支持 | 支持 |
| 當前線程釋放鎖並進入等待隊列,在等待狀態中不響應中斷 | 不支持 | 支持 |
| 當前線程釋放鎖並進入超時等待狀態 | 支持 | 支持 |
| 當前線程釋放鎖並進入等待狀態到將來的某個時間 | 不支持 | 支持 |
| 喚醒等待隊列中的一個線程 | 支持 | 支持 |
| 喚醒等待隊列中的全部線程 | 支持 | 支持 |
接口的介紹與示例
首先需要明白condition對象是依賴於lock對象的,意思就是説condition對象需要通過lock對象進行創建出來(調用Lock對象的newCondition()方法)。condition的使用方式非常的簡單。但是需要注意在調用方法前獲取鎖。
/**
* condition使用示例:
* 1、condition的使用必須要配合鎖使用,調用方法時必須要獲取鎖
* 2、condition的創建依賴於Lock lock.newCondition();
*/
public class ConditionUseCase {
/**
* 創建鎖
*/
public Lock readLock = new ReentrantLock();
/**
* 創建條件
*/
public Condition condition = readLock.newCondition();
public static void main(String[] args) {
ConditionUseCase useCase = new ConditionUseCase();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> {
//獲取鎖進行等待
useCase.conditionWait();
});
executorService.execute(() -> {
//獲取鎖進行喚起讀鎖
useCase.conditionSignal();
});
}
/**
* 等待線程
*/
public void conditionWait() {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "拿到鎖了");
System.out.println(Thread.currentThread().getName() + "等待信號");
condition.await();
System.out.println(Thread.currentThread().getName() + "拿到信號");
} catch (Exception e) {
} finally {
readLock.unlock();
}
}
/**
* 喚起線程
*/
public void conditionSignal() {
readLock.lock();
try {
//睡眠5s 線程1啓動
Thread.sleep(5000);
System.out.println(Thread.currentThread().getName() + "拿到鎖了");
condition.signal();
System.out.println(Thread.currentThread().getName() + "發出信號");
} catch (Exception e) {
} finally {
//釋放鎖
readLock.unlock();
}
}
}
//執行結果
1 pool-1-thread-1拿到鎖了
2 pool-1-thread-1等待信號 ---釋放鎖-線程等待 t1
3 pool-1-thread-2拿到鎖了
4 pool-1-thread-2發出信號 --- 喚起線程t2釋放鎖
5 pool-1-thread-1拿到信號---t1繼續執行
如示例所示,一般都會將Condition對象作為成員變量。當調用await()方法後,當前線程會釋放鎖並在此等待,而其他線程調用Condition對象的signal()方法,通知當前線程後,當前線程才從await()方法返回,並且在返回前已經獲取了鎖。
接口常用方法
condition可以通俗的理解為條件隊列。當一個線程在調用了await方法以後,直到線程等待的某個條件為真的時候才會被喚醒。這種方式為線程提供了更加簡單的等待/通知模式。Condition必須要配合鎖一起使用,因為對共享狀態變量的訪問發生在多線程環境下。一個Condition的實例必須與一個Lock綁定,因此Condition一般都是作為Lock的內部實現。
- await() :造成當前線程在接到信號或被中斷之前一直處於等待狀態。
- boolean await(long time, TimeUnit unit) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態---》是否超時,超時異常
- awaitNanos(long nanosTimeout) :造成當前線程在接到信號、被中斷或到達指定等待時間之前一直處於等待狀態。返回值表示剩餘時間,如果在nanosTimesout之前喚醒,那麼返回值 = nanosTimeout - 消耗時間,如果返回值 <= 0 ,則可以認定它已經超時了。
- awaitUninterruptibly() :造成當前線程在接到信號之前一直處於等待狀態。【注意:該方法對中斷不敏感】。
- awaitUntil(Date deadline) :造成當前線程在接到信號、被中斷或到達指定最後期限之前一直處於等待狀態。如果沒有到指定時間就被通知,則返回true,否則表示到了指定時間,返回返回false。
- signal() :喚醒一個等待線程。該線程從等待方法返回前必須獲得與Condition相關的鎖。
- signalAll() :喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關的鎖。
這裏順便回顧一下 Object 類的主要方法:
wait():線程等待直到被通知或者中斷。wait(long timeout):線程等待指定的時間,或被通知,或被中斷。wait(long timeout, int nanos):線程等待指定的時間,或被通知,或被中斷。notify():喚醒一個等待的線程。notifyAll():喚醒所有等待的線程。
原理解析
Condition是AQS的內部類。可以通過 Lock.newCondition() 方法獲取 Condition 對象,而 Lock 對於同步狀態的實現都是通過內部的自定義同步器實現的,newCondition() 方法也不例外,所以,Condition 接口的唯一實現類是同步器 AQS 的內部類 ConditionObject,因為 Condition 的操作需要獲取相關的鎖,所以作為同步器的內部類也比較合理,該類定義如下:
public class ConditionObject implements Condition, java.io.Serializable
等待隊列
前面我們學過,AQS 內部維護了一個先進先出(FIFO)的雙端隊列,並使用了兩個引用 head 和 tail 用於標識隊列的頭部和尾部。
Condition 內部也使用了同樣的方式,內部維護了一個先進先出(FIFO)的單向隊列,我們把它稱為等待隊列。該隊列是 Condition 對象實現等待 / 通知功能的關鍵。等待隊列是一個FIFO的隊列,在隊列中的每個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了Condition.await()方法,那麼該線程將會釋放鎖、構造成節點加入等待隊列並進入等待狀態。
事實上,節點的定義複用了 AQS 中 Node 節點的定義,也就是説,同步隊列和等待隊列中節點類型都是 AQS 的靜態內部類 AbstractQueuedSynchronized.Node。一個 Condition 包含一個等待隊列,Condition 擁有首節點(firstWaiter)和尾節點(lastWaiter)。當前線程調用 Condition.await() 方法之後,將會以當前線程構造節點,並將節點從尾部加入等待隊列,等待隊列的基本結構如下所示。

- 等待隊列分為首節點和尾節點。當一個線程調用Condition.await()方法,將會以當前線程構造節點,並將節點從尾部加入等待隊列。
- 新增節點就是將尾部節點指向新增的節點。節點引用更新本來就是在獲取鎖以後的操作,所以不需要CAS保證。同時也是線程安全的操作。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
在 Object 的監視器模型上,一個對象擁有一個同步隊列和等待隊列,而併發包中的 Lock(更確切地説是同步器)擁有一個同步隊列和多個等待隊列
等待 await 方法
-
當線程調用了await方法以後。線程就作為隊列中的一個節點被加入到等待隊列中去了。同時會釋放鎖的擁有。
-
當從await方法返回的時候。一定會獲取condition相關聯的鎖。當等待隊列中的節點被喚醒的時候,則喚醒節點的線程開始嘗試獲取同步狀態。
-
如果不是通過 其他線程調用Condition.signal()方法喚醒,而是對等待線程進行中斷,則會拋出InterruptedException異常信息。
-
通知調用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節點(條件隊列裏的首節點),在喚醒節點前,會將節點移到同步隊列中。
-
當前線程加入到等待隊列中如圖所示:
源碼如下:
public final void await() throws InterruptedException {
// 檢測線程中斷狀態
if (Thread.interrupted())
throw new InterruptedException();
// 將當前線程包裝為Node節點加入等待隊列
Node node = addConditionWaiter();
// 釋放同步狀態,也就是釋放鎖
int savedState = fullyRelease(node);
int interruptMode = 0;
// 檢測該節點是否在同步隊中,如果不在,則説明該線程還不具備競爭鎖的資格,則繼續等待
while (!isOnSyncQueue(node)) {
// 掛起線程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 競爭同步狀態
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清理條件隊列中的不是在等待條件的節點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
調用該方法的線程是成功獲取了鎖的線程,也就是同步隊列中的首節點,該方法會將當前線程構造節點並加入等待隊列中,然後釋放同步狀態,喚醒同步隊列中的後繼節點,然後當前線程會進入等待狀態。
可能會有這樣幾個問題:
- 怎樣將當前線程添加到等待隊列中?
- 釋放鎖的過程是?
- 怎樣才能從 await 方法中退出?
問題1:怎樣將當前線程添加到等待隊列中?
調用 addConditionWaiter 方法會將當前線程添加到等待隊列中,源碼如下:
private Node addConditionWaiter() {
// 尾節點
Node t = lastWaiter;
// 尾節點如果不是CONDITION狀態,則表示該節點不處於等待狀態,需要清理節點
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 根據當前線程創建Node節點
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 將該節點加入等待隊列的末尾
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
首先將 t 指向尾節點,如果尾節點不為空並且它的waitStatus!=-2(-2 為 CONDITION,表示正在等待 Condition 條件),則將不處於等待狀態的節點從等待隊列中移除,並且將 t 指向新的尾節點。
然後將當前線程封裝成 waitStatus 為-2 的節點追加到等待隊列尾部。
如果尾節點為空,則表明隊列為空,將首尾節點都指向當前節點。
如果尾節點不為空,表明隊列中有其他節點,則將當前尾節點的 nextWaiter 指向當前節點,將當前節點置為尾節點。
簡單總結一下,這段代碼的作用就是通過尾插入的方式將當前線程封裝的 Node 插入到等待隊列中,同時可以看出,Condtion 的等待隊列是一個不帶頭節點的鏈式隊列,之前我們學習 AQS 時知道同步隊列是一個帶頭節點的鏈式隊列,這是兩者的一個區別。
關於頭節點的作用,我們這裏簡單説明一下。
不帶頭節點是指在鏈表數據結構中,鏈表的第一個節點就是實際存儲的第一個數據元素,而不是一個特定的"頭"節點,該節點不包含實際的數據。
1)不帶頭節點的鏈表:
- 鏈表的第一個節點就是第一個實際的數據節點。
- 當鏈表為空時,頭引用(通常稱為 head)指向 null。
2)帶頭節點的鏈表:
- 鏈表有一個特殊的節點作為鏈表的開頭,這個特殊的節點稱為頭節點。
- 頭節點通常不存儲任何實際數據,或者它的數據字段不被使用。
- 無論鏈表是否為空,頭節點總是存在的。當鏈表為空時,頭節點的下一個節點指向 null。
- 使用頭節點可以簡化某些鏈表操作,因為你不必特殊處理第一個元素的插入和刪除。
為了更好地解釋這兩種鏈表結構,我將為每種結構提供一個簡單的整數鏈表插入方法的示例。
1)不帶頭節點的鏈表
public class Node {
public int data;
public Node next;
public Node(int data) {
this.data = data;
this.next = null;
}
}
public class LinkedListWithoutHead {
public Node head;
public void insert(int value) {
Node newNode = new Node(value);
if (head == null) {
head = newNode;
} else {
Node temp = head;
while (temp.next != null) {
temp = temp.next;
}
temp.next = newNode;
}
}
}
2)帶頭節點的鏈表
public class NodeWithHead {
public int data;
public NodeWithHead next;
public NodeWithHead(int data) {
this.data = data;
this.next = null;
}
}
public class LinkedListWithHead {
private NodeWithHead head;
public LinkedListWithHead() {
head = new NodeWithHead(-1); // 初始化頭節點
}
public void insert(int value) {
NodeWithHead newNode = new NodeWithHead(value);
NodeWithHead temp = head;
while (temp.next != null) {
temp = temp.next;
}
temp.next = newNode;
}
}
這下是不是就徹底明白了?説明白了頭節點,我們再回到 Condition 的 await 方法。
問題 2:釋放鎖的過程是?
將當前線程加入到等待隊列之後,需要釋放同步狀態,該操作通過 fullyRelease(Node) 方法來完成:
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 獲取同步狀態
int savedState = getState();
// 釋放鎖
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
這段代碼也很容易理解,調用 AQS 的模板方法 release 釋放 AQS 的同步狀態並且喚醒在同步隊列中頭節點的後繼節點引用的線程,如果釋放成功則正常返回,若失敗的話就拋出異常。
問題3:怎樣才能從 await 方法中退出?
怎樣從 await 方法退出呢?現在回過頭再來看 await 方法,其中有這樣一段邏輯:
while (!isOnSyncQueue(node)) {
// 3. 當前線程進入到等待狀態
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
isOnSyncQueue 方法用於判斷當前線程所在的 Node 是否在同步隊列中:
final boolean isOnSyncQueue(Node node) {
// 節點狀態為CONDITION,或者前驅節點為null,返回false
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
// 後繼節點不為null,那麼肯定在同步隊列中
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
如果當前節點的 waitStatus=-2,説明它在等待隊列中,返回 false;如果當前節點有前驅節點,則證明它在 AQS 隊列中,但是前驅節點為空,説明它是頭節點,而頭節點是不參與鎖競爭的,也返回 false。
如果當前節點既不在等待隊列中,又不是 AQS 中的頭節點且存在 next 節點,説明它存在於 AQS 中,直接返回 true。
這裏有必要給大家看一下同步隊列與等待隊列的關係圖了。
當線程第一次調用 condition.await 方法時,會進入到這個 while 循環,然後通過 LockSupport.park(this) 使當前線程進入等待狀態,那麼要想退出 await,第一個前提條件就是要先退出這個 while 循環,出口就只兩個地方:
- 走到 break 退出 while 循環;
- while 循環中的邏輯判斷為 false。
出現第 1 種情況的條件是,當前等待的線程被中斷後代碼會走到 break 退出,第 2 種情況是當前節點被移動到了同步隊列中(即另外一個線程調用了 condition 的 signal 或者 signalAll 方法),while 中邏輯判斷為 false 後結束 while 循環。
總結一下,退出 await 方法的前提條件是當前線程被中斷或者調用 condition.signal 或者 condition.signalAll 使當前節點移動到同步隊列後。
當退出 while 循環後會調用acquireQueued(node, savedState),該方法的作用是在自旋過程中線程不斷嘗試獲取同步狀態,直到成功(線程獲取到 lock)。這樣也説明了退出 await 方法必須是已經獲得了 condition 引用(關聯)的 lock。
到目前為止,上文提到的三個問題,我們都通過閲讀源碼的方式找到了答案,也加深了對 await 方法的理解。await 方法示意圖如下:
如圖,調用 condition.await 方法的線程必須是已經獲得了 lock 的線程,也就是當前線程是同步隊列中的頭節點。調用該方法後會使得當前線程所封裝的 Node 尾插入到等待隊列中。
超時機制的支持
condition 還額外支持超時機制,使用者可調用 awaitNanos、awaitUtil 這兩個方法,實現原理基本上與 AQS 中的 tryAcquire 方法如出一轍。
不響應中斷的支持
要想不響應中斷可以調用 condition.awaitUninterruptibly() 方法,該方法的源碼如下:
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
這段方法與上面的 await 方法基本一致,只不過減少了對中斷的處理。
通知-signal/signalAll 實現原理
調用 condition 的 signal 或者 signalAll 方法可以將等待隊列中等待時間最長的節點移動到同步隊列中,使得該節點能夠有機會獲得 lock。等待隊列是先進先出(FIFO)的,所以等待隊列的頭節點必然會是等待時間最長的節點,也就是每次調用 condition 的 signal 方法都會將頭節點移動到同步隊列中。
在調用signal()方法之前必須先判斷是否獲取到了鎖。接着獲取等待隊列的首節點,將其移動到同步隊列並且利用LockSupport喚醒節點中的線程。節點從等待隊列移動到同步隊列如下圖所示:
被喚醒的線程將從await方法中的while循環中退出。隨後加入到同步狀態的競爭當中去。成功獲取到競爭的線程則會返回到await方法之前的狀態。
源碼如下:
調用 Condition 的 signal() 方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列中。Condition 的 signal() 方法如下所示:
public final void signal() {
// 判斷是否是當前線程獲取了鎖
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 喚醒等待隊列的首節點
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
該方法最終調用 doSignal(Node) 方法來喚醒節點:
private void doSignal(Node first) {
do {
// 把等待隊列的首節點移除之後,要修改首結點
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
真正對頭節點做處理的邏輯,將節點移動到同步隊列是通過 transferForSignal(Node) 方法完成的:
final boolean transferForSignal(Node node) {
// 嘗試將該節點的狀態從CONDITION修改為0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 將節點加入到同步隊列尾部,返回該節點的前驅節點
Node p = enq(node);
int ws = p.waitStatus;
// 如果前驅節點的狀態為CANCEL或者修改waitStatus失敗,則直接喚醒當前線程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
這段代碼主要做了兩件事情:
- 將頭節點的狀態更改為 CONDITION;
- 調用 enq 方法,將該節點尾插入到同步隊列中,關於 enq 方法請看 AQS 的底層實現這篇文章。
節點從等待隊列移動到同步隊列的過程如下圖所示:
被喚醒後的線程,將從 await() 方法中的 while 循環中退出(因為此時 isOnSyncQueue(Node) 方法返回 true),進而調用 acquireQueued() 方法加入到獲取同步狀態的競爭中。
成功獲取了鎖之後,被喚醒的線程將從先前調用的 await() 方法返回,此時,該線程已經成功獲取了鎖。
signalAll()
sigllAll 與 sigal 方法的區別體現在 doSignalAll 方法上,前面我們已經知道 doSignal 方法只會對等待隊列的頭節點進行操作, signalAll() 方法相當於對等待隊列的每個節點均執行一次 signal() 方法,效果就是將等待隊列中的所有節點移動到同步隊列中。doSignalAll 的源碼如下:
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
該方法會將等待隊列中的每一個節點都移入到同步隊列中,即“通知”當前調用 condition.await() 方法的每一個線程。
await 與 signal/signalAll
文章開篇提到的等待/通知機制,通過 condition 的 await 和 signal/signalAll 方法就可以實現,而這種機制能夠解決最經典的問題就是“生產者與消費者問題”
await、signal 和 signalAll 方法就像一個開關,控制着線程 A(等待方)和線程 B(通知方)。它們之間的關係可以用下面這幅圖來説明,會更貼切:
線程 awaitThread 先通過 lock.lock() 方法獲取鎖,成功後調用 condition.await 方法進入等待隊列,而另一個線程 signalThread 通過 lock.lock() 方法獲取鎖成功後調用了 condition.signal 或者 signalAll 方法,使得線程 awaitThread 能夠有機會移入到同步隊列中,當其他線程釋放 lock 後使得線程 awaitThread 能夠有機會獲取 lock,從而使得線程 awaitThread 能夠從 await 方法中退出並執行後續操作。如果 awaitThread 獲取 lock 失敗會直接進入到同步隊列。
總結
-
調用await方法後,將當前線程加入Condition等待隊列中。當前線程釋放鎖。否則別的線程就無法拿到鎖而發生死鎖。自旋(while)掛起,不斷檢測節點是否在同步隊列中了,如果是則嘗試獲取鎖,否則掛起。
-
當線程被signal方法喚醒,被喚醒的線程將從await()方法中的while循環中退出來,然後調用acquireQueued()方法競爭同步狀態。