zoukankan      html  css  js  c++  java
  • 【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理

    前言

    前面已经写了Redisson大多的内容,我们再看看Redisson官网共有哪些组件:

    image.pngimage.png

    剩下还有Semaphore和CountDownLatch两块,我们就趁热打铁,赶紧看看Redisson是如何实现的吧。

    我们在JDK中都知道Semaphore和CountDownLatch两兄弟,这里就不多赘述,不了解的可以再回头看看。

    Semaphore使用示例

    先看下Semaphore原理图如下:

    image.pngimage.png

    接着我们看下Redisson中使用的案例:

    RSemaphore semaphore = redisson.getSemaphore("semaphore");
    // 同时最多允许3个线程获取锁
    semaphore.trySetPermits(3);

    for(int i = 0; i < 10; i++) {
      new Thread(new Runnable() {

        @Override
        public void run() {
          try {
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]尝试获取Semaphore锁"); 
            semaphore.acquire();
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]成功获取到了Semaphore锁,开始工作"); 
            Thread.sleep(3000);  
            semaphore.release();
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]释放Semaphore锁"); 
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }).start();
    }

    Semaphore源码解析

    接着我们根据上面的示例,看看源码是如何实现的:

    第一步:
    semaphore.trySetPermits(3);

    public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
        @Override
        public boolean trySetPermits(int permits) {
            return get(trySetPermitsAsync(permits));
        }

        @Override
        public RFuture<Boolean> trySetPermitsAsync(int permits) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "local value = redis.call('get', KEYS[1]); " +
                    "if (value == false or value == 0) then "
                        + "redis.call('set', KEYS[1], ARGV[1]); "
                        + "redis.call('publish', KEYS[2], ARGV[1]); "
                        + "return 1;"
                    + "end;"
                    + "return 0;",
                    Arrays.<Object>asList(getName(), getChannelName()), permits);
        }

    }

    执行流程为:

    1. get semaphore,获取到一个当前的值
    2. 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3
    3. 然后发布一些消息,返回1

    接着看看semaphore.acquire();semaphore.release(); 逻辑:

    public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
        @Override
        public RFuture<Boolean> tryAcquireAsync(int permits) {
            if (permits < 0) {
                throw new IllegalArgumentException("Permits amount can't be negative");
            }
            if (permits == 0) {
                return RedissonPromise.newSucceededFuture(true);
            }

            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                      "local value = redis.call('get', KEYS[1]); " +
                      "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                          "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                      "end; " +
                      "return 0;",
                      Collections.<Object>singletonList(getName()), permits);
        }

        @Override
        public RFuture<Void> releaseAsync(int permits) {
            if (permits < 0) {
                throw new IllegalArgumentException("Permits amount can't be negative");
            }
            if (permits == 0) {
                return RedissonPromise.newSucceededFuture(null);
            }

            return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
                "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                "redis.call('publish', KEYS[2], value); ",
                Arrays.<Object>asList(getName(), getChannelName()), permits);
        }

    }

    先看看加锁的逻辑tryAcquireAsync()

    1. get semaphore,获取到一个当前的值,比如说是3,3 > 1
    2. decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
    3. decrby semaphore 1
    4. decrby semaphore 1
    5. 执行3次加锁后,semaphore值为0

    此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁,如下图:

    image.pngimage.png

    接着看看解锁逻辑releaseAsync()

    1. incrby semaphore 1,每次一个客户端释放掉这个锁的话,就会将信号量的值累加1,信号量的值就不是0了

    看到这里大家就明白了了,Redisson实现Semaphore其实是很简单了

    CountDownLatch使用示例

    使用案例:

    RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
    latch.trySetCount(3);
    System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]设置了必须有3个线程执行countDown,进入等待中。。。"); 

    for(int i = 0; i < 3; i++) {
      new Thread(new Runnable() {

        @Override
        public void run() {
          try {
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]在做一些操作,请耐心等待。。。。。。"); 
            Thread.sleep(3000); 
            RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch");
            localLatch.countDown();
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]执行countDown操作"); 
          } catch (Exception e) {
            e.printStackTrace(); 
          }
        }

      }).start();
    }

    latch.await();
    System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]收到通知,有3个线程都执行了countDown操作,可以继续往下走"); 

    CountDownLatch 源码解析

    源码如下:

     public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {

        @Override
        public RFuture<Boolean> trySetCountAsync(long count) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if redis.call('exists', KEYS[1]) == 0 then "
                        + "redis.call('set', KEYS[1], ARGV[2]); "
                        + "redis.call('publish', KEYS[2], ARGV[1]); "
                        + "return 1 "
                    + "else "
                        + "return 0 "
                    + "end",
                    Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);
        }

        @Override
        public RFuture<Void> countDownAsync() {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "local v = redis.call('decr', KEYS[1]);" +
                            "if v <= 0 then redis.call('del', KEYS[1]) end;" +
                            "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
                        Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
        }



    }

    先分析trySetCount()方法逻辑:

    1. exists anyCountDownLatch,第一次肯定是不存在的
    2. set redisson_countdownlatch__channel__anyCountDownLatch 3
    3. 返回1

    接着分析latch.await();方法,如下图:

    image.pngimage.png

    这个方法其实就是陷入一个while true死循环,不断的get anyCountDownLatch的值,如果这个值还是大于0那么就继续死循环,否则的话呢,就退出这个死循环

    最后分析localLatch.countDown();方法:

    1. decr anyCountDownLatch,就是每次一个客户端执行countDown操作,其实就是将这个cocuntDownLatch的值递减1
    2. await()方面已经分析过,死循环去判断anyCountDownLatch对应存储的值是否为0,如果为0则接着执行自己的逻辑

    总结

    看到了这里 这两个组件是不是很简单?

    到了这里,Redisson部分的学习都已经结束了,后面还会学习ZK实现分布式锁的原理。

    申明

    本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

    感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

  • 相关阅读:
    sublime text 4 vim 插件配置
    ssh-keygen 的使用
    distribution transaction solution
    bilibili 大数据 视频下载 you-get
    Deepin 20.2.1 安装 MS SQL 2019 容器版本
    【转】使用Linux下Docker部署MSSQL并加载主机目录下的数据库
    【转】You Can Now Use OneDrive in Linux Natively Thanks to Insync
    dotnet 诊断工具安装命令
    Linux 使用 xrandr 设置屏幕分辨率
    【转】CentOS 7.9 2009 ISO 官方原版镜像下载
  • 原文地址:https://www.cnblogs.com/wang-meng/p/12548492.html
Copyright © 2011-2022 走看看