zoukankan      html  css  js  c++  java
  • Springboot集成RabbitMQ之MessageConvert源码解析

    问题

    最近在使用RabbitMq时遇到了一个问题,明明是转换成json发送到mq中的数据,消费者接收到的却是一串数字也就是byte数组,但是使用mq可视化页面查看数据却是正常的,之前在使用过程中从未遇到过这种情况,遇到的情况如下所示:

    生产者发送消息的代码如下所示:

    public void sendJsonStrMsg(String jsonStr){
    	rabbitTemplate.convertAndSend(JSON_QUEUE, jsonStr);
    }
    

    消费者代码如下所示:

    @RabbitHandler
    @RabbitListener(queuesToDeclare = {@Queue(name=ProducerService.JSON_QUEUE, durable = "true")},containerFactory = "prefetchTenRabbitListenerContainerFactory")
    public void listenJsonMsg(String msg, Channel channel, Message message){
    	log.debug("json字符串类型消息>>>>{}",msg);
    }
    

    引入的containerFactory如下所示:

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory() {
    	SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    	MessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //<x>
    	factory.setConnectionFactory(connectionFactory);
    	factory.setMessageConverter(jackson2JsonMessageConverter);
    	return factory;
    }
    

    注意代码中标有<x>的地方,这里就是我们解决问题的关键。

    解决方案

    我们先说解决方案,再说原因,解决方案其实很简单,在保持上述代码不变的情况下,只需要再注入如下的bean即可:

    @Bean
    public MessageConverter jackson2JsonMessageConverter(){
    	return new Jackson2JsonMessageConverter("*");
    }
    

    解决方案就是这么简单,只需要在原来的代码的基础上注入Jackson2JsonMessageConverter就可以了,但是原理是什么呢?且往后看。

    原理分析

    关于原理的解释我们从源码层面来说,毕竟源码面前没有秘密.

    生产者源码分析

    首先看我们发送消息到mq的方法rabbitTemplate.convertAndSend(JSON_QUEUE, jsonStr),从此方法进去后,经过重载的方法后最终到达下面所示的方法:

    @Override
    public void convertAndSend(String exchange, String routingKey, final Object object,
    		@Nullable CorrelationData correlationData) throws AmqpException {
    
    	send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }
    

    着重看convertMessageIfNecessary方法,方法名已经很直白的告诉我们了,如果需要就转换消息,我们点进去看一下这个方法:

    protected Message convertMessageIfNecessary(final Object object) {
    	if (object instanceof Message) { //<1>
    		return (Message) object;
    	}
    	return getRequiredMessageConverter().toMessage(object, new MessageProperties()); //<2>
    }
    

    <1>处是说如果要发送到mq的对象是Message的实例,那么就直接转换成Message类型返回,否则就获取MessageConverter后调用toMessage()方法返回Message对象。

    我们先看一下RabbitTemplate#getRequiredMessageConverter(),如下所示:

    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
    	MessageConverter converter = getMessageConverter();
    	if (converter == null) {
    		throw new AmqpIllegalStateException(
    				"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
    	}
    	return converter;
    }
    
    public MessageConverter getMessageConverter() {
    	return this.messageConverter; //<1>
    }
    

    <1>处的代码表明需要一个messageConverter对象,我在RabbitTemplate源码中找到了对应的set方法,由于我们没有调用set方法取设置messageConverter的值,那么就需要取查找默认值,默认值的设置如下代码所示:

    /**
     * Convenient constructor for use with setter injection. Don't forget to set the connection factory.
     */
    public RabbitTemplate() {
    	initDefaultStrategies(); // NOSONAR - intentionally overridable; other assertions will check
    }
    
    /**
     * Set up the default strategies. Subclasses can override if necessary.
     设置默认策略,子类在必须的时候可以重写
     */
    protected void initDefaultStrategies() {
    	setMessageConverter(new SimpleMessageConverter());
    }
    
    public void setMessageConverter(MessageConverter messageConverter) {
    	this.messageConverter = messageConverter;
    }
    

    我们点进去SimpleMessageConverter#toMessage()方法看一下是如何将一个java对象转换成Message对象的,可惜的是在SimpleMessageConverter中未找到toMessage方法,我们在此先看一下SimpleMessageConverter继承情况,类图如下:

    去掉了一些无用的接口和类之后,剩下的类图如下所示,沿着类图向上找,在AbstractMessageConverter中找到了toMessage方法:

    @Override
    public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg,
    		@Nullable Type genericType)
    		throws MessageConversionException {
    
    	MessageProperties messageProperties = messagePropertiesArg;
    	if (messageProperties == null) {
    		messageProperties = new MessageProperties();
    	}
    	Message message = createMessage(object, messageProperties, genericType); //<1>
    	messageProperties = message.getMessageProperties();
    	if (this.createMessageIds && messageProperties.getMessageId() == null) {
    		messageProperties.setMessageId(UUID.randomUUID().toString());
    	}
    	return message;
    }
    

    该方法中没有我们需要的内容,继续看<1>处的方法,该方法需要返回到SimpleMessageConverter中:

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    	byte[] bytes = null;
    	if (object instanceof byte[]) {  //<1>
    		bytes = (byte[]) object;
    		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES); //<1.x>
    	}
    	else if (object instanceof String) {  //<2>
    		try {
    			bytes = ((String) object).getBytes(this.defaultCharset);
    		}
    		catch (UnsupportedEncodingException e) {
    			throw new MessageConversionException(
    					"failed to convert to Message content", e);
    		}
    		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//<2.x>
    		messageProperties.setContentEncoding(this.defaultCharset);
    	}
    	else if (object instanceof Serializable) { //<3>
    		try {
    			bytes = SerializationUtils.serialize(object);
    		}
    		catch (IllegalArgumentException e) {
    			throw new MessageConversionException(
    					"failed to convert to serialized Message content", e);
    		}
    		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);//<3.x>
    	}
    	if (bytes != null) {
    		messageProperties.setContentLength(bytes.length);
    		return new Message(bytes, messageProperties);
    	}
    	throw new IllegalArgumentException(getClass().getSimpleName()
    			+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());  //<4>
    }
    

    这个方法就比较有意思了,在<1><2><3>三处分别判断了发送的消息是否是byte[]StringSerializable,并且在判断之后将消息的content_type属性分别设置为application/octet-streamtext/plainapplication/x-java-serialized-object三种类型,除了以上三种类型之外的数据将被抛出异常,很显然我们前面发送的是字符串消息,那么content_type属性的值必定是text/plain了,可以在mq可视化页面上看到:

    经过以上的步骤,java对象已经转换成Message对象并且发送到了MQ中,下面就是消费者的源码分析了。

    消费者源码分析

    本来想从SpringBoot启动开始到Bean加载、注册一直到获取消息的源码分析下来的,奈何IoC和AOP的源码还没看完,实在是心有余而力不足,此处留个坑待以后再战。

    前面生产者是调用MessageConverter.toMessage()方法将java对象转换成Message对象发送到MQ中的,那么消费者应该是反其道而行之,调用MessageConverter.formMessage()方法将Message对象转换成java对象,我们就从formMessage方法开始看,生产者使用的是SimpleMessageConverter,那么此处还是查看此类的fromMessage方法:

    /**
     * Converts from a AMQP Message to an Object.
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
    	Object content = null;
    	MessageProperties properties = message.getMessageProperties();  
    	if (properties != null) {
    		String contentType = properties.getContentType();//<1>
    		if (contentType != null && contentType.startsWith("text")) { //<2>
    			String encoding = properties.getContentEncoding();
    			if (encoding == null) {
    				encoding = this.defaultCharset;
    			}
    			try {
    				content = new String(message.getBody(), encoding);
    			}
    			catch (UnsupportedEncodingException e) {
    				throw new MessageConversionException(
    						"failed to convert text-based Message content", e);
    			}
    		}
    		else if (contentType != null &&
    				contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) { //<3>
    			try {
    				content = SerializationUtils.deserialize(
    						createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
    			}
    			catch (IOException | IllegalArgumentException | IllegalStateException e) {
    				throw new MessageConversionException(
    						"failed to convert serialized Message content", e);
    			}
    		}
    	}
    	if (content == null) {
    		content = message.getBody(); //<4>
    	}
    	return content;
    }
    

    以上代码很容易理解
    <1>处是从消息属性中获取到消息的content_type属性
    <2>处和<3>处则是分别判断是否text/plain以及application/x-java-serialized-object
    如果以上两种都不符合,那么只能是调用message.getBody()返回一个byte[]类型的byte数组,这也就是文章开头返回一串数字的由来。

    问题解决

    虽然消费者源码分析得到了一个返回一串数字的缘由,但是这并不是造成本次问题的根本原因,我们再回顾一下问题中消费者所使用的一个containerFactory

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory() {
    	SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    	MessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //<1>
    	factory.setConnectionFactory(connectionFactory);
    	factory.setMessageConverter(jackson2JsonMessageConverter); //<2>
    	return factory;
    }
    

    <1><2>处使用的messageConver是Jackson2JsonMessageConverter,通过前面类图我们可以知道它也是实现了MessageConvert接口,我们看一下这个类的源码:

    /**
     * JSON converter that uses the Jackson 2 Json library.
     */
    public class Jackson2JsonMessageConverter extends AbstractJackson2MessageConverter {
    
    	public Jackson2JsonMessageConverter() {
    		this("*");
    	}
    
    	public Jackson2JsonMessageConverter(String... trustedPackages) {
    		this(new ObjectMapper(), trustedPackages);
    		this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    	}
    
    	public Jackson2JsonMessageConverter(ObjectMapper jsonObjectMapper) {
    		this(jsonObjectMapper, "*");
    	}
    
    	public Jackson2JsonMessageConverter(ObjectMapper jsonObjectMapper, String... trustedPackages) {
    		super(jsonObjectMapper, MimeTypeUtils.parseMimeType(MessageProperties.CONTENT_TYPE_JSON), trustedPackages);  //<1>
    	}
    
    }
    

    我删掉了一些无用的代码以及注释,可以在类注释上很显然的看到这个转换器是使用jackson的JSON转换器,也就是说这个转换器只对json数据有效,该类中并没有找到fromMessage和toMessage方法,那么只能从其父类AbstractJackson2MessageConverter中查找fromMessage方法,如下所示,注意上面代码中<1>的地方,传递的content_type类型是application/json

    // AbstractJackson2MessageConverter
    
    @Override
    public Object fromMessage(Message message, @Nullable Object conversionHint) throws MessageConversionException {
    	Object content = null;
    	MessageProperties properties = message.getMessageProperties();
    	if (properties != null) {
    		String contentType = properties.getContentType();//<1>
            //supportedContentType即为构造函数中传递的MimeType
    		if (contentType != null && contentType.contains(this.supportedContentType.getSubtype())) {//<2>
    			String encoding = properties.getContentEncoding();
    			if (encoding == null) {
    				encoding = getDefaultCharset();
    			}
    			try {
    				if (conversionHint instanceof ParameterizedTypeReference) {
    					content = convertBytesToObject(message.getBody(), encoding,
    							this.objectMapper.getTypeFactory().constructType(
    									((ParameterizedTypeReference<?>) conversionHint).getType()));
    				}
    				else if (getClassMapper() == null) {
    					JavaType targetJavaType = getJavaTypeMapper()
    							.toJavaType(message.getMessageProperties());
    					content = convertBytesToObject(message.getBody(),
    							encoding, targetJavaType);
    				}
    				else {
    					Class<?> targetClass = getClassMapper().toClass(// NOSONAR never null
    							message.getMessageProperties());
    					content = convertBytesToObject(message.getBody(),
    							encoding, targetClass);
    				}
    			}
    			catch (IOException e) {
    				throw new MessageConversionException(
    						"Failed to convert Message content", e);
    			}
    		}
    		else {
    			if (this.log.isWarnEnabled()) {
    				this.log.warn("Could not convert incoming message with content-type ["
    						+ contentType + "], '" + this.supportedContentType.getSubtype() + "' keyword missing."); //<3>
    			}
    		}
    	}
    	if (content == null) {
    		content = message.getBody();
    	}
    	return content;
    }
    

    上述代码可以这么理解,Jackson2JsonMessageConverter初始化时将json格式的content_type传递到父类AbstractJackson2MessageConverter中,当消费者将Message消息转换为Java对象时实际上是调用的AbstractJackson2MessageConverter#fromMessage()方法,由于该方法只支持json格式的content_type,因此执行了<3>处的代码,打印出了文章开头所示的提示信息。

    因此最终的解决方案其实有2种

    1.发送消息时也使用Jackson2JsonMessageConverter,这种方式用来支持json格式的数据传输;
    2.删除containerFactory中设置的MessageConvert,使用默认的SimpleMessageConverter,这样就只支持字符串、byte数组以及序列化对象三种消息了。

  • 相关阅读:
    Ubuntu 14.04 卸载通过源码安装的库
    Ubuntu 14.04 indigo 相关依赖
    Ubuntu 14.04 indigo 安装 cartographer 1.0.0
    Ubuntu 14.04 改变文件或者文件夹的拥有者
    安装cartographer遇到Unrecognized syntax identifier "proto3". This parser only recognizes "proto2"问题
    Unrecognized syntax identifier "proto3". This parser only recognizes "proto2". ”问题解决方法
    查看所有用户组,用户名
    1卸载ROS
    Ubuntu14.04 软件安装卸载
    Ubuntu14.04系统显示器不自动休眠修改
  • 原文地址:https://www.cnblogs.com/ybyn/p/13945456.html
Copyright © 2011-2022 走看看