zoukankan      html  css  js  c++  java
  • RabbitMQ(六)——路由模式

    RabbitMQ系列

    RabbitMQ(一)——简介

    RabbitMQ(二)——模式类型

    RabbitMQ(三)——简单模式

    RabbitMQ(四)——工作队列模式

    RabbitMQ(五)——发布订阅模式

    RabbitMQ(六)——路由模式

    RabbitMQ(七)——主题模式

    RabbitMQ(八)——消息确认

    RabbitMQ(九)——消息持久化

    RabbitMQ(十)——消息优先级

    前言

      本章讲解路由模式,路由模式跟发布订阅模式类似,然后在发布订阅模式的基础上改变了类型(fanout => direct),订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由的队列,我们可以看一下下面这张图:

    以上的图是info,error,warning为路由,表示日志的等级通过不同的路由发送到不同的队列中,下面开始路由模式start~~~~

    发布订阅模式 VS 路由模式

      发布:

        发布订阅模式的类型为fanout,而路由模式类型为direct;

    //发布订阅模式
    channel.ExchangeDeclare(ExchangeName, "fanout");
    //路由模式
    channel.ExchangeDeclare(ExchangeName, "direct");
    

      

        订阅发布模式发布时roukey参数为空字符串,路由模式指定了路由;

    //发布订阅模式
    channel.BasicPublish(ExchangeName, "", null, body);
    //路由模式
    channel.BasicPublish(ExchangeName, RouteName, null, body);
    

      

      接收:

          定义交换器时,与发布时一致,发布订阅模式为fanout,路由模式为direct;

    //发布订阅模式
    channel.ExchangeDeclare(ExchangeName, "fanout");
    //路由模式
    channel.ExchangeDeclare(ExchangeName, "direct");
    

      

          绑定路由时,发布订阅模式的routekey参数为空字符串,表示接受所有消息,而路由模式的routekey参数必须指定某一路由。

    //发布订阅模式
    channel.QueueBind(queueName, ExchangeName, "");
    //路由模式
    channel.QueueBind(queueName, ExchangeName, routkey);
    

      

    通过以上对比可以知道,路由模式与发布订阅模式基本一致,唯一差距就是两个参数,exchange类型和 routingKey

     

    代码

      生产者:

    static void Main(string[] args)
            {
                Console.WriteLine("DirectServer发布服务器启动...");
    
                //1.创建连接工厂
                var factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var conn = factory.CreateConnection())
                {
                    //3.创建通道
                    using (var channel = conn.CreateModel())
                    {
                        //4.声明交换器
                        channel.ExchangeDeclare("directExchange", "direct");
    
                        string msg = "";
                        for (int i = 0; i < 20; i++)
                        {
                            msg = $"发布消息{i}";
                            string ROUTE_KEY = "";
                            var body = Encoding.UTF8.GetBytes(msg);
                            //模拟向不同路由发送消息
                            if (i % 2 == 0)
                            {
                                ROUTE_KEY = "route1";
                            }
                            else
                            {
                                ROUTE_KEY = "route2";
                            }
                            //5.发布消息
                            channel.BasicPublish("directExchange", ROUTE_KEY, null, body);
                            Console.WriteLine($"向{ROUTE_KEY}发布消息成功:{msg}");
    
                            Thread.Sleep(1000);
                        }
                        Console.ReadKey();
                    }
                }
            }
    View Code

      消费者1:

    static void Main(string[] args)
            {
                Console.WriteLine("DirectClient接收客户端启动...");
                //1.创建连接工厂
                var factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var conn = factory.CreateConnection())
                {
                    //3.创建通道
                    using (var channel = conn.CreateModel())
                    {
                        //3.声明队列
                        var queue = channel.QueueDeclare().QueueName;
                        //4.绑定交换器
                        channel.QueueBind(queue, "directExchange", "route1");
                        //5.声明消费者 消费消息
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                          {
                              //接收消息
                              var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                              Console.WriteLine($"接收route1消息:{body.ToString()}");
                          };
                        channel.BasicConsume(queue, true, consumer);
    
                        Console.ReadKey();
                    }
                }
            }
    View Code

      

      消费者2:

    static void Main(string[] args)
            {
                Console.WriteLine("DirectClient接收客户端启动...");
                //1.创建连接工厂
                var factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var conn = factory.CreateConnection())
                {
                    //3.创建通道
                    using (var channel = conn.CreateModel())
                    {
                        //3.声明队列
                        var queue = channel.QueueDeclare().QueueName;
                        //4.绑定交换器
                        channel.QueueBind(queue, "directExchange", "route2");
                        //5.声明消费者 消费消息
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, e) =>
                        {
                            byte[] message = e.Body.ToArray();
                            Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                                //返回消息确认
                                channel.BasicAck(e.DeliveryTag, false);
                        };
                        //消费者开始监听
                        channel.BasicConsume(queue, true, consumer);
    
                        Console.ReadKey();
                    }
                }
            }
    View Code

      可以看到生产者发布消息时指定了路由rout1/rout2,随后消息会转发到route1/route2路由上。接收时通过将一个队列与交换器绑定,指定路由route1/route2,这样就能接收到route1/route2路由上的消息。

    效果

      消费者定义的随机队列

                                                    

      向route1、route2发布消息,两个消费者分别接收route1和route2的消息

    效果

      路由模式中生产者发布消息时指定路由,向指定路由发送,消费者绑定交换器与路由即可接收到生产者向此路由发布的消息;

      只有将消费者发送消息的交换器、路由 与生产者指定的交换器、路由一致,消费者才能接收到生产者向指定路由的消费者发送的消息。

      注意:声明路由时类型必须为direct

    附上Demo地址:https://github.com/1164887865/RabbitMQDemo

  • 相关阅读:
    实训9.4.前端:url、href、src,link和@import
    实训9.2.作业1.写一个10次循环,每次得到一个随机数,放进一个集合中,如果这个数已经存在集合中则跳过,最后打印集合中的数字.
    实训9.3. SQL——STRUCTURED QUERY LANGUAGE(结构化查询语言 )
    实训9.2.类集,Collection接口
    实训9.2.IDEA ——java编程语言开发的集成环境(集成开发工具)
    实训9.2. JDK——java语言的软件开发工具包(JAVA的运行环境(JVM+Java系统类库)和JAVA工具) 【java开发的核心】
    从键盘输入数据
    error
    ubuntu 14.04, Command "/usr/bin/python -u -c "import setuptools, tokenize;__file__='
    用Python徒手写线性回归
  • 原文地址:https://www.cnblogs.com/zousc/p/12739504.html
Copyright © 2011-2022 走看看