zoukankan      html  css  js  c++  java
  • RabbitMQ学习通配符模式(以C#代码为例)

    通配符模式(Topics)

     

    根据通配符(Topics)来接收消息,将路由key和某模式进行匹配,此时队列需要绑定在一个模式上,#匹配一个词或多个词,*只匹配一个词。

    使用的Exchange类型为Topic

    Topic类型的Exchange:

    (1)Topic类型与Direct相比,都是可以根据RoutingKey将消息路由到不同的队列,只不过Topic类型可以让队列在绑定路由时使用通配符

    (2)Topic类型的RoutingKey一般是由一个或多个单词构成,多个单词之间用“.”分割,如item.insert

    (3)Topic类型通配符规则:#匹配一个或多个词,*恰好匹配一个词,例如item.#,可以匹配item.insert或item.insert.user,item.*只能匹配

             item.insert或item.user

    Demo 

    生产者类WeatherTopic.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 using RabbitMQ.Client;
     5 using RabbitMQ.Client.Events;
     6 using RabbitMQ.Common;
     7 
     8 namespace RabbitMQ.Producer.Producer
     9 {
    10     public class WeatherTopic
    11     {
    12         public static void SendWeather()
    13         {
    14             Dictionary<string, string> area = new Dictionary<string, string>();
    15             area.Add("china.hunan.changsha.20211231", "中国湖南长沙20211231天气数据");
    16             area.Add("china.hubei.wuhan.20211231", "中国湖北武汉20211231天气数据");
    17             area.Add("china.hubei.xiangyang.20211231", "中国湖北襄阳20211231天气数据");
    18             area.Add("us.cal.lsj.20211231", "美国加州洛杉矶20211231天气数据");
    19 
    20             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    21             {
    22                 using (var channel = connection.CreateModel())
    23                 {
    24                     foreach (var item in area)
    25                     {
    26                         channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key, null, Encoding.UTF8.GetBytes(item.Value));
    27                     }
    28                     Console.WriteLine("气象信息发送成功!");
    29                     Console.WriteLine("Press [Enter] to exit");
    30                     Console.ReadLine();
    31                 }
    32             }
    33         }
    34     }
    35 }

    消费者01 WeatherTopic.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 using RabbitMQ.Client;
     5 using RabbitMQ.Client.Events;
     6 using RabbitMQ.Common;
     7 
     8 namespace RabbitMQ.Consumer01.Consumer
     9 {
    10     public class WeatherTopic
    11     {
    12         public static void ReceiveWeatherInfo()
    13         {
    14             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    15             {
    16                 using (var channel = connection.CreateModel())
    17                 {
    18                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
    19                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
    20                     /*
    21                      * QueueBind用于将队列与交换机绑定
    22                      * queue:队列名
    23                      * exchange:交换机名
    24                      * routingKey:路由key
    25                      */
    26                     channel.QueueBind(queue: RabbitConstant.QUEUE_BAIDU,
    27                                       exchange: RabbitConstant.EXCHANGE_WEATHER_TOPIC,
    28                                       routingKey: "china.hubei.*.20211231");
    29                     channel.BasicQos(0, 1, false);
    30                     var consumer = new EventingBasicConsumer(channel);
    31 
    32                     consumer.Received += (model, ea) =>
    33                     {
    34                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    35                         Console.WriteLine($"百度收到的气象信息:{message}");
    36                         channel.BasicAck(ea.DeliveryTag, false);
    37                     };
    38                     channel.BasicConsume(queue: RabbitConstant.QUEUE_BAIDU,
    39                                  autoAck: false,
    40                                  consumer: consumer);
    41                     Console.WriteLine(" Press [enter] to exit.");
    42                     Console.ReadLine();
    43                 }
    44             }
    45         }
    46     }
    47 }

    消费者02 WeatherTopic.cs代码

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 using RabbitMQ.Client;
     5 using RabbitMQ.Client.Events;
     6 using RabbitMQ.Common;
     7 
     8 namespace RabbitMQ.Consumer02.Consumer
     9 {
    10     public class WeatherTopic
    11     {
    12         public static void ReceiveWeatherInfo()
    13         {
    14             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    15             {
    16                 using (var channel = connection.CreateModel())
    17                 {
    18                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
    19                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
    20                     /*
    21                      * QueueBind用于将队列与交换机绑定
    22                      * queue:队列名
    23                      * exchange:交换机名
    24                      * routingKey:路由key
    25                      */
    26                     channel.QueueBind(queue: RabbitConstant.QUEUE_SINA,
    27                                       exchange: RabbitConstant.EXCHANGE_WEATHER_TOPIC,
    28                                       routingKey: "china.#");
    29                     channel.BasicQos(0, 1, false);
    30                     var consumer = new EventingBasicConsumer(channel);
    31 
    32                     consumer.Received += (model, ea) =>
    33                     {
    34                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    35                         Console.WriteLine($"新浪收到的气象信息:{message}");
    36                         channel.BasicAck(ea.DeliveryTag, false);
    37                     };
    38                     channel.BasicConsume(queue: RabbitConstant.QUEUE_SINA,
    39                                  autoAck: false,
    40                                  consumer: consumer);
    41                     Console.WriteLine(" Press [enter] to exit.");
    42                     Console.ReadLine();
    43                 }
    44             }
    45         }
    46 
    47     }
    48 }

    执行结果

      

    运行前配置:删除原来的Exchange,具体操作如下

     

      参考链接:https://www.bilibili.com/video/BV1GU4y1w7Yq?p=8

                        https://mp.weixin.qq.com/s/QG3uXhhpkE_Uo6Me15mxdg

  • 相关阅读:
    RabbitMQ 消息可靠性
    SpringBoot2.X+SpringAMQP 整合 RabbitMQ
    《红宝书》 |Array数组介绍及用法
    《红宝书》 |单例内置对象 |Global和Math
    js封装 |随机获取指定范围内的整数
    《红宝书》 |原始包装类型
    《红宝书》 |什么是对象
    js封装 |时间对象相关方法
    兼容 |ios移动端的时间对象
    《红宝书》 |基本引用类型-正则表达式RegExp
  • 原文地址:https://www.cnblogs.com/hobelee/p/15755208.html
Copyright © 2011-2022 走看看