ZkClient是Gitthub上一个开源的ZooKeeper客户端。ZKClient在ZooKeeper原生API接口之上进行了包装,是一个更加易用的ZooKeeper客户端。同时ZKClient在内部实现诸如Session超时重连,Watcher反复注册等功能。
一:maven依赖
1 <dependency> 2 <groupId>org.apache.zookeeper</groupId> 3 <artifactId>zookeeper</artifactId> 4 <version>3.4.6</version> 5 </dependency> 6 <dependency> 7 <groupId>com.101tec</groupId> 8 <artifactId>zkclient</artifactId> 9 <version>0.5</version> 10 </dependency>
二:一个java存储实体
1 package com.yeepay.sxf.testZkClient; 2 3 import java.io.Serializable; 4 5 6 /** 7 * 用户对象 8 * @author sxf 9 * 10 */ 11 public class User implements Serializable{ 12 private int id; 13 private String name; 14 private String address; 15 16 17 public int getId() { 18 return id; 19 } 20 public void setId(int id) { 21 this.id = id; 22 } 23 public String getName() { 24 return name; 25 } 26 public void setName(String name) { 27 this.name = name; 28 } 29 public String getAddress() { 30 return address; 31 } 32 public void setAddress(String address) { 33 this.address = address; 34 } 35 36 }
三:测试源代码
1 package com.yeepay.sxf.testZkClient; 2 3 import java.util.List; 4 5 import org.I0Itec.zkclient.IZkChildListener; 6 import org.I0Itec.zkclient.IZkDataListener; 7 import org.I0Itec.zkclient.ZkClient; 8 import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer; 9 import org.apache.zookeeper.CreateMode; 10 import org.apache.zookeeper.data.Stat; 11 /** 12 * zkclient的api测试 13 * acl权限相关操作和原生zookeeper客户端一样 14 * @author sxf 15 * 16 */ 17 public class CreateSession { 18 19 public static void main(String[] args) throws InterruptedException { 20 //第一个参数:zk的ip:端口号 21 //会话过期时间 22 //链接超时时间 23 //序列化器(将java对象转化成byte数组,得到时从byte数组反序列化成java对象)这个序列化器在事件订阅上,修改数据一定要保持一致,否则无效果 24 //ZkClient zkClient=new ZkClient("10.151.30.75:2181", 10000, 10000, new SerializableSerializer()); 25 26 //这个客户端链接,传入的序列化器皿等于什么都没做 27 ZkClient zkClient=new ZkClient("10.151.30.75:2181", 10000, 10000, new BytesPushThroughSerializer()); 28 29 //创建一个节点 30 //createNode(zkClient); 31 32 //获取一个节点中存取的值 33 //getNodeValue(zkClient); 34 35 //获取一个节点下的子节点列表 36 //getNodeChildList(zkClient); 37 38 //判断一个节点是否存在 39 //isNodeExists(zkClient); 40 41 //删除一个节点 42 //deleteNode(zkClient); 43 44 //修改一个节点的数据 45 //updateNodeValue(zkClient); 46 47 //为某个节点的子节点列表发生变化订阅一个事件 48 //whenChildListChangeThenDoSomeThing(zkClient); 49 50 //为某个节点存储的值发生变化或者节点被删除订阅一个事件 51 whenNodeValueChangeThenDoThing(zkClient); 52 53 Thread.sleep(Integer.MAX_VALUE); 54 } 55 56 57 /** 58 * 创建一个node节点 59 * @param zkClient 60 */ 61 public static void createNode(ZkClient zkClient){ 62 //user 对象必须实现序列化接口Serializable 63 User user=new User(); 64 user.setId(1); 65 user.setName("shangxiaofei"); 66 user.setAddress("smx"); 67 //(节点路径,节点数据,持久节点) 68 String path=zkClient.create("/node_131", user, CreateMode.PERSISTENT); 69 System.out.println("CreateSession.createNode(创建节点路径为:)"+path); 70 } 71 72 /** 73 * 获取一个节点中存取的值 74 * @param zkClient 75 */ 76 public static void getNodeValue(ZkClient zkClient){ 77 78 Stat stat=new Stat(); 79 //获取一个节点中存取的值 80 User user=zkClient.readData("/node_131",stat); 81 System.out.println("CreateSession.getNodeValue(id==>)"+user.getId()); 82 System.out.println("CreateSession.getNodeValue(name==>)"+user.getName()); 83 System.out.println("CreateSession.getNodeValue(address==>)"+user.getAddress()); 84 System.out.println("CreateSession.getNodeValue(节点状态==>)"+stat); 85 } 86 87 /** 88 * 获取某个节点的子节点列表 89 * @param zkClient 90 */ 91 public static void getNodeChildList(ZkClient zkClient){ 92 93 List<String> nodesList=zkClient.getChildren("/"); 94 System.out.println("CreateSession.getNodeChildList(“/”下的子节点列表为=>)"+nodesList); 95 } 96 97 /** 98 * 检测某一个节点是否存在 99 * @param zkClient 100 */ 101 public static void isNodeExists(ZkClient zkClient){ 102 boolean flag=zkClient.exists("/node_132"); 103 System.out.println("CreateSession.isNodeExists(节点是否存在的结果)"+flag); 104 } 105 106 107 /** 108 * 删除一个节点或删除存在子节点的节点 109 * @param zkClient 110 */ 111 public static void deleteNode(ZkClient zkClient){ 112 //删除无子节点的节点。如果存在子节点,抛出异常 113 boolean flag=zkClient.delete("/node_131"); 114 System.out.println("CreateSession.deleteNode(删除无子节点的节点结果==>)"+flag); 115 116 //删除存在子节点的节点。先深入子节点,删除掉所有子节点的节点,再删除当前节点 117 boolean flag2=zkClient.deleteRecursive("/node_131"); 118 System.out.println("CreateSession.deleteNode(删除存在子节点的节点结果==>)"+flag2); 119 } 120 121 /** 122 * 修改一个数据节点中存储的值 123 * @param zkClient 124 */ 125 public static void updateNodeValue(ZkClient zkClient){ 126 User user=new User(); 127 user.setId(100); 128 user.setName("shangxiaoshuai"); 129 user.setAddress("smxcyxzyc"); 130 //修改一个节点中存储的数据 131 zkClient.writeData("/node_131", user); 132 133 //修改一个节点中存储的数据,实现类似乐观锁的功能 134 zkClient.writeData("/node_131", user, 1); 135 } 136 137 138 /** 139 * 当子节点列表发生变化,则触发一个事件 140 */ 141 public static void whenChildListChangeThenDoSomeThing(ZkClient zkClient){ 142 /** 143 * 可以监听(1)子节点列表发生变化,增加新子节点,删除子节点 144 * (2)节点本身被删除 145 * (3)节点本身被创建,意味着可以订阅一个并不存在的节点的监听事件 146 */ 147 148 zkClient.subscribeChildChanges("/node_132", new WhenChildChangeThing()); 149 } 150 /** 151 * 当子节点列表发生变化的事件实现类 152 * @author sxf 153 * 154 */ 155 private static class WhenChildChangeThing implements IZkChildListener{ 156 157 @Override 158 public void handleChildChange(String parentPath,List<String> currentChilds) throws Exception { 159 System.out.println("节点列表变化的父节点路径==>"+parentPath); 160 System.out.println("当前的节点列表==>"+currentChilds); 161 } 162 163 } 164 165 /** 166 * 为节点数据内容发生变化订阅一个事件 167 * @param zkClient 168 */ 169 public static void whenNodeValueChangeThenDoThing(ZkClient zkClient){ 170 zkClient.subscribeDataChanges("/node_132", new WhenNodeValueChange()); 171 } 172 /** 173 * 当数据节点存储的值发生变化的事件监听类 174 * @author sxf 175 * 176 */ 177 private static class WhenNodeValueChange implements IZkDataListener{ 178 179 //当节点数据内容发生变化这个函数被调用 180 @Override 181 public void handleDataChange(String dataPath, Object data) 182 throws Exception { 183 System.out.println("节点数据内容发生变化,变化后的值为==>"+data); 184 System.out.println("节点数据内容发生变化,变化节点的路径==>"+dataPath); 185 } 186 187 //当节点被删除时这个函数被调用 188 @Override 189 public void handleDataDeleted(String dataPath) throws Exception { 190 System.out.println("节点被删除的路径为==>"+dataPath); 191 } 192 193 } 194 }