  • Design notes for a high-performance asynchronous socket wrapper library (C#)

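    Preface

    Sockets are one of the most common ways for programs to communicate with each other. C# offers several ways to do socket communication, and the asynchronous model is the most efficient of them.

    Asynchronous communication on Windows is built on I/O completion ports (IOCP). For how completion ports work internally, see the many articles available online.

    The point I want to stress is that asynchronous communication based on completion ports is the most efficient communication model on Windows, bar none!

    Asynchronous communication is considerably harder to get right than synchronous communication, and the code is full of pitfalls. Without experience it is very difficult to finish.

    I collected a great deal of material and completed a wrapper around asynchronous sockets. The library has now been running stably and efficiently for several months.

    Looking through what is available online, I have not found a wrapper library I am satisfied with. Many articles mix data transfer with protocol handling, which makes the code very hard to follow and impossible to extend.

    This library was written to avoid those flaws: the logic is layered and modular, and it aims for both ease of use and high performance.

    To give a first impression of the throughput, here is a test screenshot.

    Client and server both ran on the same machine. The connection count peaked at 64422, at which point the sockets were exhausted!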

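    Host configuration

    The 100 Mbit link was essentially saturated and CPU usage was 40%. My machine sits at roughly 20% CPU when idle, so the program itself accounts for about 20%.

    The library scales: even with 100,000 connections sending and receiving the same amount of data, CPU usage stays roughly the same.

    Structure diagram of the library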

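    Goals

    1. Usable both as a server (listening) and as a client (actively connecting).
    2. Works with any network protocol. Data is sent and received either as a byte stream or as complete packets; the library does not interpret protocol content.
    3. Ease of use. The complex low-level handling is encapsulated behind a friendly public interface.
    4. High performance. Processing is optimized as far as possible: a single machine can support tens of thousands of connections, with throughput reaching several hundred Mbit/s.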

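    Implementation approach

    The network handling logic can be divided into the following parts:

    1. Listening   Can listen on multiple ports. It creates the sockets, which are then handed on for further processing. The listening module has a single, narrow job; if necessary it can be optimized further.
    2. Active connection   Can connect to the peer asynchronously or synchronously. Note: whether a socket comes from listening or from an outgoing connection, all subsequent handling is exactly the same.
    3. Socket send/receive   Each socket has its own send/receive instance, which deals only with byte streams. Both directions are optimized: on send, small pieces of data are coalesced into one buffer to improve throughput; on receive, a 1 KB buffer is posted per request.
    4. Packet assembly   Packets usually carry a length field; for example, if the first two bytes of the header give the length, that value is enough to assemble a complete packet.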
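
    The packet-assembly step in item 4 can be sketched roughly as follows. This is only an illustration, not the library's ClientPacketManage: the class name LengthPrefixedAssembler and its members are hypothetical, and it assumes that the two-byte length is big-endian and counts the whole packet including the header, which the post does not specify.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of the original library.
public sealed class LengthPrefixedAssembler
{
    private readonly List<byte> _pending = new List<byte>();
    private readonly int _maxPacketLen;

    public LengthPrefixedAssembler(int maxPacketLen = 65535)
    {
        _maxPacketLen = maxPacketLen;
    }

    // Appends one received chunk and returns every complete packet now available.
    // Assumption: bytes 0-1 hold the total packet length (header included), big-endian.
    public List<byte[]> Append(byte[] chunk)
    {
        _pending.AddRange(chunk);
        var packets = new List<byte[]>();

        while (_pending.Count >= 2)
        {
            int totalLen = (_pending[0] << 8) | _pending[1];
            if (totalLen < 2 || totalLen > _maxPacketLen)
                throw new InvalidOperationException("Invalid packet length: " + totalLen);

            if (_pending.Count < totalLen)
                break; // incomplete packet: wait for more data

            packets.Add(_pending.GetRange(0, totalLen).ToArray());
            _pending.RemoveRange(0, totalLen);
        }
        return packets;
    }
}
```

    One assembler instance would be kept per connection, and each received chunk passed to Append; a length outside the allowed range is treated as a protocol error, which is the kind of situation in which a server would normally close the connection.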

     NetListener: listening

    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Threading;
    
    namespace IocpCore
    {
        class NetListener
        {
            private Socket listenSocket;
            public ListenParam _listenParam { get; set; }
            public event Action<ListenParam, AsyncSocketClient> OnAcceptSocket;
    
            bool start;
    
            NetServer _netServer;
            public NetListener(NetServer netServer)
            {
                _netServer = netServer;
            }
    
            public int _acceptAsyncCount = 0;
            public bool StartListen()
            {
                try
                {
                    start = true;
                    IPEndPoint listenPoint = new IPEndPoint(IPAddress.Parse("0.0.0.0"), _listenParam._port);
                    listenSocket = new Socket(listenPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                    listenSocket.Bind(listenPoint);
                    listenSocket.Listen(200);
    
                    Thread thread1 = new Thread(new ThreadStart(NetProcess));
                    thread1.Start();
                   
                    StartAccept();
                    return true;
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("**Listen error! {0}", ex.Message));
                    return false;
                }
            }
    
            AutoResetEvent _acceptEvent = new AutoResetEvent(false);
            private void NetProcess()
            {
                while (start)
                {
                    DealNewAccept();
                    _acceptEvent.WaitOne(1000 * 10);
                }
            }
    
            private void DealNewAccept()
            {
                try
                {
                    if(_acceptAsyncCount <= 10)
                    {
                        StartAccept();
                    }
    
                    while (true)
                    {
                        AsyncSocketClient client = _newSocketClientList.GetObj();
                        if (client == null)
                            break;
    
                        DealNewAccept(client);
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("DealNewAccept exception {0}***{1}", ex.Message, ex.StackTrace));
                }
            }
    
            private void DealNewAccept(AsyncSocketClient client)
            {
                client.SendBufferByteCount = _netServer.SendBufferBytePerClient;
                OnAcceptSocket?.Invoke(_listenParam, client);
            }
    
            public bool StartAccept()
            {
                // (body omitted in the original post)
                return true;
            }
    
            ObjectPool<AsyncSocketClient> _newSocketClientList = new ObjectPool<AsyncSocketClient>();
            private void ProcessAccept(SocketAsyncEventArgs acceptEventArgs)
            {
                try
                {
                    using (acceptEventArgs)
                    {
                        if (acceptEventArgs.AcceptSocket != null)
                        {
                            AsyncSocketClient client = new AsyncSocketClient(acceptEventArgs.AcceptSocket);
                            client.CreateClientInfo(this);
    
                            _newSocketClientList.PutObj(client);
                            _acceptEvent.Set();
                        }
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("ProcessAccept {0}***{1}", ex.Message, ex.StackTrace));
                }
            }
        }
    }
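
    The post leaves the body of StartAccept empty. As a rough idea of what posting accepts with SocketAsyncEventArgs can look like, here is a stand-alone sketch. It is not the author's implementation: AcceptLoopSketch, Start, Port and Accepted are hypothetical names, it binds to the loopback address only, and it keeps a single accept outstanding, whereas the listing above tracks a counter (_acceptAsyncCount) that suggests several accepts are kept pending.

```csharp
using System;
using System.Net;
using System.Net.Sockets;

// Hypothetical stand-alone sketch; not the library's NetListener.
public sealed class AcceptLoopSketch : IDisposable
{
    private readonly Socket _listenSocket;
    private volatile bool _running;

    // Raised once for every accepted connection.
    public event Action<Socket> Accepted;

    // Actual port, useful when the constructor was given port 0.
    public int Port => ((IPEndPoint)_listenSocket.LocalEndPoint).Port;

    public AcceptLoopSketch(int port)
    {
        _listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        _listenSocket.Bind(new IPEndPoint(IPAddress.Loopback, port));
        _listenSocket.Listen(200);
    }

    public void Start()
    {
        _running = true;
        StartAccept();
    }

    private void StartAccept()
    {
        while (_running)
        {
            var args = new SocketAsyncEventArgs();
            args.Completed += (sender, e) =>
            {
                ProcessAccept(e);
                StartAccept(); // post the next accept
            };

            bool pending;
            try { pending = _listenSocket.AcceptAsync(args); }
            catch (ObjectDisposedException) { args.Dispose(); return; }

            if (pending)
                return; // Completed will be raised later

            // Completed synchronously: no event is raised, so handle it here and loop.
            ProcessAccept(args);
        }
    }

    private void ProcessAccept(SocketAsyncEventArgs e)
    {
        using (e)
        {
            if (e.SocketError == SocketError.Success && e.AcceptSocket != null)
                Accepted?.Invoke(e.AcceptSocket);
            else
                e.AcceptSocket?.Close();
        }
    }

    public void Dispose()
    {
        _running = false;
        _listenSocket.Close();
    }
}
```

    The detail worth noting is the return value of AcceptAsync: when it is false the operation has already completed and the Completed event will not fire, so the result has to be processed inline. The loop form avoids growing the call stack when many accepts complete synchronously.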
    

      

    NetConnectManage: connection handling

    using System;
    using System.Net;
    using System.Net.Sockets;

    namespace IocpCore
    {
        class NetConnectManage
        {
            public event Action<SocketEventParam, AsyncSocketClient> OnSocketConnectEvent;

            // Starts an asynchronous connect; the result is reported through OnSocketConnectEvent.
            public bool ConnectAsyn(string peerIp, int peerPort, object tag)
            {
                try
                {
                    Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
                    SocketAsyncEventArgs socketEventArgs = new SocketAsyncEventArgs();
                    socketEventArgs.RemoteEndPoint = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
                    socketEventArgs.Completed += SocketConnect_Completed;

                    SocketClientInfo clientInfo = new SocketClientInfo();
                    socketEventArgs.UserToken = clientInfo;
                    clientInfo.PeerIp = peerIp;
                    clientInfo.PeerPort = peerPort;
                    clientInfo.Tag = tag;

                    bool willRaiseEvent = socket.ConnectAsync(socketEventArgs);
                    if (!willRaiseEvent)
                    {
                        // Completed synchronously: the Completed event will not fire.
                        ProcessConnect(socketEventArgs);
                        socketEventArgs.Completed -= SocketConnect_Completed;
                        socketEventArgs.Dispose();
                    }
                    return true;
                }
                catch (Exception ex)
                {
                    NetLogger.Log("ConnectAsyn", ex);
                    return false;
                }
            }

            private void SocketConnect_Completed(object sender, SocketAsyncEventArgs socketEventArgs)
            {
                ProcessConnect(socketEventArgs);
                socketEventArgs.Completed -= SocketConnect_Completed;
                socketEventArgs.Dispose();
            }

            private void ProcessConnect(SocketAsyncEventArgs socketEventArgs)
            {
                SocketClientInfo clientInfo = socketEventArgs.UserToken as SocketClientInfo;
                if (socketEventArgs.SocketError == SocketError.Success)
                {
                    DealConnectSocket(socketEventArgs.ConnectSocket, clientInfo);
                }
                else
                {
                    // Connection failed: report it with a null socket and a null client.
                    SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, null);
                    socketParam.ClientInfo = clientInfo;
                    OnSocketConnectEvent?.Invoke(socketParam, null);
                }
            }


            void DealConnectSocket(Socket socket, SocketClientInfo clientInfo)
            {
                clientInfo.SetClientInfo(socket);

                AsyncSocketClient client = new AsyncSocketClient(socket);
                client.SetClientInfo(clientInfo);

                // Raise the connect event
                SocketEventParam socketParam = new SocketEventParam(EN_SocketEvent.connect, socket);
                socketParam.ClientInfo = clientInfo;
                OnSocketConnectEvent?.Invoke(socketParam, client);
            }

            // Synchronous connect; on success the connected socket is returned through the out parameter.
            public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
            {
                socket = null;
                try
                {
                    Socket socketTmp = new Socket(SocketType.Stream, ProtocolType.Tcp);

                    SocketClientInfo clientInfo = new SocketClientInfo();
                    clientInfo.PeerIp = peerIp;
                    clientInfo.PeerPort = peerPort;
                    clientInfo.Tag = tag;

                    EndPoint remoteEP = new IPEndPoint(IPAddress.Parse(peerIp), peerPort);
                    socketTmp.Connect(remoteEP);
                    if (!socketTmp.Connected)
                        return false;

                    DealConnectSocket(socketTmp, clientInfo);
                    socket = socketTmp;
                    return true;
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("Failed to connect to peer ({0}:{1})!", peerIp, peerPort), ex);
                    return false;
                }
            }
        }
    }

    AsyncSocketClient: socket send/receive handling

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Net;
    using System.Net.Sockets;

    namespace IocpCore
    {
        public class AsyncSocketClient
        {
            // Size of the buffer posted for each receive (1 KB)
            public static int IocpReadLen = 1024;

            public readonly Socket ConnectSocket;

            protected SocketAsyncEventArgs m_receiveEventArgs;
            public SocketAsyncEventArgs ReceiveEventArgs { get { return m_receiveEventArgs; } set { m_receiveEventArgs = value; } }
            protected byte[] m_asyncReceiveBuffer;

            protected SocketAsyncEventArgs m_sendEventArgs;
            public SocketAsyncEventArgs SendEventArgs { get { return m_sendEventArgs; } set { m_sendEventArgs = value; } }
            protected byte[] m_asyncSendBuffer;

            public event Action<AsyncSocketClient, byte[]> OnReadData;
            public event Action<AsyncSocketClient, int> OnSendData;
            public event Action<AsyncSocketClient> OnSocketClose;

            // Diagnostic counters for created and finalized instances
            static object releaseLock = new object();
            public static int createCount = 0;
            public static int releaseCount = 0;

            ~AsyncSocketClient()
            {
                lock (releaseLock)
                {
                    releaseCount++;
                }
            }

            public AsyncSocketClient(Socket socket)
            {
                lock (releaseLock)
                {
                    createCount++;
                }

                ConnectSocket = socket;

                m_receiveEventArgs = new SocketAsyncEventArgs();
                m_asyncReceiveBuffer = new byte[IocpReadLen];
                m_receiveEventArgs.AcceptSocket = ConnectSocket;
                m_receiveEventArgs.Completed += ReceiveEventArgs_Completed;

                m_sendEventArgs = new SocketAsyncEventArgs();
                m_asyncSendBuffer = new byte[IocpReadLen * 2];
                m_sendEventArgs.AcceptSocket = ConnectSocket;
                m_sendEventArgs.Completed += SendEventArgs_Completed;
            }

            SocketClientInfo _clientInfo;

            public SocketClientInfo ClientInfo
            {
                get
                {
                    return _clientInfo;
                }
            }

            internal void CreateClientInfo(NetListener netListener)
            {
                _clientInfo = new SocketClientInfo();
                try
                {
                    _clientInfo.Tag = netListener._listenParam._tag;
                    IPEndPoint ip = ConnectSocket.LocalEndPoint as IPEndPoint;
                    Debug.Assert(netListener._listenParam._port == ip.Port);

                    _clientInfo.LocalIp = ip.Address.ToString();
                    _clientInfo.LocalPort = netListener._listenParam._port;

                    ip = ConnectSocket.RemoteEndPoint as IPEndPoint;
                    _clientInfo.PeerIp = ip.Address.ToString();
                    _clientInfo.PeerPort = ip.Port;
                }
                catch (Exception ex)
                {
                    NetLogger.Log("CreateClientInfo", ex);
                }
            }
            internal void SetClientInfo(SocketClientInfo clientInfo)
            {
                _clientInfo = clientInfo;
            }

            #region read process
            bool _inReadPending = false;
            public EN_SocketReadResult ReadNextData()
            {
                lock (this)
                {
                    if (_socketError)
                        return EN_SocketReadResult.ReadError;
                    if (_inReadPending)
                        return EN_SocketReadResult.InAsyn;
                    if (!ConnectSocket.Connected)
                    {
                        OnReadError();
                        return EN_SocketReadResult.ReadError;
                    }

                    try
                    {
                        m_receiveEventArgs.SetBuffer(m_asyncReceiveBuffer, 0, m_asyncReceiveBuffer.Length);
                        _inReadPending = true;
                        bool willRaiseEvent = ConnectSocket.ReceiveAsync(ReceiveEventArgs); // post the receive request
                        if (!willRaiseEvent)
                        {
                            // ... (several lines are missing here in the original post)
                            return EN_SocketReadResult.HaveRead;
                        }
                        else
                        {
                            return EN_SocketReadResult.InAsyn;
                        }
                    }
                    catch (Exception ex)
                    {
                        NetLogger.Log("ReadNextData", ex);
                        _inReadPending = false;
                        OnReadError();
                        return EN_SocketReadResult.ReadError;
                    }
                }
            }

            private void ProcessReceive()
            {
                if (ReceiveEventArgs.BytesTransferred > 0
                    && ReceiveEventArgs.SocketError == SocketError.Success)
                {
                    int offset = ReceiveEventArgs.Offset;
                    int count = ReceiveEventArgs.BytesTransferred;

                    // Copy the data out so the receive buffer can be reused for the next request
                    byte[] readData = new byte[count];
                    Array.Copy(m_asyncReceiveBuffer, offset, readData, 0, count);

                    _inReadPending = false;
                    if (!_socketError)
                        OnReadData?.Invoke(this, readData);
                }
                else
                {
                    // Zero bytes or a socket error: treat the connection as closed
                    _inReadPending = false;
                    OnReadError();
                }
            }

            private void ReceiveEventArgs_Completed(object sender, SocketAsyncEventArgs e)
            {
                lock (this)
                {
                    _inReadPending = false;
                    ProcessReceive();
                    if (_socketError)
                    {
                        OnReadError();
                    }
                }
            }

            bool _socketError = false;
            private void OnReadError()
            {
                lock (this)
                {
                    // Raise OnSocketClose only once per connection
                    if (_socketError == false)
                    {
                        _socketError = true;
                        OnSocketClose?.Invoke(this);
                    }
                    CloseClient();
                }
            }
            #endregion

            #region send process
            int _sendBufferByteCount = 102400;
            public int SendBufferByteCount
            {
                get
                {
                    return _sendBufferByteCount;
                }
                set
                {
                    // Enforce a minimum of 1 KB
                    if (value < 1024)
                    {
                        _sendBufferByteCount = 1024;
                    }
                    else
                    {
                        _sendBufferByteCount = value;
                    }
                }
            }

            SendBufferPool _sendDataPool = new SendBufferPool();
            internal EN_SendDataResult PutSendData(byte[] data)
            {
                // (omitted in the original post: the rest of PutSendData and the code that follows it)
            }

            private void SendEventArgs_Completed(object sender, SocketAsyncEventArgs sendEventArgs)
            {
                lock (this)
                {
                    try
                    {
                        _inSendPending = false;
                        ProcessSend(m_sendEventArgs);

                        int sendCount = 0;
                        if (sendEventArgs.SocketError == SocketError.Success)
                        {
                            sendCount = sendEventArgs.BytesTransferred;
                        }
                        OnSendData?.Invoke(this, sendCount);

                        if (_socketError)
                        {
                            OnSendError();
                        }
                    }
                    catch (Exception ex)
                    {
                        NetLogger.Log("SendEventArgs_Completed", ex);
                    }
                }
            }

            private bool ProcessSend(SocketAsyncEventArgs sendEventArgs)
            {
                if (sendEventArgs.SocketError == SocketError.Success)
                {
                    return true;
                }
                else
                {
                    OnSendError();
                    return false;
                }
            }

            // Coalesces queued items into the send buffer until more than IocpReadLen bytes are collected
            private int GetSendData()
            {
                int dataLen = 0;
                while (true)
                {
                    byte[] data = _sendDataPool.GetObj();
                    if (data == null)
                        return dataLen;
                    Array.Copy(data, 0, m_asyncSendBuffer, dataLen, data.Length);
                    dataLen += data.Length;
                    if (dataLen > IocpReadLen)
                        break;
                }
                return dataLen;
            }
            private void OnSendError()
            {
                lock (this)
                {
                    if (_socketError == false)
                    {
                        _socketError = true;
                        OnSocketClose?.Invoke(this);
                    }
                    CloseClient();
                }
            }
            #endregion

            internal void CloseSocket()
            {
                try
                {
                    ConnectSocket.Close();
                }
                catch (Exception ex)
                {
                    NetLogger.Log("CloseSocket", ex);
                }
            }

            static object socketCloseLock = new object();
            public static int closeSendCount = 0;
            public static int closeReadCount = 0;

            bool _disposeSend = false;
            void CloseSend()
            {
                // Dispose the send args only when no send is in flight
                if (!_disposeSend && !_inSendPending)
                {
                    lock (socketCloseLock)
                        closeSendCount++;

                    _disposeSend = true;
                    m_sendEventArgs.SetBuffer(null, 0, 0);
                    m_sendEventArgs.Completed -= SendEventArgs_Completed;
                    m_sendEventArgs.Dispose();
                }
            }

            bool _disposeRead = false;
            void CloseRead()
            {
                // Dispose the receive args only when no receive is in flight
                if (!_disposeRead && !_inReadPending)
                {
                    lock (socketCloseLock)
                        closeReadCount++;

                    _disposeRead = true;
                    m_receiveEventArgs.SetBuffer(null, 0, 0);
                    m_receiveEventArgs.Completed -= ReceiveEventArgs_Completed;
                    m_receiveEventArgs.Dispose();
                }
            }
            private void CloseClient()
            {
                try
                {
                    CloseSend();
                    CloseRead();
                    ConnectSocket.Close();
                }
                catch (Exception ex)
                {
                    NetLogger.Log("CloseClient", ex);
                }
            }

            // Splits data into chunks of at most maxLen bytes
            private List<byte[]> SplitData(byte[] data, int maxLen)
            {
                List<byte[]> items = new List<byte[]>();

                int start = 0;
                while (true)
                {
                    int itemLen = Math.Min(maxLen, data.Length - start);
                    if (itemLen == 0)
                        break;
                    byte[] item = new byte[itemLen];
                    Array.Copy(data, start, item, 0, itemLen);
                    items.Add(item);

                    start += itemLen;
                }
                return items;
            }
        }

        public enum EN_SocketReadResult
        {
            InAsyn,
            HaveRead,
            ReadError
        }

        public enum EN_SocketSendResult
        {
            InAsyn,
            HaveSend,
            NoSendData,
            SendError
        }

        // Queue of pending send buffers that also tracks the total number of queued bytes
        class SendBufferPool
        {
            ObjectPool<byte[]> _bufferPool = new ObjectPool<byte[]>();

            public Int64 _bufferByteCount = 0;
            public bool PutObj(byte[] obj)
            {
                if (_bufferPool.PutObj(obj))
                {
                    lock (this)
                    {
                        _bufferByteCount += obj.Length;
                    }
                    return true;
                }
                else
                {
                    return false;
                }
            }

            public byte[] GetObj()
            {
                byte[] result = _bufferPool.GetObj();
                if (result != null)
                {
                    lock (this)
                    {
                        _bufferByteCount -= result.Length;
                    }
                }
                return result;
            }
        }
    }
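
    The send-side coalescing done by GetSendData can be illustrated in isolation. The sketch below is not the library's SendBufferPool: SendCoalescer, Enqueue and Fill are hypothetical names, and unlike GetSendData it checks the remaining space before every copy instead of relying on the queued items having been split to a bounded size beforehand.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper illustrating send coalescing; not part of the original library.
public sealed class SendCoalescer
{
    private readonly Queue<byte[]> _queue = new Queue<byte[]>();
    private readonly object _sync = new object();
    private readonly int _capacity;

    // capacity: size of the send buffer that Fill will be given.
    public SendCoalescer(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException(nameof(capacity));
        _capacity = capacity;
    }

    // Queues one message; messages larger than the send buffer must be split by the caller.
    public void Enqueue(byte[] data)
    {
        if (data == null || data.Length == 0) return;
        if (data.Length > _capacity)
            throw new ArgumentException("Message is larger than the send buffer; split it first.");
        lock (_sync) { _queue.Enqueue(data); }
    }

    // Copies as many whole queued messages as fit into buffer and returns the bytes written.
    public int Fill(byte[] buffer)
    {
        if (buffer.Length < _capacity)
            throw new ArgumentException("Buffer is smaller than the configured capacity.");

        int written = 0;
        lock (_sync)
        {
            while (_queue.Count > 0)
            {
                byte[] next = _queue.Peek();
                if (written + next.Length > _capacity)
                    break; // does not fit: leave it for the next send
                Buffer.BlockCopy(next, 0, buffer, written, next.Length);
                written += next.Length;
                _queue.Dequeue();
            }
        }
        return written;
    }
}
```

    With a capacity of 8 and three queued 3-byte messages, the first Fill copies two messages (6 bytes) and the second copies the remaining one, so several small writes turn into fewer, larger send requests.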

    NetServer: aggregates the other classes

      1 using System;
      2 using System.Collections.Generic;
      3 using System.Diagnostics;
      4 using System.Linq;
      5 using System.Net.Sockets;
      6 using System.Threading;
      7 
      8 namespace IocpCore
      9 {
     10     public class NetServer
     11     {
     12         public Action<SocketEventParam> OnSocketPacketEvent;
     13 
     14         //每个连接发送缓冲大小
     15         public int SendBufferBytePerClient { get; set; } = 1024 * 100;
     16 
     17         bool _serverStart = false;
     18         List<NetListener> _listListener = new List<NetListener>();
     19 
     20         //负责对收到的字节流 组成完成的包
     21         ClientPacketManage _clientPacketManage;
     22 
     23         public Int64 SendByteCount { get; set; }
     24         public Int64 ReadByteCount { get; set; }
     25 
     26         List<ListenParam> _listListenPort = new List<ListenParam>();
     27         public void AddListenPort(int port, object tag)
     28         {
     29             _listListenPort.Add(new ListenParam(port, tag));
     30         }
     31         /// <summary>
     32         /// 
     33         /// </summary>
     34         /// <param name="listenFault">监听失败的端口</param>
     35         /// <returns></returns>
     36         public bool StartListen(out List<int> listenFault)
     37         {
     38             _serverStart = true;
     39 
     40             _clientPacketManage = new ClientPacketManage(this);
     41             _clientPacketManage.OnSocketPacketEvent += PutClientPacket;
     42 
     43             _netConnectManage.OnSocketConnectEvent += SocketConnectEvent;
     44 
     45             _listListener.Clear();
     46             Thread thread1 = new Thread(new ThreadStart(NetPacketProcess));
     47             thread1.Start();
     48 
     49             Thread thread2 = new Thread(new ThreadStart(NetSendProcess));
     50             thread2.Start();
     51 
     52             Thread thread3 = new Thread(new ThreadStart(NetReadProcess));
     53             thread3.Start();
     54 
     55             listenFault = new List<int>();
     56             foreach (ListenParam param in _listListenPort)
     57             {
     58                 NetListener listener = new NetListener(this);
     59                 listener._listenParam = param;
     60                 listener.OnAcceptSocket += Listener_OnAcceptSocket;
     61                 if (!listener.StartListen())
     62                 {
     63                     listenFault.Add(param._port);
     64                 }
     65                 else
     66                 {
     67                     _listListener.Add(listener);
     68                     NetLogger.Log(string.Format("监听成功!端口:{0}", param._port));
     69                 }
     70             }
     71 
     72             return listenFault.Count == 0;
     73         }
     74 
     75         public void PutClientPacket(SocketEventParam param)
     76         {
     77             OnSocketPacketEvent?.Invoke(param);
     78         }
     79 
     80         //获取包的最小长度
     81         int _packetMinLen;
     82         int _packetMaxLen;
     83         public int PacketMinLen
     84         {
     85             get { return _packetMinLen; }
     86         }
     87         public int PacketMaxLen
     88         {
     89             get { return _packetMaxLen; }
     90         }
     91 
     92         /// <summary>
     93         /// 设置包的最小和最大长度
     94         /// 当minLen=0时,认为是接收字节流
     95         /// </summary>
     96         /// <param name="minLen"></param>
     97         /// <param name="maxLen"></param>
     98         public void SetPacketParam(int minLen, int maxLen)
     99         {
    100             Debug.Assert(minLen >= 0);
    101             Debug.Assert(maxLen > minLen);
    102             _packetMinLen = minLen;
    103             _packetMaxLen = maxLen;
    104         }
    105 
    106         //获取包的总长度
    107         public delegate int delegate_GetPacketTotalLen(byte[] data, int offset);
    108         public delegate_GetPacketTotalLen GetPacketTotalLen_Callback;
    109 
    110         ObjectPoolWithEvent<SocketEventParam> _socketEventPool = new ObjectPoolWithEvent<SocketEventParam>();
    111         private void NetPacketProcess()
    112         {
    113             while (_serverStart)
    114             {
    115                 try
    116                 {
    117                     DealEventPool();
    118                 }
    119                 catch (Exception ex)
    120                 {
    121                     NetLogger.Log(string.Format("DealEventPool 异常 {0}***{1}", ex.Message, ex.StackTrace));
    122                 }
    123                 _socketEventPool.WaitOne(1000);
    124             }
    125         }
    126 
    127         Dictionary<Socket, AsyncSocketClient> _clientGroup = new Dictionary<Socket, AsyncSocketClient>();
    128         public int ClientCount
    129         {
    130             get
    131             {
    132                 lock (_clientGroup)
    133                 {
    134                     return _clientGroup.Count;
    135                 }
    136             }
    137         }
    138         public List<Socket> ClientList
    139         {
    140             get
    141             {
    142                 lock (_clientGroup)
    143                 {
    144                     return _clientGroup.Keys.ToList();
    145                 }
    146             }
    147         }
    148 
    149         private void DealEventPool()
    150         {
    151             while (true)
    152             {
    153                 SocketEventParam param = _socketEventPool.GetObj();
    154                 if (param == null)
    155                     return;
    156 
    157                 if (param.SocketEvent == EN_SocketEvent.close)
    158                 {
    159                     lock (_clientGroup)
    160                     {
    161                         _clientGroup.Remove(param.Socket);
    162                     }
    163                 }
    164 
    165                 if (_packetMinLen == 0)//字节流处理
    166                 {
    167                     OnSocketPacketEvent?.Invoke(param);
    168                 }
    169                 else
    170                 {
    171                     //组成一个完整的包 逻辑
    172                     _clientPacketManage.PutSocketParam(param);
    173                 }
    174             }
    175         }
    176 
            // Handles the result of an outgoing connection attempt.
            private void SocketConnectEvent(SocketEventParam param, AsyncSocketClient client)
            {
                try
                {
                    // A null socket or client means the connection failed; in that
                    // case nothing is registered and only the event is queued.
                    if (param.Socket != null && client != null)
                    {
                        lock (_clientGroup)
                        {
                            bool remove = _clientGroup.Remove(client.ConnectSocket);
                            Debug.Assert(!remove);
                            _clientGroup.Add(client.ConnectSocket, client);
                        }

                        client.OnSocketClose += Client_OnSocketClose;
                        client.OnReadData += Client_OnReadData;
                        client.OnSendData += Client_OnSendData;

                        _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));
                    }
                    _socketEventPool.PutObj(param);
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("SocketConnectEvent exception {0}***{1}", ex.Message, ex.StackTrace));
                }
            }

            // Called when a received packet reports an invalid length; the connection is closed.
            internal void OnRcvPacketLenError(Socket socket, byte[] buffer, int offset, int packetLen)
            {
                try
                {
                    lock (_clientGroup)
                    {
                        AsyncSocketClient client;
                        if (!_clientGroup.TryGetValue(socket, out client))
                        {
                            Debug.Assert(false);
                            return;
                        }

                        NetLogger.Log(string.Format("Invalid packet length! Packet length: {0}", packetLen));
                        client.CloseSocket();
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("OnRcvPacketLenError exception {0}***{1}", ex.Message, ex.StackTrace));
                }
            }

            #region listen port
            // Registers a newly accepted client, subscribes to its events,
            // starts reading from it, and queues an accept event.
            private void Listener_OnAcceptSocket(ListenParam listenPatam, AsyncSocketClient client)
            {
                try
                {
                    lock (_clientGroup)
                    {
                        bool remove = _clientGroup.Remove(client.ConnectSocket);
                        Debug.Assert(!remove);
                        _clientGroup.Add(client.ConnectSocket, client);
                    }

                    client.OnSocketClose += Client_OnSocketClose;
                    client.OnReadData += Client_OnReadData;
                    client.OnSendData += Client_OnSendData;

                    _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

                    SocketEventParam param = new SocketEventParam(EN_SocketEvent.accept, client.ConnectSocket);
                    param.ClientInfo = client.ClientInfo;

                    _socketEventPool.PutObj(param);
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("Listener_OnAcceptSocket exception {0}***{1}", ex.Message, ex.StackTrace));
                }
            }

            ObjectPoolWithEvent<SocketEventDeal> _listSendEvent = new ObjectPoolWithEvent<SocketEventDeal>();

            // Send worker loop: drain the send queue, then wait until new work
            // is signalled or the WaitOne(1000) timeout elapses.
            private void NetSendProcess()
            {
                while (true)
                {
                    DealSendEvent();
                    _listSendEvent.WaitOne(1000);
                }
            }

            ObjectPoolWithEvent<SocketEventDeal> _listReadEvent = new ObjectPoolWithEvent<SocketEventDeal>();

            // Read worker loop: same pattern as the send worker.
            private void NetReadProcess()
            {
                while (true)
                {
                    DealReadEvent();
                    _listReadEvent.WaitOne(1000);
                }
            }

            private void DealSendEvent()
            {
                while (true)
                {
                    SocketEventDeal item = _listSendEvent.GetObj();
                    if (item == null)
                        break;
                    switch (item.SocketEvent)
                    {
                        case EN_SocketDealEvent.send:
                            // Keep sending for as long as the client reports that data was sent
                            while (item.Client.SendNextData() == EN_SocketSendResult.HaveSend)
                            {
                            }
                            break;
                        case EN_SocketDealEvent.read:
                            // Read events must never appear in the send queue
                            Debug.Assert(false);
                            break;
                    }
                }
            }

            private void DealReadEvent()
            {
                while (true)
                {
                    SocketEventDeal item = _listReadEvent.GetObj();
                    if (item == null)
                        break;
                    switch (item.SocketEvent)
                    {
                        case EN_SocketDealEvent.read:
                            // Keep reading for as long as the client reports that data was read
                            while (item.Client.ReadNextData() == EN_SocketReadResult.HaveRead)
                            {
                            }
                            break;
                        case EN_SocketDealEvent.send:
                            // Send events must never appear in the read queue
                            Debug.Assert(false);
                            break;
                    }
                }
            }

            private void Client_OnReadData(AsyncSocketClient client, byte[] readData)
            {
                // Queue the next read
                _listReadEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.read));

                try
                {
                    SocketEventParam param = new SocketEventParam(EN_SocketEvent.read, client.ConnectSocket);
                    param.ClientInfo = client.ClientInfo;
                    param.Data = readData;
                    _socketEventPool.PutObj(param);

                    lock (this)
                    {
                        ReadByteCount += readData.Length;
                    }
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("Client_OnReadData exception {0}***{1}", ex.Message, ex.StackTrace));
                }
            }
            #endregion

            private void Client_OnSendData(AsyncSocketClient client, int sendCount)
            {
                // Queue the next send
                _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
                lock (this)
                {
                    SendByteCount += sendCount;
                }
            }

            private void Client_OnSocketClose(AsyncSocketClient client)
            {
                try
                {
                    SocketEventParam param = new SocketEventParam(EN_SocketEvent.close, client.ConnectSocket);
                    param.ClientInfo = client.ClientInfo;
                    _socketEventPool.PutObj(param);
                }
                catch (Exception ex)
                {
                    NetLogger.Log(string.Format("Client_OnSocketClose exception {0}***{1}", ex.Message, ex.StackTrace));
                }
            }

            /// <summary>
            /// Puts data into the send buffer of a connection.
            /// </summary>
            /// <param name="socket">The connection to send on.</param>
            /// <param name="data">The bytes to send.</param>
            /// <returns>The result of buffering the data; no_client if the socket is null or unknown.</returns>
            public EN_SendDataResult SendData(Socket socket, byte[] data)
            {
                if (socket == null)
                    return EN_SendDataResult.no_client;
                lock (_clientGroup)
                {
                    AsyncSocketClient client;
                    if (!_clientGroup.TryGetValue(socket, out client))
                        return EN_SendDataResult.no_client;
                    EN_SendDataResult result = client.PutSendData(data);
                    if (result == EN_SendDataResult.ok)
                    {
                        // Queue the next send
                        _listSendEvent.PutObj(new SocketEventDeal(client, EN_SocketDealEvent.send));
                    }
                    return result;
                }
            }

            /// <summary>
            /// Sets the send buffer size of a connection.
            /// </summary>
            /// <param name="socket">The connection to configure.</param>
            /// <param name="byteCount">The buffer size in bytes.</param>
            /// <returns>false if the socket is null or not a known connection; otherwise true.</returns>
            public bool SetClientSendBuffer(Socket socket, int byteCount)
            {
                // Dictionary lookups throw on a null key, so reject it up front
                if (socket == null)
                    return false;
                lock (_clientGroup)
                {
                    AsyncSocketClient client;
                    if (!_clientGroup.TryGetValue(socket, out client))
                        return false;
                    client.SendBufferByteCount = byteCount;
                    return true;
                }
            }


            #region connect process
            NetConnectManage _netConnectManage = new NetConnectManage();
            /// <summary>
            /// Connects to a peer asynchronously.
            /// </summary>
            /// <param name="peerIp">The peer IP address.</param>
            /// <param name="peerPort">The peer port.</param>
            /// <param name="tag">A caller-supplied object passed through to the connection manager.</param>
            /// <returns>The result reported by the connection manager.</returns>
            public bool ConnectAsyn(string peerIp, int peerPort, object tag)
            {
                return _netConnectManage.ConnectAsyn(peerIp, peerPort, tag);
            }

            /// <summary>
            /// Connects to a peer synchronously.
            /// </summary>
            /// <param name="peerIp">The peer IP address.</param>
            /// <param name="peerPort">The peer port.</param>
            /// <param name="tag">A caller-supplied object passed through to the connection manager.</param>
            /// <param name="socket">Receives the socket produced by the connection manager.</param>
            /// <returns>The result reported by the connection manager.</returns>
            public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
            {
                return _netConnectManage.Connect(peerIp, peerPort, tag, out socket);
            }
            #endregion
        }

        // Kind of work item placed on the read and send queues
        enum EN_SocketDealEvent
        {
            read,
            send,
        }

        // A queued read or send request for one client
        class SocketEventDeal
        {
            public AsyncSocketClient Client { get; set; }
            public EN_SocketDealEvent SocketEvent { get; set; }
            public SocketEventDeal(AsyncSocketClient client, EN_SocketDealEvent socketEvent)
            {
                Client = client;
                SocketEvent = socketEvent;
            }
        }
    }
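
The send and read workers above follow a drain-then-wait pattern: take items until the queue returns null, then block until a producer signals new work or a timeout elapses. The listing does not show ObjectPoolWithEvent<T> itself, so the following is only a minimal sketch of how such a queue could be built. The class name SignalQueue<T>, its internals, and the demo program are assumptions made for illustration; the library's real implementation may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical stand-in for ObjectPoolWithEvent<T>; only the three method
// names (PutObj, GetObj, WaitOne) are taken from the listing above.
class SignalQueue<T> where T : class
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);

    // Enqueue an item and wake a waiting worker.
    public void PutObj(T item)
    {
        lock (_items)
        {
            _items.Enqueue(item);
        }
        _signal.Set();
    }

    // Dequeue the next item, or return null when the queue is empty.
    public T GetObj()
    {
        lock (_items)
        {
            return _items.Count > 0 ? _items.Dequeue() : null;
        }
    }

    // Block until a producer signals or the timeout (in milliseconds) elapses.
    public bool WaitOne(int millisecondsTimeout)
    {
        return _signal.WaitOne(millisecondsTimeout);
    }
}

static class Program
{
    static void Main()
    {
        var queue = new SignalQueue<string>();
        int handled = 0; // touched only by the worker thread

        var worker = new Thread(() =>
        {
            while (true)
            {
                // Drain everything that is currently queued.
                string item;
                while ((item = queue.GetObj()) != null)
                {
                    Console.WriteLine("handled " + item);
                    handled++;
                }
                if (handled >= 3)
                    break;
                // Sleep until signalled; the timeout guards against a missed signal.
                queue.WaitOne(1000);
            }
        });
        worker.Start();

        queue.PutObj("a");
        queue.PutObj("b");
        queue.PutObj("c");
        worker.Join();
    }
}
```

Because the item is enqueued before the event is set, a worker that finds the queue empty and then waits is woken immediately if a producer slipped in between the two steps. The timeout is a second safety net for the same situation.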

    Using the library

    The library is simple to use. An example follows:

    using IocpCore;
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading.Tasks;
    using System.Windows;

    namespace WarningClient
    {
        public class SocketServer
        {
            public Action<SocketEventParam> OnSocketEvent;

            public Int64 SendByteCount
            {
                get
                {
                    if (_netServer == null)
                        return 0;
                    return _netServer.SendByteCount;
                }
            }
            public Int64 ReadByteCount
            {
                get
                {
                    if (_netServer == null)
                        return 0;
                    return _netServer.ReadByteCount;
                }
            }

            NetServer _netServer;
            EN_PacketType _packetType = EN_PacketType.byteStream;
            public void SetPacktType(EN_PacketType packetType)
            {
                _packetType = packetType;
                if (_netServer == null)
                    return;
                if (packetType == EN_PacketType.byteStream)
                {
                    // A first argument of 0 selects byte-stream handling
                    _netServer.SetPacketParam(0, 1024);
                }
                else
                {
                    // Packet mode with the same second argument (1024)
                    _netServer.SetPacketParam(9, 1024);
                }
            }

            public bool Init(List<int> listenPort)
            {
                NetLogger.OnLogEvent += NetLogger_OnLogEvent;
                _netServer = new NetServer();
                SetPacktType(_packetType);
                _netServer.GetPacketTotalLen_Callback += GetPacketTotalLen;
                _netServer.OnSocketPacketEvent += SocketPacketDeal;

                foreach (int n in listenPort)
                {
                    _netServer.AddListenPort(n, n);
                }

                List<int> listenFault;
                bool start = _netServer.StartListen(out listenFault);
                return start;
            }

            // Returns the total length of the packet that starts at data[offset]
            int GetPacketTotalLen(byte[] data, int offset)
            {
                if (MainWindow._packetType == EN_PacketType.znss)
                    return GetPacketZnss(data, offset);
                else
                    return GetPacketAnzhiyuan(data, offset);
            }

            int GetPacketAnzhiyuan(byte[] data, int offset)
            {
                // Byte 5 of the packet holds a length value; add 6 to get the total length
                int n = data[offset + 5] + 6;
                return n;
            }

            int GetPacketZnss(byte[] data, int offset)
            {
                // Byte 4 of the packet holds a length value; add 5 to get the total length.
                // The index is taken relative to offset, as in GetPacketAnzhiyuan.
                int packetLen = data[offset + 4] + 5;
                return packetLen;
            }


            public bool ConnectAsyn(string peerIp, int peerPort, object tag)
            {
                return _netServer.ConnectAsyn(peerIp, peerPort, tag);
            }

            public bool Connect(string peerIp, int peerPort, object tag, out Socket socket)
            {
                return _netServer.Connect(peerIp, peerPort, tag, out socket);
            }

            private void NetLogger_OnLogEvent(string message)
            {
                AppLog.Log(message);
            }

            Dictionary<Socket, SocketEventParam> _clientGroup = new Dictionary<Socket, SocketEventParam>();

            public int ClientCount
            {
                get
                {
                    lock (_clientGroup)
                    {
                        return _clientGroup.Count;
                    }
                }
            }
            public List<Socket> ClientList
            {
                get
                {
                    if (_netServer != null)
                        return _netServer.ClientList;
                    return new List<Socket>();
                }
            }
            void AddClient(SocketEventParam socketParam)
            {
                lock (_clientGroup)
                {
                    // Assigning through the indexer replaces any existing entry for this socket
                    _clientGroup[socketParam.Socket] = socketParam;
                }
            }

            void RemoveClient(SocketEventParam socketParam)
            {
                lock (_clientGroup)
                {
                    _clientGroup.Remove(socketParam.Socket);
                }
            }

            ObjectPool<SocketEventParam> _readDataPool = new ObjectPool<SocketEventParam>();

            public ObjectPool<SocketEventParam> ReadDataPool
            {
                get
                {
                    return _readDataPool;
                }
            }

            // Central event handler: receives every read, accept, connect and close event
            private void SocketPacketDeal(SocketEventParam socketParam)
            {
                OnSocketEvent?.Invoke(socketParam);
                if (socketParam.SocketEvent == EN_SocketEvent.read)
                {
                    if (MainWindow._isShowReadPacket)
                        _readDataPool.PutObj(socketParam);
                }
                else if (socketParam.SocketEvent == EN_SocketEvent.accept)
                {
                    AddClient(socketParam);
                    string peerIp = socketParam.ClientInfo.PeerIpPort;
                    AppLog.Log(string.Format("Client connected! Local port: {0}, peer: {1}",
                        socketParam.ClientInfo.LocalPort, peerIp));
                }
                else if (socketParam.SocketEvent == EN_SocketEvent.connect)
                {
                    string peerIp = socketParam.ClientInfo.PeerIpPort;
                    if (socketParam.Socket != null)
                    {
                        AddClient(socketParam);

                        AppLog.Log(string.Format("Connected to peer! Local port: {0}, peer: {1}",
                           socketParam.ClientInfo.LocalPort, peerIp));
                    }
                    else
                    {
                        AppLog.Log(string.Format("Failed to connect to peer! Local port: {0}, peer: {1}",
                            socketParam.ClientInfo.LocalPort, peerIp));
                    }
                }
                else if (socketParam.SocketEvent == EN_SocketEvent.close)
                {
                    MainWindow.MainWnd.OnSocketDisconnect(socketParam.Socket);
                    RemoveClient(socketParam);
                    string peerIp = socketParam.ClientInfo.PeerIpPort;
                    AppLog.Log(string.Format("Client disconnected! Local port: {0}, peer: {1}",
                        socketParam.ClientInfo.LocalPort, peerIp));
                }
            }

            public EN_SendDataResult SendData(Socket socket, byte[] data)
            {
                if (socket == null)
                {
                    MessageBox.Show("Not connected yet!");
                    return EN_SendDataResult.no_client;
                }
                return _netServer.SendData(socket, data);
            }

            // Sends the same data to every tracked client
            internal void SendToAll(byte[] data)
            {
                lock (_clientGroup)
                {
                    foreach (Socket socket in _clientGroup.Keys)
                    {
                        SendData(socket, data);
                    }
                }
            }
        }
    }
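
The example registers GetPacketTotalLen as a callback so the library can cut the byte stream into complete packets. The library's own assembler is not shown in this part of the article, so the following is only a sketch of how such a callback could drive framing. PacketFramer, Feed, and the 5-byte header with a length byte at index 4 are assumptions chosen for illustration and are not the library's API.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical framer; the library's own packet manager may work differently.
class PacketFramer
{
    private readonly List<byte> _pending = new List<byte>();
    private readonly int _minLen;   // bytes required before the length can be read
    private readonly int _maxLen;   // upper bound used to reject corrupt lengths
    private readonly Func<byte[], int, int> _getTotalLen;

    public PacketFramer(int minLen, int maxLen, Func<byte[], int, int> getTotalLen)
    {
        if (minLen <= 0)
            throw new ArgumentOutOfRangeException("minLen");
        _minLen = minLen;
        _maxLen = maxLen;
        _getTotalLen = getTotalLen;
    }

    // Append received bytes and return every complete packet now available.
    public List<byte[]> Feed(byte[] data)
    {
        _pending.AddRange(data);
        byte[] all = _pending.ToArray();
        var packets = new List<byte[]>();
        int offset = 0;

        while (all.Length - offset >= _minLen)
        {
            // The callback must index relative to offset, not from 0.
            int total = _getTotalLen(all, offset);
            if (total < _minLen || total > _maxLen)
                throw new InvalidOperationException("Invalid packet length: " + total);
            if (all.Length - offset < total)
                break; // incomplete packet: wait for more bytes

            byte[] packet = new byte[total];
            Array.Copy(all, offset, packet, 0, total);
            packets.Add(packet);
            offset += total;
        }

        // Keep only the bytes that have not been consumed yet.
        _pending.RemoveRange(0, offset);
        return packets;
    }
}

static class Program
{
    static void Main()
    {
        // Assumed format: 5-byte header whose last byte is the payload length.
        var framer = new PacketFramer(5, 1024, (buf, off) => buf[off + 4] + 5);

        // Two packets (payloads of 2 bytes and 1 byte) arriving in three chunks.
        byte[][] chunks =
        {
            new byte[] { 0, 0, 0, 0, 2, 0xAA },
            new byte[] { 0xBB, 0, 0, 0 },
            new byte[] { 0, 1, 0xCC },
        };

        foreach (byte[] chunk in chunks)
        {
            List<byte[]> packets = framer.Feed(chunk);
            Console.WriteLine("received {0} bytes, {1} complete packet(s)", chunk.Length, packets.Count);
        }
    }
}
```

The second packet in the demo starts at a non-zero offset inside the accumulated buffer, which is why a length callback has to read its length byte relative to the offset it is given.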

    For technical discussion, contact the author via QQ: 13712486

  • Original article: https://www.cnblogs.com/yuanchenhui/p/asyn_scoket.html