zoukankan      html  css  js  c++  java
  • Zookeeper客户端使用(使用Curator)

     

    Zookeeper客户端(使用Curator)

     

     

    三、使用curator客户端

    在pom.xml中加入依赖

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.12.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.11.0</version>
    </dependency>

    直接上代码:

      1 /**
      2  * Project Name:mk-project <br>
      3  * Package Name:com.suns.zookeeper.curator <br>
      4  *
      5  * @author mk <br>
      6  * Date:2018-10-31 14:03 <br>
      7  */
      8 
      9 package com.suns.zookeeper.curator;
     10 
     11 import org.apache.curator.RetryPolicy;
     12 import org.apache.curator.framework.CuratorFramework;
     13 import org.apache.curator.framework.CuratorFrameworkFactory;
     14 import org.apache.curator.framework.recipes.cache.*;
     15 import org.apache.curator.retry.ExponentialBackoffRetry;
     16 import org.apache.zookeeper.CreateMode;
     17 import org.apache.zookeeper.data.Stat;
     18 
     19 import java.util.concurrent.TimeUnit;
     20 
     21 /**
     22  * curator客户端使用
     23  *
     24  *  和原生zookeeper优点:
     25  *  1.使用api更方便,功能更丰富
     26  *  2.监听节点数据改变或者子节点变化,只需要订阅一次,便可以一直使用。而原生zookeeper的监听是一次性的,需要重复注册。
     27  *  3.链式编程
     28  *
     29  * Curator包含了几个包:
     30  * curator-framework:对zookeeper的底层api的一些封装
     31  * curator-client:提供一些客户端的操作,例如重试策略等
     32  * curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
     33  * ClassName: ZkCuratorTest <br>
     34  * Description:  <br>
     35  * @author mk
     36  * @Date 2018-10-31 14:03 <br>
     37  * @version
     38  */
     39 public class ZkCuratorTest {
     40 
     41     public static final String connect = "127.0.0.1:2181";
     42     private static CuratorFramework curatorFramework = null;
     43     private static String nodePath = "/curator1";
     44     private static String nodeChildPath = "/curator1/n1/n11/n111/n1111";
     45 
     46     public static void main(String[] args) throws Exception {
     47 
     48         //初始化
     49         init(connect,5000);
     50 
     51         //监听节点数据改变或者子节点变化,只需要订阅一次,便可以一直使用。而原生zookeeper的监听是一次性的,需要重复注册。
     52         listener(nodePath);
     53 
     54         //新增
     55         create(nodePath,"n1");
     56         //递归新增
     57         createRecursion(nodeChildPath,"n1");
     58 
     59         //查询
     60         query(nodePath);
     61 
     62         TimeUnit.SECONDS.sleep(2);
     63 
     64         //修改
     65         update(nodePath,"n11");
     66 
     67         //单个节点删除
     68 //        delete(nodePath);
     69         //递归删除
     70         deleteRecursion(nodePath);
     71 
     72     }
     73 
     74     private static void deleteRecursion(String path) throws Exception {
     75         Void aVoid = curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
     76         System.out.println("delete:"+"["+path+"],result:"+aVoid);
     77     }
     78 
     79     private static void delete(String path) throws Exception {
     80         Void aVoid = curatorFramework.delete().forPath(path);
     81         System.out.println("delete:"+"["+path+"],result:"+aVoid);
     82 
     83     }
     84 
     85     private static void update(String path, String data) throws Exception {
     86         Stat stat = curatorFramework.setData().forPath(path, data.getBytes());
     87         System.out.println("setData:"+"["+path+"],stat:"+stat);
     88 
     89     }
     90 
     91     private static void query(String path) throws Exception {
     92         byte[] bytes = curatorFramework.getData().forPath(path);
     93         System.out.println("query:"+"["+path+"],result:"+new String(bytes));
     94 
     95     }
     96 
     97     private static void createRecursion(String path,String data) throws Exception {
     98         String result = curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes());
     99         System.out.println("create:"+"["+path+"-->"+data+"],result:"+result);
    100 
    101     }
    102 
    103     private static void create(String path, String data) throws Exception {
    104 //        Stat stat = curatorFramework.checkExists().forPath(path);
    105 //        if(null != stat){
    106 //            System.out.println("节点["+path+"]已存在,不能新增");
    107 //            return;
    108 //        }
    109         String result = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes());
    110         System.out.println("create:"+"["+path+"-->"+data+"],result:"+result);
    111     }
    112 
    113     private static void listener(String path) throws Exception {
    114 
    115         //监听节点内容改变
    116         final NodeCache nodeCache = new NodeCache(curatorFramework, path);
    117         nodeCache.start();
    118         /*nodeCache.getListenable().addListener(new NodeCacheListener() {
    119             @Override
    120             public void nodeChanged() throws Exception {
    121                 System.out.println("节点内容发生变化----->"+nodeCache.getCurrentData());
    122             }
    123         });*/
    124 
    125         //使用lambda表达式-jdk1.8以上
    126         nodeCache.getListenable().addListener(()->{System.out.println("节点内容发生变化----->"+nodeCache.getCurrentData());});
    127 
    128 
    129         //监听子节点改变
    130         final PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
    131         pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    132       /*  pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    133             @Override
    134             public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
    135                 switch (pathChildrenCacheEvent.getType() ){
    136                     case CHILD_ADDED:
    137                         System.out.println("新增子节点"+pathChildrenCache.getCurrentData());
    138                         break;
    139                     case CHILD_UPDATED:
    140                         System.out.println("更新子节点"+pathChildrenCache.getCurrentData());
    141                         break;
    142                     case CHILD_REMOVED:
    143                         System.out.println("删除子节点"+pathChildrenCache.getCurrentData());
    144                         break;
    145                     default:break;
    146                 }
    147             }
    148         });*/
    149         //使用lambda表达式-jdk1.8以上
    150         pathChildrenCache.getListenable().addListener((curatorFramework,pathChildrenCacheEvent)->
    151             {switch (pathChildrenCacheEvent.getType() ){
    152                 case CHILD_ADDED:
    153                     System.out.println("新增子节点"+pathChildrenCache.getCurrentData());
    154                     break;
    155                 case CHILD_UPDATED:
    156                     System.out.println("更新子节点"+pathChildrenCache.getCurrentData());
    157                     break;
    158                 case CHILD_REMOVED:
    159                     System.out.println("删除子节点"+pathChildrenCache.getCurrentData());
    160                     break;
    161                 default:break;
    162             }});
    163 
    164 
    165     }
    166 
    167     private static void init(String connect, int sessionTimeout) {
    168         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);//重试策略,初始等待1s,重试3次
    169         //通过工厂获得CuratorFramework
    170         curatorFramework = CuratorFrameworkFactory.builder()
    171                 .connectString(connect).connectionTimeoutMs(sessionTimeout).retryPolicy(retryPolicy).build();
    172         curatorFramework.start();//开启连接
    173         System.out.println("curatorFramework start :" +connect);
    174     }
    175 
    176 }

    运行截图:



  • 相关阅读:
    02注册Github账户
    软件工程概论第一节
    01课堂测试
    第二阶段冲刺01
    在子类中,若要调用父类中被覆盖的方法,可以使用super关键字
    动手动脑
    springBoot学习 错误记录
    什么是Hadoop?什么是HDFS?
    springCloud当中Eureca sever当中Camden到底是什么?
    Eureca Server的Helloworld例子
  • 原文地址:https://www.cnblogs.com/lookupthesky/p/9883188.html
Copyright © 2011-2022 走看看