基於AQS實現的ReentrantLock
這裏的源碼用的Java8版本
lock方法
當ReentrantLock類的實例對象嘗試獲取鎖的時候,調用lock方法,
會進入sync的lock方法,其中Sync是ReentrantLock的一個內部類,ReentrantLock構造方法會默認使用非公平鎖NonfairSync,這個類是繼承於Sync的
final void lock() {
if (!initialTryLock())
acquire(1);
}
// 其中Sync的initialTryLock是抽象方法,需要看非公平鎖實現方法
[!TIP]
在這裏是第一次嘗試獲取鎖
由於ReentrantLock是個可重入鎖,判斷裏有重入的判斷
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 獲取當前線程的對象
if (compareAndSetState(0, 1)) { // first attempt is unguarded
// 用CAS比較state狀態是否為0(無人持有鎖),如果是,就轉為1(獲取到鎖)
setExclusiveOwnerThread(current);
// 將當前進程設置為擁有鎖的線程
return true;
} else if (getExclusiveOwnerThread() == current) {
// 當前線程為擁有鎖的線程(重複獲取),重入
int c = getState() + 1;
if (c < 0) // overflow
// 負數,state是個int類型數據,超出可能導致溢出變為負數
throw new Error("Maximum lock count exceeded");
setState(c);
// 設置新的state
return true;
} else
// 已有線程佔鎖,返回為false
return false;
}
然後開始調用acquire方法,傳入1
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
調用tryAcquire()方法,其中tryAcquire()方法是一個只有拋出異常的方法,需要重寫,我們看非公平鎖的寫法
[!TIP]
這是第二次獲取鎖
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
這裏,如果state是0,即沒有線程佔用鎖的情況下getState() == 0這個為真!hasQueuedPredecessors()執行這個方法,這個方法會檢查是否已經出現了等待隊列
public final boolean hasQueuedPredecessors() {
Thread first = null; Node h, s;
if ((h = head) != null && ((s = h.next) == null ||
(first = s.waiter) == null ||
s.prev == null))
first = getFirstQueuedThread(); // retry via getFirstQueuedThread
return first != null && first != Thread.currentThread();
}
當未出現 同步隊列/阻塞隊列 ,或者當前線程是隊列的第一個時,執行compareAndSetState(0, acquires),第二次嘗試獲取鎖,如果成功,返回真
否則返回假,執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 嘗試加入隊尾
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node是雙向隊列:阻塞隊列一個節點,是為了保證原子化所以包裝起來的
如果tail尾指針指向的節點不為空,則設置新生成的為尾指針指向的
否則(阻塞隊列為空),調用enq函數
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
// 使用CAS,防止多線程同時創建頭節點,所以本質上還是需要搶入隊順序
tail = head;
// 初始化頭節點,並將尾指針指向頭節點
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
// 判斷t是否為尾節點,如果有線程更快的改掉尾節點,那麼修改失敗,
// 重新進入for循環
t.next = node;
return t;
// 修改成功
}
}
}
}
[!TIP]
這是第三次嘗試獲取鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 獲取node的前一個節點,如果前一個節點是頭節點(當前節點是第一個)
// 執行tryAcquire(arg),執行第三次嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功,出隊
setHead(node);// 將node設為頭節點
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果第三次嘗試獲取鎖失敗了,會調用shouldParkAfterFailedAcquire()方法,將node的前一個節點傳入(node一直都是加入的節點)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 確認前面的節點處於SIGNAL狀態,即確認前面的節點會叫醒自己
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// Node裏面僅有一個大於零的狀態,即1取消狀態,也就是説當前任務被取消了
// 持續循環值找到不再取消的節點
pred.next = node;
} else {
// 將前一個節點用CAS轉為Node.SIGNAL狀態-1,返回為false
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這裏插一嘴,Node節點有一些狀態,來體現其的任務狀態,如前面傳入的就是獨佔隊列,
addWaiter(Node.EXCLUSIVE)static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); // 共享隊列 /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; // 獨佔隊列 /** waitStatus value to indicate thread has cancelled */// 取消 static final int CANCELLED = 1; // 已被取消 /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; // 表示next節點已經park,需要被喚醒 /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 共享狀態 static final int PROPAGATE = -3;
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
如果前一個節點的waitState是0,會被CAS轉為-1,然後返回false,進而不會執行parkAndCheckInterrupt(),繼續for的無限循環,這裏有可能出現第四次嘗試
如果前一個節點的waitState是-1,該函數返回一個true,也就可以繼續執行parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
當前線程進入park狀態
至此我們完成了這個的lock過程
unlock方法
unlock()也是公平鎖以及非公平鎖都有的方法,同樣繼承了Sync
public void unlock() {
sync.release(1);
}
Sync的release方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先嚐試tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
如果成功醒過來,該線程依然處於一種park的位置上,即parkAndCheckInterrupt這個方法上,這個方法返回是否被中斷ReentrantLock這個鎖僅獲取中斷信息,而不會做出任何操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
甦醒過來之後,繼續for循環,嘗試獲取鎖,失敗之後會接着park,成功就會獲取鎖,並返回中斷狀態,在acquire中決定自我中斷
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
並將setExclusiveOwnerThread傳入當前線程,返回為真,因此在TryRelease方法裏的Thread.currentThread() != getExclusiveOwnerThread()一定為假,不會拋出異常,並設置free為false,當c也就是資源的state如果是0
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
c如果是0,即沒有線程佔用資源,setExclusiveOwnerThread將鎖的線程設置為空,如果不為0,也就是重入鎖僅僅解鎖一次,c依然存在多個,設置c為新的state值,然會free值(資源鎖的使用情況)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;、
// 如果下一個節點的狀態為取消或者為空,從後向前找最後一個滿足條件的,賦值為s
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// s不為空的話作為下一個被喚醒的節點,嘗試喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
此時,當前節點為頭節點,調用unparkSuccessor()方法,獲取頭節點的下一個節點