- 创建会话
1 package org.zln.zk;
2
3 import org.apache.zookeeper.WatchedEvent;
4 import org.apache.zookeeper.Watcher;
5 import org.apache.zookeeper.ZooKeeper;
6
7 import java.io.IOException;
8
9 /**
10 * Created by sherry on 16/8/27.
11 */
12 public class TestZooKeeperClientApi {
13
14 private static ZooKeeper zooKeeper;
15
16 public static void main(String[] args) throws IOException, InterruptedException {
17 createSession();
18 }
19
20 /**
21 * 创建会话
22 */
23 private static ZooKeeper createSession() throws IOException, InterruptedException {
24 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知)
25 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {
26 @Override
27 public void process(WatchedEvent watchedEvent) {
28 System.out.println("收到事件:"+watchedEvent);//收到事件:WatchedEvent state:SyncConnected type:None path:null
29
30
31 //TODO
32 }
33 });
34 System.out.println("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING
35
36 //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了
37 Thread.sleep(5000);
38
39 return zooKeeper;
40
41
42 }
43 }
- 创建节点
1 package org.zln.zk;
2
3 import org.apache.zookeeper.*;
4 import org.apache.zookeeper.data.ACL;
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
7
8 import java.io.IOException;
9 import java.io.UnsupportedEncodingException;
10 import java.util.ArrayList;
11
12 /**
13 * Created by sherry on 16/8/27.
14 */
15 public class TestZooKeeperClientApi {
16
17 private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class);
18
19 private static ZooKeeper zooKeeper;
20
21 public static void main(String[] args) throws IOException, InterruptedException {
22 createSession();
23 }
24
25 /**
26 * 创建会话
27 */
28 private static ZooKeeper createSession() throws IOException, InterruptedException {
29 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知)
30 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {
31 @Override
32 public void process(WatchedEvent watchedEvent) {
33 //TODO 与 ZooKeeper 的交互,一般都放在这里
34 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接
35 logger.info("连接上了");
36
37 try {
38 //参数说明:节点路径 数据的字节数组 权限 创建节点模式
39 String nodePath = createNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
40 logger.info("创建节点:"+nodePath);
41 } catch (UnsupportedEncodingException|KeeperException|InterruptedException e) {
42 e.printStackTrace();
43 }
44
45 }
46 }
47 });
48 logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING
49
50 //如果不停一段时间,那么,监听器还没收到监听,方法就已经退出了
51 Thread.sleep(5000);
52
53 return zooKeeper;
54
55
56 }
57
58 /**
59 * 创建ZooKeeper节点
60 * @param zooKeeper ZooKeeper连接
61 * @return 节点路径
62 */
63 public static String createNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException {
64 //参数说明:节点路径 数据的字节数组 权限 创建节点模式
65 return zooKeeper.create(path,bytes, acls, createMode);
66 }
67 }
创建模式
PERSISTENT 持久节点
PERSISTENT_SEQUENTIAL 持久顺序节点
EPHEMERAL 临时节点
EPHEMERAL_SEQUENTIAL 临时顺序节点
以上代码,是属于同步创建
1 /**
2 * 异步创建节点
3 * @param zooKeeper
4 * @param path
5 * @param bytes
6 * @param acls
7 * @param createMode
8 * @throws KeeperException
9 * @throws InterruptedException
10 */
11 public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException {
12
13 //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数
14 zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() {
15 /**
16 *
17 * @param rc 节点创建结果返回码 0-节点创建成功
18 * @param path 节点真实路径
19 * @param ctx 异步调用上下文 就是 create方法本地调用的那个最后一个参数
20 * @param name
21 */
22 @Override
23 public void processResult(int rc, String path, Object ctx, String name) {
24 StringBuilder stringBuilder = new StringBuilder();
25 stringBuilder.append("
rc="+rc+"
" +
26 "path="+path+"
" +
27 "ctx="+ctx+"
" +
28 "name="+name+"
");
29 logger.info(stringBuilder.toString());
30 }
31 },"异步创建");
32 }
- 获取子节点
1 /**
2 * 同步方式获取子节点
3 * @param zooKeeper 连接
4 * @param parentPath 父路径
5 * @return
6 * @throws KeeperException
7 * @throws InterruptedException
8 */
9 public static List<String> getChildList(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException {
10 //参数说明: 父节点路径 是否需要关注子节点的变化
11 List<String> childs = zooKeeper.getChildren(parentPath,false);
12 return childs;
13 }
异步方式获取子节点且关注子节点的变化
1 /**
2 * 异步方式获取子节点 关注子节点变化
3 * @param zooKeeper 连接
4 * @param parentPath 父路径
5 */
6 public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){
7 zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() {
8 @Override
9 public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
10 logger.info("变化后的子节点:");
11 for (String name:children){
12 logger.info("子节点:"+name);
13 }
14 }
15 },"关注子节点变化");
16 }
目前为止可以发现这个规律,有回调函数的是异步方式调用,没有回调函数的是同步调用
问:同步调用和异步调用的使用场景是???
答:下面的操作依赖调用结果的时候,就需要调用同步方法
1 package org.zln.zk;
2
3 import org.apache.zookeeper.*;
4 import org.apache.zookeeper.data.ACL;
5 import org.apache.zookeeper.data.Stat;
6 import org.slf4j.Logger;
7 import org.slf4j.LoggerFactory;
8
9 import java.io.IOException;
10 import java.io.UnsupportedEncodingException;
11 import java.util.ArrayList;
12 import java.util.List;
13
14 /**
15 * Created by sherry on 16/8/27.
16 */
17 public class TestZooKeeperClientApi {
18
19 private static Logger logger = LoggerFactory.getLogger(TestZooKeeperClientApi.class);
20
21 private static ZooKeeper zooKeeper;
22
23 public static void main(String[] args) throws IOException, InterruptedException {
24 createSession();
25
26 Thread.sleep(Integer.MAX_VALUE);
27 }
28
29 /**
30 * 创建会话
31 */
32 private static ZooKeeper createSession() throws IOException, InterruptedException {
33 //实例化的过程,同时也是与ZooKeeper建立连接的过程,参数说明:ip:port 超时时间 监听器(实现water接口,监听器用于接收通知)
34 zooKeeper = new ZooKeeper("127.0.0.1:2181", 5000, new Watcher() {
35 @Override
36 public void process(WatchedEvent watchedEvent) {
37 //TODO 与 ZooKeeper 的交互,一般都放在这里
38 if (watchedEvent.getState() == Event.KeeperState.SyncConnected){//已连接
39 logger.info("连接上了");
40 try {
41 //同步方式创建节点
42 //String nodePath = sysCreateNode(zooKeeper,"/node_1","123".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
43 //logger.info("创建节点:"+nodePath);
44
45 //异步方式创建节点
46 //asCreateNode(zooKeeper,"/node_2","234".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
47
48
49 //同步方式获取子节点 不关注子节点变化
50 // List<String> list = getChildListNoWatch(zooKeeper,"/");
51 // for (String name:list){
52 // logger.info("子节点:"+name);
53 // }
54
55 //异步方式获取节点 关注子节点变化
56 // asGetChildListAndWatch(zooKeeper,"/");
57
58 //同步方式获取节点数据 sysGetNodeData
59
60 byte[] bytes = sysGetNodeDataNoWatch(zooKeeper,"/node_1");
61 logger.info("获取节点数据"+new String(bytes,"UTF-8"));
62
63 deleteNode(zooKeeper,"/node_1",0);
64 } catch (KeeperException|InterruptedException|UnsupportedEncodingException e) {
65 e.printStackTrace();
66 }
67
68 }
69 }
70 });
71 logger.info("查看状态:"+zooKeeper.getState());//查看状态:CONNECTING
72
73 return zooKeeper;
74
75
76 }
77
78 /**
79 * 同步创建节点
80 * @param zooKeeper 连接
81 * @param path 节点路径
82 * @param bytes 字节数组数据
83 * @param acls 权限
84 * @param createMode 创建模式
85 * @return
86 * @throws UnsupportedEncodingException
87 * @throws KeeperException
88 * @throws InterruptedException
89 */
90 public static String sysCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws UnsupportedEncodingException, KeeperException, InterruptedException {
91 return zooKeeper.create(path,bytes, acls, createMode);
92 }
93
94
95 /**
96 * 异步创建节点
97 * @param zooKeeper 连接
98 * @param path 节点路径
99 * @param bytes 字节数组数据
100 * @param acls 权限
101 * @param createMode 创建模式
102 * @throws KeeperException
103 * @throws InterruptedException
104 */
105 public static void asCreateNode(ZooKeeper zooKeeper, String path, byte[] bytes, ArrayList<ACL> acls,CreateMode createMode) throws KeeperException, InterruptedException {
106
107 //异步创建需要增加 AsyncCallback.StringCallback 接口的实现类 以及 一个上下文对象参数
108 zooKeeper.create(path, bytes, acls, createMode, new AsyncCallback.StringCallback() {
109 /**
110 *
111 * @param rc 节点创建结果返回码 0-节点创建成功
112 * @param path 节点真实路径
113 * @param ctx 异步调用上下文 就是 create方法本地调用的那个最后一个参数
114 * @param name
115 */
116 @Override
117 public void processResult(int rc, String path, Object ctx, String name) {
118 StringBuilder stringBuilder = new StringBuilder();
119 stringBuilder.append("
rc="+rc+"
" +
120 "path="+path+"
" +
121 "ctx="+ctx+"
" +
122 "name="+name+"
");
123 logger.info(stringBuilder.toString());
124 }
125 },"异步创建");
126 }
127
128
129 /**
130 * 同步方式获取子节点 不关注子节点变化
131 * @param zooKeeper 连接
132 * @param parentPath 父路径
133 * @return
134 * @throws KeeperException
135 * @throws InterruptedException
136 */
137 public static List<String> sysGetChildListNoWatch(ZooKeeper zooKeeper,String parentPath) throws KeeperException, InterruptedException {
138 //参数说明: 父节点路径 是否需要关注子节点的变化 如果 true,则子节点发生变化后,会产生 NodeChildrenChanged 事件
139 List<String> childs = zooKeeper.getChildren(parentPath,false);
140 return childs;
141 }
142
143 /**
144 * 异步方式获取子节点 关注子节点变化
145 * @param zooKeeper 连接
146 * @param parentPath 父路径
147 */
148 public static void asGetChildListAndWatch(ZooKeeper zooKeeper,String parentPath){
149 zooKeeper.getChildren(parentPath, true, new AsyncCallback.Children2Callback() {
150 @Override
151 public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
152 logger.info("变化后的子节点:");
153 for (String name:children){
154 logger.info("子节点:"+name);
155 }
156 }
157 },"关注子节点变化");
158 }
159
160 /**
161 * 同步方式获取数据
162 * @param zooKeeper
163 * @param path
164 * @return
165 * @throws KeeperException
166 * @throws InterruptedException
167 */
168 public static byte[] sysGetNodeDataNoWatch(ZooKeeper zooKeeper,String path) throws KeeperException, InterruptedException {
169 //路径 是否关注数据变化 状态
170 return zooKeeper.getData(path,false,new Stat());
171 }
172
173 /**
174 * 删除节点
175 * @param zooKeeper
176 * @param nodePath
177 * @param version
178 * @throws KeeperException
179 * @throws InterruptedException
180 */
181 public static void deleteNode(ZooKeeper zooKeeper,String nodePath,int version) throws KeeperException, InterruptedException {
182 zooKeeper.delete(nodePath,version);
183 }
184
185
186 }
除了ZooKeeper提供的Java API外,还有两种客户端,ZKClient和Curator两种客户端,都是对原生API的封装,使得操作更方便
《从PAXOS到ZOOKEEPER分布式一致性原理与实践》,可以参考这本书
