前言##
记录下ZK客户端的使用学习,初步想法是从几个方面来记录
- 如何开始使用及api介绍(创建会话以及增删查改)
- 异步调用
- 事件监听
- Master选举
- 分布式锁
- 计数器
- Barrier
仓库介绍##
GroupID/Org | ArtifactID/Name | Description |
---|---|---|
org.apache.curator | curator-recipes | All of the recipes. Note: this artifact has dependencies on client and framework and, so, Maven (or whatever tool you're using) should pull those in automatically. |
org.apache.curator | curator-async | Asynchronous DSL with O/R modeling, migrations and many other features. |
org.apache.curator | curator-framework | The Curator Framework high level API. This is built on top of the client and should pull it in automatically. |
org.apache.curator | curator-client | The Curator Client - replacement for the ZooKeeper class in the ZK distribution. |
org.apache.curator | curator-test | Contains the TestingServer, the TestingCluster and a few other tools useful for testing. |
org.apache.curator | curator-examples | Example usages of various Curator features. |
org.apache.curator | curator-x-discovery | A Service Discovery implementation built on the Curator Framework. |
org.apache.curator | curator-x-discovery-server | A RESTful server that can be used with Curator Discovery. |
版本说明##
zk版本:
curator版本:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.2.0</version>
</dependency>
常用API介绍###
1.创建会话
CuratorFramework cc = CuratorFrameworkFactory.builder()
.connectString("ip:port")
.sessionTimeoutMs(2000)
.connectionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000,3))
.namespace("test")
.build();
cc.start();
说一下retryPolicy,重连策略,建议用其中两种
//重连3次,每次休息3秒
new RetryNTimes(3,3000);
//重连3次,每次休息大约是1秒
new ExponentialBackoffRetry(1000,3);
//初始化一个大概的等待时间1秒,然后开始重连,最多重连3次,每次最多休息2秒
new ExponentialBackoffRetry(1000,3,2000);
//计算通过这个初始化的大约时间,计算实际需要睡眠多久
long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
namespace代表命名空间,注意的是,curator会自动创建。
2.创建普通节点
cc.create().forPath("/comm_msg_nd","ctx".getBytes());
cc.create().forPath("/comm_no_msg_nd");
需要注意的是,如果没有值,会默认把当前ip地址放进去
return InetAddress.getLocalHost().getHostAddress().getBytes();
3.创建临时节点
cc.create().withMode(CreateMode.EPHEMERAL).forPath("/ephe_nd","ff".getBytes());
//creatingParentsIfNeeded会自动创建父节点
cc.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/ephe_nd","ff".getBytes());
需要注意,只有叶节点可以做临时节点,所以叶节点的父节点必须是永久节点,也就是creatingParentsIfNeeded这个方法创建的父节点必须是永久节点
4.删除节点
//删除单独一个节点
cc.delete().forPath("/comm_msg_nd");
//删除多级节点
cc.create().creatingParentsIfNeeded().forPath("/p1/p2/p3/multi_nd");
cc.delete().deletingChildrenIfNeeded().forPath("/p1");
//删除指定版本的节点
cc.delete().withVersion(0).forPath("/comm_msg_nd");
//保证删除,失败后继续执行
cc.delete().guaranteed().forPath("/comm_msg_nd");
//通过添加错误的回调方法来实现,错误后继续执行
OperationAndData.ErrorCallback<String> errorCallback = null;
if ( guaranteed )
{
errorCallback = new OperationAndData.ErrorCallback<String>()
{
@Override
public void retriesExhausted(OperationAndData<String> operationAndData)
{
//将路径添加到错误处理中
client.getFailedDeleteManager().addFailedOperation(unfixedPath);
}
};
}
client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext(), null), null);
//然后FailedDeleteManager中通过调用这个来继续请求删除操作
@Override
protected void executeGuaranteedOperationInBackground(String path)
throws Exception
{
client.delete().guaranteed().inBackground().forPath(path);
}
5.读数据
Stat stat = new Stat();
byte[] ctx = cc.getData().storingStatIn(stat).forPath("/comm_msg_nd");
//获取节点内容为ctx
System.out.println(new String(ctx));
//获取该节点stat
//78,78,1573789366124,1573789366124,0,0,0,0,3,0,78
System.out.println(stat);
6.更新数据
Stat stat = new Stat();
stat = cc.setData().forPath("/comm_msg_nd","new ctx".getBytes());
System.out.println(stat);