zoukankan      html  css  js  c++  java
  • Redission实现的redis分布式锁set key my_random_value NX PX 30000

    Redis分布式锁,看完不懂你打我

     

    简易的redis分布式锁

    加锁:

    set key my_random_value NX PX 30000

    这个命令比setnx好,因为可以同时设置过期时间。不设置过期时间,应用挂了,解不了锁,就一直锁住了。

    解锁:

    if redis.call("get",KEYS[1])==ARGV[1] then
       return redis.call("del",KEYS[1])
    else
      return 0
    end
    

    先比较一下值,相等才删除。防止其他线程把锁给解了。

    以上方案在一般的场景就够用了,但还存在一些小问题:

    1. 如果设置过期时间3秒,但是业务执行需要4秒怎么办?

    解决方案:参照redisson的看门狗,可以后台起一个线程去看看业务线程执行完了没有,如果没有就延长过期时间。

    1. redis是单点的,如果宕机了,那么整个系统就会崩溃。如果是主从结构,那么master宕机了,存储的key还没同步到slave,此时slave升级为新的master,客户端2从新的master上就能拿到同一个资源的锁。这样客户端1和客户端2都拿到锁,就不安全了。

    解决方案:RedLock算法。简单说就是N个(通常是5)独立的redis节点同时执行SETNX,如果大多数成功了,就拿到了锁。这样就允许少数节点不可用。

    那我们看看工业级别是怎么实现redis分布式锁的呢?

    Redission实现的redis分布式锁

    加锁流程:

    解锁流程:

    Redission加锁使用的是redis的hash结构。

    • key :要锁的资源名称
    • filed :uuid+":"+线程id
    • value : 数值型,可以实现可重入锁

    源码里面用到了netty里面Promise的一些api,我列出来帮助理解:

        // 异步操作完成且正常终止
        boolean isSuccess();
        // 异步操作是否可以取消
        boolean isCancellable();
        // 异步操作失败的原因
        Throwable cause();
        // 添加一个监听者,异步操作完成时回调,类比javascript的回调函数
        Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
        Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
        // 阻塞直到异步操作完成
        Future<V> await() throws InterruptedException;
        // 同上,但异步操作失败时抛出异常
        Future<V> sync() throws InterruptedException;
        // 非阻塞地返回异步结果,如果尚未完成返回null
        V getNow();
    

    源码分析:

    加锁:

    public RLock getLock(String name) {
            return new RedissonLock(connectionManager.getCommandExecutor(), name);
        }
        
         public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
            super(commandExecutor, name);
            //命令执行器
            this.commandExecutor = commandExecutor;
           //uuid
            this.id = commandExecutor.getConnectionManager().getId();
            //超时时间,默认30s
            this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
            this.entryName = id + ":" + name;
        }
    
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
            //获取线程id
            long threadId = Thread.currentThread().getId();
            //尝试获取锁
            Long ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            //ttl为空则代表加锁成功
            if (ttl == null) {
                return;
            }
    
      //如果获取锁失败,则订阅到对应这个锁的channel,等其他线程释放锁时,通知线程去获取锁
            RFuture<RedissonLockEntry> future = subscribe(threadId);
            commandExecutor.syncSubscription(future);
    
            try {
                while (true) {
                //再次尝试获取锁
                    ttl = tryAcquire(leaseTime, unit, threadId);
                    // lock acquired
                    if (ttl == null) {
                        break;
                    }
    
                    // waiting for message
                     //ttl大于0,则等待ttl时间后继续尝试获取锁
                    if (ttl >= 0) {
                        getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } else {
                        getEntry(threadId).getLatch().acquire();
                    }
                }
            } finally {
            //取消对channel的订阅
                unsubscribe(future, threadId);
            }
    //        get(lockAsync(leaseTime, unit));
        }
    

    再来看看里面的尝试获取锁的代码:

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
            return get(tryAcquireAsync(leaseTime, unit, threadId));
        }
        
        private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
            if (leaseTime != -1) {
            //如果带有过期时间,则按照普通方式获取锁
                return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
            }
             //先按照30秒的过期时间来执行获取锁的方法
            RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            //异步执行回调监听
            ttlRemainingFuture.addListener(new FutureListener<Long>() {
                @Override
                 //如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
                public void operationComplete(Future<Long> future) throws Exception {
                //没有成功执行完成
                    if (!future.isSuccess()) {
                        return;
                    }
           //非阻塞地返回异步结果,如果尚未完成返回null
                    Long ttlRemaining = future.getNow();
                    // lock acquired
                    if (ttlRemaining == null) {
                        scheduleExpirationRenewal(threadId);
                    }
                }
            });
            return ttlRemainingFuture;
        }
    

    看门狗逻辑:

    使用的是Netty的Timeout延迟任务做的。

    • 比如锁过期 30 秒, 每过 1/3 时间也就是 10 秒会检查锁是否存在, 存在则更新锁的超时时间

    加锁脚本

     <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            //如果锁不存在,则通过hset设置它的值,并设置过期时间
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                       //如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1,并重新设置过期时间
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                         //如果锁已存在,但并非本线程,则返回过期时间
                      "return redis.call('pttl', KEYS[1]);",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    

    解锁:

    public RFuture<Void> unlockAsync(final long threadId) {
            final RPromise<Void> result = new RedissonPromise<Void>();
            //底层解锁方法
            RFuture<Boolean> future = unlockInnerAsync(threadId);
    
            future.addListener(new FutureListener<Boolean>() {
                @Override
                public void operationComplete(Future<Boolean> future) throws Exception {
                    if (!future.isSuccess()) {
                        cancelExpirationRenewal(threadId);
                        result.tryFailure(future.cause());
                        return;
                    }
    
                    Boolean opStatus = future.getNow();
                    //如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
                    if (opStatus == null) {
                        IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                                + id + " thread-id: " + threadId);
                        result.tryFailure(cause);
                        return;
                    }
                    if (opStatus) {
                        cancelExpirationRenewal(null);
                    }
                    result.trySuccess(null);
                }
            });
    
            return result;
        }
    

    解锁脚本:

     protected RFuture<Boolean> unlockInnerAsync(long threadId) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                    "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                    "end;" +
                     //如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
                    "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                    "end; " +
                     //通过hincrby递减1的方式,释放一次锁
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                     //若剩余次数大于0 ,则刷新过期时间
                    "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                    "else " +
                      //否则证明锁已经释放,删除key并发布锁释放的消息
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; "+
                    "end; " +
                    "return nil;",
                    Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
    
        }
    
    书山有路勤为径,学海无涯苦作舟
     
    分类: 技术点
    作者:Leo_wl
             
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
    版权信息
  • 相关阅读:
    Codeforces1420E. Battle Lemmings 题解 动态规划
    C++使用partial_sum求前缀和
    HDU6171 Admiral 题解 折半搜索
    HDU3017 Treasure Division 题解 折半搜索
    使用re.split 按标点+空格的一种分割办法
    实现CString的Format功能,支持跨平台
    转载一篇makefile,说的很详细
    Microsoft C++ 异常: std::system_error std::thread
    源码一样,运行结果不一致的解决办法
    记录一次阿里的电话面试
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/15708882.html
Copyright © 2011-2022 走看看