zoukankan      html  css  js  c++  java
  • 分布式锁-Zookeeper实现

    1:Zookeeper基本概念

    Zookeeper 存储数据的结构是一个树

    1. 分为持久节点 和 瞬时节点
    2. 持久节点你的会话结束或者是 Zookeeper 重启,这些节点呢都是不会消失的,它会一直存在不会消失的,除非你手动删除这些节点
    3. 瞬时节点(有序),Zookeeper 重启或者连接Zookeeper 会话断了,那么瞬时节点呢就会自动的去消失。瞬时节点不可以再去拥有子节点,如果当前节点是瞬时节点,那么子节点是不可能会存在的,为什么呢,因为如果你自动消失了,你这个瞬时节点的子节点是不是也会跟着消失啊,所以这个顾虑 Zookeeper 规定,瞬时节点是不能拥有子节点的,瞬时节点它还可以一个有序,有序是什么意思,就是你在创建这个瞬时节点的时候呢,你的名称是按照序号排序的,这一点是非常的重要的
    4. Zookeeper 另外一个非常重要的概念,就是这个观察器,它是可以检查Zookeeper 里边某个节点的变化,比如这个节点由存在到不存在,就是节点消失了,有可能节点数据发生了变化,比如这个节点里面的内容改变了,也有可能它的子节点发生了变化。我们都可以检测到这些节点的变化,如果它有变化呢,它会立刻通知到客户端,就是咱们在程序当中连接 Zookeeper 对吧。它会立刻通知你的程序,我这个节点数据发生变化呢,你接下来要做哪些操作,这个就是Zookeeper 的观察器      观察器可以监测3个方法: getData();  getChildren();  exists();

      观察器只能监听一次,如果你要再次监控这个节点的话呢,你需要重新设置这个观察器。这是Zookeeper 的一个特点

    2:Zookeeper 分布式锁的原理

    主要是利用了 Zookeeper 瞬时有序节点这样的一个特性

    1. 多线程并发创建的时候,它的这个节点并不会只有一个,比如说咱们有10个线程,同时去Zookeeper 里边创建节点,并发并不代表说Zookeeper 只创建了一个节点,不是这样。10个线程同时去创建瞬时节点,它会有10个瞬时节点创建成功,但是这10个瞬时节点呢,它是有序的,就是说你10个线程创建10个节点。它的名称是不一样的,它的序号也是不一样的。咱们规定序号最小的这个线程获得这把锁,其他的线程呢都没有获得锁,这个就是分布式的情况了。你两个JVM 或者多个JVM ,同时去Zookeeper 里边创建这个瞬时节点,你所有的线程都创建成功了,但是序号最小的那个线程获得锁。其他线程都是获取锁失败。就是按照这么一个逻辑去实现 Zookeeper 的分布式锁
    2. 其他的线程怎么办?

      其他的线程就监听 自己序号的前一个序号,比如说你有 10个线程,创建了10个瞬时节点,序号最小的是1 ,那就是第1个线程是获得到了锁,它去执行后面的逻辑。那第2个线程怎么办,第2个线程要监听序号为1的这个节点。等序号为1的节点消失了。消失是什么意思,就是第1个线程执行完了,序号为1的节点消失了,它立刻发送一个通知,因为我们设置观察器嘛,序号为2的这个线程监听序号1的这个节点。序号为1的这个节点消失了。它会马上通知序号为2的这个线程。这个线程2就开始执行了

    3:手写Zookeeper分布式锁

    maven

            <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.14</version>
            </dependency>
    

     实现层

    package com.example.zookeeperlock.util;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    
    /**
     * @Author: qiuj
     * @Description:
     * @Date: 2020-10-24 11:28
     */
    @Slf4j
    public class ZkLock implements AutoCloseable,Watcher {
    
        public ZkLock() throws IOException {
            this.zooKeeper = new ZooKeeper("localhost:2181",
                    10000,this);
        }
    
        private  ZooKeeper zooKeeper;
        private String currentNode;
    
        public boolean getLock (String businessCode) throws KeeperException, InterruptedException {
         try {
             //  设置根路径
             Stat stat = zooKeeper.exists("/" + businessCode,false);
             if (stat == null) {
                 zooKeeper.create("/" + businessCode,businessCode.getBytes(),
                         ZooDefs.Ids.OPEN_ACL_UNSAFE,
                         CreateMode.PERSISTENT);
             }
             //  创建子节点
             currentNode = zooKeeper.create("/" + businessCode + "/" + businessCode + "_",businessCode.getBytes(),
                     ZooDefs.Ids.OPEN_ACL_UNSAFE,
                     CreateMode.EPHEMERAL_SEQUENTIAL);
             //  查询出所有节点
             List<String> nodeList = zooKeeper.getChildren("/" + businessCode,false);
             //  从小到大排序
             Collections.sort(nodeList);
             //  查询出第一个节点  和当前创建节点进行匹配是否一致  一致则获得到锁
             String firstNode = nodeList.get(0);
             if (currentNode.endsWith(firstNode)) {
                 return true;
             }
             //  没有获得到锁  则监听当前节点的上一个节点
             for (String node : nodeList) {
                 if (currentNode.endsWith(node)) {
                     zooKeeper.exists("/" + businessCode + "/" + firstNode,true);
                     break;
                 } else {
                     firstNode = node;
                 }
             }
            //  让该线程进入等待    把执行wait的前提条件放入synchronized中,这样才可以避免wait执行之前执行notify
             synchronized (this) {
                 wait();
             }
             return true;
         } catch (Exception e) {
             e.printStackTrace();;
         }
         return false;
        }
    
        @Override
        public void process(WatchedEvent event) {
            //  watch观察器   监听触发此方法  当前节点删除则唤醒该线程 ,也间接说明获得了锁
            if (Event.EventType.NodeDeleted == event.getType()) {
                synchronized (this) {
                    notify();
                }
            }
        }
    
    
        @Override
        public void close() throws Exception {
            //  方法调用结束之后将当前节点删除   并释放锁
            zooKeeper.delete(currentNode,-1);
            zooKeeper.close();
            log.info("释放锁");
        }
    }
    

     Controller 层调用

    package com.example.zookeeperlock;
    
    import com.example.zookeeperlock.util.ZkLock;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Author: qiuj
     * @Description:
     * @Date: 2020-10-05 14:47
     */
    @Slf4j
    @RestController
    public class ZookeeperLockController {
    
    
        @RequestMapping("/zkLock")
        public String zkLock () {
            log.info("我进入了方法!");
            try (ZkLock zkLock = new ZkLock()) {
                if (zkLock.getLock("order")) {
                    log.info("我获得了锁");
                    Thread.sleep(10000);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            }
            log.info("方法执行完成!");
            return "方法执行完成!";
        }
    
       
    }

    开启两个应用   8080  8081    

    8080 19秒进入方法  19秒获得了锁   29秒释放了锁

    8081  20秒进入方法  29秒获得了锁         

    说明在多个JVM 情况下,分布式锁是有效的

     

    4:Curator 客户端 实现分布式锁 

    maven

    
            <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.2.0</version>
            </dependency>

    Applicaton 启动类 

    package com.example.zookeeperlock;
    
    import org.apache.curator.RetryPolicy;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    
    @SpringBootApplication
    public class ZookeeperLockApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ZookeeperLockApplication.class, args);
        }
    
        /**
         * 注入Bean
          * @return
         * @throws Exception
         */
        @Bean(initMethod="start",destroyMethod="close")
        public CuratorFramework getCuratorFramework () throws Exception {
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", retryPolicy);
            return client;
        }
    }

     Controller 层

    package com.example.zookeeperlock;
    
    import com.example.zookeeperlock.util.ZkLock;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.recipes.locks.InterProcessMutex;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooKeeper;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Author: qiuj
     * @Description:
     * @Date: 2020-10-05 14:47
     */
    @Slf4j
    @RestController
    public class ZookeeperLockController {
    
        @Autowired
        private CuratorFramework client;
    
    
        @RequestMapping("/zk")
        public String curatorLock () {
            log.info("我进入了方法!");
            InterProcessMutex lock = new InterProcessMutex(client,"/curatorLock");
            try {
                if ( lock.acquire(30, TimeUnit.SECONDS) )
                {
                    log.info("我获得了锁");
                    Thread.sleep(10000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    lock.release();
                    log.info("释放锁");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            log.info("方法执行完成!");
            return "方法执行完成!";
        }
    }

    5: 项目源代码   Zookeeper-3.4.6版本包

  • 相关阅读:
    poj 1328 Radar Installation (贪心)
    hdu 2037 今年暑假不AC (贪心)
    poj 2965 The Pilots Brothers' refrigerator (dfs)
    poj 1753 Flip Game (dfs)
    hdu 2838 Cow Sorting (树状数组)
    hdu 1058 Humble Numbers (DP)
    hdu 1069 Monkey and Banana (DP)
    hdu 1087 Super Jumping! Jumping! Jumping! (DP)
    必须知道的.NET FrameWork
    使用记事本+CSC编译程序
  • 原文地址:https://www.cnblogs.com/blogspring/p/14191737.html
Copyright © 2011-2022 走看看