zoukankan      html  css  js  c++  java
  • Springboot集成RabbitMQ

    前面我们已经了解了RabbitMQ的一些基本概念和原理,今天进入实战篇,在springboot框架中集成RabbitMQ,默认已经创建一个Springboot项目。

    添加pom依赖

    在pom.xml文件中引入以下依赖:

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    	<version>${springboot.version}</version>
    </dependency>
    

    修改application.yml文件

    在application.yml文件中添加以下配置:

    spring:
      rabbitmq:
        addresses: 127.0.0.1:5672
        username: lzm
        password: lzm
        virtual-host: test
    

    address是rabbitmq server的请求地址,5672为其默认的端口号,集群多个地址时可以使用逗号隔开
    username:rabbitmq用户名
    password: rabbitmq密码
    virtual-host: 虚拟主机

    以上配置是集成rabbitmq最基本的配置,如果要使用更多的配置,比如配置手动ack,消息发送confirm等属性,可以参考org.springframework.boot.autoconfigure.amqp.RabbitProperties类中的默认配置

    声明一个队列

    我们写一个配置类,在配置类中声明一个队列,也可以通过访问RabbitMQ的管理页面http://IP:15672手动创建一个队列,此处使用代码去声明一个队列,底层使用RabbitAdmin在Rabbit Server中创建队列。

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    /**
     * @author lzm
     * @date 2020/7/22 22:45
     */
    @Component
    public class RabbitConfig {
    
        /**
         * 定义一个可持久化的队列
         * @return
         */
        @Bean
        public Queue firstQueue(){
            return new Queue(Constant.MY_FIRST_QUEUE_NAME,true,false,false);
        }
    }
    

    上述代码声明了一个可持久化的队列,创建队列时的四个参数解释如下:

    name: 队列名称
    durable: 为true时表示是可持久化的队列,即使rabbitmq服务重启以后,队列依然是存在的
    exclusive: 为true时表示只能被定义该queue的连接使用,一旦这个连接关闭,队列也会跟着删除
    autoDelete: 为true时表示当没有消费者再使用queue时,queue自动删除

    添加以上代码后,启动springboot项目后就可以在RabbitMQ的管理界面看到我们定义的队列:

    生产者

    生产者向消息队列中发送消息,我们创建一个service,在service中实现发送数据的逻辑:

    import com.lzm.rabbitmq.constant.Constant;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    import javax.annotation.Resource;
    
    /**
     * @author lzm
     * @date 2020/7/22 23:00
     */
    @Service
    public class ProducerService {
    
        /**
         * 注入rabbit模板
         */
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public void produceMessage(){
    
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend(Constant.MY_FIRST_QUEUE_NAME, "我是消息====>"+i);
            }
        }
    }
    

    上述代码中,首先向service中注入了RabbitTemplate,然后调用template的convertAndSend方法,闯入队列名称和消息内容后即可将消息发送到前面创建的队列中。

    我们写一个controller用以调用service代码生产消息:

    @RestController
    @Slf4j
    @RequestMapping("/")
    public class IndexController {
    
        @Resource
        private ProducerService producerService;
    
        @GetMapping(value = "index")
        public String sendMessage(){
            producerService.produceMessage();
            return "消息已发送,请查看消息队列";
        }
    }
    

    启动tomcat,访问http://127.0.0.1:79999/index方法后,查看rabbitmq管理界面中队列中的数据如下所示:

    可以看到此时队列中共有10条数据,且10条数据都处理ready状态,添加消费者后即可消息处于ready状态的数据,可以点进队列中,查看队列中的详细数据,如下所示:

    通过Get Message面板可以查看当前队列中的消息,不仅可以看到消息体,还可以看到消息头,比如Wxchange是默认的交换机,Routing Key与队列名称相同,以及消息的优先级(priority),是否持久化(delivery_model)等各项内容

    消费者

    前面生产者已经将消息发送到消息队列中,此时需要创建一个消费者对队列中的数据进行消费,创建消费者的代码如下所示:

    import com.lzm.rabbitmq.constant.Constant;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者示例代码
     * @author lzm
     * @date 2020/7/22 23:26
     */
    @Component
    @Slf4j
    public class MyConsumer {
    
        @RabbitHandler
        @RabbitListener(queues = { Constant.MY_FIRST_QUEUE_NAME})
        public void consumeMessage(String msg, Channel channel, Message message){
            log.debug(msg);
        }
    }
    

    启动项目后控制台打印效果:

    可以看到此时消费端将队列中的全部消息都消费掉了,此时队列中是没有消息的。

    注解解释:

    @RabbitHandler 添加到方法上,用来处理特定类型的消息
    @RabbitListener 可以添加到方法上,也可以添加到类上,该注解方法中最重要的属性是queues,即当前消费者从哪些队列中消费消息,该注解还有很多属性,比如绑定(bindings)、containerFactory、优先级(priority)等多个属性,可以根据实际需要进行配置

    总结

    通过以上配置即可在springboot中初步集成Rabbitmq,但是上述的集成只是简单的能够发送消息,但是并不能保证生产端发送的消息消费端一定可以接收到,对于分布式消息队列需要的消息投递的可靠性一点都没有涉及,下一篇将介绍Rabbitmq是如何实现消息的可靠投递的。

  • 相关阅读:
    Android Studio 如何导入源码
    多版本 jdk 的切换方法
    Markdown的常用语法
    adb常用命令
    虚拟机怎样共享ubuntu中的文件
    adb的相关配置
    【Leetcode】2的幂(整数的二进制形式,与运算)
    【Leetcode】二叉树的最小深度
    【Leetcode】合并两个有序链表
    【Leetcode】国王挖金矿
  • 原文地址:https://www.cnblogs.com/ybyn/p/13690965.html
Copyright © 2011-2022 走看看