zoukankan      html  css  js  c++  java
  • NET Core 2.0利用MassTransit集成RabbitMQ

    NET Core 2.0利用MassTransit集成RabbitMQ

    https://www.cnblogs.com/Andre/p/9579764.html

    在ASP.NET Core上利用MassTransit来集成使用RabbitMQ真的很简单,代码也很简洁。近期因为项目需要,我便在这基础上再次进行了封装,抽成了公共方法,使得使用RabbitMQ的调用变得更方便简洁。那么,就让咱们来瞧瞧其魅力所在吧。

    MassTransit

    先看看MassTransit是个什么宝贝(MassTransit官网的简介):

    MassTransit是一个免费的开源轻量级消息总线,用于使用.NET框架创建分布式应用程序。MassTransit在现有的顶级消息传输上提供了一系列广泛的功能,从而以开发人员友好的方式使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠且可扩展的方式。

    通俗描述:

    MassTransit就是一套基于消息服务的高级封装类库,下游可联接RabbitMQ、Redis、MongoDb等服务。

    github官网:https://github.com/MassTransit/MassTransit

    RabbitMQ

    RabbitMQ是成熟的MQ队列服务,是由 Erlang 语言开发的 AMQP 的开源实现。关于介绍RabbitMQ的中文资料也很多,有需要可以自行查找。我这里贴出其官网与下载安装的链接,如下:

    官网:http://www.rabbitmq.com

    下载与安装:http://www.rabbitmq.com/download.html

    实现代码

    通过上面的介绍,咱们已对MassTransit与RabbitMQ有了初步了解,那么现在来看看如何在ASP.NET Core上优雅的使用RabbitMQ吧。

    1、创建一个名为“RabbitMQHelp.cs”公共类,用于封装操作RabbitMQ的公共方法,并通过Nuget来管理并引用“MassTransit”与“MassTransit.RabbitMQ”类库。

    2、“RabbitMQHelp.cs”公共类主要对外封装两个静态方法,其代码如下:

    复制代码
    1 using MassTransit;
    2 using MassTransit.RabbitMqTransport;
    3 using System;
    4 using System.Collections.Generic;
    5 using System.Text;
    6 using System.Threading.Tasks;
    7
    8 namespace Lezhima.Comm
    9 {
    10 ///


    11 /// RabbitMQ公共操作类,基于MassTransit库
    12 ///

    13 public class RabbitMQHelp
    14 {
    15 #region 交换器
    16
    17 ///
    18 /// 操作日志交换器
    19 /// 同时需在RabbitMQ的管理后台创建同名交换器
    20 ///

    21 public static readonly string actionLogExchange = "Lezhima.ActionLogExchange";
    22
    23
    24 #endregion
    25
    26
    27 #region 声明变量
    28
    29 ///
    30 /// MQ联接地址,建议放到配置文件
    31 ///

    32 private static readonly string mqUrl = "rabbitmq://192.168.1.181/";
    33
    34 ///
    35 /// MQ联接账号,建议放到配置文件
    36 ///

    37 private static readonly string mqUser = "admin";
    38
    39 ///
    40 /// MQ联接密码,建议放到配置文件
    41 ///

    42 private static readonly string mqPwd = "admin";
    43
    44 #endregion
    45
    46 ///
    47 /// 创建连接对象
    48 /// 不对外公开
    49 ///

    50 private static IBusControl CreateBus(Action<IRabbitMqBusFactoryConfigurator, IRabbitMqHost> registrationAction = null)
    51 {
    52 //通过MassTransit创建MQ联接工厂
    53 return Bus.Factory.CreateUsingRabbitMq(cfg =>
    54 {
    55 var host = cfg.Host(new Uri(mqUrl), hst =>
    56 {
    57 hst.Username(mqUser);
    58 hst.Password(mqPwd);
    59 });
    60 registrationAction?.Invoke(cfg, host);
    61 });
    62 }
    63
    64
    65 ///
    66 /// MQ生产者
    67 /// 这里使用fanout的交换类型
    68 ///

    69 ///
    70 public async static Task PushMessage(string exchange, object obj)
    71 {
    72 var bus = CreateBus();
    73 var sendToUri = new Uri($"{mqUrl}{exchange}");
    74 var endPoint = await bus.GetSendEndpoint(sendToUri);
    75 await endPoint.Send(obj);
    76 }
    77
    78 ///
    79 /// MQ消费者
    80 /// 这里使用fanout的交换类型
    81 /// consumer必需是实现IConsumer接口的类实例
    82 ///

    83 ///
    84 public static void ReceiveMessage(string exchange, object consumer)
    85 {
    86 var bus = CreateBus((cfg, host) =>
    87 {
    88 //从指定的消息队列获取消息 通过consumer来实现消息接收
    89 cfg.ReceiveEndpoint(host, exchange, e =>
    90 {
    91 e.Instance(consumer);
    92 });
    93 });
    94 bus.Start();
    95 }
    96 }
    97 }
    98
    复制代码

    3、“RabbitMQHelp.cs”公共类已经有了MQ“生产者”与“消费者”两个对外的静态公共方法,其中“生产者”方法可以在业务代码中直接调用,可传递JSON、对象等类型的参数向指定的交换器发送数据。而“消费者”方法是从指定交换器中进行接收绑定,但接收到的数据处理功能则交给了“consumer”类(因为在实际项目中,不同的数据有不同的业务处理逻辑,所以这里我们直接就通过IConsumer接口交给具体的实现类去做了)。那么,下面我们再来看看消费者里传递进来的“consumer”类的代码吧:

    复制代码
    1 using MassTransit;
    2 using System;
    3 using System.Collections.Generic;
    4 using System.Text;
    5 using System.Threading.Tasks;
    6
    7 namespace Lezhima.Storage.Consumer
    8 {
    9 ///


    10 /// 从MQ接收并处理数据
    11 /// 实现MassTransit的IConsumer接口
    12 ///

    13 public class LogConsumer : IConsumer
    14 {
    15 ///
    16 /// 实现Consume方法
    17 /// 接收并处理数据
    18 ///

    19 ///
    20 ///
    21 public Task Consume(ConsumeContext context)
    22 {
    23 return Task.Run(async () =>
    24 {
    25 //获取接收到的对象
    26 var amsg = context.Message;
    27 Console.WriteLine($"Recevied By Consumer:{amsg}");
    28 Console.WriteLine($"Recevied By Consumer:{amsg.ActionLogId}");
    29 });
    30 }
    31 }
    32 }
    33
    复制代码

    调用代码

    1、生产者调用代码如下:

    复制代码
    1 ///


    2 /// 测试MQ生产者
    3 ///

    4 ///
    5 [HttpGet]
    6 public async Task AddMessageTest()
    7 {
    8 //声明一个实体对象
    9 var model = new ActionLog();
    10 model.ActionLogId = Guid.NewGuid();
    11 model.CreateTime = DateTime.Now;
    12 model.UpdateTime = DateTime.Now;
    13 //调用MQ
    14 await RabbitMQHelp.PushMessage(RabbitMQHelp.actionLogExchange, model);
    15
    16 return new MobiResult(1000, "操作成功");
    17 }
    复制代码

    2、消费者调用代码如下:

    复制代码
    1 using Lezhima.Storage.Consumer;
    2 using Microsoft.Extensions.Configuration;
    3 using System;
    4 using System.IO;
    5
    6 namespace Lezhima.Storage
    7 {
    8 class Program
    9 {
    10 static void Main(string[] args)
    11 {
    12 var conf = new ConfigurationBuilder()
    13 .SetBasePath(Directory.GetCurrentDirectory())
    14 .AddJsonFile("appsettings.json", true, true)
    15 .Build();
    16
    17 //调用接收者
    18 RabbitMQHelp.ReceiveMessage(RabbitMQHelp.actionLogExchange,
    19 new LogConsumer()
    20 );
    21
    22 Console.ReadLine();
    23 }
    24 }
    25 }
    26
    复制代码

    总结

    1、基于MassTransit库使得我们使用RabbitMQ变得更简洁、方便。而基于再次封装后,生产者与消费者将不需要关注具体的业务,也跟业务代码解耦了,更能适应项目的需要。

    2、RabbitMQ的交换器需在其管理后台自行创建,而这里使用的fanout类型是因为其发送速度最快,且能满足我的项目需要,各位可视自身情况选用不同的类型。fanout类型不会存储消息,必需要消费者绑定交换器后才会发送给消费者。

  • 相关阅读:
    eclipse导入基于maven的java项目后没有Java标志和没有maven Dependencies有解决办法
    centOS6.5 安装后无法启动无线上网
    centOS6.5 关闭关盖待机
    centOS6.5 usr/src/kernels下为空
    python求两个列表的并集.交集.差集
    二叉树遍历
    python实现单链表的反转
    关系型数据库和非关系型数据库的区别和特点
    python 实现快速排序(面试经常问到)
    golang 切片和map查询比较
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/9598979.html
Copyright © 2011-2022 走看看