zoukankan      html  css  js  c++  java
  • Zookeeper客户端使用(使用Curator)

     

    Zookeeper客户端(使用Curator)

     

     

    三、使用curator客户端

    在pom.xml中加入依赖

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-framework</artifactId>
      <version>2.12.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>2.12.0</version>
    </dependency>

    直接上代码:

      1 /**
      2  * Project Name:mk-project <br>
      3  * Package Name:com.suns.zookeeper.curator <br>
      4  *
      5  * @author mk <br>
      6  * Date:2018-10-31 14:03 <br>
      7  */
      8 
      9 package com.suns.zookeeper.curator;
     10 
     11 import org.apache.curator.RetryPolicy;
     12 import org.apache.curator.framework.CuratorFramework;
     13 import org.apache.curator.framework.CuratorFrameworkFactory;
     14 import org.apache.curator.framework.recipes.cache.*;
     15 import org.apache.curator.retry.ExponentialBackoffRetry;
     16 import org.apache.zookeeper.CreateMode;
     17 import org.apache.zookeeper.data.Stat;
     18 
     19 import java.util.concurrent.TimeUnit;
     20 
     21 /**
     22  * curator客户端使用
     23  *
     24  *  和原生zookeeper优点:
     25  *  1.使用api更方便,功能更丰富
     26  *  2.监听节点数据改变或者子节点变化,只需要订阅一次,便可以一直使用。而原生zookeeper的监听是一次性的,需要重复注册。
     27  *  3.链式编程
     28  *
     29  * Curator包含了几个包:
     30  * curator-framework:对zookeeper的底层api的一些封装
     31  * curator-client:提供一些客户端的操作,例如重试策略等
     32  * curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等
     33  * ClassName: ZkCuratorTest <br>
     34  * Description:  <br>
     35  * @author mk
     36  * @Date 2018-10-31 14:03 <br>
     37  * @version
     38  */
     39 public class ZkCuratorTest {
     40 
     41     public static final String connect = "127.0.0.1:2181";
     42     private static CuratorFramework curatorFramework = null;
     43     private static String nodePath = "/curator1";
     44     private static String nodeChildPath = "/curator1/n1/n11/n111/n1111";
     45 
     46     public static void main(String[] args) throws Exception {
     47 
     48         //初始化
     49         init(connect,5000);
     50 
     51         //监听节点数据改变或者子节点变化,只需要订阅一次,便可以一直使用。而原生zookeeper的监听是一次性的,需要重复注册。
     52         listener(nodePath);
     53 
     54         //新增
     55         create(nodePath,"n1");
     56         //递归新增
     57         createRecursion(nodeChildPath,"n1");
     58 
     59         //查询
     60         query(nodePath);
     61 
     62         TimeUnit.SECONDS.sleep(2);
     63 
     64         //修改
     65         update(nodePath,"n11");
     66 
     67         //单个节点删除
     68 //        delete(nodePath);
     69         //递归删除
     70         deleteRecursion(nodePath);
     71 
     72     }
     73 
     74     private static void deleteRecursion(String path) throws Exception {
     75         Void aVoid = curatorFramework.delete().deletingChildrenIfNeeded().forPath(path);
     76         System.out.println("delete:"+"["+path+"],result:"+aVoid);
     77     }
     78 
     79     private static void delete(String path) throws Exception {
     80         Void aVoid = curatorFramework.delete().forPath(path);
     81         System.out.println("delete:"+"["+path+"],result:"+aVoid);
     82 
     83     }
     84 
     85     private static void update(String path, String data) throws Exception {
     86         Stat stat = curatorFramework.setData().forPath(path, data.getBytes());
     87         System.out.println("setData:"+"["+path+"],stat:"+stat);
     88 
     89     }
     90 
     91     private static void query(String path) throws Exception {
     92         byte[] bytes = curatorFramework.getData().forPath(path);
     93         System.out.println("query:"+"["+path+"],result:"+new String(bytes));
     94 
     95     }
     96 
     97     private static void createRecursion(String path,String data) throws Exception {
     98         String result = curatorFramework.create().creatingParentContainersIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes());
     99         System.out.println("create:"+"["+path+"-->"+data+"],result:"+result);
    100 
    101     }
    102 
    103     private static void create(String path, String data) throws Exception {
    104 //        Stat stat = curatorFramework.checkExists().forPath(path);
    105 //        if(null != stat){
    106 //            System.out.println("节点["+path+"]已存在,不能新增");
    107 //            return;
    108 //        }
    109         String result = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path, data.getBytes());
    110         System.out.println("create:"+"["+path+"-->"+data+"],result:"+result);
    111     }
    112 
    113     private static void listener(String path) throws Exception {
    114 
    115         //监听节点内容改变
    116         final NodeCache nodeCache = new NodeCache(curatorFramework, path);
    117         nodeCache.start();
    118         /*nodeCache.getListenable().addListener(new NodeCacheListener() {
    119             @Override
    120             public void nodeChanged() throws Exception {
    121                 System.out.println("节点内容发生变化----->"+nodeCache.getCurrentData());
    122             }
    123         });*/
    124 
    125         //使用lambda表达式-jdk1.8以上
    126         nodeCache.getListenable().addListener(()->{System.out.println("节点内容发生变化----->"+nodeCache.getCurrentData());});
    127 
    128 
    129         //监听子节点改变
    130         final PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
    131         pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    132       /*  pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
    133             @Override
    134             public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
    135                 switch (pathChildrenCacheEvent.getType() ){
    136                     case CHILD_ADDED:
    137                         System.out.println("新增子节点"+pathChildrenCache.getCurrentData());
    138                         break;
    139                     case CHILD_UPDATED:
    140                         System.out.println("更新子节点"+pathChildrenCache.getCurrentData());
    141                         break;
    142                     case CHILD_REMOVED:
    143                         System.out.println("删除子节点"+pathChildrenCache.getCurrentData());
    144                         break;
    145                     default:break;
    146                 }
    147             }
    148         });*/
    149         //使用lambda表达式-jdk1.8以上
    150         pathChildrenCache.getListenable().addListener((curatorFramework,pathChildrenCacheEvent)->
    151             {switch (pathChildrenCacheEvent.getType() ){
    152                 case CHILD_ADDED:
    153                     System.out.println("新增子节点"+pathChildrenCache.getCurrentData());
    154                     break;
    155                 case CHILD_UPDATED:
    156                     System.out.println("更新子节点"+pathChildrenCache.getCurrentData());
    157                     break;
    158                 case CHILD_REMOVED:
    159                     System.out.println("删除子节点"+pathChildrenCache.getCurrentData());
    160                     break;
    161                 default:break;
    162             }});
    163 
    164 
    165     }
    166 
    167     private static void init(String connect, int sessionTimeout) {
    168         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);//重试策略,初始等待1s,重试3次
    169         //通过工厂获得CuratorFramework
    170         curatorFramework = CuratorFrameworkFactory.builder()
    171                 .connectString(connect).connectionTimeoutMs(sessionTimeout).retryPolicy(retryPolicy).build();
    172         curatorFramework.start();//开启连接
    173         System.out.println("curatorFramework start :" +connect);
    174     }
    175 
    176 }

    运行截图:



  • 相关阅读:
    vue使用elementui合并table
    使用layui框架导出table表为excel
    vue使用elementui框架,导出table表格为excel格式
    前台传数据给后台的几种方式
    uni.app图片同比例缩放
    我的博客
    【C语言】取16进制的每一位
    SharePoint Solution 是如何部署的呢 ???
    无效的数据被用来用作更新列表项 Invalid data has been used to update the list item. The field you are trying to update may be read only.
    SharePoint 判断用户在文件夹上是否有权限的方法
  • 原文地址:https://www.cnblogs.com/lookupthesky/p/9883188.html
Copyright © 2011-2022 走看看