zoukankan      html  css  js  c++  java
  • springboot集成redis实现消息发布订阅模式-跨多服务器

    环境:SpringBoot + jdk1.8 

    基础配置参考
    https://blog.csdn.net/llll234/article/details/80966952

    查看了基础配置那么会遇到一下几个问题:

    1.实际应用中可能会订阅多个通道,而一下这种写法不太通用
    container.addMessageListener(listenerAdapter(new RedisPmpSub()),new PatternTopic("pmp"));

    2.使用过程中使用new RedisPmpSub()配置消息接收对象会有问题。
    如果RedisPmpSub既是消息接收类,也是消息处理类。那么如果此时需要注入Bean,会成功吗?

    3.考虑后期的扩展性是否能尽量不改变原有代码的基础上,进行扩展

    额外的配置文件

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>RELEASE</version>
    </dependency>
    
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
    </dependency>

    由于GsonUtil依赖的是某个SDK,GsonUtil.toJson(this, BasePubMessage.class)可替换为
    new Gson().toJson(this, BasePubMessage.class);
    lombok需要下载插件

    发布者

    枚举定义

    考虑到可维护性,采用枚举的方式定义管道RedisChannelEnums

     1 public enum RedisChannelEnums {
     2 
     3     /**redis频道code定义 需要与发布者一致*/
     4     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE","直播信息改变"),
     5 
     6     ;
     7     /** 枚举定义+描述 */
     8     private String code;
     9     private String description;
    10 
    11     RedisChannelEnums(String code, String description) {
    12         this.code = code;
    13         this.description = description;
    14     }
    15 
    16 
    17     /** 根据code获取对应的枚举对象 */
    18     public static RedisChannelEnums getEnum(String code) {
    19         RedisChannelEnums[] values = RedisChannelEnums.values();
    20         if (null != code && values.length > 0) {
    21             for (RedisChannelEnums value : values) {
    22                 if (value.code == code) {
    23                     return value;
    24                 }
    25             }
    26         }
    27         return null;
    28     }
    29 
    30     /** 该code在枚举列表code属性是否存在 */
    31     public static boolean containsCode(String code) {
    32         RedisChannelEnums anEnum = getEnum(code);
    33         return anEnum != null;
    34     }
    35 
    36     /** 判断code与枚举中的code是否相同 */
    37     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
    38         return calendarSourceEnum.code == code;
    39     }
    40 
    41 
    42     public String getCode() {
    43         return code;
    44     }
    45 
    46     public String getDescription() {
    47         return description;
    48     }
    49 
    50 
    51 }

    消息模板

    为了兼容不同的业务场景,需要定义消息模板对象BasePubMessage
    其中ToString方法的作用是将对象转成Json字符

     1 @Data
     2 public abstract class BasePubMessage {
     3 
     4     /**发布订阅频道名称*/
     5     protected String channel;
     6 
     7     protected String extra;
     8 
     9     @Override
    10     public String toString() {
    11         return GsonUtil.toJson(this, BasePubMessage.class);
    12     }
    13 
    14 }

    消息对象LiveChangeMessage
    其中ToString方法的作用是将对象转成Json字符

     1 @Data
     2 public class LiveChangeMessage extends BasePubMessage {
     3 
     4 
     5     /**直播Ids*/
     6     private String liveIds;
     7 
     8     @Override
     9     public String toString() {
    10         return GsonUtil.toJson(this, LiveChangeMessage.class);
    11     }
    12 
    13 }

    发布者服务

    public interface RedisPub {
    
    
        /**
         * 集成redis实现消息发布订阅模式-双通道
         * @param redisChannelEnums 枚举定义
         * @param basePubMessage 消息
         */
        void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage);
    
    }
     1 @Service
     2 public class RedisPubImpl implements RedisPub {
     3 
     4     @Resource
     5     private StringRedisTemplate stringRedisTemplate;
     6 
     7     @Override
     8     public void sendMessage(RedisChannelEnums redisChannelEnums, BasePubMessage basePubMessage) {
     9 
    10         if(redisChannelEnums ==null || basePubMessage ==null){
    11             return;
    12         }
    13 
    14         basePubMessage.setChannel(redisChannelEnums.getCode());
    15         stringRedisTemplate.convertAndSend(redisChannelEnums.getCode(), basePubMessage.toString());
    16         System.out.println("发布成功!");
    17     }
    18 }

    订阅者

    注解配置

    RedisConfig作为订阅者的配置类,主要作用是:Redis消息监听器容器、配置消息接收处理类
    同时新加入的功能解决了我们上面提出的几个问题

     1 @Service
     2 @Configuration
     3 @EnableCaching
     4 public class RedisConfig {
     5 
     6 
     7     /**
     8      * 存放策略实例
     9      * classInstanceMap : key-beanName value-对应的策略实现
    10      */
    11     private ConcurrentHashMap<String, BaseSub> classInstanceMap = new ConcurrentHashMap<>(20);
    12 
    13     /**
    14      * 注入所有实现了Strategy接口的Bean
    15      *
    16      * @param strategyMap
    17      *         策略集合
    18      */
    19     @Autowired
    20     public RedisConfig(Map<String, BaseSub> strategyMap) {
    21         this.classInstanceMap.clear();
    22         strategyMap.forEach((k, v) ->
    23                 this.classInstanceMap.put(k.toLowerCase(), v)
    24         );
    25     }
    26 
    27 
    28     /**
    29      * Redis消息监听器容器
    30      *
    31      * @param connectionFactory
    32      *
    33      * @return
    34      */
    35     @Bean
    36     RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
    37 
    38         RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    39         container.setConnectionFactory(connectionFactory);
    40 
    41         RedisChannelEnums[] redisChannelEnums = RedisChannelEnums.values();
    42         if (redisChannelEnums.length > 0) {
    43             for (RedisChannelEnums redisChannelEnum : redisChannelEnums) {
    44                 if (redisChannelEnum == null || StringUtils.isEmpty(redisChannelEnum.getCode()) || redisChannelEnum.getClassName()==null) {
    45                     continue;
    46                 }
    47                 //订阅了一个叫pmp和channel 的通道,多通道
    48                 //一个订阅者接收一个频道信息,新增订阅者需要新增RedisChannelEnums定义+BaseSub的子类
    49 
    50                 String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
    51                 BaseSub baseSub = classInstanceMap.get(toLowerCase);
    52                 container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
    53             }
    54         }
    55         return container;
    56     }
    57 
    58     /**
    59      * 配置消息接收处理类
    60      *
    61      * @param baseSub
    62      *         自定义消息接收类
    63      *
    64      * @return MessageListenerAdapter
    65      */
    66     @Bean()
    67     @Scope("prototype")
    68     MessageListenerAdapter listenerAdapter(BaseSub baseSub) {
    69         //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
    70         //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
    71         //注意2个通道调用的方法都要为receiveMessage
    72         return new MessageListenerAdapter(baseSub, "receiveMessage");
    73     }
    74 
    75 }

      

    @Autowired
    public RedisConfig(Map<String, BaseSub> strategyMap) 方法的作用是将所有的配置消息接收处理类注入进来,那么消息接收处理类里面的注解对象也会注入进来。
    解决了我们提出的第二个问题
    
    而String toLowerCase = redisChannelEnum.getClassName().getSimpleName().toLowerCase();
    BaseSub baseSub = classInstanceMap.get(toLowerCase);
    container.addMessageListener(listenerAdapter(baseSub), new PatternTopic(redisChannelEnum.getCode()));
    是根据不同的管道对应不同的订阅者,也就是一个订阅者对应一个管道。方便根据不同的业务场景进行处理。
    使用这种方式主需要配置redisChannelEnum枚举即可,解决了我们提出的第一个问题。
    这样一来,订阅者就变得比较通用了

      

    枚举

    RedisChannelEnums作用:定义不同管道对应的订阅者,后期增加一个管道类型只需要增加一个枚举即可

     1 public enum RedisChannelEnums {
     2 
     3     /**redis频道名称定义 需要与发布者一致*/
     4     LIVE_INFO_CHANGE("LIVE_INFO_CHANGE", LiveChangeSub.class, "直播信息改变"),
     5 
     6     ;
     7     /** 枚举定义+描述 */
     8     private String code;
     9     private Class<? extends BaseSub> className;
    10     private String description;
    11 
    12     RedisChannelEnums(String code, Class<? extends BaseSub> className, String description) {
    13         this.code = code;
    14         this.className=className;
    15         this.description = description;
    16     }
    17 
    18 
    19     /** 根据code获取对应的枚举对象 */
    20     public static RedisChannelEnums getEnum(String code) {
    21         RedisChannelEnums[] values = RedisChannelEnums.values();
    22         if (null != code && values.length > 0) {
    23             for (RedisChannelEnums value : values) {
    24                 if (value.code == code) {
    25                     return value;
    26                 }
    27             }
    28         }
    29         return null;
    30     }
    31 
    32     /** 该code在枚举列表code属性是否存在 */
    33     public static boolean containsCode(String code) {
    34         RedisChannelEnums anEnum = getEnum(code);
    35         return anEnum != null;
    36     }
    37 
    38     /** 判断code与枚举中的code是否相同 */
    39     public static boolean equals(String code, RedisChannelEnums calendarSourceEnum) {
    40         return calendarSourceEnum.code == code;
    41     }
    42 
    43 
    44     public String getCode() {
    45         return code;
    46     }
    47 
    48     public String getDescription() {
    49         return description;
    50     }
    51 
    52     public Class<? extends BaseSub> getClassName() {
    53         return className;
    54     }
    55 }

    消息模板

    BaseSubMessage定义通用的字段,与json字符的通用转换

     1 @Data
     2 abstract class BaseSubMessage {
     3 
     4     /** 发布订阅频道名称 */
     5     private String channel;
     6 
     7     private String extra;
     8 
     9     private String json;
    10 
    11     BaseSubMessage(String json) {
    12         if(StringUtils.isEmpty(json)){
    13             return;
    14         }
    15 
    16         this.json = json;
    17         Map map = new Gson().fromJson(this.json, Map.class);
    18         BeanHelper.populate(this, map);
    19     }
    20 
    21 }

    LiveChangeMessage定义当前业务场景的字段

     1 @Data
     2 @ToString(callSuper = true)
     3 public class LiveChangeMessage extends BaseSubMessage {
     4 
     5     /** 直播Ids */
     6     private String liveIds;
     7 
     8     public LiveChangeMessage(String json) {
     9         super(json);
    10     }
    11 
    12 }

    订阅者服务

    BaseSub定义接收消息的通用方法

    1 public interface BaseSub {
    2 
    3     /**
    4      * 接收消息
    5      * @param jsonMessage  json字符
    6      */
    7     void receiveMessage(String jsonMessage);
    8 }


    LiveChangeSub具体消息接收对象

     1 @Component
     2 public class LiveChangeSub implements BaseSub {
     3 
     4     /**只是定义的注解测试,可以换成自己的*/
     5     @Autowired
     6     private CategoryMapper categoryMapper;
     7     
     8     @Override
     9     public void receiveMessage(String jsonMessage) {
    10 
    11         System.out.println("项目aries-server.....................");
    12         //注意通道调用的方法名要和RedisConfig2的listenerAdapter的MessageListenerAdapter参数2相同
    13         System.out.println("这是 LiveChangeSub" + "-----" + jsonMessage);
    14 
    15         LiveChangeMessage liveChangeMessage = new LiveChangeMessage(jsonMessage);
    16         System.out.println(liveChangeMessage);
    17         
    18         Category category = categoryMapper.get(1L);
    19         System.out.println("category:" + category);
    20 
    21 
    22     }
    23 }

    总结

    发布者配置场景:独立的服务器,独立的项目,A redis缓存服务器
    订阅者配置场景:不同于发布者的独立的服务器,独立的项目,A redis缓存服务器
    使用场景:一个发布者、一个或者多个订阅者。发布者负责发布消息,订阅者负责接收消息。一旦发布者消息发布出来,那么
    订阅者可以通过管道进行监听。同时可以根据不同的管道设置不同的消息接收者或者叫消息处理者。
    
    优点:容易配置,好管理
    缺点:由于基于redis去做,不同的redis服务就不适用了。需要考虑消息丢失,持久化的问题。
  • 相关阅读:
    C++虚继承内存布局
    编译OpenJDK记录
    Node.js + Express 调研
    软件工程开发工具
    Servlets & JSP & JavaBean 参考资料
    Eclipse AST 相关资料
    Git & github 最常用操作笔记
    Java入门学习资料整理
    从变量的类型转换看C语言的思维模式
    数学地图(1)
  • 原文地址:https://www.cnblogs.com/IT-study/p/11352254.html
Copyright © 2011-2022 走看看