zoukankan      html  css  js  c++  java
  • C#8.0: 在 LINQ 中支持异步的 IAsyncEnumerable

    C# 8.0中,提供了一种新的IAsyncEnumerable<T>接口,在对集合进行迭代时,支持异步操作。比如在读取文本中的多行字符串时,如果读取每行字符串的时候使用同步方法,那么会导致线程堵塞。IAsyncEnumerable<T>可以解决这种情况,在迭代的时候支持使用异步方法。也就是说,之前我们使用foreach来对IEnumerable进行迭代,现在可以使用await foreach来对IAsyncEnumerable<T>进行迭代,每个项都是可等待的。这种新的接口称为async-streams,将会随.NET Core 3发布。我们来看一下如何在LINQ中实现异步的迭代。

    使用常规的IEnumerable<T>

    首先我们创建一个新的Console项目,基于.NET Core 3

    namespace AsyncLinqDemo
    {
       class Program
      {
           static void Main(string[] args)
          {
               Console.WriteLine("Input the file path:");
               var file = Console.ReadLine();
               var lines = ReadAllLines(file);
               foreach (var line in lines)
              {
                   Console.WriteLine(line);
              }
          }
    
           static IEnumerable<string> ReadAllLines(string file)
          {
               using (var fs = File.OpenRead(file))
              {
                   using (var sr = new StreamReader(fs))
                  {
                       while (true)
                      {
                           string line = sr.ReadLine();
                           if(line == null)
                          {
                               break;
                          }
                           yield return line;
                      }
                  }
              }
          }
      }
    }

    这是一个很简单的Console程序,实现了一个简单的返回类型为IEnumerable<string>ReadAllLines(string file)方法,从文本文件中逐行读取文本,并逐行输出。如果文本内容较少的话,没什么问题。但如果我们使用过aync/await,就会了解,在IO操作如读取或写入文件的时候,最好使用异步方法以避免线程阻塞。让我们来改进一下。

    使用异步的IAsyncEnumerable<T>

    可以优化的是下面这句:

    string line = sr.ReadLine();

    对于IO操作,最好使用异步方式。这里可使用相应的异步方法:

    string line = await sr.ReadLineAsync();

    我们说“异步是传染的”,如果这里使用异步,那么相应的该方法的返回值也要使用异步,所以需要将返回值改为static async Task<IEnumerable<string>>,但这样会得到一个错误:

    ErrorCS1624The body of 'Program.ReadAllLines(string)' cannot be an iterator block because 'Task<IEnumerable<string>>' is not an iterator interface typeAsyncLinqDemoC:SourceWorkspacesConsoleAsyncLinqDemoAsyncLinqDemoProgram.cs23Active

    因为Task<IEnumerable<string>>并不是一个可以迭代的接口类型,所以我们无法在方法内部使用yield关键字。解决问题的办法是使用新的IAsyncEnumerable接口:

    static async IAsyncEnumerable<string> ReadAllLines(string file)
    {
       using (var fs = File.OpenRead(file))
      {
           using (var sr = new StreamReader(fs))
          {
               while (true)
              {
                   string line = await sr.ReadLineAsync();
                   if(line == null)
                  {
                       break;
                  }
                   yield return line;
              }
    
          }
      }
    }

    F12查看该接口的定义:

    namespace System.Collections.Generic
    {
       public interface IAsyncEnumerable<out T>
      {
           IAsyncEnumerator<T> GetAsyncEnumerator(CancellationTokencancellationToken = default);
      }
    }

    这是一个异步的迭代器,并提供了CancellationToken。再按F12查看IAsyncEnumerator<T>的定义,可发现里面是这样的:

    namespace System.Collections.Generic
    {
       public interface IAsyncEnumerator<out T> : IAsyncDisposable
      {
           T Current { get; }
           ValueTask<bool> MoveNextAsync();
      }
    }

    这里MoveNextAsync()方法实际是返回了一个结果类型为boolTask,每次迭代都是可等待的,从而实现了迭代器的异步。

    使用await foreach消费IAsyncEnumerable<T>

    当我们做了以上改动之后,ReadAllLines()方法返回的是一个支持异步的IAsyncEnumerable,那么在使用的时候,也不能简单的使用foreach了。修改Main方法如下:

    static async Task Main(string[] args)
    {
       Console.WriteLine("Input the file path:");
       var file = Console.ReadLine();
       var lines = ReadAllLines(file);
       await foreach (var line in lines)
      {
           Console.WriteLine(line);
      }
    }

    首先在foreach之前添加await关键字,还要需要将Main方法由void改为async Task。这样整个程序都是异步执行了,不会再导致堵塞了。这个例子只是一个简单的demo,是否使用异步并不会感觉到明显的区别。如果在迭代内部需要比较重的操作,如从网络获取大量数据或读取大量磁盘文件,异步的优势还是会比较明显的。

    使用LINQ消费IAsyncEnumerable<T>

    使用LINQ来操作集合是常用的功能。如果使用IEnumberable,在Main方法中可以做如下改动:

    var lines = ReadAllLines(file);
    var res = from line in lines where line.StartsWith("ERROR: ") selectline.Substring("ERROR: ".Length);
    foreach (var line in res)
    {
       Console.WriteLine(line);
    }

    或:

    var res = lines.Where(x => x.StartsWith("ERROR: ")).Select(x => x.Substring("ERROR: ".Length));

    如果使用了新的IAsyncEnumerable,你会发现无法使用Where等操作符了:

    ErrorCS1936Could not find an implementation of the query pattern for source type 'IAsyncEnumerable<string>'. 'Where' not found.AsyncLinqDemoC:SourceWorkspacesConsoleAsyncLinqDemoAsyncLinqDemoProgram.cs16Active

    目前LINQ还没有提供对IAsyncEnumerable的原生支持,不过微软提供了一个Nuget包来实现此功能。在项目中打开Nuget Package Manger搜索安装System.Linq.Async,注意该包目前还是预览版,所以要勾选include prerelease才能看到。安装该Nuget包后,Linq查询语句中的错误就消失了。

    System.Linq.Async这个包中,对每个同步的LINQ方法都做了相应的扩展。所以基本上代码无需什么改动即可正常编译。

    对于LINQ中的条件语句,也可以使用WhereAwait()方法来支持await

    public static IAsyncEnumerable<TSource> WhereAwait<TSource>(thisIAsyncEnumerable<TSource> source, Func<TSource, int, ValueTask<bool>>predicate);

    如需要在条件语句中进行IO或网络请求等异步操作,可以这样用:

    var res = lines.WhereAwait(async x => await DoSomeHeavyOperationsAsync(x));

    DoSomeHeavyOperationsAsync方法的签名如下:

    private static ValueTask<bool> DoSomeHeavyOperationsAsync(string x)
    {
       //Do some works...
    }

    小结

    通过以上的示例,我们简要了解了如何使用IAsyncEnumerable接口以及如何在LINQ中实现异步查询。在使用该接口时,我们需要创建一个自定义方法返回IAsyncEnumerable<T>来代替IEnumberable<T>,这个方法可称为async-iterator方法,需要注意以下几点:

    • 该方法应该被声明为async

    • 返回IAsyncEnumerable<T>

    • 同时使用awaityield。如await foreachyield returnyield break等。

    例如:

    async IAsyncEnumerable<int> GetValuesFromServer()
    {
       while (true)
      {
           IEnumerable<int> batch = await GetNextBatch();
           if (batch == null) yield break;
    
           foreach (int item in batch)
          {
               yield return item;
          }
      }
    }

    此外还有一些限制:

    • 无法在tryfinally块中使用任何形式的yield语句。

    • 无法在包含任何catch语句的try语句中使用yield return语句。

    期待.NET Core 3的正式发布!

    了解新西兰IT行业真实码农生活

    请长按上方二维码关注“程序员在新西兰”

  • 相关阅读:
    Gson中@SerializedName 注解使用
    centos8 安装mongodb4.4
    ssh 连接manjaro失败
    git保存仓库的账号密码
    centos 安装etcd
    kubeadm部署k8s 拉取基础镜像
    centos 安装cloc 代码统计工具
    centos7 安装mongodb
    shell获取时间戳
    最详细的阿里云ECS服务器CentOS7上安装tomcat8服务(三)
  • 原文地址:https://www.cnblogs.com/yanxiaodi/p/Support-IAsyncEnumerable-with-LINQ.html
Copyright © 2011-2022 走看看