zoukankan      html  css  js  c++  java
  • Springboot集成RabbitMQ

    前面我们已经了解了RabbitMQ的一些基本概念和原理,今天进入实战篇,在springboot框架中集成RabbitMQ,默认已经创建一个Springboot项目。

    添加pom依赖

    在pom.xml文件中引入以下依赖:

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    	<version>${springboot.version}</version>
    </dependency>
    

    修改application.yml文件

    在application.yml文件中添加以下配置:

    spring:
      rabbitmq:
        addresses: 127.0.0.1:5672
        username: lzm
        password: lzm
        virtual-host: test
    

    address是rabbitmq server的请求地址,5672为其默认的端口号,集群多个地址时可以使用逗号隔开
    username:rabbitmq用户名
    password: rabbitmq密码
    virtual-host: 虚拟主机

    以上配置是集成rabbitmq最基本的配置,如果要使用更多的配置,比如配置手动ack,消息发送confirm等属性,可以参考org.springframework.boot.autoconfigure.amqp.RabbitProperties类中的默认配置

    声明一个队列

    我们写一个配置类,在配置类中声明一个队列,也可以通过访问RabbitMQ的管理页面http://IP:15672手动创建一个队列,此处使用代码去声明一个队列,底层使用RabbitAdmin在Rabbit Server中创建队列。

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    /**
     * @author lzm
     * @date 2020/7/22 22:45
     */
    @Component
    public class RabbitConfig {
    
        /**
         * 定义一个可持久化的队列
         * @return
         */
        @Bean
        public Queue firstQueue(){
            return new Queue(Constant.MY_FIRST_QUEUE_NAME,true,false,false);
        }
    }
    

    上述代码声明了一个可持久化的队列,创建队列时的四个参数解释如下:

    name: 队列名称
    durable: 为true时表示是可持久化的队列,即使rabbitmq服务重启以后,队列依然是存在的
    exclusive: 为true时表示只能被定义该queue的连接使用,一旦这个连接关闭,队列也会跟着删除
    autoDelete: 为true时表示当没有消费者再使用queue时,queue自动删除

    添加以上代码后,启动springboot项目后就可以在RabbitMQ的管理界面看到我们定义的队列:

    生产者

    生产者向消息队列中发送消息,我们创建一个service,在service中实现发送数据的逻辑:

    import com.lzm.rabbitmq.constant.Constant;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.stereotype.Service;
    import javax.annotation.Resource;
    
    /**
     * @author lzm
     * @date 2020/7/22 23:00
     */
    @Service
    public class ProducerService {
    
        /**
         * 注入rabbit模板
         */
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        public void produceMessage(){
    
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend(Constant.MY_FIRST_QUEUE_NAME, "我是消息====>"+i);
            }
        }
    }
    

    上述代码中,首先向service中注入了RabbitTemplate,然后调用template的convertAndSend方法,闯入队列名称和消息内容后即可将消息发送到前面创建的队列中。

    我们写一个controller用以调用service代码生产消息:

    @RestController
    @Slf4j
    @RequestMapping("/")
    public class IndexController {
    
        @Resource
        private ProducerService producerService;
    
        @GetMapping(value = "index")
        public String sendMessage(){
            producerService.produceMessage();
            return "消息已发送,请查看消息队列";
        }
    }
    

    启动tomcat,访问http://127.0.0.1:79999/index方法后,查看rabbitmq管理界面中队列中的数据如下所示:

    可以看到此时队列中共有10条数据,且10条数据都处理ready状态,添加消费者后即可消息处于ready状态的数据,可以点进队列中,查看队列中的详细数据,如下所示:

    通过Get Message面板可以查看当前队列中的消息,不仅可以看到消息体,还可以看到消息头,比如Wxchange是默认的交换机,Routing Key与队列名称相同,以及消息的优先级(priority),是否持久化(delivery_model)等各项内容

    消费者

    前面生产者已经将消息发送到消息队列中,此时需要创建一个消费者对队列中的数据进行消费,创建消费者的代码如下所示:

    import com.lzm.rabbitmq.constant.Constant;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消费者示例代码
     * @author lzm
     * @date 2020/7/22 23:26
     */
    @Component
    @Slf4j
    public class MyConsumer {
    
        @RabbitHandler
        @RabbitListener(queues = { Constant.MY_FIRST_QUEUE_NAME})
        public void consumeMessage(String msg, Channel channel, Message message){
            log.debug(msg);
        }
    }
    

    启动项目后控制台打印效果:

    可以看到此时消费端将队列中的全部消息都消费掉了,此时队列中是没有消息的。

    注解解释:

    @RabbitHandler 添加到方法上,用来处理特定类型的消息
    @RabbitListener 可以添加到方法上,也可以添加到类上,该注解方法中最重要的属性是queues,即当前消费者从哪些队列中消费消息,该注解还有很多属性,比如绑定(bindings)、containerFactory、优先级(priority)等多个属性,可以根据实际需要进行配置

    总结

    通过以上配置即可在springboot中初步集成Rabbitmq,但是上述的集成只是简单的能够发送消息,但是并不能保证生产端发送的消息消费端一定可以接收到,对于分布式消息队列需要的消息投递的可靠性一点都没有涉及,下一篇将介绍Rabbitmq是如何实现消息的可靠投递的。

  • 相关阅读:
    CCF CSP 题解
    CCF CSP 2019032 二十四点
    CCF CSP 2018121 小明上学
    CCF CSP 2019092 小明种苹果(续)
    CCF CSP 2019091 小明种苹果
    CCF CSP 2019121 报数
    CCF CSP 2019031 小中大
    CCF CSP 2020061 线性分类器
    CCF CSP 2020062 稀疏向量
    利用国家气象局的webservice查询天气预报(转载)
  • 原文地址:https://www.cnblogs.com/ybyn/p/13690965.html
Copyright © 2011-2022 走看看