一.MessageListenerAdapter
消息监听适配器
配置:
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
//设置监听的队列
container.setQueues(queue());
//设置当前消费者数量
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(5);
//重回队列
container.setDefaultRequeueRejected(false);
//签收机制
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
//消费端标签策略
container.setConsumerTagStrategy(queue-> queue+"_"+ UUID.randomUUID().toString());
//消息适配器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
container.setMessageListener(adapter);
return container;
}
MessageDelegate类:
public class MessageDelegate {
public void handleMessage(byte[] messageBody){
System.out.println("默认方法,消息:"+new String(messageBody));
}
public void handleMessage(String messageBody){
System.out.println("字符串方法,消息"+messageBody);
}
}
注意,自定义的Delegate类,默认方法名是handlermessage

当然,想自定义方法名,可以setDefaultListenerMethod(),这里不再演示。
测试字符串:
@Test
public void testSend1(){
rabbitTemplate.convertAndSend("amqp.bean.topic", "amqp.send","hello spring");
}
测试字节数组:
@Test
public void testSend2(){
rabbitTemplate.convertAndSend("amqp.bean.topic", "amqp.send","hello spring2".getBytes());
}

此外也可以自定义转换器
public class MyMessageConvert implements MessageConverter {
/**
* java对象转换为Message对象
*/
@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(object.toString().getBytes(),messageProperties);
}
/**
* message对象转换为java对象
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
return new String(message.getBody());
}
}
在适配器中配置转换器
//消息适配器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new MyMessageConvert());
container.setMessageListener(adapter);
配置了消息转换器后,现在无论发送的消息是字符串还是字节数组,适配器都直接进入handleMessage(String messageBody)方法。
适配器还可以将队列名称与方法名称进行一一匹配。
需要传入一个map,key是队列名,value是方法名
adapter.setQueueOrTagToMethodName(queueName,method);
源码:
/**
* Set the mapping of queue name or consumer tag to method name. The first lookup
* is by queue name, if that returns null, we lookup by consumer tag, if that
* returns null, the {@link #setDefaultListenerMethod(String) defaultListenerMethod}
* is used.
* @param queueOrTagToMethodName the map.
* @since 1.5
*/
public void setQueueOrTagToMethodName(Map<String, String> queueOrTagToMethodName) {
this.queueOrTagToMethodName.putAll(queueOrTagToMethodName);
}
当队列与方法绑定后,队列里的消息会被绑定的方法所处理。
二.MessageConverter
消息转换器,一般自定义转换器需要实现这个接口,重写toMessage和fromMessage方法。
Json转换器:Jackson2JsonMessageConverter,可以进行java对象的转换功能
先加上jackson依赖:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.9</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
json测试:
在delegate类中定义处理json的方法:
public void handleMessage(Map messageBody){
System.out.println("json转换,消息:"+messageBody);
}
在adapter中配置Jackson2JsonMessageConverter
//消息适配器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
//adapter.setMessageConverter(new MyMessageConvert());
//json形式转换器
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
adapter.setMessageConverter(converter);
container.setMessageListener(adapter);
测试发送消息:
@Test
public void testJson() throws JsonProcessingException {
User user = new User("张三",22);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(user);
MessageProperties properties = new MessageProperties();
properties.setContentType("application/json");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.send("amqp.bean.topic", "amqp.send",message);
}
测试结果:

配置DefaultJackson2JavaTypeMapper映射器:进行java对象的映射关系
//消息适配器
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
//adapter.setMessageConverter(new MyMessageConvert());
//json形式转换器
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper();
//设置安全包
mapper.setTrustedPackages("com.wj.springamqp.domain");
converter.setJavaTypeMapper(mapper);
adapter.setMessageConverter(converter);
container.setMessageListener(adapter);
注意要setTrustedPackages,否则会报如下错误。

测试类:
@Test
public void testJson() throws JsonProcessingException {
User user = new User("张三",22);
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(user);
MessageProperties properties = new MessageProperties();
properties.setContentType("application/json");
//key:_TypeId_ value:类的全路径
//加上去后,就可以支持json转换为java对象
properties.getHeaders().put("__TypeId__","com.wj.springamqp.domain.User");
Message message = new Message(json.getBytes(), properties);
rabbitTemplate.send("amqp.bean.topic", "amqp.send",message);
}
注意!!!"__TypeId__"这里下划线是四个,前面两个后面两个,不然无法转为java对象。
Jackson2JsonMessageConverter和DefaultJackson2JavaTypeMapper支持java对象多映射转换。
//json形式转换器
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
DefaultJackson2JavaTypeMapper mapper = new DefaultJackson2JavaTypeMapper();
//设置安全包
mapper.setTrustedPackages("com.wj.springamqp.domain");
Map<String, Class<?>> map = new HashMap<>();
map.put("user", com.wj.springamqp.domain.User.class);
map.put("stu", com.wj.springamqp.domain.Stu.class);
mapper.setIdClassMapping(map);
converter.setJavaTypeMapper(mapper);
此外MessageConverter支持pdf,image的转换,在发送消息的时候,需要将文件转换为二进制字节流,然后在转换器中生成文件。