zoukankan      html  css  js  c++  java
  • C#实现异步阻塞TCP

    1.类

    (1)socket IO操作内存管理类 BufferManager

    // This class creates a single large buffer which can be divided up
    // and assigned to SocketAsyncEventArgs objects for use with each
    // socket I/O operation. 
    // This enables bufffers to be easily reused and guards against
    // fragmenting heap memory.
    //
    // The operations exposed on the BufferManager class are not thread safe.
    public class BufferManager
    {
        //buffer缓冲区大小
        private int m_numBytes;
        //缓冲区
        private byte[] m_buffer;
        private Stack<int> m_freeIndexPool;
        private int m_currentIndex;
        private int m_bufferSize;
     
        public BufferManager(int totalBytes, int bufferSize)
        {
            m_numBytes = totalBytes;
            m_currentIndex = 0;
            m_bufferSize = bufferSize;
            m_freeIndexPool = new Stack<int>();
        }
     
        /// <summary>
        /// 给buffer分配缓冲区
        /// </summary>
        public void InitBuffer()
        {
            m_buffer = new byte[m_numBytes];
        }
     
        /// <summary>
        ///  将buffer添加到args的IO缓冲区中,并设置offset
        /// </summary>
        /// <param name="args"></param>
        /// <returns></returns>
        public bool SetBuffer(SocketAsyncEventArgs args)
        {
            if (m_freeIndexPool.Count > 0)
            {
                args.SetBuffer(m_buffer, m_freeIndexPool.Pop(), m_bufferSize);
            }
            else
            {
                if ((m_numBytes - m_bufferSize) < m_currentIndex)
                {
                    return false;
                }
                args.SetBuffer(m_buffer, m_currentIndex, m_bufferSize);
                m_currentIndex += m_bufferSize;
            }
            return true;
        }
     
        /// <summary>
        /// 将buffer从args的IO缓冲区中释放
        /// </summary>
        /// <param name="args"></param>
        public void FreeBuffer(SocketAsyncEventArgs args)
        {
            m_freeIndexPool.Push(args.Offset);
            args.SetBuffer(null, 0, 0);
        }
     
        /// <summary>
        /// 释放全部buffer缓存
        /// </summary>
        public void FreeAllBuffer()
        {
            m_freeIndexPool.Clear();
            m_currentIndex = 0;
            m_buffer = null;
        }
    } 

    (2)SocketAsyncEventArgsPool

    // Represents a collection of reusable SocketAsyncEventArgs objects. 
    public class SocketAsyncEventArgsPool
    {
        private Stack<SocketAsyncEventArgs> m_pool;
     
        // Initializes the object pool to the specified size
        //
        // The "capacity" parameter is the maximum number of
        // SocketAsyncEventArgs objects the pool can hold
        public SocketAsyncEventArgsPool(int capacity)
        {
            m_pool = new Stack<SocketAsyncEventArgs>(capacity);
        }
     
        // Add a SocketAsyncEventArg instance to the pool
        //
        //The "item" parameter is the SocketAsyncEventArgs instance
        // to add to the pool
        public void Push(SocketAsyncEventArgs item)
        {
            if (item == null) { throw new ArgumentNullException("Items added to a SocketAsyncEventArgsPool cannot be null"); }
            lock (m_pool)
            {
                m_pool.Push(item);
            }
        }
     
        // Removes a SocketAsyncEventArgs instance from the pool
        // and returns the object removed from the pool
        public SocketAsyncEventArgs Pop()
        {
            lock (m_pool)
            {
                return m_pool.Pop();
            }
        }
     
        /// <summary>
        /// 清空栈中元素
        /// </summary>
        public void Clear()
        {
            lock (m_pool)
            {
                m_pool.Clear();
            }
        }
     
        // The number of SocketAsyncEventArgs instances in the pool
        public int Count
        {
            get { return m_pool.Count; }
        }
     
    }

    (3)AsyncUserToken

    using System;
    using System.Net;
    using System.Net.Sockets;
    
    namespace JCommon.Net
    {
        /// <summary>
        /// 存储客户端信息, 这个可以根据自己的实际情况来定义
        /// </summary>
    	public class AsyncUserToken
    	{
            private Socket socket = null;
            private IPEndPoint endPort = null;
            private DateTime connectTime = default(DateTime);       // 连接时间
    
            public Socket Socket 
            {
                get { return this.socket; }
                set { this.socket = value; }
            }
    
            public DateTime ConnectTime 
            {
                get { return this.connectTime; }
                set { this.connectTime = value; }
            }
    
            public IPEndPoint EndPort
            {
                get { return this.endPort; }
                set { this.endPort = value; }
            }
    	}
    }
    

    (4)服务器端操作类SocketServer

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    
    namespace JCommon.Net
    {
        public class SocketServer
        {
            public event EventHandler<SocketAsyncEventArgs> OnReceiveCompleted;     // 接收到数据
            public event EventHandler<SocketAsyncEventArgs> OnSendCompleted;        // 数据发送完成
            public event EventHandler<SocketAsyncEventArgs> OnAccept;               // 客户端连接通知
            public event EventHandler<SocketAsyncEventArgs> OnConnectionBreak;      // 客户端下线通知
    
            private readonly object m_lockHelper = new object();
            private bool m_isRunning = false;                                       // TCP服务器是否正在运行
            private int m_numConnections = 1;                                       // 同时处理的最大连接数
            private int m_bufferSize = 0;                                           // 用于每个Socket I/O 操作的缓冲区大小
            private BufferManager m_bufferManager = null;                           // 表示用于所有套接字操作的大量可重用的缓冲区
            private Socket listenSocket = null;                                     // 用于监听传入的连接请求的套接字
            private SocketAsyncEventArgsPool m_readWritePool = null;                // 可重用SocketAsyncEventArgs对象池,用于写入,读取和接受套接字操作
            private int m_totalBytesRead = 0;                                       // 服务器接收的总共#个字节的计数器
            private int m_numConnectedSockets = 0;                                  // 当前连接的tcp客户端数量
            private Semaphore m_maxNumberAcceptedClients = null;                    // 控制tcp客户端连接数量的信号量
            private List<SocketAsyncEventArgs> m_connectedPool = null;              // 用于socket发送数据的SocketAsyncEventArgs集合
            private string m_ip = "";
            private int m_port = 0;
    
            /// <summary>
            /// 创建服务端实例
            /// </summary>
            /// <param name="numConnections">允许连接到tcp服务器的tcp客户端数量</param>
            /// <param name="bufferSize">用于socket发送和接收的缓存区大小</param>
            public SocketServer(int numConnections, int bufferSize)
            {
                this.m_numConnections = numConnections;
                this.m_bufferSize = bufferSize;
                this.m_bufferManager = new BufferManager(bufferSize * numConnections, bufferSize);
                this.m_readWritePool = new SocketAsyncEventArgsPool(numConnections);
                this.m_connectedPool = new List<SocketAsyncEventArgs>(numConnections);
            }
    
            /// <summary>
            /// 启动服务器,侦听客户端连接请求
            /// </summary>
            /// <param name="localEndPoint"></param>
            public void Start(string ip, int port)
            {
                if (this.m_isRunning)
                    return;
    
                if (string.IsNullOrEmpty(ip))
                    throw new ArgumentNullException("ip cannot be null");
                if (port < 1 || port > 65535)
                    throw new ArgumentOutOfRangeException("port is out of range");
    
                this.m_ip = ip;
                this.m_port = port;
    
                try
                {
                    this.Init();
    
                    this.m_maxNumberAcceptedClients = new Semaphore(this.m_numConnections, this.m_numConnections);
    
                    // 创建 Socket 监听连接请求
                    this.listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                    IPAddress address = IPAddress.Parse(ip);
                    IPEndPoint endpoint = new IPEndPoint(address, port);
                    this.listenSocket.Bind(endpoint);
                    this.listenSocket.Listen(int.MaxValue);   // Listen 参数表示最多可容纳的等待接受的连接数
    
                    // 接受客户端连接请求
                    this.StartAccept(null);
    
                    this.m_isRunning = true;
                }
                catch (Exception ex)
                {
                    Trace.WriteLine(ex.Message);
                    throw ex;
                }
            }
    
            /// <summary>
            /// 分配 SocketAsyncEventArg 对象池
            /// </summary>
            private void Init()
            {
                this.m_totalBytesRead = 0;
                this.m_numConnectedSockets = 0;
    
                // 分配一个大字节缓冲区,所有 I/O 操作都使用该缓冲区。
                this.m_bufferManager.InitBuffer();
                SocketAsyncEventArgs socketAsyncEventArgs;
                for (int i = 0; i < this.m_numConnections; i++)
                {
                    // 分配可重用的 SocketAsyncEventArgs 对象
                    socketAsyncEventArgs = new SocketAsyncEventArgs();
                    socketAsyncEventArgs.Completed += this.IO_Completed;
                    socketAsyncEventArgs.UserToken = new AsyncUserToken();
    
                    // 将缓冲池中的字节缓冲区分配给 SocketAsyncEventArg 对象
                    this.m_bufferManager.SetBuffer(socketAsyncEventArgs);
    
                    // 放入对象池
                    this.m_readWritePool.Push(socketAsyncEventArgs);
                }
            }
    
            public void Stop()
            {
                if( this.m_isRunning )
                {
                    this.m_isRunning = false;
    
                    if (this.listenSocket == null)
                        return;
    
                    try
                    {
                        foreach (var e in this.m_connectedPool)
                        {
                            if (this.OnConnectionBreak != null)
                                this.OnConnectionBreak(this, e);
                            AsyncUserToken token = e.UserToken as AsyncUserToken;
                            token.Socket.Shutdown(SocketShutdown.Both);
                            token.Socket.Close();
                            token.Socket = null;
                            e.Dispose();
                        }
    
                        this.listenSocket.LingerState = new LingerOption(true, 0);
                        this.listenSocket.Close();
                        this.listenSocket = null;
                    }
                    catch (Exception ex)
                    {
                        Trace.WriteLine(ex.Message);
                        throw ex;
                    }
                    finally
                    {
                        this.m_connectedPool.Clear();
                        this.m_readWritePool.Clear();
                        this.m_bufferManager.FreeAllBuffer();
                        GC.Collect();
                    }
                }
            }
    
            public void Restart()
            {
                this.Stop();
                this.Start(this.m_ip, this.m_port);
            }
    
            /// <summary>
            /// 开始接受来自客户端的连接请求
            /// </summary>
            /// <param name="acceptEventArg"></param>
            private void StartAccept(SocketAsyncEventArgs acceptEventArg)
            {
                if (acceptEventArg == null)
                {
                    acceptEventArg = new SocketAsyncEventArgs();
                    acceptEventArg.Completed += this.AcceptEventArg_Completed;
                }
                else
                {
                    acceptEventArg.AcceptSocket = null;
                }
                this.m_maxNumberAcceptedClients.WaitOne();
                if (this.listenSocket == null)
                {
                    return;
                }
                if (!this.listenSocket.AcceptAsync(acceptEventArg))
                {
                    this.ProcessAccept(acceptEventArg);
                }
            }
    
            /// <summary>
            /// Socket.AcceptAsync完成回调函数
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void AcceptEventArg_Completed(object sender, SocketAsyncEventArgs e)
            {
                this.ProcessAccept(e);
            }
    
            /// <summary>
            /// 接受到tcp客户端连接,进行处理
            /// </summary>
            /// <param name="e"></param>
            private void ProcessAccept(SocketAsyncEventArgs e)
            {
                if (this.m_isRunning)
                {
                    if (e.SocketError == SocketError.Success)
                    {
                        Interlocked.Increment(ref this.m_numConnectedSockets);
    
                        if (this.OnAccept != null)
                            this.OnAccept(this, e);
    
                        // 获取接受的客户端连接的套接字
                        SocketAsyncEventArgs socketAsyncEventArgs = this.m_readWritePool.Pop();
                        AsyncUserToken userToken = socketAsyncEventArgs.UserToken as AsyncUserToken;
                        userToken.Socket = e.AcceptSocket;
                        userToken.ConnectTime = DateTime.Now;
                        userToken.EndPort = e.AcceptSocket.RemoteEndPoint as IPEndPoint;
    
                        lock(this.m_lockHelper)
                            this.m_connectedPool.Add(socketAsyncEventArgs);
    
                        // tcp服务器开始接受tcp客户端发送的数据
                        if (!e.AcceptSocket.ReceiveAsync(socketAsyncEventArgs))
                        {
                            this.ProcessReceive(socketAsyncEventArgs);
                        }
    
                        // 接受下一个连接请求
                        this.StartAccept(e);
                    }
                }
            }
    
            /// <summary>
            /// socket.sendAsync和socket.recvAsync的完成回调函数
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void IO_Completed(object sender, SocketAsyncEventArgs e)
            {
                switch (e.LastOperation)
                {
                    case SocketAsyncOperation.Receive:
                        ProcessReceive(e);
                        break;
                    case SocketAsyncOperation.Send:
                        ProcessSend(e);
                        break;
                    default:
                        throw new ArgumentException("The last operation completed on the socket was not a receive or send");
                }
            }
    
            /// <summary>
            /// 处理接受到的tcp客户端数据
            /// </summary>
            /// <param name="e"></param>
            private void ProcessReceive(SocketAsyncEventArgs e)
            {
                if (this.m_isRunning)
                {
                    AsyncUserToken asyncUserToken = (AsyncUserToken)e.UserToken;
                    if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
                    {
                        Interlocked.Add(ref this.m_totalBytesRead, e.BytesTransferred);
                        if (this.OnReceiveCompleted != null)
                            this.OnReceiveCompleted(this, e);
    
                        e.SetBuffer(e.Offset, this.m_bufferSize);
                        if (!asyncUserToken.Socket.ReceiveAsync(e))
                        {
                            this.ProcessReceive(e);
                            return;
                        }
                    }
                    else
                    {
                        this.CloseClientSocket(e);
                    }
                }
            }
    
            /// <summary>
            /// 处理tcp服务器发送的结果
            /// </summary>
            /// <param name="e"></param>
            private void ProcessSend(SocketAsyncEventArgs e)
            {
                if (this.m_isRunning)
                {
                    if (e.SocketError == SocketError.Success)
                    {
                        if (this.OnSendCompleted != null)
                            this.OnSendCompleted(this, e);
                    }
                    else
                    {
                        this.CloseClientSocket(e);
                    }
                }
            }
    
            /// <summary>
            /// 断开某一客户端
            /// </summary>
            /// <param name="e"></param>
            private void CloseClientSocket(SocketAsyncEventArgs e)
            {
                if (this.m_isRunning)
                {
                    if (this.OnConnectionBreak != null)
                        this.OnConnectionBreak(this, e);
    
                    AsyncUserToken token = e.UserToken as AsyncUserToken;
                    if (token != null && token.Socket != null)
                    {
                        try
                        {
                            token.Socket.Shutdown(SocketShutdown.Both);
                            token.Socket.Disconnect(false);
                            token.Socket.Close();
                            token.Socket = null;
                        }
                        catch (Exception ex)
                        {
                            Trace.WriteLine(ex.Message);
                        }
                        finally
                        {
                            Interlocked.Decrement(ref this.m_numConnectedSockets);
                            this.m_readWritePool.Push(e);
                            this.m_maxNumberAcceptedClients.Release();
    
                            lock (this.m_lockHelper)
                                this.m_connectedPool.Remove(e);
                        }
                    }
                }
            }
    
            /// <summary>
            /// 向指定客户端发送信息
            /// </summary>
            /// <param name="ip"></param>
            /// <param name="message"></param>
            public void Send(IPEndPoint endpoint, string message)
            {
                byte[] buff = Encoding.UTF8.GetBytes(message);
                if (buff.Length > this.m_bufferSize)
                    throw new ArgumentOutOfRangeException("message is out off range");
    
                SocketAsyncEventArgs argSend = this.m_connectedPool.Find((s) =>
                {
                    AsyncUserToken userToken = s.UserToken as AsyncUserToken;
                    return userToken.EndPort.ToString() == endpoint.ToString();
                });
    
                if (argSend != null)
                {
                    AsyncUserToken userToken1 = argSend.UserToken as AsyncUserToken;
                    SocketAsyncEventArgs sendArg = new SocketAsyncEventArgs();
                    sendArg.UserToken = userToken1;
                    sendArg.SetBuffer(buff, 0, buff.Length);
                    bool willRaiseEvent = userToken1.Socket.SendAsync(sendArg);
                    if (!willRaiseEvent)
                    {
                        ProcessSend(sendArg);
                    }
                }
            }
    
            /// <summary>
            /// 向已连接所有客户端发送
            /// </summary>
            /// <param name="message"></param>
            public void SendToAll(string message)
            {
                if (string.IsNullOrEmpty(message))
                    throw new ArgumentNullException("message cannot be null");
    
                foreach (var e in this.m_connectedPool)
                {
                    this.Send((e.UserToken as AsyncUserToken).EndPort, message);
                }
            }
        }
    }

    (5)客户端操作类 SocketClient

    using System;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    
    namespace JCommon.Net
    {
        public class SocketClient
    	{
            public event EventHandler<SocketAsyncEventArgs> OnReceived;
            public event EventHandler<SocketAsyncEventArgs> OnSendCompleted;
            public event EventHandler<SocketAsyncEventArgs> OnConnectCompleted;
            public event EventHandler<SocketAsyncEventArgs> OnConnectionClosed;
    
            private readonly object lockobj = new object();
            private int receiveBufferCapacity = 256;
            private int sendCapacity = 1024;
            private byte[] recvBuff = null;                     // 接收缓存数组
            private byte[] sendBuff = null;                     // 发送缓存数组
            private Socket socket = null;                       // 连接socket
            private SocketAsyncEventArgs sendEventArg = null;   // 用于发送数据的SocketAsyncEventArgs
            private SocketAsyncEventArgs recvEventArg = null;   // 用于接收数据的SocketAsyncEventArgs
            private string ip = "";
            private int port = 0;
            private bool bRestart = false;                      // 是否开启断开重连
    
            public SocketClient(int receiveCapacity = 1024, int sendCapacity = 256)
    		{
                this.receiveBufferCapacity = receiveCapacity;
                this.sendCapacity = sendCapacity;
    
                // 设置用于发送数据的SocketAsyncEventArgs
                sendBuff = new byte[sendCapacity];
                sendEventArg = new SocketAsyncEventArgs();
                sendEventArg.SetBuffer(sendBuff, 0, sendCapacity);
                sendEventArg.Completed += this.IO_Completed;
                sendEventArg.UserToken = new AsyncUserToken { Socket = this.socket };
    
                // 设置用于接受数据的SocketAsyncEventArgs
                recvBuff = new byte[receiveBufferCapacity];
                recvEventArg = new SocketAsyncEventArgs();
                recvEventArg.SetBuffer(recvBuff, 0, receiveBufferCapacity);
                recvEventArg.Completed += this.IO_Completed;
                recvEventArg.UserToken = new AsyncUserToken { Socket = this.socket };
            }
    
            /// <summary>
            /// 断开重连
            /// </summary>
            public bool BRestart 
            {
                set { this.bRestart = value; }
                get { return this.bRestart; }
            }
            
            public bool Connected
            {
                get
                {
                    return this.socket != null && this.socket.Connected;
                }
            }
    
            public void Start(string ip, int port)
    		{
                if (this.Connected)
                    return;
    
                if (string.IsNullOrEmpty(ip))
                    throw new ArgumentNullException("ip cannot be null");
                if (port < 1 || port > 65535)
                    throw new ArgumentOutOfRangeException("port is out of range");
    
                this.ip = ip;
                this.port = port;
    
                try
                {
                    this.socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
                    IPAddress address = IPAddress.Parse(ip);
                    IPEndPoint endpoint = new IPEndPoint(address, port);
    
                    // 连接tcp服务器
                    SocketAsyncEventArgs connectEventArg = new SocketAsyncEventArgs();
                    connectEventArg.Completed += ConnectEventArgs_Completed;
    
                    // 设置要连接的tcp服务器地址
                    connectEventArg.RemoteEndPoint = endpoint;
                    if (!socket.ConnectAsync(connectEventArg))
                        this.ProcessConnect(connectEventArg);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
    		}
    
            /// <summary>
            /// 关闭tcp客户端
            /// </summary>
            public void Stop()
            {
                lock (this.lockobj)
                {
                    if (!this.Connected)
                        return;
    
                    try
                    {
                        // 应该在关闭socket之前先使用shutdown进行接受或者发送的禁用。
                        this.socket.Shutdown(SocketShutdown.Both);
                        this.socket.Close();
                        this.socket = null;
                        GC.Collect();
    
                        if (this.OnConnectionClosed != null)
                            this.OnConnectionClosed(this, null);
                    }
                    catch (Exception ex)
                    {
                        throw ex;
                    }
                }
            }
    
            /// <summary>
            /// 重启tcp客户端,重新连接tcp服务器
            /// </summary>
            public void Restart()
            {
                lock (this.lockobj)
                {
                    this.Stop();
                    this.Start(ip, port);
                }
            }
    
            /// <summary>
            /// 异步发送
            /// </summary>
            /// <param name="message">要发送的数据</param>
            public void Send(string message)
            {
                if (string.IsNullOrEmpty(message))
                    throw new ArgumentNullException("message cannot be null");
    
                if (!this.Connected)
                    return;
    
                byte[] buff = Encoding.UTF8.GetBytes(message);
                if (buff.Length > this.sendCapacity)
                    throw new ArgumentNullException("message is out of range");
                Array.Copy(buff, this.sendBuff, buff.Length);   // or buff.CopyTo(this.sendBuff, 0);
                this.sendEventArg.SetBuffer(0, buff.Length); 
                if (!socket.SendAsync(sendEventArg))
                {
                    this.ProcessSend(sendEventArg);
                }
            }
    
            /// <summary>
            /// 大包分包发送
            /// </summary>
            /// <param name="sendBuff"></param>
            public void SendEx(string bigmessage)
            {
                const int SPLITSIZE = 256;
                byte[] sendBuff = Encoding.UTF8.GetBytes(bigmessage);
    
                for (int index = 0; index < sendBuff.Length; index += SPLITSIZE)
                {
                    if (index + SPLITSIZE <= sendBuff.Length)
                    {
                        byte[] by = new byte[SPLITSIZE];
                        Array.Copy(sendBuff, index, by, 0, SPLITSIZE);
                        this.socket.Send(by);
                        Array.Clear(sendBuff, index, SPLITSIZE);
                    }
                    else
                    {
                        byte[] by = new byte[SPLITSIZE];
                        Array.Copy(sendBuff, index, by, 0, sendBuff.Length - index);
                        this.socket.Send(by);
                        Array.Clear(sendBuff, index, sendBuff.Length - index);
                    }
                }
            }
    
            /// <summary>
            /// Socket.ConnectAsync完成回调函数
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void ConnectEventArgs_Completed(object sender, SocketAsyncEventArgs e)
            {
                this.ProcessConnect(e);
            }
    
            private void ProcessConnect(SocketAsyncEventArgs e)
            {
                if (e.SocketError == SocketError.Success)
                {
                    if (this.OnConnectCompleted != null)
                    {
                        this.OnConnectCompleted(this, e);
                    }
    
                    this.recvEventArg.SetBuffer(0, this.receiveBufferCapacity);
                    if (!this.socket.ReceiveAsync(this.recvEventArg))
                    {
                        this.ProcessReceive(this.recvEventArg);
                        return;
                    }
                }
                else if (e.SocketError == SocketError.ConnectionRefused || e.SocketError == SocketError.HostUnreachable || e.SocketError == SocketError.TimedOut)
                {
                    if (this.bRestart)
                    {
                        Thread.Sleep(500);
                        this.Restart();
                    }
                    else
                    {
                        this.Stop();
                    }
                }
            }
    
            /// <summary>
            /// socket.sendAsync和socket.recvAsync的完成回调函数
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void IO_Completed(object sender, SocketAsyncEventArgs e)
            {
                switch (e.LastOperation)
                {
                    case SocketAsyncOperation.Receive:
                        this.ProcessReceive(e);
                        break;
                    case SocketAsyncOperation.Send:
                        this.ProcessSend(e);
                        break;
                    default:
                        throw new ArgumentException("The last operation completed on the socket was not a receive or send");
                }
            }
    
            /// <summary>
            /// 处理接收到的数据
            /// </summary>
            /// <param name="e"></param>
    		private void ProcessReceive(SocketAsyncEventArgs e)
    		{
    			if (e.BytesTransferred > 0 && e.SocketError == SocketError.Success)
    			{
    				if (this.OnReceived != null)
    				{
    					this.OnReceived(this, e);
    				}
    
                    e.SetBuffer(e.Offset, this.receiveBufferCapacity);
                    if (!socket.ReceiveAsync(e))
                    {
                        this.ProcessReceive(e);
                    }
    			}
    			else
    			{
                    if (this.bRestart)
                    {
                        Thread.Sleep(500);
                        this.Restart();
                    }
                    else
                    {
                        this.Stop();
                    }
    			}
    		}
    
            /// <summary>
            /// 处理tcp客户端发送的结果
            /// </summary>
            /// <param name="e"></param>
            private void ProcessSend(SocketAsyncEventArgs e)
            {
                if (e.SocketError == SocketError.Success)
                {
                    if (this.OnSendCompleted != null)
                    {
                        this.OnSendCompleted(this, e);
                        return;
                    }
                }
                else
                {
                    if (this.bRestart)
                    {
                        Thread.Sleep(500);
                        this.Restart();
                    }
                    else
                    {
                        this.Stop();
                    }
                }
            }
        }
    }

    2.使用

    (1)服务器端

    using JCommon.Net;
    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    using System.Diagnostics;
    using System.Drawing;
    using System.Linq;
    using System.Net;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Windows.Forms;
    
    namespace JCommon.Test
    {
        delegate void DelMsgShow(string str);
    
        public partial class FrmTcpServer : Form
        {
            SocketServer server;
            Socket sk;
    		private string recvBuff = string.Empty;
            DelMsgShow msgShow;
    
            void MsgShow(string str)
            {
                if(richTextBox1.InvokeRequired)
                {
                    richTextBox1.Invoke(new Action<string>(MsgShow), new object[] { str });
                }
                else
                {
                    richTextBox1.AppendText(str);
                    richTextBox1.AppendText("
    ");
                    richTextBox1.ScrollToCaret();
                }
            }
    
            public FrmTcpServer()
            {
                InitializeComponent();
    
            }
    
            Semaphore ss = new Semaphore(1, 1);
            /// <summary>
            /// 开始监听
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void button1_Click(object sender, EventArgs e)
            {
                string str = button1.Text;
                if (str == "开始监听")
                {
                    ServerInit();
    
                    int port = textBox1.Text.ToInt();
                    IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), port);
    
                   // server.Start(localEndPoint);
                    server.Start("127.0.0.1", port);
                    button1.Text = "停止监听";
                }
    
                else
                {
                    server.Stop();
                    button1.Text = "开始监听";
                }
            }
    
            /// <summary>
            /// 发送
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
    
            private void button2_Click(object sender, EventArgs e)
            {
                string strEndPort = comboBox1.Text;
                string str = richTextBox2.Text;
                System.Net.IPAddress IPadr = System.Net.IPAddress.Parse(strEndPort.Split(':')[0]);
                IPEndPoint EndPort = new System.Net.IPEndPoint(IPadr, int.Parse(strEndPort.Split(':')[1]));
                server.Send(EndPort, str);
            }
    
            private void ServerInit()
            {
                if (this.server == null)
                {
                    this.server = new SocketServer(2, 1024);
    
                    this.server.OnAccept += this.server_OnAccept;
                    this.server.OnConnectionBreak += this.server_OnConnectionBreak;
                    this.server.OnReceiveCompleted += this.server_OnReceiveCompleted;
                    this.server.OnSendCompleted += this.server_OnSendCompleted;
                }
            }
    
    
            private void server_OnSendCompleted(object sender, SocketAsyncEventArgs e)
            {
                string @string = Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred);
                Console.WriteLine(this.Name + "发送:" + @string);
                msgShow("发送:" + @string);
            }
    
            private void server_OnReceiveCompleted(object sender, SocketAsyncEventArgs e)
            {
                string @string = Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred);
                Console.WriteLine(this.Name + "收到:" + @string);
                this.recvBuff += @string.ToUpper();
                msgShow("收到:" + @string);
            }
    
            private void server_OnConnectionBreak(object sender, SocketAsyncEventArgs e)
            {
                msgShow("连接被断开!");
    
                JCommon.Net.AsyncUserToken userToken = e.UserToken as JCommon.Net.AsyncUserToken;
                string str = userToken.EndPort.ToString();
                this.comboBox1.InvokeControlAction((s) => s.Items.Remove(str));
            }
    
            private void server_OnAccept(object sender, SocketAsyncEventArgs e)
            {
                msgShow("有客户端接入!");
    
                string ip = e.AcceptSocket.RemoteEndPoint.ToString();
                this.comboBox1.InvokeControlAction((s) => s.Items.Add(ip));
            }
    
            private void FrmTcpServer_Load(object sender, EventArgs e)
            {
                textBox1.Text = "20000";
    
                msgShow = this.MsgShow;
    
            }
    
            private void button3_Click(object sender, EventArgs e)
            {
                string str = richTextBox2.Text;
                server.SendToAll(str);
            }
        }
    }
    

    (2)客户端

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    using System.Diagnostics;
    using System.Drawing;
    using System.Linq;
    using System.Net.Sockets;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Windows.Forms;
    using JCommon.Net;
    
    namespace JCommon.Test
    {
        public partial class FrmTcpClient : Form
        {
            DelMsgShow msgShow;
    
            void MsgShow(string str)
            {
                if (richTextBox1.InvokeRequired)
                {
                    richTextBox1.Invoke(new Action<string>(MsgShow), new object[] { str });
                }
                else
                {
                    richTextBox1.AppendText(str);
                    //richTextBox1.AppendText("
    ");
                    richTextBox1.ScrollToCaret();
                }
            }
    
            SocketClient client;
    
            private string recvBuff = string.Empty;
            private AutoResetEvent eventConnected = new AutoResetEvent(false);
    
            public FrmTcpClient()
            {
                InitializeComponent();
            }
    
            /// <summary>
            /// 连接
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void button1_Click(object sender, EventArgs e)
            {
                button1.Enabled = false;
                InitClient();
            }
    
            /// <summary>
            /// 发送
            /// </summary>
            /// <param name="sender"></param>
            /// <param name="e"></param>
            private void button2_Click(object sender, EventArgs e)
            {
                string str = richTextBox2.Text;
                if(client.Connected)
                    client.SendEx(str);
            }
    
            string ip;
            int port;
            private void InitClient()
            {
                if (this.client == null)
                {
                    this.client = new SocketClient(1024);
    
                    this.client.OnConnectCompleted += this.client_OnConnectCompleted;
                    this.client.OnConnectionClosed += this.client_OnConnectionClosed;
                    this.client.OnSendCompleted += this.client_OnSendCompleted;
                    this.client.OnReceived += this.client_OnReceived;
    
                    this.ip = textBox1.Text;
                    this.port = textBox2.Text.ToInt();
                }
                this.client.Start(this.ip, this.port);
                this.client.BRestart = true;
            }
    
            private void client_OnReceived(object sender, SocketAsyncEventArgs e)
            {
                string @string = Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred);
                Console.WriteLine(this.Name + "收到:" + @string);
                this.recvBuff += @string.ToUpper();
                msgShow(@string);
    
                Thread.Sleep(5000);
                //Thread.Sleep(50000);
            }
    
            private void client_OnSendCompleted(object sender, SocketAsyncEventArgs e)
            {
                string @string = Encoding.UTF8.GetString(e.Buffer, e.Offset, e.BytesTransferred);
                Console.WriteLine(this.Name + "发送:" + @string);
                msgShow("发送:" + @string);
            }
    
            private void client_OnConnectionClosed(object sender, SocketAsyncEventArgs e)
            {
                msgShow(this.Name + "连接断开");
                button1.InvokeControlAction((s) => s.Enabled = true);
            }
    
            private void client_OnConnectCompleted(object sender, SocketAsyncEventArgs e)
            {
                AsyncUserToken asyncUserToken = e.UserToken as AsyncUserToken;
    
                msgShow(this.Name + "客户端接入");
                button3.InvokeControlAction((s) => s.Enabled = true);
                button1.InvokeControlAction((s) => s.Enabled = false);
            }
    
            private void FrmTcpClient_Load(object sender, EventArgs e)
            {
                msgShow = this.MsgShow;
    
                textBox1.Text = "127.0.0.1";
                textBox2.Text = "20000";
            }
    
            private void button3_Click(object sender, EventArgs e)
            {
                if(client != null && client.Connected)
                {
                    button3.Enabled = false;
                    this.client.Stop();
                }
            }
    
            private void button4_Click(object sender, EventArgs e)
            {
                string str = richTextBox2.Text;
                if (client.Connected)
                    client.Send(str);
            }
        }
    }

    3.演示

    参考:

    https://docs.microsoft.com/zh-cn/dotnet/api/system.net.sockets.socketasynceventargs?view=netframework-4.8

  • 相关阅读:
    SpringBoot自定义过滤器的两种方式及过滤器执行顺序
    如何用上新版本的 IDEA(IDEA 2019.2.2版本)
    IDEA乱码Tomcat控制台乱码输出乱码报文乱码
    单例模式的几种实现方式及对比
    Java中synchronized关键字你知道多少
    [转载]Vertica “ERROR: Too many ROS containers exist”
    FTP Client
    统计数据库表大小
    常用JS代码整理
    WinForm 控件不闪烁
  • 原文地址:https://www.cnblogs.com/jshchg/p/12934234.html
Copyright © 2011-2022 走看看