zoukankan      html  css  js  c++  java
  • ZooKeeper的分布式锁实现

    分布式锁一般有三种实现方式:

    1. 数据库乐观锁;

    2. 基于Redis的分布式锁;

    3. 基于ZooKeeper的分布式锁。

    本篇博客将介绍第三种方式,基于Zookeeper实现分布式锁。虽然网上已经有各种介绍Zookeeper分布式锁实现的博客,然而他们的实现却有着各种各样的问题,为了避免误人子弟,本篇博客将详细介绍如何正确地实现Zookeeper分布式锁。

      现在模拟一个使用Zookeeper实现分布式锁,假设有A,B,C三台客户端去访问资源,调用zookeeper获得锁。客户端三个在zookeeper的 /locks节点下创建一个/lock节点,由于节点是唯一性的特性,只有一个人会创建成功,其余两个创建失败,会进入监听/locks节点的变化,如果/locks下子节点/lock节点发生变化,其余两个可以去拿锁,这样是否好呢? 这样子会导致惊群效应。就是一个触发使得在短时间呢会触发大量的watcher事件,但是只有一个客户端能拿到锁。所以这种方式不建议。

      有一种比较好的方法就是利用 zookeeper 的有序节点的特性,基本思路:

    1、在获取分布式锁的时候在locker节点下创建临时顺序节点,释放锁的时候删除该临时节点。

    2、客户端调用createNode方法在locks下创建临时顺序节点,然后调用getChildren(“locks”)来获取locks下面的所有子节点,注意此时不用设置任何Watcher。

    3、客户端获取到所有的子节点path之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁。

    4、如果发现自己创建的节点并非locks所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,然后对其调用exist()方法,同时对其注册事件监听器。

    5、之后,让这个被关注的节点删除,则客户端的Watcher会收到相应通知,此时再次判断自己创建的节点是否是locks子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。

     下面看一下我的代码实现:

    public class DistributedLock implements Lock,Watcher {
     
        private ZooKeeper zk=null;
        private String ROOT_LOCK="/locks"; //定义根节点
        private String WAIT_LOCK; //等待前一个锁
        private String CURRENT_LOCK; //表示当前的锁
        // 作为阻塞
        private CountDownLatch countDownLatch; //
     
     
        public DistributedLock() {
     
            try {
                zk=new ZooKeeper("192.168.254.135:2181",
                        4000,this);
                //判断根节点是否存在 
                Stat stat=zk.exists(ROOT_LOCK,false);
                if(stat==null){//如果不存在创建
                    zk.create(ROOT_LOCK,"0".getBytes(),
                            ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
     
        }
     
        /**
         * 尝试获取锁
         */
        @Override
        public boolean tryLock() {
     
            try {
                //创建临时有序节点
                CURRENT_LOCK=zk.create(ROOT_LOCK+"/","0".getBytes(),
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
                System.out.println(Thread.currentThread().getName()+"->"+
                        CURRENT_LOCK+",尝试竞争锁");
                List<String> childrens=zk.getChildren(ROOT_LOCK,false); //获取根节点下的所有子节点
                SortedSet<String> sortedSet=new TreeSet();//定义一个集合进行排序
                for(String children:childrens){ // 排序
                    sortedSet.add(ROOT_LOCK+"/"+children);
                }
                String firstNode=sortedSet.first(); //获得当前所有子节点中最小的节点
                // 取出比我创建的节点还小的节点,没有的话为null
                SortedSet<String> lessThenMe=((TreeSet<String>) sortedSet).headSet(CURRENT_LOCK);
                if(CURRENT_LOCK.equals(firstNode)){//通过当前的节点和子节点中最小的节点进行比较,如果相等,表示获得锁成功
                    return true;
                }
                if(!lessThenMe.isEmpty()){
                    WAIT_LOCK=lessThenMe.last();//获得比当前节点更小的最后一个节点,设置给WAIT_LOCK
                }
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return false;
        }
     
     
        @Override
        public void lock() {
            if(this.tryLock()){ //如果获得锁成功
                System.out.println(Thread.currentThread().getName()+"->"+CURRENT_LOCK+"->获得锁成功");
                return;
            }
            try {
                waitForLock(WAIT_LOCK); //没有获得锁,继续等待获得锁
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
     
        private boolean waitForLock(String prev) throws KeeperException, InterruptedException {
            //监听当前节点的上一个节点 注册事件,这里需要在默认的 watch 事件里面处理
            // 这里是我们之前提到的 watch 事件触发最后执行的 process 回调里面的 请看最下行代码
            Stat stat=zk.exists(prev,true);
            if(stat!=null){
                System.out.println(Thread.currentThread().getName()+"->等待锁"+ROOT_LOCK+"/"+prev+"释放");
                countDownLatch=new CountDownLatch(1);
                countDownLatch.await();// 进入等待,这里需要
                //TODO  watcher触发以后,还需要再次判断当前等待的节点是不是最小的
                System.out.println(Thread.currentThread().getName()+"->获得锁成功");
            }
            return true;
        }
     
        @Override
        public void lockInterruptibly() throws InterruptedException {
     
        }
     
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
     
        @Override
        public void unlock() {
            System.out.println(Thread.currentThread().getName()+"->释放锁"+CURRENT_LOCK);
            try {
                // -1 表示无论如何先把这个节点删了再说
                zk.delete(CURRENT_LOCK,-1);
                CURRENT_LOCK=null;
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
     
        @Override
        public Condition newCondition() {
            return null;
        }
     
        @Override
        public void process(WatchedEvent event) {
            // 事件回调 countDownLatch.countDown();
            if(this.countDownLatch!=null){
                this.countDownLatch.countDown();
            }
        }
    }

    代码中实现了  Lock,Watcher 两个接口。主要用到的是lock 里面的trylock方法,尝试去获取锁。然后还有watcher里面的处理回调的方法

    测试代码

    public static void main( String[] args ) throws IOException {
           CountDownLatch countDownLatch=new CountDownLatch(10);
           for(int i=0;i<10;i++){
               new Thread(()->{
                   try {
                       countDownLatch.await();
                       DistributedLock distributedLock=new DistributedLock();
                       distributedLock.lock(); //获得锁
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
               },"Thread-"+i).start();
               countDownLatch.countDown();
           }
           System.in.read();
       }

    运行结果:

    Thread-8->/locks/0000000040,尝试竞争锁
    Thread-5->/locks/0000000044,尝试竞争锁
    Thread-9->/locks/0000000041,尝试竞争锁
    Thread-3->/locks/0000000042,尝试竞争锁
    Thread-7->/locks/0000000046,尝试竞争锁
    Thread-1->/locks/0000000043,尝试竞争锁
    Thread-0->/locks/0000000047,尝试竞争锁
    Thread-4->/locks/0000000045,尝试竞争锁
    Thread-2->/locks/0000000049,尝试竞争锁
    Thread-6->/locks/0000000048,尝试竞争锁
    Thread-8->/locks/0000000040->获得锁成功
    Thread-9->等待锁/locks//locks/0000000040释放
    Thread-5->等待锁/locks//locks/0000000043释放
    Thread-0->等待锁/locks//locks/0000000046释放
    Thread-1->等待锁/locks//locks/0000000042释放
    Thread-2->等待锁/locks//locks/0000000048释放
    Thread-7->等待锁/locks//locks/0000000045释放
    Thread-6->等待锁/locks//locks/0000000047释放
    Thread-3->等待锁/locks//locks/0000000041释放
    Thread-4->等待锁/locks//locks/0000000044释放
  • 相关阅读:
    Zero Downtime Upgrade of Oracle 10g to Oracle 11g Using GoldenGate — 1
    架构-MVVM:MVVM核心概念
    架构-MVVC:百科
    架构:目录
    架构:template
    JavaScript-Tool:Ext JS
    JavaScript-Tool:jquery.tree.js-un
    JavaScript-Tool:wdtree
    C#:C# 运算符
    C#:目录
  • 原文地址:https://www.cnblogs.com/luxianyu-s/p/11328615.html
Copyright © 2011-2022 走看看