一、Watcher 概述
zookeeper 提供了數據的 發佈/訂閲功能,多個訂閲者可同時監聽某一特定的主題對象,當主題對象的自身狀態發生了變化時(例如節點內容發生了改變、節點下的子節點列表發生改變等),會實時、主動的通知所有訂閲者。
二、Watcher 架構
Watcher 由三部分組成 zookeeper服務端、zookeeper客户端、客户端的watchManager對象。
客户端首先將 Watcher 註冊到服務器,同時將 Watcher 對象保存到客户端的 Watcher管理器 中。當 zookeeper 服務器端監聽數據狀態發生變化時,服務端會主動通知客户端,接着 客户端的Watcher 管理器會觸發相關的 Watcher 來回調相應的處理邏輯,從而完成整體的數據的發佈/訂閲流程。
三、Watcher 特性
| 特性 | 説明 |
|---|---|
| 一次性 | watcher是一次性的,一旦被觸發就會被移除,再次使用時需要重新註冊。 |
| 客户端順序回調 | watcher 回調的順序是串行化執行的,只有回調後客户端才能看到最新的數據狀態。 |
| 輕量級 | watcherEvent是最小的通信單元,結構上只包含通知狀態、事件類型和節點路徑,並不會告訴數據節點變化前後的具體內容。 |
| 時效性 | watcher只有在當前的session徹底失效時才會無效,若在session有效內快速重連成功,則watcher依然存在,依然可以接收到通知。 |
四、Watcher 通知狀態(KeeperState)
| 枚舉屬性 | 説明 |
|---|---|
| SyncConnected | 客户端和服務端正常連接 |
| Disconnected | 客户端和服務端斷開連接 |
| Expired | 會話session失效 |
| AuthFailed | 身份認證失敗 |
五、Watcher 事件類型(EventType)
| 枚舉屬性 | 説明 |
|---|---|
| None | 無 |
| NodeCreated | 數據的節點創建 |
| NodeDeleted | 數據的節點刪除 |
| NodeDataChanged | 數據節點內容發生變更時 |
| NodeChildrenChanged | 節點的子節點列表發生變更時 |
六、案例
1、狀態獲取
package com.snails.zookeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
public class ZkConnectionWatcher implements Watcher {
//計數器對象
static CountDownLatch countDownLatch = new CountDownLatch(1);
//鏈接對象
static ZooKeeper zookeeper;
@Override
public void process(WatchedEvent event) {
try {
//事件類型
if(event.getType() == Event.EventType.None) {
if(event.getState() == Event.KeeperState.SyncConnected ) {
System.out.println("創建連接成功"); //當連接到zookeeper服務器時,會執行這一塊的代碼塊。
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("斷開連接"); // 當和zookeeper服務器斷開連接時,會執行這一塊代碼。
// 當網絡重新連接成功連接成功之後,並且在sessionTimeOut設置的範圍之內,會自動重新連接到服務器上。
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("會話超時"); // 當網絡時間超過sessionTimeOut設置的時長時,會執行這一塊代碼。
zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkConnectionWatcher()); //網絡超時一般設置重新連接
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("認證失敗");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkConnectionWatcher());
//通過授權模式獲取節點數據
zookeeper.addAuthInfo("digest", "woniu:123456".getBytes());
byte[] data = zookeeper.getData("/woniu", false, null);
System.out.println(new String(data));
countDownLatch.await();
System.out.println(zookeeper.getSessionId());
Thread.sleep(50000);
zookeeper.close();
System.out.println("結束");
} catch (Exception e) {
e.printStackTrace();
}
}
}
2、Watcher三種API之Exists
package com.snails.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkWatcherExists {
String Ip = "127.0.0.1:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
try {
//解決一次性的問題,就是每次節點的修改,不用重啓客户端。
zooKeeper.exists("/woniu1",this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("path= " + event.getPath());
System.out.println("path= " + event.getType());
}
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void watchderFunction01() throws KeeperException, InterruptedException {
zooKeeper.exists("/woniu1", true);
Thread.sleep(500000);
System.out.println("end-----");
}
//註冊多個監聽器
@Test
public void watchderFunction02() throws KeeperException, InterruptedException {
zooKeeper.exists("/woniu1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("監聽woniu1 ===================");
System.out.println("path= " + event.getPath());
System.out.println("path= " + event.getType());
}
});
zooKeeper.exists("/woniu2", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("監聽woniu2 ===================");
System.out.println("path= " + event.getPath());
System.out.println("path= " + event.getType());
}
});
Thread.sleep(500000);
System.out.println("end-----");
}
}
3、Watcher三種API之getData
package com.snails.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkWatcherGetData {
String Ip = "127.0.0.1:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
try {
//解決一次性的問題,就是每次節點的修改,不用重啓客户端。
zooKeeper.exists("/woniu1",this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("path= " + event.getPath());
System.out.println("path= " + event.getType());
}
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void watchderFunction01() throws KeeperException, InterruptedException {
zooKeeper.getData("/woniu1", true, null);
Thread.sleep(500000);
System.out.println("end-----");
}
@Test
public void watchderFunction02() throws KeeperException, InterruptedException {
zooKeeper.getData("/woniu1", new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定義watcher");
System.out.println("path= " + event.getPath());
System.out.println("path= " + event.getType());
}
}, null);
Thread.sleep(500000);
System.out.println("end-----");
}
}
4、Watcher三種API之getChild:
package com.snails.zookeeper;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZkWatcherGetChild {
String Ip = "127.0.0.1:2181";
ZooKeeper zooKeeper = null;
@Before
public void before() throws IOException, InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
@Override
public void process(WatchedEvent event) {
if(event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
System.out.println("path= " + event.getPath());
System.out.println("path= " + event.getType());
}
});
countDownLatch.await();
}
@After
public void after() throws InterruptedException {
zooKeeper.close();
}
@Test
public void watchderFunction01() throws KeeperException, InterruptedException {
zooKeeper.getChildren("/woniu1", true);
Thread.sleep(500000);
System.out.println("end-----");
}
}