博客 / 詳情

返回

分佈式鎖的實現原理

作者:來自 vivo 互聯網服務器團隊- Xu Yaoming

介紹分佈式鎖的實現原理。

一、分佈式鎖概述

分佈式鎖,顧名思義,就是在分佈式環境下使用的鎖。眾所周知,在併發編程中,我們經常需要藉助併發控制工具,如 mutex、synchronized 等,來保障線程安全。但是,這種線程安全僅作用在同一內存環境中。在實際業務中,為了保障服務的可靠性,我們通常會採用多節點進行部署。在這種分佈式情況下,各實例間的內存不共享,線程安全並不能保證併發安全,如下例,同一實例中線程A與線程B之間的併發安全並不能保證實例1與實例2之間的併發安全:

圖片

因此,當遇到分佈式系統的併發安全問題時,我們就可能會需要引入分佈式鎖來解決。  

用於實現分佈式鎖的組件通常都會具備以下的一些特性:

  • 互斥性:提供分佈式環境下的互斥原語來加鎖/釋放鎖,當然是分佈式鎖最基本的特性。 
  • 自動釋放:為了應對分佈式系統中各實例因通信故障導致鎖不能釋放的問題,自動釋放的特性通常也是很有必要的。
  • 分區容錯性:應用在分佈式系統的組件,具備分區容錯性也是一項重要的特性,否則就會成為整個系統的瓶頸。

目前開源社區中常見的分佈式鎖解決方案,大多是基於具備集羣部署能力的 key-value 存儲中間件來實現,最為常用的方案基本上是基於 Redis、zookeeper 來實現,筆者將從上述分佈式鎖的特性出發,介紹一下這兩類的分佈式鎖解決方案的優缺點。

二、分佈式鎖的實現原理

2.1  Redis 實現分佈式鎖  

Redis 由於其高性能、使用及部署便利性,在很多場景下是實現分佈式鎖的首選。首先我們看下 Redis 是如何實現互斥性的。在單機部署的模式下,Redis 由於其單線程處理命令的線程模型,天然的具備互斥能力;而在哨兵/集羣模式下,寫命令也是單獨發送到某個單獨節點上進行處理,可以保證互斥性;其核心的命令是 set [NX](set if ot exist):

SET lockKey lockValue NX

成功設置 lockValue 的實例,就相當於搶鎖成功。但如果持有鎖的實例宕機,因為 Redis 服務端並沒有感知客户端狀態的能力,因此會出現鎖無法釋放的問題:

圖片

這種情況下,就需要給 key 設置一個過期時間 expireTime:

SET lockKey lockValue EX expireTime NX

如果持有鎖的實例宕機無法釋放鎖,則鎖會自動過期,這樣可以就避免鎖無法釋放的問題。在一些簡單的場景下,通過該方式實現的分佈式鎖已經可以滿足需求。但這種方式存在一個明顯問題:如果業務的實際處理時間比鎖過期時間長,鎖就會被誤釋放,導致其他實例也可以加鎖:

圖片

這種情況下,就需要通過其他機制來保證鎖在業務處理結束後再釋放,一個常用的方式就是通過後台線程的方式來實現鎖的自動續期。

圖片

Redssion 是開源社區中比較受歡迎的一個 Java 語言實現的 Redis 客户端,其對 Java 中 Lock 接口定義進行擴展,實現了 Redis 分佈式鎖,並通過 watchDog 機制(本質上即是後台線程運作)來對鎖進行自動續期。以下是一個簡單的 Reddison 分佈式鎖的使用例子:

RLock rLock = RedissonClient.getLock("test-lock");
try {
    if (rLock.tryLock()) {
        // do something
    }
} finally {
    rLock.unlock();
}

Redssion 的默認實現 RedissonLock 為可重入互斥非公平鎖,其 tryLock 方法會基於三個可選參數執行:

  • waitTime(獲取鎖的最長等待時長):默認為-1,waitTime 參數決定在獲取鎖的過程中是否需要進行等待,如果 waitTime>0,則在獲取鎖的過程中線程會等待一定時間並持續嘗試獲取鎖,否則獲取鎖失敗會直接返回。
  • leaseTime(鎖持有時長):默認為-1。當 leaseTime<=0 時,會開啓 watchDog 機制進行自動續期,而 leaseTime>0 時則不會進行自動續期,到達 leaseTime 鎖即過期釋放
  • unit(時間單位):標識 waitTime 及 leaseTime 的時間單位

我們不妨通過參數最全的 RedissonLock#tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法源碼來一探其完整的加鎖過程:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    ...
    // tryAcquire方法返回鎖的剩餘有效時長ttl,如果未上鎖,則為null
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    if (ttl == null) {
        // 獲取鎖成功
        return true;
    }
     
    // 計算剩餘等待時長,剩餘等待時長小於0,則不再嘗試獲取鎖,獲取鎖失敗,後續有多處同樣的判斷邏輯,將精簡省略
   time -= System.currentTimeMillis() - current;
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
     
    // 等待時長大於0,則會對鎖釋放的事件進行訂閲,持有鎖的客户端在鎖釋放時會發布鎖釋放事件通知其他客户端搶鎖,由此可得知該默認實現為非公平鎖。
    // Redisson對Redis發佈訂閲機制的實現,底層大量使用了CompletableFuture、CompletionStage等接口來編寫異步回調代碼,感興趣的讀者可以詳細瞭解,此處不作展開
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
        ...
    } catch (ExecutionException e) {
        ...
    }
 
    try {
        ...
        // 循環嘗試獲取鎖
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }
            ...
            // 此處通過信號量來將線程阻塞一定時間,避免無效的申請鎖浪費資源;在阻塞期間,如果收到了鎖釋放的事件,則會通過信號量提前喚起阻塞線程,重新嘗試獲取鎖;
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                // 若ttl(鎖過期時長)小於time(剩餘等待時長),則將線程阻塞ttl
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                // 若等待時長小於ttl,則將線程阻塞time
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            ...
        }
    } finally {
        // 取消訂閲
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
}

上述代碼邏輯主要集中在處理 waitTime 參數,在併發競爭不激烈、可以容忍一定的等待時間的情況下,合理設置 waitTime 參數可以提高業務併發運行成功率,避免搶鎖失敗直接返回錯誤;但在併發競爭激烈、對性能有較高要求時,建議不設置 waitTime,或者直接使用沒有 waitTime 參數的 lock() 方法,通過快速失敗來提高系統吞吐量。

一個比較值得注意的點是,如果設置了 waitTime 參數,則 Redisson 通過將 RedissonLockEntry 中信號量(Semaphore)的許可證數初始化為0來達到一定程度的限流,保證鎖釋放後只有一個等待中的線程會被喚醒去請求 Redis 服務端,把喚醒等待線程的工作分攤到各個客户端實例上,可以很大程度上緩解非公平鎖給 Redis 服務端帶來的驚羣效應壓力。

public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
    ...
    private final Semaphore latch;
 
    public RedissonLockEntry(CompletableFuture<RedissonLockEntry> promise) {
        super();
        //  RedissonLockEntry 中的Semaphore的許可證數初始化為0
        this.latch = new Semaphore(0);
        this.promise = promise;
    }
    ...
}

獲取鎖的核心邏輯,會通過 RedissonLock#tryAcquire 方法調用到 RedissonLock#tryAcquireAsync 方法。

private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        // 若leaseTime大於零,會設置鎖的租期為leaseTime
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 若leaseTime小於或等於零,會設置鎖的租期為internalLockLeaseTime,這是一個通過lockWatchdogTimeout配置的值,默認為30s
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
 
    // 此處的handleNoSync方法是為了解決Redis發生故障轉移,集羣拓撲改變後,只有持有鎖的客户端能再次獲得鎖的bug,為3.20.1版本修復,詳見Redisson issue#4822
    CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);
    ttlRemainingFuture = new CompletableFutureWrapper<>(s);
 
    // 根據加鎖情況來進行後續處理
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // lock acquired
        // 若ttl為空,説明加鎖不成功
        if (ttlRemaining == null) {
            if (leaseTime > 0) {
                // 若leaseTime>0,則將internalLockLeaseTime變量設置為leaseTime,以便後續解鎖使用
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 若leaseTime<=0,則開啓看門狗機制,通過定時任務進行鎖續期
                scheduleExpirationRenewal(threadId);
            }
        }
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}
 
// 加鎖的lua腳本
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if ((Redis.call('exists', KEYS[1]) == 0) " +
                        "or (Redis.call('hexists', KEYS[1], ARGV[2]) == 1)) then " +
                    "Redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "Redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return Redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

可以看到,若 leaseTime 大於0,則不會開啓看門狗機制,鎖在過期後即失效,在使用時請務必留意。上述代碼中執行的 scheduleExpirationRenewal 方法即為看門狗機制的實現邏輯:

protected void scheduleExpirationRenewal(long threadId) {
    // 每個鎖都會對應一個ExpirationEntry類,第一次加鎖時不存在oldEntry
    ExpirationEntry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 非首次加鎖,重入計數,不作其他操作
        oldEntry.addThreadId(threadId);
    } else {
        // 首次加鎖,調用renewExpiration()方法進行自動續期
        entry.addThreadId(threadId);
        try {
            renewExpiration();
        } finally {
            // 若當前線程被中斷,則取消對鎖的自動續期。
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}
 
private void renewExpiration() {
    ...
    // 此處使用的是netty的時間輪來執行定時續期,此處不對時間輪做展開,感興趣的讀者可詳細瞭解
    Timeout task = getServiceManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ...
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                 
                if (res) {
                    // 若續期成功,則遞歸調用,等待任務的下一次執行
                    renewExpiration();
                } else {
                    // 若續期結果為false,説明鎖已經過期了,或鎖易主了,則清理當前線程關聯的信息,等待線程結束
                    cancelExpirationRenewal(null);
                }
            });
        }
        // 時間輪的執行週期為internalLockLeaseTime / 3,即默認情況下,internalLockLeaseTime為30s時,每10s觸發一次自動續期
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
     
    ee.setTimeout(task);
}
 
protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
    // 執行重置過期時間的lua腳本
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (Redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "Redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
}

上面一段代碼即是看門狗調度的核心代碼,本質上即是通過定時調度線程執行 lua 腳本來進行鎖續期。值得留意的是 scheduleExpirationRenewal 

方法中的 ExpirationEntry,該對象與鎖一一關聯,會存儲嘗試獲取該鎖的線程(無論是否獲取成功)以及重入鎖的次數,在鎖失效/鎖釋放時,會根據該對象中存儲的線程逐一進行資源釋放操作,以保證資源的正確釋放。

最後,對上述 Redisson 可重入非公平鎖源碼進行一下總結:

  • Redisson 加鎖時,根據 waitTime 參數是否大於0來決定加鎖失敗時採用等待並再次嘗試/快速失敗的策略;
  • Redisson 加鎖時根據 leaseTime 參數是否小於等於0來決定是否開啓看門狗機制進行定時續期;
  • Redisson 底層使用了 netty 實現的時間輪來進行定時續期任務的調度,執行週期為 internalLockLeaseTime / 3,默認為10s。

2.2 zookeeper 實現分佈式鎖

zookeeper(後文均簡稱 zk )基於 zab 協議實現的分佈式協調服務,天生具備實現分佈式鎖的基礎條件。我們可以從zk的一些基本機制入手,瞭解其是如何實現分佈式鎖的。

  • zab:為了保證分佈式一致性,zk 實現了 zab(Zk Atomic Broadcast,zk 原子廣播)協議,在 zab 協議下,zk集羣分為 Leader 節點及  Follower 節點,其中,負責處理寫請求的 Leader 節點在集羣中是唯一的,多個 Follower 則負責同步 Leader 節點的數據,處理客户端的讀請求。同時,zk 處理寫請求時底層數據存儲使用的是 ConcurrentHashMap,以保證併發安全;
public class NodeHashMapImpl implements NodeHashMap {
 
    private final ConcurrentHashMap<String, DataNode> nodes;
    private final boolean digestEnabled;
    private final DigestCalculator digestCalculator;
    private final AdHash hash;
     
    ...
 
}
  • 臨時順序節點:zk 的數據呈樹狀結構,樹上的每一個節點為一個基本數據單元,稱為 Znode。zk 可以創建一類臨時順序(EPHEMERAL_SEQUENTIAL)節點,在滿足一定條件時會可以自動釋放;同時,同一層級的節點名稱會按節點的創建順序進行命名,第一個節點為xxx-0000000000,第二個節點則為xxx-0000000001,以此類推;

圖片

  • session:zk 的服務端與客户端使用 session 機制進行通信,簡單來説即是通過長連接來進行交互,zk 服務端會通過心跳來監控客户端是否處於活動狀態。若客户端長期無心跳或斷開連接,則 zk 服務端會定期關閉這些 session,主動斷開與客户端的通信。

瞭解了上述 zk 特點,我們不難發現 zk 也是具備互斥性、自動釋放的特性的。同時,zk 由於 session 機制的存在,服務端可以感知到客户端的狀態,因此不需要有由客户端來進行節點續期,zk 服務端可以主動地清理失聯客户端創建的節點,避免鎖無法釋放的問題。zk 實現分佈式鎖的主要步驟如下:

  1. client1 申請加鎖,創建 /lock/xxx-lock-0000000000節點(臨時順序節點),並監聽其父節點 /lock;
  2. client1 查詢 /lock 節點下的節點列表,並判斷自己創建的 /xxx-lock-0000000000 是否為 /lock 節點下的第一個節點;當前沒有其他客户端加鎖,所以 client1 獲取鎖成功;
  3. 若 client2 此時來加鎖,則會創建 /lock/xxx-lock-0000000001 節點;此時 client2 查詢 /lock 節點下的節點列表,此時 /xxx-lock-0000000001 並非 /lock 下的第一個節點,因此加鎖不成功,此時 client2 則會監聽其上一個節點 /xxx-lock-0000000000;
  4. client1 釋放鎖,client1 刪除 /xxx-lock-0000000000 節點,zk 服務端通過長連接 session 通知監聽了 /xxx-lock-0000000000 節點的 client2 來獲取鎖
  5. 收到釋放事件的 client2 查詢 /lock 節點下的節點列表,此時自己創建的 /xxx-lock-0000000001 為最小節點,因此獲取鎖成功。

圖片

圖片

圖片

圖片

上述是 zk 公平鎖的一種常見實現方式。值得注意的是, zk 客户端通常並不會實現非公平鎖。事實上,zk 上鎖的粒度不侷限於上述步驟中的客户端,zk 客户端每次獲取鎖請求(即每一個嘗試獲取鎖的線程)都會向 zk 服務端請求創建一個臨時順序節點。

以上述步驟為例,如果需要實現非公平鎖,則會導致其餘的所有節點都需要監聽第一個節點 /xxx-lock-0000000000 的釋放事件,相當於所有等待鎖釋放的線程都會監聽同一個節點,這種機制無法像 Redisson 一樣把喚醒鎖的壓力分攤到客户端上(或者説實現起來比較困難),會產生比較嚴重的驚羣效應,因此使用 zk 實現的分佈式鎖一般情況下都是公平鎖。

Curator 是一個比較常用的 zk 客户端,我們可以通過 Curator 的加鎖過程,來了解 zk 分佈式鎖的設計原理。Curator 中比較常用的是可重入互斥公平鎖 InterProcessMutex:

InterProcessMutex mutex = new InterProcessMutex(zkClient, "/lock");
try {
    // acquire方法的兩個參數:等待時長及時間單位
    if (mutex.acquire(3, TimeUnit.SECONDS)) {
        log.info("加鎖成功");
    } else {
        log.info("加鎖失敗");
    }
} finally {
    mutex.release();
}

InterProcessMutex 同樣提供了等待時長參數,用於設置沒有立即獲取到鎖時是快速失敗還是阻塞等待,下一步,方法會調用到 InterProcessMutex#internalLock 方法中:

private boolean internalLock(long time, TimeUnit unit) throws Exception
{
    // 註釋的意思:一個LockData對象只會被一個持有鎖的線程進行修改,因此不需要對LockData進行併發控制。如此説明的原因是zk的互斥特性保證了下方attemptLock方法的互斥,由此保證了LockData不會被併發修改
    /*
        Note on concurrency: a given lockData instance
        can be only acted on by a single thread so locking isn't necessary
    */
 
    Thread currentThread = Thread.currentThread();
     
    // LockData用於記錄當前持有鎖的線程數據
    LockData lockData = threadData.get(currentThread);
    if ( lockData != null )
    {
        // 線程不為空,則進行重入,重入次數+1
        // re-entering
        lockData.lockCount.incrementAndGet();
        return true;
    }
     
    // 向zk服務獲取分佈式鎖,getLockNodeBytes
    String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
    if ( lockPath != null )
    {
        // 若lockPath不為空,則獲取鎖成功,記錄當前持有鎖的線程
        LockData newLockData = new LockData(currentThread, lockPath);
        threadData.put(currentThread, newLockData);
        return true;
    }
 
    return false;
}

InterProcessMutex#internalLock會調用到 LockInternals#attemptLock 方法:

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
    ...
    while ( !isDone )
    {
        isDone = true;
 
        try
        {
            // 創建鎖節點
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            // 判斷是否成功獲取鎖
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        catch ( KeeperException.NoNodeException e )
        {
            // 捕獲由於網絡中斷、session過期等原因導致的無法獲得節點異常,此處根據配置的zk客户端重試策略決定是否重試,默認重試策略為Exponential Backoff
            ...retry or not...
        }
    }
 
    if ( hasTheLock )
    {
        return ourPath;
    }
 
    return null;
}
 
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception
{
    String ourPath;
    if ( lockNodeBytes != null )
    {  
        // 在其他類型的鎖實現中,lockNodeBytes可能不為空,則根據lockNodeBytes來獲取節點路徑,此處暫不作展開
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
    }
    else
    {
        // 在可重入互斥鎖中,客户端向zk服務端請求創建一個 EPHEMERAL_SEQUENTIAL 臨時順序節點
        ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
    }
    return ourPath;
}

上述代碼中,創建鎖節點並不會產生互斥,而是會直接向 zk 服務端請求創建臨時順序節點。此時,客户端還未真正的獲得鎖,判斷加鎖成功的核心邏輯在 LockInternals#internalLockLoop 方法中:

private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
    boolean     haveTheLock = false;
    boolean     doDelete = false;
    try
    {
        if ( revocable.get() != null )
        {  
            // curator鎖撤銷機制,通過實現Curator中的Revocable接口的makeRevocable方法,可以將鎖設置為可撤銷鎖,其他線程可以在符合條件時將鎖撤銷,此處暫不涉及
            client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
        }
         
        // 客户端實例就緒,則嘗試循環獲取鎖
        while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock ) 
        {
            // 獲取當前父節點下的排好序的子節點
            List<String>        children = getSortedChildren();
            // 得到當前節點名
            String              sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            // 根據 children 列表與當前節點名,計算當前節點是否為第一個節點,若不是第一個節點,則在 PredicateResults中返回需要監聽的前一個節點節點,若為最小節點,則獲取鎖成功
            PredicateResults    predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if ( predicateResults.getsTheLock() )
            {
                // 獲取鎖成功
                haveTheLock = true;
            }
            else
            {
                // 拼接前一個節點的節點路徑
                String  previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                 
                synchronized(this)
                {
                    try
                    {
                        // 將前一個節點的監聽器放到當前客户端中,當前一個節點被釋放時,就會喚醒當前客户端
                        client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                        if ( millisToWait != null )
                        {
                            millisToWait -= (System.currentTimeMillis() - startMillis);
                            startMillis = System.currentTimeMillis();
                            // 計算剩餘等待時長,若等待時長小於0,則不再嘗試獲取鎖,並標記當前線程創建的節點需要刪除
                            if ( millisToWait <= 0 )
                            {
                                doDelete = true;    // timed out - delete our node
                                break;
                            }
                            // 若等待時長大於0,則阻塞線程,等待鎖釋放
                            wait(millisToWait);
                        }
                        else
                        {
                            // 在其他的一些加鎖場景中,默認會持久等待到鎖釋放位置,當前可重入互斥鎖暫不涉及
                            wait();
                        }
                    }
                    catch ( KeeperException.NoNodeException e )
                    {
                        // it has been deleted (i.e. lock released). Try to acquire again
                    }
                }
            }
        }
    }
    catch ( Exception e )
    {
        ThreadUtils.checkInterrupted(e);
        doDelete = true;
        throw e;
    }
    finally
    {
        if ( doDelete )
        {
            // 刪除當前節點
            deleteOurPath(ourPath);
        }
    }
    return haveTheLock;
}
 
private synchronized void notifyFromWatcher()
{
    // 當zk客户端收到鎖釋放事件時,會遍歷當前客户端註冊過的所有的監聽器,並找到合適的監聽器進行回調,最終通過notifyAll喚醒監聽被釋放節點的線程
    notifyAll();
}

上述 curator 加鎖的核心代碼雖然比較長,但整體邏輯與我們前面分析過的加鎖邏輯是一致的,主要做了三件事:

  • 獲取當前父節點的有序子節點序列;
  • 判斷當前節點是否為第一個節點;
  • 若為第一個節點,則獲取鎖成功,否則為當前 zk 客户端增加一個前一節點的監聽器,如果此時還在等待時長內,則使用wait方法掛起線程,否則刪除當前節點。

三、總結——如何選擇合適的分佈式併發安全解決方案?

  • 繞不過的 CAP 理論

Redis 與 zk 由於客户端與服務端的交互機制上存在比較大的差異,相應的分佈式鎖實現原理也有所不同。兩者都是優秀的支持分佈式部署的系統,自然具備分區容錯性,但分佈式系統總繞不過去一個經典的問題——CAP理論:在滿足了分區容錯性的前提下,分佈式系統只能滿足可用性、數據一致性兩者其一。

圖片

對比之下,Redis 在可用性上更勝一籌,屬於 AP 系統;zk 具備更強的數據一致性,屬於 CP 系統,而基於 AP、CP 的特性去實現的分佈式鎖,自然也會存在不同程度的問題。

  • Redis 分佈式鎖的一致性問題

Redis 的集羣模式並沒有嚴格地實現分佈式共識算法,因此 Redis 是不具備一致性的。為了保證高可用性,Redis 集羣的主從節點使用的是異步複製,從節點並不保證與主節點數據一致,只能儘量的追趕主節點的最新數據;因此,當主節點發生故障,進行主從切換時,實際上有可能會發生數據丟失問題:

圖片

  • zk 性能及可用性問題

zk 實現了 zab 算法,在數據一致性上給出了比較可靠的方案,但是由於 zab 協議的兩階段提交要求所有節點的寫請求處理就緒後,才算寫入成功,這無疑會導致性能的下降。此外,在zk集羣發生 leader 重選舉的過程中,對外會表現為不可用狀態,此時可用性上就會存在問題:

圖片

由上可知,分佈式併發安全解決方案並不存在完美的“銀彈”,因此更多時候我們應當根據自身業務情況,合理地選擇合適的解決方案。

顯而易見地,如果業務場景有較高的請求量,併發競爭比較激烈,對性能有較高要求,此時通過 Redis 來實現分佈式鎖會是比較合適的方案。但是如果業務場景對數據一致性要求比較高,或是系統交互鏈路比較長,一但發生數據不一致時,會導致系統出現難以恢復的問題時,採用zk來實現分佈式鎖則是更優的解決方案。

  • 上述方案都無法滿足要求?

總體上看,Redis 由於其本身的高性能可以滿足大多數場景下的性能要求,而 zk 則保證了較高數據一致性。但倘若遇到了既要求高性能、又要求數據一致性、還要引入鎖機制來保障併發安全的場景,這時候就必須重新審視系統設計是否合理了,畢竟高併發與鎖是一對矛盾,可用性與數據一致性是一對矛盾,我們應該通過良好的方案、系統設計,來避免讓我們的系統陷入這些矛盾的困境中。

user avatar huishou 頭像 u_16213560 頭像 lab4ai 頭像 haoqingwanqiandehongcha 頭像 iot_full_stack 頭像 beiyinglunkuo 頭像 birenxuemou 頭像
7 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.