zoukankan      html  css  js  c++  java
  • Curator使用:(一)如何开始使用及api介绍(创建会话以及增删查改)

    前言##

    记录下ZK客户端的使用学习,初步想法是从几个方面来记录

    1. 如何开始使用及api介绍(创建会话以及增删查改)
    2. 异步调用
    3. 事件监听
    4. Master选举
    5. 分布式锁
    6. 计数器
    7. Barrier

    仓库介绍##

    GroupID/Org ArtifactID/Name Description
    org.apache.curator curator-recipes All of the recipes. Note: this artifact has dependencies on client and framework and, so, Maven (or whatever tool you're using) should pull those in automatically.
    org.apache.curator curator-async Asynchronous DSL with O/R modeling, migrations and many other features.
    org.apache.curator curator-framework The Curator Framework high level API. This is built on top of the client and should pull it in automatically.
    org.apache.curator curator-client The Curator Client - replacement for the ZooKeeper class in the ZK distribution.
    org.apache.curator curator-test Contains the TestingServer, the TestingCluster and a few other tools useful for testing.
    org.apache.curator curator-examples Example usages of various Curator features.
    org.apache.curator curator-x-discovery A Service Discovery implementation built on the Curator Framework.
    org.apache.curator curator-x-discovery-server A RESTful server that can be used with Curator Discovery.

    版本说明##

    zk版本:

    curator版本:

          <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.2.0</version>
            </dependency>
     <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.2.0</version>
            </dependency>
    
    

    常用API介绍###

    1.创建会话

        CuratorFramework cc = CuratorFrameworkFactory.builder()
                    .connectString("ip:port")
                    .sessionTimeoutMs(2000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000,3))
                    .namespace("test")
                    .build();
        cc.start();    
    

    说一下retryPolicy,重连策略,建议用其中两种

        //重连3次,每次休息3秒
        new RetryNTimes(3,3000);
        //重连3次,每次休息大约是1秒
        new ExponentialBackoffRetry(1000,3);
        //初始化一个大概的等待时间1秒,然后开始重连,最多重连3次,每次最多休息2秒
        new ExponentialBackoffRetry(1000,3,2000);
    
        //计算通过这个初始化的大约时间,计算实际需要睡眠多久
        long sleepMs = baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)));
    
    

    namespace代表命名空间,注意的是,curator会自动创建。
    2.创建普通节点

        cc.create().forPath("/comm_msg_nd","ctx".getBytes());
        cc.create().forPath("/comm_no_msg_nd");
    



    需要注意的是,如果没有值,会默认把当前ip地址放进去

        return InetAddress.getLocalHost().getHostAddress().getBytes(); 
    

    3.创建临时节点

        cc.create().withMode(CreateMode.EPHEMERAL).forPath("/ephe_nd","ff".getBytes());
        //creatingParentsIfNeeded会自动创建父节点
        cc.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/ephe_nd","ff".getBytes());
    


    需要注意,只有叶节点可以做临时节点,所以叶节点的父节点必须是永久节点,也就是creatingParentsIfNeeded这个方法创建的父节点必须是永久节点

    4.删除节点

        //删除单独一个节点
        cc.delete().forPath("/comm_msg_nd");
        //删除多级节点
        cc.create().creatingParentsIfNeeded().forPath("/p1/p2/p3/multi_nd");
        cc.delete().deletingChildrenIfNeeded().forPath("/p1");
        //删除指定版本的节点
        cc.delete().withVersion(0).forPath("/comm_msg_nd");
        //保证删除,失败后继续执行
        cc.delete().guaranteed().forPath("/comm_msg_nd");
    
        //通过添加错误的回调方法来实现,错误后继续执行
        OperationAndData.ErrorCallback<String> errorCallback = null;
                if ( guaranteed )
                {
                    errorCallback = new OperationAndData.ErrorCallback<String>()
                    {
                        @Override
                        public void retriesExhausted(OperationAndData<String> operationAndData)
                        {
                            //将路径添加到错误处理中
                            client.getFailedDeleteManager().addFailedOperation(unfixedPath);
                        }
                    };
                }
                client.processBackgroundOperation(new OperationAndData<String>(this, path, backgrounding.getCallback(), errorCallback, backgrounding.getContext(), null), null);
        
        //然后FailedDeleteManager中通过调用这个来继续请求删除操作
        @Override
        protected void executeGuaranteedOperationInBackground(String path)
                throws Exception
        {
            client.delete().guaranteed().inBackground().forPath(path);
        }
    

    5.读数据

        Stat stat = new Stat();
        byte[] ctx = cc.getData().storingStatIn(stat).forPath("/comm_msg_nd");
        //获取节点内容为ctx
        System.out.println(new String(ctx));
        //获取该节点stat
        //78,78,1573789366124,1573789366124,0,0,0,0,3,0,78
        System.out.println(stat);
    

    6.更新数据

        Stat stat = new Stat();
        stat = cc.setData().forPath("/comm_msg_nd","new ctx".getBytes());
        System.out.println(stat);
    
  • 相关阅读:
    【Java】:Java当中为什么不能够直接用==比较String字符串
    Mybatis
    spring boot
    IDEA
    IDEA
    kafka集群partition分布原理分析(转发)
    kafka集群partition分布原理分析(转发)
    scons ------ 基本使用
    色彩管理中的Gamma值的理解
    SFUD ------ (Serial Flash Universal Driver) 串行 Flash 通用驱动库
  • 原文地址:https://www.cnblogs.com/june777/p/11865116.html
Copyright © 2011-2022 走看看