zoukankan      html  css  js  c++  java
  • 大数据学习06_zookeeper3_javaAPI操作

    这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端框架 Curator ,解决了很多 Zookeeper客户端非常底层的细节开发工作

    Curator包含了几个包:

    • curator-framework:对zookeeper的底层api的一些封装
    • curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式 计数器等

    先创建一个java工程,并且导入maven依赖。Maven依赖(使用curator的版本:2.12.0,对应Zookeeper的版本为:3.4.x,如果跨版本会有兼 容性问题,很有可能导致节点操作失败)

     1 <dependencies>
     2         <dependency>
     3             <groupId>org.apache.curator</groupId>
     4             <artifactId>curator-framework</artifactId>
     5             <version>2.12.0</version>
     6         </dependency>
     7         <dependency>
     8             <groupId>org.apache.curator</groupId>
     9             <artifactId>curator-recipes</artifactId>
    10             <version>2.12.0</version>
    11         </dependency>
    12         <dependency>
    13             <groupId>com.google.collections</groupId>
    14             <artifactId>google-collections</artifactId>
    15             <version>1.0</version>
    16         </dependency>
    17         <dependency>
    18             <groupId>junit</groupId>
    19 
    20             <artifactId>junit</artifactId>
    21             <version>RELEASE</version>
    22         </dependency>
    23         <dependency>
    24             <groupId>org.slf4j</groupId>
    25             <artifactId>slf4j-simple</artifactId>
    26             <version>1.7.25</version>
    27         </dependency>
    28     </dependencies>
    29     <build>
    30         <plugins>
    31             <!-- java编译插件 -->
    32             <plugin>
    33                 <groupId>org.apache.maven.plugins</groupId>
    34                 <artifactId>maven-compiler-plugin</artifactId>
    35                 <version>3.2</version>
    36                 <configuration>
    37                     <source>1.8</source>
    38                     <target>1.8</target>
    39                     <encoding>UTF-8</encoding>
    40                 </configuration>
    41             </plugin>
    42         </plugins>
    43     </build>
    maven依赖

    如果改变了curator要手动重新reimport

     接下来进行API操作:

    • 创建永久节点
     1 @Test
     2 public void createNode() throws Exception {
     3    RetryPolicy retryPolicy = new  ExponentialBackoffRetry(1000, 1);
     4    //获取客户端对象
     5    CuratorFramework client =    
     6 CuratorFrameworkFactory.newClient("192.168.xxx.xxx:2181,192.168.xxx.xxx:2
     7 181,192.168.xxx.xxx:2181", 1000, 1000, retryPolicy);
     8     
     9   //调用start开启客户端操作
    10   client.start();
    11     
    12   //通过create来进行创建节点,并且需要指定节点类型
    13   client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTEN
    14 T).forPath("/hello3/world");
    15 client.close();
    16 }
    • 创建临时节点
     1 public void createNode2() throws Exception {
     2  RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
     3   CuratorFramework client =
     4 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181",
     5 3000, 3000, retryPolicy);
     6 client.start();
     7 client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).
     8 forPath("/hello5/world");
     9 Thread.sleep(5000);
    10 client.close();
    11 }
    • 修改节点数据
     1 @Test
     2  public void nodeData() throws Exception {
     3  RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
     4  CuratorFramework client =
     5 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181",
     6 3000, 3000, retryPolicy);
     7  client.start();
     8  client.setData().forPath("/hello5", "hello7".getBytes());
     9  client.close();
    10 }
    • 节点数据查询
     1 @Test
     2  public void updateNode() throws Exception {
     3 RetryPolicy retryPolicy = new  ExponentialBackoffRetry(3000, 1);
     4  CuratorFramework client =
     5 CuratorFrameworkFactory.newClient("node01:2181,node02:2181,node03:2181",
     6 3000, 3000, retryPolicy);
     7  client.start();
     8  byte[] forPath = client.getData().forPath("/hello3");
     9  System.out.println(new String(forPath));
    10  client.close();
    11  }
    • 节点watch机制

     1 @Test
     2     public void watchZnode() throws Exception {
     3         RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
     4         String connectstr="192.168.xx.xxx:2181,192.168.xx.xxx:2181,192.168.xx.xxx:2181";
     5         CuratorFramework client =
     6                 CuratorFrameworkFactory.newClient(connectstr, 8000, 8000, policy);
     7         client.start();
     8         // ExecutorService pool = Executors.newCachedThreadPool();
     9         //设置节点的cache
    10 
    11         TreeCache treeCache = new TreeCache(client, "/hello3");
    12         //设置监听器和处理过程
    13         treeCache.getListenable().addListener(new TreeCacheListener()
    14         {
    15             @Override
    16             public void childEvent(CuratorFramework client,
    17                                    TreeCacheEvent event) throws Exception {
    18                 ChildData data = event.getData();
    19                 if(data !=null){
    20                     switch (event.getType()) {
    21                         case NODE_ADDED:
    22                             System.out.println("NODE_ADDED : "+
    23                                     data.getPath() +" 数据:"+ new String(data.getData())+"监控到有新增节点");
    24                             break;
    25                         case NODE_REMOVED:
    26                             System.out.println("NODE_REMOVED : "+
    27                                     data.getPath() +" 数据:"+ new String(data.getData())+"监控到移除节点");
    28                             break;
    29                         case NODE_UPDATED:
    30                             System.out.println("NODE_UPDATED : "+
    31                                     data.getPath() +" 数据:"+ new String(data.getData())+"监控到有更新节点");
    32                             break;
    33 
    34                         default:
    35                             break;
    36 
    37                     }
    38                 }else{
    39                     System.out.println( "data is null : "+
    40                             event.getType());
    41                 }
    42             }
    43         });
    44         //开始监听
    45         treeCache.start();
    46         Thread.sleep(500000);
    47     }
  • 相关阅读:
    RSA 与 DSA
    atlassian
    Cygwin
    windows下编写的Shell脚本在Linux下运行错误的解决方法
    NSKeyValueObserving(KVO)
    UIBezierPath 的使用介绍
    Objective
    Objective-C 内存管理原则
    Mac OSX 快捷键&命令行总览
    浅析Objective-C字面量
  • 原文地址:https://www.cnblogs.com/g414056667/p/13568158.html
Copyright © 2011-2022 走看看