目录结构:
<!-- Add the ZooKeeper client library dependency -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.1</version>
</dependency>
启动 4 台 zookeeper:
controller :
package com..controller; import com..config.ZKUtils; import com..watch.WatchCallBack; import org.apache.zookeeper.ZooKeeper; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/zk") public class ZkLockController { ZooKeeper zk = ZKUtils.getZK(); @RequestMapping("/lock") public String Lock(){ new Thread(){ @Override public void run() { WatchCallBack watchCallBack = new WatchCallBack(); watchCallBack.setZk(zk); String threadName = Thread.currentThread().getName(); watchCallBack.setThreadName(threadName); //加锁 watchCallBack.tryLock(); try { Thread.sleep(2000); System.out.println("执行业务代码...."); } catch (InterruptedException e) { e.printStackTrace(); } //解锁 watchCallBack.unLock(); } }.start(); return "true"; } }
WatchCallBack 类:
package com.wondersgroup.watch;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * One lock attempt against ZooKeeper, driven by asynchronous callbacks.
 *
 * <p>{@link #tryLock()} creates an ephemeral sequential node {@code /zklock<seq>} and blocks on a
 * latch. The callbacks list the children of {@code "/"}, and either release the latch (this node
 * sorts first) or watch the node that sorts immediately before this one. When that node is
 * deleted the children are listed again. All paths are relative to the root of the supplied
 * {@link ZooKeeper} handle.
 *
 * <p>An instance serves a single lock/unlock cycle because its latch is never reset.
 */
public class WatchCallBack implements Watcher, AsyncCallback.StringCallback,
        AsyncCallback.Children2Callback, AsyncCallback.StatCallback {

    ZooKeeper zk;
    String threadName;
    /** Released once this instance's node is the smallest child, i.e. the lock is held. */
    CountDownLatch cc = new CountDownLatch(1);
    /** Full path of the node created by {@link #tryLock()}; {@code null} until it exists. */
    String pathName;

    public String getPathName() {
        return pathName;
    }

    public void setPathName(String pathName) {
        this.pathName = pathName;
    }

    public String getThreadName() {
        return threadName;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    public ZooKeeper getZk() {
        return zk;
    }

    public void setZk(ZooKeeper zk) {
        this.zk = zk;
    }

    /**
     * Requests the lock and blocks until it is granted or the calling thread is interrupted.
     *
     * <p>On interruption the interrupt flag is restored so the caller can notice that the wait
     * ended without the lock being granted.
     */
    public void tryLock() {
        try {
            System.out.println(threadName + " Go to create Node....");
            zk.create("/zklock", threadName.getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "lock");
            cc.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }

    /**
     * Releases the lock by deleting this instance's node.
     *
     * <p>Does nothing (apart from logging) when no node was ever created.
     */
    public void unLock() {
        if (pathName == null) {
            System.err.println(threadName + " has no lock node to delete");
            return;
        }
        try {
            zk.delete(pathName, -1);
            System.out.println(threadName + " 业务执行完毕,释放锁....");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

    /**
     * Watch callback for the predecessor node.
     *
     * <p>When the first node releases the lock only the second waiter is notified. When a node
     * in the middle of the queue disappears, the waiter right behind it is notified and, after
     * re-listing the children, starts watching the node in front of the one that disappeared.
     */
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
            zk.getChildren("/", false, this, "children");
        }
    }

    /** Create callback: records the created node's path and lists the current children. */
    @Override
    public void processResult(int rc, String path, Object ctx, String name) {
        if (name == null) {
            // Creation failed; make the reason visible instead of waiting silently.
            System.err.println(threadName + " failed to create lock node: "
                    + KeeperException.Code.get(rc));
            return;
        }
        System.out.println(threadName + " create node : " + name);
        pathName = name;
        zk.getChildren("/", false, this, "children");
    }

    /**
     * getChildren callback: decides whether this instance holds the lock or must wait behind
     * its predecessor.
     */
    @Override
    public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
        if (rc != KeeperException.Code.OK.intValue() || children == null || pathName == null) {
            System.err.println(threadName + " could not list lock nodes: "
                    + KeeperException.Code.get(rc));
            return;
        }

        Collections.sort(children);
        // pathName starts with "/", child names do not.
        int i = children.indexOf(pathName.substring(1));
        if (i < 0) {
            System.err.println(threadName + " lock node " + pathName + " no longer exists");
            return;
        }

        if (i == 0) {
            // Smallest node: this instance owns the lock.
            System.out.println(threadName + " 我是最小节点 ....");
            try {
                zk.setData("/", threadName.getBytes(StandardCharsets.UTF_8), -1);
            } catch (KeeperException | InterruptedException e) {
                // Runs on the ZooKeeper event thread, so the interrupt flag is deliberately not
                // re-set here; doing so could terminate the client's event delivery.
                e.printStackTrace();
            } finally {
                // Writing the holder name is informational only; the waiter must be released
                // even when that write fails.
                cc.countDown();
            }
        } else {
            // Not the smallest: watch only the node directly in front of this one.
            zk.exists("/" + children.get(i - 1), this, this, "children");
        }
    }

    /**
     * exists callback for the predecessor watch.
     *
     * <p>If the predecessor vanished before the watch was registered, no deletion event will
     * ever arrive, so the children are listed again right away.
     */
    @Override
    public void processResult(int rc, String path, Object ctx, Stat stat) {
        if (rc == KeeperException.Code.NONODE.intValue()) {
            zk.getChildren("/", false, this, "children");
        }
    }
}
初始化 zookeeper 连接:
ZKUtils 获取 zk 实例:
package com.wondersgroup.config;

import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

/**
 * Factory for connected {@link ZooKeeper} client handles.
 */
public class ZKUtils {

    /**
     * Connect string: four local servers plus the chroot suffix {@code /zkLocks}.
     * NOTE(review): presumably the {@code /zkLocks} node must already exist on the ensemble —
     * confirm before first use.
     */
    private static final String ADDRESS =
            "localhost:2181,localhost:2182,localhost:2183,localhost:2184/zkLocks";

    /**
     * Opens a new ZooKeeper session and blocks until it reports {@code SyncConnected}.
     *
     * <p>Every call uses its own watcher and latch, and the latch is handed to the watcher
     * before the client is constructed, so the connection event cannot arrive before the latch
     * is in place.
     *
     * @return the client handle; {@code null} if the client could not be created, or a handle
     *         that may not be connected yet if the calling thread was interrupted while waiting
     */
    public static ZooKeeper getZK() {
        CountDownLatch connected = new CountDownLatch(1);
        DefaultWatch watch = new DefaultWatch();
        watch.setCc(connected);

        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(ADDRESS, 1000, watch);
            // Wait for the watcher to count the latch down on SyncConnected.
            connected.await();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
        return zk;
    }
}
初始化 zookeeper 连接:
DefaultWatch:连接建立后将 CountDownLatch 减 1,唤醒 ZKUtils 继续执行并返回 zk 实例:
package com.wondersgroup.config;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;

import java.util.concurrent.CountDownLatch;

/**
 * Session-level watcher that releases a latch once the client reports {@code SyncConnected}.
 *
 * <p>The latch may be installed before or after the connection event arrives: the connected
 * state is remembered, and a latch installed afterwards is released immediately.
 */
public class DefaultWatch implements Watcher {

    /** Latch to release on connection; guarded by {@code this}. */
    CountDownLatch cc;

    /** Whether the last relevant session event reported a live connection; guarded by {@code this}. */
    private boolean connected;

    /**
     * Installs the latch to release when the session connects.
     *
     * @param cc latch to count down; released at once if the session is already connected
     */
    public synchronized void setCc(CountDownLatch cc) {
        this.cc = cc;
        if (connected && cc != null) {
            cc.countDown();
        }
    }

    /** Prints every session event and counts the latch down on {@code SyncConnected}. */
    @Override
    public void process(WatchedEvent event) {
        System.out.println(event.toString());
        synchronized (this) {
            switch (event.getState()) {
                case SyncConnected:
                    // Initialization finished: wake up whoever waits on the latch.
                    connected = true;
                    if (cc != null) {
                        cc.countDown();
                    }
                    break;
                case Disconnected:
                case Expired:
                    connected = false;
                    break;
                default:
                    break;
            }
        }
    }
}
启动三个客户端,端口号分别为:6011、7011、8011
访问 Controller 查看zookeeper可视化工具 和 后台日志:
(完)