zoukankan      html  css  js  c++  java
  • zookeeper客户端之curator

    curator简介

    curator是netflix公司开源的一个zk客户端

    原生zkAPI的不足:

    • 连接对象异步创建,需要开发人员自行编码等待
    • 连接没有自动重连超时机制
    • watcher一次注册生效一次
    • 不支持递归创建树形节点

    curator特点:

    • 解决session会话超时重连
    • watcher反复注册
    • 简化开发api
    • 遵循Fluent风格的API
    • 提供了分布式锁服务,共享计数器,缓存机制等

    watchAPI

    curator提供了两种watcher(cache)来监听节点的变化:

    1. Nodecache:只监听某一特定的节点,监听节点的新增和修改
    2. PathChildrenCache:监听一个znode的子节点,当一个子节点增加,更新,删除时,Path Cache会改变它的状态,会包含最新的子节点,以及子节点的数据和状态。

    分布式锁

    1. InterProcessMutex:分布式可重入排他锁
    2. InterProcessReadWriteLock:分布式读写锁

    代码实现:

    1. 创建节点

        public void create3() throws Exception {
            CuratorConnection.connect();
            client.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.PERSISTENT)
                    .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
                    .inBackground(new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println("path:" + curatorEvent.getPath());
                            System.out.println("type:" + curatorEvent.getType());
                        }
                    })
                    .forPath("/node8", "node8".getBytes());
            Thread.sleep(4000);
            CuratorConnection.close();
        }

    2. 删除节点

        /**
         * 1. 版本号校验:.withVersion(int i)
         * 2. 若存在子节点则递归删除:.deletingChildrenIfNeeded()
         * 3. 异步方式:.inBackground(Callback...)
         *
         * @throws Exception
         */
        @Test
        public void delete1() throws Exception {
            client.delete()
                    .deletingChildrenIfNeeded()
                    .inBackground()
                    .forPath("/node8");
            CuratorConnection.close();
        }

    3. 检验节点

        /**
         * 1. 异步方式: .inBackground(CallBack...)
         *
         * @throws Exception
         */
        public void exist1() throws Exception {
            //判断节点是否存在
            Stat stat = client.checkExists().forPath("/node9");
            if (stat == null) {
                System.out.println("节点不存在");
            }
            CuratorConnection.close();
    
        }

    4. 获取节点信息

    public class NodeGet {
        static CuratorFramework client = CuratorConnection.getClient();
    
    
        public void get1() throws Exception {
            byte[] bytes = client.getData().forPath("/node6");
            System.out.println("结果是:" + new String(bytes));
            CuratorConnection.close();
        }
    
        /**
         * 1. 获取节点的属性: .storingStatIn(stat)
         * 2. 异步方式读取:.inBackground()
         *
         * @throws Exception
         */
        public void get2() throws Exception {
            Stat stat = new Stat();
            client.getData().storingStatIn(stat)
                    .forPath("/node6");
            System.out.println(stat);
            CuratorConnection.close();
        }
    
        /**
         * 获取子节点
         * 1. 异步读取:.inBackground(Callback...)
         *
         * @throws Exception
         */
        public void get3() throws Exception {
            List<String> list = client.getChildren().forPath("/node6");
            System.out.println(list);
        }
    
    }

    5. 设置节点

    public class NodeSet {
        static CuratorFramework client = CuratorConnection.getClient();
    
        public void set1() {
            try {
                client.setData().forPath("/node8", "new Data node8".getBytes());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 指定版本号,version设置为-1代表任意版本
         * 异步修改:.inBackground(new BackgroundCallback{...})
         */
        @Test
        public void set2() {
            try {
                client.setData()
                        .withVersion(3)
                        .forPath("/node8", "new node8".getBytes());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }

    6. 在一个方法中多次对节点数据操作时,开启事务

        public void test1() throws Exception {
            // 开启事务
            client.inTransaction().create()
                    .forPath("/trans/node1", "node1 data".getBytes())
                    .and()
                    .create()
                    .forPath("/trans/node2", "node2 data".getBytes())
                    .and()
                    .commit();
            client.close();
        }

    7. watcher监听

    public class NodeWatcher {
        static CuratorFramework client = CuratorConnection.getClient();
    
        /**
         * 1. watcher不是一次性的,可反复监听
         *
         * @throws Exception
         */
        public void watcher1() throws Exception {
            NodeCache nodeCache = new NodeCache(client, "/watcher1");
            //启动监视器对象
            nodeCache.start();
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    //节点变化时回调的方法
                    System.out.println(nodeCache.getCurrentData().getPath());
                    System.out.println(new String(nodeCache.getCurrentData().getData()));
                }
            });
            Thread.sleep(100000);
            nodeCache.close();
            CuratorConnection.close();
            System.out.println("结束");
        }
    
        @Test
        public void watcher2() throws Exception {
            /**
             * arg1:连接对象
             * arg2:监视的节点路径
             * arg3:事件中是否可以获取节点的数据
             */
            PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/watcher2", true);
            pathChildrenCache.start();
            pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    // 当子节点发生变化时回调的方法
                    // 类型
                    System.out.println(pathChildrenCacheEvent.getType());
                    // 路径
                    System.out.println(pathChildrenCacheEvent.getData().getPath());
                    // 数据
                    System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
                }
            });
    
            Thread.sleep(100000);
            pathChildrenCache.close();
            CuratorConnection.close();
            System.out.println("结束");
        }
    
    }

    8 分布式锁

    public class CuratorLock {
        CuratorFramework client = CuratorConnection.getClient();
    
        /**
         * 分布式可重入排他锁
         *
         * @throws Exception
         */
        @Test
        public void lock1() throws Exception {
            ArrayList<String> paths = new ArrayList<>();
            paths.add("/lock/lock1");
            paths.add("/lock/lock2");
            InterProcessLock interProcessLock = new InterProcessMultiLock(client, paths);
            System.out.println("等待获取锁");
            // 获取锁
            interProcessLock.acquire();
    
            for (int i = 0; i < 10; i++) {
                Thread.sleep(3000);
                System.out.println(i);
    
            }
            // 释放锁
            interProcessLock.release();
            client.close();
        }
    
    
        @Test
        public void lock2() throws Exception {
            // 读写锁
            InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/lock/lock2");
            // 获取写锁对象
            InterProcessMutex writeLock = readWriteLock.writeLock();
            writeLock.acquire();
            for (int i = 0; i < 10; i++) {
                Thread.sleep(3000);
                System.out.println(i);
            }
            writeLock.release();
            client.close();
    
        }
    
        @Test
        public void lock3() throws Exception {
            // 读写锁
            InterProcessReadWriteLock readWriteLock = new InterProcessReadWriteLock(client, "/lock/lock2");
            // 获取读锁对象
            InterProcessMutex readLock = readWriteLock.readLock();
            readLock.acquire();
            for (int i = 0; i < 10; i++) {
                Thread.sleep(3000);
                System.out.println(i);
            }
            readLock.release();
            client.close();
    
        }
    
    }

    对zk中同一路径path上锁,即会加锁

    zk支持某些特定的四字命令与其交互。它们大多是查询命令,用来获取zk服务的当前状态及相关信息。用户在客户端可以通过telnet或nc向zk提交相应的命令。

    • conf:输出相关服务配置的详尽信息。端口号,zk数据及日志配置路径,最大连接数,session超时时间,serverid等
    • cons:列出所有连接到这台服务器的客户端连接/会话的详尽信息。包括"接收/发送"的包数量,sessionID,操作延迟,最后的操作执行等信息。
    • crst:重置当前这台服务器的所有连接/会话的统计信息
    • dump:列出未经处理的会话和临时节点
    • envi:输出关于服务器的环境详尽信息
    • ruok:测试服务是否处于正确运行状态.如果正常返回imok,否则返回空
    • stat:输出服务器的详尽信息:接收/发送包数量,连接数,模式(leader/follower),节点总数,延迟。
    • srst:重置server状态
    • wchs:列出服务器watchers的简介信息:连接总数,watching节点总数和watches总数
    • wchc:通过session分组,列出watch的所有节点,它的输出是一个watch相关的会话的节点列表
    • wchp:通过路径分组,列出所有的watch的session id信息
    • mntr:列出集群的健康状态。包括"接收/发送"的包数量,操作延迟,当前服务模式(leader/follower),节点总数,watcher总数,临时节点总数

    使用方式:

     echo 【命令】| nc 【ip】 【port】

    例如:echo conf | nc 172.18.19.143 2181

    详解:

    conf命令:

    • echo conf | nc 172.18.19.143 2181;
    • clientPort:客户端端口号
    • dataDir:数据快照文件目录,默认情况下100000次事务操作生成一次快照;
    • dataLogDir:事务日志文件目录,生产环境中放在独立的磁盘中;
    • tickTime:服务器之间或客户端与服务器之间维持心跳的时间间隔(以毫秒为单位);
    • maxClientCnxns:最大连接数;
    • minSessionTimeout:最小session超时,默认minSessionTimeout=tickTime*2;
    • maxSessionTimeout:最大session超时,默认maxSessionTimeout=tickTime*20;
    • serverId:服务器编号;
    • initLimit:集群中的follower服务器(F)与leader服务器(L)之间初始连接时能容忍的最多心跳数;
    • syncLimit:集群中的follower服务器(F)与leader服务器(L)之间请求和应答之间能容忍的最多心跳数;
    • electionAlg:在3.4.10版本中,默认值为3,另外三种算法已经被弃用
      • 0:基于UDP的LeaderElection
      • 1:基于UDP和认证的FastLeaderElection
      • 2:基于UDP和认证的FastLeaderElection
      • 3:基于TCP的FastLeaderElection
    • electionPort:选举端口;
    • quorumPort:数据通信端口;
    • peerType:是否为观察者,1为观察者

    cons命令:

    • ip:ip地址;
    • port:端口号;
    • queued:等待被处理的请求数,请求缓存在队列中;
    • received:收到的包数;
    • sent:发送的包数;
    • sid:会话id;
    • lop:最后的操作GETD-读取数据,DELE-删除数据,CREA-创建数据;
    • est:连接的时间戳;
    • to:超时时间;
    • lcxid:当前会话的操作id;
    • lzxid:最大事务id;
    • lresp:最后响应时间戳;
    • llat:最后/最新 延时;
    • minlat:最小延时;
    • maxlat:最大延时;
    • avglat:平均延时;

    stat命令:

    • Zookeeper version:版本;
    • latency min/avg/max:延时;
    • received:收包;
    • sent:发包;
    • connections:连接数;
    • outstanding:堆积数;
    • zxid:最大事务id;
    • mode:服务器角色;
    • node count:节点数;

    wchc命令:

    直接使用会有问题:wchc is not executed because it is not in the whitelist.
    解决办法:修改zkServer.sh:
    在下面添加如下信息:
    ZOOMAIN="-Dzookeeper.41w.commands.whitelist=* ${ZOOMAIN}"


    mntr命令:此命令与stat命令相似,不多显示的信息更加详细

    • zk_version:版本;
    • zk_avg_latency:平均延迟;
    • zk_max_latency:最大延迟;
    • zk_min_latency:最小延迟;
    • zk_packetzk_packets_sents_received:收包数;
    • zk_packets_sent:发包数;
    • zk_num_alive_connections:连接数;
    • zk_outstanding_requests:堆积请求数;
    • zk_server_state:leader/follower状态;
    • zk_znode_count:znode数量;
    • zk_watch_count:watch数量;
    • zk_ephemerals_count:临时节点(znode)数量;
    • zk_approximate_data_size:数据大小;
    • zk_open_file_descriptor_count:打开的文件描述符数量;
    • zk_max_file_descriptor_count:最大文件描述符数量;
  • 相关阅读:
    【实验吧】藏在图片中的秘密
    pwntools各使用模块简介
    【笔记】shellcode相关整理
    【pwnable】asm之write up
    【实验吧】转瞬即逝write up
    利用wireshark任意获取qq好友IP实施精准定位
    【实验吧】逆向1000
    【实验吧】逆向rev50
    pwnable.kr brainfuck之write up
    JavaScript获取后台C#变量以及调用后台方法
  • 原文地址:https://www.cnblogs.com/kongieg/p/13367381.html
Copyright © 2011-2022 走看看