// 转自: http://www.aboutyun.com/thread-7332-1-1.html
package com.taobao.taokeeper.research.sample;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

import common.toolkit.java.util.ObjectUtil;

/**
 * Sample usage of the ZooKeeper Java API: open a session, then create, read,
 * update and delete a single node.<br>
 * ZK Api Version: 3.4.3
 *
 * <p>The instance registers itself as the session watcher; {@link #process}
 * releases the latch that {@link #createConnection} blocks on.
 *
 * @author nileader/nileader@gmail.com
 */
public class JavaApiSample implements Watcher {

    private static final int SESSION_TIMEOUT = 10000;
    private static final String CONNECTION_STRING = "test.zookeeper.connection_string:2181";
    private static final String ZK_PATH = "/nileader";

    /**
     * Charset used for every String <-> byte[] conversion of node data. Fixed to
     * UTF-8 so the stored bytes do not depend on the JVM's platform default.
     */
    private static final Charset DATA_CHARSET = Charset.forName( "UTF-8" );

    private ZooKeeper zk = null;

    /**
     * Released by {@link #process} when a SyncConnected event arrives.
     * Volatile because it is replaced in {@link #createConnection} and read
     * from the thread that delivers watcher events.
     */
    private volatile CountDownLatch connectedSemaphore = new CountDownLatch( 1 );

    /**
     * Opens a ZooKeeper session and blocks until the SyncConnected event is
     * received. Any handle previously held by this instance is closed first.
     *
     * @param connectString  ZooKeeper server address list
     * @param sessionTimeout session timeout in milliseconds
     */
    public void createConnection( String connectString, int sessionTimeout ) {
        this.releaseConnection();
        // A CountDownLatch is one-shot: without a fresh latch a second call
        // would return immediately, before the new session is connected.
        connectedSemaphore = new CountDownLatch( 1 );
        try {
            zk = new ZooKeeper( connectString, sessionTimeout, this );
            connectedSemaphore.await();
        } catch ( InterruptedException e ) {
            System.out.println( "连接创建失败,发生 InterruptedException" );
            e.printStackTrace();
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch ( IOException e ) {
            System.out.println( "连接创建失败,发生 IOException" );
            e.printStackTrace();
        }
    }

    /**
     * Closes the ZooKeeper handle if one is held.
     */
    public void releaseConnection() {
        if ( !ObjectUtil.isBlank( this.zk ) ) {
            try {
                this.zk.close();
            } catch ( InterruptedException e ) {
                // Closing is best-effort; report and keep the interrupt status.
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates an ephemeral node with an open ACL.
     *
     * @param path node path
     * @param data initial content, stored as UTF-8 bytes
     * @return {@code true} if the node was created, {@code false} if the
     *         create call failed
     */
    public boolean createPath( String path, String data ) {
        try {
            String createdPath = this.zk.create( path,
                    data.getBytes( DATA_CHARSET ),
                    Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL );
            System.out.println( "节点创建成功, Path: " + createdPath + ", content: " + data );
            return true;
        } catch ( KeeperException e ) {
            System.out.println( "节点创建失败,发生KeeperException" );
            e.printStackTrace();
        } catch ( InterruptedException e ) {
            System.out.println( "节点创建失败,发生 InterruptedException" );
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        // Reaching this point means the create call threw.
        return false;
    }

    /**
     * Reads the content of a node without setting a watch.
     *
     * @param path node path
     * @return the node content decoded as UTF-8, or an empty string if the
     *         read failed
     */
    public String readData( String path ) {
        try {
            byte[] raw = this.zk.getData( path, false, null );
            // Report success only once the read has actually returned.
            System.out.println( "获取数据成功,path:" + path );
            return new String( raw, DATA_CHARSET );
        } catch ( KeeperException e ) {
            System.out.println( "读取数据失败,发生KeeperException,path: " + path );
            e.printStackTrace();
            return "";
        } catch ( InterruptedException e ) {
            System.out.println( "读取数据失败,发生 InterruptedException,path: " + path );
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return "";
        }
    }

    /**
     * Overwrites the content of a node. Version -1 is passed, so the update is
     * not conditional on the node's current version.
     *
     * @param path node path
     * @param data new content, stored as UTF-8 bytes
     * @return {@code true} if the update succeeded, {@code false} otherwise
     */
    public boolean writeData( String path, String data ) {
        try {
            System.out.println( "更新数据成功,path:" + path + ", stat: "
                    + this.zk.setData( path, data.getBytes( DATA_CHARSET ), -1 ) );
            return true;
        } catch ( KeeperException e ) {
            System.out.println( "更新数据失败,发生KeeperException,path: " + path );
            e.printStackTrace();
        } catch ( InterruptedException e ) {
            System.out.println( "更新数据失败,发生 InterruptedException,path: " + path );
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
        return false;
    }

    /**
     * Deletes a node. Version -1 is passed, so the delete is not conditional on
     * the node's current version.
     *
     * @param path node path
     */
    public void deleteNode( String path ) {
        try {
            this.zk.delete( path, -1 );
            System.out.println( "删除节点成功,path:" + path );
        } catch ( KeeperException e ) {
            System.out.println( "删除节点失败,发生KeeperException,path: " + path );
            e.printStackTrace();
        } catch ( InterruptedException e ) {
            System.out.println( "删除节点失败,发生 InterruptedException,path: " + path );
            e.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Runs the sample: connect, create the node, read it, update it, read it
     * again, delete it, and finally close the connection.
     */
    public static void main( String[] args ) {

        JavaApiSample sample = new JavaApiSample();
        sample.createConnection( CONNECTION_STRING, SESSION_TIMEOUT );
        try {
            // Only continue with read/update/delete if the node really exists.
            if ( sample.createPath( ZK_PATH, "我是节点初始内容" ) ) {
                System.out.println();
                System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + " " );
                sample.writeData( ZK_PATH, "更新后的数据" );
                System.out.println( "数据内容: " + sample.readData( ZK_PATH ) + " " );
                sample.deleteNode( ZK_PATH );
            }
        } finally {
            // Release the session even if one of the steps above throws.
            sample.releaseConnection();
        }
    }

    /**
     * Handles watcher notifications from the server. Every event's state is
     * printed; a SyncConnected state releases the thread waiting in
     * {@link #createConnection}.
     */
    @Override
    public void process( WatchedEvent event ) {
        System.out.println( "收到事件通知:" + event.getState() +" " );
        if ( KeeperState.SyncConnected == event.getState() ) {
            connectedSemaphore.countDown();
        }

    }

}

/*
 * Sample output of a run (输出结果):
 *
 * 收到事件通知:SyncConnected
 *
 * 节点创建成功, Path: /nileader, content: 我是节点初始内容
 *
 * 获取数据成功,path:/nileader
 * 数据内容: 我是节点初始内容
 *
 * 更新数据成功,path:/nileader, stat: 42950186407,42950186408,1350820182392,1350820182406,1,0,0,232029990722229433,18,0,42950186407
 *
 * 获取数据成功,path:/nileader
 * 数据内容: 更新后的数据
 *
 * 删除节点成功,path:/nileader
 */