原文鏈接
- 數據存儲
- 基於 Java API初探zookeeper的使用
- 深入分析Watcher機制的實現原理
- Curator客户端的使用,簡單高效
數據存儲
基於znode,基於文件系統風格的,樹形結構的文件模型,和內存數據庫差不多,基於增刪改查的命令去操作數據庫,整個數據庫包括整個樹形結構的內容,比如説我們的節點目錄,節點路徑和權限信息,而且zookeeper它會定時去把這些信息數據存儲到磁盤上。
使用DataTree管理整個數據存儲,主要用來存儲節點路徑和數據的內容。底層是一個典型的基於ConcurrentHashMap的一個存儲結構。
事務日誌
創造一個節點或者進行事務操作的時候,都會記錄一個事務日誌,
zoo.cfg 文件中 , datadir
快照日誌
它會記錄zookeeper的某個時刻的快照,記錄整個zookeeper全量數據的內容,類似於我們數據備份的一個概念,並且他會把這些文件放在指定的目錄下去做一個存儲,也是基於datadir這樣一個路徑去做一個存儲
運行時日誌
bin/zookeeper.out
我們可以通過它來看zookeeper運行時的一些情況。
實際應用過程中,我們會把事務日誌單獨掛載到一個磁盤上,我們的每一個請求,事務請求都會做一個輸入日誌的記錄,那這個磁盤的讀寫的性能,將影響到zookeeper本身的操作性能。
基於Java API初探zookeeper的使用
sh zkCli.sh
基於控制枱
<dependency>
<groupId>com.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>
</dependency>
引入包以後就可以使用相應的API去做相應的操作,
代碼
public class ConnectionDemo {
public static void main(String[] args) {
try {
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000,null);
System.out.println(zooKeeper.getState());
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
控制枱
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTING
連接以後
顯示連接狀態為 connecting 等待以後connected
引入watcher機制,new watcher() 直接輸出鏈接狀態 connected
連接,
代碼
public class ConnectionDemo {
public static void main(String[] args) {
try {
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000,null);
System.out.println(zooKeeper.getState());
Thread.sleep(1000);
System.out.println(zooKeeper.getState());
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
控制枱
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTING
CONNECTED
Stat 對應控制枱的各項屬性
Watcher實現保證連接成功
public class ConnectionDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 為了保證連接的成功狀態
@Override
public void process(WatchedEvent watchedEvent) {
// 連接成功以後觸發watcher事件
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
// 如果收到了服務端的響應事件,連接成功
// 連接成功以後做一個遞減。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());//CONNECTED
zooKeeper.close();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
控制枱
log4j:WARN No appenders could be found for logger (org.apache.zookeeper.ZooKeeper).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
CONNECTED
參數都需要傳進去version
Stat.getVersion()
增刪改查
public class ConnectionDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 為了保證連接的成功狀態
@Override
public void process(WatchedEvent watchedEvent) {
// 連接成功以後觸發watcher事件
if(Event.KeeperState.SyncConnected == watchedEvent.getState()){
// 如果收到了服務端的響應事件,連接成功
// 連接成功以後做一個遞減。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
System.out.println(zooKeeper.getState());//CONNECTED
// 添加節點
zooKeeper.create("/java-con-darian","232".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
Stat stat = new Stat();
//Stat和我們在控制枱看到的屬性是一樣的。
// 得到節點的值
byte[] dataBytes = zooKeeper.getData("/java-con-darian", null, stat);
System.out.println(new String(dataBytes));
Thread.sleep(1000);
// 修改節點的值,樂觀鎖的概念
zooKeeper.setData("/java-con-darian", "55".getBytes(),stat.getVersion());
byte[] updateBytes = zooKeeper.getData("/java-con-darian", null, stat);
System.out.println(new String(updateBytes));
zooKeeper.delete("/java-con-darian", stat.getVersion());
zooKeeper.close();
// 當前線程進行阻塞
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
控制枱
CONNECTED
232
55
事件機制
我們對一個節點發起訂閲以後,那麼對這個節點發起的任何變化,它都會有一個觸發,發給觸發前綁定的客户端,相當於一個發佈訂閲的功能。
通過watcher機制,保證剛開始的連接狀態是connected.
Watcher機制是zookeeper裏邊非常重要的一個特性,那麼我們在zookeeper上創建節點的時候,我們可以綁定監聽事件,比如説,我們可以監聽節點的變更,刪除,子節點變更的一些事件,通過這些事件,可以實現zookeeper的分佈式鎖 ,集羣管理的一些功能。
Watcher特性:當數據發生變化的時候,zookeeper會產生一個watcher事件,並且發送到客户端,但客户端只會收到一次通知,如果後續這個節點再次發生變化,那麼之前設置的watcher的客户端不會再次收到消息(watcher機制是一次性操作)。可以通過循環監聽達到永久的監聽效果。
如何註冊事件機制
通過三個操作來綁定事件:getData、Exists、getChildren
| getData | 通過一個節點獲得一個節點的數據 |
| Exists | 判斷這個節點是否存在 |
| getChildren | 可以拿到當前節點的所有子節點 |
通過設置節點的路徑,相當於給節點綁定了事件
如何觸發事件?
凡是事務類型的操作,都會觸發監聽操作 create / delete / setData
zookeeper.exists(event.getPath().true);
true :默認使用全局的默認事件。
代碼
public class WatcherDemo {
public static void main(String[] args) {
try {
final CountDownLatch countDownLatch = new CountDownLatch(1);
final ZooKeeper zooKeeper =
new ZooKeeper("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181",
4000, new Watcher() {
// 為了保證連接的成功狀態
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("全局的默認的watcher事件:"
+ watchedEvent.getType()
+ "----->>>>>>"
+ watchedEvent.getPath());
// 連接成功以後觸發watcher事件
if (Event.KeeperState.SyncConnected == watchedEvent.getState()) {
// 如果收到了服務端的響應事件,連接成功
// 連接成功以後做一個遞減。
countDownLatch.countDown();
}
}
});
countDownLatch.await();
zooKeeper.create("/zk-tmp-darian", "1".getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// exists getdata getchildren
// 綁定事件
// 我們在創建zookeeper的時候創建了一個默認的匿名內部類,
// 如果説設置成 true 的話,所有的事件都會觸發到內部類裏邊
// zooKeeper.exists("/zi-tmp-darian",true);
// 綁定自己的watcher事件
// 通過 exists 綁定事件
Stat stat = zooKeeper.exists("/zk-tmp-darian", new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getType() + "--->>>>" + watchedEvent.getPath());
try {
// 通過再一次綁定事件去實現永久監聽的一個效果
zooKeeper.exists(watchedEvent.getPath(), true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 通過修改食物類型操作來觸發監聽事件
stat = zooKeeper.setData("/zk-tmp-darian", "222".getBytes(), stat.getVersion());
Thread.sleep(1000);
zooKeeper.delete("/zk-tmp-darian", stat.getVersion());
System.in.read();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
控制枱
全局的默認的watcher事件:None----->>>>>>null
NodeDataChanged--->>>>/zk-tmp-darian
全局的默認的watcher事件:NodeDeleted----->>>>>>/zk-tmp-darian
Watcher事件機制:
事件類型
| None(-1) | 客户端鏈接狀態發生變化的時候,會收到none的事件 |
| NodeCreated(1) | 創建節點的事件。比如説zk-persis-mic |
| NodeDeleted(2) | 刪除節點的事件。 |
| NodeDataChanged(3) | 節點數據發生變更 |
| NodeChildrenChanged(4) | 子節點被創建、被刪除、會發生事件觸發 |
觸發條件
| zk-persis-mic(監聽事件) | zk-persis-mic/child(監聽事件) | |
|---|---|---|
| create(/zk-persis-mic) | NodeCreated(exists/getData) | 無 |
| delete(/zk-persis-mic) | NodeDelete(exists/getData) | 無 |
| setData(/zk-persis-mic) | NodeDataChanged(exists/getData) | 無 |
| create(/zk-persis-mic/children) | NodeChildrenChanged(getChildren) | NodeCreated |
| delete(/zk-persis-mic/children) | NodeChildrenChanged(getChildren) | NodeDelete |
| setData(/zk-persis-mic/children) | NodeDataChanged |
事件的實現原理
通過 #exists方法去綁定的事件
入口就是 #exists
Zookeeper 類中的 #exists 方法
public Stat exists(final String path, Watcher watcher)
throws KeeperException, InterruptedException{
final String clientPath = path;
PathUtils.validatePath(clientPath);
// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new ExistsWatchRegistration(watcher, clientPath);
}
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
if (r.getErr() == KeeperException.Code.NONODE.intValue()) {
return null;
}
throw KeeperException.create(KeeperException.Code.get(r.getErr()),
clientPath);
}
return response.getStat().getCzxid() == -1 ? null : response.getStat();
}
我們client在zookeeper server上註冊一個事件
ZkWatcher Manager 上保存客户端的事件註冊
源碼地址:Zookeeper核心類中exists(
分析原理:
在客户端調用裏邊就是實例化,
調用增刪改查方法
一定有一個機制去對這個隊列去做一個數據的處理
OutgoingQueue
有一個入隊肯定有一個出隊
數據不是放到隊列中就完事了,肯定需要有機制去對隊列做一個出隊列
判斷是引導流程走向的一個方式!
Ctrl + alt + B 點到方法上,可以看到有幾個實現
NettyServerCnxn
服務端處理請求的類
#receiveMessage() 方法
通過鏈式的process對請求去做一個鏈式的處理,把不同的業務去做一個分離
預處理 > 同步處理 > finally處理
PIPE(管道模式)
Linux 的管道的處理,一個請求過來,進入不同的管道,做不同的處理,最終返回一個結果。
類似於鏈式風格的一種設計。
Zookeeper中大部分都在用多線程的方式異步化流程。
一定有一個線程去處理隊列 submittRequests。
Hashmap<watcher, String path>
Curator的操作
主要是對我們數據節點操作的一個封裝
原生操作會有很多不便之處,雖然説靈活性比較好,但是我們要做一些事情,比如説,要去綁定事件很複雜,我們要去創建一個連接也很複雜,那怎麼去簡化?
Netflix公司開源了一個zookeeper客户端,他跟原生的客户端相比,它是一個更高層次的抽象,他不光是API層次的一個抽象,同時還封裝了一套基於應用場景的API,也就是説zookeeper它本身能夠實現分佈式鎖,leader選舉這些功能,那麼curator對這些功能做了一些封裝,我們直接調用API就好了。就可以完成leader選舉的這樣一個機制,那麼我們怎麼實現這樣一個功能。
基於fluent風格的一個機制
CuratorFramework curatorFamework = CuratorFrameworkFactory
.builder()
.connectString()
.sessionTimeoutMS(4000)
.retryPolicy(new ExponetialBackoffRetry(1000,3))
.namespace(“curator”)
.build()
// 結果: /curator/mic/node1
// 原生api中,必須是逐層創建,也就是父節點必須存在,子節點才能創建
Curator的增刪改查
package com.gupao.study.vip;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
* <br>類説明 :
* <br>屬性説明:
* <br>作者:Darian
**/
public class CuratorDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181")
.sessionTimeoutMs(4000)
// 一個重試機制,遞減的重試機制
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
// 指定命名空間,我們會把某個業務放在一個大的命名空間下面
// 因為我們要區分每個場景下的業務劃分
.namespace("/curator")
.build();
curatorFramework.start();
// 創建結果: /curator/darian/node1
// 原生的 API 中,必須是逐層創建,也就是父節點必須存在,子節點才能創建
curatorFramework.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath("/darian/node1", "2".getBytes());
// 查詢節點,得到節點的 Stat 信息
Stat stat = new Stat();
curatorFramework.getData()
.storingStatIn(stat)
.forPath("/darian/node1");
// 更新操作
curatorFramework.setData()
.withVersion(stat.getVersion())
.forPath("/darian/node1", "232".getBytes());
// 去刪除某個節點
curatorFramework.delete()
.deletingChildrenIfNeeded()
.forPath("/darian/node1");
}
}
public static void main(String[] args) {
CuratorFramework.create()
.creatingPatentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(“/mic/node1”,”1”.getBytes());
//刪除
curatorFramework
.delete()
.deletingChildrenIfNeeded()
.forPath(“/mic/node1”);
Stat stat = new Stat();
curatorFramework
.getData()
.storingStatIn(stat).forPath(“/mic/node1”)
curatorFramework
.setData()
.withVersion(sta.getVersion())
.forPath(“/mic/node1”,”xx”.getBytes);
curatorFramework.close(); //關閉連接
}
curator 事件的高度封裝,
一定有一個節點特性
| PathChildCache | 監聽一個節點下子節點的創建、刪除、更新
(CHILD-ADDED,CHILD_UPDATE,CHILD_REMOVED) |
| NodeCache | 監聽一個節點的更新和創建時間
(Receive Event:/mic) |
| TreeCache | zonghe1PathChildCache和NodeCache的特性 |
Curator的watcher機制
package com.gupao.study.vip;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* <br>類説明 : 事件的高度封裝
* <p>
* curator封裝了三種事件的機制
* PathChildCache 監聽一個節點下節點的創建、刪除、更新
* NodeCache 監聽一個節點的更新和創建事件
* TreeCache 綜合 PatchChildCache 和 NodeCache 的特性
*
* <br>屬性説明:
* <br>作者:Darian
**/
public class CuratorWatcherDemo {
public static void main(String[] args) throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkFactory
.builder()
.connectString("192.168.136.128:2181," +
"192.168.136.129:2181," +
"192.168.136.130:2181")
.sessionTimeoutMs(4000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.namespace("curator")
.build();
curatorFramework.start();
// 當前節點的創建、刪除事件監聽
// addListenerWithNodeCache(curatorFramework, "/darian");
// 子節點的增加、修改、刪除的事件監聽
// addlistenerWithPathChildCache(curatorFramework,"/darian");
// 綜合節點的監聽事件
addListerWithTreeCache(curatorFramework, "/darian");
System.in.read();
}
/**
* NodeCache + PathChildrenCache
**/
public static void addListerWithTreeCache(CuratorFramework curatorFramework, String path) throws Exception {
final TreeCache treeCache = new TreeCache(curatorFramework,path);
TreeCacheListener treeCacheListener = new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println(treeCacheEvent.getType()+"-->>"+treeCacheEvent.getData());
}
};
treeCache.getListenable().addListener(treeCacheListener);
treeCache.start();
/*
INITIALIZED-->>null
NODE_ADDED-->>ChildData{path='/darian', stat=51539607621,51539607621,1531222950294,1531222950294,0,0,0,0,1,0,51539607621
, data=[49]}
NODE_ADDED-->>ChildData{path='/darian/node1', stat=51539607622,51539607622,1531222969992,1531222969992,0,0,0,0,1,0,51539607622
, data=[49]}
NODE_ADDED-->>ChildData{path='/darian/node1/node1', stat=51539607623,51539607623,1531222989856,1531222989856,0,0,0,0,1,0,51539607623
, data=[49]}
NODE_REMOVED-->>ChildData{path='/darian/node1/node1', stat=51539607623,51539607623,1531222989856,1531222989856,0,0,0,0,1,0,51539607623
, data=[49]}
NODE_UPDATED-->>ChildData{path='/darian/node1', stat=51539607622,51539607625,1531222969992,1531223116477,1,2,0,0,1,0,51539607624
, data=[50]}
NODE_REMOVED-->>ChildData{path='/darian/node1', stat=51539607622,51539607625,1531222969992,1531223116477,1,2,0,0,1,0,51539607624
, data=[50]}
NODE_REMOVED-->>ChildData{path='/darian', stat=51539607621,51539607621,1531222950294,1531222950294,0,2,0,0,1,0,51539607626
, data=[49]}
*/
}
/**
* 監聽對應節點下的子節點的變化
* 創建
* 刪除
* 更新
*/
public static void addlistenerWithPathChildCache(CuratorFramework curatorFramework, String path) throws Exception {
final PathChildrenCache pathChildrenCache =
new PathChildrenCache(curatorFramework, path, true);
PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("receive Event:"
+ pathChildrenCacheEvent.getType()
+ "-->>"
+ pathChildrenCacheEvent.getData());
}
};
pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);
/*
receive Event:CONNECTION_RECONNECTED-->>null
receive Event:CHILD_ADDED-->>ChildData{path='/darian/node1', stat=51539607611,51539607611,1531222192297,1531222192297,0,0,0,0,1,0,51539607611
, data=[49]}
receive Event:CHILD_REMOVED-->>ChildData{path='/darian/node1', stat=51539607611,51539607611,1531222192297,1531222192297,0,0,0,0,1,0,51539607611
, data=[49]}
*/
}
/**
* 監聽這個節點的變化
* 創建
* 更新
**/
public static void addListenerWithNodeCache(CuratorFramework curatorFramework, String path) throws Exception {
final NodeCache nodeCache = new NodeCache(curatorFramework, path, false);
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("Receive Event:" + nodeCache.getCurrentData().getPath());
}
};
nodeCache.getListenable().addListener(nodeCacheListener);
nodeCache.start();
}
}
來源於: https://javaguide.net
公眾號:不止極客