"路由模式"
1.架构图
模式简介
- 一个消息生产者P,一个交互者X,多个消息存储队列Q,多个消息消费者C
- Work模型能够较好的解决消息消费太粗犷的问题(生产多少就消费多少)
- 多个队列,对应的多个系统,更加合理的处理消息的消费行为
2.实践应用
2.1 生产者
public class Route_Publish_Producer { private static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] args) throws Exception { // TODO Auto-generated method stub //获得连接 Connection con = ConnectUtil.getConnection(); //获得通道 Channel channel = con.createChannel(); //声明创建exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); //同一时刻只给消费者发送一条 // channel.basicQos(1); //消息内容 int i = 0; //路由标记 String routeKey = ""; while(i<50){ String message = "hello "+i; //发送消息到交换机 if (i % 3 == 0){ routeKey = "insert"; }else if (i % 3 == 1){ routeKey = "update"; }else{ routeKey = "delete"; } channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes()); //发送成功,打印发送信息 System.out.println("生产者发送消息是:"+message); i++; Thread.sleep(i*10); } //关闭通道和连接 channel.close(); con.close(); } }
2.2 消费者
消费者1
public class Router_Exchange_Consumer2 { private static String QUEUE_NAME = "test_queue_exchange_direct2"; private static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception{ //获得连接 Connection con = ConnectUtil.getConnection(); //获得通道 Channel channel = con.createChannel(); //声明创建队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); //同一时刻只给消费者发送一条 channel.basicQos(1); //创建消息者 QueueingConsumer consumer = new QueueingConsumer(channel); //发送消息队列 channel.basicConsume(QUEUE_NAME, false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("direct 消费者接受的消息是:"+message); Thread.sleep(1000); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
消费者2
public class Router_Exchange_Consumer1 { private static String QUEUE_NAME = "test_queue_exchange_direct1"; private static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception{ //获得连接 Connection con = ConnectUtil.getConnection(); //获得通道 Channel channel = con.createChannel(); //声明创建队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete"); //同一时刻只给消费者发送一条 channel.basicQos(1); //创建消息者 QueueingConsumer consumer = new QueueingConsumer(channel); //发送消息队列 channel.basicConsume(QUEUE_NAME, false, consumer); while(true){ Delivery delivery= consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("direct 消费者接受的消息是:"+message); Thread.sleep(1000); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
3.1测试结果:(默认是公平接受)
生产者:生产insert,update,delelet三种消息;
消费者1:insert,update,delelet三种消息都会可以消费;
消费者2:只消费updatet,delete两种消息;
4.总结
通过这种模式就到达了消息“路由”的目的,生产者生产全部的消息,而消费者根据自身的绑定的routerkey选择消费的消息类型,这样就达到了消息消费的更加人性化,也更加符合实际应用场景。
似乎现在的模式满足了大部分的应用需求,但是如果消息种类增多后,而且好多消息可以分组被指定系统消费,似乎一个一个的配置队列的绑定关系就太繁琐了。rabbitmq也考虑到了这种情况,提供了“主题”模式来满足实际应用的切实需要,我们下篇文章着重介绍这种实际更加人文模式。