zoukankan      html  css  js  c++  java
  • ZooKeeper(二)Java API使用

    ZooKeeper官网提供了Java和C的API。 本文使用Java API来实现ZooKeeper的基本操作。

    前言

    下图中的Replicated Database是包含完整数据树(entire data tree)的内存数据库。ZooKeeper的节点都是存放在内存中,所以读写速度很快。更新日志被记录到了磁盘中,以便用于恢复数据。在更新内存中节点数据之前,会先序列化到磁盘中,然后才会加载到内存中。

    ZooKeeper中的每个节点存储的数据要被原子性的操作。也就是说读操作将获取与节点相关的所有数据,写操作也将替换掉节点的所有数据。另外,每一个节点都拥有自己的ACL(访问控制列表),这个列表规定了用户的权限,即限定了特定用户对目标节点可以执行的操作。

    目录

    一、API引用

    二、API简介

    三、API详解

    1. 连接zooKeeper服务器

    2. 创建znode(zk节点)

    3. 删除znode节点

    4. 更新znode节点数据

    5. 判断znode是否存在

    6. 获取znode节点的子节点列表

    7. 获取znode节点数据

    四、代码示例

    一、API引用

    <!-- zookeeper -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>${zookeeper.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    二、API简介

    ZooKeeper API共包含五个包:

    1. org.apache.zookeeper
    2. org.apache.zookeeper.data
    3. org.apache.zookeeper.server
    4. org.apache.zookeeper.server.quorum
    5. org.apache.zookeeper.server.upgrade

     其中org.apache.zookeeper,包含ZooKeeper类,他是我们编程时最常用的类文件。这个类是ZooKeeper客户端的主要类文件。如果要使用ZooKeeper服务,应用程序首先必须创建一个ZooKeeper实例,这时就需要使用此类。一旦客户端和ZooKeeper服务建立起了连接,ZooKeeper系统将会给次连接会话分配一个ID值,并且客户端将会周期性的向服务器端发送心跳来维持会话连接。只要连接有效,客户端就可以使用ZooKeeperAPI来做相应处理了。

    三、API详解

    1. 连接zooKeeper服务器

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

    参数说明:

    connectString zooKeeper server列表, 以逗号隔开. zooKeeper对象初始化后, 将从server列表中选择一个server, 并尝试与其建立连接. 如果连接建立失败, 则会从列表的剩余项中选择一个server, 并再次尝试建立连接。
    sessionTimeout 指定连接的超时时间。
    watcher 一个 watcher 实例。

    注意,创建ZooKeeper对象时,只要对象完成初始化便立刻返回。建立连接是以异步的形式进行的,当连接成功建立后,会回调watcher的process方法。如果想要同步建立与server的连接,需要自己进一步封装。一般我们创建完Zookeeper之后,直接等着其连接好了之后,在watcher的process方法里面让主函数结束等待。

    2. 创建znode(zk节点)

    public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)

    参数说明:

    path znode的路径,必须是绝对路径。
    data 与znode关联的数据。
    acl 指定权限信息, 如果不想指定权限, 可以传入Ids.OPEN_ACL_UNSAFE。
    createMode 指定znode类型. CreateMode是一个枚举类, 从中选择一个成员传入即可。

    createMode的可选的成员类型:

    PERSISTENT 创建后只要不删就永久存在.
    EPHEMERAL 会话结束年结点自动被删除,EPHEMERAL结点不允许有子节点。
    SEQUENTIAL 节点名末尾会自动追加一个10位数的单调递增的序号,同一个节点的所有子节点序号是单调递增的。
    PERSISTENT_SEQUENTIAL 结合PERSISTENT和SEQUENTIAL。
    EPHEMERAL_SEQUENTIAL 结合EPHEMERAL和SEQUENTIAL。

    znode中关联的数据不能超过1M。ZooKeeper的使命是分布式协作,而不是数据存储。

    3. 删除znode节点

    public void delete(final String path, int version)

    参数说明:

     path znode的路径,必须是绝对路径。
    version 指定要删除的数据的版本, 如果version和真实的版本不同, 删除操作将失败。指定version为-1则忽略版本检查。

    4. 更新znode节点数据

    public Stat setData(final String path, byte data[], int version)

    参数说明:

    path znode的路径,必须是绝对路径。
    data 要设置的到znode的数据,注意数据是直接清空原来的数据,然后写新的数据,且数据的大小不能超过1MB。
    version 指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败。 指定version为-1则忽略版本检查。

    5. 判断znode是否存在

    public Stat exists(final String path, Watcher watcher)

    参数说明:

    path znode的路径,必须是绝对路径。
    watcher 一个 watcher 实例。

     如果该node存在,则返回该node的状态信息,否则返回null。

    public Stat exists(String path, boolean watch)

    其余和上面都一样,就是boolean如果为true,表示要用watcher,这时的watcher是初始化zookeeper时候指定的watcher。

    6. 获取znode节点的子节点列表

    public List<String> getChildren(final String path, Watcher watcher)

    参数说明:

    path znode的路径,必须是绝对路径。
    watcher 一个 watcher 实例。

    如果有child存在,就返回child,如果没有的话,就会返回null。

     public List<String> getChildren(String path, boolean watch)

    其余和上面都一样,就是boolean如果为true,表示要用watcher,这时的watcher是初始化zookeeper时候指定的watcher。

    7. 获取znode节点数据 

    public byte[] getData(final String path, Watcher watcher, Stat stat)

    参数说明:

     path znode的路径,必须是绝对路径。
    watcher 用于指定是否监听path node的删除事件,以及数据更新事件,注意,不监听path node的创建事件,因为如果path node不存在,该方法将抛出KeeperException.NoNodeException异常。
    stat 是个传出参数,getData方法会将path node的状态信息设置到该参数中。是个传出参数,getData方法会将path node的状态信息设置到该参数中。
    public byte[] getData(String path, boolean watch, Stat stat)

    其余和上面都一样,就是boolean如果为true,表示要用watcher,这时的watcher是初始化zookeeper时候指定的watcher。

    四、代码示例

    public class ZooKeeperService {
        /**
         * Server列表, 以逗号分割, 例如: 192.168.3.4:2181,192.168.3.5:2181,192.168.3.6:2181
         */
        private final String hosts;
        /**
         * 连接的超时时间, 毫秒
         */
        private int timeoutMills;
    
        private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    
        private ZooKeeper zk;
        /**
         * watcher实例
         */
        private Watcher connWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("conn watched event >>> " + event.toString());
                // 连接建立, 回调process接口时, 其event.getState()为KeeperState.SyncConnected
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    // 放开闸门, wait在connect方法上的线程将被唤醒
                    connectedSemaphore.countDown();
                }
            }
        };
    
        private Watcher existsWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("exists watched event >>> " + event.toString());
                System.out.println("----------------------------------------------");
                System.out.println("path : " + event.getPath() + ", type : " + event.getType() + ", state : " + event.getState());
                try {
                    System.out.println("数据: " + watchData(event.getPath()));
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("----------------------------------------------");
            }
        };
    
        public ZooKeeperService(String hosts, int timeoutMills) throws IOException, InterruptedException {
            this.hosts = hosts;
            this.timeoutMills = timeoutMills;
            // 连接ZooKeeper服务器
            connect();
        }
    
        /**
         * 连接ZooKeeper Server
         * <p>
         * 创建ZooKeeper对象时, 只要对象完成初始化便立刻返回.
         * 建立连接是以异步的形式进行的, 当连接成功建立后, 会回调Watcher的process方法.
         * 如果想要同步建立与server的连接, 需要自己进一步封装.
         */
        public void connect() throws IOException, InterruptedException {
            zk = new ZooKeeper(hosts, timeoutMills, connWatcher);
            // 等待连接完成
            connectedSemaphore.await();
        }
    
        /**
         * 关闭连接
         */
        public void disconnect() throws InterruptedException {
            if (zk != null) {
                zk.close();
            }
        }
    
        /**
         * 创建节点
         *
         * @param path 节点路径
         * @param data 初始数据
         * @return 节点的路径
         */
        public boolean createPath(String path, String data) throws KeeperException, InterruptedException {
            return zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) != null;
        }
    
        /**
         * 读取指定节点数据
         *
         * @param path 节点路径
         * @return 节点数据
         */
        public String readData(String path) throws KeeperException, InterruptedException {
            return new String(zk.getData(path, false, null));
        }
    
        /**
         * 监听指定节点数据
         *
         * @param path 节点路径
         * @return 节点数据
         */
        public String watchData(String path) throws KeeperException, InterruptedException {
            Stat stat = zk.exists(path, existsWatcher);
            return new String(zk.getData(path, existsWatcher, stat));
        }
    
        /**
         * 更新指定节点数据
         *
         * @param path 节点路径
         * @param data 数据
         * @return 是否更新成功
         */
        public boolean writeData(String path, String data) throws KeeperException, InterruptedException {
            return zk.setData(path, data.getBytes(), -1) != null;
        }
    
        /**
         * 删除节点
         *
         * @param path 节点路径
         */
        public void deletePath(String path) throws KeeperException, InterruptedException {
            zk.delete(path, -1);
        }
    
        public static void main(String[] args) throws Exception {
            String hosts = "192.168.3.4:2181,192.168.3.5:2181,192.168.3.6:2181";
            String path = "/header";
            int timeoutMills = 3000;
            ZooKeeperService service = new ZooKeeperService(hosts, timeoutMills);
            if (service.createPath(path, "I'm the initial data.")) {
                System.out.println("数据: " + service.readData(path));
                service.writeData(path, "I'm the updated data.");
                System.out.println("数据: " + service.readData(path));
                service.deletePath(path);
            }
            service.disconnect();
        }
    
        public static void mainWatch(String[] args) throws Exception {
            String hosts = "192.168.3.4:2181,192.168.3.5:2181,192.168.3.6:2181";
            String path = "/header";
            int timeoutMills = 3000;
            ZooKeeperService service = new ZooKeeperService(hosts, timeoutMills);
            System.out.println(service.watchData(path));
            System.in.read();
        }
    }
  • 相关阅读:
    如何用ps简单快速扣头发丝
    thinkphp在iis上不是出现500错误
    PHP数组函数详解大全
    一个搜图的好网站
    手把手编写PHP MVC框架实例教程
    centos配置epel和remi源
    CentOS 7 yum 安装php5.6
    20162322 朱娅霖 作业005&006 栈,队列
    2017-2018-1 bug终结者 团队博客002
    2017-2018-1 bug终结者 团队博客001
  • 原文地址:https://www.cnblogs.com/warehouse/p/9362932.html
Copyright © 2011-2022 走看看