一:介绍
1.模型
有两种情形,分别是轮训分发与公平分发。
2.出现的场景
考虑到simple queue中的缺点。
因为生产者发送消息后,消费者消费要花费时间,这个会造成消息的堆积。
二:Round robin--轮循
1.发送程序
这个与简单程序类似,只是发送多条数据而已。
1 package com.mq.work.round; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public class RoundWorkSend { 8 private static final String QUENE_NAME="test_work_queue"; 9 public static void main(String[] args) throws Exception { 10 //获取一个连接 11 Connection connection= ConnectionUtil.getConnection(); 12 //从连接中获取一个通道 13 Channel channel=connection.createChannel(); 14 //创建队列声明 15 channel.queueDeclare(QUENE_NAME,false,false,false,null); 16 17 //消息与发送放入for循环 18 for (int i=0;i<50;i++){ 19 String msg="hello "+i; 20 System.out.println("[send msg]:"+msg); 21 channel.basicPublish("",QUENE_NAME,null,msg.getBytes()); 22 Thread.sleep(i*1); 23 } 24 25 //关闭连接 26 channel.close(); 27 connection.close(); 28 } 29 }
2.消费者一
1 package com.mq.work.round; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class RoundWorkReceive1 { 9 private static final String QUENE_NAME="test_work_queue"; 10 public static void main(String[] args)throws Exception{ 11 //获取一个连接 12 Connection connection = ConnectionUtil.getConnection(); 13 //创建通道 14 Channel channel = connection.createChannel(); 15 //创建队列声明 16 channel.queueDeclare(QUENE_NAME,false,false,false,null); 17 //创建消费者 18 DefaultConsumer consumer=new DefaultConsumer(channel){ 19 @Override 20 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 21 String msg=new String(body,"utf-8"); 22 System.out.println("[1]receive msg:"+msg); 23 try { 24 Thread.sleep(200); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 }finally { 28 System.out.println("done"); 29 } 30 } 31 }; 32 //监听队列 33 boolean autoAck=true; 34 channel.basicConsume(QUENE_NAME,autoAck,consumer); 35 } 36 }
3.消费者二
1 package com.mq.work.round; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class RoundWorkReceive2 { 9 private static final String QUENE_NAME="test_work_queue"; 10 public static void main(String[] args)throws Exception{ 11 //获取一个连接 12 Connection connection = ConnectionUtil.getConnection(); 13 //创建通道 14 Channel channel = connection.createChannel(); 15 //创建队列声明 16 channel.queueDeclare(QUENE_NAME,false,false,false,null); 17 //创建消费者 18 DefaultConsumer consumer=new DefaultConsumer(channel){ 19 @Override 20 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 21 String msg=new String(body,"utf-8"); 22 System.out.println("[2]receive msg:"+msg); 23 try { 24 Thread.sleep(300); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 }finally { 28 System.out.println("done"); 29 } 30 } 31 }; 32 //监听队列 33 boolean autoAck=true; 34 channel.basicConsume(QUENE_NAME,autoAck,consumer); 35 } 36 }
4.现象
send
receive1:
receive2:
三:fair dispatcher
1.介绍
使用公平分发需要关闭自动应答,改成手动。
有一种通俗的说法是:能者多劳。
2.生产者
需要改动的地方是:每个消费者在得到确认消息之前,消息队列不得发送一个消息给消费者,一次只能处理一个消息。
1 package com.mq.work.fair; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.Channel; 5 import com.rabbitmq.client.Connection; 6 7 public class FairWorkSend { 8 private static final String QUENE_NAME="test_work_queue"; 9 public static void main(String[] args) throws Exception { 10 //获取一个连接 11 Connection connection= ConnectionUtil.getConnection(); 12 //从连接中获取一个通道 13 Channel channel=connection.createChannel(); 14 //创建队列声明 15 channel.queueDeclare(QUENE_NAME,false,false,false,null); 16 17 //限制发送给一个消费者不得超过1条 18 int prefetchCount=1; 19 channel.basicQos(prefetchCount); 20 21 //消息与发送放入for循环 22 for (int i=0;i<50;i++){ 23 String msg="hello "+i; 24 System.out.println("[send msg]:"+msg); 25 channel.basicPublish("",QUENE_NAME,null,msg.getBytes()); 26 Thread.sleep(i*1); 27 } 28 29 //关闭连接 30 channel.close(); 31 connection.close(); 32 } 33 }
3.消费者一
需要改动的行数,14,18,33,38
1 package com.mq.work.fair; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FairWorkReceive1 { 9 private static final String QUENE_NAME="test_work_queue"; 10 public static void main(String[] args)throws Exception{ 11 //获取一个连接 12 Connection connection = ConnectionUtil.getConnection(); 13 //创建通道 14 final Channel channel = connection.createChannel(); 15 //创建队列声明 16 channel.queueDeclare(QUENE_NAME,false,false,false,null); 17 18 //一次只能发送一个消息 19 channel.basicQos(1); 20 21 //创建消费者 22 DefaultConsumer consumer=new DefaultConsumer(channel){ 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 String msg=new String(body,"utf-8"); 26 System.out.println("[1]receive msg:"+msg); 27 try { 28 Thread.sleep(200); 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 }finally { 32 System.out.println("done"); 33 //手动应答 34 channel.basicAck(envelope.getDeliveryTag(),false); 35 } 36 } 37 }; 38 //监听队列,不是自动应答 39 boolean autoAck=false; 40 channel.basicConsume(QUENE_NAME,autoAck,consumer); 41 } 42 }
3.消费者二
与消费者一不同点在于消费每个消息的时间不同。
1 package com.mq.work.fair; 2 3 import com.mq.utils.ConnectionUtil; 4 import com.rabbitmq.client.*; 5 6 import java.io.IOException; 7 8 public class FairWorkReceive2 { 9 private static final String QUENE_NAME="test_work_queue"; 10 public static void main(String[] args)throws Exception{ 11 //获取一个连接 12 Connection connection = ConnectionUtil.getConnection(); 13 //创建通道 14 final Channel channel = connection.createChannel(); 15 //创建队列声明 16 channel.queueDeclare(QUENE_NAME,false,false,false,null); 17 18 //一次只能发送一个消息 19 channel.basicQos(1); 20 21 //创建消费者 22 DefaultConsumer consumer=new DefaultConsumer(channel){ 23 @Override 24 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { 25 String msg=new String(body,"utf-8"); 26 System.out.println("[1]receive msg:"+msg); 27 try { 28 Thread.sleep(500); 29 } catch (InterruptedException e) { 30 e.printStackTrace(); 31 }finally { 32 System.out.println("done"); 33 //手动应答 34 channel.basicAck(envelope.getDeliveryTag(),false); 35 } 36 } 37 }; 38 //监听队列,不是自动应答 39 boolean autoAck=false; 40 channel.basicConsume(QUENE_NAME,autoAck,consumer); 41 } 42 }
4.现象
消费者一:
消费者二: