ZooKeeper常用客户端有三种:原生客户端、zkClient、curator
项目中使用前,需要导入相关依赖
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.10</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency> </dependencies>
原生客户端
创建会话
不使用监听
public class TestCreateSession { /*服务地址*/ private static final String ZK_SERVER = "127.0.0.1:2181"; @Test public void createSession2() throws IOException { ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, null); System.out.println("zk.getState() = " + zk.getState()); } }
zk.getState() = CONNECTING
通过之前的学习可以知道,CONNECTING标志客户端正在连接,并不能确保已经连接上zk服务。可能发生还没有连接到zk服务就进行对zk访问的情况
使用监听
public class TestCreateSession { /*服务地址*/ private static final String ZK_SERVER = "127.0.0.1:2181"; /*倒计时器*/ private CountDownLatch latch = new CountDownLatch(1); @Test public void createSession() throws IOException, InterruptedException { ZooKeeper zk = new ZooKeeper(ZK_SERVER, 50000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected){/*确保zk已连接*/ latch.countDown(); } } }); latch.await(); System.out.println("zk.getState() = " + zk.getState()); } }
zk.getState() = CONNECTED
使用监听机制可以确保在ZooKeeper初始化完成前进行等待,初始化完成再进行后续操作
客户端基本操作
1 public class TestJavaApi implements Watcher { 2 /*zk服务地址*/ 3 private static final String ZK_SERVER = "127.0.0.1:2181"; 4 /*会话连接超时时间*/ 5 private static final int SESSION_TIMEOUT = 50000; 6 /*指定目录【节点】*/ 7 private static final String ZK_PATH = "/zkDir"; 8 /*客户端连接会话*/ 9 private ZooKeeper zk = null; 10 11 /*倒计时器*/ 12 private CountDownLatch latch = new CountDownLatch(1); 13 /** 14 * 事件被触发时的动作 15 * @param event 事件 16 */ 17 @Override 18 public void process(WatchedEvent event) { 19 System.out.println("收到事件通知:" + zk.getState() +" "); 20 if (event.getState() == Event.KeeperState.SyncConnected){ 21 latch.countDown(); 22 } 23 } 24 25 /** 26 * 创建zk会话连接 27 * @param connectString zk服务器地址列表,可以是"地址1,地址2,...." 28 * @param sessionTimeout Session超时时间 29 */ 30 public void createZkSession(String connectString, int sessionTimeout){ 31 try { 32 zk = new ZooKeeper(connectString,sessionTimeout,this); 33 latch.await(); 34 System.out.println("zk.getState() = " + zk.getState()); 35 } catch (IOException|InterruptedException e) { 36 System.out.println("连接创建失败"); 37 e.printStackTrace(); 38 } 39 } 40 41 /** 42 * 关闭zk会话 43 */ 44 public void releaseSession(){ 45 try { 46 zk.close(); 47 } catch (InterruptedException e) { 48 e.printStackTrace(); 49 } 50 } 51 52 /** 53 * 创建节点【目录、文件】 54 * @param path 节点 55 * @param data 节点数据 56 * @return 57 */ 58 public boolean createNode(String path,String data){ 59 try { 60 String node = zk.create(path/*节点path*/, 61 data.getBytes()/*节点数据*/, 62 ZooDefs.Ids.OPEN_ACL_UNSAFE/*权限控制 OPEN_ACL_UNSAFE相当于world:anyone*/, 63 CreateMode.EPHEMERAL)/*临时节点*/; 64 System.out.println("节点创建成功,node = " + node); 65 return true; 66 } catch (KeeperException|InterruptedException e) { 67 System.out.println("节点创建失败"); 68 e.printStackTrace(); 69 } 70 return false; 71 } 72 73 /** 74 * 获取节点数据 75 * @param path 节点路径 76 * @return 77 */ 78 public String readNode(String path){ 79 try { 80 byte[] data = zk.getData(path, true, null); 81 String nodeData = new String(data,"utf-8"); 82 //System.out.println("获取"+path+"节点数据:"+nodeData); 83 return nodeData; 84 } catch (KeeperException | InterruptedException | UnsupportedEncodingException e) { 85 e.printStackTrace(); 86 return null; 87 } 88 } 89 90 /** 91 * 修改节点数据 92 * @param path 节点path 93 * @param newData 节点新数据 94 * @return 95 */ 96 public boolean writeNode(String path,String newData){ 97 try { 98 Stat stat = zk.setData(path, newData.getBytes(), -1); 99 System.out.println("节点["+path+"]修改成功"); 100 return true; 101 } catch (KeeperException|InterruptedException e) { 102 e.printStackTrace(); 103 } 104 return false; 105 } 106 107 /** 108 * 删除指定节点 109 * @param path 节点path 110 */ 111 public void deleteNode(String path){ 112 try { 113 zk.delete(path,-1); 114 System.out.println("节点["+path+"]删除成功"); 115 } catch (InterruptedException|KeeperException e) { 116 System.out.println("节点["+path+"]删除失败"); 117 e.printStackTrace(); 118 } 119 } 120 121 public static void main(String[] args) { 122 TestJavaApi api = new TestJavaApi(); 123 api.createZkSession(ZK_SERVER,SESSION_TIMEOUT); 124 if(api.createNode(ZK_PATH,"初始节点内容")){ 125 System.out.println("第一次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH)); 126 api.writeNode(ZK_PATH,"修改ZK_PATH节点数据"); 127 System.out.println("第二次读"+ZK_PATH+"节点数据:"+api.readNode(ZK_PATH)); 128 api.deleteNode(ZK_PATH); 129 } 130 api.releaseSession(); 131 } 132 } 133 /** 134 ************输出结果*********** 135 收到事件通知:CONNECTED 136 137 zk.getState() = CONNECTED 138 节点创建成功,node = /zkDir 139 第一次读/zkDir节点数据:初始节点内容 140 收到事件通知:CONNECTED 141 142 节点[/zkDir]修改成功 143 第二次读/zkDir节点数据:修改ZK_PATH节点数据 144 收到事件通知:CONNECTED 145 146 节点[/zkDir]删除成功 147 */
watch机制
1 public class ZkWatcher implements Watcher { 2 private static final String ZK_SERVER = "127.0.0.1:2181"; 3 private static final int SESSION_TIMEOUT = 15000; 4 private static final String PARENT_PATH ="/testWatcher"; 5 private static final String CHILDREN_PATH = "/testWatcher/children"; 6 private ZooKeeper zk = null; 7 /*定义原子变量,用于计算进入监听的次数*/ 8 private static AtomicInteger seq = new AtomicInteger(); 9 /*会话进入标志*/ 10 private static final String LOG_PREFIX_OF_MAIN = "【main】"; 11 12 /*倒计时器*/ 13 private CountDownLatch latch = new CountDownLatch(1); 14 @Override 15 public void process(WatchedEvent event) { 16 System.out.println("**************进入process方法**************"); 17 System.out.println("event = " + event); 18 /*模拟业务连接初始化工作*/ 19 TimeUtils.threadSleep(200); 20 if (event == null) { return; } 21 /*连接状态*/ 22 Event.KeeperState eventState = event.getState(); 23 /*事件类型*/ 24 Event.EventType eventType = event.getType(); 25 /*受影响的路径*/ 26 String eventPath = event.getPath(); 27 /*进入监听标志*/ 28 String logPreFix = "【watcher-"+seq.incrementAndGet()+"】"; 29 System.out.println(logPreFix + "收到watcher通知"); 30 System.out.println(logPreFix + "连接状态: "+eventState.toString()); 31 System.out.println(logPreFix + "事件类型: "+eventType.toString()); 32 33 if(Event.KeeperState.SyncConnected == eventState){ 34 if (Event.EventType.None == eventType){/*成功连接上ZK服务器*/ 35 System.out.println(logPreFix + "成功连接上ZK服务器"); 36 latch.countDown(); 37 }else if (Event.EventType.NodeCreated == eventType){/*创建节点*/ 38 System.out.println(logPreFix + "创建节点"); 39 TimeUtils.threadSleep(100); 40 /*使用监听*/ 41 exist(eventPath,true); 42 }else if (Event.EventType.NodeChildrenChanged == eventType){ 43 System.out.println(logPreFix + "子节点变更"); 44 TimeUtils.threadSleep(1000); 45 System.out.println(logPreFix + "子节点列表:" + getChildren(eventPath,true)); 46 }else if (Event.EventType.NodeDataChanged == eventType){ 47 System.out.println(logPreFix + "修改节点数据"); 48 TimeUtils.threadSleep(100); 49 System.out.println(logPreFix + "修改后节点内容:" + readNode(eventPath, true)); 50 }else if (Event.EventType.NodeDeleted == eventType){ 51 System.out.println(logPreFix + "删除节点"); 52 System.out.println(logPreFix + "节点 " + eventPath + " 被删除"); 53 } 54 }else if(Event.KeeperState.Disconnected == eventState){ 55 System.out.println(logPreFix + "与zk服务器断开连接"); 56 }else if(Event.KeeperState.AuthFailed == eventState){ 57 System.out.println(logPreFix + "验证失败"); 58 }else if(Event.KeeperState.Expired == eventState){ 59 System.out.println(logPreFix + "会话超时"); 60 } 61 System.out.println("----------------------------------------"); 62 } 63 /** 64 * 创建ZK连接 65 * @param connectAddr ZK服务器地址列表 66 * @param sessionTimeout Session超时时间 67 */ 68 public void createConnection(String connectAddr, int sessionTimeout) { 69 this.releaseConnection(); 70 try { 71 zk = new ZooKeeper(connectAddr, sessionTimeout, this); 72 System.out.println(LOG_PREFIX_OF_MAIN + "开始连接zk服务器"); 73 latch.await(); 74 } catch (Exception e) { 75 e.printStackTrace(); 76 } 77 } 78 79 /** 80 * 关闭ZK连接 81 */ 82 public void releaseConnection() { 83 if (this.zk != null) { 84 try { 85 this.zk.close(); 86 } catch (InterruptedException e) { 87 e.printStackTrace(); 88 } 89 } 90 } 91 92 /** 93 * 创建节点 94 * @param path 节点路径 95 * @param data 数据内容 96 * @return 97 */ 98 public boolean createPath(String path, String data) { 99 try {/*设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)*/ 100 zk.exists(path, true); 101 System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " + 102 this.zk.create( /*路径*/ 103 path,/*数据*/ 104 data.getBytes(),/*所有可见*/ 105 ZooDefs.Ids.OPEN_ACL_UNSAFE,/*永久存储*/ 106 CreateMode.PERSISTENT ) + 107 ", content: " + data); 108 } catch (Exception e) { 109 e.printStackTrace(); 110 return false; 111 } 112 return true; 113 } 114 115 /** 116 * 删除所有节点 117 */ 118 public void deleteAllTestPath() { 119 if(this.exist(CHILDREN_PATH, false) != null){ 120 this.deleteNode(CHILDREN_PATH); 121 } 122 if(this.exist(PARENT_PATH, false) != null){ 123 this.deleteNode(PARENT_PATH); 124 } 125 } 126 127 /** 128 * 删除指定节点 129 * @param path 130 */ 131 public void deleteNode(String path) { 132 try { 133 zk.delete(path,-1); 134 System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path); 135 } catch (InterruptedException|KeeperException e) { 136 e.printStackTrace(); 137 } 138 } 139 140 /** 141 * 获取节点内容 142 * @param path 143 * @param needWatch 144 * @return 145 */ 146 public String readNode(String path, boolean needWatch) { 147 try { 148 byte[] data = zk.getData(path, needWatch, null); 149 return new String(data,"utf-8"); 150 } catch (KeeperException|InterruptedException|UnsupportedEncodingException e) { 151 e.printStackTrace(); 152 return null; 153 } 154 } 155 156 /** 157 * 获取指定节点的子节点列表 158 * @param path 159 * @param needWatch 160 * @return 161 */ 162 public List<String> getChildren(String path, boolean needWatch) { 163 try { 164 return this.zk.getChildren(path, needWatch); 165 } catch (KeeperException|InterruptedException e) { 166 e.printStackTrace(); 167 return null; 168 } 169 } 170 /** 171 * 更新指定节点数据内容 172 * @param path 节点路径 173 * @param data 数据内容 174 * @return 175 */ 176 public boolean writeNode(String path, String data) { 177 try { 178 System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " + 179 this.zk.setData(path, data.getBytes(), -1)); 180 } catch (Exception e) { 181 e.printStackTrace(); 182 } 183 return false; 184 } 185 /** 186 * path节点是否存在 187 * @param path 188 * @param needWatch 189 * @return 190 */ 191 public Stat exist(String path, boolean needWatch) { 192 try { 193 return zk.exists(path,needWatch); 194 } catch (KeeperException|InterruptedException e) { 195 e.printStackTrace(); 196 return null; 197 } 198 } 199 200 public static void main(String[] args) throws Exception { 201 //建立watcher 202 ZkWatcher watcher = new ZkWatcher(); 203 //创建连接 204 watcher.createConnection(ZK_SERVER, SESSION_TIMEOUT); 205 //System.out.println(zkWatch.zk.toString()); 206 Thread.sleep(1000); 207 // 清理节点 208 watcher.deleteAllTestPath(); 209 if (watcher.createPath(PARENT_PATH, System.currentTimeMillis() + "")) { 210 System.out.println("---------------------- read parent ----------------------------"); 211 /* 212 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。 213 watch是一次性的,也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。 214 */ 215 watcher.readNode(PARENT_PATH, true); 216 watcher.writeNode(PARENT_PATH, System.currentTimeMillis() + ""); 217 System.out.println("---------------------- read children path ----------------------------"); 218 /* 219 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated, 220 而不会输出NodeChildrenChanged,也就是说创建子节点时没有watch。 221 如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在 222 创建c1时watch,输出c1的NodeChildrenChanged,而不会输出创建c2时的NodeChildrenChanged, 223 如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法, 224 其中path="/p/c1" 225 */ 226 watcher.getChildren(PARENT_PATH, true); 227 Thread.sleep(1000); 228 // 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true) 229 watcher.createPath(CHILDREN_PATH, System.currentTimeMillis() + ""); 230 Thread.sleep(1000); 231 watcher.readNode(CHILDREN_PATH, true); 232 watcher.writeNode(CHILDREN_PATH, System.currentTimeMillis() + ""); 233 } 234 Thread.sleep(20000); 235 // 清理节点 236 watcher.deleteAllTestPath(); 237 Thread.sleep(1000); 238 watcher.releaseConnection(); 239 } 240 } 241 242 class TimeUtils{ 243 public static void threadSleep(long mills){ 244 try { 245 Thread.sleep(mills); 246 } catch (InterruptedException e) { 247 e.printStackTrace(); 248 } 249 } 250 } 251 252 /* 253 *********输出结果******** 254 【main】开始连接zk服务器 255 **************进入process方法************** 256 event = WatchedEvent state:SyncConnected type:None path:null 257 【watcher-1】收到watcher通知 258 【watcher-1】连接状态: SyncConnected 259 【watcher-1】事件类型: None 260 【watcher-1】成功连接上ZK服务器 261 ---------------------------------------- 262 **************进入process方法************** 263 event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher 264 【main】节点创建成功, Path: /testWatcher, content: 1567510219582 265 ---------------------- read parent ---------------------------- 266 【main】更新数据成功,path:/testWatcher, stat: 223,224,1567510219588,1567510219598,1,0,0,0,13,0,223 267 268 ---------------------- read children path ---------------------------- 269 【watcher-2】收到watcher通知 270 【watcher-2】连接状态: SyncConnected 271 【watcher-2】事件类型: NodeCreated 272 【watcher-2】创建节点 273 ---------------------------------------- 274 **************进入process方法************** 275 event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher 276 【watcher-3】收到watcher通知 277 【watcher-3】连接状态: SyncConnected 278 【watcher-3】事件类型: NodeDataChanged 279 【watcher-3】修改节点数据 280 【watcher-3】修改后节点内容:1567510219598 281 ---------------------------------------- 282 **************进入process方法************** 283 event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatcher/children 284 【main】节点创建成功, Path: /testWatcher/children, content: 1567510220605 285 【watcher-4】收到watcher通知 286 【watcher-4】连接状态: SyncConnected 287 【watcher-4】事件类型: NodeCreated 288 【watcher-4】创建节点 289 ---------------------------------------- 290 **************进入process方法************** 291 event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher 292 【watcher-5】收到watcher通知 293 【watcher-5】连接状态: SyncConnected 294 【watcher-5】事件类型: NodeChildrenChanged 295 【watcher-5】子节点变更 296 【main】更新数据成功,path:/testWatcher/children, stat: 225,226,1567510220606,1567510221615,1,0,0,0,13,0,225 297 298 【watcher-5】子节点列表:[children] 299 ---------------------------------------- 300 **************进入process方法************** 301 event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatcher/children 302 【watcher-6】收到watcher通知 303 【watcher-6】连接状态: SyncConnected 304 【watcher-6】事件类型: NodeDataChanged 305 【watcher-6】修改节点数据 306 【watcher-6】修改后节点内容:1567510221615 307 ---------------------------------------- 308 **************进入process方法************** 309 event = WatchedEvent state:SyncConnected type:NodeDeleted path:/testWatcher/children 310 【main】删除节点成功,path:/testWatcher/children 311 【main】删除节点成功,path:/testWatcher 312 【watcher-7】收到watcher通知 313 【watcher-7】连接状态: SyncConnected 314 【watcher-7】事件类型: NodeDeleted 315 【watcher-7】删除节点 316 【watcher-7】节点 /testWatcher/children 被删除 317 ---------------------------------------- 318 **************进入process方法************** 319 event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatcher 320 【watcher-8】收到watcher通知 321 【watcher-8】连接状态: SyncConnected 322 【watcher-8】事件类型: NodeChildrenChanged 323 【watcher-8】子节点变更 324 325 */