ReentrantLock 依賴關係如下圖所示
非公平鎖實現原理
ReentrantLock 默認採用非公平鎖。
// ReentrantLock
public ReentrantLock() {
sync = new NonfairSync();
}
加鎖流程
ReentrantLock 的 lock 方法通過同步器的 lock 方法實現。
// ReentrantLock
public void lock() {
sync.lock();
}
同步器的 lock 方法會先調用 initialTryLock() 方法,如果失敗,則調用 acquire() 方法。
// Sync extends AbstractQueuedSynchronizer
final void lock() {
if (!initialTryLock())
acquire(1);
}
// AbstractQueuedSynchronizer
public final void acquire(int arg) {
// 嘗試獲取鎖
if (!tryAcquire(arg))
// 獲取鎖失敗則加入的等待隊列
// null, 自定義參數, 非共享鎖, 不可打斷, 無時限, 時限
acquire(null, arg, false, false, false, 0L);
}
NonfairSync 的 initialTryLock 分兩步:嘗試獲取鎖、嘗試重入。
NonfairSync 的 tryAcquire 方法被調用前,必定會調用 initialTryLock() 方法檢查鎖是否被當前線程持有,也即,調用 tryAcquire 方法時,鎖必定未被當前線程持有。因此,當未有線程持有鎖時,tryAcquire 才能嘗試獲取鎖。
// NonfairSync extends Sync
static final class NonfairSync extends Sync {
// 初次嘗試獲取鎖
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 通過CAS嘗試獲取鎖
if (compareAndSetState(0, 1)) {
// 將鎖的持有者設為當前線程
setExclusiveOwnerThread(current);
return true;
}
// 嘗試獲取鎖失敗,判斷鎖的持有者是否為當前線程
else if (getExclusiveOwnerThread() == current) {
// 鎖重入
int c = getState() + 1;
// 整數溢出
if (c < 0)
throw new Error("Maximum lock count exceeded");
// 設置鎖的狀態:state 表示重入次數
setState(c);
return true;
} else
return false;
}
// 非初次嘗試獲取鎖
protected final boolean tryAcquire(int acquires) {
// 未有線程持有鎖時,當前線程嘗試獲取鎖
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
AbstractQueuedSynchronizer 的 acquire 方法負責將線程加入等待隊列,acquire 方法的主要執行步驟如下:
-
若非第一個等待線程且前驅存在,則檢查當前節點前驅是否已被取消(前驅線程被取消)
- 若已被取消,則需要從隊尾開始,往前清理已取消的節點,進入下一輪循環。
- 若未被取消,檢查當前線程是否已成為第一個線程,若是,則自旋等待,進入下一輪循環
- 若是第一個等待線程或者前驅不存在,則嘗試獲取鎖,獲取鎖成功則直接返回 1 表示獲取鎖成功,
-
準備將當前線程加入等待隊列
- 3.1 若等待隊列未創建,則創建等待隊列,進入下一輪循環
- 3.2 否則,若節點未創建,則創建節點,進入下一輪循環
- 3.3 否則,若節點信息未設置,則設置節點信息(包括將節點加入隊尾),進入下一輪循環
- 3.4 否則,若為第一個等待線程且自旋次數不為 0,提示 JVM 當前線程正在忙等,進入下一輪循環
- 3.5 否則,若節點狀態為 0(默認),則將節點狀態設為 1(WAITING)
- 3.6 否則,重置自旋自旋次數,當前線程陷入等待,被喚醒後將等待狀態置為 0。
// AbstractQueuedSynchronizer
final int acquire(
// 嘗試獲取鎖的節點
Node node,
// 自定義參數
int arg,
// 是否為共享鎖
boolean shared,
// 是否可打斷
boolean interruptible,
// 是否帶時限
boolean timed,
// 時限
long time
) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0;
boolean interrupted = false, first = false;
Node pred = null;
for (;;) {
// 檢查前驅是否被取消
if (
// 非第一個等待線程
!first &&
// 前驅存在
// 除非本方法被ConditionNode.await()方法調用,否則node初始必為null
// 也即,此條件在第一輪循環必為false
(pred = (node == null) ? null : node.prev) != null &&
// 非第一個等待線程
!(first = (head == pred))
) {
if (pred.status < 0) {
// 若前驅被取消,則需要清理隊列中取消的線程
// 此舉是為確保給當前節點的前驅能喚醒當前節點
cleanQueue();
continue;
} else if (pred.prev == null) {
// 若前驅的前驅為null,説明當前線程是第一個線程
// !first 和 pred.prev == null 之間,head的值被修改,導致當前線程成為第一個線程
// 第一個線程自旋等待,以減少線程切換的開銷
Thread.onSpinWait();
continue;
}
}
// 嘗試獲取鎖
if (
// 第一個線程 或者 前驅不存在
first || pred == null
) {
boolean acquired;
// 嘗試獲取鎖
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
// 獲取鎖成功
if (acquired) {
// 當前線程是第一個線程
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
// 共享節點獲取到鎖之後需要喚醒下一個節點
signalNextIfShared(node);
if (interrupted)
// 獨佔節點獲取到鎖之後需要重新設置打斷標記
current.interrupt();
}
return 1;
}
}
// 嘗試獲取鎖失敗後,準備將線程加入等待隊列
Node t;
if (
// 等待隊列未創建
(t = tail) == null
) {
// 創建等待隊列
if (tryInitializeHead() == null)
// 創建隊列失敗則調用tryAcquire嘗試獲取鎖,失敗則park(long)等待,重複
return acquireOnOOME(shared, arg);
} else if (
// 節點未創建:創建節點
node == null
) {
try {
// 創建節點
node = (shared) ? new SharedNode() : new ExclusiveNode();
} catch (OutOfMemoryError oome) {
// 創建節點失敗則調用tryAcquire嘗試獲取鎖,失敗則park(long)等待,重複
return acquireOnOOME(shared, arg);
}
} else if (
// 前驅未設置:設置節點信息
pred == null
) {
// 當前節點線程
node.waiter = current;
// 當前節點前驅(節點將由前驅喚醒)
node.setPrevRelaxed(t);
// 通過CAS將尾節點置為當前節點
if (!casTail(t, node))
// 失敗則進入下一次循環
node.setPrevRelaxed(null);
else
// 將當前節點加入隊尾
// 此時t仍指向原來的尾節點,當前節點已為實際尾節點
t.next = node;
} else if (
// 第一個節點 自旋次數不為0
first && spins != 0
) {
// 自旋等待
--spins;
Thread.onSpinWait();
} else if (
// 當前節點狀態為0(新建節點狀態默認為0)
node.status == 0
) {
// 當前節點狀態置為等待(WAITING=1)
node.status = WAITING;
} else {
// 到此處,節點已被創建,節點信息已被設置,節點狀態已被設置
long nanos;
// 設置自旋次數
spins = postSpins = (byte)((postSpins << 1) | 1);
// 當前線程等待
if (!timed)
// 無時限等待
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
// 有時限等待
LockSupport.parkNanos(this, nanos);
else
// 等待超時
break;
// 線程被喚醒後,將狀態設置為0
node.clearStatus();
// 被打斷且可被打斷
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
// (等待超時)或者(被打斷且可被打斷)則取消獲取鎖
return cancelAcquire(node, interrupted, interruptible);
}
線程被喚醒後,再次嘗試獲取鎖,若獲取鎖失敗,不立即陷入等待,而是自旋,再次嘗試獲取鎖,直到自旋次數耗盡。線程每次陷入等待前,會重置自旋次數,自旋次數為 2^k,其中,k 表示當前線程是第 k 陷入等待。注意,陷入等待是指執行 park() 方法,而不是指加入等待隊列,加入等待隊列的操作只會執行一次。
非公平體現在哪?先進先出不是公平的嗎?閲讀非公平同步器的 acquire() 源碼可知,先將嘗試獲取鎖,再將線程加入等待隊列,也即,新來的線程可直接與等待隊列中的線程競爭,準確地説,新來的線程可直接與等待隊列中的第一個等待線程競爭,這導致新來的線程可能比等待隊列中的線程先執行,這是不公平的。
檢查線程是否被打斷使用的是 Thread.interrupted() 方法,Thread.interrupted() 會清除打斷標記,線程再次調用 LockSupport.park() 時仍能生效,Thread.isInterrupted() 不會清除打斷標記,線程再次調用 LockSupport.park() 時無效。
如果線程(等待超時)或者(被打斷且可被打斷),則會取消嘗試獲取鎖 cancelAcquire(),如果因為等待超時而導致取消嘗試獲取鎖,則會保留打斷標記。非公平同步器 NonfairSync 沒有用到 cancelAcquire() 的返回值。
private int cancelAcquire(
Node node,
boolean interrupted,
boolean interruptible
) {
// 修改節點信息
if (node != null) {
node.waiter = null;
node.status = CANCELLED;
// 清理隊列
if (node.prev != null)
cleanQueue();
}
if (interrupted) {
if (interruptible)
// CANCELLED=0x80000000,負數
return CANCELLED;
else
// 重新打斷自己,將打斷標記置為 true
Thread.currentThread().interrupt();
}
return 0;
}
解鎖流程
ReentrantLock 的 unlock 方法通過同步器的 release方法實現。
// ReentrantLock
public void unlock() {
sync.release(1);
}
同步器的 release 方法會先調用 tryRelease 嘗試釋放鎖,若釋放成功則會喚醒下一個線程。
// AbstractQueuedSynchronizer
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
同步器的 tryRelease 方法先修改鎖的持有者(置為 null),再修改鎖的狀態。
// Sync extends AbstractQueuedSynchronizer
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 僅鎖的持有者才可以釋放鎖
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
// free=true 表示現在重入次數為0
boolean free = (c == 0);
// 重入次數為0則將線程持有者置為null
if (free)
setExclusiveOwnerThread(null);
// 設置線程狀態
setState(c);
return free;
}
同步器中的 signalNext 方法將等待隊列中的第一個線程喚醒。
// AbstractQueuedSynchronizer
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
// 將state設置為 ~WAITING
s.getAndUnsetStatus(WAITING);
// 喚醒下一個線程(等待隊列中的第一個等待線程)
LockSupport.unpark(s.waiter);
}
}
可重入
閲讀 NonfairSync 的源碼可以發現,線程第一次嘗試獲取鎖時,嘗試將 state 由 0 置為 1 失敗後,會檢查鎖的持有者是否是自己,若是,則累計重入次數,並返回 true,表示獲取鎖成功。
當鎖的持有者為當前線程,當前線程再次嘗試獲取鎖仍能成功,此即可重入。
// NonfairSync extends Sync
static final class NonfairSync extends Sync {
// 初次嘗試獲取鎖
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 通過CAS嘗試獲取鎖
if (compareAndSetState(0, 1)) {
// 將鎖的持有者設為當前線程
setExclusiveOwnerThread(current);
return true;
}
// 嘗試獲取鎖失敗,判斷鎖的持有者是否為當前線程
else if (getExclusiveOwnerThread() == current) {
// 鎖重入
int c = getState() + 1;
// 整數溢出
if (c < 0)
throw new Error("Maximum lock count exceeded");
// 設置鎖的狀態:state 表示重入次數
setState(c);
return true;
} else
return false;
}
...
}
閲讀 Sync 的源碼可以發現,釋放鎖時,會將重入次數減去 releases(實際為 1),僅當重入次數為 0 時,線程才將鎖的持有者置為 null 並返回 true,表示鎖已成功釋放。
// Sync extends AbstractQueuedSynchronizer
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 僅鎖的持有者才可以釋放鎖
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
// free=true 表示現在重入次數為0
boolean free = (c == 0);
// 重入次數為0則將線程持有者置為null
if (free)
setExclusiveOwnerThread(null);
// 設置線程狀態
setState(c);
return free;
}
可打斷
理解一下參數 interruptible
- interruptible = true,獲取鎖的過程中允許被打斷,立即響應打斷,不再嘗試獲取鎖。
- interruptible = false,獲取鎖的過程中不允許被打斷,不立即響應打斷,繼續嘗試獲取鎖。
// AbstractQueuedSynchronizer
final int acquire(...) {
for (;;) {
...
{
...
// 被打斷且可被打斷
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
// (等待超時)或者(被打斷且可被打斷)則取消獲取鎖
return cancelAcquire(node, interrupted, interruptible);
}
公平鎖的實現原理
通過 ReentrantLock 的有參構造可創建公平鎖。
// ReentrantLock
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
FairSync 和 NonfairSync 的區別即為公平鎖和非公平鎖的區別,主要關注 initialTryLock 方法。
公平鎖在初次嘗試獲取鎖時,僅當鎖未被佔有且等待隊列無等待線程時,當前線程才會立即嘗試獲取鎖,否則將加入等待隊列隊尾。
// ReentrantLock
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 初次嘗試獲取鎖
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
// 鎖未被佔有
if (c == 0) {
// 等待隊列無未取消的等待線程 且 獲取鎖成功
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
// 將鎖的持有者設為當前線程
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
// 重入
if (++c < 0)
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
// 非初次嘗試獲取鎖
protected final boolean tryAcquire(int acquires) {
if (
// 鎖未被佔有
getState() == 0 &&
// 等待隊列無未取消的等待線程
!hasQueuedPredecessors() &&
// 獲取鎖成功
compareAndSetState(0, acquires)
) {
// 將鎖的持有者設為當前線程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
}
hasQueuedThread() 用於判斷等待隊列中是否未取消的等待線程。
// AbstractQueuedSynchronizer
public final boolean hasQueuedThreads() {
for (Node p = tail, h = head; p != h && p != null; p = p.prev)
// 存在未取消的線程
if (p.status >= 0)
return true;
return false;
}
條件變量的實現原理
ReentrantLock 通過 newCondition() 方法創建條件變量,而 ReentrantLock 的 newCondition() 方法基於同步器的 newCondition() 方法。
// ReentrantLock
public Condition newCondition() {
return sync.newCondition();
}
同步器的 newCondition() 方法會創建一個 ConditionObject 對象,即條件變量對象。
// Sync extends AbstractQueuedSynchronizer
final ConditionObject newCondition() {
return new ConditionObject();
}
每個條件變量維護一個條件變量隊列。
// ReentrantLock
public class ConditionObject implements Condition{
// 頭節點
private transient ConditionNode firstWaiter;
// 尾節點
private transient ConditionNode lastWaiter;
}
線程的等待和喚醒可粗略理解為
- 線程等待:先將線程從等待隊列中移除,再將線程加入條件變量隊列
- 線程喚醒:先將線程從條件變量隊列中移除,再將線程加入等待隊列
- 條件變量隊列中的線程不能參與鎖的競爭,而等待隊列中的線程可以參與鎖的競爭
加入條件變量隊列的流程
// ReentrantLock
public class ConditionObject implements Condition{
// 將線程加入條件變量隊列
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 創建節點,創建失敗則自旋嘗試獲取鎖
ConditionNode node = newConditionNode();
// 創建節點失敗
if (node == null)
return;
// 將當前線程加入條件變量隊列
// 當前線程釋放持有的鎖(會將當前線程從等待隊列溢出)並喚醒等待隊列下一個線程
// 返回重入次數
int savedState = enableWait(node);
// 當前線程等待原因置為this,常用於調試
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false, rejected = false;
// !(可以嘗試獲取鎖)
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
// 陷入等待
try {
if (rejected)
node.block();
else
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
// ForkJoinPool線程池無法接收任務
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait();
}
// 當前線程等待原因置為null
LockSupport.setCurrentBlocker(null);
// 重新參與競爭
node.clearStatus();
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
}
// ConditionObject implements Condition
private ConditionNode newConditionNode() {
int savedState;
// 初始化等待隊列頭節點
if (tryInitializeHead() != null) {
try {
return new ConditionNode();
} catch (OutOfMemoryError oome) {
}
}
// 發生OutOfMemoryError時才會往下執行
// 當前線程未持有鎖或者釋放鎖失敗
if (!isHeldExclusively() || !release(savedState = getState()))
throw new IllegalMonitorStateException();
// 當前線程暫停一段時間
U.park(false, OOME_COND_WAIT_DELAY);
// 自旋嘗試獲取鎖
acquireOnOOME(false, savedState);
return null;
}
// ConditionObject implements Condition
private int enableWait(ConditionNode node) {
// 當前線程持有鎖才可以加入條件變量隊列
if (isHeldExclusively()) {
// 設置節點信息
node.waiter = Thread.currentThread();
node.setStatusRelaxed(COND | WAITING);
// 將節點加入條件變量隊列
ConditionNode last = lastWaiter;
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
// 當前線程的重入次數
int savedState = getState();
// 釋放鎖返回重入次數
if (release(savedState))
return savedState;
}
// 未持有鎖或者釋放鎖失敗則拋出異常
node.status = CANCELLED;
throw new IllegalMonitorStateException();
}
// ConditionObject implements Condition
private boolean canReacquire(ConditionNode node) {
Node p;
// node和p的鏈接關係沒有被破壞或者node在等待隊列中
return node != null &&
(p = node.prev) != null &&
(p.next == node || isEnqueued(node));
}
移出條件變量隊列的流程
// ConditionObject implements Condition
public final void signal() {
ConditionNode first = firstWaiter;
// 持有鎖的線程才能喚醒條件變量隊列中的線程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
else if (first != null)
doSignal(first, false);
}
// ConditionObject implements Condition
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
// 將條件變量隊列中的第一個等待線程移除
if ((firstWaiter = next) == null)
lastWaiter = null;
// 將剛從條件變量隊列中移除的等待線程加入等待隊列
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
// 加入等待隊列
enqueue(first);
// 不是喚醒所有線程
if (!all)
break;
}
// 由於線程被取消等原因可能導致線程無法加入等待隊列
// 此時嘗試等待隊列的下一個線程
first = next;
}
}
// ConditionObject implements Condition
final void enqueue(ConditionNode node) {
if (node != null) {
// 是否立即喚醒線程node
boolean unpark = false;
for (Node t;;) {
if ((t = tail) == null && (t = tryInitializeHead()) == null) {
// 等待隊列為null且初始化頭節點失敗時,應立即喚醒線程node
unpark = true;
break;
}
// 設置前驅
node.setPrevRelaxed(t);
// 通過CAS操作將等待隊列隊尾置為node
if (casTail(t, node)) {
t.next = node;
// 等待隊列隊尾被取消,則喚醒線程node,以清理隊列
if (t.status < 0)
unpark = true;
break;
}
}
// 立即喚醒線程node
if (unpark)
LockSupport.unpark(node.waiter);
}
}
END
如果覺得本文對您有一點點幫助,歡迎點贊、轉發加關注,這會對我有非常大的幫助,如果有任何問題,歡迎在評論區留言或者後台私信,咱們下期見!
文章文檔:公眾號 字節幺零二四 回覆關鍵字可獲取本文文檔。