zoukankan      html  css  js  c++  java
  • ZooKeeper学习(二)ZooKeeper实现分布式锁

    一、简介

      在日常开发过程中,大型的项目一般都会采用分布式架构,那么在分布式架构中若需要同时对一个变量进行操作时,可以采用分布式锁来解决变量访问冲突的问题,最典型的案例就是防止库存超卖,当然还有其他很多的控制方式,这篇文章我们讨论一下怎么使用ZooKeeper来实现分布式锁。

    二、Curator

      前面提到的分布式锁,在ZooKeeper中可以通过Curator来实现。

      定义:Curator是Netflix公司开源的一套zookeeper客户端框架,解决了很多Zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等等。

    三、尝试写一个分布式锁

    开发思路

      在第一篇文章中,了解了ZooKeeper节点的概念,实现分布式锁的基本思路也是基于对节点的监听与操作从而实现的。

    • 1、创建一个父节点,并对父节点设置监听事件,实际加锁的对象为父节点下的子节点。
    • 2、若父节点下存在临时子节点,则获取锁失败,不存在子节点时,则各个线程可尝试争夺锁。
    • 3、业务逻辑执行完毕后会删除临时子节点,此时下一个进程进入时发现没有存在子节点,则创建子节点并获取锁

    四、写一个工具类

    pom.xml

    <!--Zookeeper实现分布式锁的工具curator start -->
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.10</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.8.0</version>
    </dependency>
    <!--Zookeeper实现分布式锁的工具curator end -->

    application.yml

    #zookeeper分布式锁curator服务配置
    curator:
      retryCount: 5 #重试次数
      elapsedTimeMs: 5000  #重试间隔时间
      connectString: 127.0.0.1:2181   # zookeeper 地址
      sessionTimeoutMs: 60000  # session超时时间
      connectionTimeoutMs: 5000  # 连接超时时间

    配置类

    /**
     * ZK的属性
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "curator")//获取application.yml配置的值
    public class ZkProperties {
    
        private int retryCount;//重试次数
    
        private int elapsedTimeMs;//重试间隔时间
    
        private String connectString;//zookeeper 地址
    
        private int sessionTimeoutMs;//session超时时间
    
        private int connectionTimeoutMs;//连接超时时间
    }
    /**
     * ZK的属性配置
     */
    @Configuration//标识这是一个配置类
    public class ZkConfiguration {
    
        @Autowired
        ZkProperties zkProperties;
    
        @Bean(initMethod = "start")
        public CuratorFramework curatorFramework() {
            return CuratorFrameworkFactory.newClient(
                    zkProperties.getConnectString(),
                    zkProperties.getSessionTimeoutMs(),
                    zkProperties.getConnectionTimeoutMs(),
                    new RetryNTimes(zkProperties.getRetryCount(), zkProperties.getElapsedTimeMs()));
        }
    }

    分布式锁工具

    /**
     * 分布式锁工具类
     * 【类解析】
     * 1、InitializingBean接口为bean提供了初始化方法的方式。
     * 2、它只包括afterPropertiesSet方法,凡是继承该接口的类,在初始化bean的时候都会执行该方法。
     * 【锁原理】
     * 1、创建一个父节点,并对父节点设置监听事件,实际加锁的对象为父节点下的子节点。
     * 2、若父节点下存在临时子节点,则获取锁失败。不存在子节点时,则各个线程可尝试争夺锁。
     * 3、业务逻辑执行完毕后会删除临时子节点,此时下一个进程进入时发现没有存在子节点,则创建子节点并获取锁
     */
    @Slf4j
    @Service
    public class DistributedLockByZookeeperUtil implements InitializingBean {
    
        private final static String ROOT_PATH_LOCK = "rootlock";//父节点路径
        private CountDownLatch countDownLatch = new CountDownLatch(1);//节点计数器
    
        @Autowired
        private CuratorFramework curatorFramework;
    
        /**
         * 获取分布式锁
         */
        public void acquireDistributedLock(String path) {
            String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            //1、一直循环等待获取锁
            while (true) {
                try {
                    //2、尝试创建子节点,若子节点已经存在,则创建异常,并进入catch块代码
                    curatorFramework
                            .create()//创建节点
                            .creatingParentsIfNeeded()//如果父节点不存在,则在创建节点的同时创建父节点
                            .withMode(CreateMode.EPHEMERAL)//【临时节点】创建后,会话结束节点会自动删除
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//【接入权限】任何链接都可以操作该节点
                            .forPath(keyPath);//对应的操作路径
                    log.info("获取分布式锁成功!路径为:{}", keyPath);
                    break;
                } catch (Exception e) {
                    //3、创建子节点失败时,即获取锁失败
                    log.info("获取分布式锁失败!路径为:{}", keyPath);
                    log.info("等待重新获取锁.......");
                    try {
                        if (countDownLatch.getCount() <= 0) {
                            countDownLatch = new CountDownLatch(1);//重置计数器
                        }
                        //4、从新挂起当前线程,调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
                        countDownLatch.await();
                    } catch (InterruptedException e1) {
                        e1.printStackTrace();
                    }
                }
            }
        }
    
        /**
         * 释放分布式锁
         */
        public boolean releaseDistributedLock(String path) {
            try {
                String keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
                //1、查看当前节点是否已经存在
                if (curatorFramework.checkExists().forPath(keyPath) != null) {
                    //2、若子节点存在,则删除子节点,即释放锁
                    curatorFramework.delete().forPath(keyPath);
                }
            } catch (Exception e) {
                log.error("释放分布式锁错误!");
                return false;
            }
            return true;
        }
    
        /**
         * 创建 watcher 事件
         */
        private void addWatcher(String path) throws Exception {
            String keyPath;
            if (path.equals(ROOT_PATH_LOCK)) {
                keyPath = "/" + path;
            } else {
                keyPath = "/" + ROOT_PATH_LOCK + "/" + path;
            }
            //1、创建子节点监听事件
            final PathChildrenCache cache = new PathChildrenCache(curatorFramework, keyPath, false);
            //2、设置监听器初始化模式:异步初始化。初始化后会触发事件。
            cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
            //3、创建监听事件
            cache.getListenable().addListener((client, event) -> {
                //4、当发生子节点移除事件时,进入if内逻辑
                if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
                    String oldPath = event.getData().getPath();
                    log.info("上一个节点 " + oldPath + " 已经被断开");
                    //5、移除的节点为监听节点的子节点时,即路径包含父节点时,进入if内逻辑
                    if (oldPath.contains(path)) {
                        //6、释放计数器,让当前的请求获取锁
                        countDownLatch.countDown();
                    }
                }
            });
        }
    
        /**
         * 创建父节点,并创建永久节点
         * PS:在所有的属性被初始化后调用此方法,创建父节点
         */
        @Override
        public void afterPropertiesSet() {
            //1、指定命名空间
            curatorFramework = curatorFramework.usingNamespace("lock-namespace");
            //2、下面代码逻辑的父节点路径
            String path = "/" + ROOT_PATH_LOCK;
            try {
                //3、父节点不存在时,创建父节点
                if (curatorFramework.checkExists().forPath(path) == null) {
                    curatorFramework
                            .create()//创建节点
                            .creatingParentsIfNeeded()//如果父节点不存在,则在创建节点的同时创建父节点
                            .withMode(CreateMode.PERSISTENT)//【持久化节点】客户端与zookeeper断开连接后,该节点依旧存在
                            .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)//【接入权限】任何链接都可以操作该节点
                            .forPath(path);//对应的操作路径
                }
                //4、添加对父节点的监听事件
                addWatcher(ROOT_PATH_LOCK);
                log.info("root path 的 watcher 事件创建成功");
            } catch (Exception e) {
                log.error("连接zookeeper失败,请查看日志 >> {}", e.getMessage(), e);
            }
        }
    }

    测试类

    /**
     * 测试类
     */
    @RestController
    @RequestMapping("/test")
    public class TestController {
    
        @Autowired
        private DistributedLockByZookeeperUtil distributedLockByZookeeper;//分布式锁工具类
        @Autowired
        private IUserInfoService iUserInfoService;//业务类
    
        private final static String PATH = "test";//子节点对应路径(PS:在锁工具里面会拼接完整路径)
    
        @GetMapping("/doSomeThings")
        public boolean doSomeThings() {
            /*1、获取锁*/
            Boolean flag;//是否已经释放锁 释放成功:true , 释放失败:false
            distributedLockByZookeeper.acquireDistributedLock(PATH);//获取锁
            /*2、业务代码块*/
            try {
                iUserInfoService.update();
                UserInfoVO vo = iUserInfoService.querySingleVO(1);
                System.out.println("剩余库存为:" + vo.getCreateStaff());
            } catch (Exception e) {
                e.printStackTrace();
                //业务代码报错时及时释放锁
                flag = distributedLockByZookeeper.releaseDistributedLock(PATH);
            }
            /*3、释放锁*/
            flag = distributedLockByZookeeper.releaseDistributedLock(PATH);//执行成功释放锁
            return flag;
        }
        
    }

    五、压测方法

      压测的方法有很多,我使用的是Jmeter来进行并发调用测试类代码,测试结果分布式锁有效,这里不再写压测过程,感兴趣的亲可以看下文末的文章推荐。

    参考文章:

  • 相关阅读:
    周末总结
    大数据开源框架技术汇总
    oracle迁移mysql总结
    梯度下降
    BFC的概念
    元素类型
    window10安装tensorflow
    学习使用git
    设计模式中的关系
    拟合圆
  • 原文地址:https://www.cnblogs.com/riches/p/12329257.html
Copyright © 2011-2022 走看看