zoukankan      html  css  js  c++  java
  • (原) 2.3 Curator使用

    本文为原创文章,转载请注明出处,谢谢

    Curator使用

    1、jar包引入,演示版本为2.6.0,非maven项目,可以下载jar包导入到项目中

           <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>2.6.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>2.6.0</version>
            </dependency> 

    2、RetryPolicy:重试机制

    • ExponentialBackoffRetry:每次重试会增加重试时间baseSleepTimeMs
      • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
      • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
        • baseSleepTimeMs:基本重试时间差
        • maxRetries:最大重试次数
        • maxSleepMs:最大重试时间
    • RetryNTimes
      • RetryNTimes(int n, int sleepMsBetweenRetries)
        • n:重试次数
        • sleepMsBetweenRetries:每次重试间隔时间
    • RetryUntilElapsed
      • RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
        • maxElapsedTimeMs:最大重试时间
        • sleepMsBetweenRetries:每次重试间隔时间
    • BoundedExponentialBackoffRetry、RetryOneTime、SleepingRetry

    3、创建Zookeeper连接

    • 传统方式

       示例:CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181",5000,5000,retryPolicy);

       API:  

    newClient(java.lang.String connectString, org.apache.curator.RetryPolicy retryPolicy)
    
    newClient(java.lang.String connectString, int sessionTimeoutMs, int connectionTimeoutMs, org.apache.curator.RetryPolicy retryPolicy)
      • connectString:Zookeeper服务器地址
      • retryPolicy:自定义重试机制
      • sessionTimeoutMs:session超时时间
      • connectionTimeoutMs:连接超时时间
    • 链式方式 
    curatorFramework = CuratorFrameworkFactory.builder()
                                    .connectString("192.168.117.128:2181")
                                    //.authorization() 设置访问权限 设置方法同原生API
                                    .sessionTimeoutMs(5000).connectionTimeoutMs(5000)
                                    .retryPolicy(retryPolicy).build();
    • 代码示例
        public void createSession() {
            //RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);//基本重试间隔时间,重试次数(每次重试时间加长)
            //RetryPolicy retryPolicy = new RetryNTimes(5,1000);//重试次数,重试间隔时间
            RetryPolicy retryPolicy = new RetryUntilElapsed(5000,1000);//重试时间,重试间隔时间
            //curatorFramework = CuratorFrameworkFactory.newClient("192.168.117.128:2181",5000,5000,retryPolicy);
            curatorFramework = CuratorFrameworkFactory.builder()
                                    .connectString("192.168.117.128:2181")
                                    //.authorization() 设置访问权限 设置方法同原生API
                                    .sessionTimeoutMs(5000).connectionTimeoutMs(5000)
                                    .retryPolicy(retryPolicy).build();
            curatorFramework.start();
        }

    4、创建节点

    public void createNode() throws Exception {
            createSession();
            String path = curatorFramework.create()
                    .creatingParentsIfNeeded()//如果父节点没有自动创建
                    //.withACL()设置权限  权限创建同原生API
                    .withMode(CreateMode.PERSISTENT)//节点类型
                    .forPath("/note_curator/02", "02".getBytes());
            System.out.println("path:"+path);
        }

    节点类型、权限设置详见2.1Zookeeper原生API使用

    5、节点删除

      public void del() throws Exception {
            createSession();
            curatorFramework.delete()
                    .guaranteed()//保证机制,出错后后台删除 直到删除成功
                    .deletingChildrenIfNeeded()//删除当前节点下的所有节点,再删除自身
                    .forPath("/note_curator");
        }

    6、获取子节点

    public void getChildren() throws Exception {
            createSession();
            List<String> children = curatorFramework.getChildren().forPath("/note_curator");
            System.out.println(children);
    
        }

    7、获取节点信息

    public void getData() throws Exception {
            createSession();
            Stat stat = new Stat();
            byte[] u = curatorFramework.getData().storingStatIn(stat).forPath("/note_curator");
            System.out.println(new String(u));
            System.out.println(stat);
        }

    8、设置节点信息

    public void setData() throws Exception {
            createSession();
            curatorFramework.setData()
                    //.withVersion(1) 设置版本号 乐观锁概念
                    .forPath("/note_curator/01", "shengke0815".getBytes());
        }

    9、是否存在节点

    public void exists() throws Exception {
            createSession();
            Stat s = curatorFramework.checkExists().forPath("/note_curator");
            System.out.println(s);
        }

    10、设置节点信息回调

     ExecutorService executorService = Executors.newFixedThreadPool(5);//线程池
        @Test
        public void setDataAsync() throws Exception {
            createSession();
            curatorFramework.setData().inBackground(new BackgroundCallback() {//设置节点信息时回调方法
                @Override
                public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
    
                    System.out.println(curatorFramework.getZookeeperClient());
                    System.out.println(curatorEvent.getResultCode());
                    System.out.println(curatorEvent.getPath());
                    System.out.println(curatorEvent.getContext());
                }
            },"shangxiawen",executorService).forPath("/note_curator","sksujer0815".getBytes());
            Thread.sleep(Integer.MAX_VALUE);
        }

    API:

    inBackground(org.apache.curator.framework.api.BackgroundCallback backgroundCallback, java.lang.Object o, java.util.concurrent.Executor executor);
      • backgroundCallback:自定义BackgroundCallback
      •   o:上下文信息,回调方法中curatorEvent.getContext()可获取此信息
      •   executor:线程池

    11、监听节点改变事件

    public void nodeListen() throws Exception {
            createSession();
            final NodeCache cache = new NodeCache(curatorFramework,"/note_curator");
            cache.start();
            cache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println(new String(cache.getCurrentData().getData()));
                    System.out.println(cache.getCurrentData().getPath());
                }
            });
    
            Thread.sleep(Integer.MAX_VALUE);
    
        }

    12、监听子节点列表改变事件

    public void nodeClildrenListen() throws Exception {
            createSession();
            final PathChildrenCache cache = new PathChildrenCache(curatorFramework,"/note_curator",true);
            cache.start();
            cache.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    switch (pathChildrenCacheEvent.getType()){
                        case CHILD_ADDED:
                            System.out.println("add children");
                            System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
                            System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));
                            break;
                        case CHILD_REMOVED:
                            System.out.println("remove children");
                            System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
                            System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));
                            break;
                        case CHILD_UPDATED:
                            System.out.println("update children");
                            System.out.println(new String(pathChildrenCacheEvent.getData().getData()));
                            System.out.println(new String(pathChildrenCacheEvent.getData().getPath()));
                            break;
                    }
                }
            });
    
            Thread.sleep(Integer.MAX_VALUE);
    
        }

    下一节:3.1 Zookeeper应用 - Master选举

  • 相关阅读:
    助你成功:十四个可以运用在任何领域的心理定律
    人生没有“如果…”,二十条经典领悟切莫错过!
    “潜意识”下的奇迹:成功说服的七条法则
    成就你一生的100句名言,改变从今天开始~
    命好不如习惯好!改造我们的习惯领域
    成功从现在开始:改变你一生的七句话
    职业经理人必须牢记六字原则:新、信、行、馨、型、星
    不要害怕自己与众不同:十个成功的秘诀
    价值百万:八个心态和十三个习惯成就成功者!
    代长珍(帮别人名字作诗)
  • 原文地址:https://www.cnblogs.com/shengkejava/p/5663259.html
Copyright © 2011-2022 走看看