zoukankan      html  css  js  c++  java
  • 使用curator框架简单操作zookeeper 学习笔记

    Curator 操作是zookeeper的优秀api(相对于原生api),满足大部分需求.而且是Fluent流式api风格.

    参考文献:https://www.jianshu.com/p/70151fc0ef5d 感谢分享,动手敲一遍留个印象

    curator-framework:对zookeeper的底层api的一些封装
    curator-client:提供一些客户端的操作,例如重试策略等
    curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等

    环境:JDK1.8 、maven 、启动三台虚拟机做分部署环境 、 curator-recipes 4.0.1 、zookeeper3.4.8

    maven 依赖:不依赖zookeeper会报错..

    <dependency>
      <groupId>org.apache.curator</groupId>
      <artifactId>curator-recipes</artifactId>
      <version>4.0.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.8</version>
    </dependency>
    测试增删改查:
    package com.zookeeper.curator;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    
    import java.util.List;
    
    /**
     * Created by Administrator on 2018/6/25.
     * @see org.apache.zookeeper.CreateMode
     * PERSISTENT:持久化
      PERSISTENT_SEQUENTIAL:持久化并且带序列号
     EPHEMERAL:临时
     EPHEMERAL_SEQUENTIAL:临时并且带序列号
     */
    public class curatorRecipesDemo {
    
    
        final static String zookeeperAddress = "192.168.149.133:2181,192.168.149.135:2181,192.168.149.134:2181";
    
        public static void main(String[] args) throws Exception {
            CuratorFramework curatorClint =
                    CuratorFrameworkFactory.builder().
                            connectString(zookeeperAddress)//zkClint连接地址
                            .connectionTimeoutMs(2000)//连接超时时间
                            .sessionTimeoutMs(10000)//会话超时时间
                            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                            //重试策略
                            .namespace("myZookeeperTest")
                            //命名空间,默认节点
                            .build();
    
            curatorClint.start();
    
           curatorClint.create().forPath("/path");//默认持久化节点,以斜杠开头
            System.out.println(curatorClint.getChildren().forPath("/"));
            curatorClint.create().withMode(CreateMode.EPHEMERAL)
                    .forPath("/secondPath","hello,word".getBytes());
            System.out.println("节点secondPath的数据"+new String(curatorClint.getData().forPath("/secondPath")));
            curatorClint.setData().forPath("/secondPath","hello,myWorld!".getBytes());
            System.out.println("节点secondPath的数据"+new String(curatorClint.getData().forPath("/secondPath")));
    
            curatorClint.create()
                    .creatingParentContainersIfNeeded()
                    .forPath("/secondPath/second2/second3");//递归创建
          List<String> list= curatorClint.getChildren().forPath("/secondPath");//查询节点的所有字节点
            System.out.println(list);
            curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath/second2");//递归删除
            System.out.println(curatorClint.checkExists().forPath("/secondPath/second2"));//判断节点是否存在
            System.out.println(curatorClint.checkExists().forPath("/secondPath/second2/second3"));//判断节点是否存在
            System.out.println(curatorClint.getChildren().forPath("/secondPath"));
            curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath");
    
    
            //todo guaranteed()如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止
           //todo   Stat stat 对象包含版本id,事物id等信息
    
        }
    
    }

      创建的节点可以通过 zookeeper 安装下的bin目录 连接客户端 sh zkCli.sh   ls  /      分开斜杠命令进行查看(或./zkCli.sh -timeout 5000 -server 127.0.0.1:2181)

    监听watcer   api

    package com.zookeeper.curator;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.*;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    import java.util.List;
    import java.util.Objects;
    
    /**
     * Created by Administrator on 2018/6/26.
     *
     * PathChildCache 监听一个节点下子节点的创建、删除、更新
     * NodeCache  监听一个节点的更新和创建事件
     * TreeCache  综合PatchChildCache和NodeCache的特性
     */
    public class WatcherDemo {
        final static String zookeeperAddress = "192.168.149.133:2181,192.168.149.135:2181,192.168.149.134:2181";
    
        public static void main(String[] args) throws Exception {
            CuratorFramework curatorClint =
                    CuratorFrameworkFactory.builder().
                            connectString(zookeeperAddress)//zkClint连接地址
                            .connectionTimeoutMs(2000)//连接超时时间
                            .sessionTimeoutMs(10000)//会话超时时间
                            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                            //重试策略
                            .namespace("myZookeeperTest")
                            //命名空间,默认节点
                            .build();
    
            curatorClint.start();
           List<String> list= curatorClint.getChildren().forPath("/");
           if(Objects.nonNull(list)){
               if( !list.contains("myWatch")){
                   curatorClint.delete().deletingChildrenIfNeeded().forPath("/myWatch");
               }
           }else {
               curatorClint.create().forPath("/myWatch");
           }
    
    
            PathChildrenCache pathChildrenCache= pathChildrenCache = new PathChildrenCache(curatorClint,"/myWatch",false);
            PathChildrenCacheListener pathChildrenCacheListener=new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                    System.out.println("pathChildrenCacheListener::::->"+pathChildrenCacheEvent.getData());
                }
            };
            pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);//注册监听事件
            pathChildrenCache.start();
    
            NodeCache nodeCache=new NodeCache(curatorClint,"/myWatch",false);
            NodeCacheListener nodeCacheListener=new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    System.out.println("nodeCacheListener::::->:"+nodeCache.getCurrentData().getPath());
                }
            };
            nodeCache.getListenable().addListener(nodeCacheListener);
            nodeCache.start();
    
            TreeCache treeCache=new TreeCache(curatorClint,"/myWatch");
            TreeCacheListener treeCacheListener=new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                    System.out.println("treeCacheListener::::->"+treeCacheEvent.getData());
                }
            };
            treeCache.getListenable().addListener(treeCacheListener);
            treeCache.start();
    
            curatorClint.create().forPath("/myWatch/child22","生个好孩子".getBytes());
            curatorClint.create().creatingParentContainersIfNeeded().forPath("/myWatch/child22/child22","生个好孩子".getBytes());
            curatorClint.setData().forPath("/myWatch/child222","生个好孩子aaaa".getBytes());
            System.in.read();//阻塞不然启动后clint就关掉了
        }
    
    }





  • 相关阅读:
    12.19手动 项目部署
    12.19 redis缓存
    12.19 redis缓存
    用压测模拟并发、并发处理(synchronized,redis分布式锁)
    12.19 异常捕获补充
    app提交版本更新的流程
    变量
    类型转换的判别
    本文档中使用的伪类型
    Callbacks
  • 原文地址:https://www.cnblogs.com/jinjian91/p/9226977.html
Copyright © 2011-2022 走看看