zoukankan      html  css  js  c++  java
  • 【rabbitmq】Queueingconsumer被废止后老代码如何做的解决方案

    amqp-client 3.x之前的rabbitmq版本有个消费者的写法是借助于Queueingconsumer的:

    QueueingConsumer consumer = new QueueingConsumer(channel);

    channel.basicQos(1);

    channel.basicConsume(QUEUE_NAME, false, "consumer_test",consumer);

    while (true) {

    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    String message = new String(delivery.getBody());

    System.out.println(" [X] Received '" + message + "'");

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

    break;

    这个应该是5.x之前的经典写法。但是在4.x的版本QueueingConsumer被标记废止5.x被移除。移除的原因是什么呢?

    原来QueueingConsumer内部用LinkedBlockingQueue来存放消息的内容,而LinkedBlockingQueue:一个由链表结构组成的有界队列,照先进先出的顺序进行排序 ,未指定长度的话,默认 此队列的长度为Integer.MAX_VALUE,那么问题来了,如果生产者的速度远远大于消费者的速度,也许没等到队列阻塞的条件产生(长度达到Integer.MAX_VALUE)内存就完蛋了,在老的版本你可以通过设置 rabbitmq的prefetch属性channel.basicQos(prefetch)来处理这个问题如果不设置可能出现内存问题(比如因为网络问题只能向rabbitmq生产不能消费,消费者恢复网络之后就会有大量的数据涌入,出现内存问题,oom fgc等)。

    而且最上面的写法很不合理不符合事件驱动,什么时候停止while循环也不能写的很优雅,所以在更高的版本直接被移除。取而代之的是DefaultConsumer,你可以通过扩展DefaultConsumer来实现消费者:

    消费的代码:(RabbitMqMessageConsumer是对DefaultConsumer的扩展)

    RabbitMqMessageConsumer rpcMessageConsumer = new RabbitMqMessageConsumer(channel,cores);
    channel.basicQos(cores);
    channel.basicConsume(QUEUE_NAME, true, rpcMessageConsumer);

    RabbitMqMessageConsumer代码:

    public class RabbitMqMessageConsumer extends DefaultConsumer{
          public RabbitMqMessageConsumer(Channel channel) {
              super(channel);
         }
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
      //TODO someting
     }
    }

    其中handleDelivery是处理消息的逻辑。

    高版本的解决方案给出了,那么回到我们题目的问题,老代码是按照4.x之前的写的,由于某种原因升级到了5.x了如何做?釜底抽薪的办法就是按照上面的事件驱动的方式重写消费者。折中的办法呢(不想改变老代码的逻辑和结构)。

    我就碰到了这样的问题,老代码写了很多的轮子,导致这块代码很难重写。那就只能按照原来QueueingConsumer的写法继续做。解决思路如下:

    首先消费的过程还是按照最开始那样:

    QueueingConsumer consumer = new QueueingConsumer(channel);

    channel.basicQos(1);

    channel.basicConsume(QUEUE_NAME, false, "consumer_test",consumer);

    while (true) {

    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    String message = new String(delivery.getBody());

    System.out.println(" [X] Received '" + message + "'");

    channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

    break;

     然后实现自己的QueueingConsumer(QueueingConsumer已经移除):

    public class QueueingConsumer extends DefaultConsumer{
      private LinkedBlockingQueue<Delivery> queue;
      public QueueingConsumer(Channel channel) {
        super(channel);
        queue = new LinkedBlockingQueue<RabbitMqMessageConsumer.Delivery>();
      }
      public QueueingConsumer(Channel channel,int size) {
        super(channel);
        queue = new LinkedBlockingQueue<RabbitMqMessageConsumer.Delivery>(size);
      }
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
        Delivery delivery = new Delivery();
        delivery.setBody(body);
        delivery.setProperties(properties);
        delivery.setEnvelope(envelope);
        try {
          queue.put(delivery);
        } catch (InterruptedException e) {
          LogUtils.error(e);
        }
      }
      public Delivery nextDelivery()throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
        return queue.take();
      }
      public Delivery nextDelivery(long timeout)throws InterruptedException, ShutdownSignalException, ConsumerCancelledException{
        return queue.poll(timeout, TimeUnit.MILLISECONDS);
      }
      public class Delivery{
        private BasicProperties properties;
        private byte[] body;
        private Envelope envelope;
        public BasicProperties getProperties() {
          return properties;
        }
        public void setProperties(BasicProperties properties) {
          this.properties = properties;
        }
        public byte[] getBody() {
          return body;
        }
        public void setBody(byte[] body) {
          this.body = body;
        }
        public Envelope getEnvelope() {
          return envelope;
        }
        public void setEnvelope(Envelope envelope) {
          this.envelope = envelope;
        }

      }
    }

    这样你就能在不修改之前的老代码的情况下升级版本了,当然最好还是重写,这个只能起到个过度

  • 相关阅读:
    STM32+ESP8266+AIR202基本控制篇-301-服务器单向SSL认证-MQTT服务器配置SSL单向认证(.Windows系统)
    STM32+ESP8266+AIR202基本控制篇-213-功能测试-微信小程序扫码绑定Air302(NB-IOT),并通过MQTT和Air302(NB-IOT)实现远程通信控制
    17-STM32+ESP8266+AIR202基本控制篇-完成功能2-微信小程序使用APUConfig配网绑定ESP8266,并通过MQTT和ESP8266实现远程通信控制
    Python 元类
    硬核!15张图解Redis为什么这么快
    Protobuf 中 any 的妙用
    Grpc性能压测方法:用ghz进行压测
    压测工具Locuse的使用
    Locust 多机器分布式测试
    kubespray部署kubernetes高可用集群
  • 原文地址:https://www.cnblogs.com/nfsnyy/p/12264590.html
Copyright © 2011-2022 走看看