zoukankan      html  css  js  c++  java
  • RabbitMQ(六)——路由模式

    RabbitMQ系列

    RabbitMQ(一)——简介

    RabbitMQ(二)——模式类型

    RabbitMQ(三)——简单模式

    RabbitMQ(四)——工作队列模式

    RabbitMQ(五)——发布订阅模式

    RabbitMQ(六)——路由模式

    RabbitMQ(七)——主题模式

    RabbitMQ(八)——消息确认

    RabbitMQ(九)——消息持久化

    RabbitMQ(十)——消息优先级

    前言

      本章讲解路由模式,路由模式跟发布订阅模式类似,然后在发布订阅模式的基础上改变了类型(fanout => direct),订阅模式是分发到所有绑定到交换机的队列,路由模式只分发到绑定在交换机上面指定路由的队列,我们可以看一下下面这张图:

    以上的图是info,error,warning为路由,表示日志的等级通过不同的路由发送到不同的队列中,下面开始路由模式start~~~~

    发布订阅模式 VS 路由模式

      发布:

        发布订阅模式的类型为fanout,而路由模式类型为direct;

    //发布订阅模式
    channel.ExchangeDeclare(ExchangeName, "fanout");
    //路由模式
    channel.ExchangeDeclare(ExchangeName, "direct");
    

      

        订阅发布模式发布时roukey参数为空字符串,路由模式指定了路由;

    //发布订阅模式
    channel.BasicPublish(ExchangeName, "", null, body);
    //路由模式
    channel.BasicPublish(ExchangeName, RouteName, null, body);
    

      

      接收:

          定义交换器时,与发布时一致,发布订阅模式为fanout,路由模式为direct;

    //发布订阅模式
    channel.ExchangeDeclare(ExchangeName, "fanout");
    //路由模式
    channel.ExchangeDeclare(ExchangeName, "direct");
    

      

          绑定路由时,发布订阅模式的routekey参数为空字符串,表示接受所有消息,而路由模式的routekey参数必须指定某一路由。

    //发布订阅模式
    channel.QueueBind(queueName, ExchangeName, "");
    //路由模式
    channel.QueueBind(queueName, ExchangeName, routkey);
    

      

    通过以上对比可以知道,路由模式与发布订阅模式基本一致,唯一差距就是两个参数,exchange类型和 routingKey

     

    代码

      生产者:

    static void Main(string[] args)
            {
                Console.WriteLine("DirectServer发布服务器启动...");
    
                //1.创建连接工厂
                var factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var conn = factory.CreateConnection())
                {
                    //3.创建通道
                    using (var channel = conn.CreateModel())
                    {
                        //4.声明交换器
                        channel.ExchangeDeclare("directExchange", "direct");
    
                        string msg = "";
                        for (int i = 0; i < 20; i++)
                        {
                            msg = $"发布消息{i}";
                            string ROUTE_KEY = "";
                            var body = Encoding.UTF8.GetBytes(msg);
                            //模拟向不同路由发送消息
                            if (i % 2 == 0)
                            {
                                ROUTE_KEY = "route1";
                            }
                            else
                            {
                                ROUTE_KEY = "route2";
                            }
                            //5.发布消息
                            channel.BasicPublish("directExchange", ROUTE_KEY, null, body);
                            Console.WriteLine($"向{ROUTE_KEY}发布消息成功:{msg}");
    
                            Thread.Sleep(1000);
                        }
                        Console.ReadKey();
                    }
                }
            }
    View Code

      消费者1:

    static void Main(string[] args)
            {
                Console.WriteLine("DirectClient接收客户端启动...");
                //1.创建连接工厂
                var factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var conn = factory.CreateConnection())
                {
                    //3.创建通道
                    using (var channel = conn.CreateModel())
                    {
                        //3.声明队列
                        var queue = channel.QueueDeclare().QueueName;
                        //4.绑定交换器
                        channel.QueueBind(queue, "directExchange", "route1");
                        //5.声明消费者 消费消息
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                          {
                              //接收消息
                              var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                              Console.WriteLine($"接收route1消息:{body.ToString()}");
                          };
                        channel.BasicConsume(queue, true, consumer);
    
                        Console.ReadKey();
                    }
                }
            }
    View Code

      

      消费者2:

    static void Main(string[] args)
            {
                Console.WriteLine("DirectClient接收客户端启动...");
                //1.创建连接工厂
                var factory = new ConnectionFactory()
                {
                    HostName = "127.0.0.1",
                    UserName = "guest",
                    Password = "guest"
                };
                //2.创建连接
                using (var conn = factory.CreateConnection())
                {
                    //3.创建通道
                    using (var channel = conn.CreateModel())
                    {
                        //3.声明队列
                        var queue = channel.QueueDeclare().QueueName;
                        //4.绑定交换器
                        channel.QueueBind(queue, "directExchange", "route2");
                        //5.声明消费者 消费消息
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, e) =>
                        {
                            byte[] message = e.Body.ToArray();
                            Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                                //返回消息确认
                                channel.BasicAck(e.DeliveryTag, false);
                        };
                        //消费者开始监听
                        channel.BasicConsume(queue, true, consumer);
    
                        Console.ReadKey();
                    }
                }
            }
    View Code

      可以看到生产者发布消息时指定了路由rout1/rout2,随后消息会转发到route1/route2路由上。接收时通过将一个队列与交换器绑定,指定路由route1/route2,这样就能接收到route1/route2路由上的消息。

    效果

      消费者定义的随机队列

                                                    

      向route1、route2发布消息,两个消费者分别接收route1和route2的消息

    效果

      路由模式中生产者发布消息时指定路由,向指定路由发送,消费者绑定交换器与路由即可接收到生产者向此路由发布的消息;

      只有将消费者发送消息的交换器、路由 与生产者指定的交换器、路由一致,消费者才能接收到生产者向指定路由的消费者发送的消息。

      注意:声明路由时类型必须为direct

    附上Demo地址:https://github.com/1164887865/RabbitMQDemo

  • 相关阅读:
    _STORAGE_WRITE_ERROR_:./Application/Runtime/Cache/Home/f8995a0e1afcdadc637612fae5a3b585.php
    git 报错:没有权限 remote: error: unable to unlink old 'README.md' (Permission denied)
    深度学习入门实战(二)-用TensorFlow训练线性回归
    一条SQL搞定信息增益的计算
    Vue.js动画在项目使用的两个示例
    腾讯云安全:开发者必看|Android 8.0 新特性及开发指南
    腾讯云上Selenium用法示例
    一个只有99行代码的JS流程框架
    腾讯云上PhantomJS用法示例
    前端开发框架简介:angular和react
  • 原文地址:https://www.cnblogs.com/zousc/p/12739504.html
Copyright © 2011-2022 走看看