zoukankan      html  css  js  c++  java
  • Springboot集成RabbitMQ之MessageConvert源码解析

    问题

    最近在使用RabbitMq时遇到了一个问题,明明是转换成json发送到mq中的数据,消费者接收到的却是一串数字也就是byte数组,但是使用mq可视化页面查看数据却是正常的,之前在使用过程中从未遇到过这种情况,遇到的情况如下所示:

    生产者发送消息的代码如下所示:

    public void sendJsonStrMsg(String jsonStr){
    	rabbitTemplate.convertAndSend(JSON_QUEUE, jsonStr);
    }
    

    消费者代码如下所示:

    @RabbitHandler
    @RabbitListener(queuesToDeclare = {@Queue(name=ProducerService.JSON_QUEUE, durable = "true")},containerFactory = "prefetchTenRabbitListenerContainerFactory")
    public void listenJsonMsg(String msg, Channel channel, Message message){
    	log.debug("json字符串类型消息>>>>{}",msg);
    }
    

    引入的containerFactory如下所示:

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory() {
    	SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    	MessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //<x>
    	factory.setConnectionFactory(connectionFactory);
    	factory.setMessageConverter(jackson2JsonMessageConverter);
    	return factory;
    }
    

    注意代码中标有<x>的地方,这里就是我们解决问题的关键。

    解决方案

    我们先说解决方案,再说原因,解决方案其实很简单,在保持上述代码不变的情况下,只需要再注入如下的bean即可:

    @Bean
    public MessageConverter jackson2JsonMessageConverter(){
    	return new Jackson2JsonMessageConverter("*");
    }
    

    解决方案就是这么简单,只需要在原来的代码的基础上注入Jackson2JsonMessageConverter就可以了,但是原理是什么呢?且往后看。

    原理分析

    关于原理的解释我们从源码层面来说,毕竟源码面前没有秘密.

    生产者源码分析

    首先看我们发送消息到mq的方法rabbitTemplate.convertAndSend(JSON_QUEUE, jsonStr),从此方法进去后,经过重载的方法后最终到达下面所示的方法:

    @Override
    public void convertAndSend(String exchange, String routingKey, final Object object,
    		@Nullable CorrelationData correlationData) throws AmqpException {
    
    	send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
    }
    

    着重看convertMessageIfNecessary方法,方法名已经很直白的告诉我们了,如果需要就转换消息,我们点进去看一下这个方法:

    protected Message convertMessageIfNecessary(final Object object) {
    	if (object instanceof Message) { //<1>
    		return (Message) object;
    	}
    	return getRequiredMessageConverter().toMessage(object, new MessageProperties()); //<2>
    }
    

    <1>处是说如果要发送到mq的对象是Message的实例,那么就直接转换成Message类型返回,否则就获取MessageConverter后调用toMessage()方法返回Message对象。

    我们先看一下RabbitTemplate#getRequiredMessageConverter(),如下所示:

    private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
    	MessageConverter converter = getMessageConverter();
    	if (converter == null) {
    		throw new AmqpIllegalStateException(
    				"No 'messageConverter' specified. Check configuration of RabbitTemplate.");
    	}
    	return converter;
    }
    
    public MessageConverter getMessageConverter() {
    	return this.messageConverter; //<1>
    }
    

    <1>处的代码表明需要一个messageConverter对象,我在RabbitTemplate源码中找到了对应的set方法,由于我们没有调用set方法取设置messageConverter的值,那么就需要取查找默认值,默认值的设置如下代码所示:

    /**
     * Convenient constructor for use with setter injection. Don't forget to set the connection factory.
     */
    public RabbitTemplate() {
    	initDefaultStrategies(); // NOSONAR - intentionally overridable; other assertions will check
    }
    
    /**
     * Set up the default strategies. Subclasses can override if necessary.
     设置默认策略,子类在必须的时候可以重写
     */
    protected void initDefaultStrategies() {
    	setMessageConverter(new SimpleMessageConverter());
    }
    
    public void setMessageConverter(MessageConverter messageConverter) {
    	this.messageConverter = messageConverter;
    }
    

    我们点进去SimpleMessageConverter#toMessage()方法看一下是如何将一个java对象转换成Message对象的,可惜的是在SimpleMessageConverter中未找到toMessage方法,我们在此先看一下SimpleMessageConverter继承情况,类图如下:

    去掉了一些无用的接口和类之后,剩下的类图如下所示,沿着类图向上找,在AbstractMessageConverter中找到了toMessage方法:

    @Override
    public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg,
    		@Nullable Type genericType)
    		throws MessageConversionException {
    
    	MessageProperties messageProperties = messagePropertiesArg;
    	if (messageProperties == null) {
    		messageProperties = new MessageProperties();
    	}
    	Message message = createMessage(object, messageProperties, genericType); //<1>
    	messageProperties = message.getMessageProperties();
    	if (this.createMessageIds && messageProperties.getMessageId() == null) {
    		messageProperties.setMessageId(UUID.randomUUID().toString());
    	}
    	return message;
    }
    

    该方法中没有我们需要的内容,继续看<1>处的方法,该方法需要返回到SimpleMessageConverter中:

    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
    	byte[] bytes = null;
    	if (object instanceof byte[]) {  //<1>
    		bytes = (byte[]) object;
    		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_BYTES); //<1.x>
    	}
    	else if (object instanceof String) {  //<2>
    		try {
    			bytes = ((String) object).getBytes(this.defaultCharset);
    		}
    		catch (UnsupportedEncodingException e) {
    			throw new MessageConversionException(
    					"failed to convert to Message content", e);
    		}
    		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//<2.x>
    		messageProperties.setContentEncoding(this.defaultCharset);
    	}
    	else if (object instanceof Serializable) { //<3>
    		try {
    			bytes = SerializationUtils.serialize(object);
    		}
    		catch (IllegalArgumentException e) {
    			throw new MessageConversionException(
    					"failed to convert to serialized Message content", e);
    		}
    		messageProperties.setContentType(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT);//<3.x>
    	}
    	if (bytes != null) {
    		messageProperties.setContentLength(bytes.length);
    		return new Message(bytes, messageProperties);
    	}
    	throw new IllegalArgumentException(getClass().getSimpleName()
    			+ " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());  //<4>
    }
    

    这个方法就比较有意思了,在<1><2><3>三处分别判断了发送的消息是否是byte[]StringSerializable,并且在判断之后将消息的content_type属性分别设置为application/octet-streamtext/plainapplication/x-java-serialized-object三种类型,除了以上三种类型之外的数据将被抛出异常,很显然我们前面发送的是字符串消息,那么content_type属性的值必定是text/plain了,可以在mq可视化页面上看到:

    经过以上的步骤,java对象已经转换成Message对象并且发送到了MQ中,下面就是消费者的源码分析了。

    消费者源码分析

    本来想从SpringBoot启动开始到Bean加载、注册一直到获取消息的源码分析下来的,奈何IoC和AOP的源码还没看完,实在是心有余而力不足,此处留个坑待以后再战。

    前面生产者是调用MessageConverter.toMessage()方法将java对象转换成Message对象发送到MQ中的,那么消费者应该是反其道而行之,调用MessageConverter.formMessage()方法将Message对象转换成java对象,我们就从formMessage方法开始看,生产者使用的是SimpleMessageConverter,那么此处还是查看此类的fromMessage方法:

    /**
     * Converts from a AMQP Message to an Object.
     */
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
    	Object content = null;
    	MessageProperties properties = message.getMessageProperties();  
    	if (properties != null) {
    		String contentType = properties.getContentType();//<1>
    		if (contentType != null && contentType.startsWith("text")) { //<2>
    			String encoding = properties.getContentEncoding();
    			if (encoding == null) {
    				encoding = this.defaultCharset;
    			}
    			try {
    				content = new String(message.getBody(), encoding);
    			}
    			catch (UnsupportedEncodingException e) {
    				throw new MessageConversionException(
    						"failed to convert text-based Message content", e);
    			}
    		}
    		else if (contentType != null &&
    				contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) { //<3>
    			try {
    				content = SerializationUtils.deserialize(
    						createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
    			}
    			catch (IOException | IllegalArgumentException | IllegalStateException e) {
    				throw new MessageConversionException(
    						"failed to convert serialized Message content", e);
    			}
    		}
    	}
    	if (content == null) {
    		content = message.getBody(); //<4>
    	}
    	return content;
    }
    

    以上代码很容易理解
    <1>处是从消息属性中获取到消息的content_type属性
    <2>处和<3>处则是分别判断是否text/plain以及application/x-java-serialized-object
    如果以上两种都不符合,那么只能是调用message.getBody()返回一个byte[]类型的byte数组,这也就是文章开头返回一串数字的由来。

    问题解决

    虽然消费者源码分析得到了一个返回一串数字的缘由,但是这并不是造成本次问题的根本原因,我们再回顾一下问题中消费者所使用的一个containerFactory

    @Bean
    public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchTenRabbitListenerContainerFactory() {
    	SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    	MessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); //<1>
    	factory.setConnectionFactory(connectionFactory);
    	factory.setMessageConverter(jackson2JsonMessageConverter); //<2>
    	return factory;
    }
    

    <1><2>处使用的messageConver是Jackson2JsonMessageConverter,通过前面类图我们可以知道它也是实现了MessageConvert接口,我们看一下这个类的源码:

    /**
     * JSON converter that uses the Jackson 2 Json library.
     */
    public class Jackson2JsonMessageConverter extends AbstractJackson2MessageConverter {
    
    	public Jackson2JsonMessageConverter() {
    		this("*");
    	}
    
    	public Jackson2JsonMessageConverter(String... trustedPackages) {
    		this(new ObjectMapper(), trustedPackages);
    		this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    	}
    
    	public Jackson2JsonMessageConverter(ObjectMapper jsonObjectMapper) {
    		this(jsonObjectMapper, "*");
    	}
    
    	public Jackson2JsonMessageConverter(ObjectMapper jsonObjectMapper, String... trustedPackages) {
    		super(jsonObjectMapper, MimeTypeUtils.parseMimeType(MessageProperties.CONTENT_TYPE_JSON), trustedPackages);  //<1>
    	}
    
    }
    

    我删掉了一些无用的代码以及注释,可以在类注释上很显然的看到这个转换器是使用jackson的JSON转换器,也就是说这个转换器只对json数据有效,该类中并没有找到fromMessage和toMessage方法,那么只能从其父类AbstractJackson2MessageConverter中查找fromMessage方法,如下所示,注意上面代码中<1>的地方,传递的content_type类型是application/json

    // AbstractJackson2MessageConverter
    
    @Override
    public Object fromMessage(Message message, @Nullable Object conversionHint) throws MessageConversionException {
    	Object content = null;
    	MessageProperties properties = message.getMessageProperties();
    	if (properties != null) {
    		String contentType = properties.getContentType();//<1>
            //supportedContentType即为构造函数中传递的MimeType
    		if (contentType != null && contentType.contains(this.supportedContentType.getSubtype())) {//<2>
    			String encoding = properties.getContentEncoding();
    			if (encoding == null) {
    				encoding = getDefaultCharset();
    			}
    			try {
    				if (conversionHint instanceof ParameterizedTypeReference) {
    					content = convertBytesToObject(message.getBody(), encoding,
    							this.objectMapper.getTypeFactory().constructType(
    									((ParameterizedTypeReference<?>) conversionHint).getType()));
    				}
    				else if (getClassMapper() == null) {
    					JavaType targetJavaType = getJavaTypeMapper()
    							.toJavaType(message.getMessageProperties());
    					content = convertBytesToObject(message.getBody(),
    							encoding, targetJavaType);
    				}
    				else {
    					Class<?> targetClass = getClassMapper().toClass(// NOSONAR never null
    							message.getMessageProperties());
    					content = convertBytesToObject(message.getBody(),
    							encoding, targetClass);
    				}
    			}
    			catch (IOException e) {
    				throw new MessageConversionException(
    						"Failed to convert Message content", e);
    			}
    		}
    		else {
    			if (this.log.isWarnEnabled()) {
    				this.log.warn("Could not convert incoming message with content-type ["
    						+ contentType + "], '" + this.supportedContentType.getSubtype() + "' keyword missing."); //<3>
    			}
    		}
    	}
    	if (content == null) {
    		content = message.getBody();
    	}
    	return content;
    }
    

    上述代码可以这么理解,Jackson2JsonMessageConverter初始化时将json格式的content_type传递到父类AbstractJackson2MessageConverter中,当消费者将Message消息转换为Java对象时实际上是调用的AbstractJackson2MessageConverter#fromMessage()方法,由于该方法只支持json格式的content_type,因此执行了<3>处的代码,打印出了文章开头所示的提示信息。

    因此最终的解决方案其实有2种

    1.发送消息时也使用Jackson2JsonMessageConverter,这种方式用来支持json格式的数据传输;
    2.删除containerFactory中设置的MessageConvert,使用默认的SimpleMessageConverter,这样就只支持字符串、byte数组以及序列化对象三种消息了。

  • 相关阅读:
    MySQL 性能调优之索引
    MySQL 性能调优之存储引擎
    MySQL数据类型优化—整数类型优化选择
    MySQL数据性能优化-修改方法与步骤
    MySQL设计SQL语句优化规范
    MySQL索引的设计、使用和优化
    MySQL的SQL语句优化-group by语句的优化
    SQL性能优化-order by语句的优化
    MySQL查询优化注意下面的四个细节
    优化MySQL性能的几种方法-总结
  • 原文地址:https://www.cnblogs.com/ybyn/p/13945456.html
Copyright © 2011-2022 走看看