zoukankan      html  css  js  c++  java
  • spring boot rabbitmq整合rabbitmq之消息持久化存储

    说明:该文档中的源码来自于:spring-rabbit-2.1.8.RELEASE.jar

    rabbitmq消息持久化存储包含一下三个方面

      1、exchange的持久化

      2、queue的持久化

      3、message的持久化

    exchange的持久化

      在申明exchange的时候,有个参数:durable。当该参数为true,则对该exchange做持久化,重启rabbitmq服务器,该exchange不会消失。durable的默认值为true

    public class DirectExchange extends AbstractExchange {
        public static final DirectExchange DEFAULT = new DirectExchange("");
    
        public DirectExchange(String name) {
            super(name);
        }
    
        public DirectExchange(String name, boolean durable, boolean autoDelete) {
            super(name, durable, autoDelete);
        }
    
        public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
            super(name, durable, autoDelete, arguments);
        }
    
        public final String getType() {
            return "direct";
        }
    }
    public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
        private final String name;
        private final boolean durable;
        private final boolean autoDelete;
        private final Map<String, Object> arguments;
        private volatile boolean delayed;
        private boolean internal;
    
        public AbstractExchange(String name) {
            this(name, true, false);
        }
    
        public AbstractExchange(String name, boolean durable, boolean autoDelete) {
            this(name, durable, autoDelete, (Map)null);
        }
    
        public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
            this.name = name;
            this.durable = durable;
            this.autoDelete = autoDelete;
            if (arguments != null) {
                this.arguments = arguments;
            } else {
                this.arguments = new HashMap();
            }
    
        }

    queue的持久化

       申明队列时也有个参数:durable。当该参数为true,则对该queue做持久化,重启rabbitmq服务器,该queue不会消失。durable的默认值为true

    public Queue(String name) {
            this(name, true, false, false);
        }
    
        public Queue(String name, boolean durable) {
            this(name, durable, false, false, (Map)null);
        }
    
        public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
            this(name, durable, exclusive, autoDelete, (Map)null);
        }
    
        public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
            Assert.notNull(name, "'name' cannot be null");
            this.name = name;
            this.actualName = StringUtils.hasText(name) ? name : Base64UrlNamingStrategy.DEFAULT.generateName() + "_awaiting_declaration";
            this.durable = durable;
            this.exclusive = exclusive;
            this.autoDelete = autoDelete;
            this.arguments = (Map)(arguments != null ? arguments : new HashMap());
        }

    message的持久化

      前面我们已经讲到exchange与queue的持久化,那么message如何持久化呢?

      我们在使用rabbit-client做消息持久化时,设置了BasicProperties的deliveryMode为2,做消息的持久化。

    AMQP.BasicProperties properties = new AMQP.BasicProperties.
                    Builder().
                    deliveryMode(2).
                    build();
    
            channel.basicPublish("ex.pc", "key.pc",  properties, "hello world".getBytes());

    那么整合了spring boot,使用RabbitTemplate如何做持久化?

    首先,我们来到经常的使用的消息发送方法:RabbitTemplate类下的convertAndSend

    @Override
        public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
            convertAndSend(exchange, routingKey, object, (CorrelationData) null);
        }

    然后调用了该类下的重载方法:convertAndSend。该方法中将object 转换成了message

    @Override
        public void convertAndSend(String exchange, String routingKey, final Object object,
                @Nullable CorrelationData correlationData) throws AmqpException {
    
            send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
        }

    在做消息转换的时候,我们注意到,传入了一个MessageProperties对象

    protected Message convertMessageIfNecessary(final Object object) {
            if (object instanceof Message) {
                return (Message) object;
            }
            return getRequiredMessageConverter().toMessage(object, new MessageProperties());
        }

    在MessageProperties中,有个deliveryMode属性,该属性默认值为:MessageDeliveryMode.PERSISTENT(持久化的)

     public MessageProperties() {
            this.deliveryMode = DEFAULT_DELIVERY_MODE;
            this.priority = DEFAULT_PRIORITY;
        }

    static {
    DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
    DEFAULT_PRIORITY = 0;
    }

    消息转换完成后,调用时同类方法的send方法

    @Override
        public void send(final String exchange, final String routingKey,
                final Message message, @Nullable final CorrelationData correlationData)
                throws AmqpException {
            execute(channel -> {
                doSend(channel, exchange, routingKey, message,
                        (RabbitTemplate.this.returnCallback != null
                                || (correlationData != null && StringUtils.hasText(correlationData.getId())))
                                && RabbitTemplate.this.mandatoryExpression.getValue(
                                        RabbitTemplate.this.evaluationContext, message, Boolean.class),
                        correlationData);
                return null;
            }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
        }

    该方法又调用了doSend方法

    public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message, // NOSONAR complexity
                boolean mandatory, @Nullable CorrelationData correlationData)
                        throws Exception { // NOSONAR TODO: change to IOException in 2.2.
    
            String exch = exchangeArg;
            String rKey = routingKeyArg;
            if (exch == null) {
                exch = this.exchange;
            }
            if (rKey == null) {
                rKey = this.routingKey;
            }
            if (logger.isDebugEnabled()) {
                logger.debug("Publishing message " + message
                        + "on exchange [" + exch + "], routingKey = [" + rKey + "]");
            }
    
            Message messageToUse = message;
            MessageProperties messageProperties = messageToUse.getMessageProperties();
            if (mandatory) {
                messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_LISTENER_CORRELATION_KEY, this.uuid);
            }
            if (this.beforePublishPostProcessors != null) {
                for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
                    messageToUse = processor.postProcessMessage(messageToUse, correlationData);
                }
            }
            setupConfirm(channel, messageToUse, correlationData);
            if (this.userIdExpression != null && messageProperties.getUserId() == null) {
                String userId = this.userIdExpression.getValue(this.evaluationContext, messageToUse, String.class);
                if (userId != null) {
                    messageProperties.setUserId(userId);
                }
            }
            sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
            // Check if commit needed
            if (isChannelLocallyTransacted(channel)) {
                // Transacted channel created by this template -> commit.
                RabbitUtils.commitIfNecessary(channel);
            }
        }

    在该方法中我们终于看到了发送消息到rabbitmq的操作:sendToRabbit。该方法将MessageProperties对象转换成了BasicProperties。至此,我们终于了解了,spring rabbit 中如何实现messge的持久化。默认的message就是持久化的

    protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
                Message message) throws IOException {
            BasicProperties convertedMessageProperties = this.messagePropertiesConverter
                    .fromMessageProperties(message.getMessageProperties(), this.encoding);
            channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
        }

    如何改变message的持久化属性?

      根据上面的源码分析,spring中默认的message就是持久化的,如何改变持久化属性?

      1、使用send方法,发送message。设置message中MessageProperties的属性deliveryMode

      2、自定义MessageConverter,在消息转换时,设置MessageProperties的属性deliveryMode

      3、自定MessagePropertiesConverter,在MessageProperties对象转换成BasicProperties时,设置deliveryMode

  • 相关阅读:
    上周热点回顾(11.2912.5)
    上周热点回顾(11.1511.21)
    上周热点回顾(11.2211.28)
    上周热点回顾(12.1312.19)
    Bambook程序达人赛报名公告
    HTML5技术专题上线啦!
    “博客无双,以文会友”活动公告
    上周热点回顾(12.612.12)
    [转]Java RMI之HelloWorld篇
    中国现代远程与继续教育网 统考 大学英语(B)考试大纲
  • 原文地址:https://www.cnblogs.com/damon-blogs/p/14047217.html
Copyright © 2011-2022 走看看