zoukankan      html  css  js  c++  java
  • Rabbitmq的使用五_消息的可靠性投递

    Rabbitmq的使用五_消息的可靠性投递

    官网地址:https://www.rabbitmq.com/tutorials/tutorial-seven-java.html

    通过前面的学习,可以知道消息的可靠性投递,可以做持久化操作

      过程一:从生产者发送消息到RabbitMQ服务器的过程。

      过程二:确保消息从交换机路由到队列

      过程三:确保消息在队列中正确的存储

      过程四:确保消息从队列正确地投递到消费者

      消息本身也要做持久化操作。

    2.Rabbitmq的工作模型

    执行流程:生产者发送消息通过channel和rabbitmq建立连接,发送一条携带路由关键字的消息到交换机上面的时候。交换机根据消息携带关键字,去查哪一些队列绑定了这些关键字,然后把消息路由到这些队列上面,然后消费者从队列中取消息就可以了

    一个rabbitmq里面,可以建立多个不同的交换机和绑定队列,这时就用到我们的virtual host虚拟机。我们可以把每一个虚拟机当做一个rabbitmq的服务器。这样做解决了硬件资源的问题。因此,我们可以创建多个虚拟机,每一个虚拟机都可以看做是一个rabbitmq的服务器,可以根据此来创建很多的交换机和队列,以及定义之间的绑定关系,然后我们去创建一些用户,比如我们的资金系统,有资金系统的虚拟机,我们把资金系统的用户分配资金系统的权限就ok了。那么对应的其他系统,每一个系统都有自己的用户,自己的权限,那么不同的虚拟机之间是完全透明的。他们之间可以建立同名的交换机,通过这样的方式,我们可以实现硬件资源的高效利用和硬件资源的隔离。

    生产者或者消费者和rabbitmq之间的连接是一个长连接。如果我们直接连接,频繁的创建连接,就会造成性能问题,我们引入一个channel的通道。这是一些虚拟的连接。我们需要连接rabbitmq的话,直接从这些虚拟的连接中拿一个连接就可以使用了。

    exchange:本质是地址的清单,本身不存储消息。相当于一个路由功能

    引入交换机的作用:是为了达到消息的一个灵活的投递

    能够创建多少队列:这个主要取决于队列是保存在哪里的,如果队列是保存在内存里的,那么创建多少队列,取决于内存空间的大小,如果队列是保存在硬盘上的,取决于硬盘空间的大小

    如何实现消息的灵活投递:交换机和队列之间的关系是多对多的关系

    2.1如果过程一发生了异常怎么办?

    从生产者发送消息到rabbitmq服务器的过程中,失败了。RabbitMQ给我们提供了两种方式:1.AMQP事务  2.Confirm模式

    1.事务模式每一个事务都要等待消息的一个应答,所以事务模式极其消耗性能的一个东西。因为极其消耗性能,所以在生产中一般也不会使用,因为,如果每一条消息,都开启事务,太消耗性能了。

     事务模式的三个方法:channel.txSelect(); channel.txCommit(); channel.txRollback();

     代码案例:

    public class RabbitMqTransActionSender {
    
        // 事务队列
        private static final String TRANSTATION_QUEUE = "transaction_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.获取连接
            Connection connection = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.获取通道
            Channel channel = connection.createChannel();
            // 3.声明队列
            channel.queueDeclare(TRANSTATION_QUEUE, false, false, false, null);
            // 4.发送消息
            String msg = "小河流水哗啦啦,我和姐姐去采花1";
    
            // 将channel设置为事务模式
            try {
                channel.txSelect();
                channel.basicPublish("", TRANSTATION_QUEUE, null, msg.getBytes());
                int a = 10 / 0;
                channel.txCommit();
                System.out.println("消息已经提交");
            } catch (Exception e) {
                channel.txRollback();
                System.out.println("消息已经回滚");
            }
            channel.close();
            connection.close();
        }
    }
    View Code

    演示正常情况,发送者发送消息,消费者正常接收消息

    消息发送过程中,出现异常。消息回滚掉,消费者不会接收到任何消息

                  图4生产者                                              图5消费者

    2.确认模式

    当我们发送消息成功之后,会有一个ack应答,只要我们的channel.waitForConfirms返回一个true,表示我们的消息就是发送成功的。

    rabbitmq的消息确认,默认不启动了,需要开启

    Channel channel = connection.createChannel();
    channel.confirmSelect();

    单条发送消息

    当发送者发送消息报错时,消费者就不会受到消息

    public class RabbitMqConfirmSender {
    
        private static final String CONFIRM_QUEUE = "confirm_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // 1.获取连接
            Connection connection = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.获取通道
            Channel channel = connection.createChannel();
            // 3.声明队列
            channel.queueDeclare(CONFIRM_QUEUE, false, false, false, null);
            // 4.发送消息
            String msg = "小河流水哗啦啦,我和姐姐去采花1";
            // 将channel设置为confirm模式
            channel.confirmSelect();
            channel.basicPublish("", CONFIRM_QUEUE, null, msg.getBytes());
            if (channel.waitForConfirms()) {
                System.out.println("消息发送成功");
            }
            channel.close();
            connection.close();
    
        }
    }
    View Code

    我们在开始时提到,代理异步确认已发布的消息,代码会同步等待,直到消息确认为止。客户端实际异步接收确认,并解除调用阻塞,可以看作是一个在内部依赖于异步通知的同步助手。

    单条消息的发送确认模式,效率也是很低的,每一条消息,先开启消息确认机制,然后发送,然后处理消息应答,效率太低。

    批量发送消息

    public class RabbitMqConfirmBatchSender {
    
        private static final String BATCH_CONFIRM_QUEUE = "batch_confirm_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // 1.获取连接
            Connection connection = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.获取通道
            Channel channel = connection.createChannel();
            // 3.声明队列
            channel.queueDeclare(BATCH_CONFIRM_QUEUE, false, false, false, null);
            // 4.发送消息
    
            // 将channel设置为confirm模式
            channel.confirmSelect();
            String msg = "小河流水哗啦啦,我和姐姐去采花";
            for (int i = 1; i <= 100; i++) {
    //            int a = 19/0;
                channel.basicPublish("", BATCH_CONFIRM_QUEUE, null, (msg + i).getBytes());
            }
            if (channel.waitForConfirms()) {
                System.out.println("消息发送成功");
            }
            channel.close();
            connection.close();
    
        }
    }
    View Code

    当批量发送异常的时候,消费者不会收到任何消息,如下图所示:

    这种批量发送消息的方式,只要有一条消息未被broker确认,就会发生异常,也就是说当我们channel.waitFroConfirms;只要不抛出异常,就可以认为我们的消息发送成功了

    但是:这种方式也有缺点:
    1.我们是积累多少条消息进行消息的发送
    2.我们假设是1000条发送消息一次,如果前999条发送失败,刚好第1000条发送失败了,怎么办

    Rabbitmq官网还给我们提供了一种方式,是采用异步的方式,进行消息的收发,使用异步的方式,是可以进行一边发送,一边确认的方式,进行消息的收发的,异步的情况,消息不会自动重发的

    Broker异步确认:只需要在发送者客户端注册一个异步回调,就可以接收到确认消息

    代码如下:

    public class RabbitMqAsyncConfirmSender {
    
        private static final String ASYNC_CONFIRM_QUEUE = "async_confirm_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // 1.获取连接
            Connection connection = RabbitMQConnectionFactory.getRabbitMQConnections();
            // 2.获取通道
            Channel channel = connection.createChannel();
            // 3.声明队列
            channel.queueDeclare(ASYNC_CONFIRM_QUEUE, false, false, false, null);
            // 4.发送消息
            String msg = "小河流水哗啦啦,我和姐姐去采花";
            // 将channel设置为confirm模式
            channel.confirmSelect();
            // 发送消息
            for (int i = 1; i <= 20; i++) {
    //            if (i==17){
    //                int a = 10/0;
    //            }
                channel.basicPublish("", ASYNC_CONFIRM_QUEUE, null, (msg + "======>" + i).getBytes());
            }
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.print("已确认的消息,标识:" + deliveryTag);
                    System.out.println("多个消息: " + multiple);
                }
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Broker 未确认消息,标识: " + deliveryTag);
    
                }
            });
            System.out.println("程序执行完成");
        }
    }
    View Code

     执行结果如下:

    在某些应用程序中,确保已发布的消息到达broker可能非常重要。发布者确认是RabbitMQ的一个特性,有助于满足这一需求。发布者确认在本质上是异步的,但是也可以同步地处理它们。

      1.单条发布消息,同步等待确认:简单,但吞吐量非常有限。
      2.批量发布消息,同步等待批处理的确认:简单、合理的吞吐量,但是很难判断什么时候出现了错误
      3.异步处理:最佳的性能和资源的使用,在错误的情况下良好的控制。

    2.2 过程二发送失败了怎么办?

      就是交换机路由消息到队列的过程中发送失败了。

      解决办法:可以给当前交换机设置备份交换机

       交换机本身也是支持持久化操作的。在声明交换机的时候,参数三表示是否持久化

        /**
         * Actively declare a non-autodelete exchange with no extra arguments
         * @see com.rabbitmq.client.AMQP.Exchange.Declare
         * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
         * @param exchange the name of the exchange
         * @param type the exchange type
         * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
         * @throws java.io.IOException if an error is encountered
         * @return a declaration-confirm method to indicate the exchange was successfully declared
         */
        Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;

    2.3 过程三失败了怎么办?

    当我们没有对消息进行配置的时候,默认是保存在内存中的,消息保存在内存中,不可避免的会出现rabbitmq服务器的重启,宕机等问题,所以,可以对消息做一个消息的持久化的处理。

    发送消息的时候,可以给消息设置一个properties的属性,通过配置properties可以配置消息的一个持久化操作。

    队列可以做持久化操作、消息也可以做持久化操作。

     2.4过程四失败了怎么办?

      确保消息从队列正确地投递到消费者ack应答

      消费者接收到消息的时候,就会给broker一个应答,broker拿到应答之后,就会从队列中删除这条消息。而不是在方法执行完之后,再给服务器的应答。我们可以手动的执行一个ack应答机制。

     2.5 如果消费者处理消息的时候,抛出异常了,生产者怎么知道?

    1.消费者回调 

      1) 发送回执(表示每发送一条消息,都给生产者一条消息回执)
      2)生产者提供api(生产者发送消息的时候,保存一条消息入库),消费者调用暴露的api,去修改这条消息的状态。如果我们消息的状态没有发生变更的话,那么我就可以判断有可能消费者在处理消息的时候,发生了问题

    2.补偿机制 

      如果没有发送回执:可能是因为网络问题
      如果没有调用api:可能消费者调用生产者的过程中出现了问题
      在以上的情况都没有使用的情况下面,我们就需要使用一个补偿机制,比如
      1)消息的重发:
        重发的前提:我们发送前需要把数据保存到数据库中,然后重发的时候,直接从数据库中进行消息的获取,然后重新进行一个消息的发送。但是如果发送5次,10次,如果消费者一直没有应答。。。就会一直重发。所以这里一定要做一个次数的控制,等达到一定的次数之后,我们就不进行重发操作了,我们会在夜间进行一个对账的操作。
    可能发送消息的时候,就是消费成功了,但是由于网络原因,回执执行慢了,

    3.消息的幂等性

      1)处理1次消息,跟处理10 次消息的结果都是一样的
      2) 消息必须有一个唯一性的标志:在金融系统中,任何一笔交易,都会有一个全局流水号的标志, message发送消息的时候,有一个消息的id,消息id+业务id唯一判断 (重帐控制)消费会不会重复消费

    3.如果保证消息的顺序消费

      多个生产者和多个消费者的情况,基本是没有办法实现一个消息的顺序消费的,比如发送了3条消息,消费者的消费速率是不一样,所以我们无法保证哪一个消息是先消费完成的。所以我们完成消费的顺序性,如果我们只有一个生产者,和一个消费者,根据队列先进先出的思想,我们是可以保证消息的一个顺序消费的。对于每一组消息,我们都有一个parentID(批次号),也有一个seqNo,如果上一个批次的消息没有消费完,就不能消费下一个消息的

    int sequenceNumber = channel.getNextPublishSeqNo());
    ch.basicPublish(exchange, queue, properties, body);

    当然如果要保证消息的一个顺序性的消费,我们可以使用rocketmq的顺序消费,他本身就是支持的

    4.springboot整合rabbitmq实现事务模式

    在SpringBoot项目中,使用RabbitMQ事务,只需要声明一个事务管理的Bean,并将RabbitTemplate的事务设置为true即可

    消费者方

    1.消费者添加配置文件

    server:
      port: 8010
    spring:
      rabbitmq:
        host: ip地址
        username: yingxiaocao
        password: yingxiaocao
        virtual-host: /yingxiaocao
        listener:
          type: simple
          simple:  # 开启手动应答
            acknowledge-mode: manual
    配置mq地址

    2.声明队列、交换、绑定关系、事务bean(消费者和生产者都要配置。否则当生产者启动,消费者未启动时,发送消息,就会报错)

    @PropertySource("classpath:transaction_mq.properties")
    @Configuration
    public class ConsumerConfig {
    
        @Value("${TRANSACTION_EXCHANGE_NAME}")
        private String exchangeName;
    
        @Value("${FIRST_QUEUE}")
        private String firstQueue;
    
        // 1.声明一个交换机
        @Bean("fanout_exchange")
        public FanoutExchange getFanoutExchange() {
            FanoutExchange fanoutExchange = new FanoutExchange(exchangeName);
            return fanoutExchange;
        }
    
        // 2.声明2个队列
        @Bean("first_queue")
        public Queue getFirstQueue() {
            Queue queue = new Queue(firstQueue);
            return queue;
        }
        // 3.绑定关系
        @Bean
        public Binding bindingExchange(@Qualifier("fanout_exchange") FanoutExchange fanoutExchange, @Qualifier("first_queue") Queue queue) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    
        @Bean
        public RabbitTransactionManager     rabbitTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
            return new RabbitTransactionManager(cachingConnectionFactory);
        }
    声明队列、交换机

    3.创建一个消费者

    @Component
    @PropertySource("classpath:transaction_mq.properties")
    public class RabbitmqConsumer {
    
        @RabbitListener(queues = {"${FIRST_QUEUE}"})
        public void receive(Message message, Channel channel) throws IOException {
            String msg = new String(message.getBody());
            System.out.println("接收到的消息: " + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    消费者

    4.创建一个启动类

    @SpringBootApplication
    public class TransactionConsumerStartApp {
    
        public static void main(String[] args) {
            SpringApplication.run(TransactionConsumerStartApp.class, args);
        }
    }
    消费者启动类

    生产者方

    1.创建一个controller

    @RequestMapping("/producer")
    @RestController
    public class ProducerController {
    
        @Autowired
        private ProducerService producerService;
    
        @RequestMapping("/send")
        public void send(String msg) {
            producerService.send(msg);
        }
    }
    controller

    2.创建一个service,用来发送消息

    @PropertySource("classpath:transaction_mq.properties")
    @Service
    public class ProducerService {
    
        @Value("${TRANSACTION_EXCHANGE_NAME}")
        private String exchangeName;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            // 创建对象的同时,开启channel事务模式
            rabbitTemplate.setChannelTransacted(true);
        }
    
        /**
         * 发送消息
         * @param msg
         */
        @Transactional
        public void send(String msg) {
    
            rabbitTemplate.convertAndSend(exchangeName,"",msg);
            System.out.println("消息已发送: "+msg);
            if (msg.equals("xxx")) {
                throw new RuntimeException("抛出异常了");
            }
        }
    
    
        /**
         * 配置启用rabbitmq事务
         * @param connectionFactory
         * @return
         */
        @Bean
        public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }
    }
    service发送消息

    3.创建一个启动类

    @SpringBootApplication
    public class TransactionProducerStartApp {
    
        public static void main(String[] args) {
            SpringApplication.run(TransactionProducerStartApp.class, args);
        }
    }
    View Code

    4.声明配置

    @PropertySource("classpath:transaction_mq.properties")
    @Component
    public class RabbitConfig {
    
        @Value("${TRANSACTION_EXCHANGE_NAME}")
        private String exchangeName;
    
        @Value("${FIRST_QUEUE}")
        private String firstQueue;
    
        // 1.声明一个交换机
        @Bean("fanout_exchange")
        public FanoutExchange getFanoutExchange() {
            FanoutExchange fanoutExchange = new FanoutExchange(exchangeName);
            return fanoutExchange;
        }
    
        // 2.声明1个队列
        @Bean("first_queue")
        public Queue getFirstQueue() {
            Queue queue = new Queue(firstQueue);
            return queue;
        }
        // 3.绑定关系
        @Bean
        public Binding bindingExchange(@Qualifier("fanout_exchange") FanoutExchange fanoutExchange, @Qualifier("first_queue") Queue queue) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    
        @Bean
        public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory cachingConnectionFactory) {
            return new RabbitTransactionManager(cachingConnectionFactory);
        }
    }
    声明队列、交换机、绑定关系

    执行结果:

    正常情况,生产者发送消息,消费者收到消费

    异常情况:生产者发送消息后,抛出异常了,消费者并没有收到消息,可见消息回滚了

    但是事务模式效率太差。

    5.springboot整合rabbitmq实现消息确认模式

     通过生产者确认机制,生产者可以在消息被服务器成功接收时得到反馈,并有机会处理未被成功接收的消息。

    在Springboot中开启RabbitMQ的生产者确认模式也很简单,只多了一行配置:publisher-confirms: true 即表示开启生产者确认模式。

    server:
      port: 9010
    spring:
      rabbitmq:
        username: yingxiaocao
        password: yingxiaocao
        host: 地址
        virtual-host: /yingxiaocao
        listener:
          type: simple
        publisher-confirms: true  # 开启消息确认模式
    View Code

    改变生产者代码

    @PropertySource("classpath:confirm_mq.properties")
    @Service
    @Slf4j
    public class ProducerService implements RabbitTemplate.ConfirmCallback {
    
        @Value("${CONFRIM_EXCHANGE_NAME}")
        private String exchangeName;
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            // 创建对象的同时,开启channel事务模式
            // rabbitTemplate.setChannelTransacted(true);
            // 开启确认模式
            rabbitTemplate.setConfirmCallback(this);
        }
    
        /**
         * 发送消息
         *
         * @param msg
         */
        public void send(String msg) {
            // 创建一个消息编号
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(exchangeName, "aaa", msg, correlationData);
            log.info("消息id{},路由key{},发送消息内容{}", correlationData.getId(), "aaa", msg);
    
            // 创建一个无法投递成功的消息
            CorrelationData correlationData1 = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(exchangeName, "bbb", msg, correlationData1);
            log.info("消息id{},路由key{},发送消息内容{}", correlationData1.getId(), "bbb", msg);
        }
    
        /**
         * 消息回调
         *
         * @param correlationData
         * @param ack
         * @param cause
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            String id = correlationData != null ? correlationData.getId() : "";
            if (ack) {
                log.info("消息投递成功,消息id{}", id);
            } else {
                log.info("消息投递失败,消息id{},原因:{}", id, cause);
            }
        }
    }
    View Code

    执行代码结果如下:

                          生产者截图

    消费者截图。

    由运行结果可知。消息只要发送到了交换机,不管消息有没有成功投递到队列里面,都会给生产者一个ack应答。

    如何让消息被路由到队列后再返回ACK呢?

    1.设置mandatory参数

    设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者。

    当把 mandotory 参数设置为 true 时,如果交换机无法将消息进行路由时,会将该消息返回给生产者,而如果该参数设置为false,如果发现消息无法进行路由,则直接丢弃。

    rabbitTemplate.setMandatory(true);

    如果要让消息返回给生产者,需要添加一个回调

    为了进行回调,我们需要实现一个接口 RabbitTemplate.ReturnCallback

    使用mandatory这种方式,如果消息发送失败,返回给生产者,通过看日志的方式,就比较麻烦了。可以使用备份交换机的方式。如果消息从交换机路由到队列失败,转发给备份交换机,由备份交换机绑定的队列,进行处理。就比较好了

    2.设置备份交换机

    创建交换机的时候,为交换机添加备份交换机代码,可为备份交换机添加不同的队列,实现不同的功能

    .withArgument("alternate-exchange",BUSINESS_BACKUP_EXCHANGE_NAME);
  • 相关阅读:
    Ext.grid.column.Column主要配置项及示例
    Ext.grid.Panel主要配置及示例
    EF Code First关系规则及配置
    ExtJS4系列目录
    EF Code First数据库连接配置
    ASP.NET MVC系列:ASP.NET MVC简介
    Ext JS下载及配置
    Ext.container.Viewport
    Ext.tab.Panel页签
    ASP.NET MVC系列:Controller/Action
  • 原文地址:https://www.cnblogs.com/yingxiaocao/p/13344698.html
Copyright © 2011-2022 走看看