zoukankan      html  css  js  c++  java
  • ZooKeeper API

    ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务框架,包含一组简单的原语集合。通过这些原语言的组合使用,能够帮助我们解决更高层次的分布式问题,关于ZooKeeper的典型使用场景,请查看这个文章《ZooKeeper典型使用场景一览

    本文主要针对ZooKeeper提供的Java API,通过实际代码讲述如何使用API。

    1. package com.taobao.taokeeper.research.sample; 
    2.  
    3. import java.io.IOException; 
    4. import java.util.concurrent.CountDownLatch; 
    5.  
    6. import org.apache.zookeeper.CreateMode; 
    7. import org.apache.zookeeper.KeeperException; 
    8. import org.apache.zookeeper.WatchedEvent; 
    9. import org.apache.zookeeper.Watcher; 
    10. import org.apache.zookeeper.Watcher.Event.KeeperState; 
    11. import org.apache.zookeeper.ZooDefs.Ids; 
    12. import org.apache.zookeeper.ZooKeeper; 
    13.  
    14. import common.toolkit.java.util.ObjectUtil; 
    15.  
    16. /** 
    17.  * ZooKeeper Java Api 使用样例<br> 
    18.  * ZK Api Version: 3.4.3 
    19.  *  
    20.  * @author nileader/nileader@gmail.com 
    21.  */ 
    22. public class JavaApiSample implements Watcher { 
    23.  
    24.     private static final int SESSION_TIMEOUT = 10000
    25.     private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181"
    26.     private static final String ZK_PATH = "/nileader"
    27.     private ZooKeeper zk = null
    28.      
    29.     private CountDownLatch connectedSemaphore = new CountDownLatch( 1 ); 
    30.  
    31.     /** 
    32.      * 创建ZK连接 
    33.      * @param connectString  ZK服务器地址列表 
    34.      * @param sessionTimeout   Session超时时间 
    35.      */ 
    36.     public void createConnection( String connectString, int sessionTimeout ) { 
    37.         this.releaseConnection(); 
    38.         try { 
    39.             zk = new ZooKeeper( connectString, sessionTimeout, this ); 
    40.             connectedSemaphore.await(); 
    41.         } catch ( InterruptedException e ) { 
    42.             System.out.println( "连接创建失败,发生 InterruptedException" ); 
    43.             e.printStackTrace(); 
    44.         } catch ( IOException e ) { 
    45.             System.out.println( "连接创建失败,发生 IOException" ); 
    46.             e.printStackTrace(); 
    47.         } 
    48.     } 
    49.  
    50.     /** 
    51.      * 关闭ZK连接 
    52.      */ 
    53.     public void releaseConnection() { 
    54.         if ( !ObjectUtil.isBlank( this.zk ) ) { 
    55.             try { 
    56.                 this.zk.close(); 
    57.             } catch ( InterruptedException e ) { 
    58.                 // ignore 
    59.                 e.printStackTrace(); 
    60.             } 
    61.         } 
    62.     } 
    63.  
    64.     /** 
    65.      *  创建节点 
    66.      * @param path 节点path 
    67.      * @param data 初始数据内容 
    68.      * @return 
    69.      */ 
    70.     public boolean createPath( String path, String data ) { 
    71.         try { 
    72.             System.out.println( "节点创建成功, Path: " 
    73.                     + this.zk.create( path, // 
    74.                                               data.getBytes(), // 
    75.                                               Ids.OPEN_ACL_UNSAFE, // 
    76.                                               CreateMode.EPHEMERAL ) 
    77.                     + ", content: " + data ); 
    78.         } catch ( KeeperException e ) { 
    79.             System.out.println( "节点创建失败,发生KeeperException" ); 
    80.             e.printStackTrace(); 
    81.         } catch ( InterruptedException e ) { 
    82.             System.out.println( "节点创建失败,发生 InterruptedException" ); 
    83.             e.printStackTrace(); 
    84.         } 
    85.         return true
    86.     } 
    87.  
    88.     /** 
    89.      * 读取指定节点数据内容 
    90.      * @param path 节点path 
    91.      * @return 
    92.      */ 
    93.     public String readData( String path ) { 
    94.         try { 
    95.             System.out.println( "获取数据成功,path:" + path ); 
    96.             return new String( this.zk.getData( path, falsenull ) ); 
    97.         } catch ( KeeperException e ) { 
    98.             System.out.println( "读取数据失败,发生KeeperException,path: " + path  ); 
    99.             e.printStackTrace(); 
    100.             return ""
    101.         } catch ( InterruptedException e ) { 
    102.             System.out.println( "读取数据失败,发生 InterruptedException,path: " + path  ); 
    103.             e.printStackTrace(); 
    104.             return ""
    105.         } 
    106.     } 
    107.  
    108.     /** 
    109.      * 更新指定节点数据内容 
    110.      * @param path 节点path 
    111.      * @param data  数据内容 
    112.      * @return 
    113.      */ 
    114.     public boolean writeData( String path, String data ) { 
    115.         try { 
    116.             System.out.println( "更新数据成功,path:" + path + ", stat: " + 
    117.                                                         this.zk.setData( path, data.getBytes(), -1 ) ); 
    118.         } catch ( KeeperException e ) { 
    119.             System.out.println( "更新数据失败,发生KeeperException,path: " + path  ); 
    120.             e.printStackTrace(); 
    121.         } catch ( InterruptedException e ) { 
    122.             System.out.println( "更新数据失败,发生 InterruptedException,path: " + path  ); 
    123.             e.printStackTrace(); 
    124.         } 
    125.         return false
    126.     } 
    127.  
    128.     /** 
    129.      * 删除指定节点 
    130.      * @param path 节点path 
    131.      */ 
    132.     public void deleteNode( String path ) { 
    133.         try { 
    134.             this.zk.delete( path, -1 ); 
    135.             System.out.println( "删除节点成功,path:" + path ); 
    136.         } catch ( KeeperException e ) { 
    137.             System.out.println( "删除节点失败,发生KeeperException,path: " + path  ); 
    138.             e.printStackTrace(); 
    139.         } catch ( InterruptedException e ) { 
    140.             System.out.println( "删除节点失败,发生 InterruptedException,path: " + path  ); 
    141.             e.printStackTrace(); 
    142.         } 
    143.     } 
    144.  
    145.     public static void main( String[] args ) { 
    146.  
    147.         JavaApiSample sample = new JavaApiSample(); 
    148.         sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT ); 
    149.         if ( sample.createPath( ZK_PATH, "我是节点初始内容" ) ) { 
    150.             System.out.println(); 
    151.             System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + " " ); 
    152.             sample.writeData( ZK_PATH, "更新后的数据" ); 
    153.             System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + " " ); 
    154.             sample.deleteNode( ZK_PATH ); 
    155.         } 
    156.  
    157.         sample.releaseConnection(); 
    158.     } 
    159.  
    160.     /** 
    161.      * 收到来自Server的Watcher通知后的处理。 
    162.      */ 
    163.     @Override 
    164.     public void process( WatchedEvent event ) { 
    165.         System.out.println( "收到事件通知:" + event.getState() +" "  ); 
    166.         if ( KeeperState.SyncConnected == event.getState() ) { 
    167.             connectedSemaphore.countDown(); 
    168.         } 
    169.  
    170.     } 
    171.  

    输出结果:

    1. 收到事件通知:SyncConnected 
    2.  
    3. 节点创建成功, Path: /nileader, content: 我是节点初始内容 
    4.  
    5. 获取数据成功,path:/nileader 
    6. 数据内容: 我是节点初始内容 
    7.  
    8. 更新数据成功,path:/nileader, stat: 42950186407,42950186408,1350820182392,1350820182406,1,0,0,232029990722229433,18,0,42950186407 
    9.  
    10. 获取数据成功,path:/nileader 
    11. 数据内容: 更新后的数据 
    12.  
    13. 删除节点成功,path:/nileader
  • 相关阅读:
    Linux中配置Aria2 RPC Server
    Ubuntu无法进入Windows的NTFS分区
    Visualbox在UEFI模式下无法正常引导
    pacman安装软件包出现损坏
    Windows下禁用锁屏热键WinKey+L
    Linux中无权限使用sudo
    Windows 10 MBR转GPT
    oh-my-zsh的安装与基本配置
    Raspbian开启root账户
    xrandr: 命令行修改分辨率工具
  • 原文地址:https://www.cnblogs.com/cl1024cl/p/6205147.html
Copyright © 2011-2022 走看看