zoukankan      html  css  js  c++  java
  • 分布式消息通信之RabbitMQ_02

    1. 可靠性投递分析

    RabbitMQ工作模型
     在某些业务实时一致性要求较高的场景,需要确保消息投递的可靠性,可以从RabbitMQ的工作模型的4个主要流程来做处理;并且效率和可靠性不可能同时兼顾,如果要保证每个环节都成功,对消息的收发效率肯定会有影响。
      1. 生产者将消息投递至交换机
      2. 交换机根据消息的路由关键字和队列的绑定关键字匹配,将消息路由到匹配的队列
      3. 队列将消息存储在内存或者磁盘当中
      4. 消费者从队列取走消息
      5. 其他

    1.1 消息投递

     有两种方法可以监听生成着投递消息是否成功事物transaction模式确认confirm模式;
     事物模式(不推荐使用):可以使用com.rabbitmq.client.Channel#txSelect来开启事物,当消息由生产者投递到RabbitMQ服务器成功后,事物会提交;不过事物模式会严重影响RabbitMQ特性,一般不建议使用。

            ConnectionFactory factory = new ConnectionFactory();
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            try {
    
                // 开启事物
                channel.txSelect();
                System.out.println("send msg..");
                channel.basicPublish("", SimpleConsumer.QUEUE_NAME, null,
                        "Hello World".getBytes());
                channel.txCommit();
                System.out.println("消息发送成功!");
            } catch (Exception e) {
                // 回滚事务
                System.out.println("消息发送失败, rollback");
                channel.txRollback();
            }
    

     确认Confirm模式又分为批量确认,异步确认

    Normal: 
            channel.confirmSelect();
    
            channel.basicPublish("", SimpleConsumer.QUEUE_NAME, null,
                    "hello world confirm msg!".getBytes());
    
            if (channel.waitForConfirms()) {
    
                System.out.println("消息投递成功!");
    
            }
    
    batch: 
           try {
                channel.confirmSelect();
                for (int i = 0; i < 5; i++) {
                    // 发送消息
                    // String exchange, String routingKey, BasicProperties props, byte[] body
                    channel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());
                }
                // 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了
                // 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)
                // 直到所有信息都发布,只要有一个未被Broker确认就会IOException
                channel.waitForConfirmsOrDie();
                System.out.println("消息发送完毕,批量确认成功");
            } catch (Exception e) {
                // 发生异常,可能需要对所有消息进行重发
                e.printStackTrace();
            }
    

     Async

           ConnectionFactory factory = new ConnectionFactory();
            factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));
    
            // 建立连接
            Connection conn = factory.newConnection();
            // 创建消息通道
            Channel channel = conn.createChannel();
    
            String msg = "Hello world, Rabbit MQ, Async Confirm";
            // 声明队列(默认交换机AMQP default,Direct)
            // String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
            channel.queueDeclare(SimpleConsumer.QUEUE_NAME, false, false, true, null);
    
            // 用来维护未确认消息的deliveryTag
            final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
    
            // 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条
            // 异步监听确认和未确认的消息
            // 如果要重复运行,先停掉之前的生产者,清空队列
            channel.addConfirmListener(new ConfirmListener() {
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Broker未确认消息,标识:" + deliveryTag);
                    if (multiple) {
                        // headSet表示后面参数之前的所有元素,全部删除
                        confirmSet.headSet(deliveryTag + 1L).clear();
                    } else {
                        confirmSet.remove(deliveryTag);
                    }
                    // 这里添加重发的方法
                }
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    // 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认
                    System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
                    System.out.println("multiple:"+multiple);
                    if (multiple) {
                        System.out.println("deliveryTag:"+deliveryTag);
                        // headSet表示后面参数之前的所有元素,全部删除
                        confirmSet.headSet(deliveryTag + 1L).clear();
                    } else {
                        // 只移除一个元素
                        confirmSet.remove(deliveryTag);
                    }
                    System.out.println("未确认的消息:"+confirmSet);
                }
            });
    
            // 开启发送方确认模式
            channel.confirmSelect();
            for (int i = 0; i < 10; i++) {
                long nextSeqNo = channel.getNextPublishSeqNo();
                // 发送消息
                // String exchange, String routingKey, BasicProperties props, byte[] body
                channel.basicPublish("", SimpleConsumer.QUEUE_NAME, null, (msg +"-"+ i).getBytes());
                confirmSet.add(nextSeqNo);
            }
            System.out.println("所有消息:"+confirmSet);
    
            // 这里注释掉的原因是如果先关闭了,可能收不到后面的ACK
            //channel.close();
            //conn.close();
    

    1.2 消息路由

     当消息投递到Exchange后,会根据消息的路由关键字来匹配路由到的队列,当关键字没有匹配到队列或者队列不存在或者路由关键字错误时,消息就会丢失;为了保证消息可以被正确的路由,可以使用两种方式:生产者添加ReturnListener或者创建队列时指定备份交换机 alternate-exchange

    
    public class SimpleConsumer {
    
        public static final String EXCHANGE_NAME = "Simple_Reliability_Exchange";
        public static final String QUEUE_NAME = "Simple_Reliability_Queue";
    
    }
    - - - - - - - - 
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;
    
    public class Rmq03ReturnListener {
    
        private static final String BACKUP_EXCHANGE = "MY_alternate_exchange";
        private static final String BACKUP_QUEUE = "MY_alternate_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
    //        factory.setPort(15672);
            factory.setUsername("guest");
            factory.setPassword("guest");
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            // 设置交换机无法路由的被返回的消息的监听器
            channel.addReturnListener(new ReturnListener() {
                @Override
                public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    
                    System.out.println("ReturnListener 接收到无法路由被退回的消息 " + new Date());
                    System.out.println(String.format("replyCode[%s] replyText[%s] exchange[%s] routingKey[%s] properties[%s] msg[%s]",
                            replyCode, replyText, exchange, routingKey, properties, new String(body)));
    
                }
            });
    
            // 初始化核心交换机和队列
            Map<String, Object> exchangeArgs = new HashMap();
            exchangeArgs.put("alternate-exchange", BACKUP_EXCHANGE);    // 指定交换机的备份交换机,接收无法路由的消息
            channel.exchangeDeclare(SimpleConsumer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false, true, exchangeArgs);
            channel.queueDeclare(SimpleConsumer.QUEUE_NAME, false, false, true, null);
            channel.queueBind(SimpleConsumer.QUEUE_NAME, SimpleConsumer.EXCHANGE_NAME, "#.fast.#");
    
            // 初始化备份交换机和队列
            initBackup(channel);
    
            // 发送消息
            String msg = "Hello World, test  msg can not rout ";
            // 在发送消息是,可以设置mandatory参数未true,这样当消息在交换器上无法被路由时,服务器将消息返回给生产者,生产者实现回调函数处理被服务端返回的消息。
            Boolean mandatory = true;
            channel.basicPublish(SimpleConsumer.EXCHANGE_NAME, "v.fast", mandatory,null, msg.getBytes());
            channel.basicPublish(SimpleConsumer.EXCHANGE_NAME, "v.slow", mandatory,null, msg.getBytes());
    
    //        channel.close();
    //        connection.close();
    
    
        }
    
        private static void initBackup(Channel channel) throws IOException {
    
            channel.queueDeclare(BACKUP_QUEUE, false, false, true, null);
            channel.exchangeDeclare(BACKUP_EXCHANGE, BuiltinExchangeType.FANOUT);
            channel.queueBind(BACKUP_QUEUE, BACKUP_EXCHANGE, "");
    
            DeliverCallback deliverCallback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
    
                    System.out.println("备份交换机 alternate-exchange 接收到无法路由的消息 " + new String(delivery.getBody()) );
                    // 注释回执
                    // channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
    
                }
            };
            channel.basicConsume(BACKUP_QUEUE, false, deliverCallback, consumerTag -> {});
    
        }
    
    }
    

    1.3 消息存储

     当消息路由至匹配队列,也就是步骤3时,如果消息没有被消费而RabbitMQ服务系统宕机或者重启了,会导致消息丢失,因此可以使用消息的持久化配置。
    交换机持久化 消费者在创建交换机时指定存储在磁盘中,系统重启后交换机不被删除

      Consumer
      
            // 交换机持久化
            // String exchange, String type,
            // boolean durable (交换机持久化),
            // boolean autoDelete(自动删除,为true时当前连接中断且交换机绑定队列没有消息时,会自动删除),
            // Map<String, Object> arguments
            channel.exchangeDeclare(EXCHANGE_PERSISTENCE, BuiltinExchangeType.TOPIC, true, false, null);  
    
    

    队列持久化 费者在创建队列时指定存储在磁盘中,系统重启后队列不被删除

       consumer
    
       // 队列持久化
            // String queue,
            // boolean durable,  队列持久化参数,
            // boolean exclusive,  排他队列参数
            // boolean autoDelete,  是否自动删除,为true时当前连接中断且交换机绑定队列没有消息时,会自动删除
            //        Map<String, Object> arguments
            channel.queueDeclare(QUEUE_PERSISTENCE, true, false, false, null);
    
    

    消息持久化 生成者在发送消息时指定消息类型为持久化

      producer
    
           AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .build();
            channel.basicPublish(EXCHANGE_PERSISTENCE, "bird.fly.fast", properties, new String("just text persistence msg").getBytes());
    

    集群 镜像

    1.4 消息消费

     消费者在从队列中取走消息进行消费时,如果逻辑处理中出现异常或者服务宕机消息就会丢失,可以在消息时使用手动发送回执的方式进行操作,当消息真正处理完毕后发送回执信息。
    producer 发送消息

       for (int i = 0; i < 10; i++) {
                String msg = i%2 == 0 ? "This is a success msg" : " This is a error msg";
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
    

    consumer 消费消息

           // 消息消费 autoAck 为false
    
            //  DeliverCallback 内进行手工应答
    
            /*
             * 1. 成功
             * com.rabbitmq.client.Channel#basicAck(long, boolean)
             *
             * 2. 拒绝,
             *  拒绝可以分为单条拒绝 和 批量拒绝
             *  拒绝的消息可以设置重新入队,不重新入队的则进入死信交换机,死信队列
             * com.rabbitmq.client.Channel#basicReject(long, boolean)
             * com.rabbitmq.client.Channel#basicNack(long, boolean, boolean)
             *
             */
     DeliverCallback callback = new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
    
                    String msg = new String(delivery.getBody());
                    System.out.println(String.format("消费者接收到消息 msg[%s] at %s", msg, new Date()));
    
                    if (msg.contains("error")) {
                        // reject one
                        channel.basicReject(delivery.getEnvelope().getDeliveryTag(), true);
                        // reject one or more
    //                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), true, true);
                    } else {
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), true);
                    }
    
    
                }
            };
            channel.basicConsume(QUEUE_NAME, false, callback, consumerTag -> {});
    
    

    1.5 其他

    消费者回调补偿机制消息幂等性消息顺序性
    消费者回调
     当跨系统异步通信时,消费者从队列中取走消费并消费成功后,生产者其实并不知道自己发出的消息已经被处理掉了,所以 a) 生产者可以在发送消息时额外添加一个回调API接口,当消费者消费完消息时,调用消息的回调API通知生产者;或者b)消费者在消费完消息后,再发送一条消费成功的消息给生产者; 这样生产者就可以获知了。
    补偿机制
     当生产者发送的消息一定时间内没有响应时,可以设置一个定时重发的机制,不过必须要规定重发次数如5,避免消息堆积。
    消息幂等性
     消息的补偿重发机制会发送多次一样的消息给消费者,造成重复消费,可以为消息设置唯一id,将消费者处理完的消息记录下来,入库,每次接收到消息根据库中id判断消息是否已经处理过,如果处理过了就忽略这条消息。
    消息顺序性
     当一个队列被多个消费者消费时,无法保证消费的顺序性。一个队列只有一个消费者时,顺序性可以得到保障。

    2. 高可用架构部署方案

     避免单点故障造成数据丢失,可以启用集群来达到高可用和负载均衡。

    2.1 集群

     集群部署与运维...
    通信基础
     erlang.cookie,hosts
    集群节点
      分为磁盘节点和内存节点两种,磁盘节点数据会保存在磁盘中,较安全;内存节点数据保存于内存中,访问效率较快;集群中最少有一个是磁盘节点,用于保存数据。
    配置步骤
      a) 配置hosts b) 配置erlang.cookie c) 加入集群

    2.2 镜像

    3. 经验总结

    3.1 配置文件与命名规范

     a)集中放在配置文件中 b) 体现数据类型 c) 体现数据来源和去向
    lei.topicexchange=RISK_TO_PAY_EXCHANGE

    3.2 调用封装

     在原有基础上封装,减少调用复杂度

      @Autowired
        @Qualifier("amqpTemplate2")
        private AmqpTemplate amqpTemplate2;
    
        /**
         * 演示三种交换机的使用
         *
         * @param message
         */
        public void sendMessage(Object message) {
            logger.info("Send message:" + message);
    
            // amqpTemplate 默认交换机 MY_DIRECT_EXCHANGE
            // amqpTemplate2 默认交换机 MY_TOPIC_EXCHANGE
    
            // Exchange 为 direct 模式,直接指定routingKey
            amqpTemplate.convertAndSend("FirstKey", "[Direct,FirstKey] "+message);
            amqpTemplate.convertAndSend("SecondKey", "[Direct,SecondKey] "+message);
      }
    

    3.3 信息落库(可追溯,可重发) + 定时任务

     根据业务场景可以将消息存入库中,可以进行追溯,表中记录消息回执状态,定时轮训库表判断是否收到回执,没有收到可以重发。
     同时连接数据库表也会造成发送消息时效率的降低;

    3.4 减少连接数

     多条消息拼装到一起发送一次,总数据量不要超过4M

    3.5 生产者先发送消息还是先登记业务表

     先登记业务表;如果先发送消息,后续业务操作异常导致回滚,信息就会不一致。

    3.6 谁来创建对象(队列,交换机,绑定关系)

     由消费者创建 队列 交换机 绑定关系

    3.7 运维监控

    zabbix系列zabbix3.4监控rabbitmq

    3.8 其他插件

    Plugins

    C:Program FilesRabbitMQ Server
    abbitmq_server-3.7.14sbin>rabbitmq-plugins list
    Listing plugins with pattern ".*" ...
     Configured: E = explicitly enabled; e = implicitly enabled
     | Status: * = running on rabbit@DESKTOP-2NHH5NJ
     |/
    [  ] rabbitmq_amqp1_0                  3.7.14
    [  ] rabbitmq_auth_backend_cache       3.7.14
    [  ] rabbitmq_auth_backend_http        3.7.14
    [  ] rabbitmq_auth_backend_ldap        3.7.14
    [  ] rabbitmq_auth_mechanism_ssl       3.7.14
    [  ] rabbitmq_consistent_hash_exchange 3.7.14
    [  ] rabbitmq_event_exchange           3.7.14
    [  ] rabbitmq_federation               3.7.14
    [  ] rabbitmq_federation_management    3.7.14
    [  ] rabbitmq_jms_topic_exchange       3.7.14
    [E*] rabbitmq_management               3.7.14
    [e*] rabbitmq_management_agent         3.7.14
    [  ] rabbitmq_mqtt                     3.7.14
    [  ] rabbitmq_peer_discovery_aws       3.7.14
    [  ] rabbitmq_peer_discovery_common    3.7.14
    [  ] rabbitmq_peer_discovery_consul    3.7.14
    [  ] rabbitmq_peer_discovery_etcd      3.7.14
    [  ] rabbitmq_peer_discovery_k8s       3.7.14
    [  ] rabbitmq_random_exchange          3.7.14
    [  ] rabbitmq_recent_history_exchange  3.7.14
    [  ] rabbitmq_sharding                 3.7.14
    [  ] rabbitmq_shovel                   3.7.14
    [  ] rabbitmq_shovel_management        3.7.14
    [  ] rabbitmq_stomp                    3.7.14
    [  ] rabbitmq_top                      3.7.14
    [  ] rabbitmq_tracing                  3.7.14
    [  ] rabbitmq_trust_store              3.7.14
    [e*] rabbitmq_web_dispatch             3.7.14
    [  ] rabbitmq_web_mqtt                 3.7.14
    [  ] rabbitmq_web_mqtt_examples        3.7.14
    [  ] rabbitmq_web_stomp                3.7.14
    [  ] rabbitmq_web_stomp_examples       3.7.14
    
  • 相关阅读:
    20080531 Windows 下安装 Bugzilla
    20080823 windows + apache + mod_python 的安装
    20080519 在 Windows Server 2003 下安装 SQL Server 2000 提示“无法验证产品密钥”
    20080508 Borland CodeGear 卖了
    20080520 Javascript 随机数产生办法
    20090613 批量操作 Windows Live Mail 邮件的办法
    20080726 Castle项目创始人加入微软
    20080511 php send_mail()
    20080618 ASP.NET Ajax clientside framework failed to load
    20081105 Microsoft Word 2007 中鼠标操作失效的解决办法
  • 原文地址:https://www.cnblogs.com/Qkxh320/p/distributed_rabbitmq_02.html
Copyright © 2011-2022 走看看