zoukankan      html  css  js  c++  java
  • 基于zookeeper的分布式锁

    基于zk的分布式锁:
      大概原理:仍然跟基于db或者redis一致,就是注册节点,然后删除。不同的是zk因为可以对节点的事件进行监听,那么在收到节点删除的事件时,正在阻塞的线程便可以发起新的抢占锁的请求。当然,真正生产的代码一般不是这么写的,因为这样的情况下如果等待的线程非常多,那么zk向所有注册点的广播就要消耗大量的带宽,也会极大的消耗zk的性能,这显然是不合理的。所以,基于临时有序节点的分布式锁的优势就非常明显了,所有节点只关注自己的前节点,消耗少,线程等待时间可控,而且高可用。
    ---------Talk Is Cheap, Just Show Me The Code-------------------------------------------------
    第一版本的zk分布式锁(做法跟db、redis完全相同):
    锁工具类主要方法:
    锁的获取:
    /**
         * 阻塞获取锁
         */
        public boolean lock(){
            if(tryLock()){
                return true;
            }
            waitForLock();
            return lock();
        }
    
        /**
         * 非阻塞获取锁
         */
        public boolean tryLock(){
            try{
                zkClient.createPersistent("/lock");
                System.out.println("抢到了锁"+Thread.currentThread().getName());
            }catch (ZkNodeExistsException e){
                System.out.println("没抢到锁"+Thread.currentThread().getName());
                return false;
            }
            return true;
        }

    关键的抢占失败后等待方法:

      /**
         * 监听节点,等待抢锁
         */
        private void waitForLock(){
            IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {}
    
                @Override
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println("有锁释放了,开始抢占锁"+Thread.currentThread().getName());
                    if(cdl != null){
                        cdl.countDown();
                    }
                }
            };
    
            zkClient.subscribeDataChanges("/lock",listener);
            if(zkClient.exists("/lock")){//节点是否存在
                cdl = new CountDownLatch(1);
                try {
                    cdl.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{//节点可能刚好被删除
                lock();
            }
        }
    

    锁的释放:

      /**
         * 解锁
         */
        public void unlock(){
            System.out.println("删除锁"+Thread.currentThread().getName());
            zkClient.delete("/lock");
        }  

    测试代码:

      private CountDownLatch cdl = new CountDownLatch(num);
    
        @Test
        public void testLock(){
            for(int i=0;i<num;i++){
                MyThread t = new MyThread("mythread"+i);
                t.start();
                cdl.countDown();
            }
            try {
                Thread.sleep(100000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        //内部类
        class MyThread extends Thread{
            private String threadName;
            public MyThread(){};
            public MyThread(String threadName){
                this.threadName = threadName;
            }
            @Override
            public void run() {
                ZkLock zkLock = new ZkLock();
                try {
                    cdl.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(zkLock.lock()){
                    System.out.println("线程-"+threadName+"获得锁,orderId为:"+MyResources.getInstance().getNextId());
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    zkLock.unlock();
                }
            }
        }  

    执行结果一部分如下:

     

    已经不用再说什么了,这么多通知已经说明了一切,每删除一个节点通知所有订阅节点的代价是高昂的,这是不能忍受的,得改!

    第二版本的zk分布式锁(临时有序节点版):
      /**
         * 阻塞锁
         * @return
         */
        public boolean lock(){
            if(tryLock()){
                return true;
            }else{
                waitForLock();
                return lock();
            }
        }
    
        /**
         * 非阻塞锁
         * @return
         */
        public boolean tryLock(){
            if(currentPath == null){
                currentPath = zkClient.createEphemeralSequential(lockPath+"/lock",Thread.currentThread().getName());
                System.out.println(Thread.currentThread().getName()+"创建节点:"+currentPath+"=================================================");
                List<String> nodeList = zkClient.getChildren(lockPath);
                Collections.sort(nodeList);
                if(currentPath.equals(lockPath+"/"+nodeList.get(0))){//当前注册节点为最靠前节点
                    return true;
                }else{
                    int weizhi = Collections.binarySearch(nodeList,currentPath.substring(8));
                    beforePath = lockPath +"/"+nodeList.get(weizhi-1);
                    return false;
                }
            }else{
                return true;
            }
        } 
    

    一样重要的等待方法:

    //锁等待
        private boolean waitForLock() {
            IZkDataListener listener = new IZkDataListener() {
                @Override
                public void handleDataChange(String s, Object o) throws Exception {
                }
    
                @Override
                public void handleDataDeleted(String s) throws Exception {
                    if(cdl!=null){
                        System.out.println("前面的节点"+beforePath+"被删除了=====================");
                        cdl.countDown();
                    }
                }
            };
    
            zkClient.subscribeDataChanges(beforePath,listener);
            if(zkClient.exists(beforePath)){//前节点存在
                cdl = new CountDownLatch(1);
                try {
                    cdl.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return false;
            }else{//订阅的时候,可能前边线程处理较快,已经删除了
                return true;
            }
        }
    

    释放锁:

       /**
         * 释放锁
         * @return
         */
        public boolean unlock(){
            return zkClient.delete(currentPath);
        } 

     测试代码:

    private int num = 10;
        private CountDownLatch cdl = new CountDownLatch(num);
    
        @Test
        public void testLock(){
            for(int i=0;i<num;i++){
                ZkLockTest2.MyThread t = new ZkLockTest2.MyThread("mythread"+i);
                t.start();
                cdl.countDown();
            }
            try {
                Thread.sleep(1000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        //内部类
        class MyThread extends Thread{
            private String threadName;
            public MyThread(){};
            public MyThread(String threadName){
                this.threadName = threadName;
            }
            @Override
            public void run() {
                try {
                    cdl.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                ZkLock2 zkLock = new ZkLock2();
                if(zkLock.lock()){
                    System.out.println("线程-"+threadName+"获得锁,orderId为:"+MyResources.getInstance().getNextId());
                    zkLock.unlock();
                }
            }
        }
    

    执行结果如下(所有):  

     

    结束语:
      三篇结束了,对比三种实现方式结论如下:
      a、基于db的分布式锁,相对容易理解,易上手;但依赖数据库,对数据库有损耗(这个影响其实比较小,多数时候可以忽略),可能会有短时间死锁等,其它线程重试的时间不确定,整体时间利用率不好把控(个人认为这个是主要的,其它的可重入,死锁之类的有其它办法解决)。
      b、基于redis的分布式锁,性能远高于db的,依赖redis(这个跟db一般不会挂,但就怕万一),主备之间可能数据不一致,一样线程休眠时间不好确定,整体时间利用率不好把控(个人认为这个也是主要原因,解决办法的话,休眠时间设置为正常单线程处理业务时间的2-3倍(经验值),所有线程休眠时间在某个时间段内随机,不要固定时间;其它问题多数可以有办法解决)。
      c、基于zk的分布式锁,性能略逊redis但一样远高于db,得益于zk的高可用,不用担心挂掉的问题,而且由于临时有序节点的排序跟节点监听,解决了休眠问题;缺点实现略复杂。
    只是自己学习之作,没有考虑生产中具体的需求,可能有理解不正确的地方,欢迎批评斧正。
     

  • 相关阅读:
    GoLang之网络
    GoLang之方法与接口
    GoLang之基础
    Twemproxy 缓存代理服务器
    判断点是否在三角形内
    C++中const 的各种用法
    解决java web中safari浏览器下载后文件中文乱码问题
    Spring MVC如何测试Controller(使用springmvc mock测试)
    java生成指定范围的随机数
    itextpdf添加非自带字体(例如微软雅黑)
  • 原文地址:https://www.cnblogs.com/nevermorewang/p/8306903.html
Copyright © 2011-2022 走看看