TPL Dataflow是微软面向高并发应用而推出的新程序库。借助于异步消息传递与管道,它可以提供比线程池更好的控制。本身TPL库在DataflowBlock类中提供了不少扩展函数,用起来还是非常方便的,但感觉还是不够全(当然,MS没必要设计大而全的接口),前段时间写个小程序的时候用到了它,当时顺便写了几个扩展函数,这里记录一下,如果后续有扩展再继续补充。
static class DataFlowExtension
{
///
<summary>
///同步发送所有数据至TargetBlock
///
</summary>
public
static
void PostAll<T>(this
ITargetBlock<T> target, IEnumerable<T> source)
{
var isSuccess = source.All(i => target.Post(i));
if (!isSuccess)
{
throw
new
InvalidOperationException();
}
target.Complete();
}
///
<summary>
///异步发送所有数据至TargetBlock
///
</summary>
public
static
async
Task PostAllAsync<T>(this
ITargetBlock<T> target, IEnumerable<T> source)
{
foreach (var item in source)
{
await target.SendAsync(item);
}
target.Complete();
}
///
<summary>
///同步从数据源中获取所有数据
///
</summary>
public
static
IReadOnlyList<T> ReceiveAll<T>(this
IReceivableSourceBlock<T> source)
{
IList<T> output;
if (!source.TryReceiveAll(out output))
{
throw
new
InvalidOperationException();
}
return output as
IReadOnlyList<T>;
}
///
<summary>
///异步从数据源中获取所有数据
///
</summary>
public
static
async
Task<IReadOnlyList<T>> ReceiveAllAsync<T>(this
ISourceBlock<T> source)
{
List<T> output = new
List<T>();
while (await source.OutputAvailableAsync())
{
output.Add(source.Receive());
}
return output;
}
}
这几个扩展函数本身是对DataflowBlock类中的函数二次封装,没有太多的功能,基本上每个函数都只有几行,主要为了使用更加方便罢了,由于实现简单,扩充它也是非常方便的。