zoukankan      html  css  js  c++  java
  • Redisson分布式锁原理

    分布式锁产生的场景

    分布式锁在需要分布式同步的场景下使用,也就是在分布式系统下才能发挥作用,传统的单台系统使用java提供的锁,分布式场景多实例项目需要多个项目之间同步。

    都有哪些实现方式

    1 数据库锁:通过在数据库中创建一条记录,根据创建结果来判断是否上锁成功。实现简单但是数据库效率低。
    2 redis实现:通过redis缓存中间件实现,比较繁琐,效率高。
    2 zookeeper实现:通过临时节点实现,实现简单,失效时间可以控制。
    效率:redis > zookeeper > 数据库

    redis实现

    判断 key 是否存在,如果不存在,设置key,加锁成功,并设置过期是时间,每过一段时间判断是否执行完,没执行完重设过期时间。
    如果key存在,就循环判断,直到获取锁。

    Redisson

    Redisson是redis官方出的分布式锁工具。

    基本使用方式如下:

    Config config = new Config();
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    RedissonClient client = Redisson.create(config);
    RLock lock = client.getLock("abcde");
    lock.lock();
    try {
        ...
    } finally {
        lock.unlock();
    }
    

    创建客户端连接

    根据给出的配置 创建连接
    RedissonClient client = Redisson.create(config);
    创建时包括,连接管理器、命令执行器和调度器

     protected Redisson(Config config) {
        this.config = config;
        Config configCopy = new Config(config);
        
        connectionManager = ConfigSupport.createConnectionManager(configCopy);
        commandExecutor = new CommandSyncService(connectionManager);
        evictionScheduler = new EvictionScheduler(commandExecutor);
    }
    

    获取锁

    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        Long ttl = tryAcquire(leaseTime, unit);
        // 获取了锁 返回
        if (ttl == null) {
            return;
        }
    
        long threadId = Thread.currentThread().getId();
        // 订阅锁消息
        Future<RedissonLockEntry> future = subscribe(threadId);
        get(future);//
    
        try {
            while (true) {
                ttl = tryAcquire(leaseTime, unit);
                // lock acquired
                if (ttl == null) {
                    break;
                }
    
                // waiting for message
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
    }
    

    tryAcquire 获取锁方法

    <T> Future<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command){
        internalLockLeaseTime = unit.toMillis(leaseTime);
    
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +	
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +	
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +	
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +	
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

    上边一段lua脚本可以看到redis获取锁的逻辑:
    假设key:abcde,Lock的id为123456789 线程为thread1 ,有效期为 10
    1 如果不存在 key:abcde
    2 设置 abcde 然后里面一个键值对 123456789:thread1 值为 1
    3 设置 abcde 有效期 10
    4 如果存在abcde并且123456789:thread1
    5 将123456789:thread1 的值加 1
    6 重设过期时间

    这里数据结构用的是redis的hash表
    抢到锁后有一个线程会一直循环续命默认是10秒执行一次,一个锁默认是30秒
    如果没有抢到锁:
    subscribe 订阅锁的释放消息
    然后循环获取锁

    释放锁

    public void unlock() {
        Boolean opStatus= commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('exists', KEYS[1]) == 0) then " +
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; " +
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                            "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                            "return 0; " +
                        "else " +
                            "redis.call('del', KEYS[1]); " +
                            "redis.call('publish', KEYS[2], ARGV[1]); " +
                            "return 1; "+
                        "end; " +
                        "return nil;",
                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }
    

    假设 参数1是key:abcde,参数2:redisson_lock__channel__{abcde},参数3:123456789:thread1

    1 判断是否有abcde这个key
    2 不存在广播解锁消息
    3 存在key,判断key是否有123456789:thread1参数
    4 存在参数将数值减一
    5 判断减一后数字是否大于0
    6 大于0 重设过期时间
    7 不大于 删除key,并广播解锁消息,其他线程就可以开始抢占锁了

  • 相关阅读:
    Git的初步学习
    Git的初步学习
    微信小程序我的界面
    微信小程序我的界面
    Day2:html和css
    Day2:html和css
    Day1:html和css
    Day1:html和css
    Java之JDK7的新语法探索
    Java之JDK7的新语法探索
  • 原文地址:https://www.cnblogs.com/paper-man/p/13284598.html
Copyright © 2011-2022 走看看