zoukankan      html  css  js  c++  java
  • rabbitMq(4)之“Direct”模式

    "路由模式"

    1.架构图

      

     模式简介

    • 一个消息生产者P,一个交互者X,多个消息存储队列Q,多个消息消费者C
    • Work模型能够较好的解决消息消费太粗犷的问题(生产多少就消费多少)
    • 多个队列,对应的多个系统,更加合理的处理消息的消费行为

    2.实践应用

    2.1  生产者

    public class Route_Publish_Producer {
        private static String EXCHANGE_NAME = "test_exchange_direct";
        public static void main(String[] args) throws Exception {
            // TODO Auto-generated method stub
            //获得连接
            Connection con =  ConnectUtil.getConnection();
            //获得通道
            Channel channel = con.createChannel();
            //声明创建exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            //同一时刻只给消费者发送一条
    //        channel.basicQos(1);
            //消息内容
            int i = 0;
            //路由标记
            String routeKey = "";
            while(i<50){
                String message = "hello "+i;
                //发送消息到交换机
                if (i % 3 == 0){
                    routeKey = "insert";
                }else if (i % 3 == 1){
                    routeKey = "update";
                }else{
                    routeKey = "delete";
                }
                channel.basicPublish(EXCHANGE_NAME, routeKey, null, message.getBytes());
                //发送成功,打印发送信息
                System.out.println("生产者发送消息是:"+message);
                i++;
                Thread.sleep(i*10);
            }
            //关闭通道和连接
            channel.close();
            con.close();
        }
    
    }

    2.2 消费者 

    消费者1

    public class Router_Exchange_Consumer2 {
        private static String QUEUE_NAME = "test_queue_exchange_direct2";
        private static String EXCHANGE_NAME = "test_exchange_direct";
        public static void main(String[] argv) throws Exception{
            //获得连接
            Connection con =  ConnectUtil.getConnection();
            //获得通道
            Channel channel = con.createChannel();
            //声明创建队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
            //同一时刻只给消费者发送一条
            channel.basicQos(1);
            //创建消息者
            QueueingConsumer consumer = new QueueingConsumer(channel); 
            //发送消息队列
            channel.basicConsume(QUEUE_NAME, false, consumer);
            while(true){
                Delivery delivery= consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("direct 消费者接受的消息是:"+message);
                Thread.sleep(1000);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

    消费者2

    public class Router_Exchange_Consumer1 {
        private static String QUEUE_NAME = "test_queue_exchange_direct1";
        private static String EXCHANGE_NAME = "test_exchange_direct";
        public static void main(String[] argv) throws Exception{
            //获得连接
            Connection con =  ConnectUtil.getConnection();
            //获得通道
            Channel channel = con.createChannel();
            //声明创建队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //绑定队列到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");
            //同一时刻只给消费者发送一条
            channel.basicQos(1);
            //创建消息者
            QueueingConsumer consumer = new QueueingConsumer(channel); 
            //发送消息队列
            channel.basicConsume(QUEUE_NAME, false, consumer);
            while(true){
                Delivery delivery= consumer.nextDelivery();
                String message = new String(delivery.getBody());
                System.out.println("direct 消费者接受的消息是:"+message);
                Thread.sleep(1000);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }

    3.1测试结果:(默认是公平接受)

      生产者:生产insert,update,delelet三种消息;

      消费者1:insert,update,delelet三种消息都会可以消费;

      消费者2:只消费updatet,delete两种消息;

    4.总结

      通过这种模式就到达了消息“路由”的目的,生产者生产全部的消息,而消费者根据自身的绑定的routerkey选择消费的消息类型,这样就达到了消息消费的更加人性化,也更加符合实际应用场景。

      似乎现在的模式满足了大部分的应用需求,但是如果消息种类增多后,而且好多消息可以分组被指定系统消费,似乎一个一个的配置队列的绑定关系就太繁琐了。rabbitmq也考虑到了这种情况,提供了“主题”模式来满足实际应用的切实需要,我们下篇文章着重介绍这种实际更加人文模式。

  • 相关阅读:
    apache安装
    docker搭建redis主从
    docker安装
    sklearn工具-绪论
    数学基础-矩阵和线性代数
    数学基础-概率论与贝叶斯先验
    数学基础-数学分析
    数据科学包——Matplotlib
    数据科学包——pandas
    数据科学包——numpy
  • 原文地址:https://www.cnblogs.com/fengyan20150508/p/9266351.html
Copyright © 2011-2022 走看看