zoukankan      html  css  js  c++  java
  • 分布式协调Zookeeper使用(Watcher、Curator、Session、Acl)

    分布式协调-Zookeeper使用(Watcher、Session、Curator、Acl)

    前面说到zk可以为shardingSphere当做动态配置的一个中间件,然后聊了一下zk的大体介绍,本篇咱们聊聊他的一些常见的特性,并且对其进行相关阐释,同时使用Curator作为Demo。本篇会聊到:

    • 【State】:zk上每个节点除了存储了节点数据,同时也存储了一些节点的状态信息。我们会分析一下。
    • 【Watcher(发布订阅)】:在shardSphere使用zk作为分布式配置的那一篇,我们在zk上手动修改了配置,然后ShardingSphere就可以感知到,其底层就是基于这个特性来做的。
    • 【Session】:客户端和session建立连接的流程。
    • 【Acl】(权限控制):因为不是谁都有权限对zk上的node进行操作的,一旦操作不当,系统就可能宕机。
    • 【Curator】:对zk的api封装的一个客户端框架。

    Stat(每个节点的状态和信息)

     
    czxid 表示该数据节点被创建时的事务ID
    mzxid 表示该节点最后一次被更新时的事务ID
    ctime 表示节点被创建的时间
    mtime 表示该节点最后一次被更新的时间
    version 数据节点的版本号,这里其实是一种乐观锁。我们每次修改一个节点数据的时候,节点的version就会增加。那每个客户端在修改节点的时候,带一个version,当他们传递的version和当前version不一致的时候,就修改失败。
    cversion 子结点的版本号
    aversion 节点的ACL版本号
    ephemeralOwner 创建该临时节点的会话的sessionID。如果该节点是持久节点,那么这个属性值为0
    dataLength 数据内容的长度
    numChildren 当前节点的子节点个数
    pzxid 表示该节点的子节点里最后一次被修改时的事务ID。

    version 】:像上面解释的一样,现在我们的版本seq的版本是2,我们修改时带上版本是1就无法修改。

    Watcher

    我们的shardingSphere使用zk作为配置中心,当在zk上修改了配置后,shardingSphere就能感知到,就是通过watcher做的,zk实际上是和我们的客户端建立连接,并且主动通知客户端有数据修改了。我们这里举个例子。

    对某个节点建立一个监听:所有命令带w的都是可以进行监听的

     比如:我们在get的时候对某个节点进行监听,那么当其他客户端对我们get的这个数据进行操作的时候,我们对这个节点监听的节点就会收到消息。

    现在我们使用客户端1对seq这个节点get的时候进行监听

    然后使用客户端2对这个节点的值进行修改

     这个时候节点一就能收到被监听节点的修改信息

     问题是当我们再次这个数据进行修改的时候,修改的信息并没有被监听到,这也就是说,这种方式只是一次性监听。那如何进行每次修改都被监听到呢?

    • 循环监听:我们看到,当收到信息时候,我们是可以知道哪个节点被修改了,那我们就可以拿到这个节点再次进行监听。
    • addwatch:我们发现它命令中有一个addwatch,这个就是实现持续监听的方法。

     

    持久化监听里面提供了两种方式:

    【PERSISTENT】:持久化订阅,针对当前节点的修改和删除事件,以及当前节点的子节点的删除和新增事件
     
    【PERSISTENT_RECURSIVE】:持久化递归订阅,在PERSISTENT的基础上,增加了子节点修改的事件触发,以及子节点的子节点的数据变化都会触发相关事件(满足递归订阅特性)
     
    那了解了这个特性就知道,我们的shardingSphere指定是对rules(存储他的配置文件的及节点)进行了addwather的监听,这样当我们修改了rules的数据,监听这个节点的那些个shardingSphere就收到了信息啦。

    Session(当客户端连接zkServer的时候,是一个异步的状态.)

    • 客户端向Zookeeper Server发起连接请求,此时状态为CONNECTING
    • 当连接建立好之后,Session状态转化为CONNECTED,此时可以进行数据的IO操作
    • 如果Client和Server的连接出现丢失,则Client又会变成CONNECTING状态
    • 如果会话过期或者主动关闭连接时,此时连接状态为CLOSE
    • 如果是身份验证失败,直接结束 

     Curator(使用java操作zk)

    pom

        <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>5.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>5.2.0</version>
            </dependency>
    View Code

    以下的代码,就是对zk进行操作的一个案例,包含了增删改查。curator提供了两种方式,同步和异步的操作。

    【增删改查同步操作】

       // 启动以及连接zk
        private CuratorOperationExample(){
            curatorFramework= CuratorFrameworkFactory
                    .builder()
                    .connectionTimeoutMs(20000)
                    .connectString("192.168.43.3:2181") //读写分离(zookeeper-server)
                    .retryPolicy(new ExponentialBackoffRetry(1000,3))
                    .sessionTimeoutMs(15000)
                    .build();
            curatorFramework.start(); //启动
        }
    
        // 对节点进行操作
        private void nodeCRUD() throws Exception {
            System.out.println("开始针对节点的CRUD操作");
            //这个要给节点中存储的数据
            String value="Hello World";
            //创建一个节点
            String node=curatorFramework.create()
                    .creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .forPath("/node",value.getBytes());
            System.out.println("节点创建成功:"+node);
            //存储状态信息的对象
            Stat stat=new Stat();
            //获取节点的value
            byte[] data=curatorFramework.getData().storingStatIn(stat).forPath(node);
            System.out.println("节点value值:"+new String(data));
            //这里使用先查询,然后修改,因为可能有多个客户端同时连接。可能存在锁的问题。这里查询出来version然后再进行操作
            stat=curatorFramework.setData()
                    .withVersion(stat.getVersion())
                    .forPath(node,"Update Date Result".getBytes());
            String result=new String(curatorFramework.getData().forPath(node));
            System.out.println("修改节点之后的数据:"+result);
            System.out.println("开始删除节点");
            curatorFramework.delete().forPath(node);
            Stat existStat=curatorFramework.checkExists().forPath(node);
            if(existStat==null){
                System.out.println("节点删除成功");
            }
        }
    View Code

    【异步增操作】 

        // 创建一个节点的异步方式,其他的操作都有对应的api,
        // 这里的CountDownLatch是为了不让线程直接跑下去,
        // 要是直接跑下去的话,就看不到创建的节点了,因为是异步的,而当执行到获取节点的时候,可能还没有创建好,只是为了看见这个节点内容而已,没有任何作用。
        public void asyncCRUD() throws Exception {
            CountDownLatch countDownLatch=new CountDownLatch(1);
            // ZK会回调BackgroundCallback里面的方法进行回调
            String node=curatorFramework.create().withMode(CreateMode.PERSISTENT)
                    .inBackground((session,event)->{
                System.out.println(Thread.currentThread().getName()+":执行创建节点:"+event.getPath());
                countDownLatch.countDown(); //触发回调,递减计数器
            }).forPath("/async-node");
            countDownLatch.await();
        }
    View Code

    【Acl操作:digest】digest为例 因为他是对于每次会话的授权,那我们在建立连接的时候就要加上,【.authorization("digest","glen:glen".getBytes())】

        //创酱一个acl节点
        //scheme 是digest id是glen:glen 权限是ala
        private void aclOperation() throws Exception {
            Id id=new Id("digest", DigestAuthenticationProvider.generateDigest("glen:glen"));
            List<ACL> acls=new ArrayList<>();
            acls.add(new ACL(ZooDefs.Perms.ALL,id));
            String node=curatorFramework.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(acls,false).forPath("/curator-auth","Auth".getBytes());
            System.out.println("创建带有权限节点:"+node);
            System.out.println("数据查询结果:"+new String(curatorFramework.getData().forPath(node)));
        }
    View Code

    一次性监听

        // 一次性监听
        public void normalWatcher() throws Exception {
            CuratorWatcher curatorWatcher=new CuratorWatcher() {
                @Override
                public void process(WatchedEvent watchedEvent) throws Exception {
                    System.out.println("监听到的事件"+watchedEvent.toString());
                    //循环设置监听
                    curatorFramework.checkExists().usingWatcher(this).forPath(watchedEvent.getPath());
                }
            };
            // 创酱一个节点
            String node=curatorFramework.create().forPath("/watcher","Watcher String".getBytes());
            System.out.println("节点创建成功:"+node);
            //设置一次普通的watcher监听
            String data=new String(curatorFramework.getData().usingWatcher(curatorWatcher).forPath(node));
            System.out.println("设置监听并获取节点数据:"+data);
            //第一次操作才会触发监听,而第二次不会。所以上面在回调方法中我们设置了一个循环监听。
            curatorFramework.setData().forPath(node,"change data 0".getBytes());
            Thread.sleep(1000);
            curatorFramework.setData().forPath(node,"change data 1".getBytes());
        }
    View Code

    持续化监听

        //持久化监听
        //node :要监听的节点名称
        private void persisWatcher(String node){
            CuratorCache curatorCache=CuratorCache.
                    //实例;去监听的节点;操作类型(单节点缓存,对数据进行压缩,关闭或不清理缓存)
                    build(curatorFramework,node, CuratorCache.Options.SINGLE_NODE_CACHE);
            //这里可以设置对于事件的监听类型
            CuratorCacheListener listener=CuratorCacheListener
                    .builder()
                    //这里是我们自己写的一个类,他会调用我们类的方法,这个实现了他的CuratorCacheListener接口
                    .forAll(new ZookeeperWatcherListener())
                    .build();
            //把事件监听添加进去
            curatorCache.listenable().addListener(listener);
            curatorCache.start();
        }
    View Code

    测试

    //这里对节点进行操作,看看是不是会别检测到事件
        private void operation(String node) throws Exception {
            curatorFramework.create().forPath(node);
            curatorFramework.setData().forPath(node,"hello".getBytes());
            curatorFramework.delete().forPath(node);
        }
    
        public static void main(String[] args) throws Exception {
            ZookeeperWatchExample zookeeperWatchExample=new ZookeeperWatchExample();
            String node="/persis-node";
            zookeeperWatchExample.persisWatcher(node);
            zookeeperWatchExample.operation(node);
            //让main方法等待
            System.in.read();
        }
    View Code

    ACL权限控制

    zk针对节点提供了权限的控制,这是因为要规避有人不小心删除了某个节点,而导致整个系统出现问题的情况。他和linux的权限控制相似。

    他的权限标志符是这样的:【scheme:id:perm

    比如我们获取seq节点,他的scheme就是world(全部都能访问),id就是所有人,permission就是增、删、改、查、管理

    • Scheme(权限模式),标识授权策略,即表示通过什么样子的方式去控制权限。
      • 【world】:默认方式,相当于全部都能访问。
      • 【auth】:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
      • 【digest】:即用户名:密码这种方式认证
      • 【ip】:通过ip地址来做权限控制。
    • ID(授权对象):比如说我们的scheme是ip,那这里就填写ip,如果是digest,那就填写用户名和密码
    • Permission:授予的权限 (c) create . (d)delete (r)read (w)write  (a)admin 

    【world】通过get和set【acl】命令去修改一个节点的权限,一个world的例子。

    auth 对登录用户进行授权,授权后可以操作授权的节点,而退出客户端后,再次进入就需要再次授权了。前面的glen是用户名 后面的glen是密码  

     当退出后,我们就无法操作atuh这个节点,必须再次进行授权了。

     

  • 相关阅读:
    在调试asp.net程序时,提示windows窗体身份验证错误怎么办
    关于用css数字与汉字的垂直对其问题
    如何在存储过程中使用like操作符
    iframe的页面内容如何获取父页面的地址
    Visual Studio 2005常用插件搜罗(zz)
    如何通过javascript动态改变按钮的css属性值
    双十一数据库也疯狂,SQL Server 无法生成 FRunCM 线程,求解
    Oracle 连接玩我!ORA12514及ORA28547错误解决
    ASP.NET MVC Web API 学习笔记第一个Web API程序
    SQL Server数据库同步问题分享(三)创建订阅
  • 原文地址:https://www.cnblogs.com/UpGx/p/15569174.html
Copyright © 2011-2022 走看看