zoukankan      html  css  js  c++  java
  • SpringBoot+RabbitMQ学习笔记(四)使用RabbitMQ的三种交换器之Fanout

    一丶简介

    Fanout Exchange 

      不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

    业务场景:

    1.订单服务需要同时向短信服务和push服务发送,两个服务都有各自的消息队列。

    2.使用Fanout交换器。

    二丶配置文件

    同样的创建了两个项目,一个作为生产者,一个作为消费者。

    生产者配置:

    server.port=8883
    
    spring.application.name=hello-world
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #设置交换器名称
    mq.config.exchange=order.fanout
    View Code

    消费者配置:

    server.port=8884
    
    spring.application.name=lesson1
    
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #设置交换器名称
    mq.config.exchange=order.fanout
    #短信消息服务队列名称
    mq.config.queue.sms=order.sms
    #push消息服务队列名称
    mq.config.queue.push=order.push
    #log消息服务队列名称
    mq.config.queue.log=order.log
    View Code

    注:本是要配置两个消息队列,但是为了测试fanout交换器是否能够将消息发送到所有消息队列(准确的说是配置了路由键的队列和没有配置路由键的队列)多创建的一个。

    三丶编写生产者

    package com.example.amqpfanoutprovider;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:发送消息
     */
    @Component
    public class FanoutSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        //exChange 交换器
        @Value("${mq.config.exchange}")
        private String exChange;
    
        /**
         * 发送消息的方法
         * @param msg
         */
        public void send(String msg){
            //向消息队列发送消息
            //参数1:交换器名称
            //参数2:路由键,广播模式时(fanout交换器)没有路由键使用""空字符串代替
            //参数3:消息
            this.amqpTemplate.convertAndSend(exChange,"",msg);
    
        }
    }
    View Code

    四丶编写消费者

    短信服务类:

    package com.ant.amqpfanoutconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                exchange:配置交换器
     *                key:路由键(广播模式时不需要路由键,所以不写)
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
            )
    )
    public class SmsReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("sms-receiver:"+msg);
        }
    }
    View Code

    push服务类:

    package com.ant.amqpfanoutconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                exchange:配置交换器
     *                key:路由键(广播模式时不需要路由键,所以不写)
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.push}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
            )
    )
    public class PushReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("push-receiver:"+msg);
        }
    }
    View Code

    log服务类:该类是为了测试配置了路由键的消息队列和没配置路由键的消息队列是否都能接收到fanout交换器发送的消息。

    package com.ant.amqpfanoutconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                exchange:配置交换器
     *                key:路由键(广播模式时不需要路由键,所以不写)注:消息队列配置了路由键同样能接收到fanout交换器传过来的消息。
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.log}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT),
                    key = "user.log.info"
            )
    )
    public class LogReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("log-receiver:"+msg);
        }
    }
    View Code

    五丶测试一发

    测试类:

    package com.example.amqp;
    
    import com.example.amqpfanoutprovider.FanoutSender;
    import com.example.helloworld.HelloworldApplication;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = HelloworldApplication.class)
    public class QueueTest {
    
        @Autowired
        private FanoutSender fanoutSender;
    
        /**
         * 测试消息队列
         */
        @Test
        public void test1() throws InterruptedException {
    
                fanoutSender.send("hello");
    
    
        }
    }
    View Code

    OK,看控制台输出得出,配置了路由键的消息队列和没配置路由键的消息队列都能接收到fanout交换器发送的消息!

    如有不足之处欢迎指正!

  • 相关阅读:
    牛客IOI周赛17-提高组 卷积 生成函数 多项式求逆 数列通项公式
    6.3 省选模拟赛 Decompose 动态dp 树链剖分 set
    AtCoder Grand Contest 044 A Pay to Win 贪心
    5.29 省选模拟赛 树的染色 dp 最优性优化
    luogu P6097 子集卷积 FST FWT
    CF724C Ray Tracing 扩展欧几里得 平面展开
    5.30 省选模拟赛 方格操作 扫描线 特殊性质
    5.29 省选模拟赛 波波老师 SAM 线段树 单调队列 并查集
    Spring main方法中怎么调用Dao层和Service层的方法
    Bug -- WebService报错(两个类具有相同的 XML 类型名称 "{http://webService.com/}getPriceResponse"。请使用 @XmlType.name 和 @XmlType.namespace 为类分配不同的名称。)
  • 原文地址:https://www.cnblogs.com/aijiaxiang/p/12798214.html
Copyright © 2011-2022 走看看