Producer
1 package workqueues; 2 3 import com.rabbitmq.client.Channel; 4 import com.rabbitmq.client.Connection; 5 import com.rabbitmq.client.ConnectionFactory; 6 7 public class Send1 { 8 9 private static final String QUEUE_NAME = "workqueue"; 10 public static void main(String[] args) { 11 foo(); 12 } 13 14 private static void foo() { 15 try{ 16 ConnectionFactory factory = new ConnectionFactory(); 17 factory.setHost("localhost"); 18 Connection connection = factory.newConnection(); 19 Channel channel = connection.createChannel(); 20 boolean durable = true; //指定消息是否需要持久化,避免rebbitMq挂掉之后消息丢失 21 channel.queueDeclare(QUEUE_NAME, durable, false, false, null); 22 String dots = ""; 23 for(int i=0; i < 10; i++) { 24 dots += "."; 25 String message = "helloworld" + dots + dots.length(); 26 channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); 27 System.out.println("Send:"+message); 28 } 29 channel.close(); 30 connection.close(); 31 } catch(Exception e) { 32 e.printStackTrace(); 33 } 34 } 35 }
Consumer1
1 package workqueues; 2 3 import java.io.IOException; 4 5 import com.rabbitmq.client.Channel; 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 import com.rabbitmq.client.DefaultConsumer; 9 import com.rabbitmq.client.Envelope; 10 import com.rabbitmq.client.AMQP.BasicProperties; 11 12 public class Recv2 { 13 private static final String QUEUE_NAME = "workqueue"; 14 public static void main(String[] args) { 15 foo(); 16 } 17 18 private static void foo() { 19 try{ 20 int hashCode = Recv2.class.hashCode(); 21 ConnectionFactory factory = new ConnectionFactory(); 22 factory.setHost("localhost"); 23 Connection connection = factory.newConnection(); 24 Channel channel = connection.createChannel(); 25 channel.queueDeclare(QUEUE_NAME, true, false, false, null); 26 //同时处理的待ack的请求的数量 27 int prefetchCount = 3; 28 channel.basicQos(prefetchCount); 29 //关闭autoAck,表示请求处理完需要Consumer显示的发送应答,否则MQ认为消息未得到处理,后续会发给其他人处理 30 boolean autoAck = false; 31 DefaultConsumer consumer = new DefaultConsumer(channel){ 32 @Override 33 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, 34 byte[] body) throws IOException { 35 String message = new String(body,"UTF-8"); 36 doWork(message); 37 //设定每处理完一条消息都要发送ack,只有发送完ack的,rabbitMq才会认为得到正确处理,否则会发送给其它人 38 channel.basicAck(envelope.getDeliveryTag(), autoAck); 39 } 40 }; 41 //设定需要显示发送ack 42 channel.basicConsume(QUEUE_NAME, autoAck, consumer); 43 } catch(Exception e) { 44 e.printStackTrace(); 45 } 46 } 47 private static void doWork(String task) { 48 System.out.println("Received task:"+task); 49 for(char ch : task.toCharArray()) { 50 if(ch == '.') { 51 try { 52 Thread.sleep(500); 53 } catch (InterruptedException e) { 54 e.printStackTrace(); 55 } 56 } 57 } 58 System.out.println("task done:"+task); 59 } 60 }
其中几个主要参数:
durable:表示是否需要持久化消息,其主要是为了避免rabbitMQ崩溃的时候消息丢失,所以设置为持久化,mq会把未处理的消息写到磁盘。
autoack:指定mq client是否自动应答ack(消息处理确认),实际上我们应该将autoack设置成false,等Consumer处理完消息后显示的调用函数向mq 发送ack,表示这条消息已经得到正确处理。
如果Consumer正在处理一条消息的时候挂了,那么mq没有收到应答,则会将这条消息发给这个queue上的其它Consumer来进行处理。
prefetchCount:Consumer同时处理的待应答的消息的数量,如果设置成1则相当于负载均衡,每个人同时只会处理一条消息。处理完之后才会收到下一条消息。
假如设置成3,那么每个Consumer同一时间最多可以处理3条消息。上面测试代码设置为3.
测试结果
Producer
Send:helloworld.1
Send:helloworld..2
Send:helloworld...3
Send:helloworld....4
Send:helloworld.....5
Send:helloworld......6
Send:helloworld.......7
Send:helloworld........8
Send:helloworld.........9
Send:helloworld..........10
C1
Received task:helloworld.1
task done:helloworld.1
Received task:helloworld..2
task done:helloworld..2
Received task:helloworld...3
task done:helloworld...3
Received task:helloworld.......7
task done:helloworld.......7
Received task:helloworld........8
task done:helloworld........8
Received task:helloworld..........10
task done:helloworld..........10
C2
Received task:helloworld....4
task done:helloworld....4
Received task:helloworld.....5
task done:helloworld.....5
Received task:helloworld......6
task done:helloworld......6
Received task:helloworld.........9
task done:helloworld.........9
备注:先运行Producer代码,然后运行Consumer代码(运行两次。即两个实例,分别命名为C1,C2)
结果分析
Producer生产任务1,2,3,4,5,6,7,8,9,10
C1处理任务1,2,3,7,8,10
· C2处理任务4,5,6,9
基本符合同一时间最多处理三条消息的设置,假设在C1处理10的时候将C1进程强杀掉,那么mq会把10发送给C2进行处理