一个生产者对应多个消费者,但一条消息只能有一个消费者获得(可轮循获取)!!!
生产者:
package com..workqueue; import com..utils.RabbitConstant; import com..utils.RabbitUtils; import com.google.gson.Gson; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.concurrent.TimeoutException; public class OrderSystem { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = RabbitUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); for(int i = 1 ; i <= 10 ; i++) { SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功"); String jsonSMS = new Gson().toJson(sms); channel.basicPublish("" , RabbitConstant.QUEUE_SMS , null , jsonSMS.getBytes()); } System.out.println("发送数据成功"); channel.close(); connection.close(); } }
3个消费者接收10条消息:
(1):
package com..workqueue; import com..utils.RabbitConstant; import com..utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; public class SMSSender1 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); //如果不写basicQos(1),则自动MQ会将所有请求平均发送给所有消费者 //basicQos,MQ不再对消费者一次发送多个请求,而是消费者处理完一个消息后(确认后),在从队列中获取一个新的 channel.basicQos(1);//处理完一个取一个 channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender1-短信发送成功:" + jsonSMS); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
后台打印信息:
(2):
package com..workqueue; import com..utils.RabbitConstant; import com..utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; public class SMSSender2 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender2-短信发送成功:" + jsonSMS); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
后台打印信息:
(3):
package com..workqueue; import com..utils.RabbitConstant; import com..utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; public class SMSSender3 { public static void main(String[] args) throws IOException { Connection connection = RabbitUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null); channel.basicQos(1); channel.basicConsume(RabbitConstant.QUEUE_SMS , false , new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String jsonSMS = new String(body); System.out.println("SMSSender3-短信发送成功:" + jsonSMS); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(envelope.getDeliveryTag() , false); } }); } }
后台打印信息: