curator简介
curator是netflix公司开源的一个zk客户端
原生zkAPI的不足:
- 连接对象异步创建,需要开发人员自行编码等待
- 连接没有自动重连超时机制
- watcher一次注册生效一次
- 不支持递归创建树形节点
curator特点:
- 解决session会话超时重连
- watcher反复注册
- 简化开发api
- 遵循Fluent风格的API
- 提供了分布式锁服务,共享计数器,缓存机制等
watchAPI
curator提供了两种watcher(cache)来监听节点的变化:
- Nodecache:只监听某一特定的节点,监听节点的新增和修改
- PathChildrenCache:监听一个znode的子节点,当一个子节点增加,更新,删除时,Path Cache会改变它的状态,会包含最新的子节点,以及子节点的数据和状态。
分布式锁
- InterProcessMutex:分布式可重入排他锁
- InterProcessReadWriteLock:分布式读写锁
代码实现:
1. 创建节点
public void create3() throws Exception { CuratorConnection.connect(); client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println("path:" + curatorEvent.getPath()); System.out.println("type:" + curatorEvent.getType()); } }) .forPath("/node8", "node8".getBytes()); Thread.sleep(4000); CuratorConnection.close(); }
2. 删除节点
/** * 1. 版本号校验:.withVersion(int i) * 2. 若存在子节点则递归删除:.deletingChildrenIfNeeded() * 3. 异步方式:.inBackground(Callback...) * * @throws Exception */ @Test public void delete1() throws Exception { client.delete() .deletingChildrenIfNeeded() .inBackground() .forPath("/node8"); CuratorConnection.close(); }
3. 检验节点
/** * 1. 异步方式: .inBackground(CallBack...) * * @throws Exception */ public void exist1() throws Exception { //判断节点是否存在 Stat stat = client.checkExists().forPath("/node9"); if (stat == null) { System.out.println("节点不存在"); } CuratorConnection.close(); }
4. 获取节点信息
public class NodeGet { static CuratorFramework client = CuratorConnection.getClient(); public void get1() throws Exception { byte[] bytes = client.getData().forPath("/node6"); System.out.println("结果是:" + new String(bytes)); CuratorConnection.close(); } /** * 1. 获取节点的属性: .storingStatIn(stat) * 2. 异步方式读取:.inBackground() * * @throws Exception */ public void get2() throws Exception { Stat stat = new Stat(); client.getData().storingStatIn(stat) .forPath("/node6"); System.out.println(stat); CuratorConnection.close(); } /** * 获取子节点 * 1. 异步读取:.inBackground(Callback...) * * @throws Exception */ public void get3() throws Exception { List<String> list = client.getChildren().forPath("/node6"); System.out.println(list); } }
5. 设置节点
public class NodeSet { static CuratorFramework client = CuratorConnection.getClient(); public void set1() { try { client.setData().forPath("/node8", "new Data node8".getBytes()); } catch (Exception e) { e.printStackTrace(); } } /** * 指定版本号,version设置为-1代表任意版本 * 异步修改:.inBackground(new BackgroundCallback{...}) */ @Test public void set2() { try { client.setData() .withVersion(3) .forPath("/node8", "new node8".getBytes()); } catch (Exception e) { e.printStackTrace(); } } }
6. 在一个方法中多次对节点数据操作时,开启事务
public void test1() throws Exception { // 开启事务 client.inTransaction().create() .forPath("/trans/node1", "node1 data".getBytes()) .and() .create() .forPath("/trans/node2", "node2 data".getBytes()) .and() .commit(); client.close(); }
7. watcher监听
public class NodeWatcher { static CuratorFramework client = CuratorConnection.getClient(); /** * 1. watcher不是一次性的,可反复监听 * * @throws Exception */ public void watcher1() throws Exception { NodeCache nodeCache = new NodeCache(client, "/watcher1"); //启动监视器对象 nodeCache.start(); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { //节点变化时回调的方法 System.out.println(nodeCache.getCurrentData().getPath()); System.out.println(new String(nodeCache.getCurrentData().getData())); } }); Thread.sleep(100000); nodeCache.close(); CuratorConnection.close(); System.out.println("结束"); } @Test public void watcher2() throws Exception { /** * arg1:连接对象 * arg2:监视的节点路径 * arg3:事件中是否可以获取节点的数据 */ PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/watcher2", true); pathChildrenCache.start(); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { // 当子节点发生变化时回调的方法 // 类型 System.out.println(pathChildrenCacheEvent.getType()); // 路径 System.out.println(pathChildrenCacheEvent.getData().getPath()); // 数据 System.out.println(new String(pathChildrenCacheEvent.getData().getData())); } }); Thread.sleep(100000); pathChildrenCache.close(); CuratorConnection.close(); System.out.println("结束"); } }
8 分布式锁
public class CuratorLock { CuratorFramework client = CuratorConnection.getClient(); /** * 分布式可重入排他锁 * * @throws Exception */ @Test public void lock1() throws Exception { ArrayList<String> paths = new ArrayList<>(); paths.add("/lock/lock1"); paths.add("/lock/lock2"); InterProcessLock interProcessLock = new InterProcessMultiLock(client, paths); System.out.println("等待获取锁"); // 获取锁 interProcessLock.acquire(); for (int i = 0; i < 10; i++) { Thread.sleep(3000); System.out.println(i); } // 释放锁 interProcessLock.release(); client.close(); } @Test public void lock2() throws Exception { // 读写锁 InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/lock/lock2"); // 获取写锁对象 InterProcessMutex writeLock = readWriteLock.writeLock(); writeLock.acquire(); for (int i = 0; i < 10; i++) { Thread.sleep(3000); System.out.println(i); } writeLock.release(); client.close(); } @Test public void lock3() throws Exception { // 读写锁 InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/lock/lock2"); // 获取读锁对象 InterProcessMutex readLock = readWriteLock.readLock(); readLock.acquire(); for (int i = 0; i < 10; i++) { Thread.sleep(3000); System.out.println(i); } readLock.release(); client.close(); } }
对zk中同一路径path上锁,即会加锁
zk支持某些特定的四字命令与其交互。它们大多是查询命令,用来获取zk服务的当前状态及相关信息。用户在客户端可以通过telnet或nc向zk提交相应的命令。
- conf:输出相关服务配置的详尽信息。端口号,zk数据及日志配置路径,最大连接数,session超时时间,serverid等
- cons:列出所有连接到这台服务器的客户端连接/会话的详尽信息。包括"接收/发送"的包数量,sessionID,操作延迟,最后的操作执行等信息。
- crst:重置当前这台服务器的所有连接/会话的统计信息
- dump:列出未经处理的会话和临时节点
- envi:输出关于服务器的环境详尽信息
- ruok:测试服务是否处于正确运行状态.如果正常返回imok,否则返回空
- stat:输出服务器的详尽信息:接收/发送包数量,连接数,模式(leader/follower),节点总数,延迟。
- srst:重置server状态
- wchs:列出服务器watchers的简介信息:连接总数,watching节点总数和watches总数
- wchc:通过session分组,列出watch的所有节点,它的输出是一个watch相关的会话的节点列表
- wchp:通过路径分组,列出所有的watch的session id信息
- mntr:列出集群的健康状态。包括"接收/发送"的包数量,操作延迟,当前服务模式(leader/follower),节点总数,watcher总数,临时节点总数
使用方式:
echo 【命令】| nc 【ip】 【port】
例如:echo conf | nc 172.18.19.143 2181
详解:
conf命令:
- echo conf | nc 172.18.19.143 2181;
- clientPort:客户端端口号
- dataDir:数据快照文件目录,默认情况下100000次事务操作生成一次快照;
- dataLogDir:事务日志文件目录,生产环境中放在独立的磁盘中;
- tickTime:服务器之间或客户端与服务器之间维持心跳的时间间隔(以毫秒为单位);
- maxClientCnxns:最大连接数;
- minSessionTimeout:最小session超时,默认minSessionTimeout=tickTime*2;
- maxSessionTimeout:最大session超时,默认maxSessionTimeout=tickTime*20;
- serverId:服务器编号;
- initLimit:集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数;
- syncLimit:集群中的follower服务器(F)与leader服务器(L)之间请求和应答之间能容忍的最多心跳数;
- electionAlg:在3.4.10版本中,默认值为3,另外三种算法已经被弃用
- 0:基于UDP的LeaderElection
- 1:基于UDP和认证的FastLeaderElection
- 2:基于UDP和认证的FastLeaderElection
- 3:基于TCP的FastLeaderElection
- electionPort:选举端口;
- quorumPort:数据通信端口;
- peerType:是否为观察者,1为观察者
cons命令:
- ip:ip地址;
- port:端口号;
- queued:等待被处理的请求数,请求缓存在队列中;
- received:收到的包数;
- sent:发送的包数;
- sid:会话id;
- lop:最后的操作GETD-读取数据,DELE-删除数据,CREA-创建数据;
- est:连接的时间戳;
- to:超时时间;
- lcxid:当前会话的操作id;
- lzxid:最大事务id;
- lresp:最后响应时间戳;
- llat:最后/最新 延时;
- minlat:最小延时;
- maxlat:最大延时;
- avglat:平均延时;
stat命令:
- Zookeeper version:版本;
- latency min/avg/max:延时;
- received:收包;
- sent:发包;
- connections:连接数;
- outstanding:堆积数;
- zxid:最大事务id;
- mode:服务器角色;
- node count:节点数;
wchc命令:
直接使用会有问题:wchc is not executed because it is not in the whitelist.
解决办法:修改zkServer.sh:
在下面添加如下信息:
ZOOMAIN="-Dzookeeper.41w.commands.whitelist=* ${ZOOMAIN}"
mntr命令:此命令与stat命令相似,不多显示的信息更加详细
- zk_version:版本;
- zk_avg_latency:平均延迟;
- zk_max_latency:最大延迟;
- zk_min_latency:最小延迟;
- zk_packetzk_packets_sents_received:收包数;
- zk_packets_sent:发包数;
- zk_num_alive_connections:连接数;
- zk_outstanding_requests:堆积请求数;
- zk_server_state:leader/follower状态;
- zk_znode_count:znode数量;
- zk_watch_count:watch数量;
- zk_ephemerals_count:临时节点(znode)数量;
- zk_approximate_data_size:数据大小;
- zk_open_file_descriptor_count:打开的文件描述符数量;
- zk_max_file_descriptor_count:最大文件描述符数量;