zoukankan      html  css  js  c++  java
  • C# Socket 多线程可断点传送大文件3

    更新使用.net 4.0 Parallel 来代替 new Thread();

    /*********************************************************************************
    ** File Name    :    FileTransmitor.cs
    ** Copyright (C) 2010 Snda Network Corporation. All Rights Reserved.
    ** Creator        :    RockyWong
    ** Create Date    :    2010-06-02 11:22:45
    ** Update Date    :    2013-01-11 11:35:26
    ** Description    :    多线程多管道可断点传输大文件
    ** Version No    :    
    *********************************************************************************/
    using System;
    using System.Collections.Generic;
    using System.Diagnostics.Contracts;
    using System.IO;
    using System.Linq;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace Rocky.Net
    {
        public sealed class FileTransfer : Disposable
        {
            #region Fields
            internal const int PerLongSize = sizeof(long);
            internal const string PointExtension = ".dat";
            internal const string TempExtension = ".temp";
    
            private string _savePath;
            private Socket _listener;
            #endregion
    
            #region Properties
            public event EventHandler<TransferEventArgs> Prepare;
            public event EventHandler<TransferEventArgs> ProgressChanged;
            public event EventHandler<TransferEventArgs> Completed;
    
            public string DirectoryPath
            {
                get { return _savePath; }
                set
                {
                    _savePath = value + @"\" + DateTime.Now.ToString("yyyy-MM") + @"\";
                }
            }
            #endregion
    
            #region Constructors
            public FileTransfer()
            {
    
            }
    
            protected override void DisposeInternal(bool disposing)
            {
                if (disposing)
                {
                    SocketHelper.DisposeListener(ref _listener);
                }
                _listener = null;
                Prepare = null;
                ProgressChanged = null;
                Completed = null;
            }
            #endregion
    
            #region Methods
            private void OnPrepare(TransferEventArgs e)
            {
                if (this.Prepare != null)
                {
                    this.Prepare(this, e);
                }
            }
    
            private void OnProgressChanged(TransferEventArgs e)
            {
                if (this.ProgressChanged != null)
                {
                    this.ProgressChanged(this, e);
                }
            }
    
            private void OnCompleted(TransferEventArgs e)
            {
                if (this.Completed != null)
                {
                    this.Completed(this, e);
                }
            }
            #endregion
    
            #region Receive
            /// <summary>
            /// Listen & Receive
            /// </summary>
            /// <param name="savePath"></param>
            /// <param name="port"></param>
            public void Listen(string savePath, ushort port)
            {
                Contract.Requires(!string.IsNullOrEmpty(savePath));
                if (_listener != null)
                {
                    throw new ApplicationException("已启动监听");
                }
    
                Runtime.CreateDirectory(_savePath = savePath);
                var localIpe = new IPEndPoint(IPAddress.Any, port);
                //最多支持16线程
                _listener = SocketHelper.CreateListener(localIpe, 16);
                TaskHelper.Factory.StartNew(() =>
                {
                    while (_listener != null)
                    {
                        Socket controlClient = _listener.Accept();
                        Runtime.LogInfo("TunnelTest 双工通讯: {0}.", controlClient.RemoteEndPoint);
    
                        TransferConfig config;
                        controlClient.Receive(out config);
                        var e = new TransferEventArgs(config);
                        this.OnPrepare(e);
                        if (e.Cancel)
                        {
                            controlClient.Close();
                            continue;
                        }
    
                        var chunkGroup = new ReceiveChunkModel[config.ThreadCount];
                        chunkGroup[0] = new ReceiveChunkModel(controlClient);
                        for (int i = 1; i < chunkGroup.Length; i++)
                        {
                            chunkGroup[i] = new ReceiveChunkModel(_listener.Accept());
                        }
                        TaskHelper.Factory.StartNew(Receive, new object[] { e, chunkGroup });
                    }
                });
            }
    
            private void Receive(object state)
            {
                var args = (object[])state;
                var e = (TransferEventArgs)args[0];
                var chunkGroup = (ReceiveChunkModel[])args[1];
                var controlClient = chunkGroup[0].Client;
    
                e.Progress = new TransferProgress();
                e.Progress.Start(e.Config.FileLength);
                #region Breakpoint
                int perPairCount = PerLongSize * 2, count = perPairCount * chunkGroup.Length;
                byte[] bufferInfo = new byte[count];
                string filePath = Path.Combine(_savePath, e.Config.Checksum + Path.GetExtension(e.Config.FileName)),
                    pointFilePath = Path.ChangeExtension(filePath, PointExtension), tempFilePath = Path.ChangeExtension(filePath, TempExtension);
                FileStream pointStream;
                long oddSize, avgSize = Math.DivRem(e.Config.FileLength, (long)chunkGroup.Length, out oddSize);
                if (File.Exists(pointFilePath) && File.Exists(tempFilePath))
                {
                    pointStream = new FileStream(pointFilePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None);
                    pointStream.Read(bufferInfo, 0, count);
                    long fValue, tValue;
                    for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
                    {
                        fValue = BitConverter.ToInt64(bufferInfo, i * perPairCount);
                        tValue = BitConverter.ToInt64(bufferInfo, i * perPairCount + PerLongSize);
                        chunkGroup[i].Initialize(tempFilePath, e.Config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue);
                        Runtime.LogDebug("[Multi]Local{0} breakpoint read{1}:{2}/{3}.", _listener.LocalEndPoint, i, fValue, tValue);
                    }
                    controlClient.Send(bufferInfo);
                }
                else
                {
                    pointStream = new FileStream(pointFilePath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None);
                    FileStream stream = new FileStream(tempFilePath, FileMode.CreateNew, FileAccess.Write, FileShare.Write);
                    stream.SetLength(e.Config.FileLength);
                    stream.Flush();
                    stream.Dispose();
                    for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
                    {
                        chunkGroup[i].Initialize(tempFilePath, e.Config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize);
                    }
                    controlClient.Send(bufferInfo, 0, 4, SocketFlags.None);
                }
                var timer = new Timer(arg =>
                {
                    long fValue, tValue;
                    for (int i = 0; i < chunkGroup.Length; i++)
                    {
                        chunkGroup[i].ReportProgress(out fValue, out tValue);
                        Buffer.BlockCopy(BitConverter.GetBytes(fValue), 0, bufferInfo, i * perPairCount, 8);
                        Buffer.BlockCopy(BitConverter.GetBytes(tValue), 0, bufferInfo, i * perPairCount + PerLongSize, 8);
                        Runtime.LogDebug("[Multi]Local{0} breakpoint write{1}:{2}/{3}.", _listener.LocalEndPoint, i, fValue, tValue);
                    }
                    pointStream.Position = 0L;
                    pointStream.Write(bufferInfo, 0, count);
                    pointStream.Flush();
                }, null, TimeSpan.Zero, TimeSpan.FromSeconds(4));
                #endregion
                Parallel.ForEach(chunkGroup, chunk => chunk.Run());
                long bytesTransferred = 0L;
                do
                {
                    chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                    this.OnProgressChanged(e);
                    Thread.Sleep(1000);
                }
                while (!chunkGroup.IsAllCompleted());
                chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                this.OnProgressChanged(e);
                timer.Dispose();
                pointStream.Dispose();
                File.Delete(pointFilePath);
                File.Move(tempFilePath, filePath);
    
                e.Progress.Stop();
                this.OnCompleted(e);
            }
            #endregion
    
            #region Send
            public void Send(TransferConfig config, IPEndPoint remoteIpe)
            {
                Contract.Requires(config != null && remoteIpe != null);
    
                var controlChunk = new SendChunkModel(remoteIpe);
                controlChunk.Client.Send(config);
                var e = new TransferEventArgs(config);
                this.OnPrepare(e);
                if (e.Cancel || !controlChunk.Client.Connected)
                {
                    controlChunk.Client.Close();
                    return;
                }
    
                var chunkGroup = new SendChunkModel[config.ThreadCount];
                chunkGroup[0] = controlChunk;
                for (int i = 1; i < chunkGroup.Length; i++)
                {
                    chunkGroup[i] = new SendChunkModel(remoteIpe);
                }
    
                e.Progress = new TransferProgress();
                e.Progress.Start(config.FileLength);
                #region Breakpoint
                int perPairCount = PerLongSize * 2, count = perPairCount * chunkGroup.Length;
                byte[] bufferInfo = new byte[count];
                long oddSize, avgSize = Math.DivRem(config.FileLength, (long)chunkGroup.Length, out oddSize);
                if (controlChunk.Client.Receive(bufferInfo) == 4)
                {
                    for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
                    {
                        chunkGroup[i].Initialize(config.FilePath, config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize);
                    }
                }
                else
                {
                    long fValue, tValue;
                    for (int i = 0, j = chunkGroup.Length - 1; i < chunkGroup.Length; i++)
                    {
                        fValue = BitConverter.ToInt64(bufferInfo, i * perPairCount);
                        tValue = BitConverter.ToInt64(bufferInfo, i * perPairCount + PerLongSize);
                        chunkGroup[i].Initialize(config.FilePath, config.ChunkLength, i * avgSize, i == j ? avgSize + oddSize : avgSize, fValue, tValue);
                        Runtime.LogDebug("[Multi]Remote{0} breakpoint{1}:{2}/{3}.", remoteIpe, i, fValue, tValue);
                    }
                }
                Thread.Sleep(200);
                #endregion
                Parallel.ForEach(chunkGroup, chunk => chunk.Run());
                long bytesTransferred = 0L;
                do
                {
                    chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                    this.OnProgressChanged(e);
                    Thread.Sleep(1000);
                }
                while (!chunkGroup.IsAllCompleted());
                chunkGroup.ReportSpeed(e.Progress, ref bytesTransferred);
                this.OnProgressChanged(e);
    
                e.Progress.Stop();
                this.OnCompleted(e);
            }
            #endregion
        }
    }

    另外推荐一个网络小工具:

    内测安装地址:http://publish.xineworld.com/cloudagent/publish.htm

    上篇文章:http://www.cnblogs.com/Googler/archive/2013/01/11/2856219.html

  • 相关阅读:
    LoadRunner
    LoadRunner
    LoadRunner
    LoadRunner
    Python
    hadoop for .Net
    MVC初学
    MVC初学
    android学习---面试一
    android学习---progressbar和ratingbar
  • 原文地址:https://www.cnblogs.com/Googler/p/3095286.html
Copyright © 2011-2022 走看看