动态

详情 返回 返回

AQS 核心方法和源碼 - 动态 详情

AQS (AbstractQueuedSynchronizer) 中,這些方法涉及到同步的獲取和排隊機制,它們實現了類似於鎖(Lock)和信號量(Semaphore)的功能。AQS 通過內部維護一個 FIFO 隊列和一些節點來管理線程的同步。下面逐個解釋這些方法的作用:


AQS 核心方法和源碼

1. acquire(int arg)

  • 作用:嘗試獲取同步狀態,如果失敗,則加入隊列並阻塞線程。
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
        selfInterrupt();
    }
}
  • 流程

    1. 調用 tryAcquire(arg) 嘗試獲取鎖。
    2. 如果獲取失敗,則調用 addWaiter() 將線程加入隊列。
    3. 調用 acquireQueued() 進行排隊等待。
    4. 如果等待過程中被中斷,調用 selfInterrupt() 重新設置中斷狀態。

2. tryAcquire(int arg)

  • 作用:嘗試獲取同步狀態。這個方法是抽象方法,子類需要實現。
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
  • 注意:具體的同步工具(如 ReentrantLock)會重寫此方法來定義獲取同步狀態的邏輯。

3. addWaiter(Node.EXCLUSIVE)

  • 作用:將當前線程封裝成一個 Node,並加入等待隊列末尾。
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
  • 流程

    1. 封裝當前線程為一個 Node 節點。
    2. 嘗試將節點加入等待隊列尾部,如果失敗則調用 enq() 進行入隊。

4. enq(Node node)

  • 作用:將節點安全地加入等待隊列末尾(無限重試,直到成功)。
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 隊列未初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  • 流程

    1. 檢查隊列是否存在頭節點,如果不存在,初始化。
    2. 使用 CAS 操作將新節點加入隊列尾部,保證線程安全。

5. acquireQueued(Node node, int arg)

  • 作用:讓線程在隊列中等待,直到獲取到同步狀態。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // 幫助 GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) {
                interrupted = true;
            }
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}
  • 流程

    1. 獲取前驅節點。
    2. 如果前驅節點是頭節點,則嘗試獲取同步狀態。
    3. 如果獲取失敗,調用 shouldParkAfterFailedAcquire() 判斷是否需要掛起線程。
    4. 掛起線程並檢查中斷狀態。

6. release(int arg)

  • 作用:釋放同步狀態,喚醒後繼線程。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0) unparkSuccessor(h);
        return true;
    }
    return false;
}
  • 流程

    1. 調用 tryRelease(arg) 釋放同步狀態。
    2. 如果釋放成功,則喚醒等待隊列中的後繼線程。

7. tryRelease(int arg)

  • 作用:嘗試釋放同步狀態。這個方法是抽象方法,需要子類實現。
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
  • 注意:子類重寫此方法,定義釋放邏輯。

8. unparkSuccessor(Node node)

  • 作用:喚醒等待隊列中的下一個線程。
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
  • 流程

    1. 將當前節點的 waitStatus 重置。
    2. 找到下一個需要喚醒的線程節點。
    3. 使用 LockSupport.unpark() 喚醒線程。

9. shouldParkAfterFailedAcquire()

  • 作用:檢查節點的狀態,決定線程是否需要掛起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

總結核心方法

方法名 作用
acquire(int arg) 獲取同步狀態,失敗則排隊等待。
tryAcquire(int arg) 嘗試獲取同步狀態,子類實現。
addWaiter(Node.EXCLUSIVE) 添加節點到等待隊列末尾。
enq(Node node) 使用 CAS 操作將節點加入隊列末尾。
acquireQueued() 線程在隊列中等待,直到獲取同步狀態。
release(int arg) 釋放同步狀態並喚醒等待線程。
tryRelease(int arg) 嘗試釋放同步狀態,子類實現。
unparkSuccessor(Node node) 喚醒等待隊列中的下一個線程。
shouldParkAfterFailedAcquire() 判斷線程是否需要掛起。

其他核心概念:

  1. Node 類:代表等待隊列中的線程節點。
  2. 狀態管理

    • waitStatus:節點狀態,包括 SIGNAL(等待喚醒)和 CANCELLED 等。
    • headtail:管理等待隊列的頭尾節點。
  3. CAS 操作:確保節點入隊等操作的線程安全。

這些方法共同配合,實現了 AQS 的底層同步控制機制,包括線程的排隊、阻塞、喚醒等功能。

user avatar abai_681266b7f0de8 头像
点赞 1 用户, 点赞了这篇动态!
点赞

Add a new 评论

Some HTML is okay.