博客 / 詳情

返回

Etcd基本使用

1. 寫入

1.1. etcd一致性

etcd 提供了強一致性(Strong Consistency)。所有寫操作都必須通過當前的領導者節點(Leader),並在多數追隨者節點(Followers)確認後才會被提交。

1、工作流程
  1. 客户端將寫請求發送到領導者節點。
  2. 領導者節點將寫操作記錄到自己的日誌,然後將日誌條目複製到所有追隨者節點。
  3. 當多數節點確認收到日誌條目後,領導者節點將其標記為已提交,並應用到狀態機。
  4. 領導者節點向客户端返回成功確認。

Raft 算法通過以下機制來處理這些異常:

  • 領導者故障:如果領導者節點失效,集羣中的其他節點會發起新的領導者選舉,選出新的領導者節點。新的領導者節點會從已提交的日誌條目中恢復一致性狀態。
  • 追隨者故障:如果某個追隨者節點失效,領導者節點仍然可以在大多數節點確認寫操作後進行提交。一旦追隨者節點恢復,它會從領導者節點同步最新的日誌條目,恢復一致狀態。

因此,當客户端收到寫入成功的消息時,表示leader節點及大多數節點都已成功寫入,且提交到狀態機了。就算還有其他少量節點還沒有寫入,etcd後續也會保證它們會被同步到。

2、優缺點

優點:提供強一致性保證,即所有節點最終會收斂到相同的狀態。適用於對數據一致性要求高的場景。

缺點:在高負載或網絡分區情況下,可能會出現寫操作延遲。

1.2. etcd寫入順序

由於所有寫請求都必須經過 Leader 節點,並且 Leader 節點會將日誌條目按順序添加到自己的日誌中,這就保證了寫請求的順序性。

雖然這些寫請求在 Leader 節點上的處理和複製過程是並行的,但各個 Follower 節點也是按照 Leader 節點寫入的順序提交日誌條目。這是 Raft 共識算法的一部分,確保了整個集羣中的所有節點都保持相同的一致性和操作順序

Raft 算法中的日誌複製和提交流程:

  • Leader 寫入日誌

    • Leader 節點接收到寫請求後,會將請求轉換為日誌條目,並將其添加到本地的 Raft 日誌中。
    • 這些日誌條目在 Leader 節點上是按順序寫入的,形成一個有序的日誌條目列表。
  • 日誌條目的複製

    • Leader 節點將新日誌條目並行地複製到所有 Follower 節點。
    • Follower 節點接收到這些日誌條目後,會將它們添加到本地日誌中,保持與 Leader 節點相同的順序。
  • 日誌條目的提交

    • 當 Leader 節點和多數 Follower 節點都確認接收到並持久化了某個日誌條目後,Leader 節點會將這個日誌條目標記為已提交。
    • 提交是按順序進行的,即 Leader 節點只有在之前的日誌條目提交後,才能提交後續的日誌條目。
  • 應用到狀態機

    • 提交日誌條目後,Leader 和 Follower 節點會按照日誌中的順序將這些操作應用到各自的狀態機中。

因此,可以説etcd的寫入是嚴格按照順序寫入的。

1.3. 對比eureka

Eureka 主要採用以高可用性和最終一致性為目標的設計,而不是強一致性。

1、工作流程

寫請求處理:

  1. 客户端將寫請求(如服務註冊或續約)發送到一個 Eureka 服務器節點。
  2. 該節點立即處理請求並將信息更新到本地註冊表,然後立即向客户端返回成功確認。
  3. 隨後,該節點以異步的方式將更新傳播給其他 Eureka 服務器節點。

異步複製機制:

  • Eureka 採用異步複製機制,即寫操作先在收到請求的節點上完成,然後通過後台線程將更新同步到集羣中的其他節點。
  • 這種設計確保了高可用性和快速響應,但在網絡分區或節點故障的情況下,可能會導致短暫的數據不一致。
2、優點

對於服務發現和註冊場景,這種最終一致性通常是可接受的,因為服務實例的變化(如上線或下線)不需要立即全局可見。

由於沒有主節點,任何 Eureka 服務器節點都可以處理寫請求,提高了系統的可用性。如果一個節點失效,其他節點仍然可以處理請求,服務註冊和發現不會中斷。

3、缺點

由於異步複製機制,Eureka 提供的是最終一致性。這意味着系統在一段時間後會達到一致狀態,但在短時間內,可能會存在數據不一致的情況。即客户端收到寫入成功的返回時,實際上可能只在一個節點上寫入成功,如果此時查詢的請求打入其他節點,就查詢不到。

另外,因為每個節點都可以先寫入,而且沒有多數節點都確認。當多個節點同時修改某個配置時,各自節點都寫入成功了,但在節點之間日誌同步時,就會出現衝突。

4、寫入順序

由於eureka每個節點都可以先寫入,然後再異步同步給其他節點,所以沒辦法保證順序。

2. 讀取

etcd 提供了多種讀取方式,主要包括線性一致性讀取、順序一致性讀取和最終一致性讀取。下面將詳細講解這幾種讀取方式的讀取流程。

2.1. 線性一致性讀取(Linearizable Read)

1、流程
  • 發送請求到 Leader 節點:客户端將讀取請求發送到集羣中的 Leader 節點。
  • Leader 處理讀取請求

    • Leader 節點將讀取請求作為一個空寫操作(no-op)寫入 Raft 日誌,確保讀取操作在所有先前的寫操作之後。
    • Leader 節點等待這個空寫操作被多數節點(包括自己)確認和提交。
  • 讀取最新數據:當空寫操作被提交後,Leader 節點執行讀取操作,確保返回的數據是最新的。
  • 返回結果給客户端:Leader 節點將讀取結果返回給客户端。
2、特點
  • 強一致性:確保讀取的數據是最新的,反映了所有先前的寫操作。
  • 較高的延遲:需要與多數節點通信並確認空寫操作,因此延遲較高。

2.2. 順序一致性讀取(Sequential Read)

1、流程
  • 發送請求到 Leader 節點:客户端將讀取請求發送到集羣中的 Leader 節點。
  • Leader 處理讀取請求

    • Leader 節點直接從本地狀態讀取數據,並返回給客户端。
    • 由於 Leader 節點已經確保了操作的順序性,該讀取操作是順序一致的。
2、特點
  • 強一致性:讀取操作遵循操作的順序性,但不一定是最新的。
  • 較高的延遲:不需要與多數節點通信,性能較好。

2.3. 最終一致性讀取(Eventually Consistent Read)

1、流程
  • 發送請求到 Leader 節點:客户端可以將讀取請求發送到集羣中的任意節點(包括 Leader 和 Follower)。
  • 節點處理讀取請求

    • 接受請求的節點直接從本地狀態讀取數據,並返回給客户端。
    • 由於讀取的數據可能不同步,因此數據一致性較弱。
2、特點
  • 一致性較弱:返回的數據可能不是最新的,但在一段時間後會達到一致性。
  • 性能最佳:可以從任意節點讀取數據,延遲最低,適用於對一致性要求不高的場景。

2.4. 代碼示例

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.ReadOption;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class EtcdReadExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        String endpoint = "http://localhost:2379"; // etcd 服務器地址
        String key = "your-key"; // 鍵名

        // 創建 etcd 客户端實例
        Client client = Client.builder().endpoints(endpoint).build();

        try {
            // 線性一致性讀取(默認)
            System.out.println("Linearizable Read (default):");
            linearizableRead(client, key);

            // 順序一致性讀取
            System.out.println("\nSequential Read:");
            sequentialRead(client, key);

            // 最終一致性讀取
            System.out.println("\nEventually Consistent Read:");
            eventuallyConsistentRead(client, key);

        } finally {
            // 關閉客户端連接
            client.close();
        }
    }

    // 線性一致性讀取(默認方式)
    private static void linearizableRead(Client client, String key) throws ExecutionException, InterruptedException {
        // 調用通用讀取方法,不傳遞任何特殊選項
        performRead(client, key);
    }

    // 順序一致性讀取
    private static void sequentialRead(Client client, String key) throws ExecutionException, InterruptedException {
        // 設置讀取選項為順序一致性
        ReadOption readOption = ReadOption.newBuilder().withConsistency(ReadOption.Consistency.SERIALIZABLE).build();
        performRead(client, key, readOption);
    }

    // 最終一致性讀取
    private static void eventuallyConsistentRead(Client client, String key) throws ExecutionException, InterruptedException {
        // 設置讀取選項為最終一致性
        GetOption getOption = GetOption.newBuilder().withConsistency(GetOption.Consistency.KUS).build();
        performRead(client, key, getOption);
    }

    // 執行讀取操作並輸出結果(不傳遞選項,使用默認的線性一致性讀取)
    private static void performRead(Client client, String key) throws ExecutionException, InterruptedException {
        // 執行讀取操作(默認的線性一致性讀取)
        CompletableFuture<GetResponse> responseFuture = client.getKVClient().get(ByteSequence.from(key, "UTF-8"));

        // 獲取讀取結果
        GetResponse response = responseFuture.get();

        // 輸出讀取到的值
        if (response.getKvs().size() > 0) {
            System.out.println("Key: " + response.getKvs().get(0).getKey().toStringUtf8());
            System.out.println("Value: " + response.getKvs().get(0).getValue().toStringUtf8());
        } else {
            System.out.println("Key not found");
        }
    }

    // 執行讀取操作並輸出結果(傳遞讀取選項)
    private static void performRead(Client client, String key, GetOption getOption) throws ExecutionException, InterruptedException {
        // 執行讀取操作
        CompletableFuture<GetResponse> responseFuture = client.getKVClient().get(ByteSequence.from(key, "UTF-8"), getOption);

        // 獲取讀取結果
        GetResponse response = responseFuture.get();

        // 輸出讀取到的值
        if (response.getKvs().size() > 0) {
            System.out.println("Key: " + response.getKvs().get(0).getKey().toStringUtf8());
            System.out.println("Value: " + response.getKvs().get(0).getValue().toStringUtf8());
        } else {
            System.out.println("Key not found");
        }
    }

    // 重載方法,適配 ReadOption
    private static void performRead(Client client, String key, ReadOption readOption) throws ExecutionException, InterruptedException {
        // 執行讀取操作
        CompletableFuture<GetResponse> responseFuture = client.getKVClient().get(ByteSequence.from(key, "UTF-8"), readOption);

        // 獲取讀取結果
        GetResponse response = responseFuture.get();

        // 輸出讀取到的值
        if (response.getKvs().size() > 0) {
            System.out.println("Key: " + response.getKvs().get(0).getKey().toStringUtf8());
            System.out.println("Value: " + response.getKvs().get(0).getValue().toStringUtf8());
        } else {
            System.out.println("Key not found");
        }
    }
}

2.5. 總結

通過代碼可以看出,etcd默認的讀取方式是線行一致性讀取,説明etcd更側重強一致性的場景。

三種讀取方式一致性由弱到強分別為:

1、最終一致性讀取

客户端可以將讀取請求發送到集羣中的任意節點(包括 Leader 和 Follower)。

如果讀取請求到了leader節點,或者是同步到leader最新日誌的follower節點,其實和“順序一致性讀取”一樣。

但如果讀取請求到了一個落後leader日誌的follower節點(因為網絡暫時隔離等情況),那麼讀取到的數據就是過時的。基於 raft算法,要等到後續網絡等恢復之後,該節點才會同步到最新日誌。

該節點的好處在於“讀寫分離”,當高併發讀取時,如果都請求leader節點會扛不住。

2、順序一致性讀取

客户端的請求都發送在leader節點,所以相比較前者而言,能保障讀到的是當前已提交的最新的數據。

但是etcd 內部使用讀寫鎖機制,因此讀操作不會與正在進行的寫操作衝突,當有一批非事務的讀寫命令請求時,沒辦法嚴格保障執行順序。

3、線性一致性讀取

讀取操作被偽裝成空寫操作,也會有大多數follower節點的確認。在寫入的章節説過,etcd實現了所有節點寫入的順序一致性,因此該讀取方式,也實現了嚴格的順序。

但缺點就是,所有請求都交給leader處理,讀取請求都需要大多數follower節點確認,所以網絡消耗較高,高併發時有瓶頸。

2.6. zookeeper的讀取

zookeeper和etcd不同,zookeeper 的讀取操作默認是從 follower 節點進行的。

  • Leader 節點:負責處理所有寫操作,並將寫操作通過 Zab 協議(ZooKeeper Atomic Broadcast)廣播給所有 follower 節點。
  • Follower 節點:負責處理讀取操作和轉發寫操作請求給 leader 節點。

讀取操作的默認行為是從 follower 節點進行,但這也意味着讀取操作可能會有一定的延遲,因為 follower 節點需要從 leader 節點同步最新的數據。為了確保讀取的一致性,ZooKeeper 提供了 sync 方法,可以強制客户端從 leader 節點同步最新的更新。

sync 方法

sync 方法可以確保在讀取操作之前,所有之前的寫操作都已經被應用到 follower 節點。使用 sync 可以強制客户端在讀取數據前,同步最新的狀態,從而實現強一致性的讀取。

3. 監聽

etcd 的 watch 功能用於監聽鍵或鍵的前綴,以便在這些鍵發生變化時通知客户端。watch 是 etcd 中非常強大和常用的功能,可以用來實現服務發現、配置管理和分佈式鎖等功能。

實際應用場景
  • 配置管理:可以通過 watch 機制監聽配置變化,當配置發生變化時,及時通知相應的服務進行更新。
  • 服務發現:通過 watch 機制監聽服務註冊信息的變化,當有新服務註冊或服務下線時,及時通知客户端進行相應處理。
  • 分佈式鎖:通過 watch 機制監聽鎖的狀態變化,實現分佈式鎖的搶佔和釋放。

3.1. 內部實現

1、修訂版本(Revision)

在 etcd 中,每次寫操作都會生成一個新的修訂版本(Revision)。這個修訂版本是一個遞增的數字,用於標識每次事務提交的順序。watch 機制利用這個修訂版本來跟蹤鍵值的變化。

2、事件存儲和轉發

當客户端創建 watch 時,etcd 會將 watch 請求轉發到當前的 leader 節點。leader 節點會將 watch 請求添加到其內部的監聽列表中。

當有新的寫操作提交時,leader 節點會檢查是否有 watch 監聽這個鍵或鍵前綴。如果有,leader 節點會將相應的事件通知給客户端。

3、異步通知

etcd 的 watch 機制是異步的。當有新的事件生成時,etcd 會異步地將事件推送給客户端。這種異步通知機制可以提高系統的響應速度和併發處理能力。

3.2. 斷點續傳

etcd 提供了斷點續傳功能,確保在客户端連接中斷後重新連接時,不會丟失變更事件。具體實現通過以下幾個步驟:

  • 指定修訂版本:客户端在發起 watch 請求時,可以指定從哪個修訂版本開始監聽。
  • 事件緩存:leader 節點會緩存一段時間內的事件,以便在客户端重新連接時提供這些事件。
  • 重新連接:客户端在重新連接時,可以通過指定上次接收到的最後一個事件的修訂版本,繼續接收從該修訂版本後的所有事件。
緩存事件時間

etcd 的事件緩存機制主要是為了確保在客户端重新連接時,不會丟失在斷連期間的事件。緩存的時間和事件數量由 etcd 的內部實現和配置決定

  • 事件保留時間:默認情況下,etcd 會保留最近一段時間內的事件。這段時間通常是配置的 etcd 副本保留時間(compaction interval)。
  • 事件數量限制:緩存的事件數量也受到內存和其他資源的限制,以防止內存佔用過多。

etcd 提供了一些配置項來調節事件的保留時間和緩存策略。

retention configuration(保留配置)

--auto-compaction-retention:這個參數用於指定自動壓縮的保留時間間隔。壓縮會刪除超過這個時間間隔的歷史修訂版本和事件。
如果設置為 1 小時(--auto-compaction-retention=1h),那麼 etcd 會保留過去 1 小時內的所有事件。

Compaction(壓縮)

etcd 提供了手動和自動的壓縮機制,通過刪除舊的修訂版本和事件來節省空間。
自動壓縮可以通過配置 --auto-compaction-mode 和 --auto-compaction-retention 參數來實現。

3.3. 代碼示例

import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.Watch.Watcher;
import io.etcd.jetcd.watch.WatchEvent;
import io.etcd.jetcd.watch.WatchResponse;
import io.etcd.jetcd.options.WatchOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

@Service
public class EtcdWatchService {

    private static final Logger LOGGER = LoggerFactory.getLogger(EtcdWatchService.class);

    @Value("${etcd.endpoints}")
    private String etcdEndpoints;

    private Client client;
    private Watcher watcher;
    private AtomicLong lastRevision = new AtomicLong(0);
    private volatile boolean shouldReconnect = true;
    private static final int MAX_RETRIES = 5;  // 最大重試次數
    private int retryCount = 0;
    private long baseDelay = 1000;  // 基礎延遲時間(毫秒)

    @PostConstruct
    public void init() {
        client = Client.builder().endpoints(etcdEndpoints.split(",")).build();
        startWatching();
    }

    @PreDestroy
    public void cleanup() {
        shouldReconnect = false;
        if (watcher != null) {
            watcher.close();
        }
        if (client != null) {
            client.close();
        }
    }

    private void startWatching() {
        ByteSequence key = ByteSequence.from("my-key", StandardCharsets.UTF_8);
        WatchOption watchOption = WatchOption.newBuilder().withRevision(lastRevision.get()).build();

        watcher = client.getWatchClient().watch(key, watchOption, new Watch.Listener() {
            @Override
            public void onNext(WatchResponse response) {
                response.getEvents().forEach(event -> {
                    switch (event.getEventType()) {
                        case PUT:
                            LOGGER.info("PUT event: {} -> {}", event.getKeyValue().getKey(),
                                    event.getKeyValue().getValue().toString(StandardCharsets.UTF_8));
                            break;
                        case DELETE:
                            LOGGER.info("DELETE event: {}", event.getKeyValue().getKey());
                            break;
                    }
                    // 更新最後的修訂版本號
                    lastRevision.set(event.getKeyValue().getModRevision());
                });
            }

            @Override
            public void onError(Throwable throwable) {
                LOGGER.error("Watch error: {}", throwable.getMessage());
                throwable.printStackTrace();
                if (shouldReconnect && retryCount < MAX_RETRIES) {
                    reconnect();
                } else {
                    LOGGER.error("Max retries reached, will not attempt to reconnect.");
                }
            }

            @Override
            public void onCompleted() {
                LOGGER.info("Watch completed");
            }
        });
    }

    private void reconnect() {
        try {
            long delay = (long) (baseDelay * Math.pow(2, retryCount));  // 指數退避
            LOGGER.info("Reconnecting watch in {} ms...", delay);
            cleanupWatcher();
            Thread.sleep(delay);
            startWatching();
            retryCount++;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOGGER.error("Reconnect interrupted", e);
        }
    }

    private void cleanupWatcher() {
        if (watcher != null) {
            watcher.close();
        }
    }
}
lastRevision

在每次監聽到寫入消息後,都需要記錄最新的revision,這樣當下次重連時傳入lastRevision,可以繼續基於上次的版本繼續消費緩存的事件。

Watch.Listener 接口三個方法
  • onNext(WatchResponse response): 當被監聽的鍵發生變化時,etcd 會調用此方法。
  • onError(Throwable throwable): 當發生錯誤時,etcd 會調用此方法。
  • onCompleted(): 當 watch 完成時,etcd 會調用此方法
onError的場景

etcd 的 Watch.Listener 中的 onError 方法會在各種異常和錯誤情況下被觸發,這些情況包括但不限於網絡問題、連接斷開、權限問題等。包括:

  • 網絡問題

    • 網絡中斷:如果客户端與 etcd 服務器之間的網絡連接中斷或不穩定,會觸發 onError 方法。
    • 超時:如果網絡請求超時,導致無法接收到 etcd 服務器的響應,也會觸發 onError。
  • 服務端問題

    • etcd 服務器重啓:如果 etcd 服務器重啓,現有的 watch 連接會中斷,觸發 onError 方法。
    • etcd Leader 變更:在 etcd 集羣中,如果 leader 節點發生變更,現有的 watch 連接可能會中斷,觸發 onError 方法。
  • 客户端問題

    • 客户端主動關閉:如果客户端主動關閉連接,也會觸發 onError 方法。
    • 客户端資源耗盡:如果客户端資源(如線程、文件描述符等)耗盡,可能導致 watch 連接失敗,觸發 onError 方法。
  • 權限問題

    • 權限不足:如果客户端對所 watch 的鍵或鍵前綴沒有足夠的權限,也會導致 watch 失敗,觸發 onError 方法。

3.4. 對比zookeeper

zookeeper 的 watch 機制是一種一次性觸發的通知機制,允許客户端在指定的節點(znode)上設置 watch,當 znode 的數據或子節點狀態發生變化時,客户端會收到通知。

1、基本特性
  • 一次性觸發:ZooKeeper 的 watch 是一次性的,即當 watch 被觸發後,它就會失效。如果客户端需要繼續監視該 znode 的變化,需要重新設置 watch。
  • 沒有事件重放機制:ZooKeeper 不像 Etcd,沒有事件重放機制,這意味着如果因為連接中斷或程序錯誤等,錯過了某個消息事件,重新監聽後也不會再收到原事件。
  • 註冊的位置:watch 可以註冊在數據讀取操作(如 getData 或 getChildren)上,當這些操作返回結果時,watch 會被註冊到指定的 znode 上。
2、沒有斷點續傳

ZooKeeper 的 watch 機制本身並不直接支持斷點續傳(即斷連後重新連接時能夠繼續監聽斷連期間的事件)。

這是因為 ZooKeeper 的 watch 是一次性觸發的,當 watch 被觸發後,它就會失效。如果客户端需要繼續監視該 znode 的變化,需要重新設置 watch。而且 ZooKeeper 的 watch是客户端實現的,並不像etcd,會在服務端將事件緩存一段時間。

ZooKeeper 的設計哲學是一次性觸發 watch,它更像是一種通知機制,而不是持續的訂閲。

代碼示例

zookeeper watch代碼:

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

@Service
public class ZooKeeperWatchService implements Watcher {

    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperWatchService.class);

    @Autowired
    private ZooKeeper zooKeeper;

    private final String watchedNodePath = "/my-node";

    @PostConstruct
    public void init() {
        try {
            watchNode(watchedNodePath);
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("Error initializing ZooKeeper watch", e);
        }
    }

    @PreDestroy
    public void cleanup() throws InterruptedException {
        if (zooKeeper != null) {
            zooKeeper.close();
        }
    }

    private void watchNode(String path) throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists(path, this);
        if (stat != null) {
            byte[] data = zooKeeper.getData(path, this, stat);
            LOGGER.info("Current data at {}: {}", path, new String(data));
        } else {
            LOGGER.warn("Node {} does not exist", path);
        }
    }

    @Override
    public void process(WatchedEvent event) {
        LOGGER.info("Received event: {}", event);
        try {
            if (event.getType() == Event.EventType.NodeDataChanged) {
                // Data has changed, watch the node again to keep watching for future changes
                watchNode(event.getPath());
            } else if (event.getType() == Event.EventType.NodeCreated || event.getType() == Event.EventType.NodeDeleted) {
                // For NodeCreated and NodeDeleted events, re-watch the parent node
                watchNode(watchedNodePath);
            } else if (event.getState() == Event.KeeperState.SyncConnected) {
                // Re-connect to ZooKeeper and re-watch the node
                watchNode(watchedNodePath);
            }
        } catch (KeeperException | InterruptedException e) {
            LOGGER.error("Error watching node", e);
        }
    }
}
  • process 方法是 Watcher 接口的實現方法,用於處理來自 ZooKeeper 的 WatchedEvent
  • 在每次監聽到消息後,都需要重新監聽節點。調用 zooKeeper.getData(path, this, stat); 獲取節點數據,同時設置數據變化的 watch。this 再次表示當前實現 Watcher 接口的實例。
3. Etcd 更優勢的方面
  • 高併發時更好:Etcd提供了一個事件流接口,允許客户端連續接收變化通知。這種設計在處理大量連續變化時非常高效,因為它避免了 ZooKeeper 頻繁的連接和斷開連接操作,減少了網絡和服務器端的負擔。
  • 消息事件不容易丟失:Etcd支持事件重放,在客户端重新連接後,還可以斷點續傳。另外 ZooKeeper 的監聽是一次性觸發,然後再重新註冊,如果在這時間窗口之間有變更事件,同樣也會丟失。
4. 二者擅長場景

(1)ZooKeeper 的 Watch 機制適用場景

  • 事件觸發簡單且可預測的場景:ZooKeeper的watch是一次性觸發的,這適合於事件變化不頻繁,且每次變化後客户端可以簡單地重新註冊watch的場景。例如,在分佈式鎖或領導者選舉中,某個關鍵節點的變化通常是較少且重要的,watch的簡化觸發機制已足夠。

(2)Etcd 的 Watch 機制適用場景

  • 高頻次事件通知的場景:持續更新和自動重新設置,etcd的watch支持持久連接和事件流,這適合於頻繁變化的環境,如動態服務發現和配置管理,客户端無需手動重新註冊watch。
  • 需要事件歷史和重放能力的場景:事件可靠性:,etcd允許客户端在重新連接後獲取自上次斷開以來的所有事件,這對於需要確保事件不丟失的系統至關重要,例如在分佈式緩存更新中。

4. 租約

租約的核心概念是為鍵值對設定一個生存時間(TTL),當租約到期時,綁定到該租約的所有鍵值對都會自動刪除。

Etcd 租約的基本操作
  • 創建租約:為鍵值對分配一個租約,並設置 TTL。
  • 綁定鍵值對到租約:將鍵值對綁定到租約上,以便在租約到期時自動刪除這些鍵值對。一個租約可以綁定多個鍵,當租約到期後都會被刪除。
  • 續期租約:在租約到期之前,通過續期操作延長租約的生存時間。注意etcd並不能續期指定時間的租約,而是將租約重置回原來的時間。
  • 釋放租約:顯式釋放租約。
代碼示例
import com.google.protobuf.ByteString;
import com.ibm.etcd.api.KeyValue;
import com.ibm.etcd.api.RangeResponse;
import com.ibm.etcd.client.EtcdClient;
import com.ibm.etcd.client.lease.LeaseClient;
import com.ibm.etcd.client.kv.KvClient;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.Lease;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.kv.GetResponse;
import io.etcd.jetcd.options.GetOption;
import io.etcd.jetcd.options.PutOption;

import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class EtcdLeaseExample {

    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        // 創建 Etcd 客户端
        Client client = Client.builder()
                .endpoints("http://localhost:2379")
                .build();

        // 獲取 KV 和 Lease 客户端
        KvClient kvClient = client.getKvClient();
        LeaseClient leaseClient = client.getLeaseClient();

        // 創建租約,TTL 為 10 秒
        long leaseId = leaseClient.grant(10).get().getID();

        // 將鍵值對綁定到租約
        kvClient.put(ByteSequence.from("key1", "UTF-8"), ByteString.copyFromUtf8("value1"))
                .withLease(leaseId)
                .get();

        kvClient.put(ByteSequence.from("key2", "UTF-8"), ByteString.copyFromUtf8("value2"))
                .withLease(leaseId)
                .get();

        // 續期租約
        leaseClient.keepAlive(leaseId)
                .thenAccept(response -> System.out.println("Lease kept alive"));

        // 等待租約過期
        Thread.sleep(15000);

        // 檢查鍵值對是否已刪除
        RangeResponse response1 = kvClient.get(ByteSequence.from("key1", "UTF-8")).get();
        RangeResponse response2 = kvClient.get(ByteSequence.from("key2", "UTF-8")).get();

        if (response1.getKvs().isEmpty()) {
            System.out.println("Key1 is deleted");
        } else {
            System.out.println("Key1 still exists: " + response1.getKvs().get(0).getValue().toStringUtf8());
        }

        if (response2.getKvs().isEmpty()) {
            System.out.println("Key2 is deleted");
        } else {
            System.out.println("Key2 still exists: " + response2.getKvs().get(0).getValue().toStringUtf8());
        }

        // 釋放租約
        leaseClient.revoke(leaseId).get();

        // 關閉 Etcd 客户端
        client.close();
    }
}
user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.