zoukankan      html  css  js  c++  java
  • springboot 2.X 集成RabbitMQ 详解(二)topic 模式

    Topic 模式
    重点是理解交换器(exchange)、路由键(routing key)、队列名(queue name)三者之间的绑定关系。

    • topic 发送方:
      发送方 关注参数主要有三个 交换器(exchange) 路由键(routing key) 和 消息

    • topic 消费方
      消费方 关注点是队列的名字定义

    • topic 配置关系
      配置关系,主要关注于同一路由键下的 路由键和队列的 绑定关系

    • 重点使用的: 包含匹配规则 routing key 采用 . 分割 ,其中 “” 和 “#” 分别表达的意思是 其中“”用于匹配一个单词,“#”用于匹配多个单词
      例如:
      交换器名称(exchange name)为 A 队列名(queue name) 分别为 monco.xuzhou.man , monco.nanjing.man , sxy.xuzhou.woman , sxy.nanjing.woman
      现在我们需要给包含monco的所有队列,那么我们的路由键设置为 monco.#
      现在我们需要给包含sxy的所有xuzhou的队列,那么我们的路由键设置为 sxy.xuzhou.* 或者 sxy.xuzhou.##
      "#" 表示 可以配置之后的所有的,"*" 表示可以配置之后的一个

    生产者代码如下:

    package com.monco.sender;
    
    import com.monco.config.RmConst;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * @author monco
     * @date 2020/4/15
     * @description: topic 发送者
     */
    @Component
    public class TopicSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send() {
    
            String msg1 = "I am email mesaage msg======";
            System.out.println("TopicSender send the 1st : " + msg1);
            this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, RmConst.QUEUE_TOPIC_EMAIL, msg1);
    
            String msg2 = "I am user mesaages msg########";
            System.out.println("TopicSender send the 2nd : " + msg2);
            this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, RmConst.QUEUE_TOPIC_USER, msg2);
    
            String msg3 = "I am error mesaages msg";
            System.out.println("TopicSender send the 3rd : " + msg3);
            this.rabbitTemplate.convertAndSend(RmConst.EXCHANGE_TOPIC, "errorkey", msg3);
        }
    }
    
    

    配置类代码如下:

        /**
         * 定义topic模式的交换器
         * exchange 交换器 名称  monco.topic.exchange
         * 队列名称  monco.topic.email  monco.topic.email
         * <p>
         * routing key 定义 monco.*.user
         */
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(RmConst.EXCHANGE_TOPIC);
        }
    
        @Bean
        public Queue queueEmailMessage() {
            return new Queue(RmConst.QUEUE_TOPIC_EMAIL);
        }
    
        @Bean
        public Queue queueUserMessage() {
            return new Queue(RmConst.QUEUE_TOPIC_USER);
        }
    
        @Bean
        public Queue queueAllMessage() {
            return new Queue(RmConst.QUEUE_TOPIC_All);
        }
    
        @Bean
        public Binding bindingEmailExchangeMessage() {
            return BindingBuilder
                    .bind(queueEmailMessage())
                    .to(topicExchange())
                    .with("monco.topic.email");
        }
    
        @Bean
        public Binding bindingUserExchangeMessages() {
            return BindingBuilder
                    .bind(queueUserMessage())
                    .to(topicExchange())
                    .with("monco.*.user");
        }
    
        @Bean
        public Binding bindingAllExchangeMessages() {
            return BindingBuilder
                    .bind(queueAllMessage())
                    .to(topicExchange())
                    .with("monco.*.user");
        }
    
    

    消费者代码如下:

    package com.monco.receiver;
    
    import com.monco.config.RmConst;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * @author monco
     * @date 2020/4/15
     * @description: topic email 接收者
     */
    @Slf4j
    @Component
    public class TopicEmailReceiver {
    
        @RabbitHandler
        @RabbitListener(queues = "monco.topic.email")
        public void processEmail(Message message, Channel channel) throws IOException {
            // 手动应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.info("processEmail receive" + new String(message.getBody()));
        }
    
    
        @RabbitHandler
        @RabbitListener(queues = "monco.topic.user")
        public void processUser(Message message, Channel channel) throws IOException {
            // 手动应答
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), true,false);
            log.info("processUser receive" + new String(message.getBody()));
        }
    
        @RabbitHandler
        @RabbitListener(queues = "monco.topic.user")
        public void processUsera(Message message, Channel channel) throws IOException {
            // 手动应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.info("processUser2 receive" + new String(message.getBody()));
        }
    
        @RabbitHandler
        @RabbitListener(queues = "monco.monco.user")
        public void processAll(Message message, Channel channel) throws IOException {
            // 手动应答
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.info("processAll receive" + new String(message.getBody()));
        }
    }
    
    
  • 相关阅读:
    日区 Apple ID共享
    强大的视频跨平台视频处理软件
    百度网盘无限速
    App Store看片神器,请收好
    bootstrap 中这段代码 使bundles 失败
    C# EF中调用 存储过程并调回参数
    mvc 默认访问 Area 下控制器方法
    怎样用SQL语句查看查询的性能指标
    slice 定义和用法
    C# Regex类用法
  • 原文地址:https://www.cnblogs.com/monco-sxy/p/12748809.html
Copyright © 2011-2022 走看看