博客 / 詳情

返回

併發編程 | Java中AQS的基本實現原理及簡單使用

AbstractQueuedSynchronizer(AQS),是阻塞式鎖和同步器工具的框架。本文將初步介紹Java中AQS的基本原理,並基於AQS實現自定義阻塞式不可重入鎖,以此來演示AQS的使用。下期會以 ReentrantLock 為例,從源碼的層面介紹 AQS 的核心實現 acquire() 方法。

AQS 的目標

  • 提供阻塞式獲取鎖 acquire() 和非阻塞式嘗試獲取鎖 tryAcquire();
  • 提供獲取鎖超時機制;
  • 通過打斷取消機制;
  • 獨佔機制與共享機制;
  • 條件不滿足時的等待機制;

AQS 的設計

  1. 使用 state 表示資源狀態

    • 使用 volatile 配合 cas 保證其修改時的原子性;
    • 使用 32bit int 來維護同步狀態。
  2. 使用先進先出隊列

    • 借鑑 CLH 隊列:無鎖,使用自旋;快速,無阻塞;
    • 頭節點 head 不存儲數據,尾節點 指向隊列最後一個等待線程。
  3. 使用 park & unpark 來調度線程

    • 可以先 unpark() 再 park();
    • unpark & park 針對線程,而不是同步器,控制粒度更為精細;
    • park() 可通過 interrupt() 打斷。
  4. 使用條件變量實現等待、喚醒機制

    • 每個條件變量維護一個條件變量隊列;
    • 線程等待:先將線程從等待隊列中移除,再將線程加入條件變量隊列;
    • 線程喚醒:先將線程從條件變量隊列中移除,再將線程加入等待隊列。

等待隊列和條件變量隊列的維護已由 AQS 實現,AQS 的子類只需定義如何維護 state、如何獲取和釋放鎖,主要需要實現的方法如下

  • tryAcquire():嘗試獲取獨佔鎖;
  • tryRelease():嘗試釋放獨佔鎖;
  • tryAcquireShared():嘗試獲取共享鎖;
  • tryReleaseShared():嘗試釋放共享鎖;
  • isHeldExclusively():判斷當前線程是否持有鎖。

AQS 分為獨佔模式和共享模式,獨佔模式同時只允許一個線程訪問資源,共享模式同時允許多個線程訪問資源。

使用 AQS 的主要併發工具類如下圖所示,主要包括:ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore。

自定義阻塞式不可重入鎖

自定義同步器

自定義獨佔式同步器需要重寫 AQS 的如下方法:

  • tryAcquire():嘗試獲取鎖,約定 state 為 0 表示未加鎖,為 1 表示已加鎖。
  • tryRelease():嘗試釋放鎖
  • isHeldExclusively():判斷當前線程是否持有鎖
  • newCondition():創建條件變量
static class ISync extends AbstractQueuedSynchronizer {

    /**
     * 嘗試獲取鎖
     * @param arg 忽略
     */
    @Override
    protected boolean tryAcquire(int arg) {
        if (
            // 將state的值置為1
            compareAndSetState(0, 1)
        ) {
            // 設置owner為當前線程
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    /**
     * 嘗試釋放鎖
     * @param arg 忽略
     */
    @Override
    protected boolean tryRelease(int arg) {
        // 持有鎖的線程才可以釋放鎖
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        // 注意setExclusiveOwnerThread和setState的執行順序
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    /**
     * 是否持有鎖
     */
    @Override
    protected boolean isHeldExclusively() {
        // 判斷鎖的持有者是否為當前線程
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    /**
     * 創建條件變量
     */
    public Condition newCondition() {
        return new ConditionObject();
    }
}

在 tryRelease() 方法中,注意 setExclusiveOwnerThread() 和 setState() 的執行順序,state 被 volatile 修飾,而 exclusiveOwnerThread 沒有,因此,後設置 state 可保證 exclusiveOwnerThread 的設置對其他線程可見。

private transient Thread exclusiveOwnerThread;
private volatile int state;

自定鎖

使用自定義同步器實現自定義阻塞式不可重入鎖。

class ILock implements Lock {
    
    // 自定義同步器
    private ISync sync = new ISync();

    /**
     * 加鎖
     * 加鎖失敗則進入隊列等待
     */
    @Override
    public void lock() {
        sync.acquire(1);
    }

    /**
     * 加鎖
     * 加鎖失敗則進入隊列等待,在加鎖的過程中可被打斷
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    /**
     * 嘗試加鎖
     * 加鎖失敗則返回false
     */
    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    /**
     * 嘗試加鎖
     * 指定時間內加鎖失敗則返回false
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    /**
     * 解鎖
     */
    @Override
    public void unlock() {
        sync.release(1);
    }

    /**
     * 創建條件變量
     */
    @Override
    @NonNull
    public Condition newCondition() {
        return sync.newCondition();
    }
}

自定義鎖中使用到的 AQS 中的方法

public abstract class AbstractQueuedSynchronizer {
    // 嘗試獲取鎖
    public final void acquire(int arg) {
        // 獲取鎖失敗
        if (!tryAcquire(arg))
            // 加入阻塞隊列
            acquire(null, arg, false, false, false, 0L);
    }
    // 嘗試獲取鎖(可打斷)
    public final void acquireInterruptibly(int arg) throws InterruptedException {
        if (
            // 線程被打斷
            Thread.interrupted() ||
            (
                // 獲取鎖失敗
                !tryAcquire(arg) && 
                // 加入阻塞隊列的過程中出現問題併成功取消獲取鎖
                acquire(null, arg, false, true, false, 0L) < 0)
            )
            throw new InterruptedException();
    }
    // 釋放鎖
    public final boolean release(int arg) {
        // 釋放鎖成功
        if (tryRelease(arg)) {
            // 喚醒阻塞的線程
            signalNext(head);
            return true;
        }
        return false;
    }
}

END

文章文檔:公眾號 字節幺零二四 回覆關鍵字可獲取本文文檔。

如果覺得本文對您有一點點幫助,歡迎點贊、轉發加關注,這會對我有非常大的幫助,咱們下期見!

user avatar deltaf 頭像 tracy_5cb7dfc1f3f67 頭像 91cyz 頭像 saoming_zhang 頭像 u_16213560 頭像 jellyfishmix 頭像
6 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.