zoukankan      html  css  js  c++  java
  • NET 5 CAP结合Redis+数据库实现最终一致性

    CAP 同时支持使用 RabbitMQ,Kafka,Azure Service Bus 等进行底层之间的消息发送。

    CAP 目前支持使用 Sql Server,MySql,PostgreSql,MongoDB 数据库的项目。

    一般是cap+Kafka,这里使用cap+redis

    安装DotNetCore.CAP nuGet包

     配置 appsettings.json 数据 。

    {
      "Logging": {
        "LogLevel": {
          "Default": "Warning"
        }
      },
      "ConnectionStrings": {
        "Mysql_Conn": "Server=localhost;port=3306;Database=db1;UserId=root;Password=123456",
      },
      "RabbitMQ": {
        "HostName": "192.168.122.199",
        "UserName": "admin",
        "Password": "123456",
        "VirtualHost": "vhost_lihy",
        "Port": 5672,
        "ExchangeName": "cap.text.lihy.exchange"
    
      }, 
      "AllowedHosts": "*"
    }

    根据底层消息队列,你可以选择引入不同的包:

    PM> Install-Package DotNetCore.CAP.Kafka
    PM> Install-Package DotNetCore.CAP.RabbitMQ
    PM> Install-Package DotNetCore.CAP.AzureServiceBus

    CAP 目前支持使用 SQL Server, PostgreSql, MySql, MongoDB 的项目,你可以选择引入不同的包:

    PM> Install-Package DotNetCore.CAP.SqlServer
    PM> Install-Package DotNetCore.CAP.MySql
    PM> Install-Package DotNetCore.CAP.PostgreSql
    PM> Install-Package DotNetCore.CAP.MongoDB     //需要 MongoDB 4.0+ 集群

    在 Startup.cs 文件中,添加如下配置:

    public void ConfigureServices(IServiceCollection services)
            {
                services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_2);
    
    
                services.AddDbContext<CapDbContext>(options =>
                options.UseMySql(Configuration.GetConnectionString("Mysql_Conn")));
    
    
                services.AddCap(x =>
                {
                    //如果你使用的 EF 进行数据操作,你需要添加如下配置:
                    x.UseEntityFramework<CapDbContext>();  //可选项,你不需要再次配置 x.UseSqlServer 了
    
                    //如果你使用的ADO.NET,根据数据库选择进行配置:
                    //x.UseSqlServer("数据库连接字符串");
                    //x.UseMySql("server=localhost;port=3306;userid=root;password=123456;database=db1;SslMode=none");
                    //x.UsePostgreSql("数据库连接字符串");
    
                    //如果你使用的 MongoDB,你可以添加如下配置:
                    //x.UseMongoDB("ConnectionStrings");  //注意,仅支持MongoDB 4.0+集群
    
                    //CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,根据使用选择配置:
                    x.UseRabbitMQ(o =>
                    {
                        o.HostName = Configuration.GetSection("RabbitMQ")["HostName"];
                        o.UserName = Configuration.GetSection("RabbitMQ")["UserName"];
                        o.Password = Configuration.GetSection("RabbitMQ")["Password"];
                        o.VirtualHost = Configuration.GetSection("RabbitMQ")["VirtualHost"];
                        o.Port = Convert.ToInt32(Configuration.GetSection("RabbitMQ")["Port"]);
                        //指定Topic exchange名称,不指定的话会用默认的
                        o.ExchangeName = Configuration.GetSection("RabbitMQ")["ExchangeName"];
    
                    });
    
                    //设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会定期清理。
                    x.SucceedMessageExpiredAfter = 24 * 3600;
    
                    //设置失败重试次数
                    x.FailedRetryCount = 5;
    
                    //x.UseKafka("ConnectionStrings");
                    //x.UseAzureServiceBus("ConnectionStrings");
    
                    x.UseDashboard();
                });
    
            }

    发布事件/消息

    新建 PublishController 控制器

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using DotNetCore.CAP;
    using Microsoft.AspNetCore.Mvc;
    using MySql.Data.MySqlClient;
    
    // For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860
    
    namespace NETCORE.CAP.Controllers
    {
        [Route("api/[controller]")]
        public class PublishController : Controller
        {
    
            private readonly ICapPublisher _capBus;
    
            public PublishController(ICapPublisher capPublisher)
            {
                _capBus = capPublisher;
            }
    
    
            /// <summary>
            /// 不使用事务
            /// </summary>
            /// <returns></returns>
            [Route("~/without/transaction")]
            public IActionResult WithoutTransaction()
            {
                _capBus.Publish("xxx.services.show.time", DateTime.Now);
    
                return Ok();
            }
    
    
            ////Ado.Net 中使用事务,自动提交
            //[Route("~/adonet/transaction")]
            //public IActionResult AdonetWithTransaction()
            //{
            //    using (var connection = new MySqlConnection(ConnectionString))
            //    {
            //        using (var transaction = connection.BeginTransaction(_capBus, autoCommit: true))
            //        {
            //            //业务代码
    
            //            _capBus.Publish("xxx.services.show.time", DateTime.Now);
            //        }
            //    }
            //    return Ok();
            //}
    
            ////EntityFramework 中使用事务,自动提交
            //[Route("~/ef/transaction")]
            //public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext)
            //{
            //    using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true))
            //    {
            //        //业务代码
    
            //        _capBus.Publish("xxx.services.show.time", DateTime.Now);
            //    }
            //    return Ok();
            //}
             
        }
    }

    订阅事件/消息

     新建 ReceivedController 控制器

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using DotNetCore.CAP;
    using Microsoft.AspNetCore.Mvc;
    
    // For more information on enabling Web API for empty projects, visit https://go.microsoft.com/fwlink/?LinkID=397860
    
    namespace NETCORE.CAP.Controllers
    {
        [Route("api/[controller]")]
        public class ReceivedController : Controller
        {
            [NonAction]
            [CapSubscribe("xxx.services.show.time")]
            public void CheckReceivedMessage(DateTime time)
            {
                Console.WriteLine(time);
                //return Task.CompletedTask;
            }
        }
    }

    运行后,

    数据库即生成两张表

    调用接口 https://localhost:5001/without/transaction

    在数据表中可查看相关状态。

    Cap 仪表盘

    默认地址 https://localhost:5001/cap

    Dashboard介绍

    capOptions.UseDashboard(dashoptions =>
                    {
                       dashoptions.AppPath = "applicationpath";
                       dashoptions.PathMatch = "/cap";
                       dashoptions.Authorization = new[] { new CapDashboardFilter() };
    
                    });

    这里只说这几个参数

    AppPath:应用程序路径  访问dashboard的时候会有一个返回应用的操作,这个即是应用的地址

    PathMatch:不设置的情况下都是cap,可以指定自己的dashboard路由地址
    Authorization:授权处理

    授权处理具体实现

    只需要实现接口IDashboardAuthorizationFilter即可

    public class CapDashboardFilter : IDashboardAuthorizationFilter
        {
    
            public bool Authorize(DashboardContext context)
            {
                return true;
            }
        }

    通过DashboardContext上下文处理请求,允许返回true,不允许返回false

     附代码:https://gitee.com/wuxincaicai/NETCORE.git

    分布式事务cap介绍 https://www.cnblogs.com/cmliu/p/11767343.html

    CAP和SignalR接力数据推送  https://blog.csdn.net/lordwish/article/details/105801893

  • 相关阅读:
    批量数据导入数据库方法
    Remoting简单实践
    js面向对象继承
    Linq实现t-Sql的各种连接
    数据库树状结构的关系表的删除方案
    记录一次SQL查询语句
    mvc请求过程总结
    T-sql表表达式
    各个浏览器的兼容问题及样式兼容处理(不定期补充)
    vue.js 键盘enter事件的使用
  • 原文地址:https://www.cnblogs.com/netlock/p/13552982.html
Copyright © 2011-2022 走看看