zoukankan      html  css  js  c++  java
  • 我的第一个微服务系列(八):引入事件总线,分布式事务解决方案CAP

    一、CAP简介

      CAP 是一个在分布式系统(SOA、MicroService)中实现事件总线及最终一致性(分布式事务)的开源 C# 库,它具有轻量级、高性能、易使用等特点。

      在构建SOA或MicroService系统的过程中,我们通常需要使用事件来集成各个服务。在此过程中,仅仅简单地使用消息队列并不能保证可靠性。CAP采用与当前数据库集成的本地消息表方案,来解决分布式系统相互调用过程中各个环节可能发生的异常,确保事件消息在任何情况下都不会丢失。

      其架构如下图

       CAP 支持 Kafka、RabbitMQ、AzureServiceBus 等消息队列,CAP 提供了 Sql Server, MySql, PostgreSQL,MongoDB 的扩展作为数据库存储。

      关于CAP的具体信息请参考:https://github.com/dotnetcore/CAP 。

    二、项目集成CAP

      添加Nuget包

    PM> Install-Package DotNetCore.CAP
    PM> Install-Package DotNetCore.CAP.RabbitMQ
    PM> Install-Package DotNetCore.CAP.MySql

      配置:User.Api 在用户信息变更后发布集成事件,Contact.Api 订阅该事件,因此 User.Api 和 Contact.Api 都需要配置 CAP。

      先来看User.Api如何配置

    // CAP registration for User.Api (the publishing side).
    // NOTE(review): the RabbitMQ host and credentials are literals here — consider moving them to configuration.
    services.AddCap(cap =>
    {
        // Use Entity Framework with UserContext.
        cap.UseEntityFramework<UserContext>();

        // Use RabbitMQ as the message transport.
        cap.UseRabbitMQ(mq =>
        {
            mq.HostName = "127.0.0.1";
            mq.Port = 5672;
            mq.UserName = "admin";
            mq.Password = "jesen";
        });

        // Enable the CAP dashboard UI.
        cap.UseDashboard();

        // Register this node for service discovery.
        cap.UseDiscovery(node =>
        {
            node.DiscoveryServerHostName = "localhost";
            node.DiscoveryServerPort = 8500;
            node.CurrentNodeHostName = "localhost";
            node.CurrentNodePort = 63679;
            node.NodeId = "1";
            node.NodeName = "CAP No.1 Node";
        });
    });

      发布

    #region Member
    // Entity Framework context used by Patch to load and save the user and its properties.
    private UserContext _userContext;
    // Logger injected for this controller (not used by the members shown here).
    private ILogger<UserController> _logger;
    // CAP publisher used to publish the "finbook.userapi.userprofilechanged" message.
    private ICapPublisher _capPublisher;
    
    #endregion
    
    #region Ctor
    
    /// <summary>
    /// Creates the controller and stores its injected dependencies.
    /// </summary>
    /// <param name="userContext">Entity Framework context for users.</param>
    /// <param name="logger">Logger for this controller.</param>
    /// <param name="capPublisher">CAP publisher used to publish integration events.</param>
    public UserController(
        UserContext userContext,
        ILogger<UserController> logger,
        ICapPublisher capPublisher)
    {
        _capPublisher = capPublisher;
        _logger = logger;
        _userContext = userContext;
    }
    
    #endregion
    
    
    /// <summary>
    /// Applies a JSON Patch document to the current user, reconciles the user's property
    /// list against the stored one, and saves the result. The profile-changed message is
    /// published inside the same database transaction as the save.
    /// </summary>
    /// <param name="patch">JSON Patch operations to apply to the current user.</param>
    /// <returns>
    /// The patched user as JSON; 400 when no patch document was supplied;
    /// 404 when the current user does not exist.
    /// </returns>
    [HttpPatch]
    public async Task<IActionResult> Patch([FromBody]JsonPatchDocument<Models.User> patch)
    {
        // A missing or unreadable body binds to null; reject it instead of failing with a NullReferenceException.
        if (patch == null)
        {
            return BadRequest();
        }

        var user = await _userContext.Users.SingleOrDefaultAsync(u => u.Id == UserIdentity.UserId);
        if (user == null)
        {
            // SingleOrDefaultAsync returns null when no row matches the current user id.
            return NotFound();
        }

        patch.ApplyTo(user);

        // Detach the patched-in properties so they can be diffed against the stored set
        // and added/removed explicitly below.
        foreach (var property in user.Properties)
        {
            _userContext.Entry(property).State = EntityState.Detached;
        }

        var originProperties = await _userContext.UserProperties
            .AsNoTracking()
            .Where(u => u.UserId == UserIdentity.UserId)
            .ToListAsync();

        // Materialize both differences up front so neither query is evaluated lazily
        // while the context is being mutated by the loops below.
        var removeProperties = originProperties.Except(user.Properties).ToList();
        var newProperties = user.Properties.Except(originProperties).ToList();

        foreach (var property in removeProperties)
        {
            _userContext.Remove(property);
        }

        foreach (var property in newProperties)
        {
            _userContext.Add(property);
        }

        using (var trans = _userContext.Database.BeginTransaction())
        {
            // Publish the user-profile-changed message.
            // This must run before Users.Update below: the helper inspects per-property
            // IsModified flags, and (per EF Core's DbSet.Update) Update marks the entity as Modified.
            RaiseUserprofileChangedEvent(user);

            _userContext.Users.Update(user);
            await _userContext.SaveChangesAsync();

            trans.Commit();
        }

        return Json(user);
    }
    /// <summary>
    /// Publishes a "finbook.userapi.userprofilechanged" message carrying the user's id, name,
    /// company, avatar and title when any of Name, Title, Company or Avatar is flagged as
    /// modified on the tracked entry; otherwise does nothing.
    /// </summary>
    /// <param name="user">The user entity whose change-tracking entry is inspected.</param>
    private void RaiseUserprofileChangedEvent(Models.User user)
    {
        var entry = _userContext.Entry(user);

        var profileChanged =
            entry.Property(nameof(user.Name)).IsModified
            || entry.Property(nameof(user.Title)).IsModified
            || entry.Property(nameof(user.Company)).IsModified
            || entry.Property(nameof(user.Avatar)).IsModified;

        if (!profileChanged)
        {
            return;
        }

        var payload = new Dtos.UserIdentity
        {
            UserId = user.Id,
            Name = user.Name,
            Company = user.Company,
            Avatar = user.Avatar,
            Title = user.Title
        };

        _capPublisher.Publish("finbook.userapi.userprofilechanged", payload);
    }

      配置Contact.Api

    // CAP registration for Contact.Api (the subscribing side).
    services.AddCap(cap =>
    {
        // Use MySQL with the "MysqlConnection" connection string.
        cap.UseMySql(Configuration.GetConnectionString("MysqlConnection"));

        // Use RabbitMQ as the message transport; settings come from the "RabbitMq" configuration section.
        cap.UseRabbitMQ(mq =>
        {
            mq.HostName = Configuration["RabbitMq:HostName"];
            mq.Port = Convert.ToInt32(Configuration["RabbitMq:Port"]);
            mq.UserName = Configuration["RabbitMq:UserName"];
            mq.Password = Configuration["RabbitMq:Password"];
        });

        // Enable the CAP dashboard UI.
        cap.UseDashboard();

        // Register this node for service discovery.
        cap.UseDiscovery(node =>
        {
            node.DiscoveryServerHostName = "localhost";
            node.DiscoveryServerPort = 8500;
            node.CurrentNodeHostName = "localhost";
            node.CurrentNodePort = 61628;
            node.NodeId = "2";
            node.NodeName = "CAP No.2 Node";
        });
    });
      "ConnectionStrings": {
        "MysqlConnection": "server=127.0.0.1;port=3306;database=contacts;userid=jesen;password=123456"
      },
      "RabbitMq": {
        "HostName": "127.0.0.1",
        "Port": 5672,
        "UserName": "admin",
        "Password": "jesen"
      },

      定义事件

    namespace Contact.Api.IntegrationEvents.Events
    {
        /// <summary>
        /// Message payload received by the "finbook.userapi.userprofilechanged" subscriber.
        /// </summary>
        public class UserProfileChangedEvent
        {
            /// <summary>Identifier of the user whose profile changed.</summary>
            public int UserId { get; set; }

            /// <summary>The user's name.</summary>
            public string Name { get; set; }

            /// <summary>The user's title.</summary>
            public string Title { get; set; }

            /// <summary>The user's company.</summary>
            public string Company { get; set; }

            /// <summary>The user's avatar.</summary>
            public string Avatar { get; set; }
        }
    }

      完成事件处理

    using Contact.Api.Data;
    using Contact.Api.IntegrationEvents.Events;
    using DotNetCore.CAP;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace Contact.Api.IntegrationEvents.EventHanding
    {
        /// <summary>
        /// CAP subscriber for the "finbook.userapi.userprofilechanged" message. It forwards the
        /// received profile fields to <see cref="IContactRepository.UpdateContactInfoAsync"/>.
        /// </summary>
        public class UserProfileChangedEventHandler : ICapSubscribe
        {
            // Assigned once in the constructor, hence readonly.
            private readonly IContactRepository _contactRepository;

            /// <summary>
            /// Creates the handler with the repository it updates.
            /// </summary>
            /// <param name="contactRepository">Repository that receives the updated contact info.</param>
            public UserProfileChangedEventHandler(IContactRepository contactRepository)
            {
                _contactRepository = contactRepository;
            }

            /// <summary>
            /// Maps the received event onto a <c>Dtos.UserIdentity</c> and passes it to the repository.
            /// </summary>
            /// <param name="event">The received profile-changed payload.</param>
            [CapSubscribe("finbook.userapi.userprofilechanged")]
            public async Task UpdateContactInfo(UserProfileChangedEvent @event)
            {
                var identity = new Dtos.UserIdentity
                {
                    UserId = @event.UserId,
                    Name = @event.Name,
                    Company = @event.Company,
                    Title = @event.Title,
                    Avatar = @event.Avatar
                };

                // The subscriber has no caller-supplied token, so pass the non-cancellable
                // CancellationToken.None (equivalent to the default-constructed token).
                await _contactRepository.UpdateContactInfoAsync(identity, CancellationToken.None);
            }
        }
    }

      注入

    // Register the handler in the DI container with a scoped lifetime so it can be
    // constructed with its IContactRepository dependency.
    services.AddScoped<UserProfileChangedEventHandler>();

      运行项目后可以看到数据库多了两个表,界面上也可以看到发布和订阅数

     

      

      

  • 相关阅读:
    5. Longest Palindromic Substring
    24. Swap Nodes in Pairs
    23. Merge k Sorted Lists
    22. Generate Parentheses
    21. Merge Two Sorted Lists
    20. Valid Parentheses
    19. Remove Nth Node From End of List
    18. 4Sum
    17. Letter Combinations of a Phone Number
    14. Longest Common Prefix
  • 原文地址:https://www.cnblogs.com/jesen1315/p/11548345.html
Copyright © 2011-2022 走看看