zoukankan      html  css  js  c++  java
  • 高性能 socket 框架

          最近用C# 写了一个易用,高性能的socket,抗住了ServerTestTool.exe 压力测试工具 

    下载地址  GitHub       QQ群: 664740973

    https://gitee.com/fengma_312/socket.core

    https://www.nuget.org/packages/socket.core/


    socket.core

    这是一个基于C# .net standard2.0 写的socket框架,可使用于.net Framework/dotnet core程序集,能在window(IOCP)/linux(epoll)运行.使用异步连接,异步发送,异步接收,性能爆表,并且通过压力测试。

    安装NuGet:
    Package Manager: Install-Package socket.core
    .Net CLI :dotnet add package socket.core
    Paket CLI:paket add socket.core

    服务端所在socket.core.Server命名空间下,分别为三种模式 push/pull/pack
    客户端所在socket.core.Client命名空间下,分别为三种模式 push/pull/pack

    主要流程与对应的方法和事件介绍.
    注:connectId(int)代表着一个连接对象,data(byte[]),success(bool)

    • 1.初始化socket(对应的三种模式)

      实例化服务端类 TcpPushServer/TcpPullServer/TcpPackServer
      实例化客户端类 TcpPushClient/TcpPullClient/TcpPackClient
      参数介绍int numConnections同时处理的最大连接数,int receiveBufferSize用于每个套接字I/O操作的缓冲区大小(接收端), int overtime超时时长,单位秒.(每10秒检查一次),当值为0时,不设置超时,uint headerFlag包头标记范围0~1023(0x3FF),当包头标识等于0时,不校验包头

    • 2.启动监听/连接服务器

      服务端 server.Start(port);
      客户端 client.Connect(ip,port);

    • 3.触发连接事件

      服务端 server.OnAccept(connectId); 接收到一个连接id,可用他来发送,接收,关闭的标记
      客户端 client.OnConnect(success); 接收是否成功连接到服务器

    • 4.发送消息

      服务端 server.Send(connectId,data,offset,length);
      客户端 client.Send(data,offset,length);

    • 5.触发已发送事件

      服务端 server.OnSend(connectId,length);
      客户端 client.OnSend(length);

    • 6.触发接收事件

      服务端 server.OnReceive(connectId, data);
      客户端 client.OnReceive(data);

    • 7.关闭连接

      服务端 server.Close(connectId);
      客户端 client.Close();

    • 8.触发关闭连接事件

      服务端 server.OnClose(connectId);
      客户端 client.OnClose();

    三种模型简介

    • 一:push

      当接收到数据时会触发监听事件OnReceive(connectId,data);把数据立马“推”给应用程序

    • 二:pull

      当接收到数据时会触发监听事件OnReceive(connectId,length),告诉应用程序当前已经接收到了多少数据长度,应用程序可使用GetLength(connectId)方法检查已接收的数据的长度,如果满足则调用组件的Fetch(connectId,length)方法,把需要的数据“拉”出来

    • 三:pack

      pack模型组件是push和pull模型的结合体,应用程序不必要处理分包/合包,组件保证每个server.OnReceive(connectId,data)/client.OnReceive(data)事件都向应用程序提供一个完整的数据包
      注:pack模型组件会对应用程序发送的每个数据包自动加上4个字节(32bit)的包头,组件接收到数据时,根据包头信息自动分包,每个完整的数据包通过OnReceive(connectId, data)事件发送给应用程序
      PACK包头格式(4字节)4*8=32
      XXXXXXXXXXYYYYYYYYYYYYYYYYYYYYYY
      前10位X为包头标识位,用于数据包校验,有效包头标识取值范围0~1023(0x3FF),当包头标识等于0时,不校验包头,后22位Y为长度位,记录包体长度。有效数据包最大长度不能超过4194303(0x3FFFFF)字节(byte),应用程序可以通过TcpPackServer/TcpPackClient构造函数参数headerFlag设置

    服务端其它方法介绍

        1. bool SetAttached(int connectId, object data)

        服务端为每个客户端设置附加数据,避免用户自己再建立用户映射表

        1. T GetAttached(int connectId)

        获取指定客户端的附加数据

        2017/12/27

        技术在于分享,大家共同进步

     核心类 

      1 using System;
      2 using System.Collections.Generic;
      3 using System.Net;
      4 using System.Net.Sockets;
      5 using System.Text;
      6 using System.Threading;
      7 using System.Linq;
      8 using System.Collections.Concurrent;
      9 using socket.core.Common;
     10 
     11 namespace socket.core.Server
     12 {
     13     /// <summary>
     14     /// tcp Socket监听基库
     15     /// </summary>
     16     internal class TcpServer
     17     {
     18         /// <summary>
     19         /// 连接标示 自增长
     20         /// </summary>
     21         private int connectId;
     22         /// <summary>
     23         /// 同时处理的最大连接数
     24         /// </summary>
     25         private int m_numConnections;
     26         /// <summary>
     27         /// 用于每个套接字I/O操作的缓冲区大小
     28         /// </summary>
     29         private int m_receiveBufferSize;
     30         /// <summary>
     31         /// 所有套接字接收操作的一个可重用的大型缓冲区集合。
     32         /// </summary>
     33         private BufferManager m_bufferManager;
     34         /// <summary>
     35         /// 用于监听传入连接请求的套接字
     36         /// </summary>
     37         private Socket listenSocket;
     38         /// <summary>
     39         /// 接受端SocketAsyncEventArgs对象重用池,接受套接字操作
     40         /// </summary>
     41         private SocketAsyncEventArgsPool m_receivePool;
     42         /// <summary>
     43         /// 发送端SocketAsyncEventArgs对象重用池,发送套接字操作
     44         /// </summary>
     45         private SocketAsyncEventArgsPool m_sendPool;
     46         /// <summary>
     47         /// 超时,如果超时,服务端断开连接,客户端需要重连操作
     48         /// </summary>
     49         private int overtime;
     50         /// <summary>
     51         /// 超时检查间隔时间(秒)
     52         /// </summary>
     53         private int overtimecheck = 1;
     54         /// <summary>
     55         /// 能接到最多客户端个数的原子操作
     56         /// </summary>
     57         private Semaphore m_maxNumberAcceptedClients;
     58         /// <summary>
     59         /// 已经连接的对象池
     60         /// </summary>
     61         internal ConcurrentDictionary<int, ConnectClient> connectClient;
     62         /// <summary>
     63         /// 发送线程数
     64         /// </summary>
     65         private int sendthread = 10;
     66         /// <summary>
     67         /// 需要发送的数据
     68         /// </summary>
     69         private ConcurrentQueue<SendingQueue>[] sendQueues;      
     70         /// <summary>
     71         /// 72         /// </summary>
     73         private Mutex mutex = new Mutex();
     74         /// <summary>
     75         /// 连接成功事件
     76         /// </summary>
     77         internal event Action<int> OnAccept;
     78         /// <summary>
     79         /// 接收通知事件
     80         /// </summary>
     81         internal event Action<int, byte[], int, int> OnReceive;
     82         /// <summary>
     83         /// 已送通知事件
     84         /// </summary>
     85         internal event Action<int, int> OnSend;
     86         /// <summary>
     87         /// 断开连接通知事件
     88         /// </summary>
     89         internal event Action<int> OnClose;
     90 
     91         /// <summary>
     92         /// 设置基本配置
     93         /// </summary>   
     94         /// <param name="numConnections">同时处理的最大连接数</param>
     95         /// <param name="receiveBufferSize">用于每个套接字I/O操作的缓冲区大小(接收端)</param>
     96         /// <param name="overTime">超时时长,单位秒.(每10秒检查一次),当值为0时,不设置超时</param>
     97         internal TcpServer(int numConnections, int receiveBufferSize, int overTime)
     98         {
     99             overtime = overTime;
    100             m_numConnections = numConnections;
    101             m_receiveBufferSize = receiveBufferSize;
    102             m_bufferManager = new BufferManager(receiveBufferSize * m_numConnections, receiveBufferSize);
    103             m_receivePool = new SocketAsyncEventArgsPool(m_numConnections);
    104             m_sendPool = new SocketAsyncEventArgsPool(m_numConnections);
    105             m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections);
    106             Init();
    107         }
    108 
    109         /// <summary>
    110         /// 初始化服务器通过预先分配的可重复使用的缓冲区和上下文对象。这些对象不需要预先分配或重用,但这样做是为了说明API如何可以易于用于创建可重用对象以提高服务器性能。
    111         /// </summary>
    112         private void Init()
    113         {
    114             connectClient = new ConcurrentDictionary<int, ConnectClient>();
    115             sendQueues = new ConcurrentQueue<SendingQueue>[sendthread];
    116             for (int i = 0; i < sendthread; i++)
    117             {
    118                 sendQueues[i] = new ConcurrentQueue<SendingQueue>();
    119             }
    120             //分配一个大字节缓冲区,所有I/O操作都使用一个。这个侍卫对内存碎片
    121             m_bufferManager.InitBuffer();
    122             //预分配的接受对象池socketasynceventargs,并分配缓存
    123             SocketAsyncEventArgs saea_receive;
    124             //分配的发送对象池socketasynceventargs,但是不分配缓存
    125             SocketAsyncEventArgs saea_send;
    126             for (int i = 0; i < m_numConnections; i++)
    127             {
    128                 //预先接受端分配一组可重用的消息
    129                 saea_receive = new SocketAsyncEventArgs();
    130                 saea_receive.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
    131                 //分配缓冲池中的字节缓冲区的socketasynceventarg对象
    132                 m_bufferManager.SetBuffer(saea_receive);
    133                 m_receivePool.Push(saea_receive);
    134                 //预先发送端分配一组可重用的消息
    135                 saea_send = new SocketAsyncEventArgs();
    136                 saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
    137                 m_sendPool.Push(saea_send);
    138             }
    139         }
    140 
    141         /// <summary>
    142         /// 启动tcp服务侦听
    143         /// </summary>       
    144         /// <param name="port">监听端口</param>
    145         internal void Start(int port)
    146         {
    147             IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port);
    148             //创建listens是传入的套接字。
    149             listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
    150             listenSocket.NoDelay = true;
    151             //绑定端口
    152             listenSocket.Bind(localEndPoint);
    153             //挂起的连接队列的最大长度。
    154             listenSocket.Listen(1000);
    155             //在监听套接字上接受
    156             StartAccept(null);
    157             //发送线程
    158             for (int i = 0; i < sendthread; i++)
    159             {
    160                 Thread thread = new Thread(StartSend);
    161                 thread.IsBackground = true;
    162                 thread.Priority = ThreadPriority.AboveNormal;
    163                 thread.Start(i);
    164             }
    165             //超时机制
    166             if (overtime > 0)
    167             {
    168                 Thread heartbeat = new Thread(new ThreadStart(() =>
    169                 {
    170                     Heartbeat();
    171                 }));
    172                 heartbeat.IsBackground = true;
    173                 heartbeat.Priority = ThreadPriority.Lowest;
    174                 heartbeat.Start();
    175             }
    176         }
    177 
    178         /// <summary>
    179         /// 超时机制
    180         /// </summary>
    181         private void Heartbeat()
    182         {
    183             //计算超时次数 ,超过count就当客户端断开连接。服务端清除该连接资源
    184             int count = overtime / overtimecheck;
    185             while (true)
    186             {
    187                 foreach (var item in connectClient.Values)
    188                 {
    189                     if (item.keep_alive >= count)
    190                     {
    191                         item.keep_alive = 0;
    192                         CloseClientSocket(item.saea_receive);
    193                     }
    194                 }
    195                 foreach (var item in connectClient.Values)
    196                 {
    197                     item.keep_alive++;
    198                 }
    199                 Thread.Sleep(overtimecheck * 1000);
    200             }
    201         }
    202 
    203         #region Accept
    204 
    205         /// <summary>
    206         /// 开始接受客户端的连接请求的操作。
    207         /// </summary>
    208         /// <param name="acceptEventArg">发布时要使用的上下文对象服务器侦听套接字上的接受操作</param>
    209         private void StartAccept(SocketAsyncEventArgs acceptEventArg)
    210         {
    211             if (acceptEventArg == null)
    212             {
    213                 acceptEventArg = new SocketAsyncEventArgs();
    214                 acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
    215             }
    216             else
    217             {
    218                 // 套接字必须被清除,因为上下文对象正在被重用。
    219                 acceptEventArg.AcceptSocket = null;
    220             }
    221             m_maxNumberAcceptedClients.WaitOne();
    222             //准备一个客户端接入
    223             if (!listenSocket.AcceptAsync(acceptEventArg))
    224             {
    225                 ProcessAccept(acceptEventArg);
    226             }
    227         }
    228 
    229         /// <summary>
    230         /// 当异步连接完成时调用此方法
    231         /// </summary>
    232         /// <param name="e">操作对象</param>
    233         private void ProcessAccept(SocketAsyncEventArgs e)
    234         {
    235             connectId++;
    236             //把连接到的客户端信息添加到集合中
    237             ConnectClient connecttoken = new ConnectClient();
    238             connecttoken.socket = e.AcceptSocket;
    239             //从接受端重用池获取一个新的SocketAsyncEventArgs对象
    240             connecttoken.saea_receive = m_receivePool.Pop();
    241             connecttoken.saea_receive.UserToken = connectId;
    242             connecttoken.saea_receive.AcceptSocket = e.AcceptSocket;
    243             connectClient.TryAdd(connectId, connecttoken);
    244             //一旦客户机连接,就准备接收。
    245             if (!e.AcceptSocket.ReceiveAsync(connecttoken.saea_receive))
    246             {
    247                 ProcessReceive(connecttoken.saea_receive);
    248             }
    249             //事件回调
    250             if (OnAccept != null)
    251             {
    252                 OnAccept(connectId);
    253             }
    254             //接受第二连接的请求
    255             StartAccept(e);
    256         }
    257 
    258         #endregion
    259 
    260         #region 接受处理 receive
    261 
    262         /// <summary>
    263         /// 接受处理回调
    264         /// </summary>
    265         /// <param name="e">操作对象</param>
    266         private void ProcessReceive(SocketAsyncEventArgs e)
    267         {
    268             //检查远程主机是否关闭连接
    269             if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
    270             {
    271                 int connectId = (int)e.UserToken;
    272                 ConnectClient client;
    273                 if (!connectClient.TryGetValue(connectId, out client))
    274                 {
    275                     return;
    276                 }
    277                 //如果接收到数据,超时记录设置为0
    278                 if (overtime > 0)
    279                 {
    280                     if (client != null)
    281                     {
    282                         client.keep_alive = 0;
    283                     }
    284                 }
    285                 //回调               
    286                 if (OnReceive != null)
    287                 {
    288                     if (client != null)
    289                     {
    290                         OnReceive(connectId, e.Buffer, e.Offset, e.BytesTransferred);
    291                     }
    292                 }
    293                 //准备下次接收数据      
    294                 try
    295                 {
    296                     if (!e.AcceptSocket.ReceiveAsync(e))
    297                     {
    298                         ProcessReceive(e);
    299                     }
    300                 }
    301                 catch (ObjectDisposedException ex)
    302                 {
    303                     if (OnClose != null)
    304                     {
    305                         OnClose(connectId);
    306                     }
    307                 }
    308             }
    309             else
    310             {
    311                 CloseClientSocket(e);
    312             }
    313         }
    314 
    315         #endregion
    316 
    317         #region 发送处理 send
    318 
    319         /// <summary>
    320         /// 开始启用发送
    321         /// </summary>
    322         private void StartSend(object thread)
    323         {
    324             while (true)
    325             {
    326                 SendingQueue sending;
    327                 if (sendQueues[(int)thread].TryDequeue(out sending))
    328                 {
    329                     Send(sending);
    330                 }
    331                 else
    332                 {
    333                     Thread.Sleep(100);
    334                 }
    335             }
    336         }
    337 
    338         /// <summary>
    339         /// 异步发送消息 
    340         /// </summary>
    341         /// <param name="connectId">连接ID</param>
    342         /// <param name="data">数据</param>
    343         /// <param name="offset">偏移位</param>
    344         /// <param name="length">长度</param>
    345         internal void Send(int connectId, byte[] data, int offset, int length)
    346         {
    347             sendQueues[connectId % sendthread].Enqueue(new SendingQueue() { connectId = connectId, data = data, offset = offset, length = length });
    348         }
    349 
    350         /// <summary>
    351         /// 异步发送消息 
    352         /// </summary>
    353         /// <param name="sendQuere">发送消息体</param>
    354         private void Send(SendingQueue sendQuere)
    355         {
    356             ConnectClient client;
    357             if (!connectClient.TryGetValue(sendQuere.connectId, out client))
    358             {
    359                 return;
    360             }
    361             //如果发送池为空时,临时新建一个放入池中
    362             mutex.WaitOne();
    363             if (m_sendPool.Count == 0)
    364             {
    365                 SocketAsyncEventArgs saea_send = new SocketAsyncEventArgs();
    366                 saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
    367                 m_sendPool.Push(saea_send);
    368             }
    369             SocketAsyncEventArgs sendEventArgs = m_sendPool.Pop();
    370             mutex.ReleaseMutex();
    371             sendEventArgs.UserToken = sendQuere.connectId;
    372             sendEventArgs.SetBuffer(sendQuere.data, sendQuere.offset, sendQuere.length);
    373             try
    374             {
    375                 if (!client.socket.SendAsync(sendEventArgs))
    376                 {
    377                     ProcessSend(sendEventArgs);
    378                 }
    379             }
    380             catch (ObjectDisposedException ex)
    381             {
    382                 if (OnClose != null)
    383                 {
    384                     OnClose(sendQuere.connectId);
    385                 }
    386             }
    387             sendQuere = null;
    388         }
    389 
    390         /// <summary>
    391         /// 发送回调
    392         /// </summary>
    393         /// <param name="e">操作对象</param>
    394         private void ProcessSend(SocketAsyncEventArgs e)
    395         {
    396             if (e.SocketError == SocketError.Success)
    397             {
    398                 m_sendPool.Push(e);
    399                 if (OnSend != null)
    400                 {
    401                     OnSend((int)e.UserToken, e.BytesTransferred);
    402                 }
    403             }
    404             else
    405             {
    406                 CloseClientSocket(e);
    407             }
    408         }
    409 
    410         #endregion
    411 
    412         /// <summary>
    413         /// 每当套接字上完成接收或发送操作时,都会调用此方法。
    414         /// </summary>
    415         /// <param name="sender"></param>
    416         /// <param name="e">与完成的接收操作关联的SocketAsyncEventArg</param>
    417         private void IO_Completed(object sender, SocketAsyncEventArgs e)
    418         {
    419             //确定刚刚完成哪种类型的操作并调用相关的处理程序
    420             switch (e.LastOperation)
    421             {
    422                 case SocketAsyncOperation.Receive:
    423                     ProcessReceive(e);
    424                     break;
    425                 case SocketAsyncOperation.Send:
    426                     ProcessSend(e);
    427                     break;
    428                 case SocketAsyncOperation.Accept:
    429                     ProcessAccept(e);
    430                     break;
    431                 default:
    432                     break;
    433             }
    434         }
    435 
    436         #region 断开连接处理
    437 
    438 
    439         /// <summary>
    440         /// 客户端断开一个连接
    441         /// </summary>
    442         /// <param name="connectId">连接标记</param>
    443         internal void Close(int connectId)
    444         {
    445             ConnectClient client;
    446             if (!connectClient.TryGetValue(connectId, out client))
    447             {
    448                 return;
    449             }
    450             CloseClientSocket(client.saea_receive);
    451         }
    452 
    453         /// <summary>
    454         /// 断开一个连接
    455         /// </summary>
    456         /// <param name="e">操作对象</param>
    457         private void CloseClientSocket(SocketAsyncEventArgs e)
    458         {
    459             if (e.LastOperation == SocketAsyncOperation.Receive)
    460             {
    461                 int connectId = (int)e.UserToken;
    462                 ConnectClient client;
    463                 if (!connectClient.TryGetValue(connectId, out client))
    464                 {
    465                     return;
    466                 }
    467                 if (client.socket.Connected == false)
    468                 {
    469                     return;
    470                 }
    471                 try
    472                 {
    473                     client.socket.Shutdown(SocketShutdown.Both);
    474                 }
    475                 // 抛出客户端进程已经关闭
    476                 catch (Exception) { }
    477                 client.socket.Close();
    478                 m_receivePool.Push(e);
    479                 m_maxNumberAcceptedClients.Release();
    480                 if (OnClose != null)
    481                 {
    482                     OnClose(connectId);
    483                 }
    484                 connectClient.TryRemove((int)e.UserToken, out client);
    485                 client = null;
    486             }
    487         }
    488 
    489         #endregion
    490 
    491         #region 附加数据
    492 
    493         /// <summary>
    494         /// 给连接对象设置附加数据
    495         /// </summary>
    496         /// <param name="connectId">连接标识</param>
    497         /// <param name="data">附加数据</param>
    498         /// <returns>true:设置成功,false:设置失败</returns>
    499         internal bool SetAttached(int connectId, object data)
    500         {
    501             ConnectClient client;
    502             if (!connectClient.TryGetValue(connectId, out client))
    503             {
    504                 return false;
    505             }
    506             client.attached = data;
    507             return true;
    508         }
    509 
    510         /// <summary>
    511         /// 获取连接对象的附加数据
    512         /// </summary>
    513         /// <param name="connectId">连接标识</param>
    514         /// <returns>附加数据,如果没有找到则返回null</returns>
    515         internal T GetAttached<T>(int connectId)
    516         {
    517             ConnectClient client;
    518             if (!connectClient.TryGetValue(connectId, out client))
    519             {
    520                 return default(T);
    521             }
    522             else
    523             {
    524                 return (T)client.attached;
    525             }
    526         }
    527         #endregion
    528     }
    529 
    530 }
    socket核心类
  • 相关阅读:
    对用户控件(ascx)属性(property)赋值
    The Controls collection cannot be modified because the control contains code blocks (i.e. <% ... %>).
    图片淡入淡出切换效果
    在用户控件(ASCX)创建用户控件(ASCX)
    Login failed for user 'xxx'
    一些较好的书
    儿子购买的书
    怀念以前做网管的日子
    Linux下selinux简单梳理
    Rsync同步时删除多余文件 [附:删除大量文件方法的效率对比]
  • 原文地址:https://www.cnblogs.com/fengmazi/p/8183078.html
Copyright © 2011-2022 走看看