zoukankan      html  css  js  c++  java
  • RabbitMQ消费端自定义监听器DefaultConsumer

    消费者

    package com.flying.rabbitmq.api.consumer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Demo entry point that binds a queue to a topic exchange and attaches a
     * custom {@link MyConsumer} callback to that queue.
     */
    public class Consumer {

        /**
         * Connects to the broker at 127.0.0.1:5672 (virtual host "/"), declares the
         * exchange, the queue and the binding between them, then subscribes a
         * {@link MyConsumer} with automatic acknowledgement.
         *
         * <p>The connection is deliberately left open so deliveries keep arriving.
         *
         * @param args unused
         * @throws Exception if connecting, declaring or subscribing fails
         */
        public static void main(String[] args) throws Exception {
            // Names shared with the producer side of the demo.
            final String topicExchange = "test_consumer_exchange";
            final String boundQueue = "test_consumer_queue";
            final String bindingPattern = "consumer.#";

            // Flags spelled out so the declare calls below read without the API docs.
            final boolean durable = true;
            final boolean exclusive = false;
            final boolean autoDelete = false;
            final boolean autoAck = true;

            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setVirtualHost("/");

            Connection conn = factory.newConnection();
            Channel ch = conn.createChannel();

            ch.exchangeDeclare(topicExchange, "topic", durable, autoDelete, null);
            ch.queueDeclare(boundQueue, durable, exclusive, autoDelete, null);
            ch.queueBind(boundQueue, topicExchange, bindingPattern);

            // Deliveries are handed to the custom callback instead of an inline handler.
            MyConsumer callback = new MyConsumer(ch);
            ch.basicConsume(boundQueue, autoAck, callback);
        }
    }

    自定义消费者

    package com.flying.rabbitmq.api.consumer;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    /**
     * Custom consumer callback that dumps every delivery to standard error.
     */
    public class MyConsumer extends DefaultConsumer {

        /**
         * Creates a consumer attached to the given channel.
         *
         * @param channel the channel this consumer is associated with
         */
        public MyConsumer(Channel channel) {
            super(channel);
        }

        /**
         * Prints the consumer tag, envelope, properties and message body of a delivery.
         *
         * @param consumerTag the tag of the consumer receiving the delivery
         * @param envelope    delivery metadata supplied by the client library
         * @param properties  the message properties
         * @param body        the raw message payload
         * @throws IOException declared by the overridden method; not thrown by this implementation
         */
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            // Decode with an explicit charset: new String(byte[]) falls back to the
            // platform default, which makes non-ASCII payloads machine-dependent.
            String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);

            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + text);
        }
    }

    生产者

    package com.flying.rabbitmq.api.consumer;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    /**
     * Demo producer that publishes a handful of messages to the topic exchange
     * used by the consumer side of this example.
     */
    public class Producer {

        /**
         * Connects to the broker at 127.0.0.1:5672 (virtual host "/"), publishes the
         * same text message five times with routing key "consumer.save", then closes
         * the channel and the connection.
         *
         * <p>This method does not declare the exchange itself.
         * NOTE(review): presumably the consumer must be started first so that
         * "test_consumer_exchange" exists - confirm against your broker setup.
         *
         * @param args unused
         * @throws Exception if connecting, publishing or closing fails
         */
        public static void main(String[] args) throws Exception {

            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("127.0.0.1");
            connectionFactory.setPort(5672);
            connectionFactory.setVirtualHost("/");

            String exchange = "test_consumer_exchange";
            String routingKey = "consumer.save";

            String msg = "Hello RabbitMQ Consumer Message";
            // Encode once with an explicit charset instead of the platform default.
            byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);

            Connection connection = connectionFactory.newConnection();
            try {
                Channel channel = connection.createChannel();
                try {
                    for (int i = 0; i < 5; i++) {
                        // mandatory=true: no ReturnListener is registered here, so
                        // messages returned as unroutable are not observed.
                        channel.basicPublish(exchange, routingKey, true, null, payload);
                    }
                } finally {
                    channel.close();
                }
            } finally {
                // Without this the client's connection threads keep the JVM alive
                // after main returns and the socket is leaked.
                connection.close();
            }
        }
    }
  • 相关阅读:
    securecrt 中文乱码解决方案
    linux文件压缩、下载命令
    weinre调试
    linux查看当前目录命令
    linux下清除缓存文件并重启tomcat
    undefined加引号和不加引号的区别
    web/wap微博分享链接
    linux查找文件内容
    MySQL 5.1 安装过程中报apply security setting错误的解决办法 收藏
    Sleep Mode For WSN of Jennic
  • 原文地址:https://www.cnblogs.com/lflying/p/11107299.html
Copyright © 2011-2022 走看看