zoukankan      html  css  js  c++  java
  • C# 异步 TCP 服务器完整实现

    TCP异步Socket模型

    C# 的 TCP 异步 Socket 模型是通过 Begin-End(APM)模式实现的,例如 Socket 类提供了 BeginConnect、BeginAccept、BeginSend 和 BeginReceive 等方法。

    IAsyncResult BeginAccept(AsyncCallback callback, object state);

    AsyncCallback 回调在函数执行完毕后执行。state 对象被用于在执行函数和回调函数间传输信息。

    // Create an IPv4 TCP stream socket, bind it to port 8888 on every local
    // address and start listening with a pending-connection backlog of 5.
    Socket listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    listenSocket.Bind(new IPEndPoint(IPAddress.Any, 8888));
    listenSocket.Listen(5);

    // Post an asynchronous accept. The listening socket itself is passed as
    // the state object so the callback can retrieve it from the IAsyncResult.
    AsyncCallback onAccepted = CallbackAccept;
    listenSocket.BeginAccept(onAccepted, listenSocket);

    /// <summary>
    /// Completes the asynchronous accept started by BeginAccept.
    /// </summary>
    /// <param name="iar">
    /// Result of the accept operation; its AsyncState is the listening socket.
    /// </param>
    private void CallbackAccept(IAsyncResult iar)
    {
      Socket listener = (Socket)iar.AsyncState;
      // EndAccept returns the socket connected to the remote client.
      Socket accepted = listener.EndAccept(iar);
    }

    每 Accept 到一个 TcpClient 后,都需要将其保存下来,因此服务器需要维护一个 TcpClient 列表。

    private List<TcpClientState> clients;

    异步TCP服务器完整实现

      /// <summary>
      /// Asynchronous TCP server built on the Begin/End methods of
      /// <see cref="TcpListener"/> and <see cref="NetworkStream"/>.
      /// </summary>
      /// <remarks>
      /// Accept, read and write completions are delivered through callbacks that
      /// may run concurrently with the public methods. The client list is therefore
      /// only touched while holding a lock on it, and events are raised outside
      /// that lock so that a slow or re-entrant handler cannot block other callbacks.
      /// </remarks>
      public class AsyncTcpServer : IDisposable
      {
        #region Fields

        private readonly TcpListener listener;
        private readonly List<TcpClientState> clients;
        private bool disposed = false;

        #endregion

        #region Ctors

        /// <summary>
        /// Creates a server that listens on the given port on all local addresses.
        /// </summary>
        /// <param name="listenPort">Port to listen on.</param>
        public AsyncTcpServer(int listenPort)
          : this(IPAddress.Any, listenPort)
        {
        }

        /// <summary>
        /// Creates a server that listens on the given local end point.
        /// </summary>
        /// <param name="localEP">Local end point to listen on.</param>
        public AsyncTcpServer(IPEndPoint localEP)
          : this(localEP.Address, localEP.Port)
        {
        }

        /// <summary>
        /// Creates a server that listens on the given local address and port.
        /// The listener is created here but not started; call <see cref="Start()"/>.
        /// </summary>
        /// <param name="localIPAddress">Local IP address to listen on.</param>
        /// <param name="listenPort">Port to listen on.</param>
        public AsyncTcpServer(IPAddress localIPAddress, int listenPort)
        {
          Address = localIPAddress;
          Port = listenPort;
          this.Encoding = Encoding.Default;

          clients = new List<TcpClientState>();

          listener = new TcpListener(Address, Port);
          listener.AllowNatTraversal(true);
        }

        #endregion

        #region Properties

        /// <summary>
        /// Whether the server is currently running.
        /// </summary>
        public bool IsRunning { get; private set; }
        /// <summary>
        /// Local IP address the server listens on.
        /// </summary>
        public IPAddress Address { get; private set; }
        /// <summary>
        /// Port the server listens on.
        /// </summary>
        public int Port { get; private set; }
        /// <summary>
        /// Encoding used to convert between text and bytes in the string
        /// overloads of Send/SendAll and in <see cref="PlaintextReceived"/>.
        /// </summary>
        public Encoding Encoding { get; set; }

        #endregion

        #region Server

        /// <summary>
        /// Starts the server. Does nothing if it is already running.
        /// </summary>
        /// <returns>This server, to allow call chaining.</returns>
        /// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
        public AsyncTcpServer Start()
        {
          ThrowIfDisposed();

          if (!IsRunning)
          {
            // Start the listener BEFORE flagging the server as running: if the
            // port cannot be bound, Start() throws and IsRunning stays false,
            // so a later Start() can retry instead of silently doing nothing.
            listener.Start();
            BeginAcceptingClients();
          }
          return this;
        }

        /// <summary>
        /// Starts the server with an explicit backlog. Does nothing if it is
        /// already running.
        /// </summary>
        /// <param name="backlog">
        /// Maximum length of the pending connections queue.
        /// </param>
        /// <returns>This server, to allow call chaining.</returns>
        /// <exception cref="ObjectDisposedException">The server has been disposed.</exception>
        public AsyncTcpServer Start(int backlog)
        {
          ThrowIfDisposed();

          if (!IsRunning)
          {
            // See Start(): the listener is started first so a failure leaves
            // IsRunning == false.
            listener.Start(backlog);
            BeginAcceptingClients();
          }
          return this;
        }

        /// <summary>
        /// Stops the server: stops listening and closes every connected client.
        /// Does nothing if the server is not running. ClientDisconnected is not
        /// raised for clients closed this way.
        /// </summary>
        /// <returns>This server, to allow call chaining.</returns>
        public AsyncTcpServer Stop()
        {
          if (IsRunning)
          {
            // Cleared first so that callbacks completing from here on bail out.
            IsRunning = false;
            listener.Stop();

            // Detach the clients under the lock, close them outside of it.
            TcpClientState[] detached;
            lock (this.clients)
            {
              detached = this.clients.ToArray();
              this.clients.Clear();
            }

            for (int i = 0; i < detached.Length; i++)
            {
              // Close() releases the stream and the socket. The previous
              // Socket.Disconnect(false) could throw on an already-dropped
              // connection (aborting the loop) and never released the socket.
              detached[i].TcpClient.Close();
            }
          }
          return this;
        }

        // Marks the server as running and posts the first asynchronous accept.
        private void BeginAcceptingClients()
        {
          // Must be set before the accept is posted: the accept callback
          // ignores completions while IsRunning is false.
          IsRunning = true;
          listener.BeginAcceptTcpClient(HandleTcpClientAccepted, listener);
        }

        // Throws ObjectDisposedException once Dispose() has run.
        private void ThrowIfDisposed()
        {
          if (this.disposed)
          {
            throw new ObjectDisposedException(GetType().FullName);
          }
        }

        #endregion

        #region Receive

        // Accept callback: registers the new client, then posts the next accept.
        private void HandleTcpClientAccepted(IAsyncResult ar)
        {
          if (!IsRunning)
          {
            return;
          }

          TcpListener tcpListener = (TcpListener)ar.AsyncState;

          TcpClient tcpClient = null;
          try
          {
            tcpClient = tcpListener.EndAcceptTcpClient(ar);
          }
          catch (ObjectDisposedException)
          {
            // Stop() closed the listener while this accept was completing.
            return;
          }
          catch (SocketException ex)
          {
            // Only this connection attempt failed; keep accepting others.
            ExceptionHandler.Handle(ex);
          }

          if (tcpClient != null)
          {
            RegisterClient(tcpClient);
          }

          ContinueAccepting(tcpListener);
        }

        // Adds an accepted client to the list, announces it and starts reading from it.
        private void RegisterClient(TcpClient tcpClient)
        {
          byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
          TcpClientState internalClient = new TcpClientState(tcpClient, buffer);

          lock (this.clients)
          {
            if (!IsRunning)
            {
              // Stop() already emptied the list; do not leave a stray
              // connection behind that nobody would ever close.
              tcpClient.Close();
              return;
            }
            this.clients.Add(internalClient);
          }

          // Raised outside the lock so a handler cannot stall other callbacks.
          RaiseClientConnected(tcpClient);

          BeginReceive(internalClient);
        }

        // Posts the next asynchronous accept unless the listener has been stopped.
        private void ContinueAccepting(TcpListener tcpListener)
        {
          try
          {
            tcpListener.BeginAcceptTcpClient(HandleTcpClientAccepted, tcpListener);
          }
          catch (InvalidOperationException)
          {
            // Also covers ObjectDisposedException. Presumably Stop() ran in the
            // meantime, so the listener is no longer listening; nothing to do.
          }
          catch (SocketException ex)
          {
            ExceptionHandler.Handle(ex);
          }
        }

        // Posts an asynchronous read; a client whose stream is already unusable is dropped.
        private void BeginReceive(TcpClientState internalClient)
        {
          try
          {
            internalClient.NetworkStream.BeginRead(
              internalClient.Buffer,
              0,
              internalClient.Buffer.Length,
              HandleDatagramReceived,
              internalClient);
          }
          catch (System.IO.IOException)
          {
            DropClient(internalClient);
          }
          catch (ObjectDisposedException)
          {
            DropClient(internalClient);
          }
        }

        // Read callback: forwards received bytes to subscribers and posts the next read.
        private void HandleDatagramReceived(IAsyncResult ar)
        {
          if (!IsRunning)
          {
            return;
          }

          TcpClientState internalClient = (TcpClientState)ar.AsyncState;

          int numberOfReadBytes;
          try
          {
            numberOfReadBytes = internalClient.NetworkStream.EndRead(ar);
          }
          catch (System.IO.IOException)
          {
            // Connection reset or aborted: handled like an orderly close.
            numberOfReadBytes = 0;
          }
          catch (ObjectDisposedException)
          {
            // The client was closed locally while the read was pending.
            numberOfReadBytes = 0;
          }

          if (numberOfReadBytes == 0)
          {
            // connection has been closed
            DropClient(internalClient);
            return;
          }

          // Copy out of the shared receive buffer: it is reused by the next
          // read, while subscribers may keep a reference to what they are given.
          byte[] receivedBytes = new byte[numberOfReadBytes];
          Buffer.BlockCopy(
            internalClient.Buffer, 0,
            receivedBytes, 0, numberOfReadBytes);
          RaiseDatagramReceived(internalClient.TcpClient, receivedBytes);
          RaisePlaintextReceived(internalClient.TcpClient, receivedBytes);

          // continue listening for data from this client
          BeginReceive(internalClient);
        }

        // Removes a client from the list, announces the disconnect and releases its socket.
        private void DropClient(TcpClientState internalClient)
        {
          bool removed;
          lock (this.clients)
          {
            removed = this.clients.Remove(internalClient);
          }

          if (!removed)
          {
            // Already removed (by Stop() or by another callback): no second event.
            return;
          }

          // Raise first, close afterwards, so handlers can still inspect the
          // client (for example its remote end point).
          RaiseClientDisconnected(internalClient.TcpClient);
          internalClient.TcpClient.Close();
        }

        #endregion

        #region Events

        /// <summary>
        /// Raised when bytes have been received from a client.
        /// </summary>
        public event EventHandler<TcpDatagramReceivedEventArgs<byte[]>> DatagramReceived;
        /// <summary>
        /// Raised together with <see cref="DatagramReceived"/>, carrying the same
        /// bytes decoded with <see cref="Encoding"/>. Each received chunk is
        /// decoded on its own, so a multi-byte character that is split across
        /// two reads will not decode correctly.
        /// </summary>
        public event EventHandler<TcpDatagramReceivedEventArgs<string>> PlaintextReceived;

        private void RaiseDatagramReceived(TcpClient sender, byte[] datagram)
        {
          // Copy to a local so an unsubscribe on another thread between the
          // null check and the call cannot cause a NullReferenceException.
          EventHandler<TcpDatagramReceivedEventArgs<byte[]>> handler = DatagramReceived;
          if (handler != null)
          {
            handler(this, new TcpDatagramReceivedEventArgs<byte[]>(sender, datagram));
          }
        }

        private void RaisePlaintextReceived(TcpClient sender, byte[] datagram)
        {
          EventHandler<TcpDatagramReceivedEventArgs<string>> handler = PlaintextReceived;
          if (handler != null)
          {
            handler(this, new TcpDatagramReceivedEventArgs<string>(
              sender, this.Encoding.GetString(datagram, 0, datagram.Length)));
          }
        }

        /// <summary>
        /// Raised when a client connection has been accepted.
        /// </summary>
        public event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
        /// <summary>
        /// Raised when a client connection has been closed or lost.
        /// </summary>
        public event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;

        private void RaiseClientConnected(TcpClient tcpClient)
        {
          EventHandler<TcpClientConnectedEventArgs> handler = ClientConnected;
          if (handler != null)
          {
            handler(this, new TcpClientConnectedEventArgs(tcpClient));
          }
        }

        private void RaiseClientDisconnected(TcpClient tcpClient)
        {
          EventHandler<TcpClientDisconnectedEventArgs> handler = ClientDisconnected;
          if (handler != null)
          {
            handler(this, new TcpClientDisconnectedEventArgs(tcpClient));
          }
        }

        #endregion

        #region Send

        /// <summary>
        /// Starts an asynchronous send of a datagram to the given client.
        /// </summary>
        /// <param name="tcpClient">Client to send to.</param>
        /// <param name="datagram">Bytes to send.</param>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="tcpClient"/> or <paramref name="datagram"/> is null.
        /// </exception>
        public void Send(TcpClient tcpClient, byte[] datagram)
        {
          // NOTE: InvalidOperationException would be the conventional type here;
          // InvalidProgramException is kept because existing callers may catch it.
          if (!IsRunning)
          {
            throw new InvalidProgramException("This TCP server has not been started.");
          }

          if (tcpClient == null)
          {
            throw new ArgumentNullException("tcpClient");
          }

          if (datagram == null)
          {
            throw new ArgumentNullException("datagram");
          }

          // The stream itself is the state object, so the completion callback
          // does not have to call GetStream() on a client that may be closed by then.
          NetworkStream stream = tcpClient.GetStream();
          stream.BeginWrite(
            datagram, 0, datagram.Length, HandleDatagramWritten, stream);
        }

        // Write callback: completes the asynchronous write started by Send.
        private void HandleDatagramWritten(IAsyncResult ar)
        {
          try
          {
            ((NetworkStream)ar.AsyncState).EndWrite(ar);
          }
          catch (System.IO.IOException)
          {
            // The peer went away during the write. Letting this escape from a
            // completion callback would go unhandled; the receive path detects
            // the broken connection and raises ClientDisconnected.
          }
          catch (ObjectDisposedException)
          {
            // The client was closed locally while the write was pending.
          }
        }

        /// <summary>
        /// Starts an asynchronous send of text, encoded with
        /// <see cref="Encoding"/>, to the given client.
        /// </summary>
        /// <param name="tcpClient">Client to send to.</param>
        /// <param name="datagram">Text to send.</param>
        /// <exception cref="ArgumentNullException"><paramref name="datagram"/> is null.</exception>
        public void Send(TcpClient tcpClient, string datagram)
        {
          if (datagram == null)
          {
            throw new ArgumentNullException("datagram");
          }

          Send(tcpClient, this.Encoding.GetBytes(datagram));
        }

        /// <summary>
        /// Starts an asynchronous send of a datagram to every connected client.
        /// A client whose connection is already broken is skipped so that the
        /// remaining clients still receive the datagram.
        /// </summary>
        /// <param name="datagram">Bytes to send.</param>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        public void SendAll(byte[] datagram)
        {
          if (!IsRunning)
          {
            throw new InvalidProgramException("This TCP server has not been started.");
          }

          // Snapshot under the lock: the accept/read callbacks add and remove
          // clients concurrently, so indexing the live list is not safe.
          TcpClientState[] targets;
          lock (this.clients)
          {
            targets = this.clients.ToArray();
          }

          for (int i = 0; i < targets.Length; i++)
          {
            try
            {
              Send(targets[i].TcpClient, datagram);
            }
            catch (System.IO.IOException)
            {
              // Broken connection; the receive path will remove this client.
            }
            catch (InvalidOperationException)
            {
              // Client no longer connected or already closed (this also covers
              // ObjectDisposedException); the receive path will remove it.
            }
          }
        }

        /// <summary>
        /// Starts an asynchronous send of text, encoded with
        /// <see cref="Encoding"/>, to every connected client.
        /// </summary>
        /// <param name="datagram">Text to send.</param>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        public void SendAll(string datagram)
        {
          if (!IsRunning)
          {
            throw new InvalidProgramException("This TCP server has not been started.");
          }

          SendAll(this.Encoding.GetBytes(datagram));
        }

        #endregion

        #region IDisposable Members

        /// <summary>
        /// Stops the server and releases the resources it holds.
        /// </summary>
        public void Dispose()
        {
          Dispose(true);
          GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="disposing"><c>true</c> to release
        /// both managed and unmanaged resources; <c>false</c>
        /// to release only unmanaged resources.</param>
        protected virtual void Dispose(bool disposing)
        {
          if (!this.disposed)
          {
            if (disposing)
            {
              try
              {
                // Stops listening and closes all client connections. The
                // listener field is intentionally not nulled, so later calls
                // fail with ObjectDisposedException rather than a null reference.
                Stop();
              }
              catch (SocketException ex)
              {
                ExceptionHandler.Handle(ex);
              }
            }

            disposed = true;
          }
        }

        #endregion
      }

    使用举例

     /// <summary>
     /// Console demo: starts an AsyncTcpServer on port 9999, logs client
     /// connects/disconnects, echoes received text back to the sender and
     /// broadcasts every line typed on the console to all clients.
     /// </summary>
     class Program
     {
       static AsyncTcpServer server;

       static void Main(string[] args)
       {
         LogFactory.Assign(new ConsoleLogFactory());

         server = new AsyncTcpServer(9999);
         try
         {
           server.Encoding = Encoding.UTF8;
           server.ClientConnected +=
             new EventHandler<TcpClientConnectedEventArgs>(server_ClientConnected);
           server.ClientDisconnected +=
             new EventHandler<TcpClientDisconnectedEventArgs>(server_ClientDisconnected);
           server.PlaintextReceived +=
             new EventHandler<TcpDatagramReceivedEventArgs<string>>(server_PlaintextReceived);
           server.Start();

           Console.WriteLine("TCP server has been started.");
           Console.WriteLine("Type something to send to client...");

           // Console.ReadLine() returns null once standard input is closed
           // (Ctrl+Z / Ctrl+D or redirected input reaching its end). Passing
           // that null on to SendAll would throw, so end the loop instead.
           string text;
           while ((text = Console.ReadLine()) != null)
           {
             server.SendAll(text);
           }
         }
         finally
         {
           // Stops listening and closes client connections on the way out.
           server.Dispose();
         }
       }

       // Logs the remote end point of a newly connected client.
       static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
       {
         Logger.Debug(string.Format(CultureInfo.InvariantCulture,
           "TCP client {0} has connected.",
           e.TcpClient.Client.RemoteEndPoint.ToString()));
       }

       // Logs the remote end point of a client that has disconnected.
       static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
       {
         Logger.Debug(string.Format(CultureInfo.InvariantCulture,
           "TCP client {0} has disconnected.",
           e.TcpClient.Client.RemoteEndPoint.ToString()));
       }

       // Prints text received from a client and sends an acknowledgement back.
       // The literal text "Received" is ignored: no output and no reply.
       static void server_PlaintextReceived(object sender, TcpDatagramReceivedEventArgs<string> e)
       {
         if (e.Datagram != "Received")
         {
           Console.Write(string.Format("Client : {0} --> ",
             e.TcpClient.Client.RemoteEndPoint.ToString()));
           Console.WriteLine(string.Format("{0}", e.Datagram));
           server.Send(e.TcpClient, "Server has received you text : " + e.Datagram);
         }
       }
     }

    本文为 Dennis Gao 原创技术文章,发表于博客园博客,未经作者本人允许禁止任何形式的转载。

  • 相关阅读:
    返回一个整数数组中最大子数组的和(数组头尾连接)
    场景调研(补)
    《浪潮之巅》读书笔记3
    《软件工程》课程改进意见
    【每日scrum】第一次冲刺day6
    【每日scrum】第一次冲刺day5
    【每日scrum】第一次冲刺day4
    【每日scrum】第一次冲刺day3
    【每日scrum】第一次冲刺day2
    【每日scrum】第一次冲刺day1
  • 原文地址:https://www.cnblogs.com/gaochundong/p/csharp_async_tcp_server.html
Copyright © 2011-2022 走看看