1、常用的zk java客户端
2、搭建maven工程,建立curator与zkserver的连接
3、zk命名空间以及创建节点
4、修改节点数据以及删除节点
5、读取节点数据、节点下面子节点列表、判断节点是否存在
6、一次性监听--curator之usingWatcher
7、curator之nodeCache一次注册N次监听
8、curator之PathChildrenCache子节点监听
9、curator之acl权限操作与认证授权
1、常用的zk java客户端 <--返回目录
1)zk原生api
不足之处:超时重连不支持自动,需要手动操作:watch注册一次后会失效;不支持递归创建节点;
2)zkclient
3)apache curator
apache的开源项目
解决watcher注册一次就失效的
api更加简单易用
提供更多解决方案并且实现简单,比如分布式锁
提供常用的zookeeper工具类
2、搭建maven工程,建立curator与zkserver的连接 <--返回目录
依赖
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.11</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
CuratorOperator
package com.oy.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CuratorOperator {
private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
public CuratorOperator() {
// 参数1 重试次数; 参数2 每次重试间隔的时间
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath).sessionTimeoutMs(20000)
.retryPolicy(retryPolicy).build();
client.start();
}
/**
* 测试客户端连接
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CuratorOperator curatorOperator = new CuratorOperator();
boolean started = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
new Thread().sleep(5000);
curatorOperator.closeZKClient();
boolean started1 = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}
/**
* 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) client.close();
}
}
3、zk命名空间以及创建节点 <--返回目录
package com.oy.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CuratorOperator {
private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
public CuratorOperator() {
// 参数1 重试次数; 参数2 每次重试间隔的时间
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath).sessionTimeoutMs(20000)
.retryPolicy(retryPolicy).namespace("workspace").build();
client.start();
}
/**
* 测试客户端连接
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CuratorOperator curatorOperator = new CuratorOperator();
boolean started = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
// 创建节点
String nodePath = "/super/son1";
byte[] data = "testnode".getBytes();
curatorOperator.client.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, data);
new Thread().sleep(5000);
curatorOperator.closeZKClient();
boolean started1 = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}
/**
* 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) client.close();
}
}
4、修改节点数据以及删除节点 <--返回目录
更新节点数据
public static void main(String[] args) throws Exception {
CuratorOperator curatorOperator = new CuratorOperator();
boolean started = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
// 创建节点
String nodePath = "/super/son1";
// byte[] data = "testnode".getBytes();
// curatorOperator.client.create().creatingParentContainersIfNeeded()
// .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
// .forPath(nodePath, data);
// 更新节点数据
byte[] newData = "newtestnode".getBytes();
curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);
new Thread().sleep(5000);
curatorOperator.closeZKClient();
boolean started1 = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}
删除节点
public static void main(String[] args) throws Exception {
CuratorOperator curatorOperator = new CuratorOperator();
boolean started = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
// 创建节点
String nodePath = "/super/son1";
// byte[] data = "testnode".getBytes();
// curatorOperator.client.create().creatingParentContainersIfNeeded()
// .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
// .forPath(nodePath, data);
// 更新节点数据
// byte[] newData = "newtestnode".getBytes();
// curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);
// 删除节点
curatorOperator.client.delete()
.guaranteed() // 如果删除失败,那么在后台还是继续删除,直到成功
.deletingChildrenIfNeeded() // 如果有子节点,也删除
.forPath(nodePath);
new Thread().sleep(5000);
curatorOperator.closeZKClient();
boolean started1 = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}
测试删除前,在/workspace/super/son1下面创建子节点,删除son1的时候,下面的子节点也被删除了
5、读取节点数据、节点下面子节点列表、判断节点是否存在 <--返回目录
package com.oy.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class CuratorOperator {
private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
public CuratorOperator() {
// 参数1 重试次数; 参数2 每次重试间隔的时间
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath).sessionTimeoutMs(20000)
.retryPolicy(retryPolicy).namespace("workspace").build();
client.start();
}
/**
* 测试客户端连接
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CuratorOperator curatorOperator = new CuratorOperator();
boolean started = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
// 创建节点
String nodePath = "/super/son1";
// byte[] data = "testnode".getBytes();
// curatorOperator.client.create().creatingParentContainersIfNeeded()
// .withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
// .forPath(nodePath, data);
// 更新节点数据
// byte[] newData = "newtestnode".getBytes();
// curatorOperator.client.setData().withVersion(0).forPath(nodePath, newData);
// 删除节点
// curatorOperator.client.delete()
// .guaranteed() // 如果删除失败,那么在后台还是继续删除,直到成功
// .deletingChildrenIfNeeded() // 如果有子节点,也删除
// .forPath(nodePath);
// 读取节点数据
Stat stat = new Stat();
byte[] nodeData = curatorOperator.client.getData().storingStatIn(stat).forPath(nodePath);
log.warn(nodePath + "节点数据: {}, 版本: {}", new String(nodeData), stat.getVersion());
// 查询节点下面的子节点列表
List<String> childNodes = curatorOperator.client.getChildren().forPath(nodePath);
for (String child : childNodes) {
log.warn(child);
}
// 判断节点是否存在,如果不存在则为空
Stat stat1 = curatorOperator.client.checkExists().forPath(nodePath + "/xxx");
log.warn("stat1: {}", stat1);
new Thread().sleep(5000);
curatorOperator.closeZKClient();
boolean started1 = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}
/**
* 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) client.close();
}
}
6、一次性监听--curator之usingWatcher <--返回目录
public static void main(String[] args) throws Exception {
CuratorOperator curatorOperator = new CuratorOperator();
boolean started = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
String nodePath = "/super/son1";
// watcher事件,当使用usingWatcher时,监听只会触发一次,监听完毕后就销毁
curatorOperator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
//curatorOperator.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
new Thread().sleep(50000);
curatorOperator.closeZKClient();
boolean started1 = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}
测试:通过客户端zkCli.sh 多次修改 set /workspace/super/son1 bbb, 控制台只打印一次watcher监听结果。
7、curator之nodeCache一次注册N次监听 <--返回目录
package com.oy.curator;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class CuratorOperator {
private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
public CuratorOperator() {
// 参数1 重试次数; 参数2 每次重试间隔的时间
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath).sessionTimeoutMs(20000)
.retryPolicy(retryPolicy).namespace("workspace").build();
client.start();
}
/**
* 测试客户端连接
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CuratorOperator curatorOperator = new CuratorOperator();
boolean started = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
String nodePath = "/super/son1";
// watcher事件,当使用usingWatcher时,监听只会触发一次,监听完毕后就销毁
//curatorOperator.client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);
//curatorOperator.client.getData().usingWatcher(new MyWatcher()).forPath(nodePath);
// NodeCache: 监听数据节点的变化,会触发事件
final NodeCache nodeCache = new NodeCache(curatorOperator.client, nodePath);
// buildInital: 为true则在初始化时获取node的值并缓存
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
log.warn("节点初始化数据为:{}", new String(nodeCache.getCurrentData().getData()));
} else {
log.warn("节点初始化数据为空");
}
nodeCache.getListenable().addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
// 注意:删除节点时下面代码nodeCache.getCurrentData()==null
String data = new String(nodeCache.getCurrentData().getData());
log.warn("节点路径{}的数据:{}", nodeCache.getPath(), data);
}
});
new Thread().sleep(50000);
curatorOperator.closeZKClient();
boolean started1 = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}
/**
* 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) client.close();
}
}
测试:
1)启动main方法
2)客户端修改/workspace/super/son1的值 set /workspace/super/son1 eee/fff/ggg
8、curator之PathChildrenCache子节点监听 <--返回目录
package com.oy.curator;
import javafx.scene.shape.Path;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class CuratorOperator {
private static final Logger log = LoggerFactory.getLogger(CuratorOperator.class);
private CuratorFramework client = null;
private static final String zkServerPath = "192.168.213.200:2181,192.168.213.200:2182,192.168.213.200:2183";
public CuratorOperator() {
// 参数1 重试次数; 参数2 每次重试间隔的时间
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath).sessionTimeoutMs(20000)
.retryPolicy(retryPolicy).namespace("workspace").build();
client.start();
}
/**
* 测试客户端连接
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
CuratorOperator curatorOperator = new CuratorOperator();
boolean started = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started ? "连接中" : "已关闭"));
String nodePath = "/super/son1";
// 参数3 cacheData 设置缓存节点的数据状态
PathChildrenCache childrenCache = new PathChildrenCache(curatorOperator.client, nodePath, true);
// StartMode 初始化方式
// POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件
// NORMAL: 异步初始化,初始化后不触发事件; BUILD_INITIAL_CACHE: 同步初始化
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// 只有同步初始化,下面才能获取子节点数据
List<ChildData> childDataList = childrenCache.getCurrentData();
log.warn("当前数据节点的子节点数据列表:");
for (ChildData cd : childDataList) {
log.warn(new String(cd.getData()));
}
childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
log.warn("字节点初始化完成"); // childrenCache.start(POST_INITIALIZED_EVENT) 异步初始化完成后触发调用
}
else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { // 添加子节点。初始化完成时,有几个子节点,这个CHILD_ADDED就会触发几次
log.warn("添加子节点:{}", event.getData().getPath());
log.warn("子节点数据:{}", new String(event.getData().getData()));
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { // 修改子节点数据
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { // 删除子节点
}
}
});
new Thread().sleep(50000);
curatorOperator.closeZKClient();
boolean started1 = curatorOperator.client.isStarted();
log.warn("当前客户端状态: " + (started1 ? "连接中" : "已关闭"));
}
/**
* 关闭zk客户端连接
*/
public void closeZKClient() {
if (client != null) client.close();
}
}
9、curator之acl权限操作与认证授权 <--返回目录
// 参数1 重试次数; 参数2 每次重试间隔的时间
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework client = CuratorFrameworkFactory.builder().authorization("digest", "user:password".getBytes())
.connectString(zkServerPath).sessionTimeoutMs(20000)
.retryPolicy(retryPolicy).namespace("workspace").build();
client.start();
// 自定义用户认证访问
ArrayList<ACL> acls = new ArrayList<>();
Id userPwd1 = new Id("digest", AclUtils.getDigestUserPwd("zhangsan1:123"));
Id userPwd2 = new Id("digest", AclUtils.getDigestUserPwd("zhangsan2:123"));
acls.add(new ACL(ZooDefs.Perms.ALL, userPwd1));
acls.add(new ACL(ZooDefs.Perms.READ, userPwd2));
acls.add(new ACL(ZooDefs.Perms.DELETE | ZooDefs.Perms.DELETE, userPwd2));
// 创建节点, 使用自定义的权限列表
String nodePath = "/super/son1";
byte[] data = "testnode".getBytes();
curatorOperator.client.create().creatingParentContainersIfNeeded()
.withMode(CreateMode.PERSISTENT).withACL(acls)
.forPath(nodePath, data);
---