zoukankan      html  css  js  c++  java
  • 基于Redis队列实现《一》——SUB/PUB模式

    一、背景

    最近项目上通过VSP+API的模式对接京东商城,其中会涉及很多消息通知。我们的业务有很多场景需要很多根据消息通知来同步创建相应业务数据,更新相关数据的状态。

    考虑到后期的用户会越来越多,同一个时刻会产生很多不同的消息,同一个消息一次也会获取到很多条,所以想通过定时任务+队列的方式来异步处理消息。这里没有使用

    传统的消息队列中间件,而是更加轻量级的redis队列。

    二、队列基础介绍

    这里先简单介绍一下队列的基础知识,队列常用于处理业务之间的异步,服务与服务之间的解耦,消息通知,请求前端的削峰和处理消费者与生产者模式下两端处理能力不

    致的情况。常见的队列模式有两种——发布/订阅模式 和 点对点模式。

    发布/订阅模式

    这里先来个示意图(如下),这里可以联想到网络里的UDP协议,该模式主要用于类似消息通知的场景:

     

    点对点模式

    意图(如下),这里可以联想到网络模式下的TCP协议:

     

    三、实现示例

    说明:这里使用的java语言,spring boot 项目,这里假设使用RedisTemplate做为redis客户端,这里对spring boot集成redis客户端的相关代码和配置略过。

    该实现主要基于Listener模式实现,业务方将消息推送到指定的channel(队列名),订阅过对应channel的服务方就会处理对应的消息,这里的服务方法实现了幂等性,所以不会出现消息重复消费的问题。

    1.配置RedisCofig类 
    /**
     * Created by liuyq on 2021/3/21
     */
    @Configuration
    public class RedisConfig {
    
        
        @Bean
        RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                MessageListenerAdapter recvAdapter) {
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //添加待处理消息1
            container.addMessageListener(recvAdapter, new PatternTopic("handle_message_queue_prefix1"));
           //添加待处理消息5
            container.addMessageListener(recvAdapter, new PatternTopic("handle_message_queue_prefix5"));
           
            return container;
        }
    
        @Bean
        MessageListenerAdapter recvAdapter(RedisRecv receiver){
            //实例化MessageListenerAdapter,并指定处理方法为recvMsg
            return new MessageListenerAdapter(receiver, "recvMsg");
        }
    
        @Bean
        StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
            //实例化RedisTemplate类
            return new StringRedisTemplate(connectionFactory);
        }
    }
    

     2.  消息接收与处理类 RedisRecv

    /**
     * Created by liuyq on 2021/3/21
     */
    @Slf4j @Component public class RedisRecv { @Autowired private JdHandleMessageService handleService; /** * 处理redis接收消息 * @param msg 消息 * @param channel 通道 */ public void recvMsg(String msg, String channel) { log.info("-----------------------------recvMsg: {}", channel); //处理普通消息 if (channel.startsWith(Constant.handle_message_queue_prefix)) { String typeStr = channel.substring(channel.lastIndexOf('_') + 1); int type = Integer.parseInt(typeStr); log.info("消息处理type:【{}】", type); switch (type) { case 1: log.info("订单拆单信息 type:{}", type); handleService.handleSplit(msg); break; case 5: log.info("获取妥投信息 type:{}", type); handleService.handleReceipt(msg); break; log.info("该{}暂未有相关处理方法", type); } } } }

     3. 消息发送公共类 DemoServiceImpl 

    @Service
    public class DemoServiceImpl implements DemoService {
    
        @Autowired
        private StringRedisTemplate stringRedisTemplate;
    
        /**
         * 保存消息到队列,基于PUB/SUB模式
         * @param queue 队列列
         * @param content 内容
         */
        public void saveMessageToQueue(String queue, String content){
            stringRedisTemplate.convertAndSend(queue, content);
        }
    
    }
    

      4. 示例,调用类  Demo

    @Service
    public class Demo {
    
       @Autowired
        private DemoService demoService;
    
       public void pushMessage(String msg){
           log.info("接收到消息:【{}】", msg);
           //推送消息到队列 handle_message_queue_prefix1
           demoService.saveMessageToQueue("handle_message_queue_prefix1", msg);
       }
    }
    

     

  • 相关阅读:
    图片轮播
    swoole 内存泄露的问题有没有好的办法解决
    学习Swoole需要掌握哪些基础知识
    通过SSH通道来访问MySQL
    redis常见应用场景
    Redis 消息队列的实现
    PHP-Curl模拟HTTPS请求
    代码重构方向原则指导
    win8.1系统相关
    SQL Server 学习系列之六
  • 原文地址:https://www.cnblogs.com/liuyq/p/14860764.html
Copyright © 2011-2022 走看看