Curator提供两种Watcher来监听节点的变化。
后文中的ct:
CuratorFramework ct; @Before public void before(){ ct = CuratorFrameworkFactory.builder() //ip:端口 .connectString("192.168.10.132:2181,192.168.10.133:2181,192.168.10.135:2181") //超时时间 .sessionTimeoutMs(5000) //连接断开5秒后,会进行一次重连 .retryPolicy(new RetryOneTime(5000)) //命名空间,该命名空间作为父节点 .namespace("ct").build(); //打开连接 ct.start(); } @After public void after(){ ct.close(); }
1.Node Cache
只是监听某一个特定的节点,监听节点的新增和修改
//监视某个节点的数据变化 final NodeCache nodeCache = new NodeCache(ct, "/node1"); nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { //节点变化时的回调方法 public void nodeChanged() throws Exception { System.out.println(nodeCache.getCurrentData().getPath()); System.out.println(new String(nodeCache.getCurrentData().getData())); } }); Thread.sleep(100000); nodeCache.close();
2.PathChildren Cache
监控一个ZNode的子节点,当一个子节点增加更新删除时。Path Cache会改变它的状态,会包含最新的子节点、数据和状态。
//第三个参数:事件中是否可以获取节点的数据 PathChildrenCache cache = new PathChildrenCache(ct, "/node1", true); cache.start(); cache.getListenable().addListener(new PathChildrenCacheListener() { //节点变化时的回调方法 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println(event.getType()); System.out.println(event.getData().getPath()); System.out.println(new String(event.getData().getData())); } }); Thread.sleep(100000); cache.close();
3.事务
一系列操作要么全部成功,要么全部失败
//开启事务 ct.inTransaction() .create().forPath("/w","w".getBytes()) .and() .delete().forPath("/zxc") //提交事务 .and().commit();