zoukankan      html  css  js  c++  java
  • Zookeeper Client简介

    直接使用zk的api实现业务功能比较繁琐。因为要处理session loss,session expire等异常,在发生这些异常后进行重连。又因为ZK的watcher是一次性的,如果要基于wather实现发布/订阅模式,还要自己包装一下,将一次性订阅包装成持久订阅。另外如果要使用抽象级别更高的功能,比如分布式锁,leader选举等,还要自己额外做很多事情。这里介绍下ZK的两个第三方客户端包装小工具,可以分别解决上述小问题。

    一、 zkClient
    zkClient主要做了两件事情。一件是在session loss和session expire时自动创建新的ZooKeeper实例进行重连。另一件是将一次性watcher包装为持久watcher。后者的具体做法是简单的在watcher回调中,重新读取数据的同时再注册相同的watcher实例。

    zkClient简单的使用样例如下:

      public static void testzkClient(final String serverList) {
            ZkClient zkClient4subChild = new ZkClient(serverList);
            zkClient4subChild.subscribeChildChanges(PATH, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List currentChilds) throws Exception {
                    System.out.println(prefix() + "clildren of path " + parentPath + ":" + currentChilds);
                }
            });

    上面是订阅children变化,下面是订阅数据变化

      ZkClient zkClient4subData = new ZkClient(serverList);
            zkClient4subData.subscribeDataChanges(PATH, new IZkDataListener() {
                @Override
                public void handleDataChange(String dataPath, Object data) throws Exception {
                    System.out.println(prefix() + "Data of " + dataPath + " has changed");
                }
    
                @Override
                public void handleDataDeleted(String dataPath) throws Exception {
                    System.out.println(prefix() + dataPath + " has deleted");
                }
            });

    订阅连接状态的变化:

     ZkClient zkClient4subStat = new ZkClient(serverList);
            zkClient4subStat.subscribeStateChanges(new IZkStateListener() {
                @Override
                public void handleNewSession() throws Exception {
                    System.out.println(prefix() + "handleNewSession()");
                }
    
                @Override
                public void handleStateChanged(KeeperState stat) throws Exception {
                    System.out.println(prefix() + "handleStateChanged,stat:" + stat);
                }
            });

    下面表格列出了写操作与ZK内部产生的事件的对应关系:

     **event For "/path"****event For "/path/child"**
    **create("/path")** EventType.NodeCreated NA
    **delete("/path")** EventType.NodeDeleted NA
    **setData("/path")** EventType.NodeDataChanged NA
    **create("/path/child")** EventType.NodeChildrenChanged EventType.NodeCreated
    **delete("/path/child")** EventType.NodeChildrenChanged EventType.NodeDeleted
    **setData("/path/child")** NA EventType.NodeDataChanged

    而ZK内部的写事件与所触发的watcher的对应关系如下:

    **event For "/path"****defaultWatcher****exists ("/path")****getData ("/path")****getChildren ("/path")**
    **EventType.None**
    **EventType.NodeCreated**    
    **EventType.NodeDeleted**   √(不正常)  
    **EventType.NodeDataChanged**    
    **EventType.NodeChildrenChanged**      

    综合上面两个表,我们可以总结出各种写操作可以触发哪些watcher,如下表所示:

     **"/path"****"/path/child"**
     **exists****getData****getChildren****exists****getData****getChildren**
    **create("/path")** **√** **√** ** ** ** ** ** ** ** **
    **delete("/path")** **√** **√** **√** ** ** ** ** ** **
    **setData("/path")** **√** **√** ** ** ** ** ** ** ** **
    **create("/path/child")** ** ** ** ** **√** **√** **√** ** **
    **delete("/path/child")** ** ** ** ** **√** **√** **√** **√**
    **setData("/path/child")** ** ** ** ** ** ** **√** **√** ** **

    如果发生session close、authFail和invalid,那么所有类型的wather都会被触发 zkClient除了做了一些便捷包装之外,对watcher使用做了一点增强。比如subscribeChildChanges实际上是通过exists和getChildren关注了两个事件。这样当create("/path")时,对应path上通过getChildren注册的listener也会被调用。另外subscribeDataChanges实际上只是通过exists注册了事件。因为从上表可以看到,对于一个更新,通过exists和getData注册的watcher要么都会触发,要么都不会触发。 zkClient地址:[https://github.com/sgroschupf/zkclient](https://github.com/sgroschupf/zkclient) Maven工程中使用zkClient需要加的依赖:

        <dependency>
            <groupId>zkclient</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.1</version>
        </dependency>

    二、 menagerie

    menagerie基于Zookeeper实现了java.util.concurrent包的一个分布式版本。这个封装是更大粒度上对各种分布式一致性使用场景的抽象。其中最基础和常用的是一个分布式锁的实现:
    org.menagerie.locks.ReentrantZkLock,通过ZooKeeper的全局有序的特性和EPHEMERAL_SEQUENTIAL类型znode的支持,实现了分布式锁。具体做法是:不同的client上每个试图获得锁的线程,都在相同的basepath下面创建一个EPHEMERAL_SEQUENTIAL的node。EPHEMERAL表示要创建的是临时znode,创建连接断开时会自动删除; SEQUENTIAL表示要自动在传入的path后面缀上一个自增的全局唯一后缀,作为最终的path。因此对不同的请求ZK会生成不同的后缀,并分别返回带了各自后缀的path给各个请求。因为ZK全局有序的特性,不管client请求怎样先后到达,在ZKServer端都会最终排好一个顺序,因此自增后缀最小的那个子节点,就对应第一个到达ZK的有效请求。然后client读取basepath下的所有子节点和ZK返回给自己的path进行比较,当发现自己创建的sequential node的后缀序号排在第一个时,就认为自己获得了锁;否则的话,就认为自己没有获得锁。这时肯定是有其他并发的并且是没有断开的client/线程先创建了node。

    基于分布式锁,还实现了其他业务场景,比如leader选举:

    public static void leaderElectionTest() {
    ZkSessionManager zksm = new DefaultZkSessionManager(“ZK-host-ip:2181”, 5000);
    LeaderElector elector = new ZkLeaderElector(“/leaderElectionTest”, zksm, Ids.OPEN_ACL_UNSAFE);
    if (elector.nominateSelfForLeader()) {
    System.out.println(“Try to become the leader success!”);
    }
    }

    java.util.concurrent包下面的其他接口实现,也主要是基于ReentrantZkLock的,比如ZkHashMap实现了ConcurrentMap。具体请参见menagerie的API文档

    menagerie地址:https://github.com/wxuedong/menagerie
    Maven工程中使用menagerie需要加的依赖:

        <dependency>
            <groupId>org.menagerie</groupId>
            <artifactId>menagerie</artifactId>
            <version>1.1-SNAPSHOT</version>
        </dependency>
  • 相关阅读:
    学习Python中的集合
    ubuntu14.04下 安装matlabR2015b遇到的一些问题及其解决方法
    matlab的一些关于块分类的函数~~~
    20145207 《Java程序设计》第二周学习总结
    20145207 调查问卷
    20145207 《Java程序设计》第一周学习总结
    10、装饰者模式
    9、观察者模式
    8、迭代器模式
    7、适配器模式
  • 原文地址:https://www.cnblogs.com/wxd0108/p/7097950.html
Copyright © 2011-2022 走看看