zoukankan      html  css  js  c++  java
  • ASP.NET Core2利用MassTransit集成RabbitMQ

    在ASP.NET Core上利用MassTransit来集成使用RabbitMQ真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用RabbitMQ的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。

    MassTransit

    先看看MassTransit是个什么宝贝(MassTransit官网的简介):

    MassTransit是一个免费的开源轻量级消息总线,用于使用.NET框架创建分布式应用程序。MassTransit在现有的顶级消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。

    通俗描述:

    MassTransit就是一套基于消息服务的高级封装类库,下游可联接RabbitMQ、Redis、MongoDb等服务。

    github官网:https://github.com/MassTransit/MassTransit

    RabbitMQ

    RabbitMQ是成熟的MQ队列服务,是由 Erlang 语言开发的 AMQP 的开源实现。关于介绍RabbitMQ的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:

    官网:http://www.rabbitmq.com

    下载与安装:http://www.rabbitmq.com/download.html

    实现代码

    通过上面的介绍,咱们已对MassTransit与RabbitMQ有了初步了解,那么现在来看看如何在ASP.NET Core上优雅的使用RabbitMQ吧。

    1、创建一个名为“RabbitMQHelp.cs”公共类,用于封装操作RabbitMQ的公共方法,并通过Nuget来管理并引用“MassTransit”与“MassTransit.RabbitMQ”类库。

    2、“RabbitMQHelp.cs”公共类主要对外封装两个静态方法,其代码如下:

      1 using MassTransit;
      2 using MassTransit.RabbitMqTransport;
      3 using System;
      4 using System.Collections.Generic;
      5 using System.Text;
      6 using System.Threading.Tasks;
      7 
      8 namespace Lezhima.Comm
      9 {
     10     /// <summary>
     11     /// RabbitMQ公共操作类,基于MassTransit库
     12     /// </summary>
     13     public class RabbitMQHelp
     14     {
     15         #region 交换器
     16 
     17         /// <summary>
     18         /// 操作日志交换器
     19         /// 同时需在RabbitMQ的管理后台创建同名交换器
     20         /// </summary>
     21         public static readonly string actionLogExchange = "Lezhima.ActionLogExchange";
     22 
     23 
     24         #endregion
     25 
     26 
     27         #region 声明变量
     28 
     29         /// <summary>
     30         /// MQ联接地址,建议放到配置文件
     31         /// </summary>
     32         private static readonly string mqUrl = "rabbitmq://192.168.1.181/";
     33 
     34         /// <summary>
     35         /// MQ联接账号,建议放到配置文件
     36         /// </summary>
     37         private static readonly string mqUser = "admin";
     38 
     39         /// <summary>
     40         /// MQ联接密码,建议放到配置文件
     41         /// </summary>
     42         private static readonly string mqPwd = "admin";
     43 
     44         #endregion
     45 
     46         /// <summary>
     47         /// 创建连接对象
     48         /// 不对外公开
     49         /// </summary>
     50         private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null)
     51         {
     52             //通过MassTransit创建MQ联接工厂
     53             return Bus.Factory.CreateUsingRabbitMq(cfg =>
     54             {
     55                 var host = cfg.Host(new Uri(mqUrl), hst =>
     56                 {
     57                     hst.Username(mqUser);
     58                     hst.Password(mqPwd);
     59                 });
     60                 registrationAction?.Invoke(cfg, host);
     61             });
     62         }
     63 
     64 
     65         /// <summary>
     66         /// MQ生产者
     67         /// 这里使用fanout的交换类型
     68         /// </summary>
     69         /// <param name="obj"></param>
     70         public async static Task PushMessage(string exchange, object obj)
     71         {
     72             var bus = CreateBus();
     73             var sendToUri = new Uri($"{mqUrl}{exchange}");
     74             var endPoint = await bus.GetSendEndpoint(sendToUri);
     75             await endPoint.Send(obj);
     76         }
     77 
     78         /// <summary>
     79         /// MQ消费者
     80         /// 这里使用fanout的交换类型
     81         /// consumer必需是实现IConsumer接口的类实例
     82         /// </summary>
     83         /// <param name="obj"></param>
     84         public static void ReceiveMessage(string exchange, object consumer)
     85         {
     86             var bus = CreateBus((cfg, host) =>
     87             {
     88                 //从指定的消息队列获取消息 通过consumer来实现消息接收
     89                 cfg.ReceiveEndpoint(host, exchange, e =>
     90                 {
     91                     e.Instance(consumer);
     92                 });
     93             });
     94             bus.Start();
     95         }
     96     }
     97 }
     98 

    3、“RabbitMQHelp.cs”公共类已经有了MQ“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递JSON、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过IConsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:

      1 using MassTransit;
      2 using System;
      3 using System.Collections.Generic;
      4 using System.Text;
      5 using System.Threading.Tasks;
      6 
      7 namespace Lezhima.Storage.Consumer
      8 {
      9     /// <summary>
     10     /// 从MQ接收并处理数据
     11     /// 实现MassTransit的IConsumer接口
     12     /// </summary>
     13     public class LogConsumer : IConsumer<ActionLog>
     14     {
     15         /// <summary>
     16         /// 实现Consume方法
     17         /// 接收并处理数据
     18         /// </summary>
     19         /// <param name="context"></param>
     20         /// <returns></returns>
     21         public Task Consume(ConsumeContext<ActionLog> context)
     22         {
     23             return Task.Run(async () =>
     24             {
     25                 //获取接收到的对象
     26                 var amsg = context.Message;
     27                 Console.WriteLine($"Recevied By Consumer:{amsg}");
     28                 Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}");
     29             });
     30         }
     31     }
     32 }
     33 

    调用代码

    1、生产者调用代码如下:

      1 /// <summary>
      2 /// 测试MQ生产者
      3 /// </summary>
      4 /// <returns></returns>
      5 [HttpGet]
      6 public async Task<MobiResult> AddMessageTest()
      7 {
      8     //声明一个实体对象
      9     var model = new ActionLog();
     10     model.ActionLogId = Guid.NewGuid();
     11     model.CreateTime = DateTime.Now;
     12     model.UpdateTime = DateTime.Now;
     13     //调用MQ
     14     await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model);
     15 
     16     return new MobiResult(1000, "操作成功");
     17 }

    2、消费者调用代码如下:

      1 using Lezhima.Storage.Consumer;
      2 using Microsoft.Extensions.Configuration;
      3 using System;
      4 using System.IO;
      5 
      6 namespace Lezhima.Storage
      7 {
      8     class Program
      9     {
     10         static void Main(string[] args)
     11         {
     12             var conf = new ConfigurationBuilder()
     13               .SetBasePath(Directory.GetCurrentDirectory())
     14               .AddJsonFile("appsettings.json", true, true)
     15               .Build();
     16 
     17             //调用接收者
     18             RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange,
     19              new LogConsumer()
     20             );
     21 
     22             Console.ReadLine();
     23         }
     24     }
     25 }
     26 

    总结

    1、基于MassTransit库使得我们使用RabbitMQ变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。

    2、RabbitMQ的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。

    声明

    本文为作者原创,转载请备注出处与保留原文地址,谢谢。如文章能给您带来帮助,请点下推荐或关注,感谢您的支持!

  • 相关阅读:
    (6).NET CORE微服务 Micro-Service ---- AOP框架
    (5).NET CORE微服务 Micro-Service ---- 熔断降级(Polly)
    (4).NET CORE微服务 Micro-Service ---- Consul服务发现和消费
    (3).NET CORE微服务 Micro-Service ---- Consul服务治理
    (2).NET CORE微服务 Micro-Service ---- .NetCore启动配置 和 .NetCoreWebApi
    (1).NET CORE微服务 Micro-Service ---- 什么是微服务架构,.netCore微服务选型
    ASP.NET Web Api 2 接口API文档美化之Swagger
    用Web api /Nancy 通过Owin Self Host简易实现一个 Http 服务器
    线程介绍
    C# 登陆验证码工具类VerifyCode
  • 原文地址:https://www.cnblogs.com/Miidy/p/9579764.html
Copyright © 2011-2022 走看看