其实在学之前我也有个疑虑,我为啥要学curator,撇开涨薪这些外在的东西,就单技术层面来讲,学curator能帮我做些什么?这就不得不从zookeeper说起,上篇我已经大篇幅讲了zk是做什么的了,但真正要靠zk去实现多服务器自动拉取更新的配置文件等功能是非常难的,如果没有curator,直接去写的话基本上能把你累哭,就好比连Mybatis或者jpa都没有,让你用原生的代码去写个网站一样,你可以把curator当做一个比较强大的工具,有了它操作zk不再是事,说这么多,是时候进入正题了:
curator 官网:http://curator.apache.org
使用curator去实现的几块内容:
学习目录: 1.使用curator建立与zk的连接 2.使用curator添加/递归添加节点 3.使用curator删除/递归删除节点 4.使用curator创建/验证 ACL(访问权限列表) 5.使用curator监听 单个/父 节点的变化(watch事件) --------------------------------------------- 6.基于curator实现zookeeper分布式锁(需要掌握基本的多线程知识) 前置条件:已掌握zookeeper的基本操作,对zookeeper有所了解,如果没有掌握请翻阅我前面的博客去学习.
本节所需要引入的依赖有以下三个,建议直接全部引入即可:
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
1.通过curator建立与zk的连接
需要准备连接zk的url,建议直接写成工具类,因为接下来会频繁用到,功能类似于jdbc.
public class ZkClientUtil {
private static final int BASE_SLEEP_TIME_MS = 5000; //定义失败重试间隔时间 单位:毫秒
private static final int MAX_RETRIES = 3; //定义失败重试次数
private static final int SESSION_TIME_OUT = 1000000; //定义会话存活时间,根据业务灵活指定 单位:毫秒
private static final String ZK_URI = "192.168.174.132:2181";//你自己的zkurl和端口号
private static final String NAMESPACE = "laohan_jianshen";
//工作空间,可以不指定,建议指定,功能类似于项目包,之后创建的所有的节点都会在该工作空间下,方便管理
public static CuratorFramework build(){
//创建比较简单,链式编程,很爽,基本上指定点参数就OK了
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS,MAX_RETRIES);//重试策略
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString(ZK_URI)
.retryPolicy(retryPolicy)
.namespace(NAMESPACE)
.sessionTimeoutMs(SESSION_TIME_OUT)
.build();
return client;
}
}
2.通过curator添加/递归添加节点
//通过上一步获取到的client,直接启动该client,值得注意的是client在使用前必须先启动:
client.start;
client
.create()//创建节点
.withMode(CreateMode.xxx)//节点属性:永久节点/临时节点/有序节点 通过CreateMode.即可看到
.withACL(ZooDefs.Ids.xxx)//节点访问权限,通过Ids.即可看到 默认是OPEN_ACL_UNSAFE(开放不安全权限)
.forPath("/yourpath","yourdata".getBytes());//指明你的节点路径,数据可以不指定,数据必须是byte[]
创建递归节点:
//比如我想一次性创建/yourpath/a/b/c/1/2/3...这样的节点,如果按传统方法会累死你 //curator可以一次性创建好,只需要在创建时添加creatingParentsIfNeeded即可. client .create()//创建节点 .creatingParentsIfNeeded()//创建父节点,如果需要的话 ...
3.使用curator删除/递归删除节点
client
.delete() //删除
.guaranteed()//保证一定帮你删了它
.withVersion(0)//指定要删节点的版本号
.forPath("/yourpath")//指定要删节点的路径
递归删除:
//比如我当前的节点结构是这样:/yourpath/a/b/c/1/2/3 我想删除a节点下面的所有目录 //传统方法累死个人,现在只需要添加deletingChildrenIfNeeded即可 client .delete() //删除 .deletingChildrenIfNeeded()//如果它有儿子都给删了...
4.使用curator创建/验证 ACL(访问权限列表)
//为了保证安全,有时需要对节点的访问权限做一些限制,否则可能会引起重要信息泄露/篡改/删除等 //节点ACL的创建方式有两种,一种是使用ZK提供的,一种是自定义的 //1.ZK提供的,比较简单,拿来即用,在创建节点时指明withACL即可 client .create() .withACL(ZooDefs.Ids.READ_ACL_UNSAFE)//指明该节点是只读节点,还有其他属性,可以通过Ids.查看
//创建自定义ACL,需要自己new Id(),并指明是否是加密的,然后账号和密码是多少,加密策略使用zk提供的:
List<ACL> aclList = new ArrayList<ACL>();
ACL acl1 = new ACL(ZooDefs.Perms.READ,new Id("digest",DigestAuthenticationProvider.generateDigest("user:123456")));
ACL acl2 = new ACL(ZooDefs.Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("root:123456")));
aclList.add(acl1);
aclList.add(acl2);
//如此我就创建好了两种不同的权限账号,user只能对该节点有读的权限,但root用户对该节点有所有权限
//ACL验证,创建好节点之后,可以在服务器的zk安装目录的bin目录下 连接客户端./zkCli
//然后通过ls /该目录 查看是否可以访问 正常是不能访问的 会提示权限不够
//下面我们通过curator去连接,要想访问该节点需要在创建client时就指明账号和密码:
CuratorFramework client = CuratorFrameworkFactory
.builder()
.authorization("digest","root:123456".getBytes())//指明使用了加密,用户名和密码用:隔开,以byte[]输入
//如此,接下来通过该client可以对刚刚创建的节点具有所有权限,如果登录的是user,则只具有读权限.
5.通过curator创建单个节点及其父节点的watch事件
由于zk的watch事件是只能被触发一次的,触发完即销毁监听,这显然不是我们想要的,在实际开发中更多的场景是需要对某个节点持续监听,所以这里我只介绍创建持续监听的单节点/父节点
//对单个节点创建watch事件 //定义NodeCache,指明被监听节点的路径: final NodeCache nodeCache = new NodeCache(client,"/yourpath"); nodeCache.start(true);//开启 nodeCache .getCurrentData()//可以获取该监听节点的数据 .getPath();//可以获取该监听节点的路径
//对指定父节点创建watch事件,只要其任何一个子节点,或子节点的子节点...发生变化,就会触发watch事件.
//定义PathChildrenCache,指明要watch的目录
final PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"yourpath");
//启动,启动策略有三种:同步,异步提交,异步 用的比较多的就是下面这种,用StartMode.可以查看到
pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
//对该节点创建监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
//TODO 可以通过PathChildrenCacheEvent.拿到你想要的数据和路径等
}
});
至此,curator的常用内容已学习完毕,建议每个都亲自操作一下,为之后的自动配置和分布式锁操作打下基础.
===================================================================
package com.demo.zookeeper.curator.lock;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
/**
* num: 模拟 库存 10个
* 流程: 创建10个线程 同时去减num, 会出现 问题
* 解决: curator 的 分布式锁解决方案 InterProcessLock 加锁 acquire() 解锁 release()
*/
public class DistributedLockDemo2 {
// ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行
private final String lockPath = "/distributed-lock";
// ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181),
// 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183)
private String connectString;
// Curator 客户端重试策略
private RetryPolicy retry;
// Curator 客户端对象
private CuratorFramework client;
// client2 用户模拟其他客户端
private CuratorFramework client2;
CountDownLatch countDownLatch = new CountDownLatch(10);
// 数据
private static int num = 10;
// 初始化资源
@Before
public void init() throws Exception {
// 设置 ZooKeeper 服务地址为本机的 2181 端口
connectString = "127.0.0.1:2181";
// 重试策略
// 初始休眠时间为 1000ms, 最大重试次数为 3
retry = new ExponentialBackoffRetry(1000, 3);
// 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间
client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
//client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry);
// 创建会话
client.start();
//client2.start();
}
@Test
public void sharedLock() throws Exception {
// 创建共享锁
final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath);
// lock2 用于模拟其他客户端
final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath);
for (int i=0;i<10;i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
lock.acquire();
if(num >1){
num--;
System.out.println(Thread.currentThread().getName()+","+num);
}
lock.release();
countDownLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
countDownLatch.await();
System.out.println("### num:" + num);
}
// 释放资源
@After
public void close() {
CloseableUtils.closeQuietly(client);
}
}