今天抽空学习了一下Reactive Extensions库,感觉还是比较容易上手的,顺手练习了一下,写了个ReadAsync的扩展。
static void Main(string[] args)
{
var bufferSize = 1000;
var buffer = new byte[bufferSize];
var stream = File.OpenRead(@"r:\1.txt");
stream.ReadAsync(buffer).Subscribe(
onNext: i => { Console.Write(Encoding.Default.GetString(buffer, 0, i)); },
onCompleted: () => Console.WriteLine("completed")
);
Console.ReadLine();
}
public static IObservable<int> ReadAsync(this Stream stream, byte[] buffer)
{
var bufferLen = buffer.Length;
var readAsync = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);
{
var bufferSize = 1000;
var buffer = new byte[bufferSize];
var stream = File.OpenRead(@"r:\1.txt");
stream.ReadAsync(buffer).Subscribe(
onNext: i => { Console.Write(Encoding.Default.GetString(buffer, 0, i)); },
onCompleted: () => Console.WriteLine("completed")
);
Console.ReadLine();
}
public static IObservable<int> ReadAsync(this Stream stream, byte[] buffer)
{
var bufferLen = buffer.Length;
var readAsync = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead);
return Observable.Defer(() => readAsync(buffer, 0, bufferLen)).Repeat().TakeWhile(i => i > 0)
// return Observable.While(() => stream.Position != stream.Length, Observable.Defer(() => readAsync(buffer, 0, bufferLen)));
}
public static IObservable<string> ReadLinesAsync(this Stream stream, byte[] buffer)
{
return Observable.CreateWithDisposable<string>(observer =>
{
var blocks = stream.ReadAsync(buffer).Select(i => Encoding.Default.GetString(buffer, 0, i));
var lastLine = string.Empty;
return blocks.Subscribe(data =>
{
if (data.Contains("\r\n"))
{
var lines = data.Split(new string[] { "\r\n" }, StringSplitOptions.None);
observer.OnNext(lastLine + lines[0]);
foreach (var line in lines.Skip(1).Take(lines.Length - 2))
{
observer.OnNext(line);
}
lastLine = lines[lines.Length - 1];
}
else
{
lastLine += data;
}
},
observer.OnError,
() =>
{
observer.OnNext(lastLine);
observer.OnCompleted();
});
});
}
当然,到C# 5.0后,在编译器的原生语法糖支持下,实现这个扩展更加简单和直观,不过Reactive Extensions是异步的基础,并且在一些高级应用方便语法糖仍然不能取代它。先熟悉下这个框架还是有必要的。
由于目前还只熟悉了皮毛,有许多东西还不会,等后续详细了解后再专门写篇文章介绍它。对这个库感兴趣的朋友可以访问一下下面几个链接:
Reactive Extensions for .NET (Rx)