zoukankan      html  css  js  c++  java
  • Redis实现缓存与分布式锁

    缓存与分布式锁

    哪些数据适合放入缓存

    • 即时性、数据一致性要求不高的
    • 访问量大且更新频率不高的数据

    选择redis做为缓存中间件

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    

    问题记录与分析

    产生堆外内存溢出:OutOfDirectMemoryError

    1. springboot2.0 以后默认使用lettuce作为操作redis的客户端,它使用netty进行网络通信。
    2. lettuce的bug导致netty堆外内存溢出

    解决方案: 切换到jedis(或者升级lettuce)

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
      <exclusions>
        <exclusion>
          <groupId>io.lettuce</groupId>
          <artifactId>lettuce-core</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    
    <dependency>
      <groupId>redis.clients</groupId>
      <artifactId>jedis</artifactId>
    </dependency>
    

    高并发下缓存失效问题-缓存穿透

    缓存穿透:

    指查询一个一定不存在的数据,由于缓存是不命中,将要去查询数据库,但是数据库也没有该记录,我们将这次查询的null写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义

    风险:

    利用不存在数据进行攻击,数据库瞬时压力增大,最终导致崩溃

    解决:

    null结果缓存,并加入短暂过期时间

    高并发下缓存失效问题-缓存雪崩

    缓存雪崩:

    缓存雪崩是指我们设置缓存时key采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩。

    解决:

    原有的失效时间基础上增加一个随机值,比如1-5min随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

    高并发下缓存失效问题-缓存击穿

    缓存击穿:

    • 对于一些设置了过期时间的key,如果这些key可能会在某些时间点被超高并发地访问,是一种非常”热点“的数据。
    • 如果这个key在大量请求同时进来前正好失效,那么所有key的数据查询都落到db,我们称之为缓存击穿。

    解决:

    加锁,大量并发只让一个请求去查,其他请求等待,查到以后释放锁,其他请求获取到锁,先查缓存,就会有数据,不用去db。

    加锁实践:

    springboot所有的组件在容器中都是单例的,可以使用synchronized(this),JUC(Lock)等解决单体应用中的问题,但是分布式系统中,要想锁住所有数据,就必须使用分布式锁

    通过分析 分布式锁必须保证加锁(占位+过期时间)和删除锁(判断+删除)的原子性。

    加锁可以使用redis setnx ex命令来操作,但是删除锁的时候 ,要先判断再删除,想把这两步操作做成原则性的,需要采用redis+lusj脚本的方式来操作。

    public Map<String, List<Catalog2Vo>> getCatalogJsonWithRedisLock() {
            // 1. 占分布式锁
            String uuid = UUID.randomUUID().toString();
            // 设置锁和设置过期时间必须是原子性的 不能通过redis的两条命令设置,这里的命令等价于redis命令setnx ex
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS);
            if (lock) {
                log.info("获取分布式锁成功 ...");
                Map<String, List<Catalog2Vo>> dataFromDb;
                try {
                    dataFromDb = getDataFromDb();
                } finally {
                    // 删除锁 必须判断是当前锁 再删除,所以,为了保证原子性操作 需要采取redis+Lua脚本 完成
                    String luaScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
                    // 执行脚本
                    redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
                            Collections.singletonList("lock"), uuid);
                }
                return dataFromDb;
            } else {
                // 加锁失败
                log.info("加锁失败,获取分布式锁 等待重试");
    
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return getCatalogJsonWithRedisLock();
            }
        }
    

    Redisson

    Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

    官方文档

    导入依赖

    <!--使用redisson做为分布式锁,分布式对象等功能框架 -->
    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.12.0</version>
    </dependency>
    

    配置单个redis

    @Configuration
    public class MyRedissonConfig {
        @Bean(destroyMethod = "shutdown")
        public RedissonClient redisson() throws IOException {
            // 1. 创建配置
            Config config = new Config();
            config.useSingleServer().setAddress("redis://127.0.0.1:6379");
            // 2. 根据配置创建出Redisson实例
            return Redisson.create(config);
        }
    }
    

    测试:

    @ResponseBody
        @GetMapping("/hello")
        public String hello() {
            // 1. 获取一把锁 ,只要锁的名字一样,就是同一把锁
            RLock lock = redisson.getLock("my-lock");
    
            // 2. 加锁
            lock.lock(); // 阻塞式等待
            // 锁的自动续期:如果业务超长,运行期间自动给锁续上新的30s 不用担心业务时间长,锁自动过期被删掉
            // 加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30s以后自动删除
            try {
                System.out.println("加锁成功 执行业务 ..." + Thread.currentThread().getId());
                Thread.sleep(30000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 3. 解锁
                System.out.println("释放锁 ..." + Thread.currentThread().getId());
                lock.unlock();
            }
            return "hello";
        }
    

    lock.lock()

    • 如果我们传递了时间,就发送给redis执行脚本,进行占锁,默认超时就是我们指定的时间
    • 如果我们未指定锁的超时时间,就使用30*1000 (看门狗的默认时间:LockWatchdogTimeout),只要占锁成功,就会启动一个定时任务:重新给锁设置过期时间,新的过期时间就是看门狗的默认时间。这个定时任务执行间隔(internalLockLeaseTime)为: (看门狗时间/3)

    源代码:

    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
      if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
      }
      RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
          return;
        }
    
        // lock acquired
        if (ttlRemaining == null) {
          scheduleExpirationRenewal(threadId);
        }
      });
      return ttlRemainingFuture;
    }
    

    传递时间时执行的方法:执行lua脚本

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
            internalLockLeaseTime = unit.toMillis(leaseTime);
    
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                      "if (redis.call('exists', KEYS[1]) == 0) then " +
                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return nil; " +
                      "end; " +
                      "return redis.call('pttl', KEYS[1]);",
                        Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
        }
    

    定时任务来做续期

    private void renewExpiration() {
      ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
      if (ee == null) {
        return;
      }
    
      Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
          ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
          if (ent == null) {
            return;
          }
          Long threadId = ent.getFirstThreadId();
          if (threadId == null) {
            return;
          }
    
          RFuture<Boolean> future = renewExpirationAsync(threadId);
          future.onComplete((res, e) -> {
            if (e != null) {
              log.error("Can't update lock " + getName() + " expiration", e);
              return;
            }
    
            if (res) {
              // reschedule itself
              renewExpiration();
            }
          });
        }
      }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
      ee.setTimeout(task);
    }
    

    最佳实践是传时间,省去了整个续期的操作,给定合理的过期时间即可。

    读写锁测试:

    /**
     * 测试读写锁 - 写
     */
    @GetMapping("/write")
    @ResponseBody
    public String writeValue() {
      RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
      RLock rLock = lock.writeLock();
      String s = "";
      try {
        rLock.lock();
        s = UUID.randomUUID().toString();
        // 模拟业务时长
        Thread.sleep(30000);
        redisTemplate.opsForValue().set("writeValue", s);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        rLock.unlock();
      }
      return s;
    }
    
    /**
      * 测试读写锁 - 读
      * <p>
      * 保证一定能读到最新的数据,修改期间,写锁是一个排他锁(互斥锁),读锁是一个共享锁
      * 写锁没释放 读就必须等待
      */
    @GetMapping("/read")
    @ResponseBody
    public String readValue() {
      RReadWriteLock lock = redisson.getReadWriteLock("rw-lock");
      RLock rLock = lock.readLock();
      rLock.lock();
      String writeValue = "";
      try {
        writeValue = redisTemplate.opsForValue().get("writeValue");
      } catch (Exception e) {
        e.printStackTrace();
      } finally {
        rLock.unlock();
      }
      return writeValue;
    }
    

    总结:

    • 读+读 :相当于无锁,并发读,只会在redis中记录好,所有当前的读锁,他们都会同时加锁成功
    • 写+读 :等待写锁释放
    • 写+写 :阻塞方式
    • 读+写 :有读锁,写也需要等待

    即只要有写的存在,都必须等待

    信号量测试:

    /**
      * 测试信号量
      * <p>
      * 模拟车库停车
      * 车位 3  测试的时候先在redis中先设置当前车位数  set park 3
      */
    @GetMapping("/park")
    @ResponseBody
    public String park() throws InterruptedException {
      RSemaphore semaphore = redisson.getSemaphore("park");
      // acquire()是阻塞的,当没有车位时会一直等到有释放时才返回
      // 如果不想阻塞 可以使用 tryAcquire() 会返回一个布尔值
      semaphore.acquire(); // 获取一个信号,获取一个值,即占一个车位
      return "ok";
    }
    
    /**
      * 出库
      */
    @GetMapping("/go")
    @ResponseBody
    public String go() throws InterruptedException {
      RSemaphore semaphore = redisson.getSemaphore("park");
      semaphore.release(); // 释放一个信号,即空出一个车位
      return "ok";
    }
    

    闭锁测试:

    /**
      * 测试闭锁
      * <p>
      * 模拟学校关闭大门 只要当5个班级人都走完了 才可以关闭大门
      */
    @GetMapping("/lockDoor")
    @ResponseBody
    public String lockDoor() throws InterruptedException {
      RCountDownLatch door = redisson.getCountDownLatch("door");
      door.trySetCount(5);
      door.await(); //等待闭锁都完成
      return "放假喽!关大门了哈";
    }
    
    @GetMapping("/outDoor/{id}")
    @ResponseBody
    public String outDoor(@PathVariable("id") Long id) {
      // 这里只是模拟 不用考虑真实场景
      RCountDownLatch door = redisson.getCountDownLatch("door");
      door.countDown(); // 计数减一
      return id + "班的人都走完了";
    }
    

    缓存数据一致性

    • 双写模式
    • 失效模式

    无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?

    1. 如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加

    上过期时间,每隔一段时间触发读的主动更新即可

    1. 如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。

    2. 缓存数据+过期时间也足够解决大部分业务对于缓存的要求。

    3. 通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心

    脏数据,允许临时脏数据可忽略);

    总结:

    • 我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保

    证每天拿到当前最新数据即可。

    • 我们不应该过度设计,增加系统的复杂性

    • 遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。

    整合springcache

    整合springcache,简化缓存开发

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-cache</artifactId>
    </dependency>
    

    指定使用redis来缓存

    spring.cache.type=redis
    

    spring提供的几个注解:

    • @Cacheable: Triggers cache population:触发将数据保存到缓存的操作
    • @CacheEvict: Triggers cache eviction:触发将数据从缓存中删除的操作
    • @CachePut: Updates the cache without interfering with the method execution:不影响方法执行更新缓存
    • @Caching: Regroups multiple cache operations to be applied on a method:组合以上多个操作
    • @CacheConfig: Shares some common cache-related settings at class-level:在类级别共享缓存的相同配置

    测试:

    1. 开启缓存功能:启动类上加上 @EnableCaching注解
    2. 只需要使用注解就能完成缓存

    使用缓存后的默认行为:

    • 如果缓存中有数据,则方法不会调用,即直接返回缓存中的数据
    • key默认自动生成:缓存的名字::SimpleKey []
    • 缓存的value值,默认使用jdk序列化机制,将序列化后的数据存到redis
    • 默认ttl时间:-1,即用不过期

    以上默认行为导致的结果与我们实际需求不同,所有我们可以自定义这些配置:

    自定义:

    • 指定生成的key : 通过key属性指定,接收一个spEL表达式
    • 指定缓存数据的过期时间:配置文件中配置
    • 将保存的value数据转为json格式

    原理:CacheAutoConfiguration -> RedisCacheConfiguration -> 自动配置了RedisCacheManager -> 初始化所有的缓存 -> 每个缓存决定使用什么配置 -> 如果redisCacheConfiguration有就用已有的,没有就使用默认配置

    所以,想要改缓存的配置,只需要给容器中注入一个 RedisCacheConfiguration即可

    就会应用到当前RedisCacheManager管理的所有缓存分区中;

    可以参考源码中默认配置来自己写一个RedisCacheConfiguration:

    源码中的默认配置:

     * <dd>{@link org.springframework.data.redis.serializer.StringRedisSerializer}</dd>
     * <dt>value serializer</dt>
     * <dd>{@link org.springframework.data.redis.serializer.JdkSerializationRedisSerializer}</dd>
    

    从这注释中可以看出 k采用的是字符串序列化,v采用的是jdk序列化

    自定义配置:

    @EnableCaching
    @Configuration
    @EnableConfigurationProperties(CacheProperties.class)
    public class MyRedisCacheConfig {
        @Bean
        RedisCacheConfiguration configuration(CacheProperties cacheProperties) {
            RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig();
            config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer()));
            config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericFastJsonRedisSerializer()));
            CacheProperties.Redis redisProperties = cacheProperties.getRedis();
            if (redisProperties.getTimeToLive() != null) {
                config = config.entryTtl(redisProperties.getTimeToLive());
            }
            if (redisProperties.getKeyPrefix() != null) {
                config = config.prefixCacheNameWith(redisProperties.getKeyPrefix());
            }
            if (!redisProperties.isCacheNullValues()) {
                config = config.disableCachingNullValues();
            }
            if (!redisProperties.isUseKeyPrefix()) {
                config = config.disableKeyPrefix();
            }
            return config;
        }
    }
    

    redis缓存的其他配置

    spring.cache.type=redis
    # 单位为毫秒
    spring.cache.redis.time-to-live=3600000
    # 缓存的key 加前缀, 可以用来区分redis中的值哪些是缓存用的数据
    # 如果这里配置了前缀 就代替了默认前缀 之前的默认前缀 缓存名字::
    spring.cache.redis.key-prefix=CHCHE_
    # 是否使用配置的前缀
    spring.cache.redis.use-key-prefix=true
    # 是否缓存空值,开启 防止缓存穿透
    spring.cache.redis.cache-null-values=true
    

    缓存数据测试:

    @Cacheable(value = {"category"}, key = "#root.method.name")
    @Override
    public List<CategoryEntity> getLevelOne() { ... }
    

    删除缓存测试:

    // 分类数据更新的时候 触发删除缓存 指定缓存分区 再指定key  注意这里的key 接收的是一个spEL表达式,如果是普通字符串 需要里面加单引号
    @CacheEvict(value = "category", key = "'getLevelOne'")
    

    如果一个要删除多个缓存,就可以使用@Caching,它可以组合其他注解

    @Caching(evict = {
      @CacheEvict(value = "category", key = "'getLevelOne'"),
      @CacheEvict(value = "category", key = "'getCatalogJson'")
    })
    

    或者可以指定删除某个缓存分区下的所有缓存,这也是使用缓存分区的好处

    @CacheEvict(value = "category", allEntries = true)
    

    所以我们约定,存储同一类型的数据 使用同一个缓存分区

    且为了方便管理 配置文件中,不知道自定义前缀,就使用默认的 分区名为前缀

    Spring-Cache 的不足:

    使用@Cacheable时可以指定sync = true解决缓存击穿问题,但是不是分布式锁。

  • 相关阅读:
    Educational Codeforces Round 28 B. Math Show
    Educational Codeforces Round 28 A. Curriculum Vitae
    Codeforces Round #433 (Div. 2, based on Olympiad of Metropolises) D. Jury Meeting
    Codeforces Round #433 (Div. 2, based on Olympiad of Metropolises) C. Planning
    2017 Multi-University Training Contest 2 hdu 6047
    2017 Multi-University Training Contest 2 hdu 6045
    2017 Multi-University Training Contest 1 hdu 6043
    2017 Multi-University Training Contest 1 hdu 6034
    2017 Multi-University Training Contest 1 hdu 6033
    poj 2104主席树求区间第k小
  • 原文地址:https://www.cnblogs.com/chengming104/p/13857490.html
Copyright © 2011-2022 走看看