zoukankan      html  css  js  c++  java
  • Rabbitmq channel参数详解

    1、Channel 

    1.1 channel.exchangeDeclare():
    type:有direct、fanout、topic三种:

    fanout

    fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中

    direct

    direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。

    topic

    规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。它的约定是:

    routing key为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit” binding key与routing key一样也是句点号“. ”分隔的字符串。

    binding key中可以存在两种特殊字符“”与“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

    durable:true、false true:服务器重启会保留下来Exchange。警告:仅设置此选项,不代表消息持久化。即不保证重启后消息还在。
    原文:true if we are declaring a durable exchange (the exchange will survive a server restart)

    autoDelete:true、false.true:当已经没有消费者时,服务器是否可以删除该Exchange。
    原文1:true if the server should delete the exchange when it is no longer in use。
    /**
         * Declare an exchange.
         * @see com.rabbitmq.client.AMQP.Exchange.Declare
         * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
         * @param exchange the name of the exchange
         * @param type the exchange type
         * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
         * @param autoDelete true if the server should delete the exchange when it is no longer in use
         * @param arguments other properties (construction arguments) for the exchange
         * @return a declaration-confirm method to indicate the exchange was successfully declared
         * @throws java.io.IOException if an error is encountered
         */
        Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                           Map<String, Object> arguments) throws IOException;
    1.2 chanel.basicQos()
    prefetchSize:0 
    prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
    global:truefalse 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是consumer级别
    备注:据说prefetchSize 和global这两项,rabbitmq没有实现,暂且不研究

     QoS = quality-of-service, 顾名思义,服务的质量。通常我们设计系统的时候不能完全排除故障或保证说没有故障,而应该设计有完善的异常处理机制。在出现错误的时候知道在哪里出现什么样子的错误,原因是什么,怎么去恢复或者处理才是真正应该去做的。在接收消息出现故障的时候我们可以通过RabbitMQ重发机制来处理。重发就有重发次数的限制,有些时候你不可能不限次数的重发,这取决于消息的大小,重要程度和处理方式。

    甚至QoS是在接收端设置的。发送端没有任何变化,接收端的代码也比较简单,只需要加如下代码:

    channel.BasicQos(0, 1, false);

    代码第一个参数是可接收消息的大小的,但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况。第二个参数是处理消息最大的数量。举个例子,如果输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞。如果输入3,那么可以最多有3个消息不应答,如果到达了3个,则发送端发给这个接收方得消息只会在队列中,而接收方不会有接收到消息的事件产生。总结说,就是在下一次发送应答消息前,客户端可以收到的消息最大数量。第三个参数则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的。

    这种数量的设置,也为我们在多个客户端监控同一个queue的这种负载均衡环境下提供了更多的选择。

    /**
         * Request specific "quality of service" settings.
         *
         * These settings impose limits on the amount of data the server
         * will deliver to consumers before requiring acknowledgements.
         * Thus they provide a means of consumer-initiated flow control.
         * @see com.rabbitmq.client.AMQP.Basic.Qos
         * @param prefetchSize maximum amount of content (measured in
         * octets) that the server will deliver, 0 if unlimited
         * @param prefetchCount maximum number of messages that the server
         * will deliver, 0 if unlimited
         * @param global true if the settings should be applied to the
         * entire channel rather than each consumer
         * @throws java.io.IOException if an error is encountered
         */
        void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;
    1.3 channel.basicPublish()
    routingKey:路由键,#匹配0个或多个单词,*匹配一个单词,在topic exchange做消息转发用

    mandatory:true:如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返还给生产者。
          false:出现上述情形broker会直接将消息扔掉
    immediate:true:如果exchange在将消息route到queue(s)时发现对应的queue上没有消费者,那么这条消息不会放入队列中。
          当与消息routeKey关联的所有queue(一个或多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
    BasicProperties :需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化
          这里指的是消息的持久化,配合channel(durable=true),queue(durable)可以实现,即使服务器宕机,消息仍然保留
    简单来说:mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;
        immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,
        如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。
    /**
         * Publish a message.
         *
         * Publishing to a non-existent exchange will result in a channel-level
         * protocol exception, which closes the channel.
         *
         * Invocations of <code>Channel#basicPublish</code> will eventually block if a
         * <a href="http://www.rabbitmq.com/alarms.html">resource-driven alarm</a> is in effect.
         *
         * @see com.rabbitmq.client.AMQP.Basic.Publish
         * @see <a href="http://www.rabbitmq.com/alarms.html">Resource-driven alarms</a>.
         * @param exchange the exchange to publish the message to
         * @param routingKey the routing key
         * @param mandatory true if the 'mandatory' flag is to be set
         * @param immediate true if the 'immediate' flag is to be
         * set. Note that the RabbitMQ server does not support this flag.
         * @param props other properties for the message - routing headers etc
         * @param body the message body
         * @throws java.io.IOException if an error is encountered
         */
        void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
                throws IOException;

    1.4 channel.basicAck();

    deliveryTag:该消息的index
    multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
    
    
    /**
         * Acknowledge one or several received
         * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
         * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
         * containing the received message being acknowledged.
         * @see com.rabbitmq.client.AMQP.Basic.Ack
         * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
         * @param multiple true to acknowledge all messages up to and
         * including the supplied delivery tag; false to acknowledge just
         * the supplied delivery tag.
         * @throws java.io.IOException if an error is encountered
         */
        void basicAck(long deliveryTag, boolean multiple) throws IOException;

    1.5 channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);

    deliveryTag:该消息的index
    multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
    requeue:被拒绝的是否重新入队列
    /**
         * Reject one or several received messages.
         *
         * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
         * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
         * @see com.rabbitmq.client.AMQP.Basic.Nack
         * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
         * @param multiple true to reject all messages up to and including
         * the supplied delivery tag; false to reject just the supplied
         * delivery tag.
         * @param requeue true if the rejected message(s) should be requeued rather
         * than discarded/dead-lettered
         * @throws java.io.IOException if an error is encountered
         */
        void basicNack(long deliveryTag, boolean multiple, boolean requeue)
                throws IOException;

    1.6channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);

    deliveryTag:该消息的index
    requeue:被拒绝的是否重新入队列

    channel.basicNack 与 channel.basicReject 的区别在于basicNack可以拒绝多条消息,而basicReject一次只能拒绝一条消息
    /**
         * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
         * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
         * containing the received message being rejected.
         * @see com.rabbitmq.client.AMQP.Basic.Reject
         * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
         * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
         * @throws java.io.IOException if an error is encountered
         */
        void basicReject(long deliveryTag, boolean requeue) throws IOException;
    1.7 channel.basicConsume(QUEUE_NAME, true, consumer);
    autoAck:是否自动ack,如果不自动ack,需要使用channel.ack、channel.nack、channel.basicReject 进行消息应答
    
    
    /**
         * Start a non-nolocal, non-exclusive consumer, with
         * a server-generated consumerTag.
         * @param queue the name of the queue
         * @param autoAck true if the server should consider messages
         * acknowledged once delivered; false if the server should expect
         * explicit acknowledgements
         * @param callback an interface to the consumer object
         * @return the consumerTag generated by the server
         * @throws java.io.IOException if an error is encountered
         * @see com.rabbitmq.client.AMQP.Basic.Consume
         * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
         * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
         */
        String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
    
    

    1.8 chanel.exchangeBind()

    channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
    用于通过绑定bindingKey将queue到Exchange,之后便可以进行消息接收
    /**
         * Bind an exchange to an exchange, with no extra arguments.
         * @see com.rabbitmq.client.AMQP.Exchange.Bind
         * @see com.rabbitmq.client.AMQP.Exchange.BindOk
         * @param destination the name of the exchange to which messages flow across the binding
         * @param source the name of the exchange from which messages flow across the binding
         * @param routingKey the routine key to use for the binding
         * @return a binding-confirm method if the binding was successfully created
         * @throws java.io.IOException if an error is encountered
         */
        Exchange.BindOk exchangeBind(String destination, String source, String routingKey) throws IOException;

    1.8 channel.queueDeclare(QUEUE_NAME, false, false, false, null);

    durable:true、false true:在服务器重启时,能够存活
    exclusive :是否为当前连接的专用队列,在连接断开后,会自动删除该队列,生产环境中应该很少用到吧。
    autodelete:当没有任何消费者使用时,自动删除该队列。
    /**
         * Declare a queue
         * @see com.rabbitmq.client.AMQP.Queue.Declare
         * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
         * @param queue the name of the queue
         * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
         * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
         * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
         * @param arguments other properties (construction arguments) for the queue
         * @return a declaration-confirm method to indicate the queue was successfully declared
         * @throws java.io.IOException if an error is encountered
         */
        Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) throws IOException;
    ref:RabbitMQ Channel 参数详解 - 简书 (jianshu.com)
  • 相关阅读:
    Python 之 raw_input()与input()区别
    Python基础语法
    在Cloudera Hadoop CDH上安装R及RHadoop(rhdfs、rmr2、rhbase、RHive)
    MapReduce 过程详解
    Cloudera Manager and CDH安装及配置
    RFC 目录
    聊一聊 tcp拥塞控制 九 fack
    聊一聊tcp 拥塞控制 八 相关数据结构&& 概念
    聊一聊 tcp 拥塞控制 七 转载
    udp connected socket
  • 原文地址:https://www.cnblogs.com/cy0628/p/15210029.html
Copyright © 2011-2022 走看看