Zookeeper客户端使用
一、使用原生zookeeper
在pom.xml中加入依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> </dependency>
直接上代码:
/**
 * Project Name: mk-project <br>
 * Package Name: com.suns.zookeeper <br>
 *
 * @author mk<br>
 * Date: 2018-10-31 9:09 <br>
 */

package com.suns.zookeeper;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/**
 * Demo of the native ZooKeeper Java client.
 *
 * <ol>
 *   <li>The connection is established asynchronously. A default watcher is registered and
 *       {@link #init(String, int)} blocks on a {@link CountDownLatch} until the first
 *       {@code SyncConnected} event arrives, so later calls never run on an unconnected
 *       handle.</li>
 *   <li>Watches are one-shot: they must be registered again (for example through
 *       {@code exists}/{@code getData}/{@code getChildren} with {@code watch = true})
 *       before every operation that should be observed.</li>
 * </ol>
 *
 * ClassName: ZookeeperNative <br>
 *
 * @author mk
 */
public class ZookeeperNative {

    /** Connection string of the ZooKeeper server used by the demo. */
    public static final String connect = "127.0.0.1:2181";

    /** Upper bound for waiting on the initial connection, in milliseconds. */
    private static final long CONNECT_TIMEOUT_MS = 30000L;

    private static ZooKeeper zookeeper = null;

    // The count must be 1: with a count of 0, await() returns immediately and the
    // client would be used before the session is actually connected.
    private static final CountDownLatch cdl = new CountDownLatch(1);

    private static final String nodePath = "/native1";
    private static final String nodeChildPath = "/native1/n1/n11/n111/n1111";

    /**
     * Runs the demo: connect, create, create recursively, query, update, delete recursively.
     *
     * @param args unused
     * @throws Exception if connecting fails or any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {

        // Connect and wait until the session is established.
        init(connect, 5000);

        try {
            // Create a single node.
            create(nodePath, "n1");
            // Create a node together with all of its missing ancestors.
            createRecursion(nodeChildPath, "n1");

            // Read the node.
            query(nodePath);

            // Overwrite the node's data.
            update(nodePath, "n11");

            // Delete a single node:
            // delete(nodePath);
            // Delete the node and its whole subtree.
            deleteRecursion(nodePath);
        } finally {
            // Always release the session and its background threads.
            zookeeper.close();
        }
    }

    /**
     * Deletes {@code nodePath} and every node below it, children first.
     * Nodes outside the subtree (ancestors of {@code nodePath}) are never touched.
     *
     * @param nodePath absolute path of the subtree root
     * @throws KeeperException if the server reports an error
     * @throws InterruptedException if the calling thread is interrupted
     */
    private static void deleteRecursion(String nodePath) throws KeeperException, InterruptedException {
        Stat exists = zookeeper.exists(nodePath, true);
        if (null == exists) {
            System.out.println(nodePath + "不存在");
            return;
        }

        // Post-order traversal: a node can only be deleted once it has no children.
        List<String> children = zookeeper.getChildren(nodePath, true);
        if (null != children) {
            for (String child : children) {
                deleteRecursion(nodePath + "/" + child);
            }
        }
        delete(nodePath);
    }

    /**
     * Deletes a single node regardless of its version.
     *
     * @param path absolute path of the node
     * @throws KeeperException if the server reports an error (e.g. the node has children)
     * @throws InterruptedException if the calling thread is interrupted
     */
    private static void delete(String path) throws KeeperException, InterruptedException {
        // Query first so that a watch is registered and the deletion is reported to the watcher.
        query(path);
        // Version -1 matches any version of the node.
        zookeeper.delete(path, -1);
        System.out.println("delete:" + "[" + path + "]");
    }

    /**
     * Replaces the data of an existing node.
     *
     * @param path absolute path of the node
     * @param data new content, stored as UTF-8
     * @throws KeeperException if the server reports an error
     * @throws InterruptedException if the calling thread is interrupted
     */
    private static void update(String path, String data) throws KeeperException, InterruptedException {
        // Version -1 matches any version, i.e. no optimistic-lock check is performed.
        Stat stat = zookeeper.setData(path, data.getBytes(StandardCharsets.UTF_8), -1);
        System.out.println("setData:" + "[" + path + "],stat:" + stat);
    }

    /**
     * Reads and prints a node's data and stat, registering a watch on the node.
     *
     * @param path absolute path of the node
     * @throws KeeperException if the server reports an error
     * @throws InterruptedException if the calling thread is interrupted
     */
    private static void query(String path) throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        byte[] data = zookeeper.getData(path, true, stat);
        // A node may have been created without data, in which case getData returns null.
        String result = (null == data) ? null : new String(data, StandardCharsets.UTF_8);
        System.out.println("query:" + "[" + path + "],result:" + result + ",stat:" + stat);
    }

    /**
     * Creates {@code path} and every missing ancestor, each with the same {@code data}.
     * Nodes that already exist are reported and left unchanged.
     *
     * @param path absolute path of the deepest node to create
     * @param data content stored in every node that gets created
     * @throws KeeperException if the server reports an error
     * @throws InterruptedException if the calling thread is interrupted
     */
    private static void createRecursion(String path, String data) throws KeeperException, InterruptedException {
        if (null == path || "".equals(path)) {
            System.out.println("节点[" + path + "]为空");
            return;
        }

        // Strip the leading slash only when present, so a path given without it
        // does not silently lose its first character.
        String relative = path.startsWith("/") ? path.substring(1) : path;

        // Grow the prefix one segment at a time: /a, /a/b, /a/b/c, ...
        StringBuilder childPath = new StringBuilder();
        for (String segment : relative.split("/")) {
            if (segment.isEmpty()) {
                continue; // tolerate "//" and the root path
            }
            childPath.append('/').append(segment);
            create(childPath.toString(), data);
        }
    }

    /**
     * Creates a persistent, world-accessible node if it does not exist yet.
     *
     * @param path absolute path of the node
     * @param data content, stored as UTF-8
     * @throws KeeperException if the server reports an error other than "node exists"
     * @throws InterruptedException if the calling thread is interrupted
     */
    private static void create(String path, String data) throws KeeperException, InterruptedException {
        // exists(..., true) also registers the watch that reports the upcoming creation.
        Stat exists = zookeeper.exists(path, true);
        if (null != exists) {
            System.out.println("节点[" + path + "]已存在,不能新增");
            return;
        }
        try {
            String result = zookeeper.create(path, data.getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create:" + "[" + path + "-->" + data + "],result:" + result);
        } catch (KeeperException.NodeExistsException e) {
            // Another client created the node between the exists() check and create().
            System.out.println("节点[" + path + "]已存在,不能新增");
        }
    }

    /**
     * Opens the ZooKeeper session and blocks until it is connected.
     *
     * <p>The connection is asynchronous; using the handle right after the constructor
     * returns may fail. The default watcher therefore releases a latch on the first
     * {@code SyncConnected} event, and this method waits for that latch.
     *
     * @param connectStr comma-separated {@code host:port} list
     * @param sessionTimeout session timeout in milliseconds
     * @throws Exception if the client cannot be created or does not connect in time
     */
    private static void init(final String connectStr, int sessionTimeout) throws Exception {
        zookeeper = new ZooKeeper(connectStr, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
                    // Repeated countDown() calls after the first one are harmless.
                    cdl.countDown();
                }
                switch (watchedEvent.getType()) {
                    case NodeCreated:
                        System.out.println("zookeeper有新节点创建" + watchedEvent.getPath());
                        break;
                    case NodeDataChanged:
                        System.out.println("zookeeper有节点数据变化" + watchedEvent.getPath());
                        break;
                    case NodeDeleted:
                        System.out.println("zookeeper有节点被删除" + watchedEvent.getPath());
                        break;
                    case NodeChildrenChanged:
                        System.out.println("zookeeper有子节点变化" + watchedEvent.getPath());
                        break;
                    default:
                        break;
                }
            }
        });

        // Bounded wait so an unreachable server fails fast instead of hanging forever.
        if (!cdl.await(CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
            zookeeper.close();
            throw new IllegalStateException(
                    "Timed out after " + CONNECT_TIMEOUT_MS + " ms connecting to ZooKeeper at " + connectStr);
        }
        System.out.println("init start :" + zookeeper);
    }

}
运行结果: