zoukankan      html  css  js  c++  java
  • Zookeeper 系列(三)Zookeeper API

    Zookeeper 系列(三)Zookeeper API

    本节首先介绍 Zookeeper 的 Shell 命令,再对 Java 操作 Zookeeper 的三种方式进行讲解,本节先介绍 Zookeeper 的原生 API。

    • Zookeeper API:Zookeeper 原生 api
    • ZKClient API
    • Curator API

    一、Shell 命令

    启动 Zookeeper 服务之后,输入以下命令,连接到 Zookeeper 服务:

    zkCli.sh -server localhost:2181
    

    注意: window 下直接 zkCli 启动,不要带参数,否则会报错,详见zookeeper启动抛出NumberFormatException 异常

    连接成功之后,输入 help 之后,屏幕会输出可用的 Zookeeper 命令,如下所示:

    ZooKeeper -server host:port cmd args
            stat path [watch]
            set path data [version]
            ls path [watch]
            delquota [-n|-b] path
            ls2 path [watch]
            setAcl path acl
            setquota -n|-b val path
            history
            redo cmdno
            printwatches on|off
            delete path [version]
            sync path
            listquota path
            rmr path
            get path [watch]
            create [-s] [-e] path data acl
            addauth scheme auth
            quit
            getAcl path
            close
            connect host:port
    

    (1) 查询 ls

    [zk: localhost:2181(CONNECTED) 0] ls /
    [zookeeper]
    [zk: localhost:2181(CONNECTED) 1] ls /zookeeper
    [quota]
    

    (2) 创建新的 Znode 节点 create

    [zk: localhost:2181(CONNECTED) 7] create /date 2018-04-05
    Created /date
    [zk: localhost:2181(CONNECTED) 8] ls /
    [date, zookeeper]
    

    (3) 获取 Znode 节点 get

    [zk: localhost:2181(CONNECTED) 9] get /date
    2018-04-05
    cZxid = 0x7
    ctime = Thu Apr 05 14:21:20 CST 2018
    mZxid = 0x7
    mtime = Thu Apr 05 14:21:20 CST 2018
    pZxid = 0x7
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 10
    numChildren = 0
    

    (4) 修改 Znode 节点 set

    set /date 2018-04-06
    

    (4) 删除 Znode 节点 delete/rmr

    delete /date    # 不能删除非空目录
    rmr /date       # 递归删除
    

    二、Zookeeper API

    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.5.0-alpha</version>
    </dependency>
    

    示例:

    package com.github.binarylei.zookeeper;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * @author: leigang
     * @version: 2018-04-05
     */
    public class ZookeeperBase {
    
        /** zookeeper 地址,多个用 , 隔开 */
        static final String CONNECT_ADDR = "127.0.0.1";
        /** session 超时时间,单位:ms */
        static final int SESSION_OUTTIME = 5000;
        /** 阻塞程序执行,用于等待zookeeper 连接成功,发送成功信号 */
        static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
        
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            ZooKeeper zooKeeper = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher() { // (1)
                @Override
                public void process(WatchedEvent event) {
                    // 获取事件的状态
                    Event.KeeperState state = event.getState();
                    Event.EventType type = event.getType();
                    // 如果是建立连接
                    if (state == Event.KeeperState.SyncConnected) {
                        if (type == Event.EventType.None) {
                            // 如果连接建立成功,则发送信号,让后续阻塞程序向下执行
                            connectedSemaphore.countDown();
                            System.out.println("zookeeper 连接建立");
                        }
                    }
    
                }
            });
    
            // 进行阻塞,等待与 Zookeeper 的连接建立完成
            connectedSemaphore.await();
            System.out.println("============================");
    
            //1. 创建节点
            zooKeeper.create( // (2)
                    "/testRoot",            // 节点路径,不允许递归创建节点
                    "testRoot".getBytes(),        // 节点内容
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,  // 节点权限,一般情况下不用关注
                    CreateMode.PERSISTENT);       // 节点类型
    
            //2. 获取节点
            byte[] data = zooKeeper.getData("/testRoot", false, null);
            System.out.println("获取节点:" + new String(data));
    
            //3. 获取子节点
            List<String> nodes = zooKeeper.getChildren("/", false);
            for (String node : nodes) {
                System.out.println("获取" + node + "子节点:" +
                        new String(zooKeeper.getData("/" + node, false, null)));
            }
    
            //4. 修改节点的值
            Stat stat = zooKeeper.setData("/testRoot", "111".getBytes(), -1);
    
            //5. 判断节点是否存在
            System.out.println("节点是否存在:" + zooKeeper.exists("/testRoot", false));
    
            //6. 删除节点,不支持递归删除
            zooKeeper.delete("/testRoot", -1, new AsyncCallback.VoidCallback() {
                @Override
                public void processResult(int rc, String path, Object ctx) {
                    System.out.println("响应码:" + rc);
                    System.out.println("路径:" + path);
                    System.out.println("上下文:" + ctx);
                }
            }, 1);
    
            zooKeeper.close();
        }
    }
    

    (1) 创建会话方法 :客户端可以通过创建一个 Zookeeper 实例来连接 Zookeeper 服务器。ZooKeeper 构造方法 有 4 个,参数说明如下:

    1. connectstring :连接服务器列表,已","分割。
    2. sessiontimeout :心跳检测时间周期期(秒)
    3. wather :事件处理通知器。
    4. canbereadonly :标识当前会话是否支持只读。
    5. session 和 sessionpasswd :提供连接 zookeeper 的 session 和密码,通过这俩个确定唯一台客户端,目的是可以提供重复会话。

    注意: Zookeeper 客户端和服务器端会话的建立是一个异步的过程 ,程序方法在处理完客户端初始化后立即返回,也就是说程序继续往下执行代码,这样,大多数情况下我们并没有真正构建好一个可用会话,在会话的生命周期处于 "SyncConnected" 时才算真正建立完毕。解决方案是使用 CountDownLatch 阻塞,起到连接建立完成。

    (2) 创建节点

    create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
    
    1. path :节点路径,/nodeName。不允许递归创建节点。
    2. data :节点内容,要求类型是字节数组。
    3. acl :节点权限,一般使用 ZooDefs.Ids.OPEN_ACL_UNSAFE 即可。
    4. createMode :节点类型。PERSISTENT(持久节点)、PERSISTENT_SEQUENTIAL(持久顺序节点)、EPHEMERAL(临时节点)、EPHEMERAL_SEQUENTIAL(临时顺序节点)。

    (3) 获取节点

    getData(String path, boolean watch, Stat stat)
    

    (4) 获取子节点

    getChildren(String path, boolean watch)
    

    (5) 修改节点的值

    # version = -1 表示全部的历史版本,一般使用 -1 即可
    setData(final String path, byte data[], int version)
    

    (6) 判断节点是否存在

    exists(String path, boolean watch)
    

    (7) 删除节点

    delete(final String path, int version)
    

    (8) 所有的节点都有同步和异步区分,以delete为例

    // 参数分别为:路径,版本(-1即可),回调函数,上下文环境
    zooKeeper.delete("/testRoot", -1, new AsyncCallback.VoidCallback() {
        @Override
        public void processResult(int rc, String path, Object ctx) {
            System.out.println("响应码:" + rc);
            System.out.println("路径:" + path);
            System.out.println("上下文:" + ctx);
        }
    }, 1);
    

    注册一个异步回调函数,要实现 AsyncCallback.VoidCallback 接口,重写 processResult(int rc, String path, Object ctx) 方法,当节点创建完毕后执行此方法。

    1. rc :为服务端响应码 0(调用成功)、-4(端口连接)、-110(指定节点存在)、-112(会话已经过期)。
    2. path :接口调用时传入 API 的数据节点的路径参数
    3. ctx :为调用接口传入 API 的 ctx 值

    三、Zookeeper Watcher

    3.1 watcher 的概念

    Zookeeper Watcher 事件触发机制详见 Watch 触发器。当 watch 监视的数据发生变化时,通知设置了该 watch 的 client,即 watcher。

    同样,其 watcher 是监听数据发送了某些变化,那就一定会有对应的事件类型,和状态类型。

    (1) 事件类型(znode节点相关):

    1. EventType.None :客户端连接成功
    2. EventType.NodeCreated :节点创建
    3. EventType.NodeDataChanged :节点变更
    4. EventType.NodeChildrenChanged :子节点变量
    5. EventType.NodeDeleted :节点删除

    (2) 状态类型(客户端状态):

    1. KeeperState.Disconnected :客户端连接断开
    2. KeeperState.SyncConnected :客户端连接成功
    3. KeeperState.AuthFailed :客户端认证失败
    4. KeeperState.Expired :客户端连接过期

    3.2 watcher 的特性

    Watcher 的特性:一次性、客户端串行执行、轻量。

    (1) 一次性

    对于 ZK 的 watcher,你只需要记住一点: zookeeper 有 watch事件,是一次性触发的,当 watch 监视的数据发生变化时,通知设置了该 watch 的 client,即 watcher,由于 zookeeper 的监控都是一次性的所以每次必须设置监控。

    (2) 客户端串行执行

    客户端 Watcher 回调的过程是一个串行同步的过程,这为我们保证了顺序,同时需要开发人员注意一点,千万不要因为一个 Watcher 的处理逻辑影响了整个客户端的 Watcher 回调。

    (3) 轻量

    Watched Event 是 Zookeeper 整个 Watcher 通知机制的最小通知单元,整个结构只包含三部分:通知状态、事件类型和节点路径。也就是说 Watcher 通知非常的简单,只会告诉客户端发生了事件而不会告知其具体内容,需要客户自己去进行获取,比如 NodeDataChanged 事件, Zookeeper 只会通知客户端指定节点的数据发生了变更,而不会直接提供具体的数据内容。

    我们通过一个示例,详细学习下 Watcher 的概念和其目的。

    3.3 Watcher 示例

    package com.github.binarylei.zookeeper.watcher;
    
    import java.io.Closeable;
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.ZooKeeper;
    
    /**
     * @author: leigang
     * @version: 2018-04-05
     */
    public class ConnectionWatcher implements Watcher, Closeable {
        private static final int SESSION_TIMEOUT = 5000;
        /** 定义原子变量,用于记录watcher数 */
        private AtomicInteger seq = new AtomicInteger();
    
        protected ZooKeeper zk;
        private CountDownLatch connectedSignal = new CountDownLatch(1);
    
        public void connect(String host) {
            this.close();
            try {
                this.zk = new ZooKeeper(host, SESSION_TIMEOUT, this);
                this.connectedSignal.await();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void process(WatchedEvent event) {
            if (event == null) {
                return;
            }
            // 连接状态
            KeeperState keeperState = event.getState();
            // 事件类型
            EventType eventType = event.getType();
            // 受影响的path
            String path = event.getPath();
            String logPrefix = "【Watcher-" + seq.incrementAndGet() + "】";
    
            if(keeperState == KeeperState.SyncConnected) {
                // 成功连接上服务器
                if (eventType == EventType.None) {
                    System.out.println(logPrefix + "成功连接上服务器");
                    this.connectedSignal.countDown();
                }
                // 节点创建
                else if (eventType == EventType.NodeCreated) {
                    System.out.println(logPrefix + "节点创建:" + path);
                }
                // 节点数据更新
                else if (eventType == EventType.NodeDataChanged) {
                    System.out.println(logPrefix + "节点数据更新:"+ path);
                }
                // 节点删除
                else if (eventType == EventType.NodeDeleted) {
                    System.out.println(logPrefix + "节点删除:" + path);
                }
            } else if (keeperState == KeeperState.Disconnected) {
                System.out.println(logPrefix + "连接断开");
            } else if (keeperState == KeeperState.AuthFailed) {
                System.out.println(logPrefix + "权限认证失败");
            } else if (keeperState == KeeperState.Expired) {
                System.out.println(logPrefix + "连接过期");
            }
        }
    
        // 关闭连接
        public void close() {
            if (this.zk != null) {
                try {
                    this.zk.close();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public ZooKeeper getZk() {
            return zk;
        }
    
        public void setZk(ZooKeeper zk) {
            this.zk = zk;
        }
    }
    
    public class ConnectionWatcherTest {
    
        @Test
        public void test() throws KeeperException, InterruptedException {
            ConnectionWatcher zkWatcher = new ConnectionWatcher();
            zkWatcher.connect("127.0.0.1:2181");
    
            ZooKeeper zk = zkWatcher.getZk();
            zk.create("/date", String.valueOf(System.currentTimeMillis()).getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    
            //1. 注册watch事件
            zk.exists("/date", true);
            zk.setData("/date", String.valueOf(System.currentTimeMillis()).getBytes(), -1);
    
            //2. watch的触发是一次性的,要想再次触发必须重新注册
            zk.exists("/date", true);
            zk.setData("/date", String.valueOf(System.currentTimeMillis()).getBytes(), -1);
    
            zk.delete("/date", -1);
            zkWatcher.close();
        }
    }
    

    测试结果如下:

    【Watcher-1】成功连接上服务器
    【Watcher-2】节点数据更新:/date
    # 当第二个 zk.exists("/date", true); 注释后,【Watcher-3】就不会触发了
    【Watcher-3】节点数据更新:/date
    

    四、Zookeeper 安全机制

    ACL(Access Control List), Zookeeper 作为分布式协调框架,其内部存储的都是一些关乎分布式系统运行时状态的元数据,尤其是设计到分布式锁、 Master 选举和协调等应用场景。我们需要有效地保障 Zookeeper 中的数据安全,Zookeeper 提供一套完善的 ACL 权限控制机酮来保障数据的安全。

    Zookeeper 提供了三种模式。权限模式、授权对象、权限。

    (1) 权限模式: Scheme,开发人员最多使用的如下四种权限模式:

    1. IP :ip 模式通过 ip 地址粒度来进行控制权限,例如配置了 192.168.1.107 即表示权限控都针对这个 ip 地址的,同时也支持按网段分配,比如 192.168.1.*
    2. Diges :digest 是最常用的权限控制模式 ,也更符合我们对权限控制的认识,其类似于 "username: password" 形式的权限标识进行权限配置。zK 会对形成的权限标识先后进行俩次编码处理,分别是 SHA-1 加密算法、BASE64 编码。
    3. World :World 是一直最开放的权限控制模式。这种模式可以看做为特殊的 Digest。他仅仅是一个标识而已。
    4. Super :超级用户模式_在超级用户模式下可以对 ZK 任意进行操作。

    (2) 权限对象:指的是权限赋予的用户或者一个指定的实体,例如 ip 地址或机器等。在不的模式下,授权对象是不同的。这种模式和权限对象一一对应。

    (3) 权限:权限就是指那些通过权限检测后可以被允许执行的操作,在 ZK 中,对数据的操作权限分为以下五大类:

    CREATE、DELETE、READ、WRITE、ADMIN
    

    4.1 代码示例

    // 修改connect连接,在实例化ZooKeeper后,添加认证信息
    public class ZookeeperAuth implements Watcher, Closeable {
        public void connect(String host) {
            connect(host, null);
        }
    
        // 添加认证
        public void connect(String host, String password) {
            this.close();
            try {
                this.zk = new ZooKeeper(host, SESSION_TIMEOUT, this);
                if (password != null) {
                    this.zk.addAuthInfo("digest", password.getBytes());
                }
                this.connectedSignal.await();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    测试代码如下:

    public class ZookeeperAuthTest{
    
        @Test
        public void test1() throws KeeperException, InterruptedException { // (1)
            ZookeeperAuth zkAuth = new ZookeeperAuth();
            zkAuth.connect("127.0.0.1", "123456");
    
            ZooKeeper zk = zkAuth.getZk();
            zk.create("/testAuth", "test".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, CreateMode.PERSISTENT);
    
            zkAuth.close();
        }
    
        @Test
        public void test2() throws KeeperException, InterruptedException { // (2)
            ZookeeperAuth zkAuth = new ZookeeperAuth();
            zkAuth.connect("127.0.0.1:2181", "1234568");
    
            ZooKeeper zk = zkAuth.getZk();
            byte[] data = zk.getData("/testAuth", false, null);
            System.out.println(new String(data));
    
            zkAuth.close();
        }
    }
    

    (1) Client-1 使用密码 123456 创建一个连接,并创建一个 znode 节点 /testAuth,注意创建节点时权限使用 ZooDefs.Ids.CREATOR_ALL_ACL

    (2) Client-2 使用密码 1234568 创建一个连接,并打算修改节点 /testAuth ,结果出现权限不足的错误。

    org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /testAuth
    
    	at org.apache.zookeeper.KeeperException.create(KeeperException.java:117)
    	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
    	at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1611)
    	at org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1640)
    	at com.github.binarylei.zookeeper.auth.ZookeeperAuthTest.test2(ZookeeperAuthTest.java:33)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
    	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    
  • 相关阅读:
    深入浅出Mybatis系列(一)Mybatis入门
    LinkedList其实就那么一回事儿之源码分析
    深入浅出Mybatis系列(八)mapper映射文件配置之select、resultMap
    ArrayList其实就那么一回事儿之源码浅析
    springMVC 源码解读系列(一)初始化
    深入浅出Mybatis系列(三)配置详解之properties与environments(mybatis源码篇)
    深入浅出Mybatis系列(四)配置详解之typeAliases别名(mybatis源码篇)
    深入浅出Mybatis系列(六)objectFactory、plugins、mappers简介与配置
    深入浅出Mybatis系列(二)配置简介(mybatis源码篇)
    深入浅出Mybatis系列(七)mapper映射文件配置之insert、update、delete
  • 原文地址:https://www.cnblogs.com/binarylei/p/8727345.html
Copyright © 2011-2022 走看看