异步流?
Async Stream 简单说来是一种非同步的迭代器模式,说更白一点就是可以await 的foreach。在过去的C# 中如果要回传一个可迭代的IEnumerable<T> ,我们可能会这样写:
public class EnumerableProcess { async static public Task<IEnumerable<string>> ReadLineAsync(string path) { List<string> list = new List<string>(); using (StreamReader reader = File.OpenText(path)) { while (await reader.ReadLineAsync() is string result) { list.Add(result); await Task.Delay(100); } } return list; } }
这是一个非同步一行一行读取文字档的例子,这个例子里的回传型别是一个Task<IEnumerable<string>> ,外部程式码将会这样呼叫这个方法:
var r = await EnumerableProcess.ReadLineAsync(path); foreach (var item in r) { Console.WriteLine(item); }
这造成一个长时等待的问题,因为呼叫端必须等待ReadLineAsync 这个Task 整个完成后才能回传;所以C# 8.0 引入了Async Stream 使得非同步的迭代得以实现, 这件事情不仅仅牵涉到编译器,也需要一些新的型别,主要是以下三个:
(1) IAsyncDisposable -- IAsyncEnumerator<out T> 将会拓展这个介面
public interface IAsyncDisposable { ValueTask DisposeAsync(); }
(2)IAsyncEnumerator <out T>
public interface IAsyncEnumerator<out T> : IAsyncDisposable { T Current { get; } ValueTask<bool> MoveNextAsync(); }
(3)IAsyncEnumerable <out T>
public interface IAsyncEnumerable<out T> { IAsyncEnumerator<T> GetAsyncEnumerator(); }
实作Async Stream
由于此时在框架中对于整个Async Stream 的实作尚未完整,所以没办法直接使用yield return,先示范最基本的写法,建立一个类别,并且实作以上介面:
sealed class AsyncFileProcess : IAsyncEnumerable<string>, IAsyncEnumerator<string> { private readonly StreamReader _reader; private bool _disposed; public AsyncFileProcess(string path) { _reader = File.OpenText(path); _disposed = false; } public string Current { get; private set; } public IAsyncEnumerator<string> GetAsyncEnumerator() { return this; } async public ValueTask<bool> MoveNextAsync() { await Task.Delay(100); var result = await _reader.ReadLineAsync(); Current = result; return result != null; } async public ValueTask DisposeAsync() { await Task.Run(() => Dispose()); } private void Dispose() { Dispose(true); GC.SuppressFinalize(this); } private void Dispose(bool disposing) { if (!this._disposed) { if (_reader != null) { _reader.Dispose(); } _disposed = true; } } }
呼叫端就可以这样呼叫它:
var process = new AsyncFileProcess("SourceFile.txt"); try { await foreach (var s in process) { Console.WriteLine(s); } Console.ReadLine(); } finally { await process.DisposeAsync(); }
你可以感受到第一个例子是停顿了很久之后,蹦一下全跳出来;而第二的例子则会一行行跑出来(为了强化这个效果在两方都加了Task.Delay )。在第二个例子的呼叫端可以看到await foreach 的使用。