zoukankan      html  css  js  c++  java
  • SpringBoot+RabbitMQ学习笔记(四)使用RabbitMQ的三种交换器之Fanout

    一丶简介

    Fanout Exchange 

      不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。

    业务场景:

    1.订单服务需要同时向短信服务和push服务发送,两个服务都有各自的消息队列。

    2.使用Fanout交换器。

    二丶配置文件

    同样的创建了两个项目,一个作为生产者,一个作为消费者。

    生产者配置:

    server.port=8883
    
    spring.application.name=hello-world
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #设置交换器名称
    mq.config.exchange=order.fanout
    View Code

    消费者配置:

    server.port=8884
    
    spring.application.name=lesson1
    
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #设置交换器名称
    mq.config.exchange=order.fanout
    #短信消息服务队列名称
    mq.config.queue.sms=order.sms
    #push消息服务队列名称
    mq.config.queue.push=order.push
    #log消息服务队列名称
    mq.config.queue.log=order.log
    View Code

    注:本是要配置两个消息队列,但是为了测试fanout交换器是否能够将消息发送到所有消息队列(准确的说是配置了路由键的队列和没有配置路由键的队列)多创建的一个。

    三丶编写生产者

    package com.example.amqpfanoutprovider;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:发送消息
     */
    @Component
    public class FanoutSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        //exChange 交换器
        @Value("${mq.config.exchange}")
        private String exChange;
    
        /**
         * 发送消息的方法
         * @param msg
         */
        public void send(String msg){
            //向消息队列发送消息
            //参数1:交换器名称
            //参数2:路由键,广播模式时(fanout交换器)没有路由键使用""空字符串代替
            //参数3:消息
            this.amqpTemplate.convertAndSend(exChange,"",msg);
    
        }
    }
    View Code

    四丶编写消费者

    短信服务类:

    package com.ant.amqpfanoutconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                exchange:配置交换器
     *                key:路由键(广播模式时不需要路由键,所以不写)
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.sms}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
            )
    )
    public class SmsReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("sms-receiver:"+msg);
        }
    }
    View Code

    push服务类:

    package com.ant.amqpfanoutconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                exchange:配置交换器
     *                key:路由键(广播模式时不需要路由键,所以不写)
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.push}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
            )
    )
    public class PushReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("push-receiver:"+msg);
        }
    }
    View Code

    log服务类:该类是为了测试配置了路由键的消息队列和没配置路由键的消息队列是否都能接收到fanout交换器发送的消息。

    package com.ant.amqpfanoutconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                exchange:配置交换器
     *                key:路由键(广播模式时不需要路由键,所以不写)注:消息队列配置了路由键同样能接收到fanout交换器传过来的消息。
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.log}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT),
                    key = "user.log.info"
            )
    )
    public class LogReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("log-receiver:"+msg);
        }
    }
    View Code

    五丶测试一发

    测试类:

    package com.example.amqp;
    
    import com.example.amqpfanoutprovider.FanoutSender;
    import com.example.helloworld.HelloworldApplication;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = HelloworldApplication.class)
    public class QueueTest {
    
        @Autowired
        private FanoutSender fanoutSender;
    
        /**
         * 测试消息队列
         */
        @Test
        public void test1() throws InterruptedException {
    
                fanoutSender.send("hello");
    
    
        }
    }
    View Code

    OK,看控制台输出得出,配置了路由键的消息队列和没配置路由键的消息队列都能接收到fanout交换器发送的消息!

    如有不足之处欢迎指正!

  • 相关阅读:
    const用法详解(转)
    [转]Scintilla开源库使用指南
    javascript 流程控制语句 if 语句
    knockout.js 练习一
    深入Node.js的模块机制
    js本地存储解决方案(localStorage与userData)
    linear-gradient 渐变 css3新属性
    制作响应式网站时,用来测试不同宽度下网页展示效果的方法
    zepto.js, django和webpy
    attachEvent 与 addEventListener 的用法、区别
  • 原文地址:https://www.cnblogs.com/aijiaxiang/p/12798214.html
Copyright © 2011-2022 走看看