场景:
我们一般在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理。
实际环境:
我们使用自定义的Consumer更加的方便,解耦性更强,也在实际工作中最常用。
操作:
//生产端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "test_consumer_exchange"; String routingKey = "consumer.save"; String msg = "Hello RabbitMQ Consumer Message"; for(int i =0; i<5; i ++){ channel.basicPublish(exchange, routingKey, true, null, msg.getBytes()); }
//消费端代码 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("127.0.0.1"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_consumer_exchange"; String routingKey = "consumer.#"; String queueName = "test_consumer_queue"; channel.exchangeDeclare(exchangeName, "topic", true, false, null); channel.queueDeclare(queueName, true, false, false, null); channel.queueBind(queueName, exchangeName, routingKey); //使用自定义consumer channel.basicConsume(queueName, true, new MyConsumer(channel));
//自定义消费端 //继承DefaultConsumer类 public class MyConsumer extends DefaultConsumer { public MyConsumer(Channel channel) { super(channel); } //重写handleDelivery() @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.err.println("-----------consume message----------"); System.err.println("consumerTag: " + consumerTag); System.err.println("envelope: " + envelope); System.err.println("properties: " + properties); System.err.println("body: " + new String(body)); } }
运行结果: