zoukankan      html  css  js  c++  java
  • RabbitMQ消费端自定义监听(九)

      场景

        我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理。

      实际环境

        我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用。

      操作  

            //生产端代码
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            String exchange = "test_consumer_exchange";
            String routingKey = "consumer.save";
            
            String msg = "Hello RabbitMQ Consumer Message";
            
            for(int i =0; i<5; i ++){
                channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
            }
            //消费端代码
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");
            
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            
            
            String exchangeName = "test_consumer_exchange";
            String routingKey = "consumer.#";
            String queueName = "test_consumer_queue";
            
            channel.exchangeDeclare(exchangeName, "topic", true, false, null);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            //使用自定义consumer
            channel.basicConsume(queueName, true, new MyConsumer(channel));    
           //自定义消费端
            //继承DefaultConsumer类
            public class MyConsumer extends DefaultConsumer {
    
    
                   public MyConsumer(Channel channel) {
                           super(channel);
                   }
        
                   //重写handleDelivery()
                  @Override
                  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                          System.err.println("-----------consume message----------");
                          System.err.println("consumerTag: " + consumerTag);
                          System.err.println("envelope: " + envelope);
                          System.err.println("properties: " + properties);
                          System.err.println("body: " + new String(body));
                   }
    
      
              }    

        运行结果:

        

  • 相关阅读:
    BZOJ1819 [JSOI]Word Query电子字典 Trie
    洛谷2973 [USACO10HOL]赶小猪Driving Out the Piggi… 概率 高斯消元
    BZOJ2669 [cqoi2012]局部极小值 状压DP 容斥原理
    BZOJ5047 空间传送装置 2017年9月月赛 最短路 SPFA
    BZOJ5045 打砖块 2017年9月月赛 其他
    BZOJ1858 [Scoi2010]序列操作 线段树
    BZOJ1826 [JSOI2010]缓存交换 堆 贪心
    BZOJ1898 [Zjoi2005]Swamp 沼泽鳄鱼 矩阵
    BZOJ1878 [SDOI2009]HH的项链 树状数组 或 莫队
    BZOJ1875 [SDOI2009]HH去散步 矩阵
  • 原文地址:https://www.cnblogs.com/luhan777/p/11193004.html
Copyright © 2011-2022 走看看