打开zookeeper集群
先体会一下原生API有多麻烦(可略过):
//地址 static final String ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181"; //session超时时间 static final int SESSION_OUTTIME = 2000;//ms /** * 信号量,阻塞程序执行,用于等待zookeeper连接(异步的)成功,发送成功信号 */ static final CountDownLatch connectedSemaphore = new CountDownLatch(1); public static void main(String[] args) throws Exception { //创建zk客户端(异步连接) Watcher:监听连接 ZooKeeper zooKeeper = new ZooKeeper(ADDR, SESSION_OUTTIME, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { //获取事件的状态 Event.KeeperState keeperState = watchedEvent.getState(); Event.EventType eventType = watchedEvent.getType(); //如果是建立连接 if(Event.KeeperState.SyncConnected == keeperState){ if(Event.EventType.None == eventType){ //如果建立连接成功,则发送信号量,让后续阻塞程序向下执行 connectedSemaphore.countDown(); System.out.println("zk 建立连接"); } } } }); //进行阻塞,等待zk连接成功 connectedSemaphore.await(); //连接成功执行操作: //同步 路径(不允许递归创建), 数据 权限, 类型(PERSISTENT:持久) :临时节点只是一次session有效,用来实现分布式锁 String name = zooKeeper.create("/testRoot", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); String name1 = zooKeeper.create("/testRoot/child02", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //异步 回调函数 , 回调的参数 zooKeeper.create("/testRoot/child01", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() { @Override //服务端响应吗: 0 成功 传入的path 参数 实际创建的path public void processResult(int i, String s, Object o, String s1) { System.out.println(i+" "+s+" "+o.toString()+" "+s1); }},"param"); //删除 (路径必须是叶子节点,即没有下一级节点,不支持递归) //zooKeeper.delete("/testRoot/child01", -1);//版本号:-1表示删除所有的,像git一样,修改一次版本号加一,可删除指定版本号 //同样支持异步,和创建一样传入回调 //判断节点是否存在 System.out.println(zooKeeper.exists("/testRoot/child01", false));//也可传入watch,异步回调等 //获取 byte[] data = zooKeeper.getData("/testRoot", false, null); System.out.println(new String(data)); System.out.println(zooKeeper.getChildren("/testRoot", false)); //修改 zooKeeper.setData("/testRoot", "modify data root".getBytes(), -1); byte[] data1 = zooKeeper.getData("/testRoot", false, null); System.out.println(new String(data1)); Thread.sleep(3000);//休眠防止异步 zooKeeper.close();//释放资源
zkclient:
/** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) { // 创建zkclient实例 ZkClient zkc=new ZkClient(new ZkConnection(CONNECT_ADDR,SESSION_OUTTIME),10000); //增 zkc.create("/test","测试zkclient",CreateMode.PERSISTENT);// 创建节点,可指定持久或者临时 zkc.create("/test/heyi","heyi",CreateMode.PERSISTENT);// 创建节点,可指定持久或者临时 zkc.createEphemeral("/haha","haha");//创建临时节点 zkc.createPersistent("/test/yaozhen","姚振");//创建持久节点,设置值必须有父节点 zkc.createPersistent("/320/houzheng",true);//支持递归创建,但是不能设置值,否则会报异常 //删 zkc.delete("/test/yaozhen"); zkc.deleteRecursive("/320");//递归删除,会删除节点下的子节点 //改 zkc.writeData("/test","新内容"); //查 boolean exists = zkc.exists("/test"); System.out.println(exists); zkc.readData("/jim",true);//如果节点为空返回null String readData = zkc.readData("/test"); System.out.println(readData); //获取子节点和阅读子节点数据 List<String> list = zkc.getChildren("/test"); for(String p : list){ System.out.println(p); String rp = "/test/" + p; String data = zkc.readData(rp); System.out.println("节点为:" + rp + ",内容为: " + data); } int i = zkc.countChildren("/test");//查询子节点数量 System.out.println(i);
相比之下zkclient简直太简单了!
watcher也特别简单:
子节点监听:
// 创建zkclient实例 ZkClient zkClient=new ZkClient(new ZkConnection(CONNECT_ADDR,SESSION_OUTTIME),10000); System.out.println("-----------"); //监听子节点变化(不监听数据变化,只监听节点的新增删除) IZkChildListener实现类 zkClient.subscribeChildChanges("/father",(x,y)->{//x:父路径, y:所有子节点路径集合 System.out.println("parentPath: " + x); System.out.println("currentChilds: " + y); }); Thread.sleep(2000); zkClient.createPersistent("/father");//父节点添加删除也会监听 Thread.sleep(1000); zkClient.createPersistent("/father/son","儿子"); Thread.sleep(1000); zkClient.createPersistent("/father/daughter","女儿"); Thread.sleep(1000); zkClient.delete("/father/son"); Thread.sleep(1000); zkClient.deleteRecursive("/father");
节点数据监听:
//监听节点数据变化,监听之前节点必须存在,不监听子节点 zkClient.createPersistent("/father", "1234"); zkClient.subscribeDataChanges("/father", new IZkDataListener() { @Override //节点数据变化 public void handleDataChange(String s, Object o) throws Exception { System.out.println("变更的节点为:" + s + ", 变更内容为:" + o); } @Override //节点删除 public void handleDataDeleted(String s) throws Exception { System.out.println("删除的节点为:" + s); } }); Thread.sleep(3000); zkClient.writeData("/father", "father");//-1:最新版本的数据 Thread.sleep(1000); zkClient.delete("/father");//节点删除野人不监听 Thread.sleep(1000);
状态监听:
//监听服务连接状态,可手动启动关闭zookeeper查看触发 zkClient.subscribeStateChanges(new IZkStateListener() { @Override public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception { if(state==Watcher.Event.KeeperState.SyncConnected){ System.out.println("连接zookeeper成功"); }else if(state==Watcher.Event.KeeperState.Disconnected){ System.out.println("zookeeper断开"); }else System.out.println("other"+state); } @Override //连接关闭,过了session的设置时间,再连接session就会重置,触发监听 public void handleNewSession() throws Exception { System.out.println("newsession"); } }); Thread.sleep(Integer.MAX_VALUE);//用不关闭线程,一直监听
curator框架:
基本增删改查:
//重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //通过工厂建立连接 CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址 .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy)//重试策略 .build(); curatorFramework.start();//一定要开启连接 //增 creatingParentsIfNeeded:同时创建父节点 withMode:指定节点类型(不加withMode默认为持久类型节点) curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath("/super/son","儿子".getBytes());//路径、数据内容 curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .forPath("/father/son","儿子".getBytes());//路径、数据内容 //删 curatorFramework.delete().deletingChildrenIfNeeded().forPath("/super");//递归删除子节点 //改 curatorFramework.setData().forPath("/father/son","新儿子".getBytes()); //查 byte[] bytes = curatorFramework.getData().forPath("/father/son"); System.out.println(bytes.toString()); //查找子节点 curatorFramework.getChildren().forPath("/father").forEach(x-> System.out.println(x)); //判断节点是否存在 Stat stat = curatorFramework.checkExists().forPath("/super"); System.out.println(stat);//不存在,返回bull //使用inBackground 绑定回调函数 ExecutorService pool = Executors.newCachedThreadPool(); curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT) .inBackground((cf,curatorEvent)->{//BackgroundCallback实现类 System.out.println("状态码"+curatorEvent.getResultCode());//0表示成功 System.out.println("type:"+curatorEvent.getType());//CREATE },pool).forPath("/320/yaozhen","姚振".getBytes()); //使用线程池做回调 Thread.sleep(Integer.MAX_VALUE);//等待,方便查看打印输出
监听:
//重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //通过工厂建立连接 CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址 .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy)//重试策略 .build(); curatorFramework.start(); //建立一个cache缓存,监听节点 final NodeCache nodeCache = new NodeCache(curatorFramework,"/father"); nodeCache.start(true);//第一次启动的时候就会立刻在Zookeeper上读取对应节点的数据内容,并保存在Cache中 //触发事件为创建节点和更新节点,在删除节点的时候并不触发此操作 nodeCache.getListenable().addListener(()->{ //NodeCacheListener实现类 System.out.println("路径为:" + nodeCache.getCurrentData().getPath()); System.out.println("数据为:" + new String(nodeCache.getCurrentData().getData())); System.out.println("状态为:" + nodeCache.getCurrentData().getStat()); System.out.println("---------------------------------------"); }); Thread.sleep(1000); curatorFramework.create().forPath("/father", "123".getBytes()); Thread.sleep(1000); curatorFramework.setData().forPath("/father", "456".getBytes()); Thread.sleep(1000); curatorFramework.delete().forPath("/father"); Thread.sleep(Integer.MAX_VALUE);
监听节点
//重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //通过工厂建立连接 CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址 .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy)//重试策略 .build(); curatorFramework.start(); //监听子节点:建立一个PathChildrenCache缓存,第三个参数为是否接受节点数据内容 如果为false则不接受 PathChildrenCache cache = new PathChildrenCache(curatorFramework, "/father", true); //在初始化的时候就进行缓存监听 cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); //监听子节点的新增,修改,删除 cache.getListenable().addListener((cf,event)-> {//PathChildrenCacheListener实现类 switch (event.getType()) { case CHILD_ADDED: System.out.println("添加子节点 :" + event.getData().getPath()); break; case CHILD_UPDATED: System.out.println("更新子节点 :" + event.getData().getPath()); break; case CHILD_REMOVED: System.out.println("删除子节点 :" + event.getData().getPath()); break; default: break; } }); //创建本身节点不发生变化 curatorFramework.create().forPath("/father", "init".getBytes()); //添加子节点 Thread.sleep(1000); curatorFramework.create().forPath("/father/c1", "c1内容".getBytes()); Thread.sleep(1000); curatorFramework.create().forPath("/father/c2", "c2内容".getBytes()); //修改子节点 Thread.sleep(1000); curatorFramework.setData().forPath("/father/c1", "c1更新内容".getBytes()); //删除子节点 Thread.sleep(1000); curatorFramework.delete().forPath("/father/c2"); //删除本身节点 Thread.sleep(1000); curatorFramework.delete().deletingChildrenIfNeeded().forPath("/father"); Thread.sleep(Integer.MAX_VALUE);
监听子节点
分布式锁:
//重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //通过工厂建立连接 CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址 .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy)//重试策略 .build(); curatorFramework.start(); //使用分布式锁,所有系统同时监听同一个节点,达到分布式锁的目的 final InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/test"); final CountDownLatch countDownLatch = new CountDownLatch(1); for (int i = 0; i < 10; i++) { new Thread(()->{ try { countDownLatch.await();//线程等待一起执行 lock.acquire();//分布式锁,数据同步 //处理业务 j--; System.out.println(j); } catch (Exception e) { e.printStackTrace(); } finally { try {//释放锁 lock.release(); } catch (Exception e) { e.printStackTrace(); } } },"t"+i).start(); } Thread.sleep(1000); countDownLatch.countDown();//模拟十个线程一起并发.指定一起执行 } static int j=10;
这里模拟了一下十个线程并发,其实和ReentrantLock效果一样的,但是在分布式中也可以实现同步,而ReentrantLock就不行了
分布式计数器:
和分布式锁其实都是同理:
/** zookeeper地址 */ static final String CONNECT_ADDR = "192.168.171.128:2181,192.168.171.129:2181,192.168.171.130:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //通过工厂建立连接 CuratorFramework curatorFramework = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) //连接地址 .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy)//重试策略 .build(); curatorFramework.start(); //分布式计数器 DistributedAtomicInteger counter=new DistributedAtomicInteger(curatorFramework,"/super",new RetryNTimes(3,100)); //初始化 counter.forceSet(0); AtomicValue<Integer> value = counter.increment();//原子自增 System.out.println("原值为"+value.preValue()); System.out.println("更改后的值为"+value.postValue()); System.out.println("状态"+value.succeeded()); }
分布式线程屏障同步:
for (int i = 0; i < 5; i++) {//模拟5个客户端 new Thread(()->{ try { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .retryPolicy(retryPolicy) .build(); cf.start(); //双重同步DistributedDoubleBarrier: 指定5个线程,到了5个会自动触发 DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + "已经准备"); barrier.enter();//准备,等待成员到达5个再一起往下执行 System.out.println("同时开始运行..."); Thread.sleep(1000 * (new Random()).nextInt(3)); System.out.println(Thread.currentThread().getName() + "运行完毕"); barrier.leave();//再次等待准备退出,5个都是leave状态时再一起退出 System.out.println("同时退出运行..."); } catch (Exception e) { e.printStackTrace(); } },"t" + i).start(); }
和java中CyicrBarrier一模一样,只是支持分布式
static DistributedBarrier barrier; public static void main(String[] args) throws Exception { for(int i = 0; i < 5; i++){ new Thread(()-> { try { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); cf.start(); barrier = new DistributedBarrier(cf, "/test"); System.out.println(Thread.currentThread().getName() + "设置barrier!"); barrier.setBarrier();//设置 barrier.waitOnBarrier(); //等待Barrier释放,准备一起执行 System.out.println("---------开始执行程序----------"); } catch (Exception e) { e.printStackTrace(); } },"t" + i).start(); } Thread.sleep(10000); if(barrier != null){ System.out.println("启动所有线程"); barrier.removeBarrier();//删除屏障 } }
最后一个是只实现同时执行的,很奇怪,实际练习中开始执行程序这就是不执行,线程也一直卡在折了,启动这块都打印了!还得再研究下!