一、Curator介绍
zookeeper的提交人也说过,curator对于zookeeper而言就像是guava对于java差不多,更加优雅高效。
而且之前的zookeeper原生API,往往因为2个问题而让代码变的非常复杂:
(1) session expired,当会话由于各种原因而断掉之后的客户端重连机制
(2) watch的一次性问题,每次重连之后都要重新设置watch,而且每次的watch都是一次性的操作
Curator框架提供了一套高级的API, 解决了以上问题并且简化了ZooKeeper的操作。 它增加了很多使用ZooKeeper开发的特性,可以处理ZooKeeper集群复杂的连接管理和重试机制。 这些特性包括:
- 自动化的连接管理: 重新建立到ZooKeeper的连接和重试机制存在一些潜在的错误case。 Curator帮助你处理这些事情,对你来说是透明的。
- 清理API:
- 简化了原生的ZooKeeper的方法,事件等
- 提供了一个现代的流式接口
- 提供了Recipes实现: 如前面的文章介绍的那样,基于这些Recipes可以创建很多复杂的分布式应用
注意官网中的版本说明:
The are currently two released versions of Curator, 2.x.x and 3.x.x:
- Curator 2.x.x - compatible with both ZooKeeper 3.4.x and ZooKeeper 3.5.x
- Curator 3.x.x - compatible only with ZooKeeper 3.5.x and includes support for new features such as dynamic reconfiguration, etc.
二、HelloWorld
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; public class CuratorBase { /** * zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.211:2181,192.168.1.212:2181,192.168.1.213:2181"; /** * session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = null; try { cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) // .namespace("super") .build(); //3 开启连接 cf.start(); //4 建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容 // cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes()); //5 删除节点 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); // 读取、修改 /*创建节点*/ cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes()); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c2", "c2内容".getBytes()); /*读取节点*/ String ret1 = new String(cf.getData().forPath("/super/c2")); System.out.println(ret1); /*修改节点*/ cf.setData().forPath("/super/c2", "修改c2内容".getBytes()); String ret2 = new String(cf.getData().forPath("/super/c2")); System.out.println(ret2); // 绑定回调函数 ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { System.out.println("code:" + ce.getResultCode()); System.out.println("type:" + ce.getType()); System.out.println("线程为:" + Thread.currentThread().getName()); } }, pool) .forPath("/super/c3", "c3内容".getBytes()); Thread.sleep(Integer.MAX_VALUE); // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法 /** List<String> list = cf.getChildren().forPath("/super"); for(String p : list){ System.out.println(p); } Stat stat = cf.checkExists().forPath("/super/c3"); System.out.println(stat); Thread.sleep(2000); cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); */ //cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super"); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("关闭连接"); CloseableUtils.closeQuietly(cf); } } }
总结方法如下:
方法名 | 描述 |
---|---|
create() | 开始创建操作, 可以调用额外的方法(比如方式mode 或者后台执行background) 并在最后调用forPath()指定要操作的ZNode |
delete() | 开始删除操作. 可以调用额外的方法(版本或者后台处理version or background)并在最后调用forPath()指定要操作的ZNode |
checkExists() | 开始检查ZNode是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用forPath()指定要操作的ZNode |
getData() | 开始获得ZNode节点数据的操作. 可以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode |
setData() | 开始设置ZNode节点数据的操作. 可以调用额外的方法(版本或者后台处理) 并在最后调用forPath()指定要操作的ZNode |
getChildren() | 开始获得ZNode的子节点列表。 以调用额外的方法(监控、后台处理或者获取状态watch, background or get stat) 并在最后调用forPath()指定要操作的ZNode |
inTransaction() | 开始是原子ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交 |
三、Curator中Watch机制
监控当前节点的数据状况:注意不会监控delete事件
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; public class CuratorWatcher1 { /** * zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.211:2181,192.168.1.212:2181,192.168.1.213:2181"; /** * session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 建立连接 cf.start(); //4 建立一个cache缓存 final NodeCache cache = new NodeCache(cf, "/super", false); cache.start(true); cache.getListenable().addListener(new NodeCacheListener() { /** * <B>方法名称:</B>nodeChanged<BR> * <B>概要说明:</B>触发事件为创建节点和更新节点,在删除节点的时候并不触发此操作。<BR> * @see org.apache.curator.framework.recipes.cache.NodeCacheListener#nodeChanged() */ @Override public void nodeChanged() throws Exception { System.out.println("路径为:" + cache.getCurrentData().getPath()); System.out.println("数据为:" + new String(cache.getCurrentData().getData())); System.out.println("状态为:" + cache.getCurrentData().getStat()); System.out.println("---------------------------------------"); } }); Thread.sleep(1000); cf.create().forPath("/super", "123".getBytes()); Thread.sleep(1000); cf.setData().forPath("/super", "456".getBytes()); Thread.sleep(1000); cf.delete().forPath("/super"); Thread.sleep(1000); System.out.println("按下任何键表示关闭..."); System.in.read(); CloseableUtils.closeQuietly(cache); CloseableUtils.closeQuietly(cf); } }
监控子节点的数据变化
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.utils.CloseableUtils; public class CuratorWatcher2 { /** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.1.211:2181,192.168.1.212:2181,192.168.1.213:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 建立连接 cf.start(); //4 建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受 PathChildrenCache cache = new PathChildrenCache(cf, "/super", true); //5 在初始化的时候就进行缓存监听 cache.start(StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener(new PathChildrenCacheListener() { /** * <B>方法名称:</B>监听子节点变更<BR> * <B>概要说明:</B>新建、修改、删除<BR> * @see org.apache.curator.framework.recipes.cache.PathChildrenCacheListener#childEvent(org.apache.curator.framework.CuratorFramework, org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent) */ @Override public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED :" + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED :" + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED :" + event.getData().getPath()); break; default: break; } } }); //创建本身节点不发生变化 cf.create().forPath("/super", "init".getBytes()); //添加子节点 Thread.sleep(1000); cf.create().forPath("/super/c1", "c1内容".getBytes()); Thread.sleep(1000); cf.create().forPath("/super/c2", "c2内容".getBytes()); //修改子节点 Thread.sleep(1000); cf.setData().forPath("/super/c1", "c1更新内容".getBytes()); //删除子节点 Thread.sleep(1000); cf.delete().forPath("/super/c2"); //删除本身节点 Thread.sleep(1000); cf.delete().deletingChildrenIfNeeded().forPath("/super"); System.out.println("按下任何键表示关闭..."); System.in.read(); CloseableUtils.closeQuietly(cache); CloseableUtils.closeQuietly(cf); } }
四、Curator Recipe用法
以上只是介绍了curator基础用法,在curator的扩展库中,还提供了非常丰富的用法,这里暂时不做详述。
或者直接去官网直接看源代码:https://git-wip-us.apache.org/repos/asf?p=curator.git;a=tree;f=curator-examples/src/main/java;hb=HEAD
五、路由和负载均衡的实现
现在很多服务治理的方案都是netty+zk,由netty提供自定义协议的rpc服务,由zk去做服务的注册和自动发现,curator自带的discovery模块可以直接被用做服务发现,这里由手动实现作为一个zk的小demo。
首先看zk服务配置中心的node图:
服务提供者在启动时,以节点的形式注册到服务配置中心,服务消费者通过服务配置中来获得需要调用的服务名称节点下的机器列表节点。通过负载均衡算法,选取其中一台服务器进行调用。
当服务器宕机或者下线时,由于znode非持久节点的特性,相应的机器可以动态地从服务配置中心中移除,并触发服务消费者的Watcher。
在这个过程中,服务消费者只有在第一次调用服务的时候查询服务配置中心,然后将查询到的服务信息缓存到本地,后面的调用直接使用本地服务地址列表信息,而不需要重新发起请求去服务配置中心去获取相应的服务地址列表,直到服务的地址列表有变更,变更行为会直接触发消费者的Watcher。这样做,对配置中心的压力也很小,只有修改操作才会触发写,大多数读都是读本地内存。
代码如下:
ServiceInfo简单模拟下服务信息,可以没有
public class ServiceInfo { private String serviceName; private List<String> methods; public ServiceInfo() { } public ServiceInfo(String serviceName, List<String> methods) { this.serviceName = serviceName; this.methods = methods; } //get and set }
Provider模板代码,并用ProviderA和ProviderB模拟2台机器上的不同服务
public class Provider { private ServiceInfo serviceInfo; private CuratorFramework cf = null; private String ip; public Provider(ServiceInfo serviceInfo, CuratorFramework cf, String ip) { this.serviceInfo = serviceInfo; this.cf = cf; this.ip = ip; } public void start() throws Exception { //根节点路径 String path = "/configcenter/" + serviceInfo.getServiceName(); Stat stat = cf.checkExists().forPath(path); if (stat == null) { //不存在 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, FastJsonUtils.convertObjectToJSON(serviceInfo).getBytes()); } InetAddress addr = InetAddress.getLocalHost(); String ip = addr.getHostAddress().toString(); System.out.println("获取本机ip:" + ip); //但是为了测试,这里不使用本机ip,因为只有一台机器... cf.create().withMode(CreateMode.EPHEMERAL).forPath(path + "/" + ip); } }
public class ProviderA { public static final String CONNECT_ADDR = "192.168.1.211:2181,192.168.1.212:2181,192.168.1.213:2181"; public static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //简单演示启动服务 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); try { cf.start(); //启动2个服务A和B new Provider( new ServiceInfo( "service-A", Lists.newArrayList("m1", "m2") ), cf, "192.168.1.101" ).start(); new Provider( new ServiceInfo( "service-B", Lists.newArrayList("m3", "m4") ), cf, "192.168.1.101" ).start(); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("输入任何字符串关闭服务器"); System.in.read(); CloseableUtils.closeQuietly(cf); } } }
public class ProviderB { public static final String CONNECT_ADDR = "192.168.1.211:2181,192.168.1.212:2181,192.168.1.213:2181"; public static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //简单演示启动服务 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); try { cf.start(); //启动2个服务A和B new Provider( new ServiceInfo( "service-B", Lists.newArrayList("m1", "m2") ), cf, "192.168.1.102" ).start(); } catch (Exception e) { e.printStackTrace(); } finally { System.out.println("输入任何字符串关闭服务器"); System.in.read(); CloseableUtils.closeQuietly(cf); } } }
消费端代码:
public class Client { private CuratorFramework cf = null; //本地保存了数据信息 private Map<String, List<String>> data = Maps.newHashMap(); public static final String CONNECT_ADDR = "192.168.1.211:2181,192.168.1.212:2181,192.168.1.213:2181"; public static final int SESSION_OUTTIME = 5000;//ms public Client() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); this.cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); this.cf.start(); } public static void main(String[] args) throws Exception { Client client = new Client(); client.subscribe(new ServiceInfo( "service-A", Lists.newArrayList("m1", "m2") )); client.subscribe(new ServiceInfo( "service-B", Lists.newArrayList("m3", "m4") )); System.out.println("输入任何键关闭"); System.in.read(); CloseableUtils.closeQuietly(client.cf); } public void subscribe(final ServiceInfo serviceInfo) throws Exception { String serviceName = serviceInfo.getServiceName(); //订阅某一个服务 final String path = "/configcenter/" + serviceName; System.out.println("path:" + path); Stat stat = cf.checkExists().forPath(path); if (stat == null) { System.out.println(serviceName + "没有服务"); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, FastJsonUtils.convertObjectToJSON(serviceInfo).getBytes()); } PathChildrenCache cache = new PathChildrenCache(cf, path, true); //5 在初始化的时候就进行缓存监听 cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.getListenable().addListener((CuratorFramework client, PathChildrenCacheEvent event) -> { List<String> children = client.getChildren().forPath(path); data.put(serviceName, children); System.out.println(String.format("%s发生了改变:%s", serviceName, children.toString())); }); } }
先启动client,再启动ProviderA和ProviderB,可以发现Client自动发现服务的行为。