  Redis Distributed Locks in One Article (A Complete Distributed Lock Solution with Redisson)



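    As technology advances and data volumes grow, distributed systems have become the norm, and a single application is often deployed on several machines (multiple nodes). In some scenarios, to avoid processing the same data twice, a given task must run on only one node at a time; in other words, a given method may be executed by only one thread at any moment. In a single-machine deployment the application runs inside one process, so only thread safety within that process matters, and Java's volatile, ReentrantLock, synchronized, and the thread-safe classes in the java.util.concurrent package are enough. In a multi-machine deployment the threads live in different processes on different machines, so mutual exclusion has to hold across processes. This is the problem that distributed locks solve.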



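    Category: database
        Approach: MySQL table with a unique index
            1. Add a unique index to the table
        Approach: MongoDB findAndModify atomic operation
            1. Lock: run the atomic findAndModify command to look up the document, and insert it if it does not exist
    Category: distributed coordination system
        Approach: ZooKeeper
            1. Lock: create an ephemeral sequential node under /lock and check whether its sequence number is the smallest. If it is, the lock is acquired; if not, watch the node under /lock whose sequence number immediately precedes it
    Category: cache
        Approach: Redis commands
            1. Lock: run setnx and, if it succeeds, run expire to add an expiration time
            2. Unlock: run the del command
        Approach: Redis Lua scripting
            1. Lock: run the command SET lock_name random_value EX seconds NX
            2. Unlock: run a Lua script that verifies random_value before releasing the lock

    -- ARGV[1] is random_value, KEYS[1] is lock_name
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end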
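The difference between the two Redis approaches is easier to see in a small model. The sketch below is a hypothetical in-memory stand-in for a Redis key space (the class `SimpleLockStore` and its methods are invented for illustration and talk to no Redis server). It mimics `SET ... NX` with an expiration (in milliseconds here rather than the seconds of `EX`) and the compare-and-delete unlock script, with the current time passed in explicitly so the behavior is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of a Redis key space; for illustration only. */
class SimpleLockStore {
    private static final class Entry {
        final String value;
        final long expiresAtMillis;

        Entry(String value, long expiresAtMillis) {
            this.value = value;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();

    /** Mimics SET key value NX with a TTL: succeeds only if the key is absent or expired. */
    synchronized boolean setIfAbsent(String key, String value, long ttlMillis, long nowMillis) {
        Entry current = entries.get(key);
        if (current != null && current.expiresAtMillis > nowMillis) {
            return false; // the lock is held and still valid
        }
        entries.put(key, new Entry(value, nowMillis + ttlMillis));
        return true;
    }

    /** Mimics the unlock script: delete the key only if the stored value matches. */
    synchronized boolean releaseIfOwner(String key, String value, long nowMillis) {
        Entry current = entries.get(key);
        if (current == null || current.expiresAtMillis <= nowMillis) {
            entries.remove(key); // nothing valid to release
            return false;
        }
        if (!current.value.equals(value)) {
            return false; // someone else's lock: leave it alone
        }
        entries.remove(key);
        return true;
    }
}
```

With this model, a client whose lock has already expired cannot delete the lock that another client acquired afterwards, which is exactly the case a plain setnx plus del cannot protect against.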






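    A reliable distributed lock should have the following properties:

    1. Mutual exclusion. At any moment, only one client can hold the lock.
    2. No deadlock. Even if a client crashes while holding the lock and never releases it, other clients can still acquire the lock afterwards.
    3. Only the owner may unlock. The lock must be released by the client that acquired it; a client must never release a lock held by someone else.
    4. Fault tolerance. As long as a majority of the Redis nodes are running, clients can acquire and release locks.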


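    Using the Redisson distributed reentrant lock

    Redisson supports single-node, master-slave, sentinel, and cluster modes. The example here uses single-node mode: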

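    // 1. Build the Config that Redisson needs for the distributed lock
    Config config = new Config();
    config.useSingleServer().setAddress("redis://").setPassword("123456").setDatabase(0);
    // 2. Build the RedissonClient
    RedissonClient redissonClient = Redisson.create(config);
    // 3. Get the lock instance (threads are not guaranteed to acquire it in request order)
    RLock rLock = redissonClient.getLock(lockKey);
    boolean res = false;
    try {
        /*
         * 4. Try to acquire the lock
         * waitTimeout: maximum time to wait for the lock; once it is exceeded the attempt fails
         * leaseTime: how long the lock is held before it expires automatically (set it longer than the business processing time so that the work finishes while the lock is still valid)
         */
        res = rLock.tryLock((long) waitTimeout, (long) leaseTime, TimeUnit.SECONDS);
        if (res) {
            // Lock acquired: do the business work here
        }
    } catch (Exception e) {
        throw new RuntimeException("acquire lock fail", e);
    } finally {
        // Always unlock, but only if the lock was actually acquired;
        // unlocking a lock this thread does not hold throws IllegalMonitorStateException
        if (res) {
            rLock.unlock();
        }
    }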
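The acquire, work, release pattern above can be wrapped in a small helper so that `unlock()` is only ever called on the path where the lock was obtained. The sketch below is an assumption-laden illustration: `LockTemplate` and `runWithLock` are hypothetical names, and a JDK `ReentrantLock` stands in for the Redisson lock (`RLock` extends `java.util.concurrent.locks.Lock`, so the same shape applies, minus the lease time).

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not part of Redisson. */
class LockTemplate {
    /**
     * Runs the action under the lock if it can be acquired within the wait time.
     * Returns Optional.empty() when the lock could not be acquired or the wait was interrupted.
     */
    static <T> Optional<T> runWithLock(Lock lock, long waitTime, TimeUnit unit, Supplier<T> action) {
        boolean locked;
        try {
            locked = lock.tryLock(waitTime, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return Optional.empty();
        }
        if (!locked) {
            return Optional.empty(); // never unlock a lock we do not hold
        }
        try {
            return Optional.ofNullable(action.get());
        } finally {
            lock.unlock(); // reached only when tryLock succeeded
        }
    }

    public static void main(String[] args) {
        Lock lock = new ReentrantLock(); // stand-in for an RLock
        System.out.println(runWithLock(lock, 100, TimeUnit.MILLISECONDS, () -> "done"));
    }
}
```

Returning an `Optional` keeps the "lock not acquired" case explicit for the caller instead of hiding it behind an exception.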






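    • Script parameters: KEYS[1] is the lock name; ARGV[1] is the lock lease time in milliseconds; ARGV[2] is the unique identifier of the lock holder (client id plus thread id)
    • Script content
    -- Lock does not exist: create it, set the reentrancy count to 1, and set the expiration time
    if (redis.call('exists', KEYS[1]) == 0) then
        redis.call('hset', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return nil;
    end;
    -- Lock exists and the identifier matches: this is a reentrant request, so increment the reentrancy count and reset the expiration time
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return nil;
    end;
    -- Lock exists but the identifier does not match: it is held by another thread, so return the lock's remaining time to live
    return redis.call('pttl', KEYS[1]);
    • Script walkthrough: a nil result means the lock was acquired (first acquisition or reentry); any other result is the remaining time to live, in milliseconds, of a lock held by someone else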
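The three branches of the lock script can be traced in plain Java. The sketch below is a hypothetical in-memory model (the class `ReentrantLockModel` is invented for illustration and is not Redisson code): a map plays the role of the Redis hash, a field plays the role of the key's expiration, and the current time is passed in so the result is deterministic.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of the lock script; for illustration only. */
class ReentrantLockModel {
    private final Map<String, Integer> holders = new HashMap<>(); // holder id -> reentrancy count
    private long expiresAtMillis;

    /** Returns null when the lock is acquired, otherwise the remaining TTL in milliseconds. */
    synchronized Long tryLock(String holderId, long leaseMillis, long nowMillis) {
        if (!holders.isEmpty() && expiresAtMillis <= nowMillis) {
            holders.clear(); // Redis would already have expired the key
        }
        if (holders.isEmpty()) {                       // exists == 0
            holders.put(holderId, 1);                  // hset
            expiresAtMillis = nowMillis + leaseMillis; // pexpire
            return null;
        }
        if (holders.containsKey(holderId)) {           // hexists == 1
            holders.merge(holderId, 1, Integer::sum);  // hincrby +1
            expiresAtMillis = nowMillis + leaseMillis; // pexpire
            return null;
        }
        return expiresAtMillis - nowMillis;            // pttl
    }

    synchronized int holdCount(String holderId) {
        return holders.getOrDefault(holderId, 0);
    }
}
```

Storing the count per holder is what makes the lock reentrant: the same holder can acquire it again without blocking, while any other holder only learns how long it may have to wait.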



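    • Script parameters: KEYS[1] is the lock name; KEYS[2] is the name of the channel on which unlock messages are published; ARGV[1] is the unlock message; ARGV[2] is the lock lease time in milliseconds; ARGV[3] is the unique identifier of the lock holder
    • Script content
    -- Lock does not exist: publish the unlock message and return 1
    if (redis.call('exists', KEYS[1]) == 0) then
        redis.call('publish', KEYS[2], ARGV[1]);
        return 1;
    end;
    -- Lock exists but the identifier does not match: it is held by another thread, and the current thread may not release a lock held by someone else
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
        return nil;
    end;
    -- Lock exists and the identifier matches: first decrement the reentrancy count
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
    if (counter > 0) then
        -- Count is still above 0: the current thread still holds reentrant acquisitions, so the lock cannot be deleted, but its expiration time is refreshed
        redis.call('pexpire', KEYS[1], ARGV[2]);
        return 0;
    else
        -- Count has reached 0: the lock is fully released. Delete it and publish the unlock message to wake the threads that are blocked waiting for it
        redis.call('del', KEYS[1]);
        redis.call('publish', KEYS[2], ARGV[1]);
        return 1;
    end;
    return nil;
    • Script walkthrough: nil means the caller does not hold the lock; 0 means the reentrancy count was decremented but the lock is still held; 1 means the lock was released (or no longer existed) and the unlock message was published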
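The three possible results of the unlock script map naturally onto a nullable `Boolean`. The sketch below is a hypothetical in-memory model (`UnlockModel` is an invented name, not Redisson code). It leaves out the expiration refresh, since the model has no TTL, and records published messages in a list instead of a Redis channel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of the unlock script; for illustration only. */
class UnlockModel {
    final Map<String, Integer> holders = new HashMap<>(); // holder id -> reentrancy count
    final List<String> published = new ArrayList<>();     // messages "published" on the channel

    /** Returns null (caller is not the owner), FALSE (still held after decrement) or TRUE (released). */
    synchronized Boolean unlock(String holderId) {
        if (holders.isEmpty()) {                 // exists == 0
            published.add("0");                  // publish the unlock message
            return Boolean.TRUE;
        }
        if (!holders.containsKey(holderId)) {    // hexists == 0
            return null;
        }
        int counter = holders.merge(holderId, -1, Integer::sum); // hincrby -1
        if (counter > 0) {
            return Boolean.FALSE;                // reentrant acquisitions remain
        }
        holders.clear();                         // del
        published.add("0");                      // publish the unlock message
        return Boolean.TRUE;
    }
}
```

A lock acquired twice by the same holder therefore needs two `unlock` calls before waiters are notified.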





    When reading the lock source code, you can treat tryAcquire(leaseTime, unit, threadId) as simply executing the lock Lua script. Go straight to the source of org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit):

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        // Maximum time we are willing to wait for the lock
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        // [Key point 1] Try to acquire the lock; a null return value means the lock was acquired
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        // Remaining wait budget = maximum wait time - time spent on the steps above
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        current = System.currentTimeMillis();
        // [Key point 2] Subscribe to unlock messages, see org.redisson.pubsub.LockPubSub#onMessage
        /**
         * Subscribe to the lock-release event and block in await until the subscription completes, which avoids wasting resources on pointless lock requests:
         * this is semaphore-based. While the lock is held elsewhere, the current thread subscribes to the lock-release event on a Redis channel; as soon as the lock is released, a message tells the waiting threads to compete for it.
         * If this.await returns false, the wait has exceeded the maximum wait time: cancel the subscription and report that the lock was not acquired.
         * If this.await returns true, enter the loop and try to acquire the lock.
         */
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // await blocks on a CountDownLatch internally to obtain the result of the asynchronous subscribe call (it uses Netty's Future)
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
        // Subscription succeeded
        try {
            // Remaining wait budget = maximum wait time - time spent on the steps above
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                // Wait budget exhausted: report that the lock was not acquired
                acquireFailed(threadId);
                return false;
            }
            while (true) {
                long currentTime = System.currentTimeMillis();
                // Try to acquire the lock; if another thread holds it, the remaining TTL is returned (same as above)
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
                // waiting for message
                currentTime = System.currentTimeMillis();
                // [Key point 3] Adjust the blocking wait according to the lock TTL.
                // Note the neat implementation: 1. latch is actually a Semaphore; calling its tryAcquire method blocks the current thread for a while, which avoids requesting the lock over and over inside the while loop.
                // 2. The Semaphore's release method is called by the unlock-message listener org.redisson.pubsub.LockPubSub#onMessage: when another thread releases the lock it publishes an unlock message, the listener receives it and releases the semaphore, which wakes the thread blocked here.
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // Cancel the subscription to unlock messages
            unsubscribe(subscribeFuture, threadId);
        }
    }
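Stripped of the Redis calls, the retry loop in the method above is a wait-budget loop around a semaphore. The sketch below isolates that pattern under stated assumptions: `WaitLoopSketch` is a hypothetical class, the `tryAcquire` supplier stands in for the Lua script call (null means acquired, otherwise the remaining TTL), and the `Semaphore` stands in for the latch that the unlock-message listener releases.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical sketch of the wait loop; not Redisson code. */
class WaitLoopSketch {
    /**
     * Retries tryAcquire until it returns null (acquired) or the wait budget is used up.
     * Between attempts the thread parks on the semaphore for min(ttl, remaining budget),
     * so it wakes early when an unlock notification releases a permit.
     */
    static boolean tryLock(Supplier<Long> tryAcquire, Semaphore latch, long waitMillis) {
        long remaining = waitMillis;
        while (true) {
            long start = System.currentTimeMillis();
            Long ttl = tryAcquire.get();
            if (ttl == null) {
                return true; // lock acquired
            }
            remaining -= System.currentTimeMillis() - start;
            if (remaining <= 0) {
                return false;
            }
            start = System.currentTimeMillis();
            long sleep = (ttl >= 0 && ttl < remaining) ? ttl : remaining;
            try {
                latch.tryAcquire(sleep, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            remaining -= System.currentTimeMillis() - start;
            if (remaining <= 0) {
                return false;
            }
        }
    }
}
```

Waiting on the semaphore rather than sleeping for a fixed interval means a waiter retries as soon as the lock is released, but never waits longer than the lock's TTL or its own remaining budget.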

    The tryAcquire method that does the actual acquisition really is nothing more than executing the Lua script:

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        // tryAcquireAsync runs the Lua script asynchronously; get() waits for the result synchronously
        return get(tryAcquireAsync(leaseTime, unit, threadId));
    }

    // See org.redisson.RedissonLock#tryAcquireAsync
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            // In essence, run the lock Lua script asynchronously
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                // First check whether the asynchronous operation succeeded: if not, return immediately; if it did, read the result
                if (!future.isSuccess()) {
                    return;
                }
                Long ttlRemaining = future.getNow();
                // lock acquired
                // If ttlRemaining is null, start the scheduled renewal task via scheduleExpirationRenewal
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

    // See org.redisson.RedissonLock#tryLockInnerAsync
    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }




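    The RedissonLock field internalLockLeaseTime defaults to 30000 milliseconds, and the getLockWatchdogTimeout() value that is read from the connection manager's configuration and passed to tryLockInnerAsync() also defaults to 30000 milliseconds. Both belong to what the official Redisson documentation calls the watchdog mechanism, which is an apt name. So what does the watchdog actually do, and why? Let's look at the core code.

    When the lock script acquires the lock it returns nil, so on this path (no leaseTime was given) the scheduled task is always started. Here is the scheduling method scheduleExpirationRenewal: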

    private void scheduleExpirationRenewal(final long threadId) {
        // A renewal task is already scheduled for this lock
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                // If this thread still holds the lock, reset its expiration to internalLockLeaseTime
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        // Run after one third of the lease time (every 10 seconds with the default 30000 ms)
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }
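The renewal idea can be tried out without Redis. The following sketch is a simplified, hypothetical model (`LeaseWatchdog` is an invented class): it keeps a lease deadline in memory and pushes it forward every leaseTime / 3 with a JDK `ScheduledExecutorService`. Unlike the Redisson code above, which reschedules a one-shot timer after each successful renewal, this version uses a fixed-rate task for brevity.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory watchdog; for illustration only. */
class LeaseWatchdog {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "lease-watchdog");
                t.setDaemon(true); // do not keep the JVM alive just for renewals
                return t;
            });
    private final AtomicLong expiresAtMillis = new AtomicLong();
    private final long leaseMillis;
    private ScheduledFuture<?> task;

    LeaseWatchdog(long leaseMillis) {
        this.leaseMillis = leaseMillis;
        renew(); // the moment the lock is acquired
    }

    private void renew() {
        expiresAtMillis.set(System.currentTimeMillis() + leaseMillis);
    }

    /** Starts renewing the lease every leaseMillis / 3. */
    synchronized void start() {
        if (task == null) {
            long period = leaseMillis / 3;
            task = scheduler.scheduleAtFixedRate(this::renew, period, period, TimeUnit.MILLISECONDS);
        }
    }

    /** Stops renewing, as happens on unlock; the lease then simply runs out. */
    synchronized void stop() {
        if (task != null) {
            task.cancel(false);
            task = null;
        }
        scheduler.shutdownNow();
    }

    long remainingMillis() {
        return expiresAtMillis.get() - System.currentTimeMillis();
    }
}
```

The point of the design is that a crashed holder stops renewing, so the lock still expires on its own, while a live holder with slow business logic never loses the lock halfway through.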







    @Override
    public void unlock() {
        // Run the unlock Lua script; the thread id is passed in so that the lock is released by the same thread that acquired it, which prevents releasing a lock held by another thread
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }

    // See org.redisson.RedissonLock#unlockInnerAsync
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    }








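    In a Redis Cluster deployment, if a Lua script operates on several keys, those keys must all be in the same slot for the script to execute atomically; otherwise it fails with the error "Lua script attempted to access a non local key in a cluster node". This is why the channel name wraps the lock name in braces, as in redisson_lock__channel:{my_first_lock_name}: only the text inside {} is hashed, so the lock key and its channel land in the same slot.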

    // See org.redisson.cluster.ClusterConnectionManager#calcSlot
    @Override
    public int calcSlot(String key) {
        if (key == null) {
            return 0;
        }
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}');
            key = key.substring(start+1, end);
        }
        int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
        log.debug("slot {} for {}", result, key);
        return result;
    }
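To check that a lock key and its channel really share a slot, the calculation can be reproduced with the JDK alone. The sketch below is a hypothetical standalone version (`HashSlot` is an invented class): it implements CRC16 in the XMODEM variant that the Redis Cluster specification uses, and applies the specification's hash tag rule, where the tag counts only if a `}` follows the first `{` with at least one character in between. That rule is slightly stricter than the `calcSlot` listing above.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical standalone slot calculator; for illustration only. */
class HashSlot {
    static final int MAX_SLOT = 16384;

    /** CRC16, XMODEM variant: polynomial 0x1021, initial value 0, no reflection. */
    static int crc16(byte[] bytes) {
        int crc = 0;
        for (byte b : bytes) {
            crc ^= (b & 0xFF) << 8;
            for (int i = 0; i < 8; i++) {
                crc = ((crc & 0x8000) != 0) ? ((crc << 1) ^ 0x1021) : (crc << 1);
                crc &= 0xFFFF;
            }
        }
        return crc;
    }

    /** Hashes only the text between the first '{' and the next '}' when that text is non-empty. */
    static int slot(String key) {
        int start = key.indexOf('{');
        if (start != -1) {
            int end = key.indexOf('}', start + 1);
            if (end > start + 1) {
                key = key.substring(start + 1, end);
            }
        }
        return crc16(key.getBytes(StandardCharsets.UTF_8)) % MAX_SLOT;
    }

    public static void main(String[] args) {
        System.out.println(slot("my_first_lock_name")
                == slot("redisson_lock__channel:{my_first_lock_name}"));
    }
}
```

Because the lock name itself contains no braces, hashing the whole key and hashing the tag inside the channel name operate on the same bytes, so both names map to one slot.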


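    To see which Lua scripts actually run, open a Redis client terminal and execute the monitor command, which prints every command the Redis server receives in real time; then run the Redisson lock/unlock test cases in debug mode.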


    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class RedissonDistributedLockerTest {
        private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLockerTest.class);
        @Resource
        private DistributedLocker distributedLocker;
        private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();
        private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();

        @Test
        public void tryLockUnlockCost() throws Exception {
            StopWatch stopWatch = new StopWatch("lock/unlock elapsed time");
            stopWatch.start();
            for (int i = 0; i < 10000; i++) {
                String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
                Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
                Assert.assertTrue(optLocked.isPresent());
                optLocked.get().unlock();
            }
            stopWatch.stop();
            log.info(stopWatch.prettyPrint());
        }

        @Test
        public void tryLock() throws Exception {
            String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
            Assert.assertTrue(optLocked.isPresent());
            Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);
            Assert.assertTrue(optLocked2.isPresent());
            optLocked.get().unlock();
        }

        /**
         * Two threads compete for the lock: A acquires it first, and B acquires it after A releases it
         */
        @Test
        public void tryLock2() throws Exception {
            String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
            CountDownLatch countDownLatch = new CountDownLatch(1);
            Future<Optional<LockResource>> submit = executorServiceB.submit(() -> {
                        countDownLatch.await();
                        log.info("B tries to acquire the lock: thread={}", currentThreadId());
                        return distributedLocker.tryLock(key, 600000, 600000);
                    }
            );
            log.info("A tries to acquire the lock: thread={}", currentThreadId());
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);
            Assert.assertTrue(optLocked.isPresent());
            log.info("A acquired the lock: thread={}", currentThreadId());
            countDownLatch.countDown();
            optLocked.get().unlock();
            log.info("A released the lock: thread={}", currentThreadId());
            Optional<LockResource> lockResource2 = submit.get();
            Assert.assertTrue(lockResource2.isPresent());
            executorServiceB.submit(() -> {
                log.info("B acquired the lock: thread={}", currentThreadId());
                lockResource2.get().unlock();
                log.info("B released the lock: thread={}", currentThreadId());
            });
        }

        /**
         * Three threads compete for the lock: A acquires it first, and after A releases it B and C compete for it
         */
        @Test
        public void tryLock3() throws Exception {
            String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
            log.info("A tries to acquire the lock: thread={}", currentThreadId());
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
            if (optLocked.isPresent()) {
                log.info("A acquired the lock: thread={}", currentThreadId());
            }
            Assert.assertTrue(optLocked.isPresent());
            CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
            Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {
                        cyclicBarrier.await();
                        log.info("B tries to acquire the lock: thread={}", currentThreadId());
                        return distributedLocker.tryLock(key, 600000, 600000);
                    }
            );
            Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {
                        cyclicBarrier.await();
                        log.info("C tries to acquire the lock: thread={}", currentThreadId());
                        return distributedLocker.tryLock(key, 600000, 600000);
                    }
            );
            optLocked.get().unlock();
            log.info("A released the lock: thread={}", currentThreadId());
            CountDownLatch countDownLatch = new CountDownLatch(2);
            executorServiceB.submit(() -> {
                log.info("B acquired the lock: thread={}", currentThreadId());
                try {
                    submitB.get().get().unlock();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                log.info("B released the lock: thread={}", currentThreadId());
                countDownLatch.countDown();
            });
            executorServiceC.submit(() -> {
                log.info("C acquired the lock: thread={}", currentThreadId());
                try {
                    submitC.get().get().unlock();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
                log.info("C released the lock: thread={}", currentThreadId());
                countDownLatch.countDown();
            });
            countDownLatch.await();
        }

        private static Long currentThreadId() {
            return Thread.currentThread().getId();
        }

        @Test
        public void tryLockWaitTimeout() throws Exception {
            String key = "mock-key:" + UUID.randomUUID().toString();
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);
            Assert.assertTrue(optLocked.isPresent());
            Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {
                long now = System.currentTimeMillis();
                Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);
                long cost = System.currentTimeMillis() - now;
                log.info("cost={}", cost);
                return optLockedAgain;
            }).exceptionally(th -> {
                log.error("Exception: ", th);
                return Optional.empty();
            }).join();
            Assert.assertTrue(!optLockResource.isPresent());
        }

        @Test
        public void tryLockWithLeaseTime() throws Exception {
            String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);
            Assert.assertTrue(optLocked.isPresent());
            // Reentrant acquisition
            Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);
            Assert.assertTrue(optLockedAgain.isPresent());
        }

        /**
         * 1000 concurrent requests compete for a single lock
         */
        @Test
        public void tryLockWithLeaseTimeOnMultiThread() throws Exception {
            int totalThread = 1000;
            String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
            AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
            AtomicInteger acquiredLockTimes = new AtomicInteger(0);
            ExecutorService executor = Executors.newFixedThreadPool(totalThread);
            for (int i = 0; i < totalThread; i++) {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        tryAcquireLockTimes.getAndIncrement();
                        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);
                        if (optLocked.isPresent()) {
                            acquiredLockTimes.getAndIncrement();
                        }
                    }
                });
            }
            // Stop accepting tasks so that awaitTermination can return once all of them finish
            executor.shutdown();
            executor.awaitTermination(15, TimeUnit.SECONDS);
            Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
            Assert.assertTrue(acquiredLockTimes.get() == 1);
        }

        @Test
        public void tryLockWithLeaseTimeOnMultiThread2() throws Exception {
            int totalThread = 100;
            String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
            AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
            AtomicInteger acquiredLockTimes = new AtomicInteger(0);
            ExecutorService executor = Executors.newFixedThreadPool(totalThread);
            for (int i = 0; i < totalThread; i++) {
                executor.submit(new Runnable() {
                    @Override
                    public void run() {
                        long now = System.currentTimeMillis();
                        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);
                        long cost = System.currentTimeMillis() - now;
                        log.info("tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);
                        if (optLocked.isPresent()) {
                            acquiredLockTimes.getAndIncrement();
                            // Release the lock explicitly
                            optLocked.get().unlock();
                        }
                    }
                });
            }
            // Stop accepting tasks so that awaitTermination can return once all of them finish
            executor.shutdown();
            executor.awaitTermination(20, TimeUnit.SECONDS);
            log.info("tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());
            Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
            Assert.assertTrue(acquiredLockTimes.get() == totalThread);
        }
    }

    public interface DistributedLocker {
        Optional<LockResource> tryLock(String lockKey, int waitTime);
        Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);
    }

    public interface LockResource {
        void unlock();
    }


    Lock call: redissonClient.getLock("my_first_lock_name").tryLock(600000, 600000, TimeUnit.MILLISECONDS);

    # Thread A
    ## 1.1.1 Try to acquire the lock -> success
    1568357723.205362 [0] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
    1568357723.205452 [0 lua] "exists" "my_first_lock_name"
    1568357723.208858 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "1"
    1568357723.208874 [0 lua] "pexpire" "my_first_lock_name" "600000"
    # Thread B
    ### 2.1.1 Try to acquire the lock -> not acquired, the lock's remaining time to live is returned
    1568357773.338018 [0] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
    1568357773.338161 [0 lua] "exists" "my_first_lock_name"
    1568357773.338177 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
    1568357773.338197 [0 lua] "pttl" "my_first_lock_name"
    ## Subscribe (not a Lua script) -> subscription succeeds
    1568357799.403341 [0] "SUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
    ## Try to acquire the lock again -> not acquired, the lock's remaining time to live is returned
    1568357830.683631 [0] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
    1568357830.684371 [0 lua] "exists" "my_first_lock_name"
    1568357830.684428 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
    1568357830.684485 [0 lua] "pttl" "my_first_lock_name"
    # Thread A
    ## 3.1.1 Release the lock and publish the unlock message; 0 is the unlock message
    1568357922.122454 [0] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
    1568357922.123645 [0 lua] "exists" "my_first_lock_name"
    1568357922.123701 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
    1568357922.123741 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "-1"
    1568357922.123775 [0 lua] "del" "my_first_lock_name"
    1568357922.123799 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"
    # Thread B
    ## Unlock message received -> semaphore released, thread unblocked; try to acquire the lock again -> success
    1568357975.015206 [0] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
    1568357975.015579 [0 lua] "exists" "my_first_lock_name"
    1568357975.015633 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "1"
    1568357975.015721 [0 lua] "pexpire" "my_first_lock_name" "600000"
    ## Unsubscribe (not a Lua script)
    1568358031.185226 [0] "UNSUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
    # Thread B
    ## 5.1.1 Release the lock and publish the unlock message
    1568358255.551896 [0] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
    1568358255.552125 [0 lua] "exists" "my_first_lock_name"
    1568358255.552156 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
    1568358255.552200 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "-1"
    1568358255.552258 [0 lua] "del" "my_first_lock_name"
    1568358255.552304 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"

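    Note in particular that RedissonLock does not solve the risk of losing the lock when a Redis node goes down. Some scenarios cannot tolerate this, so Redisson also provides RedissonRedLock, an implementation of the Redlock algorithm. RedissonRedLock does solve the single-point-of-failure problem, at the cost of setting up additional Redis environments for it.

    So if the business scenario can tolerate this low-probability error, RedissonLock is recommended; if it cannot, RedissonRedLock is recommended.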


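    The Redis website describes the Redlock algorithm roughly as follows:

    The Redlock algorithm

    In the distributed version of the algorithm we assume there are N Redis master nodes. These nodes are completely independent, with no replication or any other implicit coordination mechanism. We have already described how to acquire and release a lock safely on a single Redis instance, and the same method is used to acquire and release the lock on each of the N instances. In this example N is 5, a reasonable value, so the instances need to run on 5 physical or virtual machines to ensure that they do not all go down at the same time. To acquire the lock, a client performs the following steps:

    1. Get the current Unix time in milliseconds.

    2. Try to acquire the lock on the 5 instances in turn, using the same key and a unique value (for example a UUID). When requesting the lock from a Redis instance, the client sets a maximum wait time for that instance (once it is exceeded, the client moves on to the next instance immediately); this timeout should be smaller than the lock's expiration time. For example, if the lock expires automatically after 10 seconds, the timeout should be between 5 and 50 milliseconds. This keeps the client from waiting indefinitely for a response from a Redis instance that is already down. If an instance does not respond within the timeout, the client should try the next Redis instance as soon as possible.

    3. The client subtracts the start time (recorded in step 1) from the current time to get the time spent acquiring the lock. The lock counts as acquired if and only if the client obtained it on a majority of the Redis nodes (N/2+1, here 3 nodes) and the total time spent is less than the lock's expiration time.

    4. If the lock was acquired, the key's real validity time = validity time (the automatic expiration set when the lock was acquired) - total time spent acquiring the lock (the result computed in step 3).

    5. If the lock could not be acquired for some reason (the client did not obtain it on at least N/2+1 Redis instances, or the total time spent exceeded the validity time), the client should unlock all the Redis instances, even those on which locking did not succeed. This covers the case where a node granted the lock but its response never reached the client, which would otherwise leave the lock unobtainable for a while.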
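The five steps can be sketched against in-memory nodes. The code below is a hypothetical, heavily simplified model (`RedlockSketch` and `Node` are invented names): each node is a plain map standing in for an independent Redis master, per-node timeouts and key expiration are left out, and a `down` flag simulates an unreachable instance.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the Redlock steps; not a production implementation. */
class RedlockSketch {
    /** One independent in-memory "Redis master"; set down = true to simulate a failure. */
    static class Node {
        private final Map<String, String> keys = new HashMap<>();
        boolean down;

        boolean setIfAbsent(String key, String value) {
            return !down && keys.putIfAbsent(key, value) == null;
        }

        void releaseIfOwner(String key, String value) {
            if (!down) {
                keys.remove(key, value); // removes only when the stored value matches
            }
        }
    }

    /** Returns the remaining validity time in milliseconds, or -1 if the lock was not acquired. */
    static long tryLock(List<Node> nodes, String key, String value, long ttlMillis) {
        long start = System.currentTimeMillis();               // step 1
        int acquired = 0;
        for (Node node : nodes) {                              // step 2
            if (node.setIfAbsent(key, value)) {
                acquired++;
            }
        }
        long elapsed = System.currentTimeMillis() - start;     // step 3
        int quorum = nodes.size() / 2 + 1;
        long validity = ttlMillis - elapsed;                   // step 4
        if (acquired >= quorum && validity > 0) {
            return validity;
        }
        for (Node node : nodes) {                              // step 5: unlock everywhere
            node.releaseIfOwner(key, value);
        }
        return -1;
    }
}
```

With 5 nodes, the client still succeeds when 2 of them are down, and a failed attempt leaves no stray keys behind on the nodes it did manage to lock.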

    Implementing a distributed lock with Redisson (RedLock via RedissonRedLock) and source code analysis (implementation 3)


    Config config1 = new Config();
    config1.useSingleServer().setAddress("redis://").setPassword("a123456").setDatabase(0);
    RedissonClient redissonClient1 = Redisson.create(config1);
    Config config2 = new Config();
    config2.useSingleServer().setAddress("redis://").setPassword("a123456").setDatabase(0);
    RedissonClient redissonClient2 = Redisson.create(config2);
    Config config3 = new Config();
    config3.useSingleServer().setAddress("redis://").setPassword("a123456").setDatabase(0);
    RedissonClient redissonClient3 = Redisson.create(config3);
    /*
     * Get several RLock objects, one per independent Redis environment
     */
    RLock lock1 = redissonClient1.getLock(lockKey);
    RLock lock2 = redissonClient2.getLock(lockKey);
    RLock lock3 = redissonClient3.getLock(lockKey);
    /*
     * Build a RedissonRedLock from the RLock objects (this is the key difference)
     */
    RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
    boolean res = false;
    try {
        /*
         * Try to acquire the lock
         * waitTimeout: maximum time to wait for the lock; once it is exceeded the attempt fails
         * leaseTime: how long the lock is held before it expires automatically (set it longer than the business processing time so that the work finishes while the lock is still valid)
         */
        res = redLock.tryLock((long) waitTimeout, (long) leaseTime, TimeUnit.SECONDS);
        if (res) {
            // Lock acquired: do the business work here
        }
    } catch (Exception e) {
        throw new RuntimeException("acquire lock fail", e);
    } finally {
        // Always unlock, but only if the lock was actually acquired
        if (res) {
            redLock.unlock();
        }
    }

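    The key change is that you build several RLock objects and then combine them into one RedissonRedLock, because the Redlock algorithm is built on several mutually independent Redis environments (call each one a Redisson node to tell them apart). A Redisson node can run in single-node mode, master-slave mode, sentinel mode, or cluster mode. This means you can no longer set up just one cluster, one sentinel deployment, or one master-slave pair and be done; RedissonRedLock needs several additional independent Redisson nodes. You could, for example, set up 3 or 5 Redisson nodes, depending on resources and business needs.

    The figure below shows an example of a RedLock distributed lock built from several Redisson nodes. Note in particular that the Redisson nodes are independent of one another, with no replication or any other implicit distributed coordination mechanism between them.

    # Source code analysis of Redisson's Redlock implementation (RedLock)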



    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long newLeaseTime = -1;
        if (leaseTime != -1) {
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        long lockWaitTime = calcLockWaitTime(remainTime);
        /**
         * 1. Limit on the number of nodes that are allowed to fail to lock: N - (N/2 + 1)
         */
        int failedLocksLimit = failedLocksLimit();
        /**
         * 2. Iterate over all nodes and run the Lua lock script on each one via the EVAL command
         */
        List<RLock> acquiredLocks = new ArrayList<>(locks.size());
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            /**
             * 3. Try to lock this node
             */
            try {
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (RedisResponseTimeoutException e) {
                // On this exception the lock may have been set even though the response was lost, so unlock this node to be safe
                unlockInner(Arrays.asList(lock));
                lockAcquired = false;
            } catch (Exception e) {
                // Any other exception means the lock was not acquired on this node
                lockAcquired = false;
            }
            if (lockAcquired) {
                /**
                 * 4. The lock was acquired on this node: add it to the list of acquired locks
                 */
                acquiredLocks.add(lock);
            } else {
                /**
                 * 5. If the number of locks not yet acquired equals the allowed failure limit N - (N/2 + 1),
                 * then N/2 + 1 nodes are already locked, which is all that Redlock requires,
                 * so there is no need to ask the remaining nodes
                 */
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }
                if (failedLocksLimit == 0) {
                    // No more failures allowed: release what was acquired, then give up or start over
                    unlockInner(acquiredLocks);
                    if (waitTime == -1 && leaseTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // reset iterator
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    failedLocksLimit--;
                }
            }
            /**
             * 6. Subtract the time spent so far from the remaining wait time; if the wait time is used up, release the acquired locks and return false
             */
            if (remainTime != -1) {
                remainTime -= System.currentTimeMillis() - time;
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }
        if (leaseTime != -1) {
            // Set the requested lease time on every lock that was acquired
            List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }
        /**
         * 7. Reaching this point means the lock was acquired: return true
         */
        return true;
    }
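The numbers that drive this loop are simple arithmetic on the node count. The sketch below uses a hypothetical helper class (`RedLockMath`) to spell them out. The quorum and failure-limit formulas come from the comments in the listing above, while the per-node wait time, an even split of the remaining time with a floor of 1 ms, is an assumption about what `calcLockWaitTime` does, since its body is not shown here.

```java
/** Hypothetical helper that spells out the Redlock arithmetic; not Redisson code. */
class RedLockMath {
    /** Minimum number of nodes that must grant the lock: N/2 + 1. */
    static int minLocksAmount(int nodeCount) {
        return nodeCount / 2 + 1;
    }

    /** Number of nodes that are allowed to fail: N - (N/2 + 1). */
    static int failedLocksLimit(int nodeCount) {
        return nodeCount - minLocksAmount(nodeCount);
    }

    /** Assumed per-node wait budget: the remaining time split evenly, at least 1 ms. */
    static long lockWaitTime(long remainMillis, int nodeCount) {
        return Math.max(remainMillis / nodeCount, 1);
    }

    public static void main(String[] args) {
        for (int n : new int[] {1, 3, 5}) {
            System.out.println(n + " nodes: quorum " + minLocksAmount(n)
                    + ", allowed failures " + failedLocksLimit(n));
        }
    }
}
```

For 3 nodes one failure is tolerated and for 5 nodes two, which is why 3 or 5 independent nodes are the usual choices.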






