zoukankan      html  css  js  c++  java
  • Parallel.Foreach的并发问题解决方法-比如爬虫WebClient

    场景五:线程局部变量

    Parallel.ForEach 提供了一个线程局部变量的重载,定义如下:

    public static ParallelLoopResult ForEach<TSource, TLocal>(
        IEnumerable<TSource> source,
        Func<TLocal> localInit,
        Func<TSource, ParallelLoopState, TLocal,TLocal> body,
        Action<TLocal> localFinally)

    使用的示例:

    public static List<R> Filtering<T,R>(IEnumerable<T> source)
    {
    	var results = new List<R>();
    	using (SemaphoreSlim sem = new SemaphoreSlim(1))
    	{
    		Parallel.ForEach(source,
    			() => new List<R>(),
    			(element, loopstate, localStorage) =>
    			{
    				bool filter = filterFunction(element);
    				if (filter)
    					localStorage.Add(element);
    				return localStorage;
    			},
    			(finalStorage) =>
    			{
    				lock(myLock)
    				{
    					results.AddRange(finalStorage)
    				};
    			});
    	}
    	return results;
    }

    线程局部变量有什么优势呢?请看下面的例子(一个网页抓取程序):

    public static void UnsafeDownloadUrls ()
    {
    	WebClient webclient = new WebClient();
    	Parallel.ForEach(urls,
    		(url,loopstate,index) =>
    		{
    			webclient.DownloadFile(url, filenames[index] + ".dat");
    			Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
    		});
    }

    通常第一版代码是这么写的,但是运行时会报错“System.NotSupportedException -> WebClient does not support concurrent I/O operations.”。这是因为多个线程无法同时访问同一个 WebClient 对象。所以我们会把 WebClient 对象定义到线程中来:

    public static void BAD_DownloadUrls ()
    {
    	Parallel.ForEach(urls,
    		(url,loopstate,index) =>
    		{
    			WebClient webclient = new WebClient();
    			webclient.DownloadFile(url, filenames[index] + ".dat");
    			Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
    		});
    }

    修改之后依然有问题,因为你的机器不是服务器,大量实例化的 WebClient 迅速达到你机器允许的虚拟连接上限数。线程局部变量可以解决这个问题:

    public static void downloadUrlsSafe()
    {
    	Parallel.ForEach(urls,
    		() => new WebClient(),
    		(url, loopstate, index, webclient) =>
    		{
    			webclient.DownloadFile(url, filenames[index]+".dat");
    			Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
    			return webclient;
    		},
    			(webclient) => { });
    }

    这样的写法保证了我们能获得足够的 WebClient 实例,同时这些 WebClient 实例彼此隔离仅仅属于各自关联的线程。

    虽然 PLINQ 提供了 ThreadLocal<T> 对象来实现类似的功能:

    public static void downloadUrl()
    {
    	var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
    	var res =
    		urls
    		.AsParallel()
    		.ForAll(
    			url =>
    			{
    				webclient.Value.DownloadFile(url, host[url] +".dat"));
    				Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
    			});
    }

    但是请注意:ThreadLocal<T> 相对而言开销更大!

    --

    场景五:退出操作 (使用 Parallel.ForEach)

    Parallel.ForEach 有个重载声明如下,其中包含一个 ParallelLoopState 对象:

    public static ParallelLoopResult ForEach<TSource >(
        IEnumerable<TSource> source,
        Action<TSource, ParallelLoopState> body)

    ParallelLoopState.Stop() 提供了退出循环的方法,这种方式要比其他两种方法更快。这个方法通知循环不要再启动执行新的迭代,并尽可能快的推出循环。

    ParallelLoopState.IsStopped 属性可用来判定其他迭代是否调用了 Stop 方法。

    示例:

    public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
    {
    	var matchFound = false;
    	Parallel.ForEach(TSpace,
    		(curValue, loopstate) =>
    			{
    				if (curValue.Equals(match) )
    				{
    					matchFound = true;
    					loopstate.Stop();
    				}
    			});
    	return matchFound;
    }

    ParallelLoopState.Break() 通知循环继续执行本元素前的迭代,但不执行本元素之后的迭代。最前调用 Break 的起作用,并被记录到 ParallelLoopState.LowestBreakIteration 属性中。这种处理方式通常被应用在一个有序的查找处理中,比如你有一个排序过的数组,你想在其中查找匹配元素的最小 index,那么可以使用以下的代码:

    public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
    {
    	var loopResult = Parallel.ForEach(source,
    		(curValue, loopState, curIndex) =>
    		{
    			if (curValue.Equals(match))
    			{
    				loopState.Break();
    			}
    		 });
    	var matchedIndex = loopResult.LowestBreakIteration;
    	return matchedIndex.HasValue ? matchedIndex : -1;
    }

    更多:
    http://www.tuicool.com/articles/jqaUVj
  • 相关阅读:
    JQuery源码解读 JQ框架简化( 妙味讲堂
    Mia Fringe官网会员须知
    require.js
    :before与::before的区别
    css----苹果移动端以及小程序滚动模块卡顿的处理
    Vue使用国密SM4加密
    vue + echarts + echarts-gl 实现3D 环状图
    React Hook 初学
    常用阻止默认行为的两种方式
    理解事件触发,事件捕获,事件冒泡
  • 原文地址:https://www.cnblogs.com/x-poior/p/6263143.html
Copyright © 2011-2022 走看看