1.模型
2.创建生产者
package com.dwz.rabbitmq.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** |--c1 * p---Queue--| * |--c2 */ public class Send { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for(int i = 0; i < 50; i++) { String msg = "send:--" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } channel.close(); connection.close(); } }
3.创建消费者1
package com.dwz.rabbitmq.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class rev01 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取一个连接 Connection connection = ConnectionUtils.getConnection(); //从连接中获取一个通道 Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //自动接收消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("rev01:" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }; //监听队列 channel.basicConsume(QUEUE_NAME, false, consumer); } }
4.创建消费者2
package com.dwz.rabbitmq.work; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.dwz.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class rev02 { private static final String QUEUE_NAME = "test_work_queue"; public static void main(String[] args) throws IOException, TimeoutException { //获取一个连接 Connection connection = ConnectionUtils.getConnection(); //从连接中获取一个通道 Channel channel = connection.createChannel(); //队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //定义消费者 DefaultConsumer consumer = new DefaultConsumer(channel) { //自动接收消息 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("rev02:" + msg); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }; //监听队列 channel.basicConsume(QUEUE_NAME, false, consumer); } }
5.运行代码
预期结果:
按照延迟加载时间获取消息数量不同,数量比例为 延时1:延时2
测试结果如下:
1.两个消费者先启动完成,再启动生产者,这时会采用轮询分发的方式,消费者1和消费者2各拿到一半的消息
2.生产者先启动完成,消费者按照先后顺序启动,会发现所有消息都被先启动的那个消费者接收
达到预期结果的解决方案:
消费者限流+手动签收确认
消费者限流:channel.basicQos(1);
手动签收确认:channel.basicAck(envelope.getDeliveryTag(), false);