zoukankan      html  css  js  c++  java
  • [.NET] RabbitMQ 的行为艺术

    RabbitMQ 的行为艺术

      好像,今天已经是 2 月 28 号了。

      听说,29、30、31 号放假。

      据说,有图,有真相。

    目录

    • 简介

    • 环境搭建

    • 示例一:简单的 Hello World

    • 示例二:发布/订阅模式

    • 尝试发现 - 新物种 EasyNetQ

    简介

      RabbitMQ:一个消息系统,基于 AMQP 系统协议,由 Erlang 语言开发。

      优点:健壮、使用简单、开源和支持各种流行的语言(如 Python、java、.NET)等。

      MQ(Message Queue):消息队列的简称,是一种应用程序之间的通信机制。

      作用:将部分无需立即回调获取结果,并且耗时的操作,使用异步处理的方式提高服务器的吞吐量及性能。如:日志记录。

    图:简单的通信方式,及加入 MQ 后的变化

     
         A 端:生产者将消息写(插)入队列;
         MQ(队列) :中间件,消息的载体;
         B 端:消费者从队列读(取)出消息。
     
      MQ 特点:消费者 - 生产者模型的一种表现形式。
     

    环境搭建

      1.官网下载安装包:http://www.rabbitmq.com/ ;

      2.安装时会提示你下载 Erlang 语言环境;

      3.启动安装完的服务:RabbitMQ;

      4.在 cmd 中指向 sbin 目录,并输入以下命令,才能打开 WEB 管理界面:

    rabbitmq-plugins enable rabbitmq_management

     

      5.默认 url:http://localhost:15672/#/

    示例一:简单的 Hello World

      P(Producer):生产者,意味着发送;

      Queue:队列,本质上是一个无限的缓冲区,可以储存尽可能多的信息;

      C(Consumer):消费者,等待并接收消息。

      【备注】生产者和消费者不需要驻留在同一台服务器上。

      Producer.cs

     1     public class Producer  
     2     {
     3         public static void Send()
     4         {
     5             var factory = new ConnectionFactory { HostName = "localhost" };
     6 
     7             //创建连接对象,基于 Socket
     8             using (var connection = factory.CreateConnection())
     9             {
    10                 //创建新的渠道、会话
    11                 using (var channel = connection.CreateModel())
    12                 {
    13                     //声明队列
    14                     channel.QueueDeclare(queue: "hello",    //队列名
    15                         durable: false,     //持久性
    16                         exclusive: false,   //排他性
    17                         autoDelete: false,  //自动删除
    18                         arguments: null);
    19 
    20                     const string message = "Hello World!";
    21                     var body = Encoding.UTF8.GetBytes(message);
    22 
    23                     channel.BasicPublish(exchange: "",  //交换机名
    24                         routingKey: "hello",    //路由键
    25                         basicProperties: null,
    26                         body: body);
    27                 }
    28             }
    29         }
    30     }

      【备注】队列名如果已存在,将不会重复创建。假设队列已存在,修改 channel.QueueDeclare() 方法内的参数后启动会出现异常。

      【备注】消息内容是一个字节数组。

     
      Consumer.cs
     1     class Consumer
     2     {
     3         public static void Receive()
     4         {
     5             var factory = new ConnectionFactory() { HostName = "localhost" };
     6 
     7             using (var connection = factory.CreateConnection())
     8             {
     9                 using (var channel = connection.CreateModel())
    10                 {
    11                     channel.QueueDeclare(queue: "hello",
    12                                          durable: false,
    13                                          exclusive: false,
    14                                          autoDelete: false,
    15                                          arguments: null);
    16 
    17                     //创建基于该队列的消费者,绑定事件
    18                     var consumer = new EventingBasicConsumer(channel);
    19                     consumer.Received += (model, ea) =>
    20                     {
    21                         var body = ea.Body;     //消息主体
    22                         var message = Encoding.UTF8.GetString(body);
    23                         Console.WriteLine(" [x] Received {0}", message);
    24                     };
    25 
    26                     //启动消费者
    27                     channel.BasicConsume(queue: "hello",    //队列名
    28                                          noAck: true,   //false:手动应答;true:自动应答
    29                                          consumer: consumer);
    30 
    31                     Console.Read();
    32                 }
    33             }
    34         }
    35     }

      【疑问】在消费者的类里面为什么会再次声明队列(channel.QueueDeclare())呢?-- 因为接收方可能会在发送方启动前启动,这是出于保险起见。

    示例二:发布/订阅模式

     
     

      1.Exchange 交换机和 Exchange Type 交换类型  

      RabbitMQ 消息传递模型的核心思想是,生产者不会直接将消息发给队列。

      这里我们将引入新的名词 Exchange(交换机)。交换机传递消息的类型也有很多种:direct, topic, headers(不常用) 和 fanout,我们称之为交换类型。

    图:Direct

     

    图:Fanout 

     

    图:Topic

     --上述 3 张图来源:http://m.blog.csdn.net/article/details?id=52262850

      

      这里,创建一个名为 “logs” 的交换机,它的类型为广播类型(fanout:可以将收到的所有消息,广播给所有已知的队列)。

    channel.ExchangeDeclare(exchange: "logs",   //交换机名
                            type: "fanout");    //交换类型

      

      2.临时队列

      作为消费者,我们有时候只需要一些新的(或者空的)队列,此时,更好的方式就是让它自动生成一个随机名字的队列;其次,当队列连接中断时会选择自动删除对应的消费者。

      创建一个非持久,有排他性和自动删除特性的队列(无参时)。

    var queueName = channel.QueueDeclare().QueueName;

      3.Binding 绑定

      【疑问】有了 Exchange 和 channel,这时,还需要什么东西呢?-- 我们要创建 Exchange 和 channel 关系的桥梁,这个桥梁称之为 Binding(绑定)。

    channel.QueueBind(queue: queueName,
                      exchange: "logs",
                      routingKey: "");
     
     1     class Producer
     2     {
     3         public static void Send()
     4         {
     5             var factory = new ConnectionFactory()
     6             {
     7                 HostName = "localhost",
     8                 Port = 5672,
     9                 UserName = "guest",
    10                 Password = "guest"
    11             };
    12 
    13             using (var connection = factory.CreateConnection())
    14             {
    15                 using (var channel = connection.CreateModel())
    16                 {
    17                     channel.ExchangeDeclare(exchange: "logs",   //交换机名
    18                         type: "fanout");    //交换类型
    19 
    20                     // Guid
    21                     var message = Guid.NewGuid().ToString();
    22                     var body = Encoding.UTF8.GetBytes(message);
    23                     channel.BasicPublish(exchange: "logs",
    24                                          routingKey: "",
    25                                          basicProperties: null,
    26                                          body: body);
    27 
    28                     Console.WriteLine(" [x] Sent {0}", message);
    29                 }
    30 
    31                 Console.WriteLine(" Press [enter] to exit.");
    32                 Console.ReadLine();
    33             }
    34         }
    35     }
    Producer.cs //生产者
     1     class Reciver
     2     {
     3         public static void Recive()
     4         {
     5             var factory = new ConnectionFactory()
     6             {
     7                 HostName = "localhost",
     8                 Port = 5672,
     9                 UserName = "guest",
    10                 Password = "guest"
    11             };
    12 
    13             using (var connection = factory.CreateConnection())
    14             using (var channel = connection.CreateModel())
    15             {
    16                 channel.ExchangeDeclare(exchange: "wen_logs",   //交换机名
    17                     type: "fanout");    //交换类型
    18 
    19                 //创建队列
    20                 var queueName = channel.QueueDeclare().QueueName;
    21                 channel.QueueBind(queue: queueName,
    22                                   exchange: "wen_logs",
    23                                   routingKey: "");
    24 
    25                 Console.WriteLine(" [*] Waiting for logs.");
    26 
    27                 var consumer = new EventingBasicConsumer(channel);
    28                 consumer.Received += (model, ea) =>
    29                 {
    30                     var body = ea.Body;
    31                     var message = Encoding.UTF8.GetString(body);
    32                     Console.WriteLine(" [x] {0}", message);
    33                 };
    34                 channel.BasicConsume(queue: queueName,
    35                                      noAck: true,
    36                                      consumer: consumer);
    37 
    38                 Console.WriteLine(" Press [enter] to exit.");
    39                 Console.ReadLine();
    40             }
    41         }
    42     }
    Reciver.cs //接收者

    尝试发现 - 新物种 EasyNetQ

      这都不是事!EasyNetQ,看名字就知道,搞定 MQ,So easy!

      连接 RabbitMQ 代理:

    var bus = RabbitHutch.CreateBus("host=localhost");

      发布:

    bus.Publish(message);

      订阅:

    bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

      下面我们通过 Demo 来感受一下 Easy 的程度吧,创建项目(效果图如下,附 Demo 下载):

      Wen.EasyNetQDemo.Model:类库

      Wen.EasyNetQDemo.Publisher,Wen.EasyNetQDemo.Subscriber:控制台应用程序,都使用 Nuget 直接安装 EasyNetQ 包,都引用类库 Model。

     
      Demo.cs
        public class Demo
        {
            public string Message { get; set; }
        }

      Publisher

     1 using System;
     2 using EasyNetQ;
     3 using Wen.EasyNetQDemo.Model;
     4 
     5 namespace Wen.EasyNetQDemo.Publisher
     6 {
     7     internal class Program
     8     {
     9         private static void Main(string[] args)
    10         {
    11             using (var bus = RabbitHutch.CreateBus("host=localhost"))
    12             {
    13                 string input;
    14                 Console.WriteLine("请输入信息。 如果是“esc” 将退出当前窗口。");
    15 
    16                 while ((input = Console.ReadLine()) != "esc")
    17                 {
    18                     bus.Publish(new Demo
    19                     {
    20                         Message = input
    21                     });
    22                 }
    23 
    24             }
    25         }
    26     }
    27 }

      【备注】RabbitHutch.CreateBus() 方法可以创建一个简单的发布/订阅和包含请求/响应 API 的消息总线。

      Subscriber

     1 using System;
     2 using EasyNetQ;
     3 using Wen.EasyNetQDemo.Model;
     4 
     5 namespace Wen.EasyNetQDemo.Subscriber
     6 {
     7     internal class Program
     8     {
     9         private static void Main(string[] args)
    10         {
    11             using (var bus = RabbitHutch.CreateBus("host=localhost"))
    12             {
    13                 bus.Subscribe<Demo>("test", HandleDemo);
    14 
    15                 Console.WriteLine("监听信息中...输入“return”将退出当前窗口!");
    16                 Console.ReadLine();
    17             }
    18         }
    19 
    20         private static void HandleDemo(Demo demo)
    21         {
    22             Console.ForegroundColor = ConsoleColor.Green;
    23             Console.WriteLine($"Got message: {demo.Message}");
    24             Console.ResetColor();
    25         }
    26     }
    27 }

     图:效果图

    「世事洞明皆学问 人情练达即文章」

    【博主】反骨仔
  • 相关阅读:
    JQuery validate.js 在ajax提交form时如何触发
    Ajax回调函数返回的中文字符串乱码问题
    对HTML5校验 自定义验证信息
    Mybatis + Mysql 插入数据时中文乱码问题
    javac 导入第三方jar包
    如何在VISIO 2010/2013 中关闭Shape protection(图形保护)
    关于 XMLHttpRequest对象的onreadyStateChange方法
    Centos7通过Docker安装Sentry(哨兵)
    Entity framework 预热
    Xamarin.Android Binding篇
  • 原文地址:https://www.cnblogs.com/liqingwen/p/6412089.html
Copyright © 2011-2022 走看看