zoukankan      html  css  js  c++  java
  • 简单Http多线程下载实现

    闲着没事试着写写,本来想应该挺简单的,但一写就折腾大半天。

    Http要实现多线程下载,需要WebHost对HttpHeader中的Range提供支持;有些资源不支持Range头,就只能顺序下载。

    协议参考 rfc2616:http://www.ietf.org/rfc/rfc2616.txt 

    大概步骤:

     1.检测Range支持,同时获取长度

     2. 通过长度创建一个相当大小的文件

     3. 创建线程组

     4. 分隔文件

     5. 各个线程获取一个文件块任务(比较小),读取后放在内存中,完成块后写入文件,再领取下一个文件块任务

     6. 直到全部块都完成

    *如果将完成进度实时持久化,启动的时候加载进度,那么就能实现断点续传了。

      线程组用异步IO代替。为了简便,用Stack管理块的分配:任务失败后再次push回栈中,按照大概的顺序下载,直到栈空为止。实现比较简单,而实际网络情况复杂,还有很多功能需要完善,不多说直接贴代码吧:

     1.

      class ConcurrentDownLoader
        {
            
    public Uri ToUri { getset; }

            
    public string SavePath { getset; }

            
    private int _contentLength;
            
    public int ContentLength { get{return _contentLength;} }

            
    private bool _acceptRanges;
            
    public bool AcceptRanges { get { return _acceptRanges; } }

            
    private int _maxThreadCount;
            
    public int MaxThreadCount { get{return _maxThreadCount;} }

            
    public event Action OnDownLoaded;

            
    public event Action<Exception>  OnError;

            
    private FileStream saveFileStream;
            
    private object _syncRoot = new object();

            
    public ConcurrentDownLoader(Uri uri, string savePath)
                : 
    this(uri, savePath, 5)
            { 
            
            }

            
    public ConcurrentDownLoader(Uri uri,string savePath,int maxThreadCount)
            {
                ToUri 
    = uri;
                SavePath 
    = savePath;
                _maxThreadCount 
    = maxThreadCount;

                ServicePointManager.DefaultConnectionLimit 
    = 100;
            }

         
            
    public void DownLoadAsync()
            {
                _acceptRanges 
    = CheckAcceptRange();
                
                
    if (!_acceptRanges) //可以使用顺序下载
                    throw new Exception("resource cannot allow seek");
                
                
    //create a File the same as ContentLength
                if (File.Exists(SavePath))
                    
    throw new Exception(string.Format("SavePath:{0} already exists",SavePath));
                saveFileStream 
    = File.Create(SavePath);
                saveFileStream.SetLength(ContentLength);
                saveFileStream.Flush();

                PartManager pm 
    = new PartManager(this);
                pm.OnDownLoaded 
    += () => 
                {
                    saveFileStream.Close();
                    
    if (OnDownLoaded != null) ;
                    OnDownLoaded();
                };

                pm.OnError
    +=(ex)=>
                    {
                        saveFileStream.Close();
                    
    if(OnError!=null)
                        OnError(ex);
                    };

                pm.Proc();
            }

             
    public void WriteFile(DPart part, MemoryStream mm)
            {
                
    lock (_syncRoot)
                {
                    
    try
                    {
                        mm.Seek(
    0, SeekOrigin.Begin);
                        saveFileStream.Seek(part.BeginIndex, SeekOrigin.Begin);
                        
    byte[] buffer = new byte[4096];
                        
    int count = 0;
                        
    while ((count = mm.Read(buffer, 0, buffer.Length)) > 0)
                            saveFileStream.Write(buffer, 
    0, count);

                        saveFileStream.Flush();

                        Console.WriteLine(
    "写入:{0}~{1} 成功",part.BeginIndex,part.EndIndex);   
                    }
                    
    catch (Exception ex)
                    {
                        Console.WriteLine(
    "{0},Write Error 这该咋办呢?? fu*K",ex.Message);   
                    }
                }
            }

            
    //检测资源是否支持断点续传
            public bool CheckAcceptRange()
            {
                
    bool isRange = false;

                
    //同步方式,取到应答头即可
                HttpWebRequest req = (HttpWebRequest)WebRequest.Create(ToUri);
                WebResponse rsp 
    = req.GetResponse();
                
    if(rsp.Headers["Accept-Ranges"]=="bytes")
                    isRange
    =true;

                _contentLength 
    = (int)rsp.ContentLength;
                rsp.Close();

                
    return isRange;
            }

        

        }

    2.

     class PartManager

        {
            private int partSize = 128 * 1024//128K每份
            private ConcurrentDownLoader _loader;
            
    private ConcurrentStack<DPart> _dParts;
            
    private AsyncWebRequest[] _reqs;
            
    public event Action OnDownLoaded;
            
    public event Action<Exception> OnError;
            
    private int _aliveThreadCount;
            
    private Thread _checkComplete;

            
    public PartManager(ConcurrentDownLoader loader)
            {
                _loader 
    = loader;
                _dParts 
    = new ConcurrentStack<DPart>();
                _checkComplete 
    = new Thread(new ThreadStart(() =>
                {
                    Thread.Sleep(
    3 * 1000);
                    
    while (true)
                    {
                        
    int count = 0;
                        
    foreach (var req in _reqs)
                        {
                            
    if (req.IsComplete)
                                count
    ++;
                        }
                        
    if (_reqs.Length == count)
                        {
                            
    if (OnDownLoaded != null)
                            {
                                OnDownLoaded();
                                _checkComplete.Abort();
                            }
                        }
                        
    else
                            Thread.Sleep(
    1000);
                    }
                }));
                _checkComplete.IsBackground 
    = true;
                _checkComplete.Start();

                
    //parts Data 
                if (loader.ContentLength < partSize)
                    _dParts.Push(
    new DPart() { BeginIndex = 1, EndIndex = loader.ContentLength });
                
    else
                {
                    
    int count = loader.ContentLength % partSize == 0 ? loader.ContentLength / partSize : loader.ContentLength / partSize + 1;
                    
    for (int i = 0; i < count; i++)
                    {
                        DPart p 
    = new DPart();
                        p.BeginIndex 
    = i * partSize;
                        p.EndIndex 
    = (i + 1* partSize - 1;

                        
    if (count - 1 == i)
                            p.EndIndex 
    = loader.ContentLength - 1;

                        _dParts.Push(p);
                    }
                }

                
    //建立工作线程
                _aliveThreadCount = loader.MaxThreadCount;
                _reqs 
    = new AsyncWebRequest[loader.MaxThreadCount];
                
    for (int i = 0; i < loader.MaxThreadCount; i++)
                {
                    _reqs[i] 
    = new AsyncWebRequest(i, loader);
                }
            }

            
    public void Proc()
            {
                
    foreach (AsyncWebRequest req in _reqs)
                {
                    
    if (_dParts.Count > 0)
                    {
                        DPart d;
                        
    if (_dParts.TryPop(out d))
                        {
                            req.IsComplete 
    = false;
                            req.BeginGetStream(Callback, d);
                        }
                        
    else
                            req.IsComplete 
    = true;
                    }
                    
    else
                        req.IsComplete 
    = true;
                }
            }

            
    public void Callback(AsyncWebRequest req, MemoryStream mm, Exception ex)
            {
                
    //一个线程如果3次都失败,就不再使用了,可能是线程数有限制,
                if (ex == null && mm != null)
                {
                    
    //check mm size
                    if ((int)mm.Length == req.EndIndex - req.BeginIndex + 1)
                    {
                        _loader.WriteFile(req.Part, mm);
                        
    //重新分配 Part
                        if (_dParts.Count > 0)
                        {
                            DPart d;
                            
    if (_dParts.TryPop(out d))
                            {
                                req.BeginGetStream(Callback, d);
                            }
                        }
                        
    else
                        {
                            
    //所有Part都已经完成鸟,ok success
                            req.IsComplete = true;
                        }
                    }
                    
    else
                    {
                        req.IsComplete 
    = true;
                        Console.WriteLine(
    "mm Length:{0}, Part Length:{1} ,Why not the same ~~~shit", mm.Length, req.EndIndex - req.BeginIndex);
                        
    //回收分区
                        _dParts.Push(req.Part);
                        Interlocked.Decrement(
    ref _aliveThreadCount);
                        
    if (_aliveThreadCount == 0)
                        {
                            
    if (OnError != null)
                                OnError(
    null);
                        }
                    }
                }
                
    else
                {
                    req.IsComplete 
    = true;
                    
    //回收分区
                    _dParts.Push(req.Part);
                    Interlocked.Decrement(
    ref _aliveThreadCount);
                    
    if (_aliveThreadCount == 0)
                    {
                        _checkComplete.Abort();
                        
    if (OnError != null)
                            OnError(
    null);
                    }
                }
            }
        }

     3. 文件分片

        class DPart
        {
            
    public int BeginIndex;
            
    public int EndIndex;
            
    public bool IsComlete;
            
    public int tryTimes;

        }

    4.异步WebRequest简单封装

        class AsyncWebRequest

        {
            
    //http://www.ietf.org/rfc/rfc2616.txt
            
    // suffix-byte-range-spec = "-" suffix-length
            
    //suffix-length = 1*DIGIT
            
    // Range = "Range" ":" ranges-specifier
            
    //Range: bytes=100-300

            
    //  Content-Range = "Content-Range" ":" content-range-spec
            
    //content-range-spec      = byte-content-range-spec
            
    //byte-content-range-spec = bytes-unit SP
            
    //                          byte-range-resp-spec "/"
            
    //                          ( instance-length | "*" )
            
    //byte-range-resp-spec = (first-byte-pos "-" last-byte-pos)
            
    //                               | "*"
            
    //instance-length           = 1*DIGIT

            
    //      HTTP/1.1 206 Partial content
            
    //Date: Wed, 15 Nov 1995 06:25:24 GMT
            
    //Last-Modified: Wed, 15 Nov 1995 04:58:08 GMT
            
    //Content-Range: bytes 21010-47021/47022
            
    //Content-Length: 26012
            
    //Content-Type: image/gif

            
    private int _threadId;
            
    public int ThreadId { get { return _threadId; } }

            
    public bool IsComplete { getset; }

            
    private ConcurrentDownLoader _loader;
            
    public ConcurrentDownLoader Loader { get { return _loader; } }

            
    private DPart _part;
            
    public DPart Part { get { return _part; } }
            
    private int _beginIndex;
            
    public int BeginIndex { get { return _beginIndex; } }

            
    private int _endIndex;
            
    public int EndIndex { get { return _endIndex; } }

            
    private int _tryTimeLeft;

            
    private Action<AsyncWebRequest, MemoryStream, Exception> _onResponse;
            
    private HttpWebRequest _webRequest;
            
    private MemoryStream _mmStream;
            
    private Stream _rspStream;
            
    public AsyncWebRequest(int threadId, ConcurrentDownLoader loader)
            {
                _threadId 
    = threadId;
                _loader 
    = loader;
            }

            
    public void BeginGetStream(Action<AsyncWebRequest, MemoryStream, Exception> rep, DPart part)
            {
                IsComplete 
    = false;
                _beginIndex 
    = part.BeginIndex;
                _endIndex 
    = part.EndIndex;
                _part 
    = part;
                _onResponse 
    = rep;
                _tryTimeLeft 
    = 3;
                DoRequest();
            }

            
    private void DoRequest()
            {
                _webRequest 
    = (HttpWebRequest)WebRequest.Create(Loader.ToUri);
                _webRequest.AddRange(BeginIndex, EndIndex);
                
    //  _webRequest.Headers.Add(string.Format("Range: bytes={0}-{1}", BeginIndex, EndIndex));
                _mmStream = new MemoryStream(EndIndex - BeginIndex);

                Console.WriteLine(
    "开始获取{0}-{1}段", BeginIndex, EndIndex);

                _webRequest.BeginGetResponse((result) 
    =>
                {
                    
    try
                    {
                        HttpWebRequest req 
    = result.AsyncState as HttpWebRequest;
                        WebResponse rsp 
    = req.EndGetResponse(result);
                        
    //验证Content-Range: bytes的正确性
                        string contentRange = rsp.Headers["Content-Range"];
                        
    // (\d+)\-(\d+)\/(\d+) Match Content-Range: bytes 21010-47021/47022
                        Regex reg = new Regex(@"(\d+)\-(\d+)\/(\d+)");
                        Match mc 
    = reg.Match(contentRange);
                        
    if (mc.Groups.Count == 4)
                        {
                            
    int bid = Convert.ToInt32(mc.Groups[1].Value);
                            
    int eid = Convert.ToInt32(mc.Groups[2].Value);
                            
    int len = Convert.ToInt32(mc.Groups[3].Value);
                            
    if (bid == BeginIndex && eid == EndIndex && Loader.ContentLength == len)
                                Console.WriteLine(
    "开始获取{0}-{1}段时返回成功,Content-Range:{2}", BeginIndex, EndIndex, contentRange);
                            
    else
                                
    throw new Exception(string.Format("开始获取{0}-{1}段时返回失败,Content-Range 验证错误:{2}", BeginIndex, EndIndex, contentRange));
                        }
                        
    else
                        {
                            
    throw new Exception("return Content-Range Error :" + contentRange);
                        }

                        Console.WriteLine(
    "开始获取{0}-{1}段时返回成功,开始读取数据", BeginIndex, EndIndex);

                        _rspStream 
    = rsp.GetResponseStream();
                        
    byte[] buffer = new byte[4096];
                        _rspStream.BeginRead(buffer, 
    04096, EndReadStream, buffer);
                    }
                    
    catch (Exception ex)
                    {
                        
    if (_tryTimeLeft > 0)
                        {
                            Console.WriteLine(
    "获取{0}-{1}失败,ex:{2},重试", BeginIndex, EndIndex, ex.Message);
                            _tryTimeLeft
    --;
                            _rspStream.Close();
                            _rspStream 
    = null;
                            _webRequest 
    = null;
                            DoRequest();
                        }
                        
    else
                        {
                            Console.WriteLine(
    "获取{0}-{1}失败,ex:{2},已经重试3次放弃~~", BeginIndex, EndIndex, ex.Message);
                            
    if (_onResponse != null)
                                _onResponse(
    thisnull, ex);
                        }
                    }
                }, _webRequest);
            }

            
    private void EndReadStream(IAsyncResult result)
            {
                
    try
                {
                    
    byte[] buffer = result.AsyncState as byte[];
                    
    int count = _rspStream.EndRead(result);
                    
    if (count > 0)
                    {
                        Console.WriteLine(
    "读取{0}-{1}段数据中,读取到:{2}字节,continue···", BeginIndex, EndIndex, count);
                        _mmStream.Write(buffer, 
    0, count);
                        _rspStream.BeginRead(buffer, 
    04096, EndReadStream, buffer);
                    }
                    
    else
                    {
                        
    //OK now all is back
                        if (_onResponse != null)
                            _onResponse(
    this, _mmStream, null);
                    }
                }
                
    catch (Exception ex)
                {
                    
    if (_tryTimeLeft > 0)
                    {
                        Console.WriteLine(
    "获取{0}-{1}失败,ex:{2},重试", BeginIndex, EndIndex, ex.Message);
                        _tryTimeLeft
    --;
                        
    if (_rspStream != null)
                            _rspStream.Close();
                        _rspStream 
    = null;
                        _webRequest 
    = null;
                        DoRequest();
                    }
                    
    else
                    {
                        Console.WriteLine(
    "获取{0}-{1}失败,ex:{2},已经重试3次放弃~~", BeginIndex, EndIndex, ex.Message);
                        
    if (_onResponse != null)
                            _onResponse(
    this, _mmStream, ex);
                    }
                }
            }
        }

    自己测试100多M的文件开10个线程可以正常下载,因为仓促写的,没经过大量测试可能还有很多没考虑的地方,仅供参考。

  • 相关阅读:
    自动补全命令插件的安装(centos)
    vim tab设置为4个空格
    CentOS系统将UTC时间修改为CST时间
    Zabbix报错 zabbix:cannot convert value to numeric type解决
    php-fpm平滑重启开启关闭
    虚拟机启动network服务失败,Job for network.service failed because the control process exited with error code问题
    Tomcat中设置站点子目录的方法
    WordPress添加背景音乐plus教程
    WordPress使用必应每日一图作登录页面背景
    WordPress自定义美化
  • 原文地址:https://www.cnblogs.com/lulu/p/2061062.html
Copyright © 2011-2022 走看看