zoukankan      html  css  js  c++  java
  • SpringBoot应用操作Rabbitmq(direct高级操作)

    一、首先声明完成任务架构,通过direct订阅/发布的模式进行生产消费。

     a、消息生产指定交换器和路由key

     b、消费者绑定交换器,路由key和队列的关系(集群监控收到的消息不重复)

    二、实战演练

    1、首先第一步是引入消息队列的依赖包

            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit</artifactId>
                <version>2.1.7.RELEASE</version>
            </dependency>

    2、添加配置application.yml文件或者properties文件

    spring:
      application:
        #指定应用的名字
        name: rabbit-add
      #配置rabbitmq
      rabbitmq:
      #链接主机
        host: 127.0.0.1
      #端口
        port: 5672
      #已经授权的用户账号密码
        username: user
        password: user
      #指定的虚拟主机,默认/,
        virtual-host: my_vhost
    
    
    # 自定义配置应用于direct交换器
    mq:
      config:
       #自定义交换器名称
         exchange: log.direct
         queue:
            #自定义error和info队列名称
            errorName: log.error
            infoName: log.info
            #自定义error和info路由键的名称
            routingInfoKey: info.routing.key
            routingErrorKey: error.routing.key

    3、创建队列的消费者,

    注意:1、当前代码autoDelete属性为false,创建的是临时队列

         2、RabbitHandler的isDefault属性的使用,默认false,可能会出现监听这未找到方法的循环异常

             3、集群模式将某个消费者复制多份即可

     a、error队列的消费者
    package com.niu.direct;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * @author niunafei
     * @function
     * @email niunafei0315@163.com
     * @date 2020/4/28  7:20 PM
     * @RabbitListener 自定义监听事件
     * @QueueBinding 绑定交换器与队列的关系value 指定队列exchange指定交换器
     * value= @Queue 指定配置队列的信息 value队列名称 autoDelete是否是临时队列
     * exchange= @Exchange 指定交换器 value指定交换器名称 type交换器类型
     * key  指定路由键
     */
    @Component
    @Slf4j
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "${mq.config.queue.errorName}", autoDelete = "true"),
                    exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                    key = "${mq.config.queue.routingErrorKey}")
    )
    public class ErrorReceiver {
    
        /**
         * 设置监听方法
         *  @RabbitHandler 声明监听方法是下面的 isDefault属性是默认false接受的完整对象,true接受body体
         *
         * @param msg
         */
        @RabbitHandler(isDefault = true)
        public void process(String msg) {
            log.info("接受到消息:error {}", msg);
        }
    }
    View Code

      b、info队列的消费者

    package com.niu.direct;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.stereotype.Component;
    
    /**
     * @author niunafei
     * @function
     * @email niunafei0315@163.com
     * @date 2020/4/28  7:20 PM
     * @RabbitListener 自定义监听事件
     * @QueueBinding 绑定交换器与队列的关系value 指定队列exchange指定交换器
     * value= @Queue 指定配置队列的信息 value队列名称 autoDelete是否是临时队列
     * exchange= @Exchange 指定交换器 value指定交换器名称 type交换器类型
     * key  指定路由键
     */
    @Component
    @Slf4j
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(
                            value = "${mq.config.queue.infoName}", autoDelete = "true"
                    ),
                    exchange = @Exchange(
                            value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                    key = "${mq.config.queue.routingInfoKey}")
    )
    public class InfoReceiver {
    
        /**
         * 设置监听方法
         *
         * @param msg
         * @RabbitHandler 声明监听方法是下面的 isDefault属性是默认false接受的完整对象,true接受body体
         */
        @RabbitHandler(isDefault = true)
        public void process(String msg) {
            log.info("接受到消息:info {}", msg);
        }
    }
    View Code

    4、创建消息的生产者,这里为了方便就同一个方法生产

     1 package com.niu.direct;
     2 
     3 import org.springframework.amqp.core.Message;
     4 import org.springframework.amqp.core.MessageProperties;
     5 import org.springframework.amqp.rabbit.core.RabbitTemplate;
     6 import org.springframework.beans.factory.annotation.Autowired;
     7 import org.springframework.beans.factory.annotation.Value;
     8 import org.springframework.stereotype.Component;
     9 
    10 /**
    11  * @author niunafei
    12  * @function
    13  * @email niunafei0315@163.com
    14  * @date 2020/4/29  9:44 AM
    15  */
    16 @Component
    17 public class Sender {
    18     /**
    19      * spring整合的操作类
    20      * Message 发送的消息对象
    21      * void send(Message var1) throws AmqpException;
    22      * <p>
    23      * var1 路由键 Message 发送的消息对象
    24      * void send(String var1, Message var2) throws AmqpException;
    25      * <p>
    26      * var1 指定交换器名称 var2 路由键 Message 发送的消息对象
    27      * void send(String var1, String var2, Message var3) throws AmqpException;
    28      *
    29      * convertAndSend() 方法不需要指定MessageProperties属性即可发布
    30      */
    31     @Autowired
    32     private RabbitTemplate rabbitTemplate;
    33 
    34     @Value("${mq.config.queue.routingInfoKey}")
    35     private String routingInfoKey;
    36     @Value("${mq.config.queue.routingErrorKey}")
    37     private String routingErrorKey;
    38     @Value("${mq.config.exchange}")
    39     private String exchange;
    40 
    41     public void send(String msg) {
    42         Message message = new Message(msg.getBytes(), new MessageProperties());
    43         //需要指定交换器和路由键就可以转发
    44         rabbitTemplate.send(exchange, routingInfoKey, message);
    45         rabbitTemplate.send(exchange, routingErrorKey, message);
    46     }
    47 
    48 }
    View Code

    5测试结果:

    注意:ack确认机制,容易产生数据丢失,和产生内存泄漏,消费者进行死循环,配置这两个属性进行确认。

    1、autoDelete属性设置为false

    @Queue(value = "${mq.config.queue.orderName}", autoDelete = "false"

    2、消费者进行死循环问题

    docker安装rabbitmq:rabbitMQ安装docker版 /权限管理命令

    简单应用来这里吧: SpringBoot应用操作Rabbitmq

    简单应用来这里吧: SpringBoot应用操作Rabbitmq(direct高级操作)

    简单应用来这里吧:SpringBoot应用操作Rabbitmq(topic交换器高级操作)   

    简单应用来这里吧:SpringBoot应用操作Rabbitmq(fanout广播高级操作)

  • 相关阅读:
    C# 使用SqlBulkCopy类批量复制大数据 快速导入Excel大量数据
    Git的安装与使用
    未能加载文件或程序集“Microsoft.VisualStudio.Web.PageInspector.Loader, Version=1.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a”或它的某一个依赖项。
    Android SDK Manager不能显示所有包的解决办法
    editor多功能文本框在有些计算机上不能正常加载,解决方法,本人用的是把js调用方法放到<body></body>后面)
    MVC URL参数传递+变为空格解决方法
    SQL 查询不重复数据
    Java基本数据类型取值范围
    统计数组中的逆数对个数
    Docker 安装 ElasticSearch
  • 原文地址:https://www.cnblogs.com/niunafei/p/12801427.html
Copyright © 2011-2022 走看看