zoukankan      html  css  js  c++  java
  • springboot集成使用rabbitmq笔记(3.基本过程)

    1. 使用rabbitmq笔记一
    2. 使用rabbitmq笔记二
    3. 使用rabbitmq笔记三

    1.AMQP协议

    AMQP 0-9-1的工作过程如下图:消息(message)被发布者(publisher)发送给交换机(exchange),交换机常常被比喻成邮局或者邮箱。然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

    2.相关组件

    Connection:用于权限控制的虚拟主机,一个虚拟主机可以包含多个交换机、队列、绑定,可以设置不同账号,不同的权限
    Channel:消息通道
    Exchange:交换机,生产者将消息发送至交换机
    Queue:队列,用于存储消息
    Binding:绑定关系,绑定交换机与队列的关系

    3.各个组件的创建

    spring-boot-autoconfiguer中关于amqp的自动配置

    3.1.关于我们自定义组件的初始化

    ①Exchange的初始

                        public class TopicExchange extends AbstractExchange {
                            //交换机名称,父类AbstractExchange中的属性
                            private final String name;
                            //是否持久化(默认true,重启服务后依然存在),父类AbstractExchange中的属性
                            private final boolean durable;
                            //是否自动删除(默认false,长时间不用自动删除),父类AbstractExchange中的属性
                            private final boolean autoDelete;
                            //参数
                            private final Map<String, Object> arguments;
                            //是否延迟类型,true 发送消息的时候需要额外添加header().
                            //注意可能会报异常( unknown exchange type 'x-delayed-message')异常处理方法
                            private volatile boolean delayed;
                            //是否内部使用,若内部使用则客户端不能发送消息
                            private boolean internal;
                            public TopicExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
                                super(name, durable, autoDelete, arguments);
                            }
    
                            @Override
                            public final String getType() {
                                return ExchangeTypes.TOPIC;
                            }
                        }

    ②Queue的初始

                        public class Queue extends AbstractDeclarable {
                            //队列名称
                            private final String name;
                            //是否持久化
                            private final boolean durable;
                            //是否声明该队列是否为连接独占,若为独占,连接关闭后队列即被删除
                            private final boolean exclusive;
                            //是否自动删除,若没有消费者订阅该队列,队列将被删除
                            private final boolean autoDelete;
                            //参数,可以指定队列长度,消息生存时间等队列的设置
                            private final java.util.Map<java.lang.String, java.lang.Object> arguments;
                            public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) {
                            Assert.notNull(name, "'name' cannot be null");
                            this.name = name;
                            this.durable = durable;
                            this.exclusive = exclusive;
                            this.autoDelete = autoDelete;
                            this.arguments = arguments;
                            }
                        }

    ③Binding的初始

                        public class Binding extends AbstractDeclarable {
                            //绑定至队列或交换机
                            public enum DestinationType {
                                QUEUE, EXCHANGE;
                            }
                            //队列或交换机名称
                            private final String destination;
                            //交换机名称
                            private final String exchange;
                            //绑定的路由
                            private final String routingKey;
                            //参数
                            private final Map<String, Object> arguments;
                            //绑定至队列或交换机
                            private final DestinationType destinationType;
    
                            public Binding(String destination, DestinationType destinationType, String exchange, String routingKey,
                                    Map<String, Object> arguments) {
                                this.destination = destination;
                                this.destinationType = destinationType;
                                this.exchange = exchange;
                                this.routingKey = routingKey;
                                this.arguments = arguments;
                            }
                        }

     

    3.2.Connection的创建,CachingConnectionFactory定义相关属性及缓存连接

                    @Configuration
                    @ConditionalOnMissingBean(ConnectionFactory.class)
                    protected static class RabbitConnectionFactoryCreator {
                        @Bean
                        public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config)
                                throws Exception {
                            RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();
                            if (config.determineHost() != null) {
                                factory.setHost(config.determineHost());
                            }
                            factory.setPort(config.determinePort());
                            if (config.determineUsername() != null) {
                                factory.setUsername(config.determineUsername());
                            }
                            if (config.determinePassword() != null) {
                                factory.setPassword(config.determinePassword());
                            }
                            if (config.determineVirtualHost() != null) {
                                factory.setVirtualHost(config.determineVirtualHost());
                            }
                            if (config.getRequestedHeartbeat() != null) {
                                factory.setRequestedHeartbeat(config.getRequestedHeartbeat());
                            }
                            RabbitProperties.Ssl ssl = config.getSsl();
                            if (ssl.isEnabled()) {
                                factory.setUseSSL(true);
                                if (ssl.getAlgorithm() != null) {
                                    factory.setSslAlgorithm(ssl.getAlgorithm());
                                }
                                factory.setKeyStore(ssl.getKeyStore());
                                factory.setKeyStorePassphrase(ssl.getKeyStorePassword());
                                factory.setTrustStore(ssl.getTrustStore());
                                factory.setTrustStorePassphrase(ssl.getTrustStorePassword());
                            }
                            if (config.getConnectionTimeout() != null) {
                                factory.setConnectionTimeout(config.getConnectionTimeout());
                            }
                            factory.afterPropertiesSet();
                            CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
                                    factory.getObject());
                            connectionFactory.setAddresses(config.determineAddresses());
                            connectionFactory.setPublisherConfirms(config.isPublisherConfirms());
                            connectionFactory.setPublisherReturns(config.isPublisherReturns());
                            //缓存通道数量,若未配置则默认为25
                            if (config.getCache().getChannel().getSize() != null) {
                                connectionFactory
                                        .setChannelCacheSize(config.getCache().getChannel().getSize());
                            }
                            //缓存模式,分为两种,1.缓存连接即connection模式,2.缓存通道即channel模式,未配置的默认模式。
                            //connection模式缓存多个Connection,可以配置缓存连接大小,channel模式只有一个connection,缓存多个channel,可以配置
                            if (config.getCache().getConnection().getMode() != null) {
                                connectionFactory
                                        .setCacheMode(config.getCache().getConnection().getMode());
                            }
                            //连接数,默认一个
                            if (config.getCache().getConnection().getSize() != null) {
                                connectionFactory.setConnectionCacheSize(
                                        config.getCache().getConnection().getSize());
                            }
                            //设置获取通道时(缓存的通道都被使用了)等待的毫秒数,默认为0,为0时创建新的通道
                            if (config.getCache().getChannel().getCheckoutTimeout() != null) {
                                connectionFactory.setChannelCheckoutTimeout(
                                        config.getCache().getChannel().getCheckoutTimeout());
                            }
                            return connectionFactory;
                        }
                    }

    3.3.RabbitTemplate的创建,用于发送及接受消息,当我们自己需要发送消息及接收消息时可以注入此对象

                    @Bean
                    @ConditionalOnSingleCandidate(ConnectionFactory.class)
                    @ConditionalOnMissingBean(RabbitTemplate.class)
                    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
                        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
                        MessageConverter messageConverter = this.messageConverter.getIfUnique();
                        //设置消息转换器,可以自定义
                        if (messageConverter != null) {
                            rabbitTemplate.setMessageConverter(messageConverter);
                        }
                        rabbitTemplate.setMandatory(determineMandatoryFlag());
                        RabbitProperties.Template templateProperties = this.properties.getTemplate();
                        RabbitProperties.Retry retryProperties = templateProperties.getRetry();
                        //是否开启重试,默认false,可配置(spring-retry)
                        if (retryProperties.isEnabled()) {
                            rabbitTemplate.setRetryTemplate(createRetryTemplate(retryProperties));
                        }
                        //接收超时,默认0
                        if (templateProperties.getReceiveTimeout() != null) {
                            rabbitTemplate.setReceiveTimeout(templateProperties.getReceiveTimeout());
                        }
                        //回复超时,默认5000
                        if (templateProperties.getReplyTimeout() != null) {
                            rabbitTemplate.setReplyTimeout(templateProperties.getReplyTimeout());
                        }
                        return rabbitTemplate;
                    }

    3.4.RabbitAdmin的创建

    Spring 容器中获取 exchange、Bingding、routingkey 以及queue 的 @bean 声明,然后使用 rabbitTemplate 的 execute 方法进行执行对应的声明、修改、删除等一系列 rabbitMQ 基础功能操作。例如添加交换机、删除一个绑定、清空一个队列里的消息等等

                    @Bean
                    @ConditionalOnSingleCandidate(ConnectionFactory.class)
                    @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true)
                    @ConditionalOnMissingBean(AmqpAdmin.class)
                    public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) {
                        return new RabbitAdmin(connectionFactory);
                    }

    其实现了诸多借口RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,InitializingBean
    其afterPropertiesSet方法就是在 我们的 bean 加载后进行一些设置,其主要方法为其中的initialize方法

                    public void afterPropertiesSet() {
                        synchronized (this.lifecycleMonitor) {
                            //........略...........
                                        initialize();
                                //.......略......................
                    }
                    public void initialize() {
                        //applicationContext为空直接返回
                        if (this.applicationContext == null) {
                            this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
                            return;
                        }
    
                        this.logger.debug("Initializing declarations");
                        //从spring容器中获取我们定义的exchange,queue,binding对象
                        Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
                                this.applicationContext.getBeansOfType(Exchange.class).values());
                        Collection<Queue> contextQueues = new LinkedList<Queue>(
                                this.applicationContext.getBeansOfType(Queue.class).values());
                        Collection<Binding> contextBindings = new LinkedList<Binding>(
                                this.applicationContext.getBeansOfType(Binding.class).values());
    
                        @SuppressWarnings("rawtypes")
                        Collection<Collection> collections = this.applicationContext.getBeansOfType(Collection.class, false, false)
                                .values();
                        for (Collection<?> collection : collections) {
                            if (collection.size() > 0 && collection.iterator().next() instanceof Declarable) {
                                for (Object declarable : collection) {
                                    if (declarable instanceof Exchange) {
                                        contextExchanges.add((Exchange) declarable);
                                    }
                                    else if (declarable instanceof Queue) {
                                        contextQueues.add((Queue) declarable);
                                    }
                                    else if (declarable instanceof Binding) {
                                        contextBindings.add((Binding) declarable);
                                    }
                                }
                            }
                        }
                        //过滤三组件
                        final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
                        final Collection<Queue> queues = filterDeclarables(contextQueues);
                        final Collection<Binding> bindings = filterDeclarables(contextBindings);
                        //Exchange,Queue为非持久化,自动删除则打印日志
                        for (Exchange exchange : exchanges) {
                            if ((!exchange.isDurable() || exchange.isAutoDelete())  && this.logger.isInfoEnabled()) {
                                this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                                        + exchange.getName()
                                        + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                                        + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                                        + "reopening the connection.");
                            }
                        }
    
                        for (Queue queue : queues) {
                            if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
                                this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                                        + queue.getName()
                                        + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                                        + queue.isExclusive() + ". "
                                        + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                                        + "alive, but all messages will be lost.");
                            }
                        }
                        //若三组件都没有直接返回
                        if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
                            this.logger.debug("Nothing to declare");
                            return;
                        }
                        //使用rabbitTemplate连接至服务端创建
                        this.rabbitTemplate.execute(new ChannelCallback<Object>() {
                            @Override
                            public Object doInRabbit(Channel channel) throws Exception {
                                declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
                                declareQueues(channel, queues.toArray(new Queue[queues.size()]));
                                declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
                                return null;
                            }
                        });
                        this.logger.debug("Declarations finished");
                    }

    4.发送消息及接收过程

    4.1.rabbitTemplate.send方法
                    public void send(final String exchange, final String routingKey,
                            final Message message, final CorrelationData correlationData)
                            throws AmqpException {
                        execute(new ChannelCallback<Object>() {
    
                            @Override
                            public Object doInRabbit(Channel channel) throws Exception {
                                //此方法中调用channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody());
                                doSend(channel, exchange, routingKey, message, RabbitTemplate.this.returnCallback != null
                                        && RabbitTemplate.this.mandatoryExpression.getValue(
                                                RabbitTemplate.this.evaluationContext, message, Boolean.class),
                                        correlationData);
                                return null;
                            }
                        }, obtainTargetConnectionFactory(this.sendConnectionFactorySelectorExpression, message));
                    }
                    protected void doSend(Channel channel, String exchange, String routingKey, Message message,
                            //。。。。。。。。略
                        BasicProperties convertedMessageProperties = this.messagePropertiesConverter
                                .fromMessageProperties(messageProperties, this.encoding);
                        channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, messageToUse.getBody());
                        //。。。。。。。。略
                    }

    4.2.CachingConnectionFactory.CachedChannelInvocationHandler.invoke()。动态代理

                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        String methodName = method.getName();
                        //if methodName为其他.进行操作后 return。省略。。
                        try {
                            if (this.target == null || !this.target.isOpen()) {
                                if (this.target instanceof PublisherCallbackChannel) {
                                    this.target.close();
                                    throw new InvocationTargetException(new AmqpException("PublisherCallbackChannel is closed"));
                                }
                                else if (this.txStarted) {
                                    this.txStarted = false;
                                    throw new IllegalStateException("Channel closed during transaction");
                                }
                                this.target = null;
                            }
                            synchronized (this.targetMonitor) {
                                if (this.target == null) {
                                    this.target = createBareChannel(this.theConnection, this.transactional);
                                }
                                Object result = method.invoke(this.target, args);
                                if (this.transactional) {
                                    if (txStarts.contains(methodName)) {
                                        this.txStarted = true;
                                    }
                                    else if (txEnds.contains(methodName)) {
                                        this.txStarted = false;
                                    }
                                }
                                return result;
                            }
                        }
                        //异常处理,省略。。
                    }
  • 相关阅读:
    windows编程学习笔记
    自学JAVA-12:MySQL数据库
    自学JAVA-11:IO流
    自学JAVA-10:集合
    自学JAVA-9:基本类常用方法整理
    自学JAVA-8:异常
    自学JAVA-7:多态
    自学JAVA-6:继承
    自学JAVA-5:修饰符、对象初始化
    自学JAVA-4:方法、对象、类、属性
  • 原文地址:https://www.cnblogs.com/lantuanqing/p/11289060.html
Copyright © 2011-2022 走看看