这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端框架 Curator ,解决了很多 Zookeeper客户端非常底层的细节开发工作
Curator包含了几个包:
- curator-framework:对zookeeper的底层api的一些封装
- curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式 计数器等
先创建一个java工程,并且导入maven依赖。Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼 容性问题,很有可能导致节点操作失败)
1 <dependencies> 2 <dependency> 3 <groupId>org.apache.curator</groupId> 4 <artifactId>curator-framework</artifactId> 5 <version>2.12.0</version> 6 </dependency> 7 <dependency> 8 <groupId>org.apache.curator</groupId> 9 <artifactId>curator-recipes</artifactId> 10 <version>2.12.0</version> 11 </dependency> 12 <dependency> 13 <groupId>com.google.collections</groupId> 14 <artifactId>google-collections</artifactId> 15 <version>1.0</version> 16 </dependency> 17 <dependency> 18 <groupId>junit</groupId> 19 20 <artifactId>junit</artifactId> 21 <version>RELEASE</version> 22 </dependency> 23 <dependency> 24 <groupId>org.slf4j</groupId> 25 <artifactId>slf4j-simple</artifactId> 26 <version>1.7.25</version> 27 </dependency> 28 </dependencies> 29 <build> 30 <plugins> 31 <!-- java编译插件 --> 32 <plugin> 33 <groupId>org.apache.maven.plugins</groupId> 34 <artifactId>maven-compiler-plugin</artifactId> 35 <version>3.2</version> 36 <configuration> 37 <source>1.8</source> 38 <target>1.8</target> 39 <encoding>UTF-8</encoding> 40 </configuration> 41 </plugin> 42 </plugins> 43 </build>
如果改变了curator要手动重新reimport
接下来进行API操作:
- 创建永久节点
1 @Test 2 public void createNode() throws Exception { 3 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 1); 4 //获取客户端对象 5 CuratorFramework client = 6 CuratorFrameworkFactory.newClient("192.168.xxx.xxx:2181,192.168.xxx.xxx:2 7 181,192.168.xxx.xxx:2181", 1000, 1000, retryPolicy); 8 9 //调用start开启客户端操作 10 client.start(); 11 12 //通过create来进行创建节点,并且需要指定节点类型 13 client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTEN 14 T).forPath("/hello3/world"); 15 client.close(); 16 }
- 创建临时节点
1 public void createNode2() throws Exception { 2 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1); 3 CuratorFramework client = 4 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 5 3000, 3000, retryPolicy); 6 client.start(); 7 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL). 8 forPath("/hello5/world"); 9 Thread.sleep(5000); 10 client.close(); 11 }
- 修改节点数据
1 @Test 2 public void nodeData() throws Exception { 3 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1); 4 CuratorFramework client = 5 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 6 3000, 3000, retryPolicy); 7 client.start(); 8 client.setData().forPath("/hello5", "hello7".getBytes()); 9 client.close(); 10 }
- 节点数据查询
1 @Test 2 public void updateNode() throws Exception { 3 RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1); 4 CuratorFramework client = 5 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181", 6 3000, 3000, retryPolicy); 7 client.start(); 8 byte[] forPath = client.getData().forPath("/hello3"); 9 System.out.println(new String(forPath)); 10 client.close(); 11 }
- 节点watch机制
1 @Test 2 public void watchZnode() throws Exception { 3 RetryPolicy policy = new ExponentialBackoffRetry(3000, 3); 4 String connectstr="192.168.xx.xxx:2181,192.168.xx.xxx:2181,192.168.xx.xxx:2181"; 5 CuratorFramework client = 6 CuratorFrameworkFactory.newClient(connectstr, 8000, 8000, policy); 7 client.start(); 8 // ExecutorService pool = Executors.newCachedThreadPool(); 9 //设置节点的cache 10 11 TreeCache treeCache = new TreeCache(client, "/hello3"); 12 //设置监听器和处理过程 13 treeCache.getListenable().addListener(new TreeCacheListener() 14 { 15 @Override 16 public void childEvent(CuratorFramework client, 17 TreeCacheEvent event) throws Exception { 18 ChildData data = event.getData(); 19 if(data !=null){ 20 switch (event.getType()) { 21 case NODE_ADDED: 22 System.out.println("NODE_ADDED : "+ 23 data.getPath() +" 数据:"+ new String(data.getData())+"监控到有新增节点"); 24 break; 25 case NODE_REMOVED: 26 System.out.println("NODE_REMOVED : "+ 27 data.getPath() +" 数据:"+ new String(data.getData())+"监控到移除节点"); 28 break; 29 case NODE_UPDATED: 30 System.out.println("NODE_UPDATED : "+ 31 data.getPath() +" 数据:"+ new String(data.getData())+"监控到有更新节点"); 32 break; 33 34 default: 35 break; 36 37 } 38 }else{ 39 System.out.println( "data is null : "+ 40 event.getType()); 41 } 42 } 43 }); 44 //开始监听 45 treeCache.start(); 46 Thread.sleep(500000); 47 }