动态

详情 返回 返回

源碼解讀 | Java中ReentrantLock的實現原理 - 动态 详情

ReentrantLock 依賴關係如下圖所示

非公平鎖實現原理

ReentrantLock 默認採用非公平鎖。

// ReentrantLock
public ReentrantLock() {
    sync = new NonfairSync();
}

加鎖流程

ReentrantLock 的 lock 方法通過同步器的 lock 方法實現。

// ReentrantLock
public void lock() {
    sync.lock();
}

同步器的 lock 方法會先調用 initialTryLock() 方法,如果失敗,則調用 acquire() 方法。

// Sync extends AbstractQueuedSynchronizer
final void lock() {
    if (!initialTryLock())
        acquire(1);
}
// AbstractQueuedSynchronizer
public final void acquire(int arg) {
    // 嘗試獲取鎖
    if (!tryAcquire(arg))
        // 獲取鎖失敗則加入的等待隊列
        // null, 自定義參數, 非共享鎖, 不可打斷, 無時限, 時限 
        acquire(null, arg, false, false, false, 0L);
}

NonfairSync 的 initialTryLock 分兩步:嘗試獲取鎖、嘗試重入。

NonfairSync 的 tryAcquire 方法被調用前,必定會調用 initialTryLock() 方法檢查鎖是否被當前線程持有,也即,調用 tryAcquire 方法時,鎖必定未被當前線程持有。因此,當未有線程持有鎖時,tryAcquire 才能嘗試獲取鎖。

// NonfairSync extends Sync
static final class NonfairSync extends Sync {
    
    // 初次嘗試獲取鎖
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        // 通過CAS嘗試獲取鎖
        if (compareAndSetState(0, 1)) {
            // 將鎖的持有者設為當前線程
            setExclusiveOwnerThread(current);
            return true;
        } 
        // 嘗試獲取鎖失敗,判斷鎖的持有者是否為當前線程
        else if (getExclusiveOwnerThread() == current) {
            // 鎖重入
            int c = getState() + 1;
            // 整數溢出
            if (c < 0) 
                throw new Error("Maximum lock count exceeded");
            // 設置鎖的狀態:state 表示重入次數
            setState(c);
            return true;
        } else
            return false;
    }

    // 非初次嘗試獲取鎖
    protected final boolean tryAcquire(int acquires) {
        // 未有線程持有鎖時,當前線程嘗試獲取鎖
        if (getState() == 0 && compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

AbstractQueuedSynchronizer 的 acquire 方法負責將線程加入等待隊列,acquire 方法的主要執行步驟如下:

  1. 若非第一個等待線程且前驅存在,則檢查當前節點前驅是否已被取消(前驅線程被取消)

    • 若已被取消,則需要從隊尾開始,往前清理已取消的節點,進入下一輪循環。
    • 若未被取消,檢查當前線程是否已成為第一個線程,若是,則自旋等待,進入下一輪循環
  2. 若是第一個等待線程或者前驅不存在,則嘗試獲取鎖,獲取鎖成功則直接返回 1 表示獲取鎖成功,
  3. 準備將當前線程加入等待隊列

    • 3.1 若等待隊列未創建,則創建等待隊列,進入下一輪循環
    • 3.2 否則,若節點未創建,則創建節點,進入下一輪循環
    • 3.3 否則,若節點信息未設置,則設置節點信息(包括將節點加入隊尾),進入下一輪循環
    • 3.4 否則,若為第一個等待線程且自旋次數不為 0,提示 JVM 當前線程正在忙等,進入下一輪循環
    • 3.5 否則,若節點狀態為 0(默認),則將節點狀態設為 1(WAITING)
    • 3.6 否則,重置自旋自旋次數,當前線程陷入等待,被喚醒後將等待狀態置為 0。
// AbstractQueuedSynchronizer
final int acquire(
    // 嘗試獲取鎖的節點
    Node node, 
    // 自定義參數
    int arg, 
    // 是否為共享鎖
    boolean shared,
    // 是否可打斷
    boolean interruptible, 
    // 是否帶時限
    boolean timed,
    // 時限
    long time
) {
    Thread current = Thread.currentThread();
    byte spins = 0, postSpins = 0;  
    boolean interrupted = false, first = false;
    Node pred = null;               
    
    for (;;) {
        // 檢查前驅是否被取消
        if (
            // 非第一個等待線程
            !first && 
            // 前驅存在
            // 除非本方法被ConditionNode.await()方法調用,否則node初始必為null
            // 也即,此條件在第一輪循環必為false
            (pred = (node == null) ? null : node.prev) != null &&
            // 非第一個等待線程
            !(first = (head == pred))
        ) {
            if (pred.status < 0) {
                // 若前驅被取消,則需要清理隊列中取消的線程
                // 此舉是為確保給當前節點的前驅能喚醒當前節點
                cleanQueue(); 
                continue;
            } else if (pred.prev == null) {
                // 若前驅的前驅為null,説明當前線程是第一個線程
                // !first 和 pred.prev == null 之間,head的值被修改,導致當前線程成為第一個線程
                // 第一個線程自旋等待,以減少線程切換的開銷
                Thread.onSpinWait();
                continue;
            }
        }
        // 嘗試獲取鎖
        if (
            // 第一個線程 或者 前驅不存在
            first || pred == null
        ) {
            boolean acquired;
            // 嘗試獲取鎖
            try {
                if (shared)
                    acquired = (tryAcquireShared(arg) >= 0);
                else
                    acquired = tryAcquire(arg);
            } catch (Throwable ex) {
                cancelAcquire(node, interrupted, false);
                throw ex;
            }
            // 獲取鎖成功
            if (acquired) {
                // 當前線程是第一個線程
                if (first) {
                    node.prev = null;
                    head = node;
                    pred.next = null;
                    node.waiter = null;
                    if (shared)
                        // 共享節點獲取到鎖之後需要喚醒下一個節點
                        signalNextIfShared(node);
                    if (interrupted)
                        // 獨佔節點獲取到鎖之後需要重新設置打斷標記
                        current.interrupt();
                }
                return 1;
            }
        }
        // 嘗試獲取鎖失敗後,準備將線程加入等待隊列
        Node t;
        if (
            // 等待隊列未創建
            (t = tail) == null
        ) { 
            // 創建等待隊列
            if (tryInitializeHead() == null)
                // 創建隊列失敗則調用tryAcquire嘗試獲取鎖,失敗則park(long)等待,重複
                return acquireOnOOME(shared, arg);
        } else if (
            // 節點未創建:創建節點
            node == null
        ) {
            try {
                // 創建節點
                node = (shared) ? new SharedNode() : new ExclusiveNode();
            } catch (OutOfMemoryError oome) {
                // 創建節點失敗則調用tryAcquire嘗試獲取鎖,失敗則park(long)等待,重複
                return acquireOnOOME(shared, arg);
            }
        } else if (
            // 前驅未設置:設置節點信息
            pred == null
        ) {
            // 當前節點線程
            node.waiter = current;
            // 當前節點前驅(節點將由前驅喚醒)
            node.setPrevRelaxed(t);
            // 通過CAS將尾節點置為當前節點
            if (!casTail(t, node))
                // 失敗則進入下一次循環
                node.setPrevRelaxed(null);
            else
                // 將當前節點加入隊尾
                // 此時t仍指向原來的尾節點,當前節點已為實際尾節點
                t.next = node;
        } else if (
            // 第一個節點 自旋次數不為0
            first && spins != 0
        ) {
            // 自旋等待
            --spins;
            Thread.onSpinWait();
        } else if (
            // 當前節點狀態為0(新建節點狀態默認為0)
            node.status == 0
        ) {
            // 當前節點狀態置為等待(WAITING=1)
            node.status = WAITING;  
        } else {
            // 到此處,節點已被創建,節點信息已被設置,節點狀態已被設置
            long nanos;
            // 設置自旋次數
            spins = postSpins = (byte)((postSpins << 1) | 1);
            // 當前線程等待
            if (!timed)
                // 無時限等待
                LockSupport.park(this);
            else if ((nanos = time - System.nanoTime()) > 0L)
                // 有時限等待
                LockSupport.parkNanos(this, nanos);
            else
                // 等待超時
                break;
            // 線程被喚醒後,將狀態設置為0
            node.clearStatus();
            // 被打斷且可被打斷
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    // (等待超時)或者(被打斷且可被打斷)則取消獲取鎖
    return cancelAcquire(node, interrupted, interruptible);
}

線程被喚醒後,再次嘗試獲取鎖,若獲取鎖失敗,不立即陷入等待,而是自旋,再次嘗試獲取鎖,直到自旋次數耗盡。線程每次陷入等待前,會重置自旋次數,自旋次數為 2^k,其中,k 表示當前線程是第 k 陷入等待。注意,陷入等待是指執行 park() 方法,而不是指加入等待隊列,加入等待隊列的操作只會執行一次。

非公平體現在哪?先進先出不是公平的嗎?閲讀非公平同步器的 acquire() 源碼可知,先將嘗試獲取鎖,再將線程加入等待隊列,也即,新來的線程可直接與等待隊列中的線程競爭,準確地説,新來的線程可直接與等待隊列中的第一個等待線程競爭,這導致新來的線程可能比等待隊列中的線程先執行,這是不公平的。

檢查線程是否被打斷使用的是 Thread.interrupted() 方法,Thread.interrupted() 會清除打斷標記,線程再次調用 LockSupport.park() 時仍能生效,Thread.isInterrupted() 不會清除打斷標記,線程再次調用 LockSupport.park() 時無效。

如果線程(等待超時)或者(被打斷且可被打斷),則會取消嘗試獲取鎖 cancelAcquire(),如果因為等待超時而導致取消嘗試獲取鎖,則會保留打斷標記。非公平同步器 NonfairSync 沒有用到 cancelAcquire() 的返回值。

private int cancelAcquire(
    Node node, 
    boolean interrupted,
    boolean interruptible
) {
    // 修改節點信息
    if (node != null) {
        node.waiter = null;
        node.status = CANCELLED;
        // 清理隊列
        if (node.prev != null)
            cleanQueue();
    }
    if (interrupted) {
        if (interruptible)
            // CANCELLED=0x80000000,負數
            return CANCELLED;
        else
            // 重新打斷自己,將打斷標記置為 true
            Thread.currentThread().interrupt();
    }
    return 0;
}

解鎖流程

ReentrantLock 的 unlock 方法通過同步器的 release方法實現。

// ReentrantLock
public void unlock() {
    sync.release(1);
}

同步器的 release 方法會先調用 tryRelease 嘗試釋放鎖,若釋放成功則會喚醒下一個線程。

// AbstractQueuedSynchronizer
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        signalNext(head);
        return true;
    }
    return false;
}

同步器的 tryRelease 方法先修改鎖的持有者(置為 null),再修改鎖的狀態。

// Sync extends AbstractQueuedSynchronizer
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 僅鎖的持有者才可以釋放鎖
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    // free=true 表示現在重入次數為0
    boolean free = (c == 0);
    // 重入次數為0則將線程持有者置為null
    if (free)
        setExclusiveOwnerThread(null);
    // 設置線程狀態
    setState(c);
    return free;
}

同步器中的 signalNext 方法將等待隊列中的第一個線程喚醒。

// AbstractQueuedSynchronizer
private static void signalNext(Node h) {
    Node s;
    if (h != null && (s = h.next) != null && s.status != 0) {
        // 將state設置為 ~WAITING
        s.getAndUnsetStatus(WAITING);
        // 喚醒下一個線程(等待隊列中的第一個等待線程)
        LockSupport.unpark(s.waiter);
    }
}

可重入

閲讀 NonfairSync 的源碼可以發現,線程第一次嘗試獲取鎖時,嘗試將 state 由 0 置為 1 失敗後,會檢查鎖的持有者是否是自己,若是,則累計重入次數,並返回 true,表示獲取鎖成功。

當鎖的持有者為當前線程,當前線程再次嘗試獲取鎖仍能成功,此即可重入。

// NonfairSync extends Sync
static final class NonfairSync extends Sync {
    // 初次嘗試獲取鎖
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        // 通過CAS嘗試獲取鎖
        if (compareAndSetState(0, 1)) {
            // 將鎖的持有者設為當前線程
            setExclusiveOwnerThread(current);
            return true;
        } 
        // 嘗試獲取鎖失敗,判斷鎖的持有者是否為當前線程
        else if (getExclusiveOwnerThread() == current) {
            // 鎖重入
            int c = getState() + 1;
            // 整數溢出
            if (c < 0) 
                throw new Error("Maximum lock count exceeded");
            // 設置鎖的狀態:state 表示重入次數
            setState(c);
            return true;
        } else
            return false;
    }
    ...
}

閲讀 Sync 的源碼可以發現,釋放鎖時,會將重入次數減去 releases(實際為 1),僅當重入次數為 0 時,線程才將鎖的持有者置為 null 並返回 true,表示鎖已成功釋放。

// Sync extends AbstractQueuedSynchronizer
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 僅鎖的持有者才可以釋放鎖
    if (getExclusiveOwnerThread() != Thread.currentThread())
        throw new IllegalMonitorStateException();
    // free=true 表示現在重入次數為0
    boolean free = (c == 0);
    // 重入次數為0則將線程持有者置為null
    if (free)
        setExclusiveOwnerThread(null);
    // 設置線程狀態
    setState(c);
    return free;
}

可打斷

理解一下參數 interruptible

  • interruptible = true,獲取鎖的過程中允許被打斷,立即響應打斷,不再嘗試獲取鎖。
  • interruptible = false,獲取鎖的過程中不允許被打斷,不立即響應打斷,繼續嘗試獲取鎖。
// AbstractQueuedSynchronizer
final int acquire(...) {
    for (;;) {
        ...
        {
            ...
            // 被打斷且可被打斷
            if ((interrupted |= Thread.interrupted()) && interruptible)
                break;
        }
    }
    // (等待超時)或者(被打斷且可被打斷)則取消獲取鎖
    return cancelAcquire(node, interrupted, interruptible);
}

公平鎖的實現原理

通過 ReentrantLock 的有參構造可創建公平鎖。

// ReentrantLock
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

FairSync 和 NonfairSync 的區別即為公平鎖和非公平鎖的區別,主要關注 initialTryLock 方法。

公平鎖在初次嘗試獲取鎖時,僅當鎖未被佔有且等待隊列無等待線程時,當前線程才會立即嘗試獲取鎖,否則將加入等待隊列隊尾。

// ReentrantLock
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    // 初次嘗試獲取鎖
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        int c = getState();
        // 鎖未被佔有
        if (c == 0) {
            // 等待隊列無未取消的等待線程 且 獲取鎖成功
            if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
                // 將鎖的持有者設為當前線程
                setExclusiveOwnerThread(current);
                return true;
            }
        } else if (getExclusiveOwnerThread() == current) {
            // 重入
            if (++c < 0) 
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        }
        return false;
    }

    // 非初次嘗試獲取鎖
    protected final boolean tryAcquire(int acquires) {
        if (
            // 鎖未被佔有
            getState() == 0 && 
            // 等待隊列無未取消的等待線程
            !hasQueuedPredecessors() &&
            // 獲取鎖成功
            compareAndSetState(0, acquires)
        ) {
            // 將鎖的持有者設為當前線程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }
}

hasQueuedThread() 用於判斷等待隊列中是否未取消的等待線程。

// AbstractQueuedSynchronizer
public final boolean hasQueuedThreads() {
    for (Node p = tail, h = head; p != h && p != null; p = p.prev)
        // 存在未取消的線程
        if (p.status >= 0)
            return true;
    return false;
}

條件變量的實現原理

ReentrantLock 通過 newCondition() 方法創建條件變量,而 ReentrantLock 的 newCondition() 方法基於同步器的 newCondition() 方法。

// ReentrantLock
public Condition newCondition() {
    return sync.newCondition();
}

同步器的 newCondition() 方法會創建一個 ConditionObject 對象,即條件變量對象。

// Sync extends AbstractQueuedSynchronizer
final ConditionObject newCondition() {
    return new ConditionObject();
}

每個條件變量維護一個條件變量隊列。

// ReentrantLock
public class ConditionObject implements Condition{
    // 頭節點
    private transient ConditionNode firstWaiter;
    // 尾節點
    private transient ConditionNode lastWaiter;
}

線程的等待和喚醒可粗略理解為

  • 線程等待:先將線程從等待隊列中移除,再將線程加入條件變量隊列
  • 線程喚醒:先將線程從條件變量隊列中移除,再將線程加入等待隊列
  • 條件變量隊列中的線程不能參與鎖的競爭,而等待隊列中的線程可以參與鎖的競爭

加入條件變量隊列的流程

// ReentrantLock
public class ConditionObject implements Condition{
    // 將線程加入條件變量隊列
    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 創建節點,創建失敗則自旋嘗試獲取鎖
        ConditionNode node = newConditionNode();
        // 創建節點失敗
        if (node == null)
            return;
        // 將當前線程加入條件變量隊列
        // 當前線程釋放持有的鎖(會將當前線程從等待隊列溢出)並喚醒等待隊列下一個線程
        // 返回重入次數
        int savedState = enableWait(node);
        // 當前線程等待原因置為this,常用於調試
        LockSupport.setCurrentBlocker(this); // for back-compatibility
        boolean interrupted = false, cancelled = false, rejected = false;
        // !(可以嘗試獲取鎖)
        while (!canReacquire(node)) {
            if (interrupted |= Thread.interrupted()) {
                if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                    break;              // else interrupted after signal
            } else if ((node.status & COND) != 0) {
                // 陷入等待
                try {
                    if (rejected)
                        node.block();
                    else
                        ForkJoinPool.managedBlock(node);
                } catch (RejectedExecutionException ex) {
                    // ForkJoinPool線程池無法接收任務
                    rejected = true;
                } catch (InterruptedException ie) {
                    interrupted = true;
                }
            } else
                Thread.onSpinWait();
        }
        // 當前線程等待原因置為null
        LockSupport.setCurrentBlocker(null);
        // 重新參與競爭
        node.clearStatus();
        acquire(node, savedState, false, false, false, 0L);
        if (interrupted) {
            if (cancelled) {
                unlinkCancelledWaiters(node);
                throw new InterruptedException();
            }
            Thread.currentThread().interrupt();
        }
    }
}
// ConditionObject implements Condition
private ConditionNode newConditionNode() {
    int savedState;
    // 初始化等待隊列頭節點
    if (tryInitializeHead() != null) {
        try {
            return new ConditionNode();
        } catch (OutOfMemoryError oome) {
        }
    }
    // 發生OutOfMemoryError時才會往下執行
    // 當前線程未持有鎖或者釋放鎖失敗
    if (!isHeldExclusively() || !release(savedState = getState()))
        throw new IllegalMonitorStateException();
    // 當前線程暫停一段時間
    U.park(false, OOME_COND_WAIT_DELAY);
    // 自旋嘗試獲取鎖
    acquireOnOOME(false, savedState);
    return null;
}
// ConditionObject implements Condition
private int enableWait(ConditionNode node) {
    // 當前線程持有鎖才可以加入條件變量隊列
    if (isHeldExclusively()) {
        // 設置節點信息
        node.waiter = Thread.currentThread();
        node.setStatusRelaxed(COND | WAITING);
        // 將節點加入條件變量隊列
        ConditionNode last = lastWaiter;
        if (last == null)
            firstWaiter = node;
        else
            last.nextWaiter = node;
        lastWaiter = node;
        // 當前線程的重入次數
        int savedState = getState();
        // 釋放鎖返回重入次數
        if (release(savedState))
            return savedState;
    }
    // 未持有鎖或者釋放鎖失敗則拋出異常
    node.status = CANCELLED;
    throw new IllegalMonitorStateException();
}
// ConditionObject implements Condition
private boolean canReacquire(ConditionNode node) {
    Node p;
    // node和p的鏈接關係沒有被破壞或者node在等待隊列中
    return node != null && 
        (p = node.prev) != null &&
        (p.next == node || isEnqueued(node));
}

移出條件變量隊列的流程

// ConditionObject implements Condition
public final void signal() {
    ConditionNode first = firstWaiter;
    // 持有鎖的線程才能喚醒條件變量隊列中的線程
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    else if (first != null)
        doSignal(first, false);
}
// ConditionObject implements Condition
private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        ConditionNode next = first.nextWaiter;
        // 將條件變量隊列中的第一個等待線程移除
        if ((firstWaiter = next) == null)
            lastWaiter = null;
         // 將剛從條件變量隊列中移除的等待線程加入等待隊列
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            // 加入等待隊列
            enqueue(first);
            // 不是喚醒所有線程
            if (!all)
                break;
        }
        // 由於線程被取消等原因可能導致線程無法加入等待隊列
        // 此時嘗試等待隊列的下一個線程
        first = next;
    }
}
// ConditionObject implements Condition
final void enqueue(ConditionNode node) {
    if (node != null) {
        // 是否立即喚醒線程node
        boolean unpark = false;
        for (Node t;;) {
            if ((t = tail) == null && (t = tryInitializeHead()) == null) {
                // 等待隊列為null且初始化頭節點失敗時,應立即喚醒線程node
                unpark = true;
                break;
            }
            // 設置前驅
            node.setPrevRelaxed(t);
            // 通過CAS操作將等待隊列隊尾置為node
            if (casTail(t, node)) {
                t.next = node;
                // 等待隊列隊尾被取消,則喚醒線程node,以清理隊列
                if (t.status < 0)
                    unpark = true;
                break;
            }
        }
        // 立即喚醒線程node
        if (unpark)
            LockSupport.unpark(node.waiter);
    }
}

END

如果覺得本文對您有一點點幫助,歡迎點贊、轉發加關注,這會對我有非常大的幫助,如果有任何問題,歡迎在評論區留言或者後台私信,咱們下期見!

文章文檔:公眾號 字節幺零二四 回覆關鍵字可獲取本文文檔。

user avatar u_16297326 头像 tech 头像 u_15702012 头像 haoqingwanqiandesigua 头像 dl1024 头像 liubo86 头像 lvweifu 头像 yangrd 头像 junyidedalianmao 头像 javaedge 头像 swifter 头像 dreamlu 头像
点赞 26 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.