前言
NServiceBus是.Net平台下的开源的消息服务框架,已经支持.Net Core。目前稳定版本7.1。企业开发需要购买License,开发者可在线下载开发者License。
官方网站:https://particular.net/
官方示例:https://docs.particular.net/get-started/
NServiceBus入门
如图所示,项目一共包括4个端点(Endpoint),也就是四个单独的项目,端点是NServiceBus中的核心概念,发送消息和事件发布订阅的基础都是Endpoint。这个项目中包括发送消息和事件的发布订阅。
完整的项目结构如图所示:
ClientUI
/// <summary>
/// Console host for the "ClientUI" endpoint. Starts the endpoint, lets the user
/// send <c>PlaceOrder</c> commands from the keyboard, and stops the endpoint on exit.
/// </summary>
class Program
{
    // Shared logger for this host; never reassigned, hence readonly.
    private static readonly ILog log = LogManager.GetLogger<Program>();

    /// <summary>
    /// Synchronous entry point that blocks until <see cref="MainAsync"/> completes.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        // GetAwaiter().GetResult() rethrows the original exception instead of
        // wrapping it in an AggregateException as Wait()/Result would.
        MainAsync().GetAwaiter().GetResult();
    }

    /// <summary>
    /// Interactive loop: 'P' sends a new <c>PlaceOrder</c> command to the
    /// "Sales" endpoint, 'Q' returns, and any other key prompts the user to retry.
    /// </summary>
    /// <param name="endpointInstance">The started endpoint used to send commands.</param>
    /// <returns>A task that completes when the user presses 'Q'.</returns>
    static async Task RunAsync(IEndpointInstance endpointInstance)
    {
        log.Info("Press 'P' to place an order,press 'Q' to quit");
        while (true)
        {
            var key = Console.ReadKey();
            // Move to a new line after the echoed key character.
            Console.WriteLine();
            switch (key.Key)
            {
                case ConsoleKey.P:
                    {
                        var command = new PlaceOrder
                        {
                            OrderId = Guid.NewGuid().ToString()
                        };
                        log.Info($"Sending PlaceOrder with OrderId:{command.OrderId}");
                        // Send the command to the "Sales" endpoint.
                        await endpointInstance.Send("Sales", command).ConfigureAwait(false);
                        break;
                    }
                case ConsoleKey.Q:
                    return;
                default:
                    log.Info("Please try again");
                    break;
            }
        }
    }

    /// <summary>
    /// Configures and starts the "ClientUI" endpoint, runs the interactive loop,
    /// and stops the endpoint afterwards.
    /// </summary>
    /// <returns>A task that completes once the endpoint has been stopped.</returns>
    static async Task MainAsync()
    {
        Console.Title = "Client-UI";
        // The constructor argument is the endpoint name.
        var config = new EndpointConfiguration("ClientUI");
        // LearningTransport is intended for learning only; be careful about using it in production.
        config.UseTransport<LearningTransport>();
        // Persistence used by the endpoint.
        config.UsePersistence<LearningPersistence>();
        var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);
        try
        {
            await RunAsync(endpointInstance).ConfigureAwait(false);
        }
        finally
        {
            // Stop the endpoint even when the interactive loop throws, so a
            // started endpoint is never left running on the failure path.
            await endpointInstance.Stop().ConfigureAwait(false);
        }
    }
}
Sales
/// <summary>
/// Console host for the "Sales" endpoint: starts it, waits for Enter, then stops it.
/// </summary>
class Program
{
    /// <summary>
    /// Asynchronous entry point of the Sales host.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <returns>A task that completes once the endpoint has been stopped.</returns>
    static async Task Main(string[] args)
    {
        Console.Title = "Sales";

        var endpointConfiguration = new EndpointConfiguration("Sales");
        endpointConfiguration.UseTransport<LearningTransport>();
        endpointConfiguration.UsePersistence<LearningPersistence>();

        var startTask = Endpoint.Start(endpointConfiguration);
        var endpoint = await startTask.ConfigureAwait(false);

        // Keep the process (and therefore the endpoint) alive until Enter is pressed.
        Console.WriteLine("Press Enter to quit...");
        Console.ReadLine();

        await endpoint.Stop().ConfigureAwait(false);
    }
}
/// <summary>
/// Handles <c>PlaceOrder</c> messages by logging them and publishing an
/// <c>OrderPlaced</c> event that carries the same order id.
/// </summary>
public class PlaceOrderHandler : IHandleMessages<PlaceOrder>
{
    private static ILog logger = LogManager.GetLogger<PlaceOrderHandler>();

    /// <summary>
    /// Logs the received message and publishes the follow-up event.
    /// </summary>
    /// <param name="message">The received <c>PlaceOrder</c> message.</param>
    /// <param name="context">Handler context used to publish the event.</param>
    /// <returns>The task returned by the publish call.</returns>
    public Task Handle(PlaceOrder message, IMessageHandlerContext context)
    {
        // Record the message received by this endpoint.
        logger.Info($"Received PlaceOrder ,OrderId:{message.OrderId}");

        // Publish the OrderPlaced event for the same order.
        var orderPlaced = new OrderPlaced
        {
            OrderId = message.OrderId
        };
        return context.Publish(orderPlaced);
    }
}
Billing
/// <summary>
/// Asynchronous entry point of the Billing host: starts the "Billing" endpoint,
/// waits for Enter, then stops the endpoint.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
/// <returns>A task that completes once the endpoint has been stopped.</returns>
static async Task Main(string[] args)
{
    // The window title must name this endpoint; it previously said "Sales"
    // (copy-paste slip), which made the console windows indistinguishable.
    Console.Title = "Billing";
    var config = new EndpointConfiguration("Billing");
    config.UseTransport<LearningTransport>();
    config.UsePersistence<LearningPersistence>();
    var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);
    // Keep the process alive until the user presses Enter.
    Console.WriteLine("Press Enter to quit...");
    Console.ReadLine();
    await endpointInstance.Stop().ConfigureAwait(false);
}
/// <summary>
/// Handles <c>OrderPlaced</c> events by logging them and publishing an
/// <c>OrderBilled</c> event that carries the same order id.
/// </summary>
public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    private static ILog logger = LogManager.GetLogger<OrderPlacedHandler>();

    /// <summary>
    /// Logs the received event and publishes the follow-up event.
    /// </summary>
    /// <param name="message">The received <c>OrderPlaced</c> event.</param>
    /// <param name="context">Handler context used to publish the event.</param>
    /// <returns>The task returned by the publish call.</returns>
    public Task Handle(OrderPlaced message, IMessageHandlerContext context)
    {
        // React to the subscribed OrderPlaced event.
        logger.Info($"Received OrderPlaced,OrderId {message.OrderId} - Charging credit card");

        // Publish the OrderBilled event for the same order.
        var orderBilled = new OrderBilled
        {
            OrderId = message.OrderId
        };
        return context.Publish(orderBilled);
    }
}
Shipping
/// <summary>
/// Asynchronous entry point of the Shipping host: starts the "Shipping" endpoint,
/// waits for Enter, then stops the endpoint.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
/// <returns>A task that completes once the endpoint has been stopped.</returns>
static async Task Main(string[] args)
{
    // The window title must name this endpoint; it previously said "Sales"
    // (copy-paste slip), which made the console windows indistinguishable.
    Console.Title = "Shipping";
    var config = new EndpointConfiguration("Shipping");
    config.UseTransport<LearningTransport>();
    config.UsePersistence<LearningPersistence>();
    var endpointInstance = await Endpoint.Start(config).ConfigureAwait(false);
    // Keep the process alive until the user presses Enter.
    Console.WriteLine("Press Enter to quit...");
    Console.ReadLine();
    await endpointInstance.Stop().ConfigureAwait(false);
}
/// <summary>
/// Handles <c>OrderBilled</c> events by writing a log entry; nothing is sent or published.
/// </summary>
public class OrderBilledHandler : IHandleMessages<OrderBilled>
{
    private static ILog logger = LogManager.GetLogger<OrderBilledHandler>();

    /// <summary>
    /// Logs the received <c>OrderBilled</c> event.
    /// </summary>
    /// <param name="message">The received event.</param>
    /// <param name="context">Handler context (unused).</param>
    /// <returns>An already-completed task.</returns>
    public Task Handle(OrderBilled message, IMessageHandlerContext context)
    {
        var orderId = message.OrderId;
        logger.Info($"Received OrderBilled,OrderId={orderId} Should we ship now?");

        // No asynchronous work is performed here.
        return Task.CompletedTask;
    }
}
/// <summary>
/// Handles <c>OrderPlaced</c> events by writing a log entry; nothing is sent or published.
/// </summary>
public class OrderPlacedHandler : IHandleMessages<OrderPlaced>
{
    private static ILog logger = LogManager.GetLogger<OrderPlacedHandler>();

    /// <summary>
    /// Logs the received <c>OrderPlaced</c> event.
    /// </summary>
    /// <param name="message">The received event.</param>
    /// <param name="context">Handler context (unused).</param>
    /// <returns>An already-completed task.</returns>
    public Task Handle(OrderPlaced message, IMessageHandlerContext context)
    {
        var orderId = message.OrderId;
        logger.Info($"Received OrderPlaced,OrderId={orderId} Should we ship now?");

        // No asynchronous work is performed here.
        return Task.CompletedTask;
    }
}
运行结果
总结
NServiceBus的核心是端点之间的通信:作为命令发送的消息需要实现ICommand接口,发布的事件需要实现IEvent接口,NServiceBus会扫描实现这两个接口的类。每个端点的关键配置就是EndpointConfiguration。