zoukankan      html  css  js  c++  java
  • ZK(ZooKeeper)分布式锁实现

    点赞再看,养成习惯,微信搜索【牧小农】关注我获取更多资讯,风里雨里,小农等你。
    本文中案例都会在上传到git上,请放心浏览
    git地址:https://github.com/muxiaonong/ZooKeeper/tree/master/mxnzookeeper

    准备

    本文会使用到 三台 独立服务器,可以自行提前搭建好。

    不知道如何搭建的,可以看我之前 ZooKeeper集群 搭建:Zookeeper 集群部署的那些事儿

    关于ZooKeeper 一些基础命令可以看这篇:Zookeeper入门看这篇就够了

    前言

    在平时我们对锁的使用,在针对单个服务,我们可以用 Java 自带的一些锁来实现,资源的顺序访问,但是随着业务的发展,现在基本上公司的服务都是多个,单纯的 Lock或者Synchronize 只能解决单个JVM线程的问题,那么针对于单个服务的 Java 的锁是无法满足我们业务的需要的,为了解决多个服务跨服务访问共享资源,于是就有了分布锁,分布式锁产生的原因就是集群
    在这里插入图片描述

    正文

    实现分布式锁的方式有哪些呢?

    • 分布式锁的实现方式主要以(ZooKeeper、Reids、Mysql)这三种为主

    今天我们主要讲解的是使用 ZooKeeper来实现分布式锁,ZooKeeper的应用场景主要包含这几个方面:

    1. 服务注册与订阅(共用节点)
    2. 分布式通知(监听ZNode)
    3. 服务命令(ZNode特性)
    4. 数据订阅、发布(Watcher)
    5. 分布式锁(临时节点)

    ZooKeeper实现分布式锁,主要是得益于ZooKeeper 保证了数据的强一致性,锁的服务可以分为两大类:

    • 保持独占

      所有试图来获取当前锁的客户端,最终有且只有一个能够成功得到当前锁的钥匙,通常我们会把 ZooKeeper 上的节点(ZNode)看做一把锁,通过 create 临时节点的方式来实现,当多个客户端都去创建一把锁的时候,那么只有成功创建了那个客户端才能拥有这把锁

    • 控制时序

      所有试图获取锁的客户端,都是被顺序执行,只是会有一个序号(zxid),我们会有一个节点,例如:/testLock,所有临时节点都在这个下面去创建,ZK的父节点(/testLock) 维持了一个序号,这个是ZK自带的属性,他保证了子节点创建的时序性,从而也形成了每个客户端的一个 全局时序

    ZK锁机制

    在实现ZooKeeper 分布式锁之前我们有必要了解一下,关于ZooKeeper分布式锁机制的实现流程和原理,不然各位宝贝,出去面试的时候怎么和面试官侃侃而谈~

    临时顺序节点

    基于ZooKeeper的临时顺序节点 ,ZooKeeper比较适合来实现分布式锁:

    • 顺序发号器: ZooKeeper的每一个节点,都是自带顺序生成器:在每个节点下面创建临时节点,新的子节点后面,会添加一个次序编号,这个生成的编号,会在上一次的编号进行 +1 操作
    • 有序递增: ZooKeeper节点有序递增,可以保证锁的公平性,我们只需要在一个持久父节点下,创建对应的临时顺序节点,每个线程在尝试占用锁之前,会调用watch,判断自己当前的序号是不是在当前父节点最小,如果是,那么获取锁
    • Znode监听: 每个线程在抢占所之前,会创建属于当前线程的ZNode节点,在释放锁的时候,会删除创建的ZNode,当我们创建的序号不是最小的时候,会等待watch通知,也就是上一个ZNode的状态通知,当前一个ZNode删除的时候,会触发回调机制,告诉下一个ZNode,你可以获取锁开始工作了
    • 临时节点自动删除:ZooKeeper还有一个好处,当我们客户端断开连接之后,我们出创建的临时节点会进行自动删除操作,所以我们在使用分布式锁的时候,一般都是会去创建临时节点,这样可以避免因为网络异常等原因,造成的死锁。
    • 羊群效应: ZooKeeper节点的顺序访问性,后面监听前面的方式,可以有效的避免 羊群效应,什么是羊群效应:当某一个节点挂掉了,所有的节点都要去监听,然后做出回应,这样会给服务器带来比较大压力,如果有了临时顺序节点,当一个节点挂掉了,只有它后面的那一个节点才做出反应。

    我们现在看一下下面一张图:

    在这里插入图片描述
    在上图中,ZooKeeper里面有一把锁节点 testLock,这个锁就是ZooKeeper的一个节点,当两个客户端来获取这把锁的时候,会对ZooKeeper进行加锁的请求,也就是我们所说的 临时顺序节点

    当我们在 /testLock目录下创建了一个顺序临时节点后,ZK会自动对这个临时节点维护 一个节点序号,并且这个节点是递增的,比如我们 clientA 创建了一个临时顺序节点,ZK内部会生成一个序号:/lock0000000001,那么 clientB 也生成了一个临时顺序节点,ZK会生成一个序号为 /lock0000000002,在这里数字都是依次递增的,从1开始递增,ZK内部会维护这个顺序。

    下图所示:

    在这里插入图片描述

    这时候,ClientA会进行监听判断,在父节点下,我是不是最小的,如果是的话,那么俺就可以加锁了,因为我是最小的,其他的都比我大。我自己可以进行加锁,你已经是一个成熟的临时节点了,要学会自己加锁。咳,那么ZK是怎么进行判断的呢?宝贝,您往下看:

    在这里插入图片描述
    这个是cleintA已经加锁完成了,这个时候clientB也要过来加锁,那么他也要在/testLock,创建一个属于自己的临时节点,那么这个时候他的序号就会变成/lock0000000002,如下图所示:
    在这里插入图片描述

    这个时候就会出现我们前面所讲的,clientB 在加锁的时候会判断,自己是不是最小的,一看在当前父节点下不是最小的,啊~我还挺大的,还有比我小的!!!

    加锁失败呀,咳咳,这个时候呢,clientB 就会去偷窥clientA,气氛逐渐暧昧起来,啊不是,是按照顺序去监听前一个节点(clientA),是否完成工作了,如果完成了,clientB才可以进行加锁工作,宝贝,你往下看图片:

    在这里插入图片描述

    clientA 加锁成功后,会进行自己的业务处理,当 clientA 处理完工作后,说我完事了,下一个,那么 clientA 是怎么完事的呢,他多长时间?不是,具体流程是怎样的?小农你不对劲,说什么呢!!!真羞涩

    上面我们不是说了,当 clientB 加锁失败后,会给前一个节点(clientA)加上一个监听,当clientA被删除以后,就表示有人释放了锁,这个时候就会通知 clientB重新去获取锁。

    在这里插入图片描述

    这个时候clientB重新获取锁的时候,发现自己就是当前父节点下面最小的那个,于是clientB就开始加锁,开始工作等一系列操作,当clientB 完事以后,释放锁,也说了一句,下一个。

    如下图所示:

    在这里插入图片描述

    当然除了 clientA、clientB还有CDE等,这字母看着好奇怪又好熟悉,原理都是一样的,都是最小节点进行解锁,如果不是,监听前一个节点是否释放,如果释放了,再次尝试加锁。如果前一节节点释放了,自己就是最小了,就排到前面去了,有点类似于 银行取号 的操作。

    代码实现

    使用ZooKeeper 创建临时顺序节点来实现分布式锁,大体的流程就是 先创建一个持久父节点,在当前节点下,创建临时顺序节点,找出最小的序列号,获取分布式锁,程序业务完成之后释放锁,通知下一个节点进行操作,使用的是watch来监控节点的变化,然后依次下一个最小序列节点进行操作。

    首先我们需要创建一个持久父类节点:我这里是 /mxn
    在这里插入图片描述

    WatchCallBack

    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    
    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    
    /**
     * @program: mxnzookeeper
     * @ClassName WatchCallBack
     * @description:
     * @author: 微信搜索:牧小农
     * @create: 2021-10-23 10:48
     * @Version 1.0
     **/
    public class WatchCallBack  implements Watcher, AsyncCallback.StringCallback ,AsyncCallback.Children2Callback ,AsyncCallback.StatCallback {
    
        ZooKeeper zk ;
        String threadName;
        CountDownLatch cc = new CountDownLatch(1);
        String pathName;
    
        public String getPathName() {
            return pathName;
        }
    
        public void setPathName(String pathName) {
            this.pathName = pathName;
        }
    
        public String getThreadName() {
            return threadName;
        }
    
        public void setThreadName(String threadName) {
            this.threadName = threadName;
        }
    
        public ZooKeeper getZk() {
            return zk;
        }
    
        public void setZk(ZooKeeper zk) {
            this.zk = zk;
        }
    
        /** @Author 牧小农
         * @Description //TODO 尝试加锁方法
         * @Date 16:14 2021/10/24
         * @Param 
         * @return 
         **/
        public void tryLock(){
            try {
    
                System.out.println(threadName + " 开始创建。。。。");
                //创建一个顺序临时节点
                zk.create("/lock",threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,this,"abc");
                //阻塞当前,监听前一个节点是否释放锁
                cc.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /** @Author 牧小农
         * @Description //TODO 解锁方法
         * @Date 16:14 2021/10/24
         * @Param 
         * @return 
         **/
        public void unLock(){
            try {
                //释放锁,删除临时节点
                zk.delete(pathName,-1);
                //结束工作
                System.out.println(threadName + "         结束工作了....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            }
        }
    
    
        @Override
        public void process(WatchedEvent event) {
    
            //如果第一个节点释放了锁,那么第二个就会收到回调
            //告诉它前一个节点释放了,你可以开始尝试获取锁
            switch (event.getType()) {
                case None:
                    break;
                case NodeCreated:
                    break;
                case NodeDeleted:
                    //当前节点重新获取锁
                    zk.getChildren("/",false,this ,"sdf");
                    break;
                case NodeDataChanged:
                    break;
                case NodeChildrenChanged:
                    break;
            }
    
        }
    
        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if(name != null ){
                System.out.println(threadName  +" 线程创建了一个节点为 : " +  name );
                pathName =  name ;
                //监听前一个节点
                zk.getChildren("/",false,this ,"sdf");
            }
    
        }
    
        //getChildren  call back
        @Override
        public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
    
            //节点按照编号,升序排列
            Collections.sort(children);
            //对节点进行截取例如  /lock0000000022 截取后就是  lock0000000022
            int i = children.indexOf(pathName.substring(1));
    
    
            //是不是第一个,也就是说是不是最小的
            if(i == 0){
                //是第一个
                System.out.println(threadName +" 现在我是最小的....");
                try {
                    zk.setData("/",threadName.getBytes(),-1);
                    cc.countDown();
    
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else{
                //不是第一个
                //监听前一个节点 看它是不是完成了工作进行释放锁了
                zk.exists("/"+children.get(i-1),this,this,"sdf");
            }
    
        }
    
        @Override
        public void processResult(int rc, String path, Object ctx, Stat stat) {
            //判断是否失败exists
        }
    }
    
    

    TestLock

    import com.mxn.zookeeper.config.ZKUtils;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    
    /**
     * @program: mxnzookeeper
     * @ClassName TestLock
     * @description:
     * @author: 微信搜索:牧小农
     * @create: 2021-10-23 10:45
     * @Version 1.0
     **/
    public class TestLock {
    
    
        ZooKeeper zk ;
    
        @Before
        public void conn (){
            zk  = ZKUtils.getZK();
        }
    
        @After
        public void close (){
            try {
                zk.close();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        @Test
        public void lock(){
    
            //创建十个线程
            for (int i = 0; i < 10; i++) {
                new Thread(){
                    @Override
                    public void run() {
                        WatchCallBack watchCallBack = new WatchCallBack();
                        watchCallBack.setZk(zk);
                        String threadName = Thread.currentThread().getName();
                        watchCallBack.setThreadName(threadName);
                        //线程进行抢锁操作
                        watchCallBack.tryLock();
                        try {
                            //进行业务逻辑处理
                            System.out.println(threadName+"         开始处理业务逻辑了...");
                            Thread.sleep(200);
                        }catch (Exception e){
                            e.printStackTrace();
                        }
                        //释放锁
                        watchCallBack.unLock();
    
    
                    }
                }.start();
            }
    
    
            while(true){
    
            }
    
        }
    
    }
    

    运行结果:

    Thread-1 线程创建了一个节点为 : /lock0000000112
    Thread-5 线程创建了一个节点为 : /lock0000000113
    Thread-2 线程创建了一个节点为 : /lock0000000114
    Thread-6 线程创建了一个节点为 : /lock0000000115
    Thread-9 线程创建了一个节点为 : /lock0000000116
    Thread-4 线程创建了一个节点为 : /lock0000000117
    Thread-7 线程创建了一个节点为 : /lock0000000118
    Thread-3 线程创建了一个节点为 : /lock0000000119
    Thread-8 线程创建了一个节点为 : /lock0000000120
    Thread-0 线程创建了一个节点为 : /lock0000000121
    Thread-1 现在我是最小的....
    Thread-1         开始处理业务逻辑了...
    Thread-1         结束工作了....
    Thread-5 现在我是最小的....
    Thread-5         开始处理业务逻辑了...
    Thread-5         结束工作了....
    Thread-2 现在我是最小的....
    Thread-2         开始处理业务逻辑了...
    Thread-2         结束工作了....
    Thread-6 现在我是最小的....
    Thread-6         开始处理业务逻辑了...
    Thread-6         结束工作了....
    Thread-9 现在我是最小的....
    Thread-9         开始处理业务逻辑了...
    Thread-9         结束工作了....
    Thread-4 现在我是最小的....
    Thread-4         开始处理业务逻辑了...
    Thread-4         结束工作了....
    Thread-7 现在我是最小的....
    Thread-7         开始处理业务逻辑了...
    Thread-7         结束工作了....
    Thread-3 现在我是最小的....
    Thread-3         开始处理业务逻辑了...
    Thread-3         结束工作了....
    Thread-8 现在我是最小的....
    Thread-8         开始处理业务逻辑了...
    Thread-8         结束工作了....
    Thread-0 现在我是最小的....
    Thread-0         开始处理业务逻辑了...
    Thread-0         结束工作了....
    
    

    总结

    ZK分布式锁,能够有效的解决分布式、不可重入的问题,在上面的案例中我, 没有实现可重入锁,但是实现起来也不麻烦,只需要带上线程信息等唯一标识,判断一下就可以了

    ZK实现分布式锁具有天然的优势,临时顺序节点,可以有效的避免死锁问题,让客户端断开,那么就会删除当前临时节点,让下一个节点进行工作。

    如果文中有错误或者不了解的地方,欢迎留言,小农看见了会第一时间回复大家,大家加油

    我是牧小农,一个卑微的打工人,如果觉得文中的内容对你有帮助,记得一键三连啊,你们的三连是小农最大的动力。

    我是牧小农,怕什么真理无穷,进一步 有进一步的欢喜,大家加油~

  • 相关阅读:
    -bash: fork: Cannot allocate memory 问题的处理
    Docker top 命令
    docker常见问题修复方法
    The Absolute Minimum Every Software Developer Absolutely, Positively Must Know About Unicode and Character Sets (No Excuses!)
    What's the difference between encoding and charset?
    hexcode of é î Latin-1 Supplement
    炉石Advanced rulebook
    炉石bug反馈
    Sidecar pattern
    SQL JOIN
  • 原文地址:https://www.cnblogs.com/mingyueyy/p/15455673.html
Copyright © 2011-2022 走看看