博客 / 詳情

返回

百萬架構師第三十二課:協調服務-zookeeper:zookeeper的實踐與原理|JavaGuide

原文鏈接
  1. 數據存儲
  2. 基於 Java API初探zookeeper的使用
  3. 深入分析Watcher機制的實現原理
  4. Curator客户端的使用,簡單高效

數據存儲

​ 基於znode,基於文件系統風格的,樹形結構的文件模型,和內存數據庫差不多,基於增刪改查的命令去操作數據庫,整個數據庫包括整個樹形結構的內容,比如説我們的節點目錄,節點路徑和權限信息,而且zookeeper它會定時去把這些信息數據存儲到磁盤上。

​ 使用DataTree管理整個數據存儲,主要用來存儲節點路徑和數據的內容。底層是一個典型的基於ConcurrentHashMap的一個存儲結構。

事務日誌

創造一個節點或者進行事務操作的時候,都會記錄一個事務日誌,

zoo.cfg 文件中 , datadir

快照日誌

它會記錄zookeeper的某個時刻的快照,記錄整個zookeeper全量數據的內容,類似於我們數據備份的一個概念,並且他會把這些文件放在指定的目錄下去做一個存儲,也是基於datadir這樣一個路徑去做一個存儲

運行時日誌

bin/zookeeper.out

我們可以通過它來看zookeeper運行時的一些情況。

實際應用過程中,我們會把事務日誌單獨掛載到一個磁盤上,我們的每一個請求,事務請求都會做一個輸入日誌的記錄,那這個磁盤的讀寫的性能,將影響到zookeeper本身的操作性能。

基於Java API初探zookeeper的使用

sh zkCli.sh  

基於控制枱

<dependency>
   <groupId>com.apache.zookeeper</groupId>
   <artifactId>zookeeper</artifactId>
   <version>3.4.8</version>
</dependency>

引入包以後就可以使用相應的API去做相應的操作,

代碼

public class ConnectionDemo {
    public static void main(String[] args) {
        try {
            ZooKeeper zooKeeper =
                new ZooKeeper("192.168.136.128:2181," +
                              "192.168.136.129:2181," +
                              "192.168.136.130:2181",
                              4000,null);
            System.out.println(zooKeeper.getState());


        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

控制枱

log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTING

連接以後

顯示連接狀態為 connecting 等待以後connected

引入watcher機制,new watcher() 直接輸出鏈接狀態 connected

JavaGuide_Zookeeper_實踐與原理_Client四個狀態.png

連接,

代碼

public class ConnectionDemo {
    public static void main(String[] args) {
        try {
            ZooKeeper zooKeeper =
                new ZooKeeper("192.168.136.128:2181," +
                              "192.168.136.129:2181," +
                              "192.168.136.130:2181",
                              4000,null);
            System.out.println(zooKeeper.getState());

            Thread.sleep(1000);

            System.out.println(zooKeeper.getState());

            zooKeeper.close();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

控制枱

log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTING
CONNECTED

Stat 對應控制枱的各項屬性

Watcher實現保證連接成功

public class ConnectionDemo {
    public static void main(String[] args) {
        try {
            final CountDownLatch  countDownLatch = new CountDownLatch(1);
            ZooKeeper zooKeeper =
                new ZooKeeper("192.168.136.128:2181," +
                              "192.168.136.129:2181," +
                              "192.168.136.130:2181",
                              4000, new Watcher() {
                                  // 為了保證連接的成功狀態
                                  @Override
                                  public void process(WatchedEvent watchedEvent) {
                                      // 連接成功以後觸發watcher事件
                                      if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
                                          // 如果收到了服務端的響應事件,連接成功
                                          // 連接成功以後做一個遞減。
                                          countDownLatch.countDown();
                                      }
                                  }
                              });
            countDownLatch.await();

            System.out.println(zooKeeper.getState());//CONNECTED

            zooKeeper.close();

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

控制枱

log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTED

參數都需要傳進去version

Stat.getVersion()

增刪改查

public class ConnectionDemo {
   public static void main(String[] args) {
       try {
           final CountDownLatch  countDownLatch = new CountDownLatch(1);
           ZooKeeper zooKeeper =
               new ZooKeeper("192.168.136.128:2181," +
                             "192.168.136.129:2181," +
                             "192.168.136.130:2181",
                             4000, new Watcher() {
                                 // 為了保證連接的成功狀態
                                 @Override
                                 public void process(WatchedEvent watchedEvent) {
                                     // 連接成功以後觸發watcher事件
                                     if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
                                         // 如果收到了服務端的響應事件,連接成功
                                         // 連接成功以後做一個遞減。
                                         countDownLatch.countDown();
                                     }
                                 }
                             });
           countDownLatch.await();

           System.out.println(zooKeeper.getState());//CONNECTED

           // 添加節點
           zooKeeper.create("/java-con-darian","232".getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

           Stat stat = new Stat();
           //Stat和我們在控制枱看到的屬性是一樣的。
           // 得到節點的值
           byte[] dataBytes = zooKeeper.getData("/java-con-darian", null, stat);
           System.out.println(new String(dataBytes));

           Thread.sleep(1000);
           // 修改節點的值,樂觀鎖的概念
           zooKeeper.setData("/java-con-darian", "55".getBytes(),stat.getVersion());

           byte[] updateBytes = zooKeeper.getData("/java-con-darian", null, stat);
           System.out.println(new String(updateBytes));

           zooKeeper.delete("/java-con-darian", stat.getVersion());

           zooKeeper.close();

           // 當前線程進行阻塞
           System.in.read();

       } catch (IOException e) {
           e.printStackTrace();
       } catch (InterruptedException e) {
           e.printStackTrace();
       } catch (KeeperException e) {
           e.printStackTrace();
       }

控制枱

CONNECTED
232
55

事件機制

我們對一個節點發起訂閲以後,那麼對這個節點發起的任何變化,它都會有一個觸發,發給觸發前綁定的客户端,相當於一個發佈訂閲的功能。

通過watcher機制,保證剛開始的連接狀態是connected.

Watcher機制是zookeeper裏邊非常重要的一個特性,那麼我們在zookeeper上創建節點的時候,我們可以綁定監聽事件,比如説,我們可以監聽節點的變更,刪除,子節點變更的一些事件,通過這些事件,可以實現zookeeper的分佈式鎖 ,集羣管理的一些功能。

Watcher特性:當數據發生變化的時候,zookeeper會產生一個watcher事件,並且發送到客户端,但客户端只會收到一次通知,如果後續這個節點再次發生變化,那麼之前設置的watcher的客户端不會再次收到消息(watcher機制是一次性操作)。可以通過循環監聽達到永久的監聽效果。

如何註冊事件機制

通過三個操作來綁定事件:getData、Exists、getChildren

getData 通過一個節點獲得一個節點的數據
Exists 判斷這個節點是否存在
getChildren 可以拿到當前節點的所有子節點

​ 通過設置節點的路徑,相當於給節點綁定了事件

如何觸發事件?

​ 凡是事務類型的操作,都會觸發監聽操作 create / delete / setData

zookeeper.exists(event.getPath().true);

true :默認使用全局的默認事件。

代碼
public class WatcherDemo {
    public static void main(String[] args) {
        try {
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            final ZooKeeper zooKeeper =
                new ZooKeeper("192.168.136.128:2181," +
                              "192.168.136.129:2181," +
                              "192.168.136.130:2181",
                              4000, new Watcher() {
                                  // 為了保證連接的成功狀態
                                  @Override
                                  public void process(WatchedEvent watchedEvent) {
                                      System.out.println("全局的默認的watcher事件:"
                                                         + watchedEvent.getType()
                                                         + "----->>>>>>"
                                                         + watchedEvent.getPath());
                                      // 連接成功以後觸發watcher事件
                                      if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
                                          // 如果收到了服務端的響應事件,連接成功
                                          // 連接成功以後做一個遞減。
                                          countDownLatch.countDown();
                                      }
                                  }
                              });
            countDownLatch.await();

            zooKeeper.create("/zk-tmp-darian", "1".getBytes(),
                             ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            // exists   getdata getchildren

            // 綁定事件
            // 我們在創建zookeeper的時候創建了一個默認的匿名內部類,
            // 如果説設置成 true 的話,所有的事件都會觸發到內部類裏邊
            // zooKeeper.exists("/zi-tmp-darian",true);

            // 綁定自己的watcher事件
            // 通過 exists 綁定事件
            Stat stat = zooKeeper.exists("/zk-tmp-darian", new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println(watchedEvent.getType() + "--->>>>" + watchedEvent.getPath());

                    try {
                        // 通過再一次綁定事件去實現永久監聽的一個效果
                        zooKeeper.exists(watchedEvent.getPath(), true);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });

            // 通過修改食物類型操作來觸發監聽事件
            stat = zooKeeper.setData("/zk-tmp-darian", "222".getBytes(), stat.getVersion());

            Thread.sleep(1000);

            zooKeeper.delete("/zk-tmp-darian", stat.getVersion());

            System.in.read();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}
控制枱
全局的默認的watcher事件:None----->>>>>>null
NodeDataChanged--->>>>/zk-tmp-darian
全局的默認的watcher事件:NodeDeleted----->>>>>>/zk-tmp-darian

Watcher事件機制:

事件類型

None(-1) 客户端鏈接狀態發生變化的時候,會收到none的事件
NodeCreated(1) 創建節點的事件。比如説zk-persis-mic
NodeDeleted(2) 刪除節點的事件。
NodeDataChanged(3) 節點數據發生變更
NodeChildrenChanged(4) 子節點被創建、被刪除、會發生事件觸發

觸發條件

zk-persis-mic(監聽事件) zk-persis-mic/child(監聽事件)
create(/zk-persis-mic) NodeCreated(exists/getData)
delete(/zk-persis-mic) NodeDelete(exists/getData)
setData(/zk-persis-mic) NodeDataChanged(exists/getData)
create(/zk-persis-mic/children) NodeChildrenChanged(getChildren) NodeCreated
delete(/zk-persis-mic/children) NodeChildrenChanged(getChildren) NodeDelete
setData(/zk-persis-mic/children) NodeDataChanged

事件的實現原理

通過 #exists方法去綁定的事件

入口就是 #exists

Zookeeper 類中的 #exists 方法

public Stat exists(final String path, Watcher watcher)
   throws KeeperException, InterruptedException{
   final String clientPath = path;
   PathUtils.validatePath(clientPath);

   // the watch contains the un-chroot path
   WatchRegistration wcb = null;
   if (watcher != null) {
       wcb = new ExistsWatchRegistration(watcher, clientPath);
   }

   final String serverPath = prependChroot(clientPath);

   RequestHeader h = new RequestHeader();
   h.setType(ZooDefs.OpCode.exists);
   ExistsRequest request = new ExistsRequest();
   request.setPath(serverPath);
   request.setWatch(watcher != null);
   SetDataResponse response = new SetDataResponse();
   ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
   if (r.getErr() != 0) {
       if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
           return null;
       }
       throw KeeperException.create(KeeperException.Code.get(r.getErr()),
               clientPath);
   }

   return response.getStat().getCzxid() == -1 ? null : response.getStat();
}

我們client在zookeeper server上註冊一個事件

ZkWatcher Manager 上保存客户端的事件註冊

JavaGuide_Zookeeper_實踐與原理_Server_Client_Watcher事件.png

源碼地址:Zookeeper核心類中exists(

分析原理:

在客户端調用裏邊就是實例化,

調用增刪改查方法

一定有一個機制去對這個隊列去做一個數據的處理

OutgoingQueue

有一個入隊肯定有一個出隊

數據不是放到隊列中就完事了,肯定需要有機制去對隊列做一個出隊列

判斷是引導流程走向的一個方式!

Ctrl + alt + B 點到方法上,可以看到有幾個實現

NettyServerCnxn

服務端處理請求的

#receiveMessage() 方法

通過鏈式的process對請求去做一個鏈式的處理,把不同的業務去做一個分離

預處理 > 同步處理 > finally處理

JavaGuide_Zookeeper_實踐與原理_Processor_鏈式處理.png

PIPE(管道模式)

Linux 的管道的處理,一個請求過來,進入不同的管道,做不同的處理,最終返回一個結果。

類似於鏈式風格的一種設計。

Zookeeper中大部分都在用多線程的方式異步化流程。

一定有一個線程去處理隊列 submittRequests

Hashmap<watcher, String path>

JavaGuide_Zookeeper_實踐與原理_Watcher_Server_處理流程詳細圖.png

Curator的操作

主要是對我們數據節點操作的一個封裝

​ 原生操作會有很多不便之處,雖然説靈活性比較好,但是我們要做一些事情,比如説,要去綁定事件很複雜,我們要去創建一個連接也很複雜,那怎麼去簡化?

​ Netflix公司開源了一個zookeeper客户端,他跟原生的客户端相比,它是一個更高層次的抽象,他不光是API層次的一個抽象,同時還封裝了一套基於應用場景的API,也就是説zookeeper它本身能夠實現分佈式鎖,leader選舉這些功能,那麼curator對這些功能做了一些封裝,我們直接調用API就好了。就可以完成leader選舉的這樣一個機制,那麼我們怎麼實現這樣一個功能。

基於fluent風格的一個機制

CuratorFramework curatorFamework = CuratorFrameworkFactory
.builder()
.connectString()
.sessionTimeoutMS(4000)
.retryPolicy(new ExponetialBackoffRetry(1000,3))
.namespace(“curator”)
.build()
  
  // 結果: /curator/mic/node1
// 原生api中,必須是逐層創建,也就是父節點必須存在,子節點才能創建

Curator的增刪改查

package com.gupao.study.vip;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;

/**
* <br>類説明 :
* <br>屬性説明:
* <br>作者:Darian
**/
public class CuratorDemo {
   public static void main(String[] args) throws Exception {
       CuratorFramework curatorFramework = CuratorFrameworkFactory
           .builder()
           .connectString("192.168.136.128:2181," +
                          "192.168.136.129:2181," +
                          "192.168.136.130:2181")
           .sessionTimeoutMs(4000)
           // 一個重試機制,遞減的重試機制
           .retryPolicy(new ExponentialBackoffRetry(1000, 3))
           // 指定命名空間,我們會把某個業務放在一個大的命名空間下面
           // 因為我們要區分每個場景下的業務劃分
           .namespace("/curator")
           .build();

       curatorFramework.start();

       // 創建結果: /curator/darian/node1
       // 原生的 API 中,必須是逐層創建,也就是父節點必須存在,子節點才能創建
       curatorFramework.create()
           .creatingParentContainersIfNeeded()
           .withMode(CreateMode.PERSISTENT)
           .forPath("/darian/node1", "2".getBytes());

       // 查詢節點,得到節點的 Stat 信息
       Stat stat = new Stat();
       curatorFramework.getData()
           .storingStatIn(stat)
           .forPath("/darian/node1");

       // 更新操作
       curatorFramework.setData()
           .withVersion(stat.getVersion())
           .forPath("/darian/node1", "232".getBytes());


       // 去刪除某個節點
       curatorFramework.delete()
           .deletingChildrenIfNeeded()
           .forPath("/darian/node1");
   }
}
public static void main(String[] args) {
    CuratorFramework.create()
    .creatingPatentsIfNeeded()
    .withMode(CreateMode.PERSISTENT)
    .forPath(“/mic/node1”,”1”.getBytes());
  
  //刪除
  curatorFramework
    .delete()
    .deletingChildrenIfNeeded()
    .forPath(“/mic/node1”);

    Stat stat = new Stat();
    curatorFramework
    .getData()
    .storingStatIn(stat).forPath(“/mic/node1”)
    
    curatorFramework
    .setData()
    .withVersion(sta.getVersion())
    .forPath(“/mic/node1”,”xx”.getBytes);
  
  curatorFramework.close();    //關閉連接
}

curator 事件的高度封裝,

一定有一個節點特性

PathChildCache 監聽一個節點下子節點的創建、刪除、更新
(CHILD-ADDED,CHILD_UPDATE,CHILD_REMOVED)
NodeCache 監聽一個節點的更新和創建時間
(Receive Event:/mic)
TreeCache zonghe1PathChildCache和NodeCache的特性

Curator的watcher機制

package com.gupao.study.vip;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
* <br>類説明 : 事件的高度封裝
* <p>
* curator封裝了三種事件的機制
* PathChildCache   監聽一個節點下節點的創建、刪除、更新
* NodeCache        監聽一個節點的更新和創建事件
* TreeCache        綜合 PatchChildCache 和 NodeCache 的特性
*
* <br>屬性説明:
* <br>作者:Darian
**/
public class CuratorWatcherDemo {
   public static void main(String[] args) throws Exception {
       CuratorFramework curatorFramework = CuratorFrameworkFactory
           .builder()
           .connectString("192.168.136.128:2181," +
                          "192.168.136.129:2181," +
                          "192.168.136.130:2181")
           .sessionTimeoutMs(4000)
           .retryPolicy(new ExponentialBackoffRetry(1000, 3))
           .namespace("curator")
           .build();

       curatorFramework.start();


       // 當前節點的創建、刪除事件監聽
       // addListenerWithNodeCache(curatorFramework, "/darian");

       // 子節點的增加、修改、刪除的事件監聽
       // addlistenerWithPathChildCache(curatorFramework,"/darian");

       // 綜合節點的監聽事件
       addListerWithTreeCache(curatorFramework, "/darian");

       System.in.read();

   }

   /**
    * NodeCache + PathChildrenCache
    **/
   public static void addListerWithTreeCache(CuratorFramework curatorFramework, String path) throws Exception {
       final TreeCache treeCache = new TreeCache(curatorFramework,path);

       TreeCacheListener treeCacheListener = new TreeCacheListener() {
           @Override
           public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
               System.out.println(treeCacheEvent.getType()+"-->>"+treeCacheEvent.getData());
           }
       };

       treeCache.getListenable().addListener(treeCacheListener);
       treeCache.start();
       /*
       INITIALIZED-->>null
NODE_ADDED-->>ChildData{path='/darian', stat=51539607621,51539607621,1531222950294,1531222950294,0,0,0,0,1,0,51539607621
, data=[49]}
NODE_ADDED-->>ChildData{path='/darian/node1', stat=51539607622,51539607622,1531222969992,1531222969992,0,0,0,0,1,0,51539607622
, data=[49]}
NODE_ADDED-->>ChildData{path='/darian/node1/node1', stat=51539607623,51539607623,1531222989856,1531222989856,0,0,0,0,1,0,51539607623
, data=[49]}
NODE_REMOVED-->>ChildData{path='/darian/node1/node1', stat=51539607623,51539607623,1531222989856,1531222989856,0,0,0,0,1,0,51539607623
, data=[49]}
NODE_UPDATED-->>ChildData{path='/darian/node1', stat=51539607622,51539607625,1531222969992,1531223116477,1,2,0,0,1,0,51539607624
, data=[50]}
NODE_REMOVED-->>ChildData{path='/darian/node1', stat=51539607622,51539607625,1531222969992,1531223116477,1,2,0,0,1,0,51539607624
, data=[50]}
NODE_REMOVED-->>ChildData{path='/darian', stat=51539607621,51539607621,1531222950294,1531222950294,0,2,0,0,1,0,51539607626
, data=[49]}
        */
   }


   /**
    * 監聽對應節點下的子節點的變化
    * 創建
    * 刪除
    * 更新
    */
   public static void addlistenerWithPathChildCache(CuratorFramework curatorFramework, String path) throws Exception {
       final PathChildrenCache pathChildrenCache =
           new PathChildrenCache(curatorFramework, path, true);
       PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
           @Override
           public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
               System.out.println("receive Event:"
                                  + pathChildrenCacheEvent.getType()
                                  + "-->>"
                                  + pathChildrenCacheEvent.getData());
           }
       };

       pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
       pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
       /*
receive Event:CONNECTION_RECONNECTED-->>null
receive Event:CHILD_ADDED-->>ChildData{path='/darian/node1', stat=51539607611,51539607611,1531222192297,1531222192297,0,0,0,0,1,0,51539607611
, data=[49]}
receive Event:CHILD_REMOVED-->>ChildData{path='/darian/node1', stat=51539607611,51539607611,1531222192297,1531222192297,0,0,0,0,1,0,51539607611
, data=[49]}
        */
   }

   /**
    * 監聽這個節點的變化
    * 創建
    * 更新
    **/
   public static void addListenerWithNodeCache(CuratorFramework curatorFramework, String path) throws Exception {
       final NodeCache nodeCache = new NodeCache(curatorFramework, path, false);

       NodeCacheListener nodeCacheListener = new NodeCacheListener() {
           @Override
           public void nodeChanged() throws Exception {

               System.out.println("Receive Event:" + nodeCache.getCurrentData().getPath());
           }
       };

       nodeCache.getListenable().addListener(nodeCacheListener);
       nodeCache.start();
   }
}

來源於: https://javaguide.net

公眾號:不止極客

user avatar lankerens 頭像 13917911249 頭像 mo_or 頭像 lingfeng23 頭像 pudongping 頭像 jianhuan 頭像 wodingshangniliao 頭像 qiehxb8 頭像 longbig 頭像 u_11103654 頭像
10 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.