上一篇讲了个 哈喽World,现在来看看如果存在多个消费者的情况。
生产者:
package com.example.demo; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 竞争消费者模式 */ public class CompetingSend { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 连接工厂 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 获取连接 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明队列,只有他不存在的时候创建 String msg = "Hello World!"; // 发送多条消息 for (int i = 0; i < 5; i++){ channel.basicPublish("", QUEUE_NAME, null, (msg + "-" + i).getBytes()); System.out.println("Sending:" + msg); } channel.close(); connection.close(); } }
消费者:
package com.example.demo; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; /** * 一个生产者,多个消费者 */ public class CompetingReceiveA { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); // 连接工厂 factory.setHost("localhost"); Connection connection = factory.newConnection(); // 获取连接 Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 声明队列,只有他不存在的时候创建 Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String recv = new String(body, "UTF-8"); System.out.println("Receive:" + recv); try { doWork(recv); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("Done"); } } }; // true代表接收到消息后,给兔子发消息,让这条消息失效 channel.basicConsume(QUEUE_NAME, true, consumer); } // 模拟每条消息处理时间不一样 private static void doWork(String msg) throws InterruptedException { char c = msg.charAt(msg.length() - 1); for (int i = 0; i < Integer.parseInt(c+""); i++) Thread.sleep(1000); } }
先启动两个消费者,再启动生产者,查看控制台:
消费者A
消费者B
生产者(这里不必有疑问,这里打印的是修改之前的消息)
要说明的是什么观点呢?
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。一般来说,每个消费者得到的消息是一样多。但是,并不是说每个消费者的任务重量是平均的。很有可能出现A总在处理耗时任务,B一直吃西瓜的情况。
因为兔子不知道每个消息的耗时,他就会傻傻的派遣任务。
不过,官方也有解决办法。
为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount = 1。这告诉RabbitMQ不要向消费者发送多于一条消息。换句话说,在它处理并确认了前一个消息之前,不要向工作人员发送新消息。
如果当前消费者正在忙碌(没有确认消息),它会将其分派给空闲下一个消费者。
int prefetchCount = 1; channel.basicQos(prefetchCount);