zoukankan      html  css  js  c++  java
  • RabbitMQ学习订阅模式(以C#代码为例)

    订阅模式(Publish/Subscribe)

    一次向许多消费者发送消息,一个生产者发送的消息会被多个消费者获取,也就是将消息将广播到所有的消费者中。

    应用场景: 更新商品库存后需要通知多个缓存和多个数据库,这里的结构应该是:

    • 一个fanout类型交换机扇出两个个消息队列,分别为缓存消息队列、数据库消息队列
    • 一个缓存消息队列对应着多个缓存消费者
    • 一个数据库消息队列对应着多个数据库消费者

    在订阅模型中,多了一个Exchange角色,而且过程略有变化:

       Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交所有队列、或是将消息丢弃。如何操作,取决于               Exchange的类型。Exchange有常见的以下三种类型

            Fanout:广播,将消息交给所有绑定到交换机的队列,订阅模式使用此类型

            Direct:定向,将消息交给符合指定routing key的队列 路由模式使用此类型

            Topic:通配符,把消息交给符合routing pattern的队列 主题模式使用此类型

    Exchange只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失

    Demo 模拟百度和新浪订阅气象局发布的天气信息 

    1.RabbitMQ.Common类库代码

      (1)RabbitConstant.cs代码

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 
     5 namespace RabbitMQ.Common
     6 {
     7     public class RabbitConstant
     8     {
     9         public const string QUEUE_HELLO_WORLD = "helloworld.queue";
    10         public const string QUEUE_SMS = "sms.queue";
    11         public const string EXCHANGE_WEATHER = "weather.exchange";
    12         public const string QUEUE_BAIDU = "baidu.queue";
    13         public const string QUEUE_SINA= "sina.queue";
    14         public const string EXCHANGE_WEATHER_ROUTING = "weather.routing.exchange";
    15         public const string EXCHANGE_WEATHER_TOPIC = "weather.topic.exchange";
    16     }
    17 }

     (2)RabbitUtils.cs代码

     1 using System;
     2 using RabbitMQ.Client;
     3 
     4 namespace RabbitMQ.Common
     5 {
     6     public class RabbitUtils
     7     {
     8         public static ConnectionFactory GetConnection()
     9         {
    10             var factory = new ConnectionFactory();
    11             factory.HostName = "127.0.0.1";
    12             factory.Port = 5672;//是服务端的端口号,与页面的端口号15672区分开
    13             factory.UserName = "guest";
    14             factory.Password = "guest";
    15             //factory.VirtualHost = "/";
    16             return factory;
    17         }
    18     }
    19 }

    2. RabbitMQ.Producer控制台项目代码

    (1)WeatherFanout.cs代码

     1 using System;
     2 using System.Text;
     3 using RabbitMQ.Client;
     4 using RabbitMQ.Common;
     5 
     6 namespace RabbitMQ.Producer.Producer
     7 {
     8     public class WeatherFanout
     9     {
    10         public static void SendWeatherInfo()
    11         {
    12             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    13             {
    14                 using (var channel = connection.CreateModel())
    15                 {
    16                         string message = "零下10度";
    17                         var body = Encoding.UTF8.GetBytes(message);
    18               
    19                         channel.BasicPublish(exchange: RabbitConstant.EXCHANGE_WEATHER,
    20                                              routingKey: "",
    21                                              basicProperties: null,
    22                                              body: body);
    23                     Console.WriteLine("天气信息发送成功");
    24                     Console.WriteLine("Press [Enter] to exit");
    25                     Console.ReadLine();
    26                 }
    27             }
    28         }
    29     }
    30 }

    (2)Program.cs代码

     1 using RabbitMQ.Producer.Producer;
     2 using System;
     3 
     4 namespace RabbitMQ.Producer
     5 {
     6     class Program
     7     {
     8         static void Main(string[] args)
     9         {
    10             WeatherFanout.SendWeatherInfo();
    11         }
    12     }
    13 }

    3.RabbitMQ.Consumer01控制台项目代码

    (1)WeatherFanout.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 using RabbitMQ.Common;
     5 using RabbitMQ.Client;
     6 using RabbitMQ.Client.Events;
     7 
     8 namespace RabbitMQ.Consumer01.Consumer
     9 {
    10     public  class WeatherFanout
    11     {
    12         public static void ReceiveWeatherInfo()
    13         {
    14             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    15             {
    16                 using (var channel = connection.CreateModel())
    17                 {
    18                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
    19                     channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
    20                     /*
    21                      * QueueBind用于将队列与交换机绑定
    22                      * queue:队列名
    23                      * exchange:交换机名
    24                      * routingKey:路由key
    25                      */
    26                     channel.QueueBind(queue: RabbitConstant.QUEUE_BAIDU,
    27                                       exchange:RabbitConstant.EXCHANGE_WEATHER,
    28                                       routingKey:"");
    29                     channel.BasicQos(0, 1, false);
    30                     var consumer = new EventingBasicConsumer(channel);
    31 
    32                     consumer.Received += (model, ea) =>
    33                     {
    34                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    35                         Console.WriteLine($"百度收到的气象信息:{message}");
    36                         channel.BasicAck(ea.DeliveryTag, false);
    37                     };
    38                     channel.BasicConsume(queue: RabbitConstant.QUEUE_BAIDU,
    39                                  autoAck: false,
    40                                  consumer: consumer);
    41                     Console.WriteLine(" Press [enter] to exit.");
    42                     Console.ReadLine();
    43                 }
    44             }
    45         }
    46     }
    47 }

    (2)Program.cs代码

     1 using RabbitMQ.Consumer01.Consumer;
     2 using System;
     3 
     4 namespace RabbitMQ.Consumer01
     5 {
     6     class Program
     7     {
     8         static void Main(string[] args)
     9         {
    10             WeatherFanout.ReceiveWeatherInfo();
    11         }
    12     }
    13 }

    4.RabbitMQ.Consumer02控制台项目代码

    (1)WeatherFanout.cs

     1 using System;
     2 using System.Collections.Generic;
     3 using System.Text;
     4 using RabbitMQ.Common;
     5 using RabbitMQ.Client;
     6 using RabbitMQ.Client.Events;
     7 
     8 namespace RabbitMQ.Consumer02.Consumer
     9 {
    10     public class WeatherFanout
    11     {
    12         public static void ReceiveWeatherInfo()
    13         {
    14             using (var connection = RabbitUtils.GetConnection().CreateConnection())
    15             {
    16                 using (var channel = connection.CreateModel())
    17                 {
    18                     channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
    19                     channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
    20                     /*
    21                      * QueueBind用于将队列与交换机绑定
    22                      * queue:队列名
    23                      * exchange:交换机名
    24                      * routingKey:路由key
    25                      */
    26                     channel.QueueBind(queue: RabbitConstant.QUEUE_SINA,
    27                                       exchange: RabbitConstant.EXCHANGE_WEATHER,
    28                                       routingKey: "");
    29                     channel.BasicQos(0, 1, false);
    30                     var consumer = new EventingBasicConsumer(channel);
    31 
    32                     consumer.Received += (model, ea) =>
    33                     {
    34                         var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    35                         Console.WriteLine($"新浪收到的气象信息:{message}");
    36                         channel.BasicAck(ea.DeliveryTag, false);
    37                     };
    38                     channel.BasicConsume(queue: RabbitConstant.QUEUE_SINA,
    39                                  autoAck: false,
    40                                  consumer: consumer);
    41                     Console.WriteLine(" Press [enter] to exit.");
    42                     Console.ReadLine();
    43                 }
    44             }
    45         }
    46     }
    47 }

    (2)Program.cs代码

    using RabbitMQ.Consumer02.Consumer;
    using System;
    
    namespace RabbitMQ.Consumer02
    {
        class Program
        {
            static void Main(string[] args)
            {
                WeatherFanout.ReceiveWeatherInfo();
            }
        }
    }

    5.执行,使用powershell运行 

     

     

    参考连接:https://mp.weixin.qq.com/s/QG3uXhhpkE_Uo6Me15mxdg 

                      https://www.bilibili.com/video/BV1GU4y1w7Yq?p=8

  • 相关阅读:
    HDU 2655 主席树
    Codeforces Round #169 (Div. 2) A水 B C区间更新 D 思路
    Codeforces Round #402 (Div. 2) A B C sort D二分 (水)
    Docker 网络 Flannel
    Docker 搭建 etcd 集群及管理
    Iptables 端口转发
    CentOS7 citus9.5 集群安装及管理
    Ubuntu 忘记密码
    Zookeeper 启动错误
    数据结构 B树 B+树 B*树 LSM-树
  • 原文地址:https://www.cnblogs.com/hobelee/p/15750840.html
Copyright © 2011-2022 走看看