博客 / 詳情

返回

基於AQS實現的ReentrantLock

基於AQS實現的ReentrantLock

這裏的源碼用的Java8版本

lock方法

當ReentrantLock類的實例對象嘗試獲取鎖的時候,調用lock方法,

image

會進入sync的lock方法,其中Sync是ReentrantLock的一個內部類,ReentrantLock構造方法會默認使用非公平鎖NonfairSync,這個類是繼承於Sync的

        final void lock() {
            if (!initialTryLock())
                acquire(1);
        }
// 其中Sync的initialTryLock是抽象方法,需要看非公平鎖實現方法

[!TIP]
在這裏是第一次嘗試獲取鎖

由於ReentrantLock是個可重入鎖,判斷裏有重入的判斷

final boolean initialTryLock() {
            Thread current = Thread.currentThread();
			// 獲取當前線程的對象
            if (compareAndSetState(0, 1)) { // first attempt is unguarded
			// 用CAS比較state狀態是否為0(無人持有鎖),如果是,就轉為1(獲取到鎖)
                setExclusiveOwnerThread(current);
			// 將當前進程設置為擁有鎖的線程
                return true;
            } else if (getExclusiveOwnerThread() == current) {
			// 當前線程為擁有鎖的線程(重複獲取),重入
                int c = getState() + 1;
                if (c < 0) // overflow
			// 負數,state是個int類型數據,超出可能導致溢出變為負數
                    throw new Error("Maximum lock count exceeded");
                setState(c);
			// 設置新的state
                return true;
            } else
			// 已有線程佔鎖,返回為false
                return false;
        }

然後開始調用acquire方法,傳入1

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

調用tryAcquire()方法,其中tryAcquire()方法是一個只有拋出異常的方法,需要重寫,我們看非公平鎖的寫法

[!TIP]
這是第二次獲取鎖

        protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && !hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

這裏,如果state是0,即沒有線程佔用鎖的情況下getState() == 0​這個為真!hasQueuedPredecessors()執行這個方法,這個方法會檢查是否已經出現了等待隊列

    public final boolean hasQueuedPredecessors() {
        Thread first = null; Node h, s;
        if ((h = head) != null && ((s = h.next) == null ||
                                   (first = s.waiter) == null ||
                                   s.prev == null))
            first = getFirstQueuedThread(); // retry via getFirstQueuedThread
        return first != null && first != Thread.currentThread();
    }

當未出現 同步隊列/阻塞隊列 ,或者當前線程是隊列的第一個時,執行compareAndSetState(0, acquires),第二次嘗試獲取鎖,如果成功,返回真

否則返回假,執行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
			// 嘗試加入隊尾
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

Node是雙向隊列:阻塞隊列一個節點,是為了保證原子化所以包裝起來的

如果tail尾指針指向的節點不為空,則設置新生成的為尾指針指向的

否則(阻塞隊列為空),調用enq函數

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
			// 使用CAS,防止多線程同時創建頭節點,所以本質上還是需要搶入隊順序
                    tail = head;
			// 初始化頭節點,並將尾指針指向頭節點
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
			// 判斷t是否為尾節點,如果有線程更快的改掉尾節點,那麼修改失敗,
			// 重新進入for循環
                    t.next = node;
                    return t;
			// 修改成功
                }
            }
        }
    }

[!TIP]
這是第三次嘗試獲取鎖

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
			// 獲取node的前一個節點,如果前一個節點是頭節點(當前節點是第一個)
			// 執行tryAcquire(arg),執行第三次嘗試獲取鎖
                if (p == head && tryAcquire(arg)) {
			// 獲取鎖成功,出隊
                    setHead(node);// 將node設為頭節點
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

如果第三次嘗試獲取鎖失敗了,會調用shouldParkAfterFailedAcquire()方法,將node的前一個節點傳入(node一直都是加入的節點)

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
		// 確認前面的節點處於SIGNAL狀態,即確認前面的節點會叫醒自己
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
			// Node裏面僅有一個大於零的狀態,即1取消狀態,也就是説當前任務被取消了
			// 持續循環值找到不再取消的節點
            pred.next = node;
        } else {
		// 將前一個節點用CAS轉為Node.SIGNAL狀態-1,返回為false
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

這裏插一嘴,Node節點有一些狀態,來體現其的任務狀態,如前面傳入的就是獨佔隊列,addWaiter(Node.EXCLUSIVE)

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
		// 共享隊列
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;
		// 獨佔隊列
        /** waitStatus value to indicate thread has cancelled */// 取消
        static final int CANCELLED =  1;
		// 已被取消
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
		// 表示next節點已經park,需要被喚醒
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
		// 共享狀態
        static final int PROPAGATE = -3;
if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;

如果前一個節點的waitState是0,會被CAS轉為-1,然後返回false,進而不會執行parkAndCheckInterrupt(),繼續for的無限循環,這裏有可能出現第四次嘗試

如果前一個節點的waitState是-1,該函數返回一個true,也就可以繼續執行parkAndCheckInterrupt()

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

當前線程進入park狀態

至此我們完成了這個的lock過程

unlock方法

unlock()也是公平鎖以及非公平鎖都有的方法,同樣繼承了Sync

    public void unlock() {
        sync.release(1);
    }

Sync的release方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

首先嚐試tryRelease方法

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

如果成功醒過來,該線程依然處於一種park的位置上,即parkAndCheckInterrupt這個方法上,這個方法返回是否被中斷ReentrantLock這個鎖僅獲取中斷信息,而不會做出任何操作

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

甦醒過來之後,繼續for循環,嘗試獲取鎖,失敗之後會接着park,成功就會獲取鎖,並返回中斷狀態,在acquire中決定自我中斷

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

並將setExclusiveOwnerThread傳入當前線程,返回為真,因此在TryRelease方法裏的Thread.currentThread() != getExclusiveOwnerThread()一定為假,不會拋出異常,並設置free為false,當c也就是資源的state如果是0

			if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;

c如果是0,即沒有線程佔用資源,setExclusiveOwnerThread將鎖的線程設置為空,如果不為0,也就是重入鎖僅僅解鎖一次,c依然存在多個,設置c為新的state值,然會free值(資源鎖的使用情況)

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;、
		// 如果下一個節點的狀態為取消或者為空,從後向前找最後一個滿足條件的,賦值為s
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
		// s不為空的話作為下一個被喚醒的節點,嘗試喚醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

此時,當前節點為頭節點,調用unparkSuccessor()方法,獲取頭節點的下一個節點

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.