在分布式系统中,存在分布式事务的问题需要我们去解决,这是一个不可避免的问题,.Net中有一个开源的分布式框架可以供我们使用。我们先来看看它是如何使用的。
1-新建一个.Net Core3.1的WebApi项目
2-引入以下包
3-新建一个WebApi作为消息队列的发布方
PublishController
[Route("api/[controller]")] [ApiController] public class PublishController : ControllerBase { private readonly string ConnectionString = MyConst.mysqlCommectionString;//数据库连接地址 private readonly ICapPublisher _capBus; public PublishController(ICapPublisher capPublisher) { _capBus = capPublisher; } [Route("adonet/transaction/publish")] public IActionResult AdonetWithTransaction() { using (var connection = new MySqlConnection(ConnectionString)) { using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true)) { //your business logic code _capBus.Publish("product.services.time", DateTime.Now);//product.services.time为消息队列的名称 } } return Ok(); } }
4-新建消息的订阅方
具体代码如下
public interface ISubscriberService { void ReceivedMessage(DateTime datetime); } public class SubscriberService : ISubscriberService, ICapSubscribe { [CapSubscribe("product.services.time")] public void ReceivedMessage(DateTime datetime) { Console.WriteLine($"============{datetime}==========="); //throw new Exception($"异常重试!"); } }
5-Startup.cs里面初始化DotNetCore.CAP框架内容
public class Startup { public Startup(IConfiguration configuration) { Configuration = configuration; } public IConfiguration Configuration { get; } public void ConfigureServices(IServiceCollection services) { services.AddControllers(); services.AddTransient<ISubscriberService, SubscriberService>(); services.AddCap(x => { x.UseMySql(MyConst.mysqlCommectionString); x.UseRabbitMQ(options => { options.UserName = "admin";//用户名 options.Password = "admin";//密码 options.HostName = "172.17.17.17";//rabbitmq ip //options.Port = 15672; }); x.UseDashboard();//DotNetCore.CAP框架管控台,地址是:http://localhost:端口/cap }); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseRouting(); app.UseAuthorization(); app.UseEndpoints(endpoints => { endpoints.MapControllers(); }); } }
6-在数据库里面新建一个空的数据库,名字叫capdb,注意这个名字是可以自定义的,但注意在数据库连接地址的地址就要与你自定义的名字保持一致
7-我们请求地址
http://localhost:49237/api/Publish/adonet/transaction
8-这时订阅方即SubscriberService会收到消息
9-这时数据库里面会自动生成2张表,且里面会各自有一条数据,这表的作用可以认为就是分布式事务的事务日志表
更多分享请关注我的公众号