zoukankan      html  css  js  c++  java
  • asp.net core microservices 架构之Task 事务一致性 事件源 详解

    一 aspnetcore之task的任务状态-CancellationToken                   

         我有一篇文章讲解了asp.net的线程方面的知识。我们知道.net的针对于多线程的一个亮点就是Task,net clr维护了一个线程池,自动的分派给task执行,执行完成,迅速返回线程池,并且维护异常和状态,针对于基础的thread和其他两种异步编程,Task非常的灵巧,但是针对和应用生命周期关联的异步任务,还是使用Workbackgroup比较合适,或者甚至是基础的thread,因为Task比较高级的线程类,操作也比较简化,人为控制比较弱。那这一节为什么要说线程尼?大家有没有遇到过,部署或者人为的去重启,往往会造成一些不必要的业务中断,web api有这样的情况,后台程序也有这样的情况。异常和系统硬件的故障已经让我们防不胜防了,那么就尽量的人为的情况少那么一点点,系统的健壮性也就高那么一点点。

       目前有两个技巧可以处理这一类事情,第一是让主机graceful方式关闭,并且超时时间设置长一点,这样就有足够的时间,让运行的请求执行完毕,看代码:

        

    public static async Task Main(string[] args)
    {
        var host = new HostBuilder()
            .Build();
    
        await host.RunAsync();
    }

    这是官方上的一段话:IHostedService 是执行代码的入口点。 每个 IHostedService 实现都按照 ConfigureServices 中服务注册的顺序执行。 主机启动时,每个 IHostedService 上都会调用 StartAsync,主机正常关闭时,以反向注册顺序调用 StopAsync

    //关闭超时值
    
    ShutdownTimeout 设置 StopAsync 的超时值。 默认值为 5 秒。
    Program.Main 中的以下选项配置将默认值为 5 秒的关闭超时值增加至 20 秒:
    C#
    
    //复制
    var host = new HostBuilder()
        .ConfigureServices((hostContext, services) =>
        {
            services.Configure<HostOptions>(option =>
            {
                option.ShutdownTimeout = System.TimeSpan.FromSeconds(20);
            });
        })
        .Build();

    而我们看看源码中StopAsync方法:

    /// <summary>
            /// Attempts to gracefully stop the host with the given timeout.
            /// </summary>
            /// <param name="host"></param>
            /// <param name="timeout">The timeout for stopping gracefully. Once expired the
            /// server may terminate any remaining active connections.</param>
            /// <returns></returns>
            public static Task StopAsync(this IHost host, TimeSpan timeout)
            {
                return host.StopAsync(new CancellationTokenSource(timeout).Token);
            }

    系统接受到Ctrl+c和sign,就会调用这个方法,以比较礼貌的方式关闭。

    那么看源码,这两个都是具有阻塞功能的异步方法,对应的非异步方法,都是同步调用的这两个方法:

    /// <summary>
            /// Runs an application and returns a Task that only completes when the token is triggered or shutdown is triggered.
            /// </summary>
            /// <param name="host">The <see cref="IHost"/> to run.</param>
            /// <param name="token">The token to trigger shutdown.</param>
            public static async Task RunAsync(this IHost host, CancellationToken token = default)
            {
                using (host)
                {
                    await host.StartAsync(token);
    
                    await host.WaitForShutdownAsync(token);
                }
            }
    
            /// <summary>
            /// Returns a Task that completes when shutdown is triggered via the given token.
            /// </summary>
            /// <param name="host">The running <see cref="IHost"/>.</param>
            /// <param name="token">The token to trigger shutdown.</param>
            public static async Task WaitForShutdownAsync(this IHost host, CancellationToken token = default)
            {
                var applicationLifetime = host.Services.GetService<IApplicationLifetime>();
            //当前token执行取消的时候,激发这个委托。
                token.Register(state =>
                {
                    ((IApplicationLifetime)state).StopApplication(); //当进程取消的时候,通知注册IApplicationLifetime的进程也取消。
                },
                applicationLifetime);
    
                var waitForStop = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);
                //应用程序生命周期中的停止应用token激发的时候,执行这个委托,去释放阻塞,执行host的停止方法。
                applicationLifetime.ApplicationStopping.Register(obj =>
                {
                    var tcs = (TaskCompletionSource<object>)obj;
                    tcs.TrySetResult(null);
                }, waitForStop);
    
                await waitForStop.Task;//阻塞,直到 tcs.TrySetResult(null);执行完毕。
    // Host will use its default ShutdownTimeout if none is specified.

    await host.StopAsync(); //调用关闭 }

    具体原理就是Host使用这个applicationLifetime,去控制。而applicationLifetime主要的是用到了CancellationTokenSource这个类,使用这个类是可以控制task的取消执行的。

    所以,两个解决方案,如果是webapi,就将将超时时间设置大一点。

    第二,如果在非webapi中,使用了超长执行的Task,就使用CancellationTokenSource吧,将它的Token传进去,在外边判断是否执行中,如果不在执行中,就执行Cancel方法,当然在task内部,也可以

    判断token,是否自己主动取消掉。

    这是官方的一个例子,了解CancellationTokenSource这个类,那么就会明白如何去处理Task

    using System;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;
    
    public class Example
    {
       public static void Main()
       {
          // Define the cancellation token.
          CancellationTokenSource source = new CancellationTokenSource();
          CancellationToken token = source.Token;
    
          Random rnd = new Random();
          Object lockObj = new Object();
          
          List<Task<int[]>> tasks = new List<Task<int[]>>();
          TaskFactory factory = new TaskFactory(token);
          for (int taskCtr = 0; taskCtr <= 10; taskCtr++) {
             int iteration = taskCtr + 1;
             tasks.Add(factory.StartNew( () => {
                                           int value;
                                           int[] values = new int[10];
                                           for (int ctr = 1; ctr <= 10; ctr++) {
                                              lock (lockObj) {
                                                 value = rnd.Next(0,101);
                                              }
                                              if (value == 0) { 
                                                 source.Cancel();
                                                 Console.WriteLine("Cancelling at task {0}", iteration);
                                                 break;
                                              }   
                                              values[ctr-1] = value; 
                                           }
                                           return values;
                                        }, token));   
             
          }
          try {
             Task<double> fTask = factory.ContinueWhenAll(tasks.ToArray(), 
                                                          (results) => {
                                                             Console.WriteLine("Calculating overall mean...");
                                                             long sum = 0;
                                                             int n = 0; 
                                                             foreach (var t in results) {
                                                                foreach (var r in t.Result) {
                                                                      sum += r;
                                                                      n++;
                                                                   }
                                                             }
                                                             return sum/(double) n;
                                                          } , token);
             Console.WriteLine("The mean is {0}.", fTask.Result);
          }   
          catch (AggregateException ae) {
             foreach (Exception e in ae.InnerExceptions) {
                if (e is TaskCanceledException)
                   Console.WriteLine("Unable to compute mean: {0}", 
                                     ((TaskCanceledException) e).Message);
                else
                   Console.WriteLine("Exception: " + e.GetType().Name);
             }
          }
          finally {
             source.Dispose();
          }
       }
    }
    // Repeated execution of the example produces output like the following:
    //       Cancelling at task 5
    //       Unable to compute mean: A task was canceled.
    //       
    //       Cancelling at task 10
    //       Unable to compute mean: A task was canceled.
    //       
    //       Calculating overall mean...
    //       The mean is 5.29545454545455.
    //       
    //       Cancelling at task 4
    //       Unable to compute mean: A task was canceled.
    //       
    //       Cancelling at task 5
    //       Unable to compute mean: A task was canceled.
    //       
    //       Cancelling at task 6
    //       Unable to compute mean: A task was canceled.
    //       
    //       Calculating overall mean...
    //       The mean is 4.97363636363636.
    //       
    //       Cancelling at task 4
    //       Unable to compute mean: A task was canceled.
    //       
    //       Cancelling at task 5
    //       Unable to compute mean: A task was canceled.
    //       
    //       Cancelling at task 4
    //       Unable to compute mean: A task was canceled.
    //       
    //       Calculating overall mean...
    //       The mean is 4.86545454545455.

    二   业务的事务一致性                                                                

           因为微服务的理念中是牺牲了系统业务的一致性,我们知道事务的一致性都是靠的数据库的本地事务,或者分布式事务来实现的,但是微服务是严禁使用分布式事务。那么如何保证整个系统的事务完整性尼?举个例子:比如订单服务中,新接受一个订单,这个订单需要同步到库房的订单子系统,那么在订单服务中的这个订单在最后更新自己订单状态的时候,是需要同时发送异步消息给库房消息服务器的,如果这时候网络断了,本地订单更新成功了,但是异步消息没有发送过去,这样就会引起业务的缺失,目前有两个方法可以实现:

          第一:为本地数据库创建事件源表,记录下消息和本地数据更新的全部状态,比如订单在更新前就可以添加事件,事件状态可以有,准备更新订单,订单已更新,发送消息队列,消息发送成功等。

    这样的好处就是最后跟踪这个事务处理的时候,每个步骤都可以找到,而且完全不用事务。最后job去跟踪失败情况,然后根据情况处理。

          第二:只是用本地事务,就是在订单更新的时候,同时给事件源表添加消息内容,然后让后台job去发送消息,这样是最简单和最稳定的方式。

          当然,最合适的还是第一种方法,虽然代码能复杂点,但是最后的效果是一样的,而且效率是比第二种方法更高效,但是考虑打事件源表并不是并发频繁操作的表,所以这个看自己的喜好了。

    针对一个系统,业务的一致性,也并不是全部,针对于一些关键业务做好一致性,但是很多其实可以设计成为在用户ui层面去补偿操作,唯一的坏处就是一部分数据需要重新填写。

    三     事件源                                                                                                           

         这个事件源并不是为了解决业务的一致性,而是为了应对大数据量的请求,比如,客户管理,一个分类下有上万条记录需要处理,那么往往我们需要对性能和实时反馈上有个折衷。

         系统设计如下:

                                       

      这样看来,会增加1个api服务和一个后台服务,但是对于系统的问题,却得到了一个缓冲,或许这个设计不是最好的,但是却可以做一个抛砖引玉的案例,现实中案例非常多变,所以设计也会有很多方案。

      因为目前我们看到的大部分app,请求的时候,某些功能确实会有少许等待事件,这个都是为了折衷,当然这一篇内容并不是讨论云或者分布式计算,但是在后台这块处理越快,反馈也越快。

      这套方案的设计理念其实就是异步处理,可以有自己的优化空间,而并不会消耗api这个轻量级服务,后台分布式计算越快,app反应也越快,到一定程度,就并不会感觉到有延迟,这就是大师比喻的耳朵与眼睛的关系。

  • 相关阅读:
    [day002]剑指 Offer 09. 用两个栈实现队列
    [day003]718. 最长重复子数组
    [linux]关于Ubuntu中Could not get lock /var/lib/dpkg/lock解决方案
    96. 不同的二叉搜索树
    91. 解码方法
    [动态规划]64. 最小路径和
    62.不同路径
    【Java】list根据某一条件进行分组
    【Java】批量生成小程序参数码并打包下载
    【Docker】使用docker制作libreoffice镜像并解决中文乱码问题
  • 原文地址:https://www.cnblogs.com/ck0074451665/p/10332044.html
Copyright © 2011-2022 走看看