zoukankan      html  css  js  c++  java
  • C#利用RabbitMQ实现点对点消息传输

    RabbitMQ做为消息代理,负责接收和转发消息,可以将RabbitMQ比喻为一个邮筒、一个邮局和一个邮递员。本文主要以一个简单的小例子,简述RabbitMQ实现消息传输的相关内容,仅供学习分享使用,如有不足之处,还请指正。

    消息队列模型

    所有 MQ 产品从模型抽象上来说都是一样的过程:
    消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

    RabbitMQ设置

    RabbitMQ是通过交换机将消息转发到对应队列,所以队列需要和交换机进行绑定。本例将队列绑定到默认的amq.direct交换机,并设置Routing key,如下图所示:

    RabbitMQ动态库安装

    通过NuGet包管理器进行安装RabbitMQ.Client,如下所示:

    RabbitMQ.Client相关知识点

    • ConnectionFactory:构造一个实例,主要创建连接。
    • IConnection:表示一个基于AMQP协议的连接。
    • IModel:表示一个RabbitMQ通道,可用于声明一个队列,然后开始消费。
    • EventingBasicConsumer:基于独立事件监听的基础消费者,可以监听并接收消息。
    • 生产者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 开始生产并发布消息
    • 消费者基本步骤:1. 创建连接 2. 基于连接创建通道 3. 基于通道声明队列,4. 创建消费者,5. 绑定通道和消费者,并开始消费

    示例效果图

    本例主要有一个生产者,一个消费者,通过消息队列进行消息转发和接收。

    生产者负责消息发送,如下图所示:

    消费者负责消息接收,如下图所示:

    核心代码

    代码结构:主要包括生产者,消费者,公共基础代码,如下所示:

    RabbitMqHelper主要创建连接,如下所示:

     1     public class RabbitMqHelper
     2     {
     3         
     4         /// <summary>
     5         /// 创建连接
     6         /// </summary>
     7         /// <returns></returns>
     8         public IConnection GetConnection()
     9         {
    10             try
    11             {
    12                 var factory = new ConnectionFactory()
    13                 {
    14                     HostName = "127.0.0.1",
    15                     Port = 5672,
    16                     UserName = "guest",
    17                     Password = "guest",
    18                     VirtualHost = "/ShortMsgHost"
    19                 };
    20                 var conn = factory.CreateConnection();
    21                 return conn;
    22             }
    23             catch (Exception ex) {
    24                 throw ex;
    25             }
    26         }
    27 
    28    
    29 
    30     }

    RabbmitMqSendHelper用于发送消息,如下所示:

     1     public class RabbmitMqSendHelper : RabbitMqHelper
     2     {
     3         /// <summary>
     4         /// 发送消息
     5         /// </summary>
     6         /// <param name="msg"></param>
     7         /// <returns></returns>
     8         public bool SendMsg(string msg)
     9         {
    10             try
    11             {
    12                 using (var conn = GetConnection())
    13                 {
    14                     using (var channel = conn.CreateModel())
    15                     {
    16                         channel.QueueDeclare(queue: "ShortMsgQueue",
    17                                      durable: true,
    18                                      exclusive: false,
    19                                      autoDelete: false,
    20                                      arguments: null);
    21                         var body = Encoding.UTF8.GetBytes(msg);
    22 
    23                         channel.BasicPublish(exchange: "amq.direct",
    24                                              routingKey: "ShortMsgKey",
    25                                              basicProperties: null,
    26                                              body: body);
    27 
    28                         //Console.WriteLine(" [x] Sent {0}", message);
    29                     };
    30                 };
    31                 return true;
    32             }
    33             catch (Exception ex)
    34             {
    35                 throw ex;
    36             }
    37         }
    38     }

    RabbitMqReceiveHelper主要用于接收信息,如下所示:

     1     public class RabbitMqReceiveHelper : RabbitMqHelper
     2     {
     3         public RabbitMqReceiveEventHandler OnReceiveEvent;
     4 
     5         private IConnection conn;
     6 
     7         private IModel channel;
     8 
     9         private EventingBasicConsumer consumer;
    10 
    11         public bool StartReceiveMsg()
    12         {
    13             try
    14             {
    15                 conn = GetConnection();
    16 
    17                 channel = conn.CreateModel();
    18 
    19                 channel.QueueDeclare(queue: "ShortMsgQueue",
    20                                 durable: true,
    21                                 exclusive: false,
    22                                 autoDelete: false,
    23                                 arguments: null);
    24 
    25                 consumer = new EventingBasicConsumer(channel);
    26                 consumer.Received += (model, ea) =>
    27                 {
    28                     var body = ea.Body.ToArray();
    29                     var message = Encoding.UTF8.GetString(body);
    30                     //Console.WriteLine(" [x] Received {0}", message);
    31                     if (OnReceiveEvent != null)
    32                     {
    33                         OnReceiveEvent(message);
    34                     }
    35                 };
    36                 channel.BasicConsume(queue: "ShortMsgQueue",
    37                                         autoAck: true,
    38                                         consumer: consumer);
    39                 return true;
    40             }
    41             catch (Exception ex)
    42             {
    43                 throw ex;
    44             }
    45         }
    46     }

    关于RabbitMQ的基础知识介绍,可参考前几篇博文。

    备注

    浣溪沙·堤上游人逐画船

    欧阳修 〔宋代〕

    堤上游人逐画船,拍堤春水四垂天。绿杨楼外出秋千。
    白发戴花君莫笑,六幺催拍盏频传。人生何处似尊前! 


    作者:Alan.hsiang
    出处:http://www.cnblogs.com/hsiang/
    本文版权归作者和博客园共有,写文不易,支持原创,欢迎转载【点赞】,转载请保留此段声明,且在文章页面明显位置给出原文连接,谢谢。

  • 相关阅读:
    IIS 设置IP地址和域名限制
    docker数据持久化
    用户远程登录空闲时间自动断开
    基于python的性能测试工具–locust
    Python代码发送post请求接口测试--转载
    loadrunner监控mysql服务性能
    jmeter for each,循环控制器 遍历结果
    jmeter 如何将上一个请求的结果作为下一个请求的参数——使用正则提取器
    在pycharm中链接MySql数据库并进行操作
    Python—pycharm连接数据库---自创
  • 原文地址:https://www.cnblogs.com/hsiang/p/14770632.html
Copyright © 2011-2022 走看看