zoukankan      html  css  js  c++  java
  • RabbitMQ消费方式汇总

    在学习本章节前,请先学习之前的章节:
    Java访问RabbitMQ:https://www.cnblogs.com/duanjt/p/10057330.html
    RabbitMQ消息发布时的权衡:https://www.cnblogs.com/duanjt/p/10075308.html


    一、推送Consume


    前面我们使用到的都是这种模式,注册一个消费者后,RabbitMQ会在消息可用时,自动将消息进行推送给消费者。这种方式效率最高最及时。
    核心代码如下:

    // 接收消息,第二个参数表示是否自动应答
    channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
            System.out.println(envelope.getRoutingKey() + " 接收到数据:" + new String(body));
        }
    });

    二、拉取Get


    属于一种轮询模型,发送一次get请求,获得一个消息。如果此时RabbitMQ中没有消息,会获得一个表示空的回复。总的来说,这种方式性能比较差,很明显,每获得一条消息,都要和RabbitMQ进行网络通信发出请求。而且对RabbitMQ来说,RabbitMQ无法进行任何优化,因为它永远不知道应用程序何时会发出请求。
    核心代码如下:

    while(true){
        //如果没有消息,将返回null
        GetResponse getResponse = channel.basicGet(queueName, true);    
        if(null!=getResponse){
            System.out.println("received["+getResponse.getEnvelope().getRoutingKey()+"]"+new String(getResponse.getBody()));
        }
        Thread.sleep(1000);
    }

    三、自动确认


    方法channel.basicConsume和方法channel.basicGet表示同步或异步获取消息,第二个参数都表示是否自动确认。前面我们都设置为了true。这个时候我们只需要处理逻辑,将自动向RabbitMQ进行确认。
    当autoAck=true时,一旦消费者接收到了消息,就视为自动确认了消息。如果消费者在处理消息的过程中,出了错,就没有什么办法重新处理这条消息,所以我们很多时候,需要在消息处理成功后,再确认消息,这就需要手动确认。

    四、手动确认

    // 接收消息手动确认,第二个参数表示是否自动应答
    channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
            System.out.println(envelope.getRoutingKey() + " 接收到数据:" + new String(body));
            //手动确认,第一个参数是消息标识,第二个参数表示是否批量确认。这里是一条一条确认,所以设置false
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });

    五、QoS预取模式


    在确认消息被接收之前,消费者可以预先要求接收一定数量的消息,在处理完一定数量的消息后,批量进行确认。如果消费者应用程序在确认消息之前崩溃,则所有未确认的消息将被重新发送给其他消费者。所以这里存在着一定程度上的可靠性风险。
    这种机制一方面可以实现限速(将消息暂存到RabbitMQ内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)
    核心代码:

    //参数1表示限制条数,参数2 true=channel,false=消费者
    channel.basicQos(100, true);
    
    // 接收消息手动确认,第二个参数表示是否自动应答
    channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
            System.out.println(envelope.getRoutingKey() + " 接收到数据:" + new String(body));
            //手动确认,第一个参数是消息标识,第二个参数表示是否批量确认。这里是一条一条确认,所以设置false
            channel.basicAck(envelope.getDeliveryTag(), false);
        }
    });

    注意:
    1.消费确认模式必须是非自动ACK机制(这个是使用baseQos的前提条件,否则会Qos不生效),然后设置basicQos的值;另外,还可以基于consume和channel的粒度进行设置(global)

  • 相关阅读:
    mysql 注意事项 PreparedStatement 对比 statement
    Dbutils commons-dbutils-1.3
    C3P0 mysql 5.7
    servlet-应用mysql-1
    javabean 用integer 而不是int
    servlet-1
    servlet 路径 编码 问题
    mac tomcat 9.0
    case end 的用法
    自定义抛出异常
  • 原文地址:https://www.cnblogs.com/duanjt/p/10075897.html
Copyright © 2011-2022 走看看