初识Parallel Extensions之TPL(二)
LazyBee
前面我们谈了结构并行化的For循环(具体参见:初识Parallel Extensions之TPL),今天继续谈谈结构并行化的ForEach和Do。
ForEach
对于IEnumerable<T>的循环,并行扩展也提供了相应的并行实现,就是Parallel..ForEach.下面就让我们来看看,Parallel.ForEach的定义:
public static void ForEach<TSource>(IEnumerable<TSource> source, Action<TSource> body)
其中:source是要被处理的数据源,
body是针对每一个数据源中的元素要执行的操作。
例如:我们需要得到指定目录下所有扩展名为txt的文本文件中包含LazyBee的文件个数:
{
if (path == null) throw new ArgumentNullException("path");
if (File.Exists(path) && path.EndsWith(extName) )
{
yield return path;
}
else if (Directory.Exists(path))
{
foreach (string file in Directory.GetFiles(path,"*."+extName))
yield return file;
foreach (string directory in Directory.GetDirectories(path))
{
foreach (string file in GetFiles(directory, "*."+extName))
yield return file;
}
}
}
private static bool FileContainsString(string text, string path)
{
using (StreamReader sr = new StreamReader(File.OpenRead(path)))
{
string line;
while ((line = sr.ReadLine()) != null)
{
if (line.IndexOf(text, 0, StringComparison.CurrentCultureIgnoreCase) >= 0)
return true;
}
}
return false;
}
在有了上面的定义之后,我们就可以写我们的并行循环搜索了:
Parallel.ForEach(GetFiles(path,"txt"), file =>
{
if (FileContainsString(text, file)) Interlocked.Increment(ref count);
});
当然,Parallel.ForEach也提供了和Parallel.For相似的重载版本:
public static void ForEach<TSource>(IEnumerable<TSource> source,
Action<TSource, int, ParallelState> body)
public static void ForEach<TSource, TLocal>(
IEnumerable<TSource> source,
Func<TLocal> threadLocalSelector,
Action<TSource, int, ParallelState<TLocal>> body,
Action<TLocal> threadLocalCleanup
)
其中,source是要被处理的数据源
threadLocalSelector是产生线程本地状态的函数
body是每次循环(迭代)要进行的处理操作。这个操作都有三个输入参数,第一个
就是数据源中的元素,int类型的参数是该元素在数据源中的index.ParallelState类型的参数是用来在需要的情况下来终止循环的,ParallelState<TLocal>是用来保存线程本地状态以及需要时终止循环的。
threadLocalCleanup是用于清理线程本地状态的操作
关于这些重载版本的使用和Parallel.For的使用类似,具体请参考初识Parallel Extensions之TPL的For循环部分的示例。
Do
Parallel.Do主要是用来并行化那些不存在相互依赖顺序的任务,这种情况在递归中是比较多见的,例如遍历树型结构时向左和向右没有顺序之分就可以使用它。具体我们还是来先看看其是如何定义的:
public static void Do(params Action[] actions)
public static void Do(Action[] actions, TaskManager manager,
TaskCreationOptions options)
其中:actions是并行要执行的操作(任务)。这个Action代理类型是.Net3.5引入的,用于定义不带参数也没有返回值的操作。
manager是任务的调度管理器。你可以使用它配合TaskManagerPolicy来指定并行运
行的线程数。
options是一个枚举类型,可以指定以下四种取值:
None:指定使用缺省的行为。
SuppressExecutionContextFlow:指定并不继承当前执行的上下文
RespectParentCancelation:如果父任务取消,该任务也随之取消
SelfReplicating:在处理器可用时将进行自我复制
下面我们来看一个例子,假设我们有一个带有整形数据的二叉树,我们现在需要遍历每一个节点,并且将每个节点的数值都加2,首先我们来定义我们的二叉树:
{
public T Data;
public Tree<T> Left, Right;
}
接下来,是填充二叉树数据的函数:
public static Tree<int> GenerateRandom(int depth)
{
Tree<int> root = new Tree<int> { Data = _rnd.Next() };
if (depth > 0)
{
root.Left = GenerateRandom(depth - 1);
root.Right = GenerateRandom(depth - 1);
}
return root;
}
最后就是如何来实现我们的业务逻辑的代码了:
{
Tree<int> tree =GenerateRandom(5);
ProcessTreeParallel(tree, (tmpTree, i) => { tmpTree.Data += i; Console.WriteLine(tmpTree.Data); },2);
}
public static void ProcessTreeParallel(Tree<int> tree,Action<Tree<int>,int> action,int v1)
{
if (tree == null) return;
Parallel.Do(() => ProcessTreeParallel(tree.Left, action),
()=>ProcessTreeParallel(tree.Right, action),
()=>action(tree,v1));
}
Parallel.Do只有在语句中的所有任务都完成后才会继续往下走。
大家能看到有了Lambda表达式,我们真是省去了很多的代码。
有关TaskManager以及TaskCreationOptions如何使用将在下一次任务并行性中来描述。