zoukankan      html  css  js  c++  java
  • Redisson分布式锁实现

    转:

    Redisson分布式锁实现

    转:分布式锁和Redisson实现

    概述

    分布式系统有一个著名的理论CAP,指在一个分布式系统中,最多只能同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三项中的两项。所以在设计系统时,往往需要权衡,在CAP中作选择。当然,这个理论也并不一定完美,不同系统对CAP的要求级别不一样,选择需要考虑方方面面。

    在微服务系统中,一个请求存在多级跨服务调用,往往需要牺牲强一致性老保证系统高可用,比如通过分布式事务,异步消息等手段完成。但还是有的场景,需要阻塞所有节点的所有线程,对共享资源的访问。比如并发时“超卖”和“余额减为负数”等情况。

    本地锁可以通过语言本身支持,要实现分布式锁,就必须依赖中间件,数据库、redis、zookeeper等。

    分布式锁特性

    不管使用什么中间件,有几点是实现分布式锁必须要考虑到的。

    1. 互斥:互斥好像是必须的,否则怎么叫锁。
    2. 死锁: 如果一个线程获得锁,然后挂了,并没有释放锁,致使其他节点(线程)永远无法获取锁,这就是死锁。分布式锁必须做到避免死锁。
    3. 性能: 高并发分布式系统中,线程互斥等待会成为性能瓶颈,需要好的中间件和实现来保证性能。
    4. 锁特性:考虑到复杂的场景,分布式锁不能只是加锁,然后一直等待。最好实现如Java Lock的一些功能如:锁判断,超时设置,可重入性等。

    Redis实现之Redisson原理

    redission实现了JDK中的Lock接口,所以使用方式一样,只是Redssion的锁是分布式的。如下:

    RLock lock = redisson.getLock("className"); 
    lock.lock(); 
    try {
        // do sth.
    } finally {
        lock.unlock(); 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    好,Lock主要实现是RedissionLock。

    RedissonLock

    先来看常用的Lock方法实现。

    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    再看lockInterruptibly方法:

    @Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        // 获取锁
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        if (ttl == null) { // 获取成功
            return;
        }
    
        // 异步订阅redis chennel
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future); // 阻塞获取订阅结果
    
        try {
            while (true) {// 循环判断知道获取锁
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }
    
                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);// 取消订阅
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    总结lockInterruptibly:获取锁,不成功则订阅释放锁的消息,获得消息前阻塞。得到释放通知后再去循环获取锁。

    下面重点看看如何获取锁:Long ttl = tryAcquire(leaseTime, unit, threadId)

     private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(leaseTime, unit, threadId));// 通过异步获取锁,但get(future)实现同步
    }
    
    • 1
    • 2
    • 3
    • 1
    • 2
    • 3
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) { //1 如果设置了超时时间,直接调用 tryLockInnerAsync
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        //2 如果leaseTime==-1,则默认超时时间为30s
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
        //3 监听Future,获取Future返回值ttlRemaining(剩余超时时间),获取锁成功,但是ttlRemaining,则刷新过期时间
        ttlRemainingFuture.addListener(new FutureListener<Long>() {
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }
    
                Long ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    已经在注释中解释了,需要注意的是,此处用到了Netty的Future-listen模型,可以看看我的另一篇对Future的简单讲解:给Future一个Promise

    下面就是最重要的redis获取锁的方法tryLockInnerAsync:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(
            getName(), 
            LongCodec.INSTANCE, 
            command,
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              "return redis.call('pttl', KEYS[1]);",
            Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    这个方法主要就是调用redis执行eval lua,为什么使用eval,因为redis对lua脚本执行具有原子性。把这个方法翻译一下:

    -- 1.  没被锁{key不存在}
    eval "return redis.call('exists', KEYS[1])" 1 myLock
    -- (1) 设置Lock为key,uuid:threadId为filed, filed值为1
    eval "return redis.call('hset', KEYS[1], ARGV[2], 1)" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- (2) 设置key过期时间{防止获取锁后线程挂掉导致死锁}
    eval "return redis.call('pexpire', KEYS[1], ARGV[1])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    
    -- 2. 已经被同线程获得锁{key存在并且field存在}
    eval "return redis.call('hexists', KEYS[1], ARGV[2])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- (1) 可重入,但filed字段+1
    eval "return redis.call('hincrby', KEYS[1], ARGV[2],1)" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- (2) 刷新过去时间
    eval "return redis.call('pexpire', KEYS[1], ARGV[1])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    
    -- 3. 已经被其他线程锁住{key存在,但是field不存在}:以毫秒为单位返回 key 的剩余超时时间
    eval "return redis.call('pttl', KEYS[1])" 1 myLock
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    这就是核心获取锁的方式,下面直接释放锁方法unlockInnerAsync

    -- 1. key不存在
    eval "return redis.call('exists', KEYS[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- (1) 发送释放锁的消息,返回1,释放成功
    eval "return redis.call('publish', KEYS[2], ARGV[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    
    -- 2. key存在,但field不存在,说明自己不是锁持有者,无权释放,直接return nil
    eval "return redis.call('hexists', KEYS[1], ARGV[3])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    eval "return nil"
    
    -- 3. filed存在,说明是本线程在锁,但有可能其他地方重入锁,不能直接释放,应该-1
    eval "return redis.call('hincrby', KEYS[1], ARGV[3],-1)" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    
    -- 4. 如果减1后大于0,说明还有其他重入锁,刷新过期时间,返回0。
    eval "return redis.call('pexpire', KEYS[1], ARGV[2])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    
    -- 5. 如果不大于0,说明最后一把锁,需要释放
    -- 删除key
    eval "return redis.call('del', KEYS[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 发释放消息
    eval "return redis.call('publish', KEYS[2], ARGV[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
    -- 返回1,释放成功
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    从释放锁代码中看到,删除key后会发送消息,所以上文提到获取锁失败后,阻塞订阅此消息。

    另外,上文提到刷新过期时间方法scheduleExpirationRenewal,指线程获取锁后需要不断刷新失效时间,避免未执行完锁就失效。这个方法的实现原理也类似,只是使用了Netty的TimerTask,每到过期时间1/3就去重新刷一次,如果key不存在则停止刷新。Timer实现大概如下:

    private static void nettyTimer() {
        final int expireTime = 6;
        EventExecutorGroup group = new DefaultEventExecutorGroup(1);
        final Timer timer = new HashedWheelTimer();
        timer.newTimeout(timerTask -> {
            Future<Boolean> future = group.submit(() -> {
                System.out.println("刷新key的失效时间为"+expireTime +"秒");
                return false;// 但key不存在时,返回true
            });
            future.addListener(future1 -> {
                if (!future.getNow()) {
                    nettyTimer();
                }
            });
        }, expireTime/3, TimeUnit.SECONDS);
    }
  • 相关阅读:
    Openstack Paste.ini 文件详解
    Keystone controller.py & routers.py代码解析
    YARN源码分析(三)-----ResourceManager HA之应用状态存储与恢复
    YARN源码分析(四)-----Journalnode
    YARN源码分析(四)-----Journalnode
    YARN源码分析(四)-----Journalnode
    YARN源码学习(五)-----NN,DN,RM在Ganglia上的监控实现机理
    Confluence 6 配置一个 Confluence 环境
    Confluence 6 审查日志的对象
    Confluence 6 审查日志
  • 原文地址:https://www.cnblogs.com/libin6505/p/10790212.html
Copyright © 2011-2022 走看看