C# 4.5中引入了await关键字,可以让大大简化异步编程操作。在Reactive Extensions v2.0中实现了让IObservable支持await操作,通过它我们可以简化observable的异步编程:
static async Task Samples()
{
var xs = Observable.Range(0, 10, ThreadPoolScheduler.Instance);
Console.WriteLine("Last = " + await xs);
Console.WriteLine("First = " + await xs.FirstAsync());
Console.WriteLine("Third = " + await xs.ElementAt(3));
Console.WriteLine("All = " + string.Join(", ", await xs.ToList()));
try
{
Console.WriteLine("Error = " + await xs.Select(x => 1 / (5 - x)));
}
catch (DivideByZeroException)
{
Console.WriteLine("Yups, we failed!");
}
}
{
var xs = Observable.Range(0, 10, ThreadPoolScheduler.Instance);
Console.WriteLine("Last = " + await xs);
Console.WriteLine("First = " + await xs.FirstAsync());
Console.WriteLine("Third = " + await xs.ElementAt(3));
Console.WriteLine("All = " + string.Join(", ", await xs.ToList()));
try
{
Console.WriteLine("Error = " + await xs.Select(x => 1 / (5 - x)));
}
catch (DivideByZeroException)
{
Console.WriteLine("Yups, we failed!");
}
}
但是,RX v2.0的await扩展目前只支持.net 4.5,如果我们要在.net 4.0或以下的版本中则无法使用。因此,我手动写了个扩展函数,让IObservable支持await操作。这样一来我们也可以在.net 4.0的observable的异步编程中使用await了。
static class ObservableExtend
{
public static ObservableAwaiter<T> GetAwaiter<T>(this IObservable<T> stream)
{
return new ObservableAwaiter<T>(stream);
}
}
class ObservableAwaiter<T> : System.Runtime.CompilerServices.INotifyCompletion
{
IObservable<T> stream;
T result;
Exception err;
public ObservableAwaiter(IObservable<T> stream)
{
this.stream = stream;
}
public bool IsCompleted { get { return false; } }
public void OnCompleted(Action continuation)
{
stream.Subscribe(item => this.result = item,
err => { this.err = err; continuation(); },
() => continuation());
}
public T GetResult()
{
if (err != null)
throw err;
return result;
}
}
{
public static ObservableAwaiter<T> GetAwaiter<T>(this IObservable<T> stream)
{
return new ObservableAwaiter<T>(stream);
}
}
class ObservableAwaiter<T> : System.Runtime.CompilerServices.INotifyCompletion
{
IObservable<T> stream;
T result;
Exception err;
public ObservableAwaiter(IObservable<T> stream)
{
this.stream = stream;
}
public bool IsCompleted { get { return false; } }
public void OnCompleted(Action continuation)
{
stream.Subscribe(item => this.result = item,
err => { this.err = err; continuation(); },
() => continuation());
}
public T GetResult()
{
if (err != null)
throw err;
return result;
}
}
由于目前.net 4.5还没有正式版,这里我使用的环境是VisualStudio 2012 RC,不保证正式版本中GetAwaiter约束是否和现在一致,不过估计也差不多。
用了一段时间后,感觉IObservable和await关键字配合起来非常好用,前面我也写过两个简单的扩展函数,感兴趣的朋友可以看一下。