在现在的项目开发过程中,消息中间件使用得越来越多,常用的消息中间件有RabbitMQ、ActiveMQ、RocketMQ、Kafka等。那么今天,我们来学习Spring Boot整合RabbitMQ。
在整合rabbitmq的时候,我们先要在本地下载安装rabbitmq,而rabbitmq是用erlang语言开发的,所以在安装rabbitmq之前,我们需要先安装erlang。具体下载安装的步骤我在这里就不再赘述了,可以参照这篇文章进行安装:https://www.jianshu.com/p/3d43561bb3ee
在安装好了erlang和rabbitmq之后,我们就开始整合。
1、首先我们需要在pom.xml中添加rabbitmq的依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、配置application-boot.yml:
# 配置RabbitMQ
spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest
3、创建两个POJO实体类:MsgContent1和MsgContent2
package com.hry.spring.rabbitmq.boot.msgconvert.pojo; /** * 测试发送对象 */ public class MsgContent1 { private String name; private String age; @Override public String toString(){ return "[ name = " + name + "; " + " age = " + age + " ]"; } public String getName() { return name; } public void setName(String name) { this.name = name; } public String getAge() { return age; } public void setAge(String age) { this.age = age; } }
package com.hry.spring.rabbitmq.boot.msgconvert.pojo; /** * 测试发送对象 */ public class MsgContent2 { private String id; private String content; @Override public String toString(){ return "[ id = " + id + "; " + " content = " + content + " ]"; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }
4、设置序列化类RabbitMsgConvertConfigure
package com.hry.spring.rabbitmq.boot.msgconvert; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 配置RabbitMQ中使用到队列、交换机、绑定等信息 */ @Configuration public class RabbitMsgConvertConfigure { // 队列名称 public final static String SPRING_BOOT_QUEUE = "spring-boot-queue-msg-convert"; // 交换机名称 public final static String SPRING_BOOT_EXCHANGE = "spring-boot-exchange-msg-convert"; // 绑定的值 public static final String SPRING_BOOT_BIND_KEY = "spring-boot-bind-key-msg-convert"; // === 在RabbitMQ上创建queue,exchange,binding 方法一:通过@Bean实现 begin === /** * 定义队列: * @return */ @Bean Queue queue() { return new Queue(SPRING_BOOT_QUEUE, false); } /** * 定义交换机 * @return */ @Bean TopicExchange exchange() { return new TopicExchange(SPRING_BOOT_EXCHANGE); } /** * 定义绑定 * @param queue * @param exchange * @return */ @Bean Binding binding(Queue queue, TopicExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(SPRING_BOOT_BIND_KEY ); } /** * 定义消息转换实例 * @return */ @Bean MessageConverter jackson2JsonMessageConverter() { return new Jackson2JsonMessageConverter(); } // === 如果默认的SimpleMessageListenerContainer不符合我们的要求,我们也可以通过如下的方式创建新的SimpleMessageListenerContainer=== // @Bean // SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { // SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); // container.setConnectionFactory(connectionFactory); // container.setMessageConverter(). 
// container.setConcurrentConsumers(10); // return container; // } // @Bean // MessageListenerAdapter listenerAdapter(ProductMessageListener receiver) { // return new MessageListenerAdapter(receiver, "receiveMessage"); // } }
5、消息发送者:
package com.hry.spring.rabbitmq.boot.msgconvert; import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1; import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2; import org.springframework.amqp.core.AmqpAdmin; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 消息发送者 */ @Component public class SendMsgConvertMsg { // 此接口的默认实现是RabbitTemplate,目前只有一个实现, @Autowired private AmqpTemplate amqpTemplate; /** * 发送消息 * * @param msgContent */ public void sendMsgContent1(MsgContent1 msgContent) { amqpTemplate.convertAndSend(RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE, RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY, msgContent ); } /** * 发送消息 * @param msgContent */ public void sendMsgContent2(MsgContent2 msgContent) { amqpTemplate.convertAndSend(RabbitMsgConvertConfigure.SPRING_BOOT_EXCHANGE, RabbitMsgConvertConfigure.SPRING_BOOT_BIND_KEY, msgContent); } }
6、消息接收者:
@RabbitListener:定义在类上,表示此类是消息监听者,并设置要监听的队列
@RabbitHandler:在类中可以定义多个@RabbitHandler方法,Spring Boot会根据消息转换后的参数类型,将消息分发到对应的方法处理
package com.hry.spring.rabbitmq.boot.msgconvert; import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1; import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; @Component // @RabbitListener除了可以作用在方法,也可以作用在类上。在后者的情况下,需要在处理的方法使用@RabbitHandler。一个类可以配置多个@RabbitHandler @RabbitListener(queues = RabbitMsgConvertConfigure.SPRING_BOOT_QUEUE) public class ReceiveMsgConvertMsg { /** * 获取信息: * queue也可以支持RabbitMQ中对队列的模糊匹配 * @param content */ @RabbitHandler public void receiveMsgContent1(MsgContent1 content) { // ... System.out.println("[ReceiveMsgConvertMsg-MsgContent1] receive receiveMsgContent1 msg: " + content); } @RabbitHandler public void receiveMsgContent2(MsgContent2 msgContent2) { // ... System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive receiveMsgContent2 msg: " + msgContent2); } // @RabbitHandler // public void receiveString(@Payload String content) { // // ... // System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive msg: " + content); // } // // @RabbitHandler // public void receiveStringb(byte[] content) { // // ... // System.out.println("[ReceiveMsgConvertMsg-MsgContent2] receive msg: " + content); // } }
7、最后我们编写测试类:
package com.hry.spring.boot.simple;

import com.hry.spring.rabbitmq.boot.msgconvert.SendMsgConvertMsg;
import com.hry.spring.rabbitmq.boot.msgconvert.SpringBootRabbitMsgConvertApplication;
import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent1;
import com.hry.spring.rabbitmq.boot.msgconvert.pojo.MsgContent2;
import com.hry.spring.rabbitmq.boot.raw.SendRawMsg;
import com.hry.spring.rabbitmq.boot.raw.SpringBootRabbitRawApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ThreadLocalRandom;

/**
 * Test class: sends one {@link MsgContent1} and one {@link MsgContent2} through
 * {@link SendMsgConvertMsg}, then waits so the listener has time to print them.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootRabbitMsgConvertApplication.class, value = "spring.profiles.active=boot")
public class MsgConvertTest {

    /** How long the test thread waits after sending, in milliseconds (10 seconds). */
    private static final long CONSUMER_WAIT_MILLIS = 1000 * 10;

    @Autowired
    private SendMsgConvertMsg sendMsgConvertMsg;

    /**
     * Sends both payload types and then pauses before the test method returns.
     *
     * @throws Exception if sending fails
     */
    @Test
    public void sendMsgContent() throws Exception {
        // Send a MsgContent1 message object.
        MsgContent1 msgContent1 = new MsgContent1();
        msgContent1.setName("send msg via spring boot - msg convert - MsgContent1");
        msgContent1.setAge("" + ThreadLocalRandom.current().nextInt(100));
        sendMsgConvertMsg.sendMsgContent1(msgContent1);

        // Send a MsgContent2 message object.
        MsgContent2 msgContent2 = new MsgContent2();
        msgContent2.setId(ThreadLocalRandom.current().nextInt(100) + "");
        // Label the payload with its own type so the consumer output is not misleading.
        msgContent2.setContent("send msg via spring boot - msg convert - MsgContent2");
        sendMsgConvertMsg.sendMsgContent2(msgContent2);

        try {
            // Pause before returning so the messages can be consumed and printed.
            Thread.sleep(CONSUMER_WAIT_MILLIS);
        } catch (InterruptedException e) {
            // Restore the interrupt status instead of swallowing it, so the caller can see it.
            Thread.currentThread().interrupt();
        }
    }
}
在控制台我们可以看到输出的结果:
说明springboot整合rabbitmq成功,可以实现业务功能了。
注意:这里要注意com.fasterxml.jackson的版本兼容问题,当Spring Boot使用2.1.4.RELEASE版本时,运行时就会报错。
这里一定要将springboot的版本设置为1.5.6.RELEASE,功能才能正常实现。
本博客的demo如下:https://download.csdn.net/download/weixin_38340967/11180464
另:本来是想上传demo供大家一起学习的,但是上传资源到CSDN上的时候默认要5积分,还改不了,所以如果有同学想要demo的可以私信我,我邮箱发你就行了!包括erlang和rabbitmq安装包。