zoukankan      html  css  js  c++  java
  • linux可用的跨平台C# .net standard2.0 写的高性能socket框架

    能在window(IOCP)/linux(epoll)运行,基于C# .net standard2.0 写的socket框架,可使用于.net Framework/dotnet core程序集,.使用异步连接,异步发送,异步接收,性能爆表,并且通过压力测试。

    源码下载地址:

    https://download.csdn.net/download/guosa542129/11980602

    通过并发测试,多线程测试程序地址:

    https://download.csdn.net/download/guosa542129/11980605

    操作过程:

    安装NuGet: https://www.nuget.org/packages/socket.core/
    Package Manager: Install-Package socket.core
    .Net CLI :dotnet add package socket.core
    Paket CLI:paket add socket.core

    一:TCP模块介绍

    服务端所在socket.core.Server命名空间下,分别为三种模式 push/pull/pack
    客户端所在socket.core.Client命名空间下,分别为三种模式 push/pull/pack

    主要流程与对应的方法和事件介绍.
    注:connectId(int)代表着一个连接对象,data(byte[]),success(bool)

    • 1.初始化TCP实现类(对应的三种模式)

      实例化服务端类 TcpPushServer/TcpPullServer/TcpPackServer
      实例化客户端类 TcpPushClient/TcpPullClient/TcpPackClient
      参数介绍int numConnections同时处理的最大连接数,int receiveBufferSize用于每个套接字I/O操作的缓冲区大小(接收端), int overtime超时时长,单位秒.(每10秒检查一次),当值为0时,不设置超时,uint headerFlag包头标记范围0~1023(0x3FF),当包头标识等于0时,不校验包头

    • 2.启动监听/连接服务器

      服务端 server.Start(port);
      客户端 client.Connect(ip,port);

    • 3.触发连接事件

      服务端 server.OnAccept(connectId); 接收到一个连接id,可用他来发送,接收,关闭的标记
      客户端 client.OnConnect(success); 接收是否成功连接到服务器

    • 4.发送消息

      服务端 server.Send(connectId,data,offset,length);
      客户端 client.Send(data,offset,length);

    • 5.触发已发送事件

      服务端 server.OnSend(connectId,length);
      客户端 client.OnSend(length);

    • 6.触发接收事件

      服务端 server.OnReceive(connectId, data);
      客户端 client.OnReceive(data);

    • 7.关闭连接

      服务端 server.Close(connectId);
      客户端 client.Close();

    • 8.触发关闭连接事件

      服务端 server.OnClose(connectId);
      客户端 client.OnClose();

    三种模型简介

    • 一:push

      当接收到数据时会触发监听事件OnReceive(connectId,data);把数据立马“推”给应用程序

    • 二:pull

      当接收到数据时会触发监听事件OnReceive(connectId,length),告诉应用程序当前已经接收到了多少数据长度,应用程序可使用GetLength(connectId)方法检查已接收的数据的长度,如果满足则调用组件的Fetch(connectId,length)方法,把需要的数据“拉”出来

    • 三:pack

      pack模型组件是push和pull模型的结合体,应用程序不必要处理分包/合包,组件保证每个server.OnReceive(connectId,data)/client.OnReceive(data)事件都向应用程序提供一个完整的数据包
      注:pack模型组件会对应用程序发送的每个数据包自动加上4个字节(32bit)的包头,组件接收到数据时,根据包头信息自动分包,每个完整的数据包通过OnReceive(connectId, data)事件发送给应用程序
      PACK包头格式(4字节)4*8=32
      XXXXXXXXXXYYYYYYYYYYYYYYYYYYYYYY
      前10位X为包头标识位,用于数据包校验,有效包头标识取值范围0~1023(0x3FF),当包头标识等于0时,不校验包头,后22位Y为长度位,记录包体长度。有效数据包最大长度不能超过4194303(0x3FFFFF)字节(byte),应用程序可以通过TcpPackServer/TcpPackClient构造函数参数headerFlag设置

    服务端其它方法介绍

      1. bool SetAttached(int connectId, object data)

      服务端为每个客户端设置附加数据,避免用户自己再建立用户映射表

      1. T GetAttached(int connectId)

      获取指定客户端的附加数据

      1. 属性:ConcurrentDictionary<int, string> ClientList

      获取正在连接的客户端信息<connectId,ip和端口>

    二:核心源码

      • using System;
        using System.Collections.Generic;
        using System.Net;
        using System.Net.Sockets;
        using System.Text;
        using System.Threading;
        using System.Linq;
        using System.Collections.Concurrent;
        using socket.core.Common;
        
        namespace socket.core.Server
        {
            /// <summary>
            /// tcp Socket监听基库
            /// </summary>
            internal class TcpServer
            {
                /// <summary>
                /// 连接标示 自增长
                /// </summary>
                private int connectId;
                /// <summary>
                /// 同时处理的最大连接数
                /// </summary>
                private int m_numConnections;
                /// <summary>
                /// 用于每个套接字I/O操作的缓冲区大小
                /// </summary>
                private int m_receiveBufferSize;
                /// <summary>
                /// 所有套接字接收操作的一个可重用的大型缓冲区集合。
                /// </summary>
                private BufferManager m_bufferManager;
                /// <summary>
                /// 用于监听传入连接请求的套接字
                /// </summary>
                private Socket listenSocket;
                /// <summary>
                /// 接受端SocketAsyncEventArgs对象重用池,接受套接字操作
                /// </summary>
                private SocketAsyncEventArgsPool m_receivePool;
                /// <summary>
                /// 发送端SocketAsyncEventArgs对象重用池,发送套接字操作
                /// </summary>
                private SocketAsyncEventArgsPool m_sendPool;
                /// <summary>
                /// 超时,如果超时,服务端断开连接,客户端需要重连操作
                /// </summary>
                private int overtime;
                /// <summary>
                /// 超时检查间隔时间(秒)
                /// </summary>
                private int overtimecheck = 1;
                /// <summary>
                /// 能接到最多客户端个数的原子操作
                /// </summary>
                private Semaphore m_maxNumberAcceptedClients;
                /// <summary>
                /// 已经连接的对象池
                /// </summary>
                internal ConcurrentDictionary<int, ConnectClient> connectClient;
                /// <summary>
                /// 发送线程数
                /// </summary>
                private int sendthread = 10;
                /// <summary>
                /// 需要发送的数据
                /// </summary>
                private ConcurrentQueue<SendingQueue>[] sendQueues;      
                /// <summary>
                ////// </summary>
                private Mutex mutex = new Mutex();
                /// <summary>
                /// 连接成功事件
                /// </summary>
                internal event Action<int> OnAccept;
                /// <summary>
                /// 接收通知事件
                /// </summary>
                internal event Action<int, byte[], int, int> OnReceive;
                /// <summary>
                /// 已送通知事件
                /// </summary>
                internal event Action<int, int> OnSend;
                /// <summary>
                /// 断开连接通知事件
                /// </summary>
                internal event Action<int> OnClose;
        
                /// <summary>
                /// 设置基本配置
                /// </summary>   
                /// <param name="numConnections">同时处理的最大连接数</param>
                /// <param name="receiveBufferSize">用于每个套接字I/O操作的缓冲区大小(接收端)</param>
                /// <param name="overTime">超时时长,单位秒.(每10秒检查一次),当值为0时,不设置超时</param>
                internal TcpServer(int numConnections, int receiveBufferSize, int overTime)
                {
                    overtime = overTime;
                    m_numConnections = numConnections;
                    m_receiveBufferSize = receiveBufferSize;
                    m_bufferManager = new BufferManager(receiveBufferSize * m_numConnections, receiveBufferSize);
                    m_receivePool = new SocketAsyncEventArgsPool(m_numConnections);
                    m_sendPool = new SocketAsyncEventArgsPool(m_numConnections);
                    m_maxNumberAcceptedClients = new Semaphore(m_numConnections, m_numConnections);
                    Init();
                }
        
                /// <summary>
                /// 初始化服务器通过预先分配的可重复使用的缓冲区和上下文对象。这些对象不需要预先分配或重用,但这样做是为了说明API如何可以易于用于创建可重用对象以提高服务器性能。
                /// </summary>
                private void Init()
                {
                    connectClient = new ConcurrentDictionary<int, ConnectClient>();
                    sendQueues = new ConcurrentQueue<SendingQueue>[sendthread];
                    for (int i = 0; i < sendthread; i++)
                    {
                        sendQueues[i] = new ConcurrentQueue<SendingQueue>();
                    }
                    //分配一个大字节缓冲区,所有I/O操作都使用一个。这个侍卫对内存碎片
                    m_bufferManager.InitBuffer();
                    //预分配的接受对象池socketasynceventargs,并分配缓存
                    SocketAsyncEventArgs saea_receive;
                    //分配的发送对象池socketasynceventargs,但是不分配缓存
                    SocketAsyncEventArgs saea_send;
                    for (int i = 0; i < m_numConnections; i++)
                    {
                        //预先接受端分配一组可重用的消息
                        saea_receive = new SocketAsyncEventArgs();
                        saea_receive.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
                        //分配缓冲池中的字节缓冲区的socketasynceventarg对象
                        m_bufferManager.SetBuffer(saea_receive);
                        m_receivePool.Push(saea_receive);
                        //预先发送端分配一组可重用的消息
                        saea_send = new SocketAsyncEventArgs();
                        saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
                        m_sendPool.Push(saea_send);
                    }
                }
        
                /// <summary>
                /// 启动tcp服务侦听
                /// </summary>       
                /// <param name="port">监听端口</param>
                internal void Start(int port)
                {
                    IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, port);
                    //创建listens是传入的套接字。
                    listenSocket = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                    listenSocket.NoDelay = true;
                    //绑定端口
                    listenSocket.Bind(localEndPoint);
                    //挂起的连接队列的最大长度。
                    listenSocket.Listen(1000);
                    //在监听套接字上接受
                    StartAccept(null);
                    //发送线程
                    for (int i = 0; i < sendthread; i++)
                    {
                        Thread thread = new Thread(StartSend);
                        thread.IsBackground = true;
                        thread.Priority = ThreadPriority.AboveNormal;
                        thread.Start(i);
                    }
                    //超时机制
                    if (overtime > 0)
                    {
                        Thread heartbeat = new Thread(new ThreadStart(() =>
                        {
                            Heartbeat();
                        }));
                        heartbeat.IsBackground = true;
                        heartbeat.Priority = ThreadPriority.Lowest;
                        heartbeat.Start();
                    }
                }
        
                /// <summary>
                /// 超时机制
                /// </summary>
                private void Heartbeat()
                {
                    //计算超时次数 ,超过count就当客户端断开连接。服务端清除该连接资源
                    int count = overtime / overtimecheck;
                    while (true)
                    {
                        foreach (var item in connectClient.Values)
                        {
                            if (item.keep_alive >= count)
                            {
                                item.keep_alive = 0;
                                CloseClientSocket(item.saea_receive);
                            }
                        }
                        foreach (var item in connectClient.Values)
                        {
                            item.keep_alive++;
                        }
                        Thread.Sleep(overtimecheck * 1000);
                    }
                }
        
                #region Accept
        
                /// <summary>
                /// 开始接受客户端的连接请求的操作。
                /// </summary>
                /// <param name="acceptEventArg">发布时要使用的上下文对象服务器侦听套接字上的接受操作</param>
                private void StartAccept(SocketAsyncEventArgs acceptEventArg)
                {
                    if (acceptEventArg == null)
                    {
                        acceptEventArg = new SocketAsyncEventArgs();
                        acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
                    }
                    else
                    {
                        // 套接字必须被清除,因为上下文对象正在被重用。
                        acceptEventArg.AcceptSocket = null;
                    }
                    m_maxNumberAcceptedClients.WaitOne();
                    //准备一个客户端接入
                    if (!listenSocket.AcceptAsync(acceptEventArg))
                    {
                        ProcessAccept(acceptEventArg);
                    }
                }
        
                /// <summary>
                /// 当异步连接完成时调用此方法
                /// </summary>
                /// <param name="e">操作对象</param>
                private void ProcessAccept(SocketAsyncEventArgs e)
                {
                    connectId++;
                    //把连接到的客户端信息添加到集合中
                    ConnectClient connecttoken = new ConnectClient();
                    connecttoken.socket = e.AcceptSocket;
                    //从接受端重用池获取一个新的SocketAsyncEventArgs对象
                    connecttoken.saea_receive = m_receivePool.Pop();
                    connecttoken.saea_receive.UserToken = connectId;
                    connecttoken.saea_receive.AcceptSocket = e.AcceptSocket;
                    connectClient.TryAdd(connectId, connecttoken);
                    //一旦客户机连接,就准备接收。
                    if (!e.AcceptSocket.ReceiveAsync(connecttoken.saea_receive))
                    {
                        ProcessReceive(connecttoken.saea_receive);
                    }
                    //事件回调
                    if (OnAccept != null)
                    {
                        OnAccept(connectId);
                    }
                    //接受第二连接的请求
                    StartAccept(e);
                }
        
                #endregion
        
                #region 接受处理 receive
        
                /// <summary>
                /// 接受处理回调
                /// </summary>
                /// <param name="e">操作对象</param>
                private void ProcessReceive(SocketAsyncEventArgs e)
                {
                    //检查远程主机是否关闭连接
                    if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
                    {
                        int connectId = (int)e.UserToken;
                        ConnectClient client;
                        if (!connectClient.TryGetValue(connectId, out client))
                        {
                            return;
                        }
                        //如果接收到数据,超时记录设置为0
                        if (overtime > 0)
                        {
                            if (client != null)
                            {
                                client.keep_alive = 0;
                            }
                        }
                        //回调               
                        if (OnReceive != null)
                        {
                            if (client != null)
                            {
                                OnReceive(connectId, e.Buffer, e.Offset, e.BytesTransferred);
                            }
                        }
                        //准备下次接收数据      
                        try
                        {
                            if (!e.AcceptSocket.ReceiveAsync(e))
                            {
                                ProcessReceive(e);
                            }
                        }
                        catch (ObjectDisposedException ex)
                        {
                            if (OnClose != null)
                            {
                                OnClose(connectId);
                            }
                        }
                    }
                    else
                    {
                        CloseClientSocket(e);
                    }
                }
        
                #endregion
        
                #region 发送处理 send
        
                /// <summary>
                /// 开始启用发送
                /// </summary>
                private void StartSend(object thread)
                {
                    while (true)
                    {
                        SendingQueue sending;
                        if (sendQueues[(int)thread].TryDequeue(out sending))
                        {
                            Send(sending);
                        }
                        else
                        {
                            Thread.Sleep(100);
                        }
                    }
                }
        
                /// <summary>
                /// 异步发送消息 
                /// </summary>
                /// <param name="connectId">连接ID</param>
                /// <param name="data">数据</param>
                /// <param name="offset">偏移位</param>
                /// <param name="length">长度</param>
                internal void Send(int connectId, byte[] data, int offset, int length)
                {
                    sendQueues[connectId % sendthread].Enqueue(new SendingQueue() { connectId = connectId, data = data, offset = offset, length = length });
                }
        
                /// <summary>
                /// 异步发送消息 
                /// </summary>
                /// <param name="sendQuere">发送消息体</param>
                private void Send(SendingQueue sendQuere)
                {
                    ConnectClient client;
                    if (!connectClient.TryGetValue(sendQuere.connectId, out client))
                    {
                        return;
                    }
                    //如果发送池为空时,临时新建一个放入池中
                    mutex.WaitOne();
                    if (m_sendPool.Count == 0)
                    {
                        SocketAsyncEventArgs saea_send = new SocketAsyncEventArgs();
                        saea_send.Completed += new EventHandler<SocketAsyncEventArgs>(IO_Completed);
                        m_sendPool.Push(saea_send);
                    }
                    SocketAsyncEventArgs sendEventArgs = m_sendPool.Pop();
                    mutex.ReleaseMutex();
                    sendEventArgs.UserToken = sendQuere.connectId;
                    sendEventArgs.SetBuffer(sendQuere.data, sendQuere.offset, sendQuere.length);
                    try
                    {
                        if (!client.socket.SendAsync(sendEventArgs))
                        {
                            ProcessSend(sendEventArgs);
                        }
                    }
                    catch (ObjectDisposedException ex)
                    {
                        if (OnClose != null)
                        {
                            OnClose(sendQuere.connectId);
                        }
                    }
                    sendQuere = null;
                }
        
                /// <summary>
                /// 发送回调
                /// </summary>
                /// <param name="e">操作对象</param>
                private void ProcessSend(SocketAsyncEventArgs e)
                {
                    if (e.SocketError == SocketError.Success)
                    {
                        m_sendPool.Push(e);
                        if (OnSend != null)
                        {
                            OnSend((int)e.UserToken, e.BytesTransferred);
                        }
                    }
                    else
                    {
                        CloseClientSocket(e);
                    }
                }
        
                #endregion
        
                /// <summary>
                /// 每当套接字上完成接收或发送操作时,都会调用此方法。
                /// </summary>
                /// <param name="sender"></param>
                /// <param name="e">与完成的接收操作关联的SocketAsyncEventArg</param>
                private void IO_Completed(object sender, SocketAsyncEventArgs e)
                {
                    //确定刚刚完成哪种类型的操作并调用相关的处理程序
                    switch (e.LastOperation)
                    {
                        case SocketAsyncOperation.Receive:
                            ProcessReceive(e);
                            break;
                        case SocketAsyncOperation.Send:
                            ProcessSend(e);
                            break;
                        case SocketAsyncOperation.Accept:
                            ProcessAccept(e);
                            break;
                        default:
                            break;
                    }
                }
        
                #region 断开连接处理
        
        
                /// <summary>
                /// 客户端断开一个连接
                /// </summary>
                /// <param name="connectId">连接标记</param>
                internal void Close(int connectId)
                {
                    ConnectClient client;
                    if (!connectClient.TryGetValue(connectId, out client))
                    {
                        return;
                    }
                    CloseClientSocket(client.saea_receive);
                }
        
                /// <summary>
                /// 断开一个连接
                /// </summary>
                /// <param name="e">操作对象</param>
                private void CloseClientSocket(SocketAsyncEventArgs e)
                {
                    if (e.LastOperation == SocketAsyncOperation.Receive)
                    {
                        int connectId = (int)e.UserToken;
                        ConnectClient client;
                        if (!connectClient.TryGetValue(connectId, out client))
                        {
                            return;
                        }
                        if (client.socket.Connected == false)
                        {
                            return;
                        }
                        try
                        {
                            client.socket.Shutdown(SocketShutdown.Both);
                        }
                        // 抛出客户端进程已经关闭
                        catch (Exception) { }
                        client.socket.Close();
                        m_receivePool.Push(e);
                        m_maxNumberAcceptedClients.Release();
                        if (OnClose != null)
                        {
                            OnClose(connectId);
                        }
                        connectClient.TryRemove((int)e.UserToken, out client);
                        client = null;
                    }
                }
        
                #endregion
        
                #region 附加数据
        
                /// <summary>
                /// 给连接对象设置附加数据
                /// </summary>
                /// <param name="connectId">连接标识</param>
                /// <param name="data">附加数据</param>
                /// <returns>true:设置成功,false:设置失败</returns>
                internal bool SetAttached(int connectId, object data)
                {
                    ConnectClient client;
                    if (!connectClient.TryGetValue(connectId, out client))
                    {
                        return false;
                    }
                    client.attached = data;
                    return true;
                }
        
                /// <summary>
                /// 获取连接对象的附加数据
                /// </summary>
                /// <param name="connectId">连接标识</param>
                /// <returns>附加数据,如果没有找到则返回null</returns>
                internal T GetAttached<T>(int connectId)
                {
                    ConnectClient client;
                    if (!connectClient.TryGetValue(connectId, out client))
                    {
                        return default(T);
                    }
                    else
                    {
                        return (T)client.attached;
                    }
                }
                #endregion
            }
        
        }
        
        socket核心类

        1.初始化UDP实现类UdpServer/UdpClients

        服务端socket.core.Server.UdpServer
        客户端socket.core.Client.UdpClients
        参数int receiveBufferSize用于每个套接字I/O操作的缓冲区大小(接收端)

      • 2.发送数据

        服务端 server.Send(remoteEndPoint,data,offset,length)
        客户端 client.Send(data,offset,length)
        客户端 client.Send(remoteEndPoint,data,offset,length)

      • 3.触发已发送事件

        服务端 server.OnSend(remoteEndPoint,length)
        客户端 client.OnSend(length)

      • 3.触发接收事件

        服务端 server.OnReceive(remoteEndPoint,data,offset,length)
        客户端 client.OnReceive(data,offset,length)

  • 相关阅读:
    SignalR的三个Demo
    SignalR的一点点东西
    如何在appconfig中配置服务的ip
    IP分片丢失重传
    以太网之物理层
    以太网数据格式与封装解封
    OSI七层模型与TCP/IP五层模型
    边沿检测方法-FPGA入门教程
    如何用ModelsimSE仿真IP核-以PLL为例
    搭建Modelsim SE仿真环境-使用do文件仿真
  • 原文地址:https://www.cnblogs.com/sung/p/11871407.html
Copyright © 2011-2022 走看看