zoukankan      html  css  js  c++  java
  • RabbitMQ学习路由模式(以C#代码为例)

    路由模式(Routing)

    有选择地(Routing key)接收消息,发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key,仅消费指定路由key的消息

    Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息,路由模式使用的Exchange类型为Direct类型

    应用场景: 如在商品库存中增加了1台iphone12,iphone12促销活动消费者指定routing key为iphone12,只有此促销活动会接收到消息,其它促销活动不关心也不会消费此routing key的消息

     Demo 架构与订阅模式一样

    生产者类WeatherDirect.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 using RabbitMQ.Client;
     5 using RabbitMQ.Common;
     6 
     7 namespace RabbitMQ.Producer.Producer
     8 {
     9     public class WeatherDirect
    10     {
    11         public static void SendWeather()
    12         {
    13             Dictionary<string, string> area = new Dictionary<string, string>();
    14             area.Add("china.hunan.changsha.20211231", "中国湖南长沙20211231天气数据");
    15             area.Add("china.hubei.wuhan.20211231", "中国湖北武汉20211231天气数据");
    16             area.Add("china.hubei.xiangyang.20211231", "中国湖北襄阳20211231天气数据");
    17             area.Add("us.cal.lsj.20211231", "美国加州洛杉矶20211231天气数据");
    18 
    19             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    20             {
    21                 using (var channel = connection.CreateModel())
    22                 {
    23                     foreach (var item in area)
    24                     {
    25                         channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key, null, Encoding.UTF8.GetBytes(item.Value));
    26                     }
    27                     Console.WriteLine("气象信息发送成功!");
    28                     Console.WriteLine("Press [Enter] to exit");
    29                     Console.ReadLine();
    30                 }
    31             }
    32         }
    33     }
    34 }

    消费者1 WeatherDirect.cs

     1 using RabbitMQ.Client;
     2 using RabbitMQ.Client.Events;
     3 using RabbitMQ.Common;
     4 using System;
     5 using System.Collections.Generic;
     6 using System.Text;
     7 
     8 namespace RabbitMQ.Consumer01.Consumer
     9 {
    10     public class WeatherDirect
    11     {
    12         public static void ReceiveWeatherInfo()
    13         {
    14             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    15             {
    16                 using (var channel = connection.CreateModel())
    17                 {
    18                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
    19                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
    20                     /*
    21                      * QueueBind用于将队列与交换机绑定
    22                      * queue:队列名
    23                      * exchange:交换机名
    24                      * routingKey:路由key
    25                      */
    26                     channel.QueueBind(queue: RabbitConstant.QUEUE_BAIDU,
    27                                       exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING,
    28                                       routingKey: "china.hunan.changsha.20211231");
    29                     channel.QueueBind(queue: RabbitConstant.QUEUE_BAIDU,
    30                                     exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING,
    31                                     routingKey: "china.hubei.wuhan.20211231");
    32                     channel.BasicQos(0, 1, false);
    33                     var consumer = new EventingBasicConsumer(channel);
    34 
    35                     consumer.Received += (model, ea) =>
    36                     {
    37                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    38                         Console.WriteLine($"百度收到的气象信息:{message}");
    39                         channel.BasicAck(ea.DeliveryTag, false);
    40                     };
    41                     channel.BasicConsume(queue: RabbitConstant.QUEUE_BAIDU,
    42                                  autoAck: false,
    43                                  consumer: consumer);
    44                     Console.WriteLine(" Press [enter] to exit.");
    45                     Console.ReadLine();
    46                 }
    47             }
    48         }
    49     }
    50 }

    消费者2  WeatherDirect.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 using RabbitMQ.Client;
     5 using RabbitMQ.Client.Events;
     6 using RabbitMQ.Common;
     7 
     8 namespace RabbitMQ.Consumer02.Consumer
     9 {
    10     public class WeatherDirect
    11     {
    12         public static void ReceiveWeatherInfo()
    13         {
    14             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    15             {
    16                 using (var channel = connection.CreateModel())
    17                 {
    18                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
    19                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
    20                     /*
    21                      * QueueBind用于将队列与交换机绑定
    22                      * queue:队列名
    23                      * exchange:交换机名
    24                      * routingKey:路由key
    25                      */
    26                     channel.QueueBind(queue: RabbitConstant.QUEUE_SINA,
    27                                       exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING,
    28                                       routingKey: "china.hubei.xiangyang.20211231");
    29                     channel.QueueBind(queue: RabbitConstant.QUEUE_SINA,
    30                                     exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING,
    31                                     routingKey: "china.hubei.wuhan.20211231");
    32                     channel.QueueBind(queue: RabbitConstant.QUEUE_SINA,
    33                                     exchange: RabbitConstant.EXCHANGE_WEATHER_ROUTING,
    34                                     routingKey: "us.cal.lsj.20211231");
    35                     channel.BasicQos(0, 1, false);
    36                     var consumer = new EventingBasicConsumer(channel);
    37 
    38                     consumer.Received += (model, ea) =>
    39                     {
    40                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    41                         Console.WriteLine($"新浪收到的气象信息:{message}");
    42                         channel.BasicAck(ea.DeliveryTag, false);
    43                     };
    44                     channel.BasicConsume(queue: RabbitConstant.QUEUE_SINA,
    45                                  autoAck: false,
    46                                  consumer: consumer);
    47                     Console.WriteLine(" Press [enter] to exit.");
    48                     Console.ReadLine();
    49                 }
    50             }
    51         }
    52     }
    53 }

    执行结果

     备注:若执行报错:'PRECONDITION_FAILED - inequivalent arg 'type' for exchange 

    原因:一旦创建了exchange, RabbitMQ是不允许对其改变的,不然会报错

    解决方法:在web上Exchanges模块把之前的exchange删除。

     

    参考连接:https://www.bilibili.com/video/BV1GU4y1w7Yq?p=8

                      https://mp.weixin.qq.com/s/QG3uXhhpkE_Uo6Me15mxdg

                     https://blog.csdn.net/adviceuser2014/article/details/102259145

  • 相关阅读:
    Linux修改主机名称
    Druid监控SQL语句
    CentOS7.5搭建Hadoop分布式集群
    CentOS7.5 解决ifconfig报错
    windows 用VMware创建linux虚拟机,安装操作系统CentOS7.2
    MySQL报错this is incompatible with sql_mode=only_full_group_by
    CentOS配置Redis环境变量
    CentOS7.4搭建GitLab
    修改服务器路由策略
    Centos7 安装python3
  • 原文地址:https://www.cnblogs.com/hobelee/p/15755128.html
Copyright © 2011-2022 走看看