其实在学之前我也有个疑虑,我为啥要学curator,撇开涨薪这些外在的东西,就单技术层面来讲,学curator能帮我做些什么?这就不得不从zookeeper说起,上篇我已经大篇幅讲了zk是做什么的了,但真正要靠zk去实现多服务器自动拉取更新的配置文件等功能是非常难的,如果没有curator,直接去写的话基本上能把你累哭,就好比连Mybatis或者jpa都没有,让你用原生的代码去写个网站一样,你可以把curator当做一个比较强大的工具,有了它操作zk不再是事,说这么多,是时候进入正题了:
curator 官网:http://curator.apache.org
使用curator去实现的几块内容:
学习目录: 1.使用curator建立与zk的连接 2.使用curator添加/递归添加节点 3.使用curator删除/递归删除节点 4.使用curator创建/验证 ACL(访问权限列表) 5.使用curator监听 单个/父 节点的变化(watch事件) --------------------------------------------- 6.基于curator实现zookeeper分布式锁(需要掌握基本的多线程知识) 前置条件:已掌握zookeeper的基本操作,对zookeeper有所了解,如果没有掌握请翻阅我前面的博客去学习.
本节所需要引入的依赖有以下三个,建议直接全部引入即可:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.0.0</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.0.0</version> </dependency>
1.通过curator建立与zk的连接
需要准备连接zk的url,建议直接写成工具类,因为接下来会频繁用到,功能类似于jdbc.
public class ZkClientUtil { private static final int BASE_SLEEP_TIME_MS = 5000; //定义失败重试间隔时间 单位:毫秒 private static final int MAX_RETRIES = 3; //定义失败重试次数 private static final int SESSION_TIME_OUT = 1000000; //定义会话存活时间,根据业务灵活指定 单位:毫秒 private static final String ZK_URI = "192.168.174.132:2181";//你自己的zkurl和端口号 private static final String NAMESPACE = "laohan_jianshen"; //工作空间,可以不指定,建议指定,功能类似于项目包,之后创建的所有的节点都会在该工作空间下,方便管理 public static CuratorFramework build(){ //创建比较简单,链式编程,很爽,基本上指定点参数就OK了 RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS,MAX_RETRIES);//重试策略 CuratorFramework client = CuratorFrameworkFactory .builder() .connectString(ZK_URI) .retryPolicy(retryPolicy) .namespace(NAMESPACE) .sessionTimeoutMs(SESSION_TIME_OUT) .build(); return client; } }
2.通过curator添加/递归添加节点
//通过上一步获取到的client,直接启动该client,值得注意的是client在使用前必须先启动: client.start; client .create()//创建节点 .withMode(CreateMode.xxx)//节点属性:永久节点/临时节点/有序节点 通过CreateMode.即可看到 .withACL(ZooDefs.Ids.xxx)//节点访问权限,通过Ids.即可看到 默认是OPEN_ACL_UNSAFE(开放不安全权限) .forPath("/yourpath","yourdata".getBytes());//指明你的节点路径,数据可以不指定,数据必须是byte[]
创建递归节点:
//比如我想一次性创建/yourpath/a/b/c/1/2/3...这样的节点,如果按传统方法会累死你 //curator可以一次性创建好,只需要在创建时添加creatingParentsIfNeeded即可. client .create()//创建节点 .creatingParentsIfNeeded()//创建父节点,如果需要的话 ...
3.使用curator删除/递归删除节点
client .delete() //删除 .guaranteed()//保证一定帮你删了它 .withVersion(0)//指定要删节点的版本号 .forPath("/yourpath")//指定要删节点的路径
递归删除:
//比如我当前的节点结构是这样:/yourpath/a/b/c/1/2/3 我想删除a节点下面的所有目录 //传统方法累死个人,现在只需要添加deletingChildrenIfNeeded即可 client .delete() //删除 .deletingChildrenIfNeeded()//如果它有儿子都给删了...
4.使用curator创建/验证 ACL(访问权限列表)
//为了保证安全,有时需要对节点的访问权限做一些限制,否则可能会引起重要信息泄露/篡改/删除等 //节点ACL的创建方式有两种,一种是使用ZK提供的,一种是自定义的 //1.ZK提供的,比较简单,拿来即用,在创建节点时指明withACL即可 client .create() .withACL(ZooDefs.Ids.READ_ACL_UNSAFE)//指明该节点是只读节点,还有其他属性,可以通过Ids.查看
//创建自定义ACL,需要自己new Id(),并指明是否是加密的,然后账号和密码是多少,加密策略使用zk提供的: List<ACL> aclList = new ArrayList<ACL>(); ACL acl1 = new ACL(ZooDefs.Perms.READ,new Id("digest",DigestAuthenticationProvider.generateDigest("user:123456"))); ACL acl2 = new ACL(ZooDefs.Perms.ALL,new Id("digest",DigestAuthenticationProvider.generateDigest("root:123456"))); aclList.add(acl1); aclList.add(acl2); //如此我就创建好了两种不同的权限账号,user只能对该节点有读的权限,但root用户对该节点有所有权限
//ACL验证,创建好节点之后,可以在服务器的zk安装目录的bin目录下 连接客户端./zkCli //然后通过ls /该目录 查看是否可以访问 正常是不能访问的 会提示权限不够 //下面我们通过curator去连接,要想访问该节点需要在创建client时就指明账号和密码: CuratorFramework client = CuratorFrameworkFactory .builder() .authorization("digest","root:123456".getBytes())//指明使用了加密,用户名和密码用:隔开,以byte[]输入 //如此,接下来通过该client可以对刚刚创建的节点具有所有权限,如果登录的是user,则只具有读权限.
5.通过curator创建单个节点及其父节点的watch事件
由于zk的watch事件是只能被触发一次的,触发完即销毁监听,这显然不是我们想要的,在实际开发中更多的场景是需要对某个节点持续监听,所以这里我只介绍创建持续监听的单节点/父节点
//对单个节点创建watch事件 //定义NodeCache,指明被监听节点的路径: final NodeCache nodeCache = new NodeCache(client,"/yourpath"); nodeCache.start(true);//开启 nodeCache .getCurrentData()//可以获取该监听节点的数据 .getPath();//可以获取该监听节点的路径
//对指定父节点创建watch事件,只要其任何一个子节点,或子节点的子节点...发生变化,就会触发watch事件. //定义PathChildrenCache,指明要watch的目录 final PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"yourpath"); //启动,启动策略有三种:同步,异步提交,异步 用的比较多的就是下面这种,用StartMode.可以查看到 pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); //对该节点创建监听器 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { //TODO 可以通过PathChildrenCacheEvent.拿到你想要的数据和路径等 } });
至此,curator的常用内容已学习完毕,建议每个都亲自操作一下,为之后的自动配置和分布式锁操作打下基础.
===================================================================
package com.demo.zookeeper.curator.lock; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.concurrent.CountDownLatch; /** * num: 模拟 库存 10个 * 流程: 创建10个线程 同时去减num, 会出现 问题 * 解决: curator 的 分布式锁解决方案 InterProcessLock 加锁 acquire() 解锁 release() */ public class DistributedLockDemo2 { // ZooKeeper 锁节点路径, 分布式锁的相关操作都是在这个节点上进行 private final String lockPath = "/distributed-lock"; // ZooKeeper 服务地址, 单机格式为:(127.0.0.1:2181), // 集群格式为:(127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183) private String connectString; // Curator 客户端重试策略 private RetryPolicy retry; // Curator 客户端对象 private CuratorFramework client; // client2 用户模拟其他客户端 private CuratorFramework client2; CountDownLatch countDownLatch = new CountDownLatch(10); // 数据 private static int num = 10; // 初始化资源 @Before public void init() throws Exception { // 设置 ZooKeeper 服务地址为本机的 2181 端口 connectString = "127.0.0.1:2181"; // 重试策略 // 初始休眠时间为 1000ms, 最大重试次数为 3 retry = new ExponentialBackoffRetry(1000, 3); // 创建一个客户端, 60000(ms)为 session 超时时间, 15000(ms)为链接超时时间 client = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry); //client2 = CuratorFrameworkFactory.newClient(connectString, 60000, 15000, retry); // 创建会话 client.start(); //client2.start(); } @Test public void sharedLock() throws Exception { // 创建共享锁 final InterProcessLock lock = new InterProcessSemaphoreMutex(client, lockPath); // lock2 用于模拟其他客户端 final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client2, lockPath); for (int i=0;i<10;i++) { new Thread(new Runnable() { @Override public void run() { try { lock.acquire(); if(num >1){ num--; System.out.println(Thread.currentThread().getName()+","+num); } lock.release(); countDownLatch.countDown(); } catch (Exception e) { e.printStackTrace(); } } }).start(); } countDownLatch.await(); System.out.println("### num:" + num); } // 释放资源 @After public void close() { CloseableUtils.closeQuietly(client); } }