zoukankan      html  css  js  c++  java
  • 从无到有实现.net协程(一)

    协程的概念,就我而言,来源自当初学习Go,它可以用一句话来总结,“单线程无阻塞异步处理”,也就是说,首先,它的范围是针对单个线程来说的,一个线程可以运行多个代码片段,当运行期间遇到IO等待(包括网络IO、磁盘IO等,常见的数据库操作、web服务调用都属于IO等待)时,自动切换到其他代码片段上执行,当执行完毕或再次遇到IO等待时,再回到之前已经完成IO等待的代码片段继续执行,这样,就保证了线程无阻塞,并且多个片段都能被逐步处理(前提是代码中存在IO等待,否则还是顺序处理),以上就是协程的定义,从描述可以看到,这种处理方式特别适合用于高IO、低CPU计算的程序,现实中,大部分的Web应用都是属于这种模式,这就是为什么现在Nodejs, Go这类语言越来越火的原因。

    下面的图描述了协程的运行

    协程的实际处理顺序(实际取决与协程调度程序)

    在.net中,web处理(asp.net)的模式为多线程异步,其实也很好,也可以做到无阻塞,充分利用CPU资源,这里不谈这种模式,只谈协程模式。

    那么.net如何实现协程呢?这里首先必须介绍一个关键字,yield,.net协程就是利用它来实现的

    Yield是用来处理迭代器(IEnumerator)的,利用迭代器的特性,间接提供了一种可以中断恢复的处理方式,以代码说话

     1     class Program
     2     {
     3         static void Main(string[] args)
     4         {
     5             var result=TestYield();
     6 
     7             result.MoveNext();
     8             Console.WriteLine(result.Current);
     9 
    10             result.MoveNext();
    11             Console.WriteLine(result.Current);
    12 
    13             result.MoveNext();
    14             Console.WriteLine(result.Current);
    15 
    16             result.MoveNext();
    17 
    18             Console.Read();
    19 
    20         }
    21 
    22 
    23         static IEnumerator<string> TestYield()
    24         {
    25             yield return "A";
    26             Console.WriteLine("执行完成A");
    27             yield return "B";
    28             Console.WriteLine("执行完成B");
    29             yield return "C";
    30             Console.WriteLine("执行完成C");
    31         }
    32     }

    执行结果为

    从上面的代码可以看出,每当使用MoveNext方法,代码都从yield return语句之后继续执行,当遇到yield return或方法完成,再次返回调用方,这种迭代器模式(迭代器模式是本身就是23种设计模式之一),就提供了分段执行代码的能力,我们通过这种模式,就能用来完成协程,这里有一个特别需要注意的地方,就是,当我们调用TestYield方法时,你会发现它其实并没有被执行,直到我们第一次调用MoveNext方法,该方法才真正开始被执行,还记得Linq吗,Linq说了,只有调用了诸如ToList()、Count()之类的方法,才真正开始计算,和这个是不是很像?其实,在Linq内部,就是使用了迭代器。

    好了,现在,已经具备了实现协程的基本元素了,接下来就可以开始构建了,由两个部分组成,协程容器和协程单元,协程容器用来负责调度和执行,协程单元用来处理实际的业务,协程容器提供Register方法来注册每个协程单元,继续用代码来说话

     1     /// <summary>
     2     /// 协程容器接口
     3     /// </summary>
     4     public interface ICoroutineContainer
     5     {
     6         /// <summary>
     7         /// 注册协程单元
     8         /// </summary>
     9         /// <param name="unit">协程单元</param>
    10         void Register(ICoroutineUnit unit);
    11         /// <summary>
    12         /// 执行
    13         /// </summary>
    14         void Run();
    15 }
    16     /// <summary>
    17     /// 协程单元接口
    18     /// </summary>
    19     public interface ICoroutineUnit
    20     {
    21         /// <summary>
    22         /// 处理业务
    23         /// </summary>
    24         /// <returns></returns>
    25         IEnumerator<Task> Do();
    26 }

    这两个接口为整个实现的核心接口,下面的协程容器的基本实现

    /// <summary>
        /// 协程容器的基本实现
        /// </summary>
        public class CoroutineContainerBase : ICoroutineContainer
        {
            /// <summary>
            /// 存储协程单元的列表
            /// </summary>
            private List<UnitItem> _units = new List<UnitItem>();
            /// <summary>
            /// 存储新注册的协程单元,与协程单元列表分开,实现注册与执行互不影响
            /// </summary>
            private List<UnitItem> _addUnits = new List<UnitItem>();
            /// <summary>
            /// 错误处理
            /// </summary>
            private Action<ICoroutineUnit, Exception> _errorHandle;
    
            /// <summary>
            /// 构造函数
            /// </summary>
            /// <param name="errorHandle">错误处理</param>
            public CoroutineContainerBase(Action<ICoroutineUnit, Exception> errorHandle)
            {
                _errorHandle = errorHandle;
            }
    
            public void Register(ICoroutineUnit unit)
            {
                lock (_addUnits)
                {
                    _addUnits.Add(new UnitItem() { Unit = unit, UnitResult = null });
                }
    
            }
    
            public void Run()
            {
      	    //开启一个单独任务执行
                Task.Run(() =>
                {
                    //循环处理协程单元
                    while (true)
                    {
                        //将新注册的协程单元加入到列表中
                        lock (_addUnits)
                        {
                            foreach (var addItem in _addUnits)
                            {
                                _units.Add(addItem);
                            }
                            _addUnits.Clear();
                        }
    
                        //依次处理协程单元
                        foreach (var item in _units)
                        {
                            if (item.UnitResult == null)
                            {
                                var result = item.Unit.Do();
    
                                //运行到下一个断点
                                try
                                {
                                    result.MoveNext();
                                }
                                catch (Exception ex)
                                {
                                    _errorHandle(item.Unit, ex);
    
                                    _units.Remove(item);
    
                                    break;
                                }
    
                                item.UnitResult = result;
                            }
                            else
                            {
                                //检查等待是否已经完成,如果已经完成则继续运行
                                if (item.UnitResult.Current.IsCanceled || item.UnitResult.Current.IsCompleted || item.UnitResult.Current.IsFaulted)
                                {
                                    var nextResult = true;
                                    try
                                    {
                                        nextResult = item.UnitResult.MoveNext();
                                    }
                                    catch (Exception ex)
                                    {
                                        _errorHandle(item.Unit, ex);
                                        _units.Remove(item);
    
                                        break;
                                    }
                                    if (!nextResult)
                                    {
    
                                        _units.Remove(item);
    
                                        break;
                                    }
                                }
                            }
                        }
                        
                    }
                });
    
            }
    
    
            /// <summary>
            /// 协程单元存储格式
            /// </summary>
            private class UnitItem
            {
                /// <summary>
                /// 协程单元
                /// </summary>
                public ICoroutineUnit Unit { get; set; }
                /// <summary>
                /// 协程单元使用的迭代器
                /// </summary>
                public IEnumerator<Task> UnitResult { get; set; }
            }
        }
    

    实现两个协程单元

    /// <summary>
        /// 协程单元1
        /// 执行一个网络IO,访问163站点
        /// </summary>
        public class Action1 : ICoroutineUnit
        {
            public IEnumerator<Task> Do()
            {
                Console.WriteLine("开始执行Action1");
                HttpClient client = new HttpClient();
    
                yield return innerDo();
    
                Console.WriteLine("结束执行Action1");
            }
    
            private Task innerDo()
            {
                HttpClient client = new HttpClient();
                return client.GetAsync("http://www.163.com");
            }
        }
    
        /// <summary>
        /// 协程单元2
        /// 执行一个网络IO,访问163站点
        /// </summary>
        public class Action2 : ICoroutineUnit
        {
            public IEnumerator<Task> Do()
            {
                Console.WriteLine("开始执行Action2");
                yield return innerDo();
                Console.WriteLine("结束执行Action2");
            }
    
            private Task innerDo()
            {
                HttpClient client = new HttpClient();
                return client.GetAsync("http://www.163.com");
            }
    }
    

    主程序调用执行

            static void Main(string[] args)
            {
                //错误处理仅仅是将错误显示在控制台里
                Action<ICoroutineUnit,Exception> errorHandle = (unit, ex) =>
                  {
                      Console.WriteLine(ex.ToString());
                  };
                //初始化协程容器
                ICoroutineContainer coroutineContainerBase = new CoroutineContainerBase(errorHandle);
                //注册Action1
                coroutineContainerBase.Register(new Action1());
                //注册Action2
                coroutineContainerBase.Register(new Action2());
                //运行容器
                coroutineContainerBase.Run();
    
                Console.Read();
    
            }

    执行结果

    注意,每次执行的顺序可能不一样,取决于网络速度,但可以明显看到代码的分段执行。

    至此,一个最基本的协程框架已经完成

  • 相关阅读:
    C语言寒假大作战01
    C语言I作业12—学期总结
    C语言I博客作业11
    C语言I博客作业10
    C语言I博客作业09
    C语言I作业07:第十二周作业
    团队作业6--复审与事后分析
    Alpha阶段项目复审
    事后诸葛亮
    团队作业5——测试与发布(Alpha版本)
  • 原文地址:https://www.cnblogs.com/rhwleo/p/6853667.html
Copyright © 2011-2022 走看看