1、Zookeeper API
1.1、描述
在 ZooKeeper 中,Watcher 是一次性的,不會自動重新註冊。因此,如果你希望在特定事件(如節點數據變化)發生後繼續監聽其他事件(如節點刪除),你需要在每次事件觸發時重新註冊 Watcher
1.2、示例
首先,確保你在項目中添加了 Zookeeper 的依賴:
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
完整示例代碼
import org.apache.zookeeper.*;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
public class ZooKeeperExample {
private static ZooKeeper zk;
private static final String ZK_SERVER = "localhost:2181";
private static final int SESSION_TIMEOUT = 3000;
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
// 創建 ZooKeeper 客户端
zk = new ZooKeeper(ZK_SERVER, SESSION_TIMEOUT, new MyWatcher());
// 創建節點
zk.create("/example", "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new MyStringCallback(), null);
// 檢查節點狀態並設置 Watcher
zk.exists("/example", true, new MyStatCallback(), null);
// 觸發節點變化事件
zk.setData("/example", "newData".getBytes(), -1);
// 刪除節點,驗證 Watcher 是否會監聽刪除事件
Thread.sleep(2000);
zk.delete("/example", -1);
// 保持程序運行以觀察回調
Thread.sleep(10000);
// 關閉 ZooKeeper 客户端
zk.close();
}
// Watcher 實現類
static class MyWatcher implements Watcher {
@Override
public void process(WatchedEvent event) {
System.out.println("Watcher received event: " + event);
try {
if (event.getType() == Event.EventType.NodeDataChanged || event.getType() == Event.EventType.NodeDeleted) {
// 重新設置 Watcher
zk.exists(event.getPath(), true);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
}
// StatCallback 實現類
static class MyStatCallback implements StatCallback {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (rc == 0) {
System.out.println("Node exists: " + path + ", stat: " + stat);
} else {
System.out.println("Node does not exist: " + path);
}
}
}
// StringCallback 實現類
static class MyStringCallback implements StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
if (rc == 0) {
System.out.println("Node created: " + name);
} else {
System.out.println("Node creation failed: " + KeeperException.Code.get(rc));
}
}
}
}
1.3、結果驗證
- 首次觸發 Watcher:當第一次修改節點數據時,Watcher 會觸發並打印事件信息。
- 刪除節點:刪除節點時,Watcher 會再次觸發並打印事件信息,因為在 process 方法中重新註冊了 Watcher。
通過這個完整的示例代碼,你可以驗證 Watcher 是一次性的,並且需要在每次觸發後重新註冊以繼續監聽其他事件,包括節點刪除事件
2、Apache Curator
2.1、描述
Apache Curator 是一個用於簡化 ZooKeeper 客户端開發的 Java 庫。它提供了更高層次的 API 來處理 ZooKeeper 的常見任務,例如連接管理、會話管理、重試機制等。以下是一個完整的實例,演示瞭如何使用 Curator 庫來進行回調和監聽節點事件
2.2、示例
確保在你的項目中添加了 Curator 的依賴:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.0</version>
</dependency>
完整示例代碼
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class TreeCacheExample {
private static final String ZK_SERVER = "localhost:2181";
private static final String ZNODE_PATH = "/example";
public static void main(String[] args) throws Exception {
// 創建 Curator 客户端
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_SERVER, new ExponentialBackoffRetry(1000, 3));
client.start();
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState newState) {
switch (newState) {
case CONNECTED:
System.out.println("Registry connected");
break;
case LOST:
System.out.println("Registry disconnected");
break;
case RECONNECTED:
System.out.println("Registry reconnected");
break;
case SUSPENDED:
System.out.println("Registry suspended");
break;
default:
break;
}
}
});
// 創建 TreeCache
TreeCache treeCache = new TreeCache(client, ZNODE_PATH);
// 添加 TreeCache 監聽器
TreeCacheListener listener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
switch (event.getType()) {
case NODE_ADDED:
System.out.println("Node added: " + event.getData().getPath());
break;
case NODE_UPDATED:
System.out.println("Node updated: " + event.getData().getPath());
break;
case NODE_REMOVED:
System.out.println("Node removed: " + event.getData().getPath());
break;
default:
break;
}
}
};
treeCache.getListenable().addListener(listener);
// 啓動 TreeCache
treeCache.start();
// 進行一些節點操作以觸發事件
client.create().forPath(ZNODE_PATH + "/child", "data".getBytes());
client.setData().forPath(ZNODE_PATH + "/child", "newData".getBytes());
client.delete().forPath(ZNODE_PATH + "/child");
// 保持程序運行以觀察回調
Thread.sleep(10000);
// 關閉 TreeCache 和 Curator 客户端
treeCache.close();
client.close();
}
}
2.3、結果驗證
- ConnectionStateListener 監聽Zookeeper的鏈接狀態
- TreeCacheListener 監聽Zookeeper 路徑的事件
3、對比
TreeCache 和 Watcher 是 Apache Curator 框架和 Apache ZooKeeper 中的兩種不同的監聽機制,用於監視 ZooKeeper 集羣中的節點變化。它們的註冊和使用方式有所不同,原因主要在於它們的設計目的和工作機制
Watcher
- 作用: Watcher 是 ZooKeeper 提供的一種機制,用於監視節點的變化。每次客户端對 ZooKeeper 進行讀取操作時,可以附帶一個 Watcher,當該節點發生變化時,ZooKeeper 會通知客户端。
- 多次註冊: Watcher 是一次性的,即每次觸發後就失效了。如果客户端需要繼續監聽該節點的變化,則需要重新註冊 Watcher。因此,Watcher 的設計是為了輕量級、精細粒度的監聽。
TreeCache
- 作用: TreeCache 是 Apache Curator 提供的一個高級緩存機制,用於監視一個節點及其子節點的變化。它不僅僅是監視單個節點,而是監視整個子樹的變化,並將數據緩存在本地。
- 一次註冊: TreeCache 通過一次註冊,可以持續監視整個子樹的變化,並保持緩存的更新。這是因為 TreeCache 維護了一個本地緩存,能夠自動處理 ZooKeeper 的事件通知,保持緩存數據的一致性。它通過內部的機制,自動重新註冊 Watcher,以保證對子樹的變化持續監控。因此,用户不需要手動多次註冊
為什麼 TreeCache 不像 Watcher 一樣多次註冊?
1. 設計目的: TreeCache 的設計目的是為了方便地監視和緩存整個子樹的變化,提供一個高效、易用的接口來處理複雜的節點監控需求。而 Watcher 是為了提供一個更底層的、精細粒度的節點監控機制,適合一些簡單、輕量級的使用場景。
2. 內部實現: TreeCache 內部會自動管理 Watcher 的註冊和事件處理,因此用户不需要手動多次註冊 Watcher。而 Watcher 需要用户手動管理和重新註冊,以確保對節點的持續監控。
3. 使用場景: 對於需要監控多個節點或整個子樹變化的場景,TreeCache 提供了更高效和便捷的解決方案,而不需要用户頻繁註冊 Watcher。
總結來説,TreeCache 和 Watcher 是為了滿足不同需求而設計的。TreeCache 通過一次註冊,自動管理對節點及子節點的持續監控和緩存更新,而 Watcher 需要用户手動多次註冊以實現持續監控