zoukankan      html  css  js  c++  java
  • Html异步下载分析

    最近做了不少与 Html 下载相关的项目，也总结了一些经验。现在把 Html 异步下载与分析的代码整理贴在这里，以备后用！

    首先DownloadPool部分代码:

    View Code
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.IO;
    using System.IO.Compression;
    using System.Net;
    using System.Threading;

    namespace Pmars
    {
    /// <summary>
    /// One unit of work for <c>DownloadPool&lt;T&gt;</c>: where to download from, plus the
    /// state the pool fills in (content type, error, raw bytes) and the caller's own payload.
    /// </summary>
    /// <typeparam name="T">Project-specific payload carried along with the download.</typeparam>
    public class DownloadInfo<T>
    {
        /// <summary>URL to download.</summary>
        public Uri Address;

        /// <summary>Target file path; only used by file downloads.</summary>
        public string FileName;

        /// <summary>
        /// Response <c>Content-Type</c> header, set by the pool after a successful download;
        /// useful for working out the page encoding.
        /// </summary>
        public string ContentType;

        /// <summary>Failure of the last attempt; <c>null</c> means it succeeded.</summary>
        public Exception Error;

        /// <summary>Number of failed attempts so far; compared against the caller's retry limit.</summary>
        public int ErrorCount;

        /// <summary>Project-specific payload analysed after the download.</summary>
        public T DataItem;

        /// <summary>Downloaded content; the pool stores the (decompressed) byte stream here.</summary>
        public object Tag;
    }

    /// <summary>
    /// Asynchronous download pool built on <see cref="WebClient"/>.
    /// Items added with <see cref="QueueTask"/> / <see cref="QueueTasks"/> are downloaded with at most
    /// the constructor's <c>max</c> requests in flight. Every finished item (success or failure) is
    /// passed to <see cref="FinishProceed"/>. A dedicated guard thread cancels requests older than
    /// <see cref="TimeOut"/> seconds, checking every <see cref="GuardThreadInterval"/> milliseconds.
    /// </summary>
    /// <typeparam name="T">Type of <see cref="DownloadInfo{T}.DataItem"/>.</typeparam>
    public class DownloadPool<T>
    {
        // Pending downloads, started in FIFO order.
        private readonly ConcurrentQueue<DownloadInfo<T>> _infos = new ConcurrentQueue<DownloadInfo<T>>();
        // Started WebClients paired with their start time (UTC), so the guard thread can cancel slow ones.
        private readonly List<KeyValuePair<WebClient, DateTime>> _webClients = new List<KeyValuePair<WebClient, DateTime>>();

        private readonly int _maxParallelism = 256; // max downloads in flight; the constructor may override it
        private int _runningCount = 0; // downloads currently in flight
        private int _beginCount = 0;   // downloads started so far (including finished ones)
        private int _endCount = 0;     // downloads finished so far (success or failure)
        // Written by Pause/Resume and read by thread-pool threads, hence volatile.
        private volatile bool _isPause = false;

        /// <summary>Called once per finished download (success, failure or timeout). The return value is ignored.</summary>
        public Func<DownloadInfo<T>, object> FinishProceed;
        /// <summary>Called by the guard thread after each timeout sweep. Returning false stops the guard thread.</summary>
        public Func<bool> GuardThreadProceed;
        /// <summary>Called by the guard thread after it stops, every 10 s until it returns true.</summary>
        public Func<bool> OnEndProceed;
        /// <summary>
        /// Per-download timeout in seconds. Because cancellation only happens during a sweep, a request
        /// can run up to <see cref="GuardThreadInterval"/> longer than this.
        /// </summary>
        public int TimeOut = 60;
        /// <summary>Interval between timeout sweeps, in milliseconds.</summary>
        public int GuardThreadInterval = 10000;

        private Thread guardThread; // runs CleanUpWebClients

        public int BeginCount { get { return _beginCount; } }
        public int EndCount { get { return _endCount; } }
        public int RunningCount { get { return _runningCount; } }
        public bool IsPause { get { return _isPause; } }

        private const string DefaultUserAgent = @"Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 6.1; Trident/4.0; SLCC2; .NET CLR 2.0.50727; .NET CLR 3.5.30729; .NET CLR 3.0.30729; CIBA; .NET4.0C; .NET4.0E)";
        private const string DefaultPostContentType = "application/x-www-form-urlencoded";

        /// <summary>
        /// Creates the pool and starts its guard thread.
        /// </summary>
        /// <param name="max">
        /// Maximum number of downloads in flight. Values outside 1..2047 keep the default of 256.
        /// </param>
        /// <remarks>
        /// The guard thread is a foreground thread. It keeps the process alive until
        /// <see cref="GuardThreadProceed"/> returns false and <see cref="OnEndProceed"/> returns true.
        /// It never stops if <see cref="GuardThreadProceed"/> is left null.
        /// </remarks>
        public DownloadPool(int max)
        {
            if (max > 0 && max < 2048)
                _maxParallelism = max;

            guardThread = new Thread(CleanUpWebClients);
            guardThread.Name = "DownloadPool guard";
            guardThread.Start();
        }

        /// <summary>
        /// Queues one download and starts it immediately if a slot is free.
        /// </summary>
        /// <param name="info">Download to perform.</param>
        public void QueueTask(DownloadInfo<T> info)
        {
            _infos.Enqueue(info);
            DoSingleWork();
        }

        /// <summary>
        /// Queues several downloads, then fills free slots from a thread-pool thread.
        /// </summary>
        /// <param name="infoArray">Downloads to perform.</param>
        /// <exception cref="ArgumentNullException"><paramref name="infoArray"/> is null.</exception>
        public void QueueTasks(DownloadInfo<T>[] infoArray)
        {
            if (infoArray == null)
                throw new ArgumentNullException("infoArray");

            foreach (var item in infoArray)
                _infos.Enqueue(item);

            DoWork();
        }

        /// <summary>Stops starting new downloads. Downloads already in flight continue.</summary>
        public void Pause()
        {
            if (!_isPause)
            {
                _isPause = true;
                Console.WriteLine("Pause");
            }
        }

        /// <summary>Resumes starting queued downloads after <see cref="Pause"/>.</summary>
        public void Resume()
        {
            if (_isPause)
            {
                _isPause = false;
                DoWork();
                Console.WriteLine("Resume");
            }
        }

        // Fills every free slot from the queue, on a thread-pool thread so the caller is not blocked.
        private void DoWork()
        {
            if (_isPause)
                return;

            ThreadPool.QueueUserWorkItem(state =>
            {
                // Each successful call starts one download. Stop when the queue is empty,
                // the pool is paused, or no slot is free.
                while (DoSingleWork())
                {
                }
            }, null);
        }

        // Starts at most one queued download. Returns true if one was started.
        private bool DoSingleWork()
        {
            while (!_isPause && !_infos.IsEmpty)
            {
                if (!TryReserveSlot())
                    return false;

                DownloadInfo<T> info;
                if (_infos.TryDequeue(out info))
                {
                    Interlocked.Increment(ref _beginCount);
                    DownloadData(info);
                    return true;
                }

                // Another thread took the last item between the check and the dequeue:
                // give the slot back and re-check.
                Interlocked.Decrement(ref _runningCount);
            }

            return false;
        }

        // Atomically claims one of the _maxParallelism download slots. The CAS avoids the
        // check-then-increment race that would let more than _maxParallelism downloads start.
        private bool TryReserveSlot()
        {
            while (true)
            {
                int current = _runningCount;
                if (current >= _maxParallelism)
                    return false;
                if (Interlocked.CompareExchange(ref _runningCount, current + 1, current) == current)
                    return true;
            }
        }

        // Sets the default request headers and disables Expect: 100-continue for the target host.
        private void InitWebClient(WebClient webClient, Uri uri)
        {
            webClient.Credentials = CredentialCache.DefaultCredentials;
            webClient.Headers.Add("User-Agent", DefaultUserAgent);
            webClient.Headers.Add("Content-Type", DefaultPostContentType);
            webClient.Headers.Add("Referer", uri.AbsoluteUri);
            ServicePoint servicePoint = ServicePointManager.FindServicePoint(uri);
            if (servicePoint.Expect100Continue)
                servicePoint.Expect100Continue = false;
        }

        /// <summary>
        /// Decodes an HTTP response body according to its <c>Content-Encoding</c>.
        /// </summary>
        /// <param name="dataBytes">Raw response bytes.</param>
        /// <param name="compressEncoding">
        /// The <c>Content-Encoding</c> header value. <c>gzip</c>/<c>x-gzip</c> and <c>deflate</c>
        /// (zlib-wrapped or raw) are decoded case-insensitively. Any other value, including null
        /// or empty, returns the raw bytes unchanged.
        /// </param>
        /// <returns>A stream positioned at 0 containing the decoded bytes.</returns>
        /// <exception cref="InvalidDataException">The payload is not valid for the declared coding.</exception>
        public static MemoryStream Decompress(byte[] dataBytes, string compressEncoding)
        {
            MemoryStream memory = new MemoryStream(dataBytes);
            if (string.IsNullOrEmpty(compressEncoding))
                return memory;

            // Header tokens are ASCII: compare ordinally, not with the current culture.
            string coding = compressEncoding.Trim();
            Stream decoder;
            if (coding.Equals("gzip", StringComparison.OrdinalIgnoreCase) ||
                coding.Equals("x-gzip", StringComparison.OrdinalIgnoreCase))
            {
                decoder = new GZipStream(memory, CompressionMode.Decompress);
            }
            else if (coding.Equals("deflate", StringComparison.OrdinalIgnoreCase))
            {
                // HTTP "deflate" is specified as zlib-wrapped, but many servers send raw deflate.
                // DeflateStream only reads raw deflate, so skip the 2-byte zlib header when present.
                if (HasZlibHeader(dataBytes))
                    memory = new MemoryStream(dataBytes, 2, dataBytes.Length - 2);
                decoder = new DeflateStream(memory, CompressionMode.Decompress);
            }
            else
            {
                return memory; // unknown coding: hand back the raw bytes
            }

            MemoryStream outStream = new MemoryStream();
            using (decoder)
                decoder.CopyTo(outStream);

            outStream.Seek(0, SeekOrigin.Begin);
            return outStream;
        }

        // True when data starts with a valid zlib (RFC 1950) header: CM == 8 (deflate),
        // CINFO <= 7, and the big-endian 16-bit header is a multiple of 31.
        private static bool HasZlibHeader(byte[] data)
        {
            return data.Length >= 2
                && (data[0] & 0x0F) == 8
                && (data[0] >> 4) <= 7
                && ((data[0] << 8) | data[1]) % 31 == 0;
        }

        // Creates a WebClient and registers it for the guard thread's timeout sweep.
        private WebClient CreateTrackedClient()
        {
            WebClient webClient = new WebClient();
            lock (_webClients)
                _webClients.Add(new KeyValuePair<WebClient, DateTime>(webClient, DateTime.UtcNow));
            return webClient;
        }

        // Downloads info.Address to info.FileName.
        // NOTE: not currently called by the pool, because DoSingleWork always uses DownloadData.
        private void DownloadFile(DownloadInfo<T> info)
        {
            WebClient webClient = CreateTrackedClient();
            info.Error = null;
            webClient.DownloadFileCompleted += (sender, e) =>
            {
                if (e.Error != null)
                    info.Error = e.Error;
                else if (e.Cancelled)
                    info.Error = new Exception("超时已取消");

                Complete(info, webClient);
            };

            try
            {
                // Inside the try: a bad address must be reported, not leak the slot.
                InitWebClient(webClient, info.Address);
                webClient.DownloadFileAsync(info.Address, info.FileName);
            }
            catch (Exception ex)
            {
                info.Error = ex;
                Complete(info, webClient);
            }
        }

        // Downloads info.Address into memory. On success it stores the response Content-Type
        // in info.ContentType and the decoded body (a MemoryStream) in info.Tag.
        private void DownloadData(DownloadInfo<T> info)
        {
            WebClient webClient = CreateTrackedClient();
            info.Error = null;
            webClient.DownloadDataCompleted += (sender, e) =>
            {
                if (e.Error != null)
                    info.Error = e.Error;
                else if (e.Cancelled)
                    info.Error = new Exception("超时已取消");
                else
                {
                    try
                    {
                        string contentEncoding = webClient.ResponseHeaders["Content-Encoding"];
                        info.ContentType = webClient.ResponseHeaders["Content-Type"];
                        info.Tag = Decompress(e.Result, contentEncoding);
                    }
                    catch (Exception ex)
                    {
                        // A corrupt or mislabelled body becomes an ordinary download error
                        // instead of an unhandled exception on a thread-pool thread.
                        info.Error = ex;
                    }
                }

                Complete(info, webClient);
            };

            try
            {
                // Inside the try: a bad address must be reported, not leak the slot.
                InitWebClient(webClient, info.Address);
                webClient.DownloadDataAsync(info.Address);
            }
            catch (Exception ex)
            {
                info.Error = ex;
                Complete(info, webClient);
            }
        }

        // Reports a finished download to FinishProceed, then always releases its slot,
        // even if the callback throws, so the pool cannot stall. Callback exceptions still propagate.
        private void Complete(DownloadInfo<T> info, WebClient webClient)
        {
            try
            {
                Func<DownloadInfo<T>, object> callback = FinishProceed;
                if (callback != null)
                    callback(info);
            }
            finally
            {
                DownloadFinished(webClient);
            }
        }

        // Disposes the client, updates the counters and starts the next queued download, if any.
        private void DownloadFinished(WebClient webClient)
        {
            webClient.Dispose();
            Interlocked.Increment(ref _endCount);
            Interlocked.Decrement(ref _runningCount);
            DoSingleWork();
        }

        // Guard-thread body: every GuardThreadInterval ms, drop finished clients and cancel those
        // running longer than TimeOut seconds. Then consult GuardThreadProceed. After that returns
        // false, call OnEndProceed every 10 s until it returns true.
        private void CleanUpWebClients()
        {
            while (true)
            {
                Thread.Sleep(GuardThreadInterval);

                DateTime expireTime = DateTime.UtcNow.AddSeconds(-TimeOut);
                List<WebClient> expired = new List<WebClient>();
                lock (_webClients)
                {
                    _webClients.RemoveAll(item =>
                    {
                        if (!item.Key.IsBusy)
                            return true;           // finished: stop tracking
                        if (item.Value < expireTime)
                        {
                            expired.Add(item.Key); // timed out: cancel below, stop tracking
                            return true;
                        }
                        return false;              // still within its time budget
                    });
                }

                // Cancel outside the lock. A completion handler that re-enters the pool (and takes
                // the lock to register a new client) can then never run in the middle of RemoveAll.
                foreach (WebClient client in expired)
                    client.CancelAsync();

                Func<bool> proceed = GuardThreadProceed;
                if (proceed != null && !proceed())
                    break;
            }

            Func<bool> onEnd = OnEndProceed;
            if (onEnd != null)
            {
                while (!onEnd())
                    Thread.Sleep(10000);
            }
        }
    }
    }

    之后是 AnalysisPool 部分的代码（即 LimitedTaskScheduler，用于限制分析任务的并发数）：

    View Code
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;
    using System.Threading.Tasks;

    namespace Pmars
    {
    /// <summary>
    /// A <see cref="TaskScheduler"/> that runs queued tasks on thread-pool threads with at most
    /// <c>max</c> worker loops active at once. It also exposes progress counters.
    /// </summary>
    public class LimitedTaskScheduler : TaskScheduler
    {
        // True while the current thread is running one of this scheduler's worker loops. Lets
        // TryExecuteTaskInline run tasks synchronously only on our own workers.
        // NOTE: [ThreadStatic] static, so the flag is shared by all instances of this scheduler.
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;

        private readonly ConcurrentQueue<Task> _tasks = new ConcurrentQueue<Task>();
        private readonly int _maxParallelism = 512;
        private int _runningCount = 0; // worker loops currently active
        private int _queueCount = 0;   // tasks queued so far
        private int _beginCount = 0;   // tasks dequeued for execution so far

        /// <summary>Number of worker loops currently active.</summary>
        public int RunningCount { get { return _runningCount; } }
        /// <summary>Total number of tasks queued to this scheduler.</summary>
        public int QueueCount { get { return _queueCount; } }
        /// <summary>Total number of queued tasks picked up by a worker. <c>QueueCount - BeginCount</c> is the backlog.</summary>
        public int BeginCount { get { return _beginCount; } }

        /// <summary>Creates the scheduler.</summary>
        /// <param name="max">Maximum concurrent workers. Values outside 1..1023 keep the default of 512.</param>
        public LimitedTaskScheduler(int max)
        {
            if (max > 0 && max < 1024)
                _maxParallelism = max;
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

        protected override void QueueTask(Task task)
        {
            // Count before enqueueing so QueueCount - BeginCount can never go negative.
            Interlocked.Increment(ref _queueCount);
            _tasks.Enqueue(task);

            if (TryReserveWorker())
                DoWork();
        }

        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // Only inline on our own workers, and never a task that is (or was) in the queue,
            // since it cannot be removed from there (see TryDequeue).
            if (!_currentThreadIsProcessingItems || taskWasPreviouslyQueued)
                return false;

            return base.TryExecuteTask(task);
        }

        public sealed override int MaximumConcurrencyLevel { get { return _maxParallelism; } }

        protected sealed override bool TryDequeue(Task task)
        {
            // ConcurrentQueue cannot remove an arbitrary item, so honestly report "not removed".
            // A worker will dequeue it later, and TryExecuteTask skips it if it was cancelled.
            return false;
        }

        // Atomically claims one of the _maxParallelism worker slots (CAS avoids check-then-act races).
        private bool TryReserveWorker()
        {
            while (true)
            {
                int current = _runningCount;
                if (current >= _maxParallelism)
                    return false;
                if (Interlocked.CompareExchange(ref _runningCount, current + 1, current) == current)
                    return true;
            }
        }

        // Starts a worker loop that drains the queue and then releases its slot.
        private void DoWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(state =>
            {
                _currentThreadIsProcessingItems = true;
                try
                {
                    do
                    {
                        Task item;
                        while (_tasks.TryDequeue(out item))
                        {
                            Interlocked.Increment(ref _beginCount);
                            base.TryExecuteTask(item);
                        }

                        Interlocked.Decrement(ref _runningCount);
                        // A task may have been enqueued after the queue looked empty but before the
                        // slot was released, and its QueueTask call may have found no free slot.
                        // Re-check so that task is not stranded.
                    }
                    while (!_tasks.IsEmpty && TryReserveWorker());
                }
                finally
                {
                    _currentThreadIsProcessingItems = false;
                }
            }, null);
        }
    }
    }

    如何使用它们

    View Code
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Data;
    using System.Data.SqlClient;
    using System.Text.RegularExpressions;
    using Test;
    using System.Web;
    using System.Net;
    using Mozilla.NUniversalCharDet;
    using System.Globalization;
    using System.IO;
    using System.Threading.Tasks;
    using HtmlAgilityPack;
    using System.Threading;

    namespace Pmars
    {
    /// <summary>
    /// Example driver: downloads pages with a DownloadPool and analyses them on a
    /// LimitedTaskScheduler, pausing downloads while the analysis backlog is large.
    /// </summary>
    class Pmars
    {
        // Set by EndFunction (on the pool's guard thread) and polled by the external caller,
        // so it must be volatile.
        public static volatile bool isFinished = false;

        private DownloadPool<DataNode> dp = new DownloadPool<DataNode>(160); // at most 160 downloads in flight
        private LimitedTaskScheduler lts = new LimitedTaskScheduler(1);      // at most 1 analysis worker
        private Dictionary<int, Encoding> webSiteIdEncodingDic = InitEncoding(); // website id -> page encoding
        private int ReTryCount = 5; // maximum download attempts per item
        private DateTime begin;     // start time, used for the analysis-speed readout

        // Analysis backlog (queued but not yet started) above which downloads pause / below which they resume.
        private const int PauseBacklog = 2048;
        private const int ResumeBacklog = 512;

        /// <summary>Wires the callbacks and queues the initial download list.</summary>
        public void PmarsTest()
        {
            List<DownloadInfo<DataNode>> downList = GetDownList(); // load the items to download

            dp.OnEndProceed = EndFunction;
            dp.FinishProceed = DownloadFinished;
            dp.GuardThreadProceed = ShowMessage;

            // Start the clock before queueing so the speed readout covers the whole run.
            begin = DateTime.Now;
            dp.QueueTasks(downList.ToArray());
        }

        // Builds the initial download list (data loading omitted in the original post).
        private List<DownloadInfo<DataNode>> GetDownList()
        {
            List<DownloadInfo<DataNode>> downList = new List<DownloadInfo<DataNode>>();
            // Data loading omitted.
            return downList;
        }

        // Guard-thread callback. Prints progress, applies back-pressure (pause/resume downloads
        // based on the analysis backlog), and returns false once both pools are idle.
        private bool ShowMessage()
        {
            int threads, ios;
            ThreadPool.GetAvailableThreads(out threads, out ios);
            int backlog = lts.QueueCount - lts.BeginCount; // analyses queued but not yet started

            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("下载:{0} 分析:{1} 待分析:{2} 并行:{3}/{4}",
                dp.BeginCount, lts.BeginCount, backlog, dp.RunningCount, lts.RunningCount);
            double sp = lts.BeginCount / (DateTime.Now - begin).TotalSeconds;
            Console.ForegroundColor = ConsoleColor.White;
            Console.WriteLine("Threads/IOs {0}/{1} 分析速度:{2:N3}个/秒 - {3}", threads, ios, sp, DateTime.Now);

            if (backlog > PauseBacklog)
                dp.Pause();
            else if (backlog < ResumeBacklog)
                dp.Resume();

            return dp.RunningCount != 0 || lts.RunningCount != 0;
        }

        // Download-completion callback. Successes are queued for analysis. Failures are retried
        // up to ReTryCount attempts unless ShouldRetry classifies them as permanent.
        private object DownloadFinished(DownloadInfo<DataNode> dInfo)
        {
            if (dInfo.Error == null)
            {
                if (!(dInfo.Tag is string))
                {
                    Encoding encoding;
                    lock (webSiteIdEncodingDic)
                    {
                        webSiteIdEncodingDic.TryGetValue(dInfo.DataItem.WebSiteId, out encoding);
                        if (encoding == null)
                        {
                            // The encoding should be detected here and added to webSiteIdEncodingDic.
                            // The original author states this never happens for their data, so
                            // detection was omitted (see their earlier GetEncoding post).
                            throw new InvalidOperationException(
                                "No encoding registered for website id " + dInfo.DataItem.WebSiteId);
                        }
                    }
                }

                Task task = new Task(new Action<object>(AnalyzeMethod), dInfo);
                task.Start(lts);
            }
            else if (ShouldRetry(dInfo.Error))
            {
                dInfo.ErrorCount++;
                if (dInfo.ErrorCount < ReTryCount)
                    dp.QueueTask(dInfo);
            }

            return dInfo;
        }

        // Permanent failures (404, 301, connection refused, DNS failure) are not worth retrying.
        // Connect/DNS failures carry no HTTP response, so they must be checked on wex.Status
        // before looking at wex.Response.
        private static bool ShouldRetry(Exception error)
        {
            WebException wex = error as WebException;
            if (wex == null)
                return true;

            if (wex.Status == WebExceptionStatus.ConnectFailure ||
                wex.Status == WebExceptionStatus.NameResolutionFailure)
                return false;

            HttpWebResponse hwr = wex.Response as HttpWebResponse;
            if (hwr == null)
                return true;

            return hwr.StatusCode != HttpStatusCode.NotFound && hwr.StatusCode != HttpStatusCode.Moved;
        }

        // Analysis task body: decodes the downloaded bytes with the site's encoding (UTF-8 if none
        // is registered), then dispatches on WebSiteId.
        private void AnalyzeMethod(object obj)
        {
            DownloadInfo<DataNode> item = obj as DownloadInfo<DataNode>;
            if (item == null)
                return;

            HtmlDocument doc = new HtmlDocument();
            Encoding encoding;
            lock (webSiteIdEncodingDic)
            {
                webSiteIdEncodingDic.TryGetValue(item.DataItem.WebSiteId, out encoding);
            }
            if (encoding == null)
                encoding = Encoding.UTF8;

            using (MemoryStream mStream = item.Tag as MemoryStream)
            {
                if (mStream == null)
                    doc.LoadHtml(item.Tag as string);
                else
                    item.DataItem.Content = encoding.GetString(mStream.ToArray());
            }

            DataNode analNode = item.DataItem;
            switch (analNode.WebSiteId)
            {
                case 1:
                    Console.WriteLine(analNode.DownUrl);
                    break;
                case 2:
                    Console.WriteLine(analNode.DownUrl);
                    break;
                default:
                    Console.WriteLine("没有这个");
                    break;
            }
        }

        // Called by the pool once the guard thread stops; signals the external caller.
        private bool EndFunction()
        {
            Console.WriteLine("EncFunction is running!");
            isFinished = true;

            return true;
        }

        // Known website id -> page encoding table.
        private static Dictionary<int, Encoding> InitEncoding()
        {
            Dictionary<int, Encoding> encodingDic = new Dictionary<int, Encoding>();
            encodingDic.Add(1, Encoding.GetEncoding("GBK"));
            encodingDic.Add(2, Encoding.GetEncoding("gb2312"));
            return encodingDic;
        }
    }

    /// <summary>
    /// Per-page payload carried through download and analysis.
    /// </summary>
    internal class DataNode
    {
        /// <summary>Download URL of the page (printed by the analyzer).</summary>
        public string DownUrl;

        /// <summary>Decoded page text, filled in by the analyzer.</summary>
        public string Content;

        /// <summary>Site identifier. It selects the page encoding and the analysis branch.</summary>
        public int WebSiteId;
    }
    }

    外部调用参考

    View Code
                var test = new Pmars.Pmars();
    test.PmarsTest();

    // Block until EndFunction sets isFinished, checking every 10 seconds.
    while (!Pmars.Pmars.isFinished)
    {
        Thread.Sleep(10000);
    }

    以上就是全部的下载代码！





  • 相关阅读:
    【六道无鱼】ExifTool编辑修改图片GPS
    【Elastic】Filebeat+ELK日志收集分析方案
    【Cesium】鹰眼地图功能
    【Cesium】3dtiles模型单体化
    【PHP】Version 7.2.13 报错 Fatal error: Call to undefined function curl_init()解决方案
    【ODM】win10 安装 webODM
    【数据库】mysql 删除多个关联的表
    【ElasticSearch】win10 安装elasticSearch 6.6.1
    【Cesium】视域分析 基于3dtiles做的视域分析
    缕清思路,继续前行
  • 原文地址:https://www.cnblogs.com/pmars/p/2321564.html
Copyright © 2011-2022 走看看