Curator极大简化了ZooKeeper的使用,增加了针对ZooKeeper集群中connection的管理。
节点的创建和删除
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; public class CuratorBase { static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181"; static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms public static void main(String[] args) throws Exception { //重试策略:初试时间为10s,最大重试次数为20 RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20); //创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); //开启连接 cf.start(); //建立节点 指定节点类型(不加withMode默认为持久类型节点)、路径、数据内容 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes()); Thread.sleep(30000); //删除节点 cf.delete().guaranteed().deletingChildrenIfNeeded().forPath("/persistent"); cf.close(); } }
run as--java application
线程休眠30s后,执行节点删除操作
节点内容的修改
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; public class CuratorBase { static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181"; static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms public static void main(String[] args) throws Exception { //重试策略:初试时间为10s,最大重试次数为20 RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20); //创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); //开启连接 cf.start(); //创建节点 cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p1","p1 value".getBytes()); //cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/persistent/p2","p2 value".getBytes()); //读取节点 String ret1 = new String(cf.getData().forPath("/persistent/p1")); System.out.println(ret1); //修改节点 cf.setData().forPath("/persistent/p1", "new p1 value".getBytes()); String ret2 = new String(cf.getData().forPath("/persistent/p1")); System.out.println(ret2); cf.close(); } }
Eclipse的console输出
Eclipse的ZooKeeper Explorer内容
节点操作的回调函数
节点的新增、修改、删除,都可以设置其回调函数。该回调函数可以输出服务器的状态码、服务器事件类型等内容。还可以加入一个线程池进行优化操作。在批量节点操作的时候,可以用线程池去规划callback,可以将很多的任务放到队列中,使用线程池中的线程将队列中的任务进行处理。线程池中线程的个数可以根据具体的机器配置而定。
下面代码中,节点的创建操作是一个异步的过程,不会阻塞主线程main的执行,代码中将主线程main休眠,子线程在执行完节点的创建操作后执行回调函数并输出相关内容。若不添加主线程休眠的代码,则主线程执行完代码后结束,此时节点创建的子线程还没有完成节点的创建,因main线程的结束子线程也结束,进而就不能完成节点创建和回调函数的执行。
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; public class CuratorBase { static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181"; static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms public static void main(String[] args) throws Exception { //重试策略:初试时间为10s,最大重试次数为20 RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20); //创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); //开启连接 cf.start(); // 绑定回调函数 ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { System.out.println("code:" + ce.getResultCode()); System.out.println("type:" + ce.getType()); System.out.println("线程为:" + Thread.currentThread().getName()); } }, pool) .forPath("/persistent/p2","p2 value".getBytes()); System.out.println("主线程:"+Thread.currentThread().getName()); Thread.sleep(Integer.MAX_VALUE); cf.close(); } }
Eclipse中console输出
ZooKeeper Explorer中内容
获取子节点
import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Stat; public class CuratorBase { static final String CONNECT_ADDR = "192.168.0.217:2181,192.168.0.218:2181,192.168.0.219:2181"; static final int SESSION_TIMEOUT = 35000;//会话超时时间,默认为60000,单位:ms static final int CONNECTION_TIMEOUT=60000;//连接超时时间,默认为15000,单位:ms public static void main(String[] args) throws Exception { //重试策略:初试时间为10s,最大重试次数为20 RetryPolicy retryPolicy = new ExponentialBackoffRetry(10000, 20); //创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_TIMEOUT) .retryPolicy(retryPolicy) .build(); //开启连接 cf.start(); // 绑定回调函数 ExecutorService pool = Executors.newCachedThreadPool(); cf.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework cf, CuratorEvent ce) throws Exception { System.out.println("code:" + ce.getResultCode()); System.out.println("type:" + ce.getType()); System.out.println("线程为:" + Thread.currentThread().getName()); } }, pool) .forPath("/persistent/p2","p2 value".getBytes()); System.out.println("主线程:"+Thread.currentThread().getName()); Thread.sleep(20000);//主线程休眠20s,等待节点创建完毕 // 读取子节点getChildren方法 和 判断节点是否存在checkExists方法 List<String> list = cf.getChildren().forPath("/persistent"); for(String p : list){ System.out.println(p); } Stat stat_p1 = cf.checkExists().forPath("/persistent/p1"); System.out.println(stat_p1); Stat stat_p2 = cf.checkExists().forPath("/persistent/p2"); System.out.println(stat_p2); cf.close(); } }
Eclipse的console输出
若上面代码将Thread.sleep(20000);删除,有时会出现下面的异常,原因是节点创建和main主线程的执行是异步的。