zoukankan      html  css  js  c++  java
  • zookeeper:Curator操作节点

    • 为什么使用Curator?
    1. Curator本身是Netflix公司开源的zookeeper客户端
    2. Curator 提供了各种应用场景实现封装
    3. curator-framework 提供了fluent风格api;
    4. curator-replice 提供了实现封装;
    • 引入依赖:
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.11.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.11.0</version>
    </dependency>
    • 创建会话连接
     1 package com.karat.cn.zookeeper.curator;
     2 
     3 import org.apache.curator.framework.CuratorFramework;
     4 import org.apache.curator.framework.CuratorFrameworkFactory;
     5 import org.apache.curator.retry.ExponentialBackoffRetry;
     6 
     7 /**
     8  * 创建会话连接
     9  * @author Administrator
    10  *
    11  */
    12 public class CuratorCreateSessionDemo {
    13     private final static String CONNECTSTRING="47.107.121.215:2181";
    14     
    15     public static void main(String args[]) {
    16         //创建会话连接的2种方式
    17         //正常的风格
    18         CuratorFramework curatorFramework1=CuratorFrameworkFactory.
    19                 newClient(CONNECTSTRING,5000,5000,
    20                         new ExponentialBackoffRetry(1000, 3));//重试机制
    21         curatorFramework1.start();
    22         //fluent风格
    23         CuratorFramework curatorFramework2=CuratorFrameworkFactory.builder().
    24                     connectString(CONNECTSTRING).
    25                     sessionTimeoutMs(5000).
    26                     retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    27         curatorFramework2.start();
    28         System.out.println("success");
    29         
    30     }
    31 }
    View Code
    • curator连接的重试策略
    ExponentialBackoffRetry()  衰减重试
    RetryNTimes 指定最大重试次数
    RetryOneTime 仅重试一次
    RetryUnitilElapsed 一直重试知道规定的时间
    • 节点操作
      1 package com.karat.cn.zookeeper.curator;
      2 
      3 import org.apache.curator.framework.CuratorFramework;
      4 import org.apache.curator.framework.CuratorFrameworkFactory;
      5 import org.apache.curator.framework.api.BackgroundCallback;
      6 import org.apache.curator.framework.api.CuratorEvent;
      7 import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
      8 import org.apache.curator.retry.ExponentialBackoffRetry;
      9 import org.apache.zookeeper.CreateMode;
     10 import org.apache.zookeeper.data.Stat;
     11 
     12 import java.util.Collection;
     13 import java.util.Collections;
     14 import java.util.concurrent.CountDownLatch;
     15 import java.util.concurrent.Executor;
     16 import java.util.concurrent.ExecutorService;
     17 import java.util.concurrent.Executors;
     18 
     19 /**
     20  * curator对节点的增删改查
     21  * @author Administrator
     22  *
     23  */
     24 public class CuratorOperatorDemo {
     25 
     26     public static void main(String[] args) throws InterruptedException {
     27         CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
     28         System.out.println("连接成功.........");
     29 
     30         //fluent风格api增删改查操作
     31         /**
     32          * 创建节点
     33          */
     34        /*try {
     35             String result=curatorFramework.create()
     36                     .creatingParentsIfNeeded()//创建父节点
     37                     .withMode(CreateMode.PERSISTENT)//持久节点:节点创建后,会一直存在,不会因客户端会话失效而删除;
     38                     .forPath("/curator/curator1/curator11","123".getBytes());
     39             System.out.println(result);
     40         } catch (Exception e) {
     41             e.printStackTrace();
     42         }*/
     43         /**
     44          * 删除节点
     45          */
     46         /*try {
     47             //默认情况下,version为-1
     48             curatorFramework.delete()//删除操作
     49             .deletingChildrenIfNeeded()//删除子节点
     50             .forPath("/node");
     51         } catch (Exception e) {
     52             e.printStackTrace();
     53         }*/
     54 
     55         /**
     56          * 查询
     57          */
     58         /*Stat stat=new Stat();
     59         try {
     60             byte[] bytes=curatorFramework
     61                     .getData()
     62                     .storingStatIn(stat)
     63                     .forPath("/curator/curator1/curator11");
     64             System.out.println(new String(bytes)+"-->stat:"+stat);
     65         } catch (Exception e) {
     66             e.printStackTrace();
     67         }*/
     68         /**
     69          * 更新
     70          */
     71         /*try {
     72             Stat stat=curatorFramework
     73                     .setData()
     74                     .forPath("/curator","lijing".getBytes());
     75             System.out.println(stat);
     76         } catch (Exception e) {
     77             e.printStackTrace();
     78         }*/
     79 
     80         /**
     81          * 异步操作
     82          */
     83         /*ExecutorService service= Executors.newFixedThreadPool(1);//线程池(创建节点的事件由线程池处理)
     84         CountDownLatch countDownLatch=new CountDownLatch(1);//计数器
     85         try {
     86             curatorFramework
     87             .create()
     88             .creatingParentsIfNeeded()
     89             .withMode(CreateMode.EPHEMERAL)
     90             .inBackground(new BackgroundCallback() {
     91                  @Override
     92                  public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
     93                      System.out.println(Thread.currentThread().getName()
     94                              +"->resultCode:"
     95                              +curatorEvent.getResultCode()+"->"//响应结果
     96                              +curatorEvent.getType());//当前节点操作类型
     97                      countDownLatch.countDown();
     98                  }
     99             },service)
    100             .forPath("/mic","123".getBytes());
    101         } catch (Exception e) {
    102             e.printStackTrace();
    103         }
    104         countDownLatch.await();//等待(让当前线程等待)
    105         service.shutdown();//关闭线程*/
    106         /**
    107          * 事务操作(curator独有的)
    108          */
    109         try {
    110             Collection<CuratorTransactionResult> resultCollections=curatorFramework
    111                     .inTransaction()//开启一个事务
    112                     .create()
    113                     .forPath("/trans","111".getBytes())//创建一个节点
    114                     .and()//通过and去修改一个节点
    115                     .setData()
    116                     .forPath("/curator","111".getBytes())//当修改节点不存在,则一成功一失败,事务不会提交成功
    117                     .and()
    118                     .commit();//提交事务
    119             for (CuratorTransactionResult result:resultCollections){
    120                 System.out.println(result.getForPath()+"->"+result.getType());
    121             }
    122         } catch (Exception e) {
    123             e.printStackTrace();
    124         }
    125     }
    126 }
    View Code
    • 将会话连接做成工具类
     1 package com.karat.cn.zookeeper.curator;
     2 
     3 import org.apache.curator.framework.CuratorFramework;
     4 import org.apache.curator.framework.CuratorFrameworkFactory;
     5 import org.apache.curator.retry.ExponentialBackoffRetry;
     6 
     7 /**
     8  * 会话连接工具类
     9  * @author Administrator
    10  *
    11  */
    12 public class CuratorClientUtils {
    13 
    14     private static CuratorFramework curatorFramework;
    15     
    16     private final static String CONNECTSTRING="47.107.121.215:2181";
    17 
    18 
    19     public static CuratorFramework getInstance(){
    20         curatorFramework= CuratorFrameworkFactory.
    21                 newClient(CONNECTSTRING,5000,5000,
    22                         new ExponentialBackoffRetry(1000,3));
    23         curatorFramework.start();
    24         return curatorFramework;
    25     }
    26 }
    View Code
    • 监听
     1 package com.karat.cn.zookeeper.curator;
     2 
     3 import org.apache.curator.framework.CuratorFramework;
     4 import org.apache.curator.framework.recipes.cache.NodeCache;
     5 import org.apache.curator.framework.recipes.cache.PathChildrenCache;
     6 import org.apache.zookeeper.CreateMode;
     7 
     8 import java.util.concurrent.TimeUnit;
     9 
    10 /**
    11  * 监听
    12  * @author Administrator
    13  *
    14  */
    15 public class CuratorEventDemo {
    16 
    17     /**
    18      * 三种watcher来做节点的监听
    19      * pathcache   监视一个路径下子节点的创建、删除、节点数据更新
    20      * NodeCache   监视一个节点的创建、更新、删除
    21      * TreeCache   pathcaceh+nodecache 的合体(监视路径下的创建、更新、删除事件),
    22      * 缓存路径下的所有子节点的数据
    23      */
    24 
    25     public static void main(String[] args) throws Exception {
    26         CuratorFramework curatorFramework=CuratorClientUtils.getInstance();
    27         /**
    28          * 节点变化NodeCache
    29          */
    30         //监听
    31         /*NodeCache cache=new NodeCache(curatorFramework,"/curator",false);
    32         cache.start(true);
    33         //监听事件
    34         cache.getListenable().addListener(()-> System.out.println("节点数据发生变化,变化后的结果" +
    35                 ":"+new String(cache.getCurrentData().getData())));
    36         //修改节点
    37         curatorFramework.setData().forPath("/curator","666".getBytes());*/
    38 
    39 
    40         /**
    41          * PatchChildrenCache
    42          */
    43 
    44         PathChildrenCache cache=new PathChildrenCache(curatorFramework,"/event",true);//参数2监听的节点,参数3是否缓存
    45         cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
    46         // Normal/ BUILD_INITIAL_CACHE /POST_INITIALIZED_EVENT
    47 
    48         cache.getListenable().addListener((curatorFramework1,pathChildrenCacheEvent)->{
    49             switch (pathChildrenCacheEvent.getType()){
    50                 case CHILD_ADDED:
    51                     System.out.println("增加子节点");
    52                     break;
    53                 case CHILD_REMOVED:
    54                     System.out.println("删除子节点");
    55                     break;
    56                 case CHILD_UPDATED:
    57                     System.out.println("更新子节点");
    58                     break;
    59                 default:break;
    60             }
    61         });
    62         //创建节点
    63         curatorFramework.create().withMode(CreateMode.PERSISTENT).forPath("/event","event".getBytes());
    64         TimeUnit.SECONDS.sleep(1);
    65         System.out.println("1");
    66         //创建子节点
    67         curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath("/event/event1","1".getBytes());
    68         TimeUnit.SECONDS.sleep(1);
    69         System.out.println("2");
    70         //修改节点
    71         curatorFramework.setData().forPath("/event/event1","222".getBytes());
    72         TimeUnit.SECONDS.sleep(1);
    73         System.out.println("3");
    74         //删除节点
    75         curatorFramework.delete().forPath("/event/event1");
    76         System.out.println("4");
    77          
    78         System.in.read();
    79     }
    80 }
    View Code
  • 相关阅读:
    flask多线程多协程操作
    flask介绍
    centos django+Nginx+uwsgi部署
    centos下运行python3.6+Django+mysql项目
    centos虚拟机下安装nginx
    redis安装
    路飞学城课程_课程详细_作业点评
    redis使用方式
    git命令学习
    组合&多态&封装
  • 原文地址:https://www.cnblogs.com/LJing21/p/10543817.html
Copyright © 2011-2022 走看看