zoukankan      html  css  js  c++  java
  • 【分布式锁】05-使用Redisson中Semaphore和CountDownLatch原理

    前言

    前面已经写了Redisson大多的内容,我们再看看Redisson官网共有哪些组件:

    image.pngimage.png

    剩下还有Semaphore和CountDownLatch两块,我们就趁热打铁,赶紧看看Redisson是如何实现的吧。

    我们在JDK中都知道Semaphore和CountDownLatch两兄弟,这里就不多赘述,不了解的可以再回头看看。

    Semaphore使用示例

    先看下Semaphore原理图如下:

    image.pngimage.png

    接着我们看下Redisson中使用的案例:

    RSemaphore semaphore = redisson.getSemaphore("semaphore");
    // 同时最多允许3个线程获取锁
    semaphore.trySetPermits(3);

    for(int i = 0; i < 10; i++) {
      new Thread(new Runnable() {

        @Override
        public void run() {
          try {
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]尝试获取Semaphore锁"); 
            semaphore.acquire();
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]成功获取到了Semaphore锁,开始工作"); 
            Thread.sleep(3000);  
            semaphore.release();
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]释放Semaphore锁"); 
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      }).start();
    }

    Semaphore源码解析

    接着我们根据上面的示例,看看源码是如何实现的:

    第一步:
    semaphore.trySetPermits(3);

    public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
        @Override
        public boolean trySetPermits(int permits) {
            return get(trySetPermitsAsync(permits));
        }

        @Override
        public RFuture<Boolean> trySetPermitsAsync(int permits) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "local value = redis.call('get', KEYS[1]); " +
                    "if (value == false or value == 0) then "
                        + "redis.call('set', KEYS[1], ARGV[1]); "
                        + "redis.call('publish', KEYS[2], ARGV[1]); "
                        + "return 1;"
                    + "end;"
                    + "return 0;",
                    Arrays.<Object>asList(getName(), getChannelName()), permits);
        }

    }

    执行流程为:

    1. get semaphore,获取到一个当前的值
    2. 第一次数据为0, 然后使用set semaphore 3,将这个信号量同时能够允许获取锁的客户端的数量设置为3
    3. 然后发布一些消息,返回1

    接着看看semaphore.acquire();semaphore.release(); 逻辑:

    public class RedissonSemaphore extends RedissonExpirable implements RSemaphore {
        @Override
        public RFuture<Boolean> tryAcquireAsync(int permits) {
            if (permits < 0) {
                throw new IllegalArgumentException("Permits amount can't be negative");
            }
            if (permits == 0) {
                return RedissonPromise.newSucceededFuture(true);
            }

            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                      "local value = redis.call('get', KEYS[1]); " +
                      "if (value ~= false and tonumber(value) >= tonumber(ARGV[1])) then " +
                          "local val = redis.call('decrby', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                      "end; " +
                      "return 0;",
                      Collections.<Object>singletonList(getName()), permits);
        }

        @Override
        public RFuture<Void> releaseAsync(int permits) {
            if (permits < 0) {
                throw new IllegalArgumentException("Permits amount can't be negative");
            }
            if (permits == 0) {
                return RedissonPromise.newSucceededFuture(null);
            }

            return commandExecutor.evalWriteAsync(getName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,
                "local value = redis.call('incrby', KEYS[1], ARGV[1]); " +
                "redis.call('publish', KEYS[2], value); ",
                Arrays.<Object>asList(getName(), getChannelName()), permits);
        }

    }

    先看看加锁的逻辑tryAcquireAsync()

    1. get semaphore,获取到一个当前的值,比如说是3,3 > 1
    2. decrby semaphore 1,将信号量允许获取锁的客户端的数量递减1,变成2
    3. decrby semaphore 1
    4. decrby semaphore 1
    5. 执行3次加锁后,semaphore值为0

    此时如果再来进行加锁则直接返回0,然后进入死循环去获取锁,如下图:

    image.pngimage.png

    接着看看解锁逻辑releaseAsync()

    1. incrby semaphore 1,每次一个客户端释放掉这个锁的话,就会将信号量的值累加1,信号量的值就不是0了

    看到这里大家就明白了了,Redisson实现Semaphore其实是很简单了

    CountDownLatch使用示例

    使用案例:

    RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
    latch.trySetCount(3);
    System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]设置了必须有3个线程执行countDown,进入等待中。。。"); 

    for(int i = 0; i < 3; i++) {
      new Thread(new Runnable() {

        @Override
        public void run() {
          try {
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]在做一些操作,请耐心等待。。。。。。"); 
            Thread.sleep(3000); 
            RCountDownLatch localLatch = redisson.getCountDownLatch("anyCountDownLatch");
            localLatch.countDown();
            System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]执行countDown操作"); 
          } catch (Exception e) {
            e.printStackTrace(); 
          }
        }

      }).start();
    }

    latch.await();
    System.out.println(new Date() + ":线程[" + Thread.currentThread().getName() + "]收到通知,有3个线程都执行了countDown操作,可以继续往下走"); 

    CountDownLatch 源码解析

    源码如下:

     public class RedissonCountDownLatch extends RedissonObject implements RCountDownLatch {

        @Override
        public RFuture<Boolean> trySetCountAsync(long count) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if redis.call('exists', KEYS[1]) == 0 then "
                        + "redis.call('set', KEYS[1], ARGV[2]); "
                        + "redis.call('publish', KEYS[2], ARGV[1]); "
                        + "return 1 "
                    + "else "
                        + "return 0 "
                    + "end",
                    Arrays.<Object>asList(getName(), getChannelName()), newCountMessage, count);
        }

        @Override
        public RFuture<Void> countDownAsync() {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                            "local v = redis.call('decr', KEYS[1]);" +
                            "if v <= 0 then redis.call('del', KEYS[1]) end;" +
                            "if v == 0 then redis.call('publish', KEYS[2], ARGV[1]) end;",
                        Arrays.<Object>asList(getName(), getChannelName()), zeroCountMessage);
        }



    }

    先分析trySetCount()方法逻辑:

    1. exists anyCountDownLatch,第一次肯定是不存在的
    2. set redisson_countdownlatch__channel__anyCountDownLatch 3
    3. 返回1

    接着分析latch.await();方法,如下图:

    image.pngimage.png

    这个方法其实就是陷入一个while true死循环,不断的get anyCountDownLatch的值,如果这个值还是大于0那么就继续死循环,否则的话呢,就退出这个死循环

    最后分析localLatch.countDown();方法:

    1. decr anyCountDownLatch,就是每次一个客户端执行countDown操作,其实就是将这个cocuntDownLatch的值递减1
    2. await()方面已经分析过,死循环去判断anyCountDownLatch对应存储的值是否为0,如果为0则接着执行自己的逻辑

    总结

    看到了这里 这两个组件是不是很简单?

    到了这里,Redisson部分的学习都已经结束了,后面还会学习ZK实现分布式锁的原理。

    申明

    本文章首发自本人博客:https://www.cnblogs.com/wang-meng 和公众号:壹枝花算不算浪漫,如若转载请标明来源!

    感兴趣的小伙伴可关注个人公众号:壹枝花算不算浪漫

  • 相关阅读:
    计划任务和压缩归档
    libevent 源码学习三 —— 基本使用场景和事件流程
    libevent 源码学习二 —— reactor 模式
    libevent 库源码学习
    手动配置固定IP参数vim vim
    软件相关
    写xhttpd服务器时 遇到segmentation fault
    c与c++中输出字符指针和字符串指针的问题
    char * argv[] ,string简析
    传入参数与传出参数
  • 原文地址:https://www.cnblogs.com/wang-meng/p/12548492.html
Copyright © 2011-2022 走看看