zoukankan      html  css  js  c++  java
  • zookeeper 实现分布式锁 demo(新)

    目录结构:

    <!-- 引入zookeeper  -->

    <dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.6.1</version>
    </dependency>

    起4台zookeeper :

    controller  :
    package com..controller;
    
    import com..config.ZKUtils;
    import com..watch.WatchCallBack;
    import org.apache.zookeeper.ZooKeeper;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    
    @RestController
    @RequestMapping("/zk")
    public class ZkLockController {
    
          ZooKeeper zk = ZKUtils.getZK();
    
        @RequestMapping("/lock")
        public String Lock(){
    
               new Thread(){
    
                   @Override
                   public void run() {
    
                       WatchCallBack watchCallBack = new WatchCallBack();
                       watchCallBack.setZk(zk);
                       String threadName = Thread.currentThread().getName();
                       watchCallBack.setThreadName(threadName);
    
                       //加锁
                       watchCallBack.tryLock();
    
                       try {
                           Thread.sleep(2000);
                           System.out.println("执行业务代码....");
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
    
                       //解锁
                       watchCallBack.unLock();
                   }
               }.start();
    
            return "true";
        }
    }
    WatchCallBack 类:
    package com..watch;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class WatchCallBack implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {
    
        ZooKeeper zk ;
        String threadName;
        CountDownLatch cc = new CountDownLatch(1);
        String pathName;
    
        public String getPathName() {
            return pathName;
        }
    
        public void setPathName(String pathName) {
            this.pathName = pathName;
        }
    
        public String getThreadName() {
            return threadName;
        }
    
        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }
    
        public ZooKeeper getZk() {
            return zk;
        }
    
        public void setZk(ZooKeeper zk) {
            this.zk = zk;
        }
    
        public void tryLock(){
            try {
    
                System.out.println(threadName + " Go to create Node....");
    
                zk.create("/zklock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"lock");
    
                cc.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public void unLock(){
            try {
                zk.delete(pathName,-1);
                System.out.println(threadName + " 业务执行完毕,释放锁....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
    
        @Override
        public void process(WatchedEvent event) {
    
            //如果第一个节点锁释放了,其实只有第二个收到了回调事件!!
            //如果,不是第一个节点,是其中某一个,挂掉了,也能造成他后边的节点收到这个通知,
         //从而让他后边那个跟去watch挂掉这个节点前面一个节点。。。
    switch (event.getType()) { case None: break; case NodeCreated: break; case NodeDeleted: zk.getChildren("/",false,this ,"children"); break; case NodeDataChanged: break; case NodeChildrenChanged: break; } } @Override public void processResult(int rc, String path, Object ctx, String name) { if(name != null ){ System.out.println(threadName +" create node : " + name ); pathName = name ; zk.getChildren("/",false,this ,"children"); } } //getChildren call back @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { //一定能看到自己前边的节点。。 Collections.sort(children); int i = children.indexOf(pathName.substring(1)); //是不是第一个 if(i == 0){ //是第一个节点 System.out.println(threadName +" 我是最小节点 ...."); try { zk.setData("/",threadName.getBytes(),-1); cc.countDown(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }else{ //不是第一个节点 zk.exists("/"+children.get(i-1),this,this,"children"); } } @Override public void processResult(int rc, String path, Object ctx, Stat stat) { } }

    初始化 zookeeper 链接 :

    ZKUtils 获取zk

     :

    package com.wondersgroup.config;
    
    import org.apache.zookeeper.ZooKeeper;
    import java.util.concurrent.CountDownLatch;
    
    public class ZKUtils {
    
        private  static ZooKeeper zk;
    
        private static String address = "localhost:2181,localhost:2182,localhost:2183,localhost:2184/zkLocks";
    
        private static DefaultWatch watch = new DefaultWatch();
    
        private static CountDownLatch init  =  new CountDownLatch(1);
        public static ZooKeeper  getZK(){
    
            try {
                zk = new ZooKeeper(address,1000,watch);
           //等待 链接 CountDownLatch -1 watch.setCc(init);
           //继续执行 init.await(); }
    catch (Exception e) { e.printStackTrace(); } return zk; } }

    初始化 zookeeper 链接 :

    DefaultWatch  完成 -1 唤醒ZKutils 继续执行返回zk实例

     :

    package com..config;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import java.util.concurrent.CountDownLatch;
    
    
    public class DefaultWatch implements Watcher {
    
        CountDownLatch cc ;
    
        public void setCc(CountDownLatch cc) {
            this.cc = cc;
        }
    
        @Override
        public void process(WatchedEvent event) {
    
            System.out.println(event.toString());
    
            switch (event.getState()) {
                case Unknown:
                    break;
                case Disconnected:
                    break;
                case NoSyncConnected:
                    break;
                case SyncConnected:
              //初始化完成 -1 cc.countDown();
    break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; } } }

    起三个客户端 端口号分别为 :6011   7011   8011

    访问  Controller    查看zookeeper可视化工具  和   后台日志:

     (完)

  • 相关阅读:
    持续集成(CI)-概念
    python 多个路由参数传递
    python %s 占位符用法
    本地电脑密钥登陆服务器
    nginx 安装及反向代理、负载均衡、静态文件指向、结合uwsgi上线项目、处理跨域问题
    docker 进入容器
    将python文件变成一个shell脚本可执行文件
    python中quote函数是什么意思,怎么用?
    CentOS 使用 runserver 启动服务器后,常驻后台运行
    Open Set Domain Adaptation by Backpropagation(OSBP)论文数字数据集复现
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14395817.html
Copyright © 2011-2022 走看看