博客 / 詳情

返回

【zookeeper 第四篇章】監控 Watcher

一、Watcher 概述

zookeeper 提供了數據的 發佈/訂閲功能,多個訂閲者可同時監聽某一特定的主題對象,當主題對象的自身狀態發生了變化時(例如節點內容發生了改變、節點下的子節點列表發生改變等),會實時、主動的通知所有訂閲者。

二、Watcher 架構

Watcher 由三部分組成 zookeeper服務端zookeeper客户端客户端的watchManager對象
客户端首先將 Watcher 註冊到服務器,同時將 Watcher 對象保存到客户端的 Watcher管理器 中。當 zookeeper 服務器端監聽數據狀態發生變化時,服務端會主動通知客户端,接着 客户端的Watcher 管理器會觸發相關的 Watcher 來回調相應的處理邏輯,從而完成整體的數據的發佈/訂閲流程。

三、Watcher 特性

特性 説明
一次性 watcher是一次性的,一旦被觸發就會被移除,再次使用時需要重新註冊。
客户端順序回調 watcher 回調的順序是串行化執行的,只有回調後客户端才能看到最新的數據狀態。
輕量級 watcherEvent是最小的通信單元,結構上只包含通知狀態、事件類型和節點路徑,並不會告訴數據節點變化前後的具體內容。
時效性 watcher只有在當前的session徹底失效時才會無效,若在session有效內快速重連成功,則watcher依然存在,依然可以接收到通知。

四、Watcher 通知狀態(KeeperState)

枚舉屬性 説明
SyncConnected 客户端和服務端正常連接
Disconnected 客户端和服務端斷開連接
Expired 會話session失效
AuthFailed 身份認證失敗

五、Watcher 事件類型(EventType)

枚舉屬性 説明
None
NodeCreated 數據的節點創建
NodeDeleted 數據的節點刪除
NodeDataChanged 數據節點內容發生變更時
NodeChildrenChanged 節點的子節點列表發生變更時

六、案例

1、狀態獲取

package com.snails.zookeeper;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.util.concurrent.CountDownLatch;

public class ZkConnectionWatcher implements Watcher {

    //計數器對象
    static CountDownLatch countDownLatch = new CountDownLatch(1);
    //鏈接對象
    static ZooKeeper zookeeper;

    @Override
    public void process(WatchedEvent event) {

        try {
            //事件類型
            if(event.getType() == Event.EventType.None) {
                if(event.getState() == Event.KeeperState.SyncConnected ) {
                    System.out.println("創建連接成功"); //當連接到zookeeper服務器時,會執行這一塊的代碼塊。
                    countDownLatch.countDown();
                } else if (event.getState() == Event.KeeperState.Disconnected) {
                    System.out.println("斷開連接");    //  當和zookeeper服務器斷開連接時,會執行這一塊代碼。
                                                      // 當網絡重新連接成功連接成功之後,並且在sessionTimeOut設置的範圍之內,會自動重新連接到服務器上。
                } else if (event.getState() == Event.KeeperState.Expired) {
                    System.out.println("會話超時");  // 當網絡時間超過sessionTimeOut設置的時長時,會執行這一塊代碼。
                    zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkConnectionWatcher()); //網絡超時一般設置重新連接
                } else if (event.getState() == Event.KeeperState.AuthFailed) {
                    System.out.println("認證失敗");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    public static void main(String[] args) {
        try {
            zookeeper = new ZooKeeper("127.0.0.1:2181", 5000, new ZkConnectionWatcher());

            //通過授權模式獲取節點數據
            zookeeper.addAuthInfo("digest", "woniu:123456".getBytes());
            byte[] data = zookeeper.getData("/woniu", false, null);
            System.out.println(new String(data));

            countDownLatch.await();

            System.out.println(zookeeper.getSessionId());

            Thread.sleep(50000);

            zookeeper.close();

            System.out.println("結束");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

2、Watcher三種API之Exists

package com.snails.zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkWatcherExists {

    String Ip = "127.0.0.1:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
                try {
                    //解決一次性的問題,就是每次節點的修改,不用重啓客户端。
                    zooKeeper.exists("/woniu1",this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }


    @Test
    public void watchderFunction01() throws KeeperException, InterruptedException {
        zooKeeper.exists("/woniu1", true);

        Thread.sleep(500000);

        System.out.println("end-----");
    }

    //註冊多個監聽器
    @Test
    public void watchderFunction02() throws KeeperException, InterruptedException {
        zooKeeper.exists("/woniu1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("監聽woniu1 ===================");
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });

        zooKeeper.exists("/woniu2", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("監聽woniu2 ===================");
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });

        Thread.sleep(500000);
        System.out.println("end-----");
    }
}

3、Watcher三種API之getData

package com.snails.zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkWatcherGetData {

    String Ip = "127.0.0.1:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
                try {
                    //解決一次性的問題,就是每次節點的修改,不用重啓客户端。
                    zooKeeper.exists("/woniu1",this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }


    @Test
    public void watchderFunction01() throws KeeperException, InterruptedException {
        zooKeeper.getData("/woniu1", true, null);
        Thread.sleep(500000);
        System.out.println("end-----");

    }

    @Test
    public void watchderFunction02() throws KeeperException, InterruptedException {
        zooKeeper.getData("/woniu1", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("自定義watcher");
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        }, null);

        Thread.sleep(500000);
        System.out.println("end-----");

    }
}

4、Watcher三種API之getChild:

package com.snails.zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;

public class ZkWatcherGetChild {

    String Ip = "127.0.0.1:2181";
    ZooKeeper zooKeeper = null;

    @Before
    public void before() throws IOException, InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);

        zooKeeper = new ZooKeeper(Ip, 60000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if(event.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }
                System.out.println("path= " + event.getPath());
                System.out.println("path= " + event.getType());
            }
        });
        countDownLatch.await();
    }

    @After
    public void after() throws InterruptedException {
        zooKeeper.close();
    }


    @Test
    public void watchderFunction01() throws KeeperException, InterruptedException {
        zooKeeper.getChildren("/woniu1", true);
        Thread.sleep(500000);
        System.out.println("end-----");
    }

}
user avatar lankerens 頭像 an_653b347d1d3da 頭像 codingdgsun 頭像 docker_app 頭像 prepared 頭像 lingfeng23 頭像 async_wait 頭像 xiaojiu_625c14980f596 頭像 zoux 頭像 xiaoqian01 頭像 phytium_developers 頭像 xiaoxiaofeng_java 頭像
18 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.