zoukankan      html  css  js  c++  java
  • redis分布式锁的几种实现方式,以及Redisson的配置和使用

    最近在开发中涉及到了多个客户端的对redis的某个key同时进行增删的问题。这里就会涉及一个问题:锁

    先举例在分布式系统中不加锁会出现问题:

      redis中存放了某个用户的账户余额 ,例如100 (用户id:余额)

      A端需要对用户扣费-1,需要两步:

        A1.将该用户的目前余额取出来(100)

        A2.将余额扣除一部分(99)后再插入到redis中

      B端需要对用户充值+10,需要两步:

        B1.将该用户的目前余额取出来(99)

        B2.将余额添加充值额度(109)后再插入到redis中

      我们的期望执行顺序是A1、A2、B1、B2  结果就会是109

      但是如果不加锁,就会出现A1、B1、A2、B2(110)或者其他各种随机情况,这样就会造成数据错误。

    Redis加锁的几种实现方式

      方式一,自己造轮子

        之前参考的很多博客,关于redis加锁都是先setNX()获取锁,然后再setExpire()设置锁的有效时间。

        然而这样的话获取锁的操作就不是原子性的了,如果setNX后系统宕机,就会造成锁死,系统阻塞。

        根据官方的推荐(https://redis.io/topics/distlock),最好使用set命令:SET key value [EX seconds] [PX milliseconds] [NX|XX]       

        EX PX设置有效时间    NX属性的作用就是如果key存在就返回失败,否则插入数据。

        需要注意的是:

          在Redis 2.6.12之前,set只能返回OK,所以无法判断操作是否成功,所以也就不适用。

          如果使用的是spring-boot-starter-data-redis依赖,那么在2.x版本之前的接口也不支持上述的set操作

        java代码:

    //获取锁
            //锁的键值需要具有标志性。
            //例如,现在有两个系统需要对key=user_id,value=user_balance进行操作,这时就可以设计这个键的锁为user_id+"_key"
            String user_id="1";
            String key=user_id+"_key";
            //值设置为一个随机数(下面讲原因)
            String random_value=UUID.randomUUID().toString();
            redisTemplate.execute((RedisCallback<Boolean>) (RedisConnection connection)->{
                //只有2.0以上的版本才支持set返回插入结果Boolean
                //此命令的意思是只有key不存在,才插入值,并且设置有效时间为10s
                connection.set(key.getBytes(), random_value.getBytes(), Expiration.seconds(10), SetOption.SET_IF_ABSENT);
                //本示例由于依赖版本低于2.0,所以无法接受set设置结果
                Boolean result=true;
                return result;
            });
            
            //进行更新操作...
            
            //释放锁
            //为什么释放之前要比较一下?
            //这是为了防止删除掉别人的锁,例如此场景中:如果我们的中间操作超过了10s那么锁会自动释放,这时别人会再获取锁。
            //如果我们执行完中间就直接删除锁的话,就会把别人的锁删除
            if(redisTemplate.opsForValue().get(key)==random_value) {
                redisTemplate.delete(key);
            }

    可以发现,如果自己来实现的话,受限很多。并且这还是最基本的操作,包括出错重试等功能都没有。

    所以我们要学习redis推荐的reids工具redisson

      方式二:集成redissonhttps://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95

      一.添加依赖

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.6.1</version>
    </dependency>    

      二.resources文件夹添加配置文件redisson.yml

    singleServerConfig:
      #连接空闲超时,单位:毫秒
      idleConnectionTimeout: 10000
      pingTimeout: 1000
      #连接超时,单位:毫秒
      connectTimeout: 10000
      #命令等待超时,单位:毫秒
      timeout: 3000
      #命令失败重试次数
      retryAttempts: 3
      #命令重试发送时间间隔,单位:毫秒
      retryInterval: 1500
      #重新连接时间间隔,单位:毫秒
      reconnectionTimeout: 3000
      #执行失败最大次数
      failedAttempts: 3
      #单个连接最大订阅数量
      subscriptionsPerConnection: 5
      #客户端名称
      clientName: null
      #地址
      address: "redis://192.168.1.16:6379"
      #数据库编号
      database: 0
      #密码
      password: xiaokong
      #发布和订阅连接的最小空闲连接数
      subscriptionConnectionMinimumIdleSize: 1 
      #发布和订阅连接池大小
      subscriptionConnectionPoolSize: 50
      #最小空闲连接数
      connectionMinimumIdleSize: 32
      #连接池大小
      connectionPoolSize: 64
      #是否启用DNS监测
      dnsMonitoring: false
      #DNS监测时间间隔,单位:毫秒
      dnsMonitoringInterval: 5000
    threads: 0
    nettyThreads: 0
    codec: !<org.redisson.codec.JsonJacksonCodec> {}
    transportMode : "NIO"

      三.Application中设置RedissonClient

    import org.mybatis.spring.annotation.MapperScan;
    import org.redisson.Redisson;
    import org.redisson.api.RedissonClient;
    import org.redisson.config.Config;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
    import org.springframework.context.annotation.Bean;
    import org.springframework.core.io.ClassPathResource;
    import org.springframework.transaction.annotation.EnableTransactionManagement;
    
    @SpringBootApplication
    @EnableTransactionManagement
    @MapperScan("com.xxx.mapper")
    public class Application {
        public static void main(String [] args) {
            SpringApplication.run(Application.class, args);
        }
        @Bean(destroyMethod="shutdown")
        public RedissonClient redisson() throws IOException {
            RedissonClient redisson = Redisson.create(
                    Config.fromYAML(new ClassPathResource("redisson.yml").getInputStream()));
            return redisson;
        }
    }

      四.在代码中使用

    @Autowired
        private RedissonClient redisson;
        @Test 
        public void redisson() {
            String user_id="1";
            String key=user_id+"_key";
            //获取锁
            RLock lock = redisson.getLock(key);
            lock.lock();
                //执行具体逻辑...
            
            RBucket<Object> bucket = redisson.getBucket("a");
            bucket.set("bb");
            
            lock.unlock();
        }

    需要注意的是redisson的使用和redisTemplate有比较大的区别,这里简单介绍一下几个特性:(刚用时迷了很久,希望大家能少走些弯路)

    1.redisson中不需要set指令,举个例子:

    RBucket<Object> bucket = redisson.getBucket("a");
    bucket.set("bb");

    在这两条语句中,我们只获取了key="a"的bucket类型对象(里面可以装一个任意对象)。然后修改bucket里面一个值,其实这时["a","bb"]已经被存入redis了

    2.所有的值都是结构体

    和上例的RBucket结构体一样,redisson提供了十几种结构体(https://github.com/redisson/redisson/wiki/7.-%E5%88%86%E5%B8%83%E5%BC%8F%E9%9B%86%E5%90%88)供我们使用,当取值时,redisson也会自动将值转换成对应的结构体。所以如果使用redisson取redisTemplate放入的值,就要小心报错

    方式三.基于redlock的算法讨论

    这种我还没有具体实现过,为什么会出现这种算法,主要是应对redis服务器宕机的问题。当redis宕机时,即使有主从,但是依然会有一个同步间隔。这样就会造成数据流失。

    当然,更为严重的是,在分布式情况下,丢失的是锁,我们知道一般用锁的数据都是比较重要的。

    一个场景:A在向主机1请求到锁成功后,主机1宕机了。现在从机1a变成了主机。但是数据没有同步,从机1a是没有A的锁的。那么B又可以获得一个锁。这样就会造成数据错误。

    redlock主要思想就是做数据冗余。建立5台独立的集群,当我们发送一个数据的时候,要保证3台(n/2+1)以上的机器接受成功才算成功,否则重试或报错。

    当然具体是很复杂,想研究的可以看看(https://redis.io/topics/distlock)

    方式四.使用zookeeper+redis来管理锁

    就像之前讨论的,方式2只能保证客户端的正确,却无法保证服务端的宕机数据丢失。方式三的数据完整性很高,但是管理起来很复杂。这时就有了一个折中的做法:

    将锁存放在zookeeper中,由于zookeeper与redis的场景不同,所以zookeeper的算法对数据的完整性要求很高。在分布式的zookeeper中,数据是很难丢失的。

    这样,我们就可以把锁放到zookeeper中,来保证锁的完整性。

    好吧,这个我也没有实验过(羞耻),不过网上又很多这方面的博客,以后用到再说吧~~~一般项目用方式二就可以啦,

  • 相关阅读:
    C++实现网格水印之调试笔记(六)—— 提取完成
    C++实现网格水印之调试笔记(五)—— 提取出错
    C++实现网格水印之调试笔记(四)—— 完成嵌入
    Spark ---RDD
    Running Spark on YARN
    Spark官方2 ---------Spark 编程指南(1.5.0)
    Spark官方1 ---------Spark SQL和DataFrame指南(1.5.0)
    spark与hive的集成
    [mysql] ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES).
    HDFS概述(6)————用户手册
  • 原文地址:https://www.cnblogs.com/chenkeyu/p/8514250.html
Copyright © 2011-2022 走看看