MassTransit 是一个自由、开源、轻量级的消息总线基于.Net框架, 用于创建分布式应用程序。方便搭建基于消息的松耦合异步通信的应用程序和服务。MassTransit 在现有消息传输上提供了一组广泛的功能, 从而使开发人员能够友好地使用基于消息的会话模式异步连接服务。基于消息的通信是实现面向服务的体系结构的可靠和可扩展的方式。
官网地址:http://masstransit-project.com/
发布订阅模式
这种场景十分常见,发送一个消息(或事件)到消息队列中,有一个或是多个订阅方对预期的消息接收处理。
基于需要搭建了两个WebApi程序,用于模拟发送方和订阅方,其中的RabbitMQ已预先搭建好了,只在程序中引用包配置下即可。
<PackageReference Include="MassTransit" Version="7.2.0" />
<PackageReference Include="MassTransit.AspNetCore" Version="7.2.0" />
<PackageReference Include="MassTransit.RabbitMQ" Version="7.2.0" />
发布端配置
在Startup中增加MassTransit需要的服务及初始化配置。
- 对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。
- 对发送方需要发送的消息初始化一个请求客户端,配置请求信息及推送到MQ的地址。
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
{
h.Username(Configuration["RabbitmqConfig:Username"]);
h.Password(Configuration["RabbitmqConfig:Password"]);
});
});
x.AddRequestClient<ValueEntered>(new Uri(GetServiceAddress("events-valueentered")));
});
services.AddMassTransitHostedService();
为了快速了解,使用Controller在Action中发起对MQ的消息推送
[ApiController]
[Route("[controller]")]
public class ValueController : ControllerBase
{
readonly IPublishEndpoint _publishEndpoint;
public ValueController(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
[HttpPost]
public async Task<ActionResult> Post(string value)
{
await _publishEndpoint.Publish<ValueEntered>(new
{
Value = value
});
return Ok();
}
}
订阅端配置
订阅端也创建一个WebApi应用,在Startup中增加MassTransit的服务,使用到的Nuget包和发布端一样。
- 对RabbitMQ的连接地址端口、虚拟主机、访问账号密码等系列配置。
- 为订阅端增加一个订阅处理的Handler,即如下的ValueEnteredEventConsumer
- 增加一个接受点,指定队列名称,即发送端发送的队列名称,设置该队列消费处理的Consumer,即ValueEnteredEventConsumer
services.AddMassTransit(x =>
{
x.AddConsumer<ValueEnteredEventConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host(Configuration["RabbitmqConfig:HostIP"], ushort.Parse(Configuration["RabbitmqConfig:HostPort"]), Configuration["RabbitmqConfig:VirtualHost"], h =>
{
h.Username(Configuration["RabbitmqConfig:Username"]);
h.Password(Configuration["RabbitmqConfig:Password"]);
});
cfg.ReceiveEndpoint("events-valueentered", e =>
{
e.ConfigureConsumer<ValueEnteredEventConsumer>(context);
});
});
});
services.AddMassTransitHostedService();
如此一来,通过Postman发送一个请求,经发布端发布一个消息到RabbitMQ,订阅端侦听消息,处理消息,一切都很熟悉。
请求响应模式
在发布订阅的基础上,改变以往的习惯,当发布一个消息后,等待订阅方的处理,并将消息推送回RabbitMQ,发送方接受到处理后的消息继续执行。
请求端
在Startup中新加上一个用于发送消息(CheckOrderStatus)的请求客户端及指定消息队列名称(为每一个消息创建一个单独的队列)。
x.AddRequestClient<CheckOrderStatus>(new Uri(GetServiceAddress("events-checkorderstatus")));
增加一个Controller及Action,来请求及获取处理结果(OrderStatusResult)。
[ApiController]
[Route("[controller]")]
public class OrderController : ControllerBase
{
private readonly IRequestClient<CheckOrderStatus> _client;
public OrderController(IRequestClient<CheckOrderStatus> client)
{
_client = client;
}
public async Task<OrderStatusResult> Get(string id)
{
var response = await _client.GetResponse<OrderStatusResult>(new { OrderId = id });
return response.Message;
}
}
响应端
同样在响应端Startup中对新的消息设置下消息侦听队列以及相应的Handler如下的ValueEnteredEventConsumer去消费消息并返回处理结果。
x.AddConsumer<CheckOrderStatusConsumer >();
x.UsingRabbitMq((context, cfg) =>
{
// ...
cfg.ReceiveEndpoint("events-checkorderstatus", e =>
{
e.ConfigureConsumer<CheckOrderStatusConsumer >(context);
});
});
Consumer中获取请求参数,执行请求,返回执行结果。
public class CheckOrderStatusConsumer : IConsumer<CheckOrderStatus>
{
public async Task Consume(ConsumeContext<CheckOrderStatus> context)
{
if (context.Message.OrderId == "9527")
{
throw new InvalidOperationException("Order not found");
}
Console.WriteLine($"OrderId:{context.Message.OrderId}");
await context.RespondAsync<OrderStatusResult>(new
{
OrderId = context.Message.OrderId,
Timestamp = Guid.NewGuid().ToString(),
StatusCode = "1",
StatusText = "Close"
});
}
}
这样一来,当请求端发起一个消息(事件)到RabbitMQ,响应端侦听并处理完毕返回处理结果到RabbitMQ,请求端依照响应结果继续执行后续请求。
HTTP方式差异
与以往的Http请求方式有所不同,通过httpClient.PostAsync发送请求,接收端处理并返回结果,而走requestClient发送请求到RabbitMQ,再由RabbitMQ推送到侦听节点消费并返回结果,如下第一二部分结构。
2021-06-29,望技术有成后能回来看见自己的脚步