zoukankan      html  css  js  c++  java
  • Reactive Spring实战 -- 响应式Redis交互

    本文分享Spring中如何实现Redis响应式交互模式。

    本文将模拟一个用户服务,并使用Redis作为数据存储服务器。
    本文涉及两个java bean,用户与权益

    public class User {
        private long id;
        private String name;
        // 标签
        private String label;
        // 收货地址经度
        private Double deliveryAddressLon;
        // 收货地址维度
        private Double deliveryAddressLat;
        // 最新签到日
        private String lastSigninDay;
        // 积分
        private Integer score;
        // 权益
        private List<Rights> rights;
        ...
    }
    
    public class Rights {
        private Long id;
        private Long userId;
        private String name;
        ...
    }
    

    启动

    引入依赖

        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
        </dependency>
    

    添加Redis配置

    spring.redis.host=192.168.56.102
    spring.redis.port=6379
    spring.redis.password=
    spring.redis.timeout=5000
    

    SpringBoot启动

    @SpringBootApplication
    public class UserServiceReactive {
        public static void main(String[] args) {
            new SpringApplicationBuilder(
                    UserServiceReactive.class)
                    .web(WebApplicationType.REACTIVE).run(args);
        }
    }
    

    应用启动后,Spring会自动生成ReactiveRedisTemplate(它的底层框架是Lettuce)。
    ReactiveRedisTemplate与RedisTemplate使用类似,但它提供的是异步的,响应式Redis交互方式。
    这里再强调一下,响应式编程是异步的,ReactiveRedisTemplate发送Redis请求后不会阻塞线程,当前线程可以去执行其他任务。
    等到Redis响应数据返回后,ReactiveRedisTemplate再调度线程处理响应数据。
    响应式编程可以通过优雅的方式实现异步调用以及处理异步结果,正是它的最大的意义。

    序列化

    ReactiveRedisTemplate默认使用的序列化是Jdk序列化,我们可以配置为json序列化

    @Bean
    public RedisSerializationContext redisSerializationContext() {
        RedisSerializationContext.RedisSerializationContextBuilder builder = RedisSerializationContext.newSerializationContext();
        builder.key(StringRedisSerializer.UTF_8);
        builder.value(RedisSerializer.json());
        builder.hashKey(StringRedisSerializer.UTF_8);
        builder.hashValue(StringRedisSerializer.UTF_8);
    
        return builder.build();
    }
    
    @Bean
    public ReactiveRedisTemplate reactiveRedisTemplate(ReactiveRedisConnectionFactory connectionFactory) {
        RedisSerializationContext serializationContext = redisSerializationContext();
        ReactiveRedisTemplate reactiveRedisTemplate = new ReactiveRedisTemplate(connectionFactory,serializationContext);
        return reactiveRedisTemplate;
    }
    

    builder.hashValue方法指定Redis列表值的序列化方式,由于本文Redis列表值只存放字符串,所以还是设置为StringRedisSerializer.UTF_8。

    基本数据类型

    ReactiveRedisTemplate支持Redis字符串,散列,列表,集合,有序集合等基本的数据类型。
    本文使用散列保存用户信息,列表保存用户权益,其他基本数据类型的使用本文不展开。

    public Mono<Boolean>  save(User user) {
        ReactiveHashOperations<String, String, String> opsForHash = redisTemplate.opsForHash();
        Mono<Boolean>  userRs = opsForHash.putAll("user:" + user.getId(), beanToMap(user));
        if(user.getRights() != null) {
            ReactiveListOperations<String, Rights> opsForRights = redisTemplate.opsForList();
            opsForRights.leftPushAll("user:rights:" + user.getId(), user.getRights()).subscribe(l -> {
                logger.info("add rights:{}", l);
            });
        }
        return userRs;
    }
    

    beanToMap方法负责将User类转化为map。

    HyperLogLog

    Redis HyperLogLog结构可以统计一个集合内不同元素的数量。
    使用HyperLogLog统计每天登录的用户量

    public Mono<Long>  login(User user) {
        ReactiveHyperLogLogOperations<String, Long> opsForHyperLogLog = redisTemplate.opsForHyperLogLog();
        return opsForHyperLogLog.add("user:login:number:" + LocalDateTime.now().toString().substring(0, 10), user.getId());
    }
    

    BitMap

    Redis BitMap(位图)通过一个Bit位表示某个元素对应的值或者状态。由于Bit是计算机存储中最小的单位,使用它进行储存将非常节省空间。
    使用BitMap记录用户本周是否有签到

    public void addSignInFlag(long userId) {
        String key = "user:signIn:" + LocalDateTime.now().getDayOfYear()/7 + (userId >> 16);
        redisTemplate.opsForValue().setBit(
                key, userId & 0xffff , true)
        .subscribe(b -> logger.info("set:{},result:{}", key, b));
    }
    

    userId高48位用于将用户划分到不同的key,低16位作为位图偏移参数offset。
    offset参数必须大于或等于0,小于2^32(bit 映射被限制在 512 MB 之内)。

    Geo

    Redis Geo可以存储地理位置信息,并对地理位置进行计算。
    如查找给定范围内的仓库信息

    public Flux getWarehouseInDist(User u, double dist) {
        ReactiveGeoOperations<String, String> geo = redisTemplate.opsForGeo();
        Circle circle = new Circle(new Point(u.getDeliveryAddressLon(), u.getDeliveryAddressLat()), dist);
        RedisGeoCommands.GeoRadiusCommandArgs args =
                RedisGeoCommands.GeoRadiusCommandArgs.newGeoRadiusArgs().includeDistance().sortAscending();
        return geo.radius("warehouse:address", circle, args);
    }
    

    warehouse:address这个集合中需要先保存好仓库地理位置信息。
    ReactiveGeoOperations#radius方法可以查找集合中地理位置在给定范围内的元素,它中还支持添加元素到集合,计算集合中两个元素地理位置距离等操作。

    Lua

    ReactiveRedisTemplate也可以执行Lua脚本。
    下面通过Lua脚本完成用户签到逻辑:如果用户今天未签到,允许签到,积分加1,如果用户今天已签到,则拒接操作。

    public Flux<String> addScore(long userId) {
        DefaultRedisScript<String> script = new DefaultRedisScript<>();
        script.setScriptSource(new ResourceScriptSource(new ClassPathResource("/signin.lua")));
        List<String> keys = new ArrayList<>();
        keys.add(String.valueOf(userId));
        keys.add(LocalDateTime.now().toString().substring(0, 10));
        return redisTemplate.execute(script, keys);
    }
    

    signin.lua内容如下

    local score=redis.call('hget','user:'..KEYS[1],'score')
    local day=redis.call('hget','user:'..KEYS[1],'lastSigninDay')
    if(day==KEYS[2])
        then
        return '0'
    else
        redis.call('hset','user:'..KEYS[1],'score', score+1,'lastSigninDay',KEYS[2])
        return '1'
    end
    

    Stream

    Redis Stream 是 Redis 5.0 版本新增加的数据类型。该类型可以实现消息队列,并提供消息的持久化和主备复制功能,并且可以记住每一个客户端的访问位置,还能保证消息不丢失。

    Redis借鉴了kafka的设计,一个Stream内可以存在多个消费组,一个消费组内可以存在多个消费者。
    如果一个消费组内某个消费者消费了Stream中某条消息,则这消息不会被该消费组其他消费者消费到,当然,它还可以被其他消费组中某个消费者消费到。

    下面定义一个Stream消费者,负责处理接收到的权益数据

    @Component
    public class RightsStreamConsumer implements ApplicationRunner, DisposableBean {
        private static final Logger logger = LoggerFactory.getLogger(RightsStreamConsumer.class);
    
        @Autowired
        private RedisConnectionFactory redisConnectionFactory;
    
        private StreamMessageListenerContainer<String, ObjectRecord<String, Rights>> container;
        // Stream队列
        private static final String STREAM_KEY = "stream:user:rights";
        // 消费组
        private static final String STREAM_GROUP = "user-service";
        // 消费者
        private static final String STREAM_CONSUMER = "consumer-1";
    
        @Autowired
        @Qualifier("reactiveRedisTemplate")
        private ReactiveRedisTemplate redisTemplate;
    
        public void run(ApplicationArguments args) throws Exception {
    
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Rights>> options =
                    StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                            .batchSize(100) //一批次拉取的最大count数
                            .executor(Executors.newSingleThreadExecutor())  //线程池
                            .pollTimeout(Duration.ZERO) //阻塞式轮询
                            .targetType(Rights.class) //目标类型(消息内容的类型)
                            .build();
            // 创建一个消息监听容器
            container = StreamMessageListenerContainer.create(redisConnectionFactory, options);
    
            // prepareStreamAndGroup查找Stream信息,如果不存在,则创建Stream
            prepareStreamAndGroup(redisTemplate.opsForStream(), STREAM_KEY , STREAM_GROUP)
                    .subscribe(stream -> {
                // 为Stream创建一个消费者,并绑定处理类
                container.receive(Consumer.from(STREAM_GROUP, STREAM_CONSUMER),
                        StreamOffset.create(STREAM_KEY, ReadOffset.lastConsumed()),
                        new StreamMessageListener());
                container.start();
            });
        }
    
        @Override
        public void destroy() throws Exception {
            container.stop();
        }
    
        // 查找Stream信息,如果不存在,则创建Stream
        private Mono<StreamInfo.XInfoStream> prepareStreamAndGroup(ReactiveStreamOperations<String, ?, ?> ops, String stream, String group) {
            // info方法查询Stream信息,如果该Stream不存在,底层会报错,这时会调用onErrorResume方法。
            return ops.info(stream).onErrorResume(err -> {
                logger.warn("query stream err:{}", err.getMessage());
                // createGroup方法创建Stream
                return ops.createGroup(stream, group).flatMap(s -> ops.info(stream));
            });
        }
    
        // 消息处理对象
        class  StreamMessageListener implements StreamListener<String, ObjectRecord<String, Rights>> {
            public void onMessage(ObjectRecord<String, Rights> message) {
                // 处理消息
                RecordId id = message.getId();
                Rights rights = message.getValue();
                logger.info("receive id:{},rights:{}", id, rights);
                redisTemplate.opsForList().leftPush("user:rights:" + rights.getUserId(), rights).subscribe(l -> {
                    logger.info("add rights:{}", l);
                });
            }
        }
    }
    

    下面看一下如何发送信息

    public Mono<RecordId> addRights(Rights r) {
        String streamKey = "stream:user:rights";//stream key
        ObjectRecord<String, Rights> record = ObjectRecord.create(streamKey, r);
        Mono<RecordId> mono = redisTemplate.opsForStream().add(record);
        return mono;
    }
    

    创建一个消息记录对象ObjectRecord,并通过ReactiveStreamOperations发送信息记录。

    Sentinel、Cluster

    ReactiveRedisTemplate也支持Redis Sentinel、Cluster集群模式,只需要调整配置即可。
    Sentinel配置如下

    spring.redis.sentinel.master=mymaster
    spring.redis.sentinel.nodes=172.17.0.4:26379,172.17.0.5:26379,172.17.0.6:26379
    spring.redis.sentinel.password=
    

    spring.redis.sentinel.nodes配置的是Sentinel节点IP地址和端口,不是Redis实例节点IP地址和端口。

    Cluster配置如下

    spring.redis.cluster.nodes=172.17.0.2:6379,172.17.0.3:6379,172.17.0.4:6379,172.17.0.5:6379,172.17.0.6:6379,172.17.0.7:6379
    spring.redis.lettuce.cluster.refresh.period=10000
    spring.redis.lettuce.cluster.refresh.adaptive=true
    

    如Redis Cluster中node2是node1的从节点,Lettuce中会缓存该信息,当node1宕机后,Redis Cluster会将node2升级为主节点。但Lettuce不会自动将请求切换到node2,因为它的缓冲没有刷新。
    开启spring.redis.lettuce.cluster.refresh.adaptive配置,Lettuce可以定时刷新Redis Cluster集群缓存信息,动态改变客户端的节点情况,完成故障转移。

    暂时未发现ReactiveRedisTemplate实现pipeline,事务的方案。

    官方文档:https://docs.spring.io/spring-data/redis/docs/current/reference/html/#redis:reactive
    文章完整代码:https://gitee.com/binecy/bin-springreactive/tree/master/user-service

    如果您觉得本文不错,欢迎关注我的微信公众号,系列文章持续更新中。您的关注是我坚持的动力!

  • 相关阅读:
    JMeter笔记:关于时间函数_time的简单使用(时间戳、时间)
    搜索&分页功能主要测试点
    JMeter笔记十五:逻辑控制器之临界部分控制器(Critical Section Controller)
    JMeter笔记十四:Beanshell取样器
    Python3+RobotFramework自动化测试十:接口测试
    Github Pages中的Octopress框架搭建个人博客(下)
    pycharm破解
    计算机网络 UDP协议
    计算机网络 路由器协议
    计算机网络 TCP协议
  • 原文地址:https://www.cnblogs.com/binecy/p/14531413.html
Copyright © 2011-2022 走看看