zoukankan      html  css  js  c++  java
  • SpringBoot+RabbitMQ学习笔记(三)使用RabbitMQ的三种交换器之Topic

    一丶简介

    Topic Exchange 
    将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。

    业务场景:

    1.日志服务器记录用户服务、商品服务、订单服务三个服务。

    2.日志服务器有三个日志服务:INFO日志处理服务、ERROR日志处理服务、全日志处理服务。

    3.使用Topic交换器处理日志,匹配规则依次为:*.log.info、*.log.error和*.log.*。

    二丶配置文件

    还是创建两个项目,一个作为生产者一个作为消费者。

    生产者配置:

    server.port=8883
    
    spring.application.name=hello-world
    
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.thymeleaf.cache=false
    
    设置交换器名称
    mq.config.exchange=log.topic
    View Code

    消费者配置:

    server.port=8884
    
    spring.application.name=lesson1
    
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    #设置交换器名称
    mq.config.exchange=log.topic
    #info队列名称
    mq.config.queue.info=log.info
    #error队列名称
    mq.config.queue.error=log.error
    #log队列名称
    mq.config.queue.logs=log.all
    View Code

    三丶创建生产者

    1.订单服务

    package com.example.amqptopicprovider;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:模拟订单服务发送消息
     */
    @Component
    public class OrderSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        //exChange 交换器
        @Value("${mq.config.exchange}")
        private String exChange;
    
        /**
         * 发送消息的方法
         * @param msg
         */
        public void send(String msg){
            //向消息队列发送消息
            //参数1:队列名称
            //参数2:消息
            // this.amqpTemplate.convertAndSend("hello-queue",msg);
    
            //向消息队列发送消息
            //参数1:交换器名称
            //参数2:路由键
            //参数3:消息
            this.amqpTemplate.convertAndSend(exChange,"order.log.debug","order.log.debug-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"order.log.info","order.log.info-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"order.log.warn","order.log.warn-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"order.log.error","order.log.error-"+msg);
    
        }
    }
    View Code

    2.商品服务

    package com.example.amqptopicprovider;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:模拟商品服务发送消息
     */
    @Component
    public class ProductSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        //exChange 交换器
        @Value("${mq.config.exchange}")
        private String exChange;
    
        /**
         * 发送消息的方法
         * @param msg
         */
        public void send(String msg){
            //向消息队列发送消息
            //参数1:队列名称
            //参数2:消息
            // this.amqpTemplate.convertAndSend("hello-queue",msg);
    
            //向消息队列发送消息
            //参数1:交换器名称
            //参数2:路由键
            //参数3:消息
            this.amqpTemplate.convertAndSend(exChange,"product.log.debug","product.log.debug-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"product.log.info","product.log.info-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"product.log.warn","product.log.warn-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"product.log.error","product.log.error-"+msg);
    
        }
    }
    View Code

    3.用户服务

    package com.example.amqptopicprovider;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:模拟用户服务发送消息
     */
    @Component
    public class UserSender {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        //exChange 交换器
        @Value("${mq.config.exchange}")
        private String exChange;
    
        /**
         * 发送消息的方法
         * @param msg
         */
        public void send(String msg){
            //向消息队列发送消息
            //参数1:队列名称
            //参数2:消息
            // this.amqpTemplate.convertAndSend("hello-queue",msg);
    
            //向消息队列发送消息
            //参数1:交换器名称
            //参数2:路由键
            //参数3:消息
            this.amqpTemplate.convertAndSend(exChange,"user.log.debug","user.log.debug-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"user.log.info","user.log.info-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"user.log.warn","user.log.warn-"+msg);
            this.amqpTemplate.convertAndSend(exChange,"user.log.error","user.log.error-"+msg);
    
        }
    }
    View Code

    四丶创建消费者

    1.ERROR日志处理服务

    package com.ant.amqptopicconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                  exchange:配置交换器
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.error}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                    key = "*.log.error"
            )
    )
    public class TopicErrorReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("error-receiver:"+msg);
        }
    }
    View Code

    2.INFO日志处理服务

    package com.ant.amqptopicconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                  exchange:配置交换器
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.info}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                    key = "*.log.info"
            )
    )
    public class TopicInfoReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("info-receiver:"+msg);
        }
    }
    View Code

    3.全日志处理服务

    package com.ant.amqptopicconsumer;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:消息接收者
     *
     * @RabbitListener bindings:绑定队列
     * @QueueBinding  value:绑定队列的名称
     *                  exchange:配置交换器
     *                  key:路由键
     * @Queue : value:配置队列名称
     *          autoDelete:是否是一个可删除的临时队列
     * @Exchange value:为交换器起个名称
     *           type:指定具体的交换器类型
     */
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.logs}",autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC),
                    key = "*.log.*"
            )
    )
    public class TopicLogReceiver {
    
        /**
         * 接收消息的方法,采用消息队列监听机制
         * @param msg
         */
        @RabbitHandler
        public void process(String msg){
            System.out.println("all-receiver:"+msg);
        }
    }
    View Code

    五丶老规矩测试一发

    package com.example.amqp;
    
    import com.example.ampq.Sender;
    import com.example.amqptopicprovider.OrderSender;
    import com.example.amqptopicprovider.ProductSender;
    import com.example.amqptopicprovider.UserSender;
    import com.example.helloworld.HelloworldApplication;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    /**
     * Author:aijiaxiang
     * Date:2020/4/26
     * Description:
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = HelloworldApplication.class)
    public class QueueTest {
        @Autowired
        private Sender sender;
    
        @Autowired
        private UserSender userSender;
    
        @Autowired
        private ProductSender productSender;
    
        @Autowired
        private OrderSender orderSender;
    
        /**
         * 测试消息队列
         */
    //    @Test
    //    public void test1() throws InterruptedException {
    //        while (true){
    //            Thread.sleep(1000);
    //            sender.send("hello");
    //        }
    //
    //    }
    
        @Test
        public void test2(){
            userSender.send("usersend");
            productSender.send("productsend");
            orderSender.send("ordersend");
        }
    }
    View Code

    注:日志服务处理类中的路由键是直接采用了硬编码的方式进行配置,主要是为了方便查看一目了然,但是还是推荐将路由键配置在配置文件中,使用 "${}" 这个方式来进行读取。

  • 相关阅读:
    字典常用操作复习
    列表常用方法复习
    爬虫流程复习
    协程解决素数
    yield 复习
    多线程复习2
    多线程复习1
    异常 巩固3
    logging日志基础示例
    2019最新百度网盘不限速下载教程
  • 原文地址:https://www.cnblogs.com/aijiaxiang/p/12797512.html
Copyright © 2011-2022 走看看