zoukankan      html  css  js  c++  java
  • 【Socket】苍老师有了丈夫,我也有了SAEA

    一、前言

           时间过得真是快,转眼就2018年了。首先祝各位博友,软件开发者新年新气象,事业有成,身体健康,阖家幸福!最近看到园子里好多关于自己的2017年度总结以及对自己新一年的愿景,觉得咱园子的氛围是真的好。这三天假期我也没闲着,一边看OB海鲜团吃鸡一边写Socket SocketAsyncEventArgs 代码。我上一篇博客已经用APM的方式实现了客户端与服务器端的Socket通信,并具有了一定的并发能力。所以这三天我就决定对服务器代码进行改造,使用MS在4.0时发布的SocketAsyncEventArgs(SAEA)写法。为了方便的进行服务器端两种写法的对比,我客户端的代码没有进行变化,依然使用APM方式。代码已经上传至Github,链接会在文末贴出。

    二、我的业务功能

           我的业务功能依然是实现从服务器多线程下载更新文件。下载之前的那些操作我基本就不讲了,上一篇博文里的都有,本文还是回到Socket下载文件上。具体流程如下:

           在我写SAEA代码之前,我仔细搜了一下网上的资源:MSDN、CNBLOG、CSDN、CodeProject。这四种来源的代码示例的主要流程是这样的:

          对比我的流程,您会发现少了一半的通信过程。客户端的代码好写,但是服务器端如何发送完数据之后再接收数据?这中间的衔接过程还是有点门道的。特别是SAEA的代码采用了Buffer池化以及SAEA池化之后,里面有些小的细节就要想清楚了。下面就是具体的代码,我会以我自己的视角去论述APM与SAEA到底有什么区别。

    三、对比

         其实对于服务器端的APM,我觉得最重要的并不是代码中的BeginXXX或者是EndXXX,因为这就是APM写法的特征,BeginXXX或者EndXXX然后里面有一个回调函数,在回调函数里去做一些业务上的事情。最重要的是要有一个线程等待的概念,也就是代码中的ManualResetEvent这个东西,它就像地铁闸机一样,处理好一个再放一个进去。APM写法的好处是显而易见的,就是代码看起来十分的简单。缺点依照MS的说法就是如果有过多的客-服交流,可能会产生较多的IAsyncResult对象,这样会增加服务器的开销。  

         服务器端的APM写法:

      1 using System;
      2 using System.IO;
      3 using System.Linq;
      4 using System.Net;
      5 using System.Net.Sockets;
      6 using System.Threading;
      7 using UpdaterShare.GlobalSetting;
      8 using UpdaterShare.Model;
      9 using UpdaterShare.Utility;
     10 
     11 namespace UpdaterServerAPM
     12 {  
     13     public static class ServerSocket
     14     {
     15         private static int _downloadChannelsCount;
     16         private static string _serverPath;
     17         private static readonly ManualResetEvent AllDone = new ManualResetEvent(false);
     18 
     19         public static void StartServer(int port, int backlog)
     20         {         
     21             _downloadChannelsCount = DownloadSetting.DownloadChannelsCount;
     22             try
     23             {
     24                 IPAddress ipAddress = IPAddress.Any;
     25                 IPEndPoint localEndPoint = new IPEndPoint(ipAddress, port);
     26                 Socket listener = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
     27                 listener.Bind(localEndPoint);
     28                 listener.Listen(backlog);
     29 
     30                 while (true)
     31                 {
     32                     AllDone.Reset();
     33                     listener.BeginAccept(AcceptCallback, listener);
     34                     AllDone.WaitOne();
     35                 }
     36             }
     37             catch (Exception ex)
     38             {
     39                 var path = $"{AppDomain.CurrentDomain.BaseDirectory}\RunLog.txt";
     40                 File.AppendAllText(path, ex.Message);
     41             }
     42         }
     43 
     44 
     45         private static void AcceptCallback(IAsyncResult ar)
     46         {
     47             AllDone.Set();
     48             Socket listener = (Socket)ar.AsyncState;
     49             Socket handler = listener.EndAccept(ar);
     50             ComObject state = new ComObject { WorkSocket = handler };
     51             handler.BeginReceive(state.Buffer, 0, ComObject.BufferSize, 0, FindUpdateFileCallback, state);
     52         }
     53 
     54 
     55         private static void FindUpdateFileCallback(IAsyncResult ar)
     56         {
     57             ComObject state = (ComObject)ar.AsyncState;
     58             Socket handler = state.WorkSocket;
     59             int bytesRead = handler.EndReceive(ar);
     60             if (bytesRead > 0)
     61             {
     62                 var receiveData = state.Buffer.Take(bytesRead).ToArray();
     63                 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientFindFileInfoTag());
     64                 if (dataList != null && dataList.Any())
     65                 {
     66                     var request = PacketUtils.GetData(PacketUtils.ClientFindFileInfoTag(), dataList.FirstOrDefault());
     67                     string str = System.Text.Encoding.UTF8.GetString(request);
     68                     var infos = str.Split('_');
     69                     var productName = infos[0];
     70                     var revitVersion = infos[1];
     71                     var currentVersion = infos[2];
     72 
     73                     var mainFolder = AppDomain.CurrentDomain.BaseDirectory.Replace("bin", "TestFile");
     74                     var serverFileFolder = Path.Combine(mainFolder, "Server");
     75                     var serverFileFiles = new DirectoryInfo(serverFileFolder).GetFiles();
     76                    
     77                     var updatefile  = serverFileFiles.FirstOrDefault(x=>x.Name.Contains(productName) && x.Name.Contains(revitVersion) && x.Name.Contains(currentVersion));
     78                     if (updatefile != null)
     79                     {
     80                         if (string.IsNullOrEmpty(updatefile.FullName) || !File.Exists(updatefile.FullName)) return;
     81                         _serverPath = updatefile.FullName;
     82                         FoundUpdateFileResponse(handler);
     83                     }
     84                 }
     85             }
     86         }
     87 
     88 
     89         private static void FoundUpdateFileResponse(Socket handler)
     90         {
     91             byte[] foundUpdateFileData = PacketUtils.PacketData(PacketUtils.ServerFoundFileInfoTag(),null);
     92             ComObject state = new ComObject { WorkSocket = handler };
     93             handler.BeginSend(foundUpdateFileData, 0, foundUpdateFileData.Length, 0, HasFoundUpdateFileCallback, state);
     94         }
     95 
     96 
     97         private static void HasFoundUpdateFileCallback(IAsyncResult ar)
     98         {
     99             ComObject state = (ComObject)ar.AsyncState;
    100             Socket handler = state.WorkSocket;
    101             handler.EndSend(ar);
    102             handler.BeginReceive(state.Buffer, 0, ComObject.BufferSize, 0, ReadFilePositionRequestCallback, state);
    103         }
    104 
    105 
    106         private static void ReadFilePositionRequestCallback(IAsyncResult ar)
    107         {
    108             ComObject state = (ComObject)ar.AsyncState;
    109             Socket handler = state.WorkSocket;
    110             int bytesRead = handler.EndReceive(ar);
    111             if (bytesRead > 0)
    112             {
    113                 var receiveData = state.Buffer.Take(bytesRead).ToArray();
    114                 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientRequestFileTag());
    115                 if (dataList != null)
    116                 {
    117                     foreach (var request in dataList)
    118                     {
    119                         if (PacketUtils.IsPacketComplete(request))
    120                         {
    121                             int startPosition = PacketUtils.GetRequestFileStartPosition(request); 
    122                             SendFileResponse(handler, startPosition);
    123                         }
    124                     }
    125                 }
    126             }
    127         }
    128 
    129         private static void SendFileResponse(Socket handler, int startPosition)
    130         {
    131             var packetSize = PacketUtils.GetPacketSize(_serverPath, _downloadChannelsCount);
    132             if (packetSize != 0)
    133             {
    134                 byte[] filedata = FileUtils.GetFile(_serverPath, startPosition, packetSize);
    135                 byte[] packetNumber = BitConverter.GetBytes(startPosition/packetSize);
    136                 if (filedata != null)
    137                 {
    138                     byte[] segmentedFileResponseData = PacketUtils.PacketData(PacketUtils.ServerResponseFileTag(), filedata, packetNumber);
    139                     ComObject state = new ComObject {WorkSocket = handler};
    140                     handler.BeginSend(segmentedFileResponseData, 0, segmentedFileResponseData.Length, 0, SendFileResponseCallback, state);
    141                 }
    142             }
    143             else
    144             {               
    145                 handler.Shutdown(SocketShutdown.Both);
    146                 handler.Close();
    147             }       
    148         }
    149 
    150 
    151         private static void SendFileResponseCallback(IAsyncResult ar)
    152         {
    153             try
    154             {
    155                 ComObject state = (ComObject)ar.AsyncState;
    156                 Socket handler = state.WorkSocket;
    157                 handler.EndSend(ar);
    158                 handler.Shutdown(SocketShutdown.Both);
    159                 handler.Close();             
    160             }
    161             catch (Exception e)
    162             {
    163 
    164             }
    165         }
    166     }
    167 }

            说到SAEA,我觉得初入的小伙伴一定要先看MSDN上的实例,特别是它的BufferManager以及SocketAsyncEventArgsPool是怎么写的,到底是干什么用的。这里我可以简单的说下:SocketAsyncEventArgsPool是用来存放SAEA对象的,其个数依赖于你服务器所能承担的队列长度,比如说我服务器能承担100个客户的等待,我就在服务器端生成100个SAEA对象放在池子里,当有客户来连接时,我从池子里取出一个来和他对接。客户走了,我再扔到池子里去。BufferManager则是对池子里的SAEA对象进行Buffer分配的,也相当于一个池子,这个池子的大小是队列长度*通信缓存长度*2,乘以2是因为读与写是分开的。通信缓存长度很好理解,客户端要传个2G的信息给服务器端不可能一下子接收2G,肯定是一口一口吃,那么这一口的大小就是通信缓存长度。那么分配给每个SAEA的缓存是多大呢?当然就是通信缓存长度的大小咯。注意!!注意!!注意!!既然是池化了,所有关于Buffer的操作都要围绕分配给SAEA的Buffer去操作!见148-149行当服务器拿着分配到的Buffer去接收信息后,如果再要发送信息,所要做的第一件事就是先清空分配的Buffer再使用,BufferManager给你分配哪段你就用哪段,别使用错了。有几个参数需要注意:e.Offset(偏移),e.Count(大小),e.Buffer(缓存字节数组), e.BytesTransferred(通信传输的字节长度)。如果服务器端要发送数据,一定要用Array.Copy将信息写入对应分配的Buffer中。

         说完池化,接着就是写法上的小区别,我觉得区别并不大,无非就是委托换了个写法。当然还要判断下是否为异步操作,如果是否则需要进行同步操作,见82-85行代码。

           服务器的SAEA写法:

      1 using System;
      2 using System.IO;
      3 using System.Linq;
      4 using System.Net;
      5 using System.Net.Sockets;
      6 using System.Threading;
      7 using UpdaterShare.GlobalSetting;
      8 using UpdaterShare.Model;
      9 using UpdaterShare.Utility;
     10 
     11 namespace UpdaterServerSAEA
     12 {
     13     public class ServerSocket
     14     {
     15         private readonly int _port;
     16         private readonly int _backlog;
     17         private Socket _listenSocket;
     18         private const int _opsToPreAlloc = 2;
     19         private readonly BufferManager _bufferManager;
     20         private readonly SocketAsyncEventArgsPool _readWritePool;
     21         private readonly Semaphore _maxNumberAcceptedClients;
     22 
     23         private string _serverPath;
     24         private static readonly int _downloadChannelsCount = DownloadSetting.DownloadChannelsCount;
     25 
     26         public ServerSocket(int port, int backlog)
     27         {
     28             _port = port;
     29             _backlog = backlog;
     30 
     31             _bufferManager = new BufferManager(ComObject.BufferSize * backlog * _opsToPreAlloc, ComObject.BufferSize);
     32             _readWritePool = new SocketAsyncEventArgsPool(backlog);
     33             _maxNumberAcceptedClients = new Semaphore(backlog, backlog);
     34         }
     35 
     36 
     37         private void Init()
     38         {
     39             _bufferManager.InitBuffer();
     40 
     41             for (var i = 0; i < _backlog; i++)
     42             {
     43                 var readWriteEventArg = new SocketAsyncEventArgs();
     44                 _bufferManager.SetBuffer(readWriteEventArg);
     45                 _readWritePool.Push(readWriteEventArg);
     46             }
     47         }
     48 
     49 
     50         public void StartServer()
     51         {
     52             try
     53             {
     54                 Init();
     55                 IPAddress ipAddress = IPAddress.Any;
     56                 IPEndPoint localEndPoint = new IPEndPoint(ipAddress, _port);
     57                 _listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
     58                 _listenSocket.Bind(localEndPoint);
     59                 _listenSocket.Listen(_backlog);
     60                 StartAccept(null);
     61             }
     62             catch (Exception ex)
     63             {
     64                 Console.WriteLine(ex.Message);
     65             }
     66         }
     67 
     68         private void StartAccept(SocketAsyncEventArgs acceptEventArg)
     69         {
     70             if (acceptEventArg == null)
     71             {
     72                 acceptEventArg = new SocketAsyncEventArgs();
     73                 acceptEventArg.Completed += StartAccept_Completed;
     74             }
     75             else
     76             {
     77                 acceptEventArg.AcceptSocket = null;
     78             }
     79 
     80             _maxNumberAcceptedClients.WaitOne();
     81 
     82             if (!_listenSocket.AcceptAsync(acceptEventArg))
     83             {
     84                 ProcessAccept(acceptEventArg);
     85             }
     86         }
     87 
     88         private void StartAccept_Completed(object sender, SocketAsyncEventArgs e)
     89         {
     90             ProcessAccept(e);
     91         }
     92 
     93 
     94         private void ProcessAccept(SocketAsyncEventArgs e)
     95         {
     96             if (e.SocketError == SocketError.Success)
     97             {
     98                 var socket = e.AcceptSocket;
     99                 if (socket.Connected)
    100                 {
    101                     SocketAsyncEventArgs readEventArgs = _readWritePool.Pop();
    102                     readEventArgs.AcceptSocket = socket;
    103                     readEventArgs.Completed += ProcessAccept_Completed;
    104                     if (!socket.ReceiveAsync(readEventArgs))
    105                     {
    106                         ProcessReceiveFindFileRequest(readEventArgs);
    107                     }
    108                     StartAccept(e);
    109                 }
    110             }
    111         }
    112 
    113         private void ProcessAccept_Completed(object sender, SocketAsyncEventArgs e)
    114         {
    115             ProcessReceiveFindFileRequest(e);
    116         }
    117 
    118 
    119         private void ProcessReceiveFindFileRequest(SocketAsyncEventArgs e)
    120         {
    121             var bytesRead = e.BytesTransferred;
    122             if (bytesRead > 0 && e.SocketError == SocketError.Success)
    123             {
    124                 var receiveData = e.Buffer.Skip(e.Offset).Take(bytesRead).ToArray();
    125                 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientFindFileInfoTag());
    126                 if (dataList != null && dataList.Any())
    127                 {
    128                     var request = PacketUtils.GetData(PacketUtils.ClientFindFileInfoTag(), dataList.FirstOrDefault());
    129                     string str = System.Text.Encoding.UTF8.GetString(request);
    130                     var infos = str.Split('_');
    131                     var productName = infos[0];
    132                     var revitVersion = infos[1];
    133                     var currentVersion = infos[2];
    134 
    135                     var mainFolder = AppDomain.CurrentDomain.BaseDirectory.Replace("bin", "TestFile");
    136                     var serverFileFolder = Path.Combine(mainFolder, "Server");
    137                     var serverFileFiles = new DirectoryInfo(serverFileFolder).GetFiles();
    138 
    139                     var updatefile = serverFileFiles.FirstOrDefault(x => x.Name.Contains(productName) && x.Name.Contains(revitVersion) && x.Name.Contains(currentVersion));
    140                     if (updatefile != null)
    141                     {
    142                         if (string.IsNullOrEmpty(updatefile.FullName) || !File.Exists(updatefile.FullName)) return;
    143                         _serverPath = updatefile.FullName;
    144 
    145                         //ready to send back to Client
    146                         byte[] foundUpdateFileData = PacketUtils.PacketData(PacketUtils.ServerFoundFileInfoTag(), null);
    147 
    148                         Array.Clear(e.Buffer, e.Offset, e.Count);
    149                         Array.Copy(foundUpdateFileData, 0, e.Buffer, e.Offset, foundUpdateFileData.Length);
    150 
    151                         e.Completed -= ProcessAccept_Completed;
    152                         e.Completed += ProcessReceiveFindFileRequest_Completed;
    153 
    154                         if (!e.AcceptSocket.SendAsync(e))
    155                         {
    156                             ProcessFilePosition(e);
    157                         }
    158                     }
    159                 }
    160             }
    161         }
    162 
    163 
    164         private void ProcessReceiveFindFileRequest_Completed(object sender, SocketAsyncEventArgs e)
    165         {
    166             ProcessFilePosition(e);
    167         }
    168 
    169 
    170         private void ProcessFilePosition(SocketAsyncEventArgs e)
    171         {
    172             if (e.SocketError == SocketError.Success)
    173             {
    174                 var socket = e.AcceptSocket;
    175                 if (socket.Connected)
    176                 {
    177                     //clear buffer
    178                     Array.Clear(e.Buffer, e.Offset, e.Count);
    179 
    180                     e.Completed -= ProcessReceiveFindFileRequest_Completed;
    181                     e.Completed += ProcessFilePosition_Completed;
    182 
    183                     if (!socket.ReceiveAsync(e))
    184                     {
    185                         ProcessSendFile(e);
    186                     }
    187                 }
    188             }
    189         }
    190 
    191         private void ProcessFilePosition_Completed(object sender, SocketAsyncEventArgs e)
    192         {
    193             ProcessSendFile(e);
    194         }
    195 
    196         private void ProcessSendFile(SocketAsyncEventArgs e)
    197         {
    198             var bytesRead = e.BytesTransferred;
    199             if (bytesRead > 0 && e.SocketError == SocketError.Success)
    200             {
    201                 var receiveData = e.Buffer.Skip(e.Offset).Take(bytesRead).ToArray();
    202                 var dataList = PacketUtils.SplitBytes(receiveData, PacketUtils.ClientRequestFileTag());
    203                 if (dataList != null)
    204                 {
    205                     foreach (var request in dataList)
    206                     {
    207                         if (PacketUtils.IsPacketComplete(request))
    208                         {
    209                             int startPosition = PacketUtils.GetRequestFileStartPosition(request);
    210 
    211                             var packetSize = PacketUtils.GetPacketSize(_serverPath, _downloadChannelsCount);
    212                             if (packetSize != 0)
    213                             {
    214                                 byte[] filedata = FileUtils.GetFile(_serverPath, startPosition, packetSize);
    215                                 byte[] packetNumber = BitConverter.GetBytes(startPosition / packetSize);
    216 
    217                                 Console.WriteLine("Receive File Request PacketNumber: "+startPosition / packetSize);
    218 
    219                                 if (filedata != null)
    220                                 {
    221                                     //ready to send back to Client
    222                                     byte[] segmentedFileResponseData = PacketUtils.PacketData(PacketUtils.ServerResponseFileTag(), filedata, packetNumber);
    223 
    224                                     Array.Clear(e.Buffer, e.Offset, e.Count);
    225                                     Array.Copy(segmentedFileResponseData, 0, e.Buffer, e.Offset, segmentedFileResponseData.Length);
    226 
    227                                     e.Completed -= ProcessFilePosition_Completed;
    228                                     e.Completed += ProcessSendFile_Completed;
    229 
    230                                     if (!e.AcceptSocket.SendAsync(e))
    231                                     {
    232                                         CloseClientSocket(e);
    233                                     }
    234                                 }
    235                             }
    236                         }
    237                     }
    238                 }
    239             }
    240             else
    241             {
    242                 CloseClientSocket(e);
    243             }
    244         }
    245 
    246 
    247         private void ProcessSendFile_Completed(object sender, SocketAsyncEventArgs e)
    248         {
    249             CloseClientSocket(e);
    250         }
    251 
    252 
    253         private void CloseClientSocket(SocketAsyncEventArgs e)
    254         {
    255             try
    256             {
    257                 e.AcceptSocket.Shutdown(SocketShutdown.Both);
    258                 e.AcceptSocket.Close();
    259             }
    260             catch (Exception ex)
    261             {
    262                 Console.WriteLine(ex.Message);
    263             }
    264             finally
    265             {
    266                 _maxNumberAcceptedClients.Release();
    267                 _readWritePool.Push(e);
    268             }
    269         }
    270     }
    271 }

    四、总结

          坑坑洼洼总算是写完了SAEA的代码,由于本人知识面有限,如果说的不对,还请各位及时直接提出批评与建议,我这个人比较在乎技术不在乎面子的。

    附:

    MSDN示例:

    https://msdn.microsoft.com/en-us/library/system.net.sockets.socketasynceventargs(v=vs.110).aspx

    启蒙博客:

    http://www.cnblogs.com/gaochundong/p/csharp_tcp_service_models.html

    大神改造:

    http://freshflower.iteye.com/blog/2285272

    架构狂魔:

    http://www.cnblogs.com/jiahuafu/archive/2013/01/05/2845631.html

     

    我的GitHub

    https://github.com/airforce094/Socket_APM-SAEA

  • 相关阅读:
    数据库查询(二)
    数据库插入操作
    java关键字查询数据库
    登录页面设计
    获取下拉列表的值
    Linux下tomcat 8安装与配置
    CentOS7下防火墙的设置
    centos 7 下nginx的安装
    vmware14下centos 7的安装和配置
    Linux中JDK的安装以及配置
  • 原文地址:https://www.cnblogs.com/lovecsharp094/p/8177146.html
Copyright © 2011-2022 走看看