zoukankan      html  css  js  c++  java
  • (转)RabbitMQ学习之消息可靠性及特性

    http://blog.csdn.net/zhu_tianwei/article/details/53971296

    下面主要从队列、消息发送、消息接收方面了解消息传递过的一些可靠性处理。 
    1、队列 
    消费者是无法订阅或者获取不存在的MessageQueue中信息。消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。 
    声明一个队列 
    channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments) 
    durable:声明队列持久化 
    exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。 
    autoDelete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。 
    其他选项,channel.queueDeclarePassive:例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。 
    2、发送消息 
    1.发送消息设置 
    channel.basicPublish(exchange, routingKey, mandatory, immediate, basicProperties, body); 
    basicProperties:通过参数实现消息持久化,MessageProperties.PERSISTENT_TEXT_PLAIN

    public BasicProperties(
                String contentType,//消息类型如:text/plain
                String contentEncoding,//编码
                Map<String,Object> headers,
                Integer deliveryMode,//1:nonpersistent 2:persistent
                Integer priority,//优先级
                String correlationId,
                String replyTo,//反馈队列
                String expiration,//expiration到期时间
                String messageId,
                Date timestamp,
                String type,
                String userId,
                String appId,
                String clusterId)

    mandatory:当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者;当mandatory设为false时,出现上述情形broker会直接将消息扔掉。 
    immediate:当immediate标志位设置为true时,如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。 
    2.事务机制 
    对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,因为consume命令本身没有任何Response返回,所以即使服务器崩溃,没有持久化该消息,生产者也无法获知该消息已经丢失。如果此时使用事务,即通过txSelect()开启一个事务,然后发送消息给服务器,然后通过txCommit()提交该事务,即可以保证,如果txCommit()提交了,则该消息一定会持久化,如果txCommit()还未提交即服务器崩溃,则该消息不会服务器就收。当然Rabbit MQ也提供了txRollback()命令用于回滚某一个事务。

    channel.txSelect()
    ...
    channel.txCommit()
    ...
    channel.txRollback()

    3.Confirm机制 
    事务机制会带来大量的多余开销,并会导致吞吐量下降 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。如果设置channel为confirm状态,则通过该channel发送的消息都会被分配一个唯一的ID,然后一旦该消息被正确的路由到匹配的队列中后,服务器会返回给生产者一个Confirm,该Confirm包含该消息的ID,这样生产者就会知道该消息已被正确分发。对于持久化消息,只有该消息被持久化后,才会返回Confirm。Confirm机制的最大优点在于异步,生产者在发送消息以后,即可继续执行其他任务。而服务器返回Confirm后,会触发生产者的回调函数,生产者在回调函数中处理Confirm信息。如果消息服务器发生异常,导致该消息丢失,会返回给生产者一个nack,表示消息已经丢失,这样生产者就可以通过重发消息,保证消息不丢失。Confirm机制在性能上要比事务优越很多。但是Confirm机制,无法进行回滚,就是一旦服务器崩溃,生产者无法得到Confirm信息,生产者其实本身也不知道该消息吃否已经被持久化,只有继续重发来保证消息不丢失,但是如果原先已经持久化的消息,并不会被回滚,这样队列中就会存在两条相同的消息,系统需要支持去重。可以mandatory配合实现消息的发送可靠性。

    // confirm 异步机制 通过注册listener,实现异步ack,提高性能
            channel.confirmSelect();
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    //失败重发
                }
    
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    //确认ok
                }
            });
    //confirm 同步机制
    if(channel.waitForConfirms(timeout)){  
                //确认ok
            }else{
                //失败从发
            }

    3、消息接收 
    1.autoAck 
    为了确保消息一定被消费者处理,rabbitMQ提供了消息确认功能,就是在消费者处理完任务之后,就给服务器一个回馈,服务器就会将该消息删除,如果消费者超时不回馈,那么服务器将就将该消息重新发送给其他消费者。默认是开启的,在消费者端通过下面的方式开启消息确认, 首先将autoAck自动确认关闭,等我们的任务执行完成之后,手动的去确认.

    boolean autoAck = false;
    channel.basicConsume("hello", autoAck, consumer);
    ...
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    //确认消息,已经收到  
     channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); 

    2.公平调度 
    让每个消费者在同一时刻会分配一个任务。 通过channel.basicQos(prefetchCount);可以设置。 
    3.exclusive 
    和queue一样,设置了true,只有第一个启动的消费者可用。 
    channel.basicConsume(queue, autoAck, consumerTag, noLocal, exclusive, arguments, consumer)

  • 相关阅读:
    C# 实现复杂对象的序列化与反序列化
    C#操纵XML文档(主要是应用程序的配置文件)
    滕王阁序——王 勃 (注:我至爱的一篇文章)
    SmartClient(智能客户端)学习笔记之——Smart Client基本学习资源
    listview按列自动排序的一点补充
    (转)SmartClient(智能客户端)学习笔记之——Microsoft Updater Application Block ApplicationUpdater assembly设计
    用超图实现城市给水的爆管分析
    .net2005中对asp.net中GridView的常用操作
    .net2003中对DataGrid的常用操作
    为DataGrid或者GridView或者DataList最前面增加一排序号
  • 原文地址:https://www.cnblogs.com/telwanggs/p/7124670.html
Copyright © 2011-2022 走看看