zoukankan      html  css  js  c++  java
  • SpringBoot整合Redis实现发布订阅功能实践

    该项目代码下载

    一、项目结构

    我首先用 SpringBoot Initializer 创建一个简单的 Demo,然后在 Demo 上进行修改,这样更便捷。项目结构如下图所示:

    项目结构也很简单

    • PrintMessageListener 负责处理订阅消息,我仅仅是打印了收到的Redis信息;
    • AdminController 负责从浏览器输入url,实现动态订阅/取消订阅以及发布;
    • RedisConfiguration 可能是最重要的,需要负责向 Spring容器注入以下 Bean:
      • RedisTemplate :可以通过调用它的 convertAndSend(channel, Object message) 方法 发布消息;
      • RedisMessageListenerContainer ,可以通过调用它的 addMessageListener(MessageListener listener, Topic topic) 方法 订阅消息;相反地,也可以调用它的 removeMessageListener(MessageListener listener, Topic topic) 方法 取消订阅消息;
    • PubsubApplication 是 SpringBoot 的启动类;
    • logback.xml 配置内容可以参考 这篇文章

    PS:作为 Maven 项目,肯定还要有 pom.xml,图片中没有反映出来,所以我补充一下。

    二、Maven 依赖

    项目需要引入的依赖包括:

    • spring-boot-starter-web:帮助我们启动一个Web服务器;
    • spring-boot-starter-data-redis:帮助我们集成Redis;
    • lombok:方便我们使用 @Slf4j/@Data 等,简化代码;
    • slf4j-api:让我们能够使用 LoggerLoggerFactory 等类;
    • logback-classic:让我们能够真正打印出日志。

    完整的 pom.xml 文件:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
      </parent>
    
      <groupId>com.example.demo</groupId>
      <artifactId>pubsub</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <name>pubsub</name>
      <description>Demo project for Spring Boot</description>
    
      <properties>
        <java.version>1.8</java.version>
        <slf4j.version>1.7.32</slf4j.version>
        <logback.version>1.2.6</logback.version>
      </properties>
      <dependencies>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
    
        <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <optional>true</optional>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>${logback.version}</version>
        </dependency>
    
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-test</artifactId>
          <scope>test</scope>
        </dependency>
      </dependencies>
    
      <build>
        <plugins>
          <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
    	<configuration>
    	  <excludes>
    	    <exclude>
    	      <groupId>org.projectlombok</groupId>
    	      <artifactId>lombok</artifactId>
                </exclude>
    	  </excludes>
            </configuration>
          </plugin>
        </plugins>
      </build>
    
    </project>
    

    三、消息监听

    我们收到发布的消息后,需要处理逻辑,这部分逻辑写在 PrintMessageListener 中:

    package com.example.demo.pubsub.listener;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    /**
     * 功能描述:打印收到的Redis信息
     *
     * @author geekziyu
     * @version 1.0.0
     */
    @Slf4j
    public class PrintMessageListener implements MessageListener {
    
        private StringRedisSerializer stringRedisSerializer;
    
        public PrintMessageListener(StringRedisSerializer stringRedisSerializer) {
            this.stringRedisSerializer = stringRedisSerializer;
        }
    
        @Override
        public void onMessage(Message message, byte[] pattern) {
            String channel = stringRedisSerializer.deserialize(message.getChannel());
            String body = stringRedisSerializer.deserialize(message.getBody());
            handleMessage(channel, body);
        }
    
        private void handleMessage(String channel, String body) {
            log.info("消费Redis消息\n channel:{}\n body:{}", channel, body);
        }
    }
    

    四、Redis配置

    前面也说过了,我们要使用 spring-boot-starter-data-redis 中提供的API实现Redis发布和订阅消息,就需要用到 RedisTemplateRedisMessageListenerContainer,现在就来把他们注入Spring容器:

    package com.example.demo.pubsub.config;
    
    import com.example.demo.pubsub.listener.PrintMessageListener;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    
    /**
     * 功能描述:Redis 配置
     *
     * @author geekziyu
     * @version 1.0.0
     */
    @Configuration
    public class RedisConfiguration {
    
        @Bean
        public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory) {
            RedisMessageListenerContainer result = new RedisMessageListenerContainer();
            result.setConnectionFactory(redisConnectionFactory);
    
            return result;
        }
    
        @Bean("redisTemplate")
        public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
            RedisTemplate<String, String> result = new RedisTemplate<>();
            result.setConnectionFactory(factory);
    
            result.setKeySerializer(stringRedisSerializer());
            result.setHashKeySerializer(stringRedisSerializer());
    
            result.setValueSerializer(stringRedisSerializer());
            result.setHashValueSerializer(stringRedisSerializer());
            return result;
        }
    
        @Bean
        public PrintMessageListener printMessageListener() {
            return new PrintMessageListener(stringRedisSerializer());
        }
    
        @Bean
        public StringRedisSerializer stringRedisSerializer() {
            return new StringRedisSerializer();
        }
    }
    

    需要注意的有以下几点:
    第一、如果不调用 setConnectionFactory(RedisConnectionFactory),给 RedisMessageListenerContainer 设置连接工厂,在调用 addMessageListener 执行订阅时,会出现空指针异常,具体发生异常的位置如下图:

    第二、如果不调用 RedisTemplatesetConnectionFactory 方法设置Redis连接工厂,会在启动时就发生异常,如下图所示:

    // 说明 RedisConnectionFactory 对于 RedisTemplate 而言是必需的!
    Caused by: java.lang.IllegalStateException: RedisConnectionFactory is required
    	at org.springframework.util.Assert.state(Assert.java:76)
    	at org.springframework.data.redis.core.RedisAccessor.afterPropertiesSet(RedisAccessor.java:38)
    	at org.springframework.data.redis.core.RedisTemplate.afterPropertiesSet(RedisTemplate.java:128)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1845)
    	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1782)
    

    五、通过HTTP请求订阅发布

    我这里用 AdminController 来接受发布和订阅/取消订阅的请求,源代码如下:

    package com.example.demo.pubsub.controller;
    
    import com.example.demo.pubsub.listener.PrintMessageListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.data.redis.connection.MessageListener;
    import org.springframework.data.redis.core.RedisTemplate;
    import org.springframework.data.redis.listener.ChannelTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import org.springframework.data.redis.serializer.StringRedisSerializer;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 功能描述:后台控制器
     *
     * @author geekziyu
     * @version 1.0.0
     */
    @RestController
    @RequestMapping("/admin")
    public class AdminController {
    
        @Autowired
        private RedisTemplate<String, String> redisTemplate;
    
        @Autowired
        private RedisMessageListenerContainer container;
    
        private Map<String, MessageListener> registeredListener = new HashMap<>();
    
        @Autowired
        private StringRedisSerializer stringRedisSerializer;
    
    
        @GetMapping("/pub")
        public String publish(String channel, String body) {
            redisTemplate.convertAndSend(channel, body);
            return "ok";
        }
    
        @GetMapping("/sub")
        public String subscribe(String channel) {
            MessageListener listener = registeredListener.computeIfAbsent(channel, ch -> new PrintMessageListener(stringRedisSerializer));
            container.addMessageListener(listener, new ChannelTopic(channel));
            return "ok";
        }
    
        @GetMapping("/unsub")
        public String unsubscribe(String channel) {
            MessageListener messageListener = registeredListener.get(channel);
            if (messageListener != null) {
                container.removeMessageListener(messageListener, new ChannelTopic(channel));
            }
            return "ok";
        }
    
    }
    

    六、打印日志

    为了顺利的在控制台输出日志,你可能需要 logback.xml 的完整代码:

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
      <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
          <pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
      </appender>
    
      <root level="info">
        <appender-ref ref="STDOUT"/>
      </root>
    </configuration>
    

    小节

    这样,我们就已经可以实现发布订阅了。

    首先订阅一下:

    http://localhost:8080/admin/sub?channel=dream
    

    再发布一下:

    http://localhost:8080/admin/pub?channel=dream&body=engineer
    

    检查控制台,Redis消息消费成功:

    需要注意,你的 application.properties 中Redis的连接默认为 localhost:6379

    spring.redis.host=localhost
    spring.redis.port=6379
    

    你需要确保本地已经启动了Redis,且服务端口是6379。如果你不熟悉如何搭建Redis,那么你需要修改 Redis 连接到一个可用的 Redis 服务上去。

    参考文档

    SpringBoot整合Redis实现消息发布订阅

  • 相关阅读:
    2020-12
    知识的深度跟知识的广度
    限额类费用报销单N+1原则
    用友实习总结
    NC57,NC63-NC二开经验总结
    union和union all的区别
    2020
    mark_rabbitMQ
    营销之路
    怎么对ORACLE里的CLOB字段进行模糊查询
  • 原文地址:https://www.cnblogs.com/kendoziyu/p/15802698.html
Copyright © 2011-2022 走看看