zoukankan      html  css  js  c++  java
  • 补习系列(13)-springboot redis 与发布订阅

    一、订阅发布

    订阅发布是一种常见的设计模式,常见于消息系统的场景。
    如下面的图:

    [图来自百科]
    消息发布者是消息载体的生产者,其通过某些主题来向调度中心发送消息;
    而消息订阅者会事先向调度中心订阅其"感兴趣"的主题,随后会获得新消息。
    在这里,调度中心是一个负责消息控制中转的逻辑实体,可以是消息队列如ActiveMQ,也可以是Web服务等等。

    常见应用

    • 微博,每个用户的粉丝都是该用户的订阅者,当用户发完微博,所有粉丝都将收到他的动态;
    • 新闻,资讯站点通常有多个频道,每个频道就是一个主题,用户可以通过主题来做订阅(如RSS),这样当新闻发布时,订阅者可以获得更新。

    二、Redis 与订阅发布

    Redis 支持 (pub/sub) 的订阅发布能力,客户端可以通过channel(频道)来实现消息的发布及接收。

    1. 客户端通过 SUBSCRIBE 命令订阅 channel;

    1. 客户端通过PUBLISH 命令向channel 发送消息;

    而后,订阅 channel的客户端可实时收到消息。

    除了简单的SUBSCRIBE/PUBLISH命令之外,Redis还支持订阅某一个模式的主题(正则表达式),
    如下:

    PSUBSCRIBE  /topic/cars/*
    

    于是,我们可以利用这点实现相对复杂的订阅能力,比如:

    • 在电商平台中订阅多个品类的商品促销信息;
    • 智能家居场景,APP可以订阅所有房间的设备消息。
      ...

    尽管如此,Redis pub/sub 机制存在一些缺点:

    • 消息无法持久化,存在丢失风险;
    • 没有类似 RabbitMQ的ACK机制;
    • 由于是广播机制,无法通过添加worker 提升消费能力;

    因此,Redis 的订阅发布建议用于实时且可靠性要求不高的场景。

    三、SpringBoot 与订阅发布

    接下来,看一下SpringBoot 怎么实现订阅发布的功能。

    spring-boot-starter-data-redis 帮我们实现了Jedis的引入,pom 依赖如下:

     <!-- redis -->
      <dependency>
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-data-redis</artifactId>
       <version>${spring-boot.version}</version>
      </dependency>
    

    application.properties 中指定配置

    # redis 连接配置
    spring.redis.database=0 
    spring.redis.host=127.0.0.1
    spring.redis.password=
    spring.redis.port=6379
    spring.redis.ssl=false
    
    # 连接池最大数
    spring.redis.pool.max-active=10 
    # 空闲连接最大数
    spring.redis.pool.max-idle=10
    # 获取连接最大等待时间(s)
    spring.redis.pool.max-wait=600000
    

    A. 消息模型

    消息模型描述了订阅发布的数据对象,这要求生产者与消费者都能理解
    以下面的POJO为例:

        public static class SimpleMessage {
    
            private String publisher;
            private String content;
            private Date createTime;
    
    

    在SimpleMessage类中,我们声明了几个字段:

    字段名 说明
    publisher 发布者
    content 文本内容
    createTime 创建时间

    B. 序列化

    如下的代码采用了JSON 作为序列化方式:

    @Configuration
    public class RedisConfig {
    
        private static final Logger logger = LoggerFactory.getLogger(RedisConfig.class);
    
        /**
         * 序列化定制
         * 
         * @return
         */
        @Bean
        public Jackson2JsonRedisSerializer<Object> jackson2JsonSerializer() {
            Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(
                    Object.class);
    
            // 初始化objectmapper
            ObjectMapper mapper = new ObjectMapper();
            mapper.setSerializationInclusion(Include.NON_NULL);
            mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
            jackson2JsonRedisSerializer.setObjectMapper(mapper);
            return jackson2JsonRedisSerializer;
        }
    
        /**
         * 操作模板
         * 
         * @param connectionFactory
         * @param jackson2JsonRedisSerializer
         * @return
         */
        @Bean
        public RedisTemplate<String, Object> redisTemplate(JedisConnectionFactory connectionFactory,
                Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer) {
    
            RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
            template.setConnectionFactory(connectionFactory);
    
            // 设置key/hashkey序列化
            RedisSerializer<String> stringSerializer = new StringRedisSerializer();
            template.setKeySerializer(stringSerializer);
            template.setHashKeySerializer(stringSerializer);
    
            // 设置值序列化
            template.setValueSerializer(jackson2JsonRedisSerializer);
            template.setHashValueSerializer(jackson2JsonRedisSerializer);
            template.afterPropertiesSet();
    
            return template;
        }
    

    C. 发布消息

    消息发布,需要先指定一个ChannelTopic对象,随后通过RedisTemplate方法操作。

    @Service
    public class RedisPubSub {  
        private static final Logger logger = LoggerFactory.getLogger(RedisPubSub.class);
    
        @Autowired
        private RedisTemplate<String, Object> redisTemplate;
    
        private ChannelTopic topic = new ChannelTopic("/redis/pubsub");
    
      
        @Scheduled(initialDelay = 5000, fixedDelay = 10000)
        private void schedule() {
            logger.info("publish message");
            publish("admin", "hey you must go now!");
        }
    
        /**
         * 推送消息
         * 
         * @param publisher
         * @param message
         */
        public void publish(String publisher, String content) {
            logger.info("message send {} by {}", content, publisher);
    
            SimpleMessage pushMsg = new SimpleMessage();
            pushMsg.setContent(content);
            pushMsg.setCreateTime(new Date());
            pushMsg.setPublisher(publisher);
    
            redisTemplate.convertAndSend(topic.getTopic(), pushMsg);
        }
    

    上述代码使用一个定时器(@Schedule)来做发布,为了保证运行需要在主类中启用定时器注解:

    @EnableScheduling
    @SpringBootApplication
    public class BootSampleRedis{
    ...
    }
    

    D. 接收消息

    定义一个消息接收处理的Bean:

        @Component
        public static class MessageSubscriber {
    
            public void onMessage(SimpleMessage message, String pattern) {
                logger.info("topic {} received {} ", pattern, JsonUtil.toJson(message));
            }
        }
    

    接下来,利用 MessageListenerAdapter 可将消息通知到Bean方法:

           /**
             * 消息监听器,使用MessageAdapter可实现自动化解码及方法代理
             * 
             * @return
             */
            @Bean
            public MessageListenerAdapter listener(Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer,
                    MessageSubscriber subscriber) {
                MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "onMessage");
                adapter.setSerializer(jackson2JsonRedisSerializer);
                adapter.afterPropertiesSet();
                return adapter;
            }
    

    最后,关联到消息发布的Topic:

            /**
             * 将订阅器绑定到容器
             * 
             * @param connectionFactory
             * @param listenerAdapter
             * @return
             */
            @Bean
            public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                    MessageListenerAdapter listener) {
    
                RedisMessageListenerContainer container = new RedisMessageListenerContainer();
                container.setConnectionFactory(connectionFactory);
                container.addMessageListener(listener, new PatternTopic("/redis/*"));
                return container;
            }
    

    运行结果
    启动程序,从控制台可输出:

    .RedisPubSub : publish message
    .RedisPubSub : message send hey you must go now! by admin
    .RedisPubSub : topic /redis/* received {"publisher":"admin","content":"hey you must go now!","createTime":1543418694007} 
    

    这样,我们便完成了订阅发布功能。

    示例程序下载

    小结

    消息订阅发布是分布式系统中的常用手段,也经常用来实现系统解耦、性能优化等目的;
    当前小节结合SpringBoot 演示了 Redis订阅发布(pub/sub)的实现,在部分场景下可以参考使用。
    欢迎继续关注"美码师的补习系列-springboot篇" ,期待更多精彩内容-

  • 相关阅读:
    【File类:重命名功能】
    一段代码-Java
    Galahad
    简单的中位数
    小A的题 线段树区间赋值
    上升子序列方案数
    Superdoku 二分图匹配
    Haybale Guessing 区间并查集
    Dijkstra+二分查找
    莫比乌斯反演
  • 原文地址:https://www.cnblogs.com/littleatp/p/10035786.html
Copyright © 2011-2022 走看看