zoukankan      html  css  js  c++  java
  • SpringBoot整合rabbitmq

    在现在的项目开发过程中,消息中间件使用得越来越多,常用的消息中间件有rabbitmq、activemq、rocketmq、kafka等。那么今天,我们来学习springboot整合rabbitmq。

    在整合rabbitmq的时候,我们先要在本地下载安装rabbitmq,而rabbitmq是用erlang语言开发的,所以在安装rabbitmq之前,我们需要先安装erlang。具体下载安装的步骤我在这里就不再赘述了,可以参照这篇文章进行安装:https://www.jianshu.com/p/3d43561bb3ee

    在安装好了erlang和rabbitmq之后,我们就开始整合。

    1、首先我们需要在pom.xml中添加rabbitmq的依赖:

            <!-- Spring Boot AMQP starter: the dependency this tutorial adds for RabbitMQ support -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    2、配置application-boot.yml:

    spring:
      # RabbitMQ connection settings (broker on the local machine, guest account)
      rabbitmq:
        host: 127.0.0.1
        username: guest
        password: guest
    

    3、创建两个POJO实体类:MsgContent1和MsgContent2

    package com.hry.spring.rabbitmq.boot.msgconvert.pojo;
    
    /**
     * Test payload object carrying a name and an age; used to demonstrate
     * sending an object as a message.
     */
    public class MsgContent1 {
        private String name;
        private String age;

        /** @return the current name, possibly {@code null} */
        public String getName() {
            return this.name;
        }

        /** @param name the name to store */
        public void setName(String name) {
            this.name = name;
        }

        /** @return the current age as text, possibly {@code null} */
        public String getAge() {
            return this.age;
        }

        /** @param age the age (as text) to store */
        public void setAge(String age) {
            this.age = age;
        }

        /**
         * Renders both fields in a bracketed, human-readable form.
         * Unset fields are rendered as the text {@code null}.
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder();
            text.append("[ name = ").append(name);
            text.append("; ").append(" age = ").append(age);
            text.append(" ]");
            return text.toString();
        }
    }
    

      

    package com.hry.spring.rabbitmq.boot.msgconvert.pojo;
    
    /**
     * Test payload object carrying an id and a text content; used to
     * demonstrate sending a second object type as a message.
     */
    public class MsgContent2 {
        private String id;
        private String content;

        /** @return the current id, possibly {@code null} */
        public String getId() {
            return this.id;
        }

        /** @param id the id to store */
        public void setId(String id) {
            this.id = id;
        }

        /** @return the current content, possibly {@code null} */
        public String getContent() {
            return this.content;
        }

        /** @param content the content to store */
        public void setContent(String content) {
            this.content = content;
        }

        /**
         * Renders both fields in a bracketed, human-readable form.
         * Unset fields are rendered as the text {@code null}.
         */
        @Override
        public String toString() {
            final StringBuilder rendered = new StringBuilder("[ id = ");
            rendered.append(id)
                    .append("; ")
                    .append(" content = ")
                    .append(content)
                    .append(" ]");
            return rendered.toString();
        }
    }
    

    4、设置序列化类RabbitMsgConvertConfigure

    package com.hry.spring.rabbitmq.boot.msgconvert;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * Spring configuration that declares the queue, topic exchange, binding
     * and message converter used by this RabbitMQ example.
     */
    @Configuration
    public class RabbitMsgConvertConfigure {

        /** Queue name. */
        public static final String SPRING_BOOT_QUEUE = "spring-boot-queue-msg-convert";
        /** Exchange name. */
        public static final String SPRING_BOOT_EXCHANGE = "spring-boot-exchange-msg-convert";
        /** Key used to bind the queue to the exchange. */
        public static final String SPRING_BOOT_BIND_KEY = "spring-boot-bind-key-msg-convert";


        // === Creating queue, exchange and binding on RabbitMQ, approach 1: via @Bean methods (begin) ===

        /**
         * Declares the queue.
         *
         * @return a queue named {@link #SPRING_BOOT_QUEUE}, created with the durable flag off
         */
        @Bean
        Queue queue() {
            final boolean durable = false;
            return new Queue(SPRING_BOOT_QUEUE, durable);
        }

        /**
         * Declares the exchange.
         *
         * @return a topic exchange named {@link #SPRING_BOOT_EXCHANGE}
         */
        @Bean
        TopicExchange exchange() {
            TopicExchange topicExchange = new TopicExchange(SPRING_BOOT_EXCHANGE);
            return topicExchange;
        }

        /**
         * Declares the binding between the queue and the exchange.
         *
         * @param queue    the queue to bind
         * @param exchange the topic exchange to bind the queue to
         * @return a binding that uses {@link #SPRING_BOOT_BIND_KEY} as its key
         */
        @Bean
        Binding binding(Queue queue, TopicExchange exchange) {
            return BindingBuilder
                    .bind(queue)
                    .to(exchange)
                    .with(SPRING_BOOT_BIND_KEY);
        }

        /**
         * Declares the message converter instance.
         *
         * @return a new {@link Jackson2JsonMessageConverter}
         */
        @Bean
        MessageConverter jackson2JsonMessageConverter() {
            MessageConverter jsonConverter = new Jackson2JsonMessageConverter();
            return jsonConverter;
        }

        // === If the default SimpleMessageListenerContainer does not meet our needs, a new one
        //     can be created along the following lines (illustrative sketch, left disabled) ===
    //    @Bean
    //    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    //        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    //        container.setConnectionFactory(connectionFactory);
    //        container.setMessageConverter().
    //        container.setConcurrentConsumers(10);
    //        return container;
    //    }


    //    @Bean
    //    MessageListenerAdapter listenerAdapter(ProductMessageListener receiver) {
    //        return new MessageListenerAdapter(receiver, "receiveMessage");
    //    }

    }
    

    5、消息发送者:

    package com.hry.spring.rabbitmq.boot.msgconvert;
    
    import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1;
    import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * Message sender: publishes the example payload objects to the exchange
     * declared in {@link RabbitMsgConvertConfigure}.
     */
    @Component
    public class SendMsgConvertMsg {

        // Per the original author's note: the default implementation of this interface
        // is RabbitTemplate, currently its only implementation.
        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Sends a {@link MsgContent1} message.
         *
         * @param msgContent the payload to convert and send
         */
        public void sendMsgContent1(MsgContent1 msgContent) {
            publish(msgContent);
        }

        /**
         * Sends a {@link MsgContent2} message.
         *
         * @param msgContent the payload to convert and send
         */
        public void sendMsgContent2(MsgContent2 msgContent) {
            publish(msgContent);
        }

        /** Converts and sends {@code payload} using the example's exchange and binding key. */
        private void publish(Object payload) {
            final String exchangeName = RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE;
            final String routingKey = RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY;
            amqpTemplate.convertAndSend(exchangeName, routingKey, payload);
        }
    }
    

    6、消息接收者:

    @RabbitListener:定义在类上,表示此类是消息监听者,并指定要监听的队列。
    @RabbitHandler:在类中可以定义多个@RabbitHandler方法,spring boot会根据消息参数类型的不同,把消息分发到对应的方法处理。

    package com.hry.spring.rabbitmq.boot.msgconvert;
    
    import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1;
    import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    
    @Component
    // Besides methods, @RabbitListener can also be placed on a class. In that case the
    // handling methods must carry @RabbitHandler; one class may declare several of them.
    @RabbitListener(queues = RabbitMsgConvertConfigure.SPRING_BOOT_QUEUE)
    public class ReceiveMsgConvertMsg {

        /**
         * Receives a {@link MsgContent1} message and prints it to standard output.
         *
         * @param content the received payload
         */
        @RabbitHandler
        public void receiveMsgContent1(MsgContent1 content) {
            report("[ReceiveMsgConvertMsg-MsgContent1] receive receiveMsgContent1 msg: ", content);
        }

        /**
         * Receives a {@link MsgContent2} message and prints it to standard output.
         *
         * @param msgContent2 the received payload
         */
        @RabbitHandler
        public void receiveMsgContent2(MsgContent2 msgContent2) {
            report("[ReceiveMsgConvertMsg-MsgContent2] receive receiveMsgContent2 msg: ", msgContent2);
        }

        /** Prints {@code prefix} followed by the payload's string form on one line. */
        private void report(String prefix, Object payload) {
            String line = prefix + payload;
            System.out.println(line);
        }

        // Disabled sample handlers for String and raw byte[] payloads.
    //    @RabbitHandler
    //    public void receiveString(@Payload String content) {
    //        // ...
    //        System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive msg: " + content);
    //    }
    //
    //    @RabbitHandler
    //    public void receiveStringb(byte[] content) {
    //        // ...
    //        System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive msg: " + content);
    //    }
    }
    

    7、最后我们编写测试类:

    package com.hry.spring.boot.simple;
    
    import com.hry.spring.rabbitmq.boot.msgconvert.SendMsgConvertMsg;
    import com.hry.spring.rabbitmq.boot.msgconvert.SpringBootRabbitMsgConvertApplication;
    import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1;
    import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2;
    import com.hry.spring.rabbitmq.boot.raw.SendRawMsg;
    import com.hry.spring.rabbitmq.boot.raw.SpringBootRabbitRawApplication;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import java.util.concurrent.ThreadLocalRandom;
    
    /**
     * Test class: sends one {@link MsgContent1} and one {@link MsgContent2} through
     * {@link SendMsgConvertMsg}, then waits before the test method returns.
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes= SpringBootRabbitMsgConvertApplication.class, value = "spring.profiles.active=boot")
    public class MsgConvertTest {

        /** Pause after sending, in milliseconds (presumably so the listener can print the messages). */
        private static final long WAIT_AFTER_SEND_MILLIS = 1000L * 10;

        @Autowired
        private SendMsgConvertMsg sendMsgConvertMsg;

        /**
         * Sends both payload types and then sleeps for {@link #WAIT_AFTER_SEND_MILLIS}.
         *
         * @throws Exception if sending fails
         */
        @Test
        public void sendMsgContent() throws Exception {
            // Send a MsgContent1 object
            MsgContent1 msgContent1 = new MsgContent1();
            msgContent1.setName("send msg via spring boot - msg convert - MsgContent1");
            msgContent1.setAge("" + ThreadLocalRandom.current().nextInt(100));
            sendMsgConvertMsg.sendMsgContent1(msgContent1);

            // Send a MsgContent2 object
            MsgContent2 msgContent2 = new MsgContent2();
            msgContent2.setId(ThreadLocalRandom.current().nextInt(100) + "");
            // Label fixed: this payload is a MsgContent2 (the original text said MsgContent1).
            msgContent2.setContent("send msg via spring boot - msg convert - MsgContent2");
            sendMsgConvertMsg.sendMsgContent2(msgContent2);

            try {
                Thread.sleep(WAIT_AFTER_SEND_MILLIS);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it, so the test
                // runner can still observe that this thread was interrupted.
                Thread.currentThread().interrupt();
            }
        }


    }
    

    在控制台我们可以看到输出的结果:

    说明springboot整合rabbitmq成功,可以实现业务功能了。

    注意:这里要注意com.fasterxml.jackson的版本兼容问题,当springboot使用2.1.4.RELEASE版本时,就会报错。

    这里一定要将springboot的版本设置为1.5.6.RELEASE,功能才能正常实现。

    本博客的demo如下:https://download.csdn.net/download/weixin_38340967/11180464

    另:本来是想上传demo供大家一起学习的,但是上传资源到CSDN上的时候默认要5积分,还改不了,所以如果有同学想要demo的可以私信我,我邮箱发你就行了!包括erlang和rabbitmq安装包。

      

  • 相关阅读:
    三、springcloud之服务调用Feign
    二、springcloud之熔断器hystrix
    AngularJS中ng-class使用方法
    js中字符串的常用方法
    js中数组的操作
    Linux生成私钥和公钥免密连接
    Jenkins部署码云SpringBoot项目到远程服务器
    Jenkins部署码云SpringBoot项目
    ELK日志分析方案
    Vue项目History模式404问题解决
  • 原文地址:https://www.cnblogs.com/gaopengfirst/p/10871190.html
Copyright © 2011-2022 走看看