zoukankan      html  css  js  c++  java
  • zookeeper curator CRUD

    Curator客户端的基本操作

    疯狂创客圈 Java 分布式聊天室【 亿级流量】实战系列之 -24【 博客园 总入口


    写在前面

    ​ 大家好,我是作者尼恩。目前和几个小伙伴一起,组织了一个高并发的实战社群【疯狂创客圈】。正在开始高并发、亿级流程的 IM 聊天程序 学习和实战

    ​ 前面,已经完成一个高性能的 Java 聊天程序的四件大事:

    接下来,需要进入到分布式开发的环节了。 分布式的中间件,疯狂创客圈的小伙伴们,一致的选择了zookeeper,不仅仅是由于其在大数据领域,太有名了。更重要的是,很多的著名框架,都使用了zk。

    本篇介绍 Curator客户端的基本操作

    1.1.1. Curator客户端的依赖包

    打开Curator的官网,我们可以看到,Curator包含了以下几个包:

    curator-framework:对zookeeper的底层api的一些封装;

    curator-client:提供一些客户端的操作,例如重试策略等;

    curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。

    Maven依赖(使用curator的版本:4.0.0,对应Zookeeper的版本为:3.4.x,如果版本不匹配,就会有兼容性问题,很有可能导致节点操作失败。具体的版本对应关系,可以去curator的官网查看。

       <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-client</artifactId>
                <version>4.0.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.zookeeper</groupId>
                        <artifactId>zookeeper</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
    

    1.1.2. Curator 创建会话

    使用 curator-framework 包中的工厂类CuratorFrameworkFactory中的静态方法newClient,来创建客户端会话。

    代码如下:

    /**
     * create by 尼恩 @ 疯狂创客圈
     **/
    public class ClientFactory {
    
        /**
         * @param connectionString zk的连接地址
         * @return CuratorFramework 实例
         */
        public static CuratorFramework createSimple(String connectionString) {
            // 重试策略:第一次重试等待1s,第二次重试等待2s,第三次重试等待4s
            // 第一个参数:等待时间的基础单位,单位为毫秒
            // 第二个参数:最大重试次数
            ExponentialBackoffRetry retryPolicy =
                    new ExponentialBackoffRetry(1000, 3);
    
            // 获取 CuratorFramework 实例的最简单的方式
            // 第一个参数:zk的连接地址
            // 第二个参数:重试策略
            return CuratorFrameworkFactory.newClient(connectionString, retryPolicy);
        }
    
        /**
         * @param connectionString    zk的连接地址
         * @param retryPolicy         重试策略
         * @param connectionTimeoutMs 连接
         * @param sessionTimeoutMs
         * @return CuratorFramework 实例
         */
        public static CuratorFramework createWithOptions(
                String connectionString, RetryPolicy retryPolicy,
                int connectionTimeoutMs, int sessionTimeoutMs) {
    
            // builder 模式创建 CuratorFramework 实例
            return CuratorFrameworkFactory.builder()
                    .connectString(connectionString)
                    .retryPolicy(retryPolicy)
                    .connectionTimeoutMs(connectionTimeoutMs)
                    .sessionTimeoutMs(sessionTimeoutMs)
                    // 其他的创建选项
                    .build();
        }
    }
    
    

    这里用到两个版本,前一个是简化版本,只需要设置ZK集群的连接地址和重试策略。

    后一个是相对复杂的重载版本,可以设置连接超时connectionTimeoutMs、会话超时sessionTimeoutMs 等其他的会话创建选项。

    具体请看疯狂创客圈的Demo源码。

    1.1.3. CRUD 之 Create 创建节点

    使用create()方法,最后使用forPath带上需要创建的节点路径。

        /**
         * 创建节点
         */
        @Test
        public void createNode() {
            //创建客户端
            CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
            try {
                //启动客户端实例,连接服务器
                client.start();
    
                // 创建一个 ZNode 节点
                // 节点的数据为 payload
    
                String data = "hello";
                byte[] payload = data.getBytes("UTF-8");
                String zkPath = "/test/CRUD/node-1";
                client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(zkPath, payload);
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                CloseableUtils.closeQuietly(client);
            }
        }
    
    
    

    使用withMode()方法,设置节点的类型。zookeeper节点有四种类型:

    (1)PERSISTENT 持久节点

    (2)PERSISTENT_SEQUENTIAL 持久顺序节点

    (3)PHEMERAL 临时节

    (4)EPHEMERAL_SEQUENTIAL 临时顺序节点

    下面详细介绍一下四种节点的区别和联系。

    (1)持久节点(PERSISTENT)

    所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点。持久节点的生命周期是永久有效,不会因为创建该节点的客户端会话失效而消失。

    (2)持久顺序节点(PERSISTENT_SEQUENTIAL)

    这类节点的生命周期和持久节点是一致的。额外的特性是,在ZK中,每个父节点会为他的第一级子节点维护一份次序,会记录每个子节点创建的先后顺序。如果在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK会自动为给定节点名加上一个表示次序的数字后缀,作为新的节点名。这个次序后缀的范围是整型的最大值。

    比如,在创建节点的时候只需要传入节点 “/test_”,这样之后,zookeeper自动会给”test_”后面补充数字次序。

    (3)临时节点(EPHEMERAL)

    和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。这里还要注意一件事,就是当你客户端会话失效后,所产生的节点也不是一下子就消失了,也要过一段时间,大概是10秒以内,可以试一下,本机操作生成节点,在服务器端用命令来查看当前的节点数目,你会发现客户端已经stop,但是产生的节点还在。

    另外,在临时节点下面不能创建子节点。

    (4)临时顺序节点(EPHEMERAL_SEQUENTIAL)

    此节点是属于临时节点,不过带有顺序,客户端会话结束节点就消失。

    1.1.4. CRUD 之Read获取节点

    与节点读取的有关的方法,主要有三个:

    (1)首先是判断节点是否存在,使用checkExists方法。

    (2)其次是获取节点的数据,使用getData方法。

    (3)最后是获取子节点列表,使用getChildren方法。

    演示代码如下:

     /**
         * 读取节点
         */
        @Test
        public void readNode() {
            //创建客户端
            CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
            try {
                //启动客户端实例,连接服务器
                client.start();
    
                String zkPath = "/test/CRUD/node-1";
    
    
                Stat stat = client.checkExists().forPath(zkPath);
                if (null != stat) {
                    //读取节点的数据
                    byte[] payload = client.getData().forPath(zkPath);
                    String data = new String(payload, "UTF-8");
                    log.info("read data:", data);
    
                    String parentPath = "/test";
                    List<String> children = client.getChildren().forPath(parentPath);
    
                    for (String child : children) {
                        log.info("child:", child);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                CloseableUtils.closeQuietly(client);
            }
        }
    
    

    1.1.5. CRUD 之update更新节点

    节点的更新,分为同步更新与异步更新。

        /**
         * 更新节点
         */
        @Test
        public void updateNode() {
            //创建客户端
            CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
            try {
                //启动客户端实例,连接服务器
                client.start();
    
    
                String data = "hello world";
                byte[] payload = data.getBytes("UTF-8");
                String zkPath = "/test/node-1";
                client.setData()
                        .forPath(zkPath, payload);
    
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                CloseableUtils.closeQuietly(client);
            }
        }
    
    

    异步更新,需要用到inBackground()方法,其作用是,让更新操作异步执行。如果需要监听到异步操作的结果,需要为inBackground加上AsyncCallback回调实例。

    异步更新的代码如下:

        /**
         * 更新节点 - 异步模式
         */
        @Test
        public void updateNodeAsync() {
            //创建客户端
            CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
            try {
    
                //更新完成监听器
                AsyncCallback.StringCallback callback = new AsyncCallback.StringCallback() {
    
                    @Override
                    public void processResult(int i, String s, Object o, String s1) {
                        System.out.println(
                                "i = " + i + " | " +
                                        "s = " + s + " | " +
                                        "o = " + o + " | " +
                                        "s1 = " + s1
                        );
                    }
                };
                //启动客户端实例,连接服务器
                client.start();
    
                String data = "hello ,every body! ";
                byte[] payload = data.getBytes("UTF-8");
                String zkPath = "/test/CRUD/node-1";
                client.setData()
                        .inBackground(callback)
                        .forPath(zkPath, payload);
    
                Thread.sleep(10000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                CloseableUtils.closeQuietly(client);
            }
        }
    

    1.1.6. CRUD 之delete删除节点

    删除节点,使用delete 方法,代码如下。

      /**
         * 删除节点
         */
        @Test
        public void deleteNode() {
            //创建客户端
            CuratorFramework client = ClientFactory.createSimple(ZK_ADDRESS);
            try {
                //启动客户端实例,连接服务器
                client.start();
    
                //删除节点
                String zkPath = "/test/CRUD/node-1";
                client.delete().forPath(zkPath);
    
    
                //删除后查看结果
                String parentPath = "/test";
                List<String> children = client.getChildren().forPath(parentPath);
    
                for (String child : children) {
                    log.info("child:", child);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                CloseableUtils.closeQuietly(client);
            }
        }
    
    

    和更新一样,也可以进行异步删除,同样需要用到inBackground()方法。如果需要监听异步操作的结果,需要为inBackground方法加上一个参数:AsyncCallback回调实例。

    写在最后

    ​ 下一篇:开启zk的客户端开发。


    疯狂创客圈 亿级流量 高并发IM 实战 系列

    • Java (Netty) 聊天程序【 亿级流量】实战 开源项目实战

    
    
  • 相关阅读:
    java线程池,工作窃取算法
    java线程池,阿里为什么不允许使用Executors?
    CMakeLists 的使用,大型工程使用cmake 的构件过程
    ieee文献免费下载办法
    欧式距离、标准化欧式距离、马氏距离、余弦距离
    sliding window:"Marginalization","Schur complement","First estimate jacobin"
    机器学习中的线性代数之矩阵求导
    opencv中滤波方法学习
    opencv关于Mat类中的Scalar()---颜色赋值
    C/C++预处理指令#define,#ifdef,#ifndef,#endif…
  • 原文地址:https://www.cnblogs.com/crazymakercircle/p/10226101.html
Copyright © 2011-2022 走看看