前言:Zookeeper的监听机制很多人都踩过坑,感觉实现了watcher 接口,后面节点的变化都会一一推送过来,然而并非如此。
Watch机制官方声明:一个Watch事件是一个一次性的触发器,当被设置了Watch的数据发生了改变的时候,则服务器将这个改变发送给设置了Watch的客户端,以便通知它们。
Zookeeper机制的特点:
1.一次性触发 数据发生改变时,一个watcher event会被发送到client,但是client只会收到一次这样的信息。
2.watcher event异步发送 watcher 的通知事件从server发送到client是异步的,这就存在一个问题,不同的客户端和服务器之间通过socket进行通信,由于网络延迟或其他因素导致客户端在不通的时刻监听到事件,由于Zookeeper本身提供了ordering guarantee,即客户端监听事件后,才会感知它所监视znode发生了变化。
3.数据监视 Zookeeper有数据监视和子数据监视 getdata() and exists() 设置数据监视,getchildren()设置了子节点监视
4.注册watcher getData、exists、getChildren
5. 触发watcher create、delete、setData
watcher里面的相关事件 |
||
event For "/path" |
event For "/path/child" |
|
create("/path") |
EventType.NodeCreated |
NA |
delete("/path") |
EventType.NodeDeleted |
NA |
setData("/path") |
EventType.NodeDataChanged |
NA |
create("/path/child") |
EventType.NodeChildrenChanged |
EventType.NodeCreated |
delete("/path/child") |
EventType.NodeChildrenChanged |
EventType.NodeDeleted |
setData("/path/child") |
NA |
EventType.NodeDataChanged |
各种操作触发的watcher事件 |
||||||||
"/path" |
"/path/child" |
|||||||
exists |
getData |
getChildren |
exists |
getData |
getChildren |
|||
create("/path") |
√ |
√ |
||||||
delete("/path") |
√ |
√ |
√ |
|||||
setData("/path") |
√ |
√ |
||||||
create("/path/child") |
√ |
√ |
√ |
|||||
delete("/path/child") |
√ |
√ |
√ |
√ |
||||
setData("/path/child") |
√ |
√ |
操作与watcher关系 |
|||||||
"/path" |
"/path/child" |
||||||
exists |
getData |
getChildren |
exists |
getData |
getChildren |
||
create("/path") |
√ |
√ |
|||||
delete("/path") |
√ |
√ |
√ |
||||
setData("/path") |
√ |
√ |
|||||
create("/path/child") |
√ |
√ |
√ |
||||
delete("/path/child") |
√ |
√ |
√ |
√ |
|||
setData("/path/child") |
√ |
√ |
各种watch测试
/** * watch test * */ public class App implements Watcher { private static CountDownLatch connectedSemaphore = new CountDownLatch(1); public static Stat stat = new Stat(); ZooKeeper zooKeeper; public static void main(String[] args) throws IOException, InterruptedException, KeeperException { String p = "/testaa"; ZooKeeper zooKeeper = new ZooKeeper("192.168.1.10:2181", 5000, new App()); connectedSemaphore.await(); //exists register watch zooKeeper.exists(p, true); String path = zooKeeper.create(p, "456".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //get register watch zooKeeper.getData(path, true, stat); zooKeeper.setData(path, "hhhh".getBytes(), -1); zooKeeper.exists(path, true); //exists register watch zooKeeper.delete(path, -1); } public void process(WatchedEvent event) { if (KeeperState.SyncConnected == event.getState()) { if (EventType.None == event.getType() && null == event.getPath()) { connectedSemaphore.countDown(); System.out.println("Zookeeper session established"); } else if (EventType.NodeCreated == event.getType()) { System.out.println("success create znode"); } else if (EventType.NodeDataChanged == event.getType()) { System.out.println("success change znode: " + event.getPath()); } else if (EventType.NodeDeleted == event.getType()) { System.out.println("success delete znode"); } else if (EventType.NodeChildrenChanged == event.getType()) { System.out.println("NodeChildrenChanged"); } } } }