zoukankan      html  css  js  c++  java
  • Spring Boot中通过RabbitTemplate主动pull(get)消息的例子

    import java.util.Properties;
    import java.util.function.Consumer;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.ChannelCallback;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
    import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.GetResponse;
    
    @Service
    public class RabbitAdminServices {
    
        private static final Logger logger = LoggerFactory.getLogger(RabbitAdminServices.class);
    
        @Autowired
        AmqpAdmin rabbitAdmin;
    
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @Autowired
        MessageConverter messageConverter;
    
        private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    
        public int getCount(String queueName) {
    
            Properties properties = rabbitAdmin.getQueueProperties(queueName);
            return (Integer)properties.get(RabbitAdmin.QUEUE_MESSAGE_COUNT);
        }
    
        public <T> void processQueue(String queueName, Integer count, Class<T> clazz, Consumer<T> consumer) {
    
            int reprocessCount = getCount(queueName);
            int requestCount = reprocessCount;
            if(count != null) {
                requestCount = count;
            }
            for(int i = 0; i < reprocessCount && i < requestCount; i++) {
                rabbitTemplate.execute(new ChannelCallback<T>() {
    
                    @Override
                    public T doInRabbit(Channel channel) throws Exception {
                        GetResponse response = channel.basicGet(queueName, false);
                        T result = null;
                        try {
                            MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
                            if(response.getMessageCount() >= 0) {
                                messageProps.setMessageCount(response.getMessageCount());
                            }
                            Message message = new Message(response.getBody(), messageProps);
                            result = (T)messageConverter.fromMessage(message);
                            consumer.accept(result);
                            channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
                        }
                        catch(Exception e) {
                            channel.basicNack(response.getEnvelope().getDeliveryTag(), false, true);
                        }
                        return result;
                    }
                });
    
            }
        }
    }

    实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。

    实现推模式推荐的方式是继承 DefaultConsumer 基类,也可以使用Spring AMQP的 SimpleMessageListenerContainer 。

    推模式是最常用的,但是有些情况下推模式并不适用的,比如说:

    • 由于某些限制,消费者在某个条件成立时才能消费消息

    • 需要批量拉取消息进行处理

     

    实现拉模式

    RabbitMQ的Channel提供了 basicGet 方法用于拉取消息。

    1.  
      /**
    2.  
      * Retrieve a message from a queue using {@link com.rabbitmq.client.AMQP.Basic.Get}
    3.  
      * @see com.rabbitmq.client.AMQP.Basic.Get
    4.  
      * @see com.rabbitmq.client.AMQP.Basic.GetOk
    5.  
      * @see com.rabbitmq.client.AMQP.Basic.GetEmpty
    6.  
      * @param queue the name of the queue
    7.  
      * @param autoAck true if the server should consider messages
    8.  
      * acknowledged once delivered; false if the server should expect
    9.  
      * explicit acknowledgements
    10.  
      * @return a {@link GetResponse} containing the retrieved message data
    11.  
      * @throws java.io.IOException if an error is encountered
    12.  
      */
    13.  
      GetResponse basicGet(String queue, boolean autoAck) throws IOException;

    basicGet 返回 GetResponse 类。

    1.  
      public class GetResponse {
    2.  
      private final Envelope envelope;
    3.  
      private final BasicProperties props;
    4.  
      private final byte[] body;
    5.  
      private final int messageCount;
    6.  
       
    7.  
      // ...
     

    public class GetResponse { private final Envelope envelope; private final BasicProperties props; private final byte[] body; private final int messageCount; // ...

    rabbitmq-client版本4.0.3

    使用 basicGet 拉取消息需要注意:

    1.  
      basicGet
    2.  
      DefaultConsumer

    示例代码:

    1.  
      private void consume(Channel channel) throws IOException, InterruptedException {
    2.  
      while (true) {
    3.  
      if (!isConditionSatisfied()) {
    4.  
      TimeUnit.MILLISECONDS.sleep(1);
    5.  
      continue;
    6.  
      }
    7.  
      GetResponse response = channel.basicGet(CAOSH_TEST_QUEUE, false);
    8.  
      if (response == null) {
    9.  
      TimeUnit.MILLISECONDS.sleep(1);
    10.  
      continue;
    11.  
      }
    12.  
      String data = new String(response.getBody());
    13.  
      logger.info("Get message <= {}", data);
    14.  
      channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
    15.  
      }
    16.  
      }

    批量拉取消息

    RabbitMQ支持客户端批量拉取消息,客户端可以连续调用 basicGet 方法拉取多条消息,处理完成之后一次性ACK。需要注意:

    1.  
      basicGet
    2.  
      basicAck

    示例代码:

    1.  
      String bridgeQueueName = extractorProperties.getBridgeQueueName();
    2.  
      int batchSize = extractorProperties.getBatchSize();
    3.  
      List<GetResponse> responseList = Lists.newArrayListWithCapacity(batchSize);
    4.  
      long tag = 0;
    5.  
      while (responseList.size() < batchSize) {
    6.  
      GetResponse getResponse = channel.basicGet(bridgeQueueName, false);
    7.  
      if (getResponse == null) {
    8.  
      break;
    9.  
      }
    10.  
      responseList.add(getResponse);
    11.  
      tag = getResponse.getEnvelope().getDeliveryTag();
    12.  
      }
    13.  
      if (responseList.isEmpty()) {
    14.  
      TimeUnit.MILLISECONDS.sleep(1);
    15.  
      } else {
    16.  
      logger.info("Get <{}> responses this batch", responseList.size());
    17.  
      // handle messages
    18.  
      channel.basicAck(tag, true);
    19.  
      }

    关于QueueingConsumer

    QueueingConsumer 在客户端本地使用 BlockingQueue 缓冲消息,其nextDelivery方法也可以用于实现拉模式(其本质上是 BlockingQueue.take ),但是 QueueingConsumer 现在已经标记为Deprecated。

  • 相关阅读:
    glib 库 hash table 使用
    git 使用
    centos6.5 下安装 sqlplus
    Oracle tns 协议
    unix环境高级编程附录 B 通用代码
    centos6.5安装gtk开发环境
    linux 下定位程序假死
    Kotlin与Android SDK 集成(KAD 05)
    Kotlin的android扩展:对findViewById说再见(KAD 04)
    Kotlin类:功能更强、而更简洁(KAD 03)
  • 原文地址:https://www.cnblogs.com/exmyth/p/13822234.html
Copyright © 2011-2022 走看看