zoukankan      html  css  js  c++  java
  • RabbitMQ知识点整理11-消费消息

    Rabb itMQ 的消费模式分两种: 推( Push )模式和拉( Pull )模式。推模式采用Basic.Consume进行消费,而拉模式则是调用Basic.Get 进行消费。

    推模式

    在推模式中,可以通过持续订阅的方式来消费消息,使用到的相关类有:

    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;

    接收消息一般通过实现Consumer接口或者继承DefaultConsumer 类来实现。当调用与Consumer相关API的时候, 不同的订阅采用不同的消费标签(consumerTag)来区分彼此, 在同一个channel中的消费者也需要通过唯一的消费者标签以作区分, 关键消费代码如下:

    public class ConsumerTest {
    
        final private static String QUEUE_NAME = "queue_demo";
        final private static String IP_ADDRESS = "172.16.176.40";
        final private static int PORT = 15672;
    
        /**
         *
         */
        @Test
        public void consumerTest() throws IOException, TimeoutException {
            //
            final ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(IP_ADDRESS);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("root123");
            
            //
            final Connection connection = connectionFactory.newConnection();
            final Channel channel = connection.createChannel();
    
            channel.basicQos(64);
            boolean autoAck = false;
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope
                        , AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 得到路由键
                    final String routingKey = envelope.getRoutingKey();
                    final String contentType = properties.getContentType();
                    final long deliveryTag = envelope.getDeliveryTag();
                    // 接受的内容
                    String content = new String(body);
                    // 这里处理消息
                    channel.basicAck(deliveryTag, false);
                }
            };
    
            channel.basicConsume(QUEUE_NAME, autoAck, "myConsumerTag", consumer);
        }
    }
    View Code

    注意: 上面代码中, 端口是 5672, 不是 15672, 15672只是rabbitmq的web页面的端口号, 需要连接服务器, 端口号请使用5672

    注意, 上面的代码中显示的设置channel.basicConsume的参数autoAck为false, 然后在接收到消息处理之后, 在进行显式的ack操作(channel.basicAck), 对于消费者来说这是非常有必要的, 可以防止消息不必要的丢失.

    channel类中的basicConsume方法有如下几种形式:

    1.String basicConsume(String queue, Consumer callback) throws IOException;
    2.String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; 3.String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
    4.String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException; 5.
    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

    其对应的参数说明如下所述:

    queue: 队列的名称

    autoAck: 设置是否自动确认, 建议设置成false, 即不自动确认

    consumerTag: 消费者标签, 用来区分多个消费者

    noLocal: 设置为true, 则表示不能将同一个Connection中生产者发送的消息传递给这个Connection中的消费者

    exclusive: 是否排他

    arguments: 设置消费者的其他参数

    callback: 设置消费者的回调函数, 用来处理rabbitmq推送过来的消息, 比如DefaultConsumer, 使用时需要客户端重写其中的方法

    对于消费者来说, 重写handleDelivery方法是十分方便的, 更复杂的消费者客户端会重写更多的方法, 如下:

    void handleConsumeOk(String consumerTag) ;
    void handleCancelOk(String consumerTag);
    void handleCancel(String consumerTag) throws IOException;
    void handleShutdownSignal(String consumerTag , ShutdownSignalException sig) ;
    void handleRecoverOk(String consumerTag);

    比如handleShutdownSignal方法, 在channel或connection关闭的时候会调用, 再者, handleConsumeOk方法会在其他方法之前调用, 返回消费者标签.

    重写handleCancelOk和handleCancel方法, 这样消费端可以显式的或隐式的取消订阅的时候调用, 也可以通过channel.basicCancel方法来显式的取消一个消费者的消息订阅:

    channel.basicCancel(consumerTag);

    注意上面这行代码会先触发handleConsumerOk方法, 之后触发handleDelivery方法, 最后才触发handleCancelOk方法

    和生产者一样,消费者客户端同样需要考虑线程安全的问题, 消费者客户端的这些callback会被分配到与Channel不同的线程池上, 这意味着消费者客户端可以安全地调用这些阻塞方
    法,比如channel.queueDeclare 、channel.basicCancel 等

    每个Channel 都拥有自己独立的线程, 最常用的做法是一个Channel 对应一个消费者,也就是意味着消费者彼此之间没有任何关联, 当然也可以在一个Channel 中维持多个消费者,但是要注意一个问题,如果Channel 中的一个消费者一直在运行,那么其他消费者的callback会被"耽搁"。

    拉模式

    通过channel.basicGet 方法可以单条地获取消息,其返回值是GetRespone, Channel 类的basicGet 方法没有其他重载方法, 只有:

    GetResponse basicGet(String queue, boolean autoAck) throws IOException;

    其中queue 代表队列的名称,如果设置autoAck 为false , 那么像推模式那样需要调用channel.basicAck 来确认消息己被成功接收。

    拉模式的关键代码如下:

    **
         * 拉模式
         */
        @Test
        public void consumerTest2() throws IOException {
            //
            final ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(IP_ADDRESS);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("root123");
    
            //
            final Connection connection = connectionFactory.newConnection();
            final Channel channel = connection.createChannel();
    
            final GetResponse response = channel.basicGet(QUEUE_NAME, false);
    
            // 获取消息
            String content = new String(response.getBody());
    
            // 处理消息确认
            channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
        }
    View Code

    注意要点:

    Basic.Consume将信道(Channel) 置为接收模式,直到取消队列的订阅为止。在接收模式期间, RabbitMQ 会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制.如果只想从队列获得单条消息而不是持续订阅,建议还是使用Basic.Get 进行消费.但是不能将Basic.Get 放在一个循环里来代替Basic.Consume,这样做会严重影RabbitMQ的性能.如果要实现高吞吐量,消费者理应使用Basic.Consume 方法。

  • 相关阅读:
    hiveserver2 with kerberos authentication
    python Basic usage
    python Quicksort demo
    Python HeapSort
    mrunit for wordcount demo
    CCDH证书
    Hadoop question list
    Hadoop Yarn core concepts
    Hadoop Resource
    Hadoop could not find or load main class
  • 原文地址:https://www.cnblogs.com/no-celery/p/14014718.html
Copyright © 2011-2022 走看看