zoukankan      html  css  js  c++  java
  • C# 异步 TCP 服务器完整实现

    TCP异步Socket模型

    C# 的 TCP 异步 Socket 模型是通过 Begin-End 模式实现的。例如,Socket 类提供了 BeginConnect、BeginAccept、BeginSend 和 BeginReceive 等方法。

    IAsyncResult BeginAccept(AsyncCallback callback, object state);

    AsyncCallback 回调在函数执行完毕后执行。state 对象被用于在执行函数和回调函数间传输信息。

    // Listen for IPv4 TCP connections on port 8888 of every local interface.
    IPEndPoint localEndPoint = new IPEndPoint(IPAddress.Any, 8888);
    Socket listenSocket = new Socket(
                      AddressFamily.InterNetwork,
                      SocketType.Stream,
                      ProtocolType.Tcp);
    listenSocket.Bind(localEndPoint);
    listenSocket.Listen(5); // backlog of 5 pending connections

    // Start one asynchronous accept; the listening socket itself is passed
    // as the state object so the callback can complete the accept on it.
    listenSocket.BeginAccept(CallbackAccept, listenSocket);
    
    /// <summary>
    /// Completion callback for <c>BeginAccept</c>: finishes the pending accept
    /// on the listening socket that was passed as the state object.
    /// </summary>
    /// <param name="iar">Result of the asynchronous accept; its
    /// <c>AsyncState</c> is the listening socket.</param>
    private void CallbackAccept(IAsyncResult iar)
    {
      Socket listeningSocket = (Socket)iar.AsyncState;

      // EndAccept returns the socket connected to the remote peer.
      Socket acceptedSocket = listeningSocket.EndAccept(iar);
    }

    每当服务器 Accept 到一个 TcpClient 时,都需要将其加入并维护一个 TcpClient 列表。

    private List<TcpClientState> clients;

    异步TCP服务器完整实现

      /// <summary>
      /// Asynchronous TCP server built on the Begin/End methods of
      /// <see cref="TcpListener"/> and <see cref="NetworkStream"/>.
      /// </summary>
      /// <remarks>
      /// Events are raised from inside the asynchronous accept/read callbacks,
      /// so handlers should not assume they run on the thread that called
      /// <see cref="Start()"/>.
      /// A "datagram" here is simply the bytes returned by one read; TCP is a
      /// byte stream and no message framing is applied.
      /// </remarks>
      public class AsyncTcpServer : IDisposable
      {
        #region Fields

        private readonly TcpListener listener;
        // Connected clients. The list instance is also the lock that guards it.
        private readonly List<TcpClientState> clients;
        private bool disposed = false;

        #endregion

        #region Ctors

        /// <summary>
        /// Creates a server that listens on every local address.
        /// </summary>
        /// <param name="listenPort">Port to listen on.</param>
        public AsyncTcpServer(int listenPort)
          : this(IPAddress.Any, listenPort)
        {
        }

        /// <summary>
        /// Creates a server that listens on the given end point.
        /// </summary>
        /// <param name="localEP">Local end point to listen on.</param>
        public AsyncTcpServer(IPEndPoint localEP)
          : this(localEP.Address, localEP.Port)
        {
        }

        /// <summary>
        /// Creates a server that listens on the given address and port.
        /// The server does not listen until <see cref="Start()"/> is called.
        /// </summary>
        /// <param name="localIPAddress">Local IP address to listen on.</param>
        /// <param name="listenPort">Port to listen on.</param>
        public AsyncTcpServer(IPAddress localIPAddress, int listenPort)
        {
          Address = localIPAddress;
          Port = listenPort;
          this.Encoding = Encoding.Default;

          clients = new List<TcpClientState>();

          listener = new TcpListener(Address, Port);
          listener.AllowNatTraversal(true);
        }

        #endregion

        #region Properties

        /// <summary>
        /// Whether the server is currently running.
        /// </summary>
        public bool IsRunning { get; private set; }
        /// <summary>
        /// IP address the server listens on.
        /// </summary>
        public IPAddress Address { get; private set; }
        /// <summary>
        /// Port the server listens on.
        /// </summary>
        public int Port { get; private set; }
        /// <summary>
        /// Encoding used by the string overloads of Send/SendAll and by
        /// <see cref="PlaintextReceived"/>.
        /// </summary>
        public Encoding Encoding { get; set; }

        #endregion

        #region Server

        /// <summary>
        /// Starts the server. Does nothing if it is already running.
        /// </summary>
        /// <returns>This server, to allow call chaining.</returns>
        /// <exception cref="ObjectDisposedException">The server was disposed.</exception>
        public AsyncTcpServer Start()
        {
          if (!IsRunning)
          {
            EnsureNotDisposed();
            listener.Start();
            BeginAccepting();
          }
          return this;
        }

        /// <summary>
        /// Starts the server. Does nothing if it is already running.
        /// </summary>
        /// <param name="backlog">
        /// Maximum length of the pending connections queue.
        /// </param>
        /// <returns>This server, to allow call chaining.</returns>
        /// <exception cref="ObjectDisposedException">The server was disposed.</exception>
        public AsyncTcpServer Start(int backlog)
        {
          if (!IsRunning)
          {
            EnsureNotDisposed();
            listener.Start(backlog);
            BeginAccepting();
          }
          return this;
        }

        /// <summary>
        /// Stops listening and closes every connected client.
        /// Does nothing if the server is not running.
        /// <see cref="ClientDisconnected"/> is not raised for clients closed here.
        /// </summary>
        /// <returns>This server, to allow call chaining.</returns>
        public AsyncTcpServer Stop()
        {
          if (IsRunning)
          {
            IsRunning = false;
            listener.Stop();

            // Detach the clients under the lock, close them outside of it.
            TcpClientState[] detached;
            lock (this.clients)
            {
              detached = this.clients.ToArray();
              this.clients.Clear();
            }

            // Close (rather than Socket.Disconnect) releases the socket and
            // stream, and does not fail for a peer that is already gone.
            for (int i = 0; i < detached.Length; i++)
            {
              detached[i].TcpClient.Close();
            }
          }
          return this;
        }

        // Marks the server as running and posts the first asynchronous accept.
        // Called only after listener.Start succeeded, so a failed bind never
        // leaves IsRunning stuck at true.
        private void BeginAccepting()
        {
          // Set the flag first: the accept callback ignores completions while
          // IsRunning is false.
          IsRunning = true;
          listener.BeginAcceptTcpClient(
            new AsyncCallback(HandleTcpClientAccepted), listener);
        }

        // Throws when the server has been disposed.
        private void EnsureNotDisposed()
        {
          if (this.disposed)
          {
            throw new ObjectDisposedException(GetType().FullName);
          }
        }

        #endregion

        #region Receive

        // Accept completion callback: registers the new client, starts reading
        // from it, then posts the next accept.
        private void HandleTcpClientAccepted(IAsyncResult ar)
        {
          if (!IsRunning)
          {
            return;
          }

          TcpListener tcpListener = (TcpListener)ar.AsyncState;

          TcpClient tcpClient = null;
          try
          {
            tcpClient = tcpListener.EndAcceptTcpClient(ar);
          }
          catch (ObjectDisposedException)
          {
            // The listening socket was closed while this accept was pending.
            return;
          }
          catch (SocketException ex)
          {
            // This one connection attempt failed; report it the same way
            // Dispose reports socket errors and keep accepting.
            ExceptionHandler.Handle(ex);
          }

          if (tcpClient != null)
          {
            RegisterClient(tcpClient);
          }

          try
          {
            tcpListener.BeginAcceptTcpClient(
              new AsyncCallback(HandleTcpClientAccepted), tcpListener);
          }
          catch (InvalidOperationException)
          {
            // Listener was stopped meanwhile (ObjectDisposedException derives
            // from InvalidOperationException); nothing left to accept.
          }
          catch (SocketException ex)
          {
            ExceptionHandler.Handle(ex);
          }
        }

        // Adds a freshly accepted client to the list, announces it and posts
        // its first read.
        private void RegisterClient(TcpClient tcpClient)
        {
          byte[] buffer = new byte[tcpClient.ReceiveBufferSize];
          TcpClientState internalClient = new TcpClientState(tcpClient, buffer);

          lock (this.clients)
          {
            if (!IsRunning)
            {
              // Stop() already cleared the list; adding now would leave this
              // connection open with nobody tracking it.
              tcpClient.Close();
              return;
            }
            this.clients.Add(internalClient);
          }

          // Raised outside the lock so a handler cannot block other callbacks
          // or Stop() on the client list.
          RaiseClientConnected(tcpClient);

          BeginReceive(internalClient);
        }

        // Posts the next asynchronous read for a client; a client whose stream
        // can no longer be read is dropped.
        private void BeginReceive(TcpClientState internalClient)
        {
          try
          {
            internalClient.NetworkStream.BeginRead(
              internalClient.Buffer,
              0,
              internalClient.Buffer.Length,
              HandleDatagramReceived,
              internalClient);
          }
          catch (System.IO.IOException)
          {
            DropClient(internalClient);
          }
          catch (InvalidOperationException)
          {
            // Includes ObjectDisposedException: the stream is already closed.
            DropClient(internalClient);
          }
        }

        // Removes a client from the list, raises ClientDisconnected and then
        // closes it.
        private void DropClient(TcpClientState internalClient)
        {
          bool removed;
          lock (this.clients)
          {
            removed = this.clients.Remove(internalClient);
          }

          // Not in the list means Stop() already detached and closed it.
          if (removed)
          {
            // Raise before Close so handlers can still inspect the TcpClient.
            RaiseClientDisconnected(internalClient.TcpClient);
          }

          internalClient.TcpClient.Close();
        }

        // Read completion callback: publishes the received bytes and posts the
        // next read, or drops the client when the connection has ended.
        private void HandleDatagramReceived(IAsyncResult ar)
        {
          if (!IsRunning)
          {
            return;
          }

          TcpClientState internalClient = (TcpClientState)ar.AsyncState;

          int numberOfReadBytes;
          try
          {
            numberOfReadBytes = internalClient.NetworkStream.EndRead(ar);
          }
          catch (System.IO.IOException)
          {
            numberOfReadBytes = 0;
          }
          catch (InvalidOperationException)
          {
            // Includes ObjectDisposedException.
            numberOfReadBytes = 0;
          }

          if (numberOfReadBytes == 0)
          {
            // Zero bytes (or a failed read) means the connection has been closed.
            DropClient(internalClient);
            return;
          }

          // Copy out of the shared buffer before it is reused by the next read.
          byte[] receivedBytes = new byte[numberOfReadBytes];
          Buffer.BlockCopy(
            internalClient.Buffer, 0,
            receivedBytes, 0, numberOfReadBytes);
          RaiseDatagramReceived(internalClient.TcpClient, receivedBytes);
          RaisePlaintextReceived(internalClient.TcpClient, receivedBytes);

          // Keep listening for more data from this client.
          BeginReceive(internalClient);
        }

        #endregion

        #region Events

        /// <summary>
        /// Raised with the raw bytes of each completed read.
        /// </summary>
        public event EventHandler<TcpDatagramReceivedEventArgs<byte[]>> DatagramReceived;
        /// <summary>
        /// Raised with the bytes of each completed read decoded using
        /// <see cref="Encoding"/>. Every read is decoded on its own, so a
        /// multi-byte character split across two reads will not decode correctly.
        /// </summary>
        public event EventHandler<TcpDatagramReceivedEventArgs<string>> PlaintextReceived;

        private void RaiseDatagramReceived(TcpClient sender, byte[] datagram)
        {
          // Copy to a local so an unsubscribe on another thread between the
          // null check and the call cannot cause a NullReferenceException.
          EventHandler<TcpDatagramReceivedEventArgs<byte[]>> handler = DatagramReceived;
          if (handler != null)
          {
            handler(this, new TcpDatagramReceivedEventArgs<byte[]>(sender, datagram));
          }
        }

        private void RaisePlaintextReceived(TcpClient sender, byte[] datagram)
        {
          EventHandler<TcpDatagramReceivedEventArgs<string>> handler = PlaintextReceived;
          if (handler != null)
          {
            handler(this, new TcpDatagramReceivedEventArgs<string>(
              sender, this.Encoding.GetString(datagram, 0, datagram.Length)));
          }
        }

        /// <summary>
        /// Raised after a client connection has been accepted.
        /// </summary>
        public event EventHandler<TcpClientConnectedEventArgs> ClientConnected;
        /// <summary>
        /// Raised when a client connection is found to be closed. The
        /// TcpClient is closed after the handlers return.
        /// </summary>
        public event EventHandler<TcpClientDisconnectedEventArgs> ClientDisconnected;

        private void RaiseClientConnected(TcpClient tcpClient)
        {
          EventHandler<TcpClientConnectedEventArgs> handler = ClientConnected;
          if (handler != null)
          {
            handler(this, new TcpClientConnectedEventArgs(tcpClient));
          }
        }

        private void RaiseClientDisconnected(TcpClient tcpClient)
        {
          EventHandler<TcpClientDisconnectedEventArgs> handler = ClientDisconnected;
          if (handler != null)
          {
            handler(this, new TcpClientDisconnectedEventArgs(tcpClient));
          }
        }

        #endregion

        #region Send

        /// <summary>
        /// Starts an asynchronous write of a datagram to the given client.
        /// </summary>
        /// <param name="tcpClient">Client to send to.</param>
        /// <param name="datagram">Bytes to send.</param>
        /// <exception cref="InvalidProgramException">The server is not running.
        /// (InvalidOperationException would be the conventional type; this one
        /// is kept so existing callers that catch it keep working.)</exception>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="tcpClient"/> or <paramref name="datagram"/> is null.
        /// </exception>
        public void Send(TcpClient tcpClient, byte[] datagram)
        {
          if (!IsRunning)
          {
            throw new InvalidProgramException("This TCP server has not been started.");
          }

          if (tcpClient == null)
          {
            throw new ArgumentNullException("tcpClient");
          }

          if (datagram == null)
          {
            throw new ArgumentNullException("datagram");
          }

          // Hand the stream itself to the callback: calling GetStream() again
          // there would throw if the client has been closed in the meantime.
          NetworkStream stream = tcpClient.GetStream();
          stream.BeginWrite(
            datagram, 0, datagram.Length, HandleDatagramWritten, stream);
        }

        // Write completion callback.
        private void HandleDatagramWritten(IAsyncResult ar)
        {
          try
          {
            ((NetworkStream)ar.AsyncState).EndWrite(ar);
          }
          catch (System.IO.IOException)
          {
            // The connection broke during the write. Not rethrown because
            // this runs on a callback thread; the read path drops the client
            // and raises ClientDisconnected when its read fails.
          }
          catch (ObjectDisposedException)
          {
            // The client was closed before the write completed.
          }
        }

        /// <summary>
        /// Encodes a string with <see cref="Encoding"/> and sends it to the
        /// given client.
        /// </summary>
        /// <param name="tcpClient">Client to send to.</param>
        /// <param name="datagram">Text to send.</param>
        public void Send(TcpClient tcpClient, string datagram)
        {
          Send(tcpClient, this.Encoding.GetBytes(datagram));
        }

        /// <summary>
        /// Sends a datagram to every connected client. Clients whose
        /// connection has already ended are skipped.
        /// </summary>
        /// <param name="datagram">Bytes to send.</param>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        public void SendAll(byte[] datagram)
        {
          if (!IsRunning)
          {
            throw new InvalidProgramException("This TCP server has not been started.");
          }

          // Snapshot under the lock: the accept/read callbacks add and remove
          // clients concurrently, so the live list must not be indexed here.
          TcpClientState[] targets;
          lock (this.clients)
          {
            targets = this.clients.ToArray();
          }

          for (int i = 0; i < targets.Length; i++)
          {
            try
            {
              Send(targets[i].TcpClient, datagram);
            }
            catch (System.IO.IOException)
            {
              // This client is gone; keep broadcasting to the others.
            }
            catch (InvalidOperationException)
            {
              // Client no longer connected or already closed (includes
              // ObjectDisposedException); keep broadcasting to the others.
            }
          }
        }

        /// <summary>
        /// Encodes a string with <see cref="Encoding"/> and sends it to every
        /// connected client.
        /// </summary>
        /// <param name="datagram">Text to send.</param>
        /// <exception cref="InvalidProgramException">The server is not running.</exception>
        public void SendAll(string datagram)
        {
          if (!IsRunning)
          {
            throw new InvalidProgramException("This TCP server has not been started.");
          }

          SendAll(this.Encoding.GetBytes(datagram));
        }

        #endregion

        #region IDisposable Members

        /// <summary>
        /// Stops the server and releases its resources.
        /// </summary>
        public void Dispose()
        {
          Dispose(true);
          GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases unmanaged and - optionally - managed resources.
        /// </summary>
        /// <param name="disposing"><c>true</c> to release
        /// both managed and unmanaged resources; <c>false</c>
        /// to release only unmanaged resources.</param>
        protected virtual void Dispose(bool disposing)
        {
          if (this.disposed)
          {
            return;
          }

          if (disposing)
          {
            try
            {
              // Stops the listener and closes every client connection.
              Stop();
            }
            catch (SocketException ex)
            {
              ExceptionHandler.Handle(ex);
            }
          }

          // From here on Start() throws ObjectDisposedException.
          this.disposed = true;
        }

        #endregion
      }

    使用举例

     /// <summary>
     /// Console host for AsyncTcpServer: listens on port 9999, prints and
     /// acknowledges text received from clients, and broadcasts every line
     /// typed on the console to all connected clients.
     /// </summary>
     class Program
     {
       static AsyncTcpServer server;

       /// <summary>
       /// Starts the server and relays console input to all clients until the
       /// console input ends.
       /// </summary>
       static void Main(string[] args)
       {
         LogFactory.Assign(new ConsoleLogFactory());

         server = new AsyncTcpServer(9999);
         try
         {
           server.Encoding = Encoding.UTF8;
           server.ClientConnected +=
             new EventHandler<TcpClientConnectedEventArgs>(server_ClientConnected);
           server.ClientDisconnected +=
             new EventHandler<TcpClientDisconnectedEventArgs>(server_ClientDisconnected);
           server.PlaintextReceived +=
             new EventHandler<TcpDatagramReceivedEventArgs<string>>(server_PlaintextReceived);
           server.Start();

           Console.WriteLine("TCP server has been started.");
           Console.WriteLine("Type something to send to client...");
           while (true)
           {
             string text = Console.ReadLine();
             if (text == null)
             {
               // ReadLine returns null at end of input (redirected stdin or
               // Ctrl+Z / Ctrl+D); passing null on to SendAll would throw.
               break;
             }
             server.SendAll(text);
           }
         }
         finally
         {
           // Stop listening and release the sockets on every exit path.
           server.Dispose();
         }
       }

       // Logs the remote end point of a newly connected client.
       static void server_ClientConnected(object sender, TcpClientConnectedEventArgs e)
       {
         Logger.Debug(string.Format(CultureInfo.InvariantCulture,
           "TCP client {0} has connected.",
           e.TcpClient.Client.RemoteEndPoint.ToString()));
       }

       // Logs the remote end point of a client that has disconnected.
       static void server_ClientDisconnected(object sender, TcpClientDisconnectedEventArgs e)
       {
         Logger.Debug(string.Format(CultureInfo.InvariantCulture,
           "TCP client {0} has disconnected.",
           e.TcpClient.Client.RemoteEndPoint.ToString()));
       }

       // Prints text received from a client and sends an acknowledgement back.
       // The literal "Received" is ignored and gets no reply.
       static void server_PlaintextReceived(object sender, TcpDatagramReceivedEventArgs<string> e)
       {
         if (e.Datagram != "Received")
         {
           // One WriteLine call, so output for different clients cannot
           // interleave between the prefix and the text.
           Console.WriteLine("Client : {0} --> {1}",
             e.TcpClient.Client.RemoteEndPoint.ToString(), e.Datagram);

           try
           {
             server.Send(e.TcpClient, "Server has received you text : " + e.Datagram);
           }
           catch (System.IO.IOException)
           {
             // The client went away before the reply could be written; this
             // handler runs inside the server's read callback, so the failure
             // is logged instead of being left unhandled.
             Logger.Debug("Failed to reply to TCP client: connection lost.");
           }
           catch (InvalidOperationException)
           {
             // Client is no longer connected or was already closed (includes
             // ObjectDisposedException).
             Logger.Debug("Failed to reply to TCP client: client not connected.");
           }
         }
       }
     }

    本文为 Dennis Gao 原创技术文章,发表于博客园博客,未经作者本人允许禁止任何形式的转载。

  • 相关阅读:
    Zend Framework 2.1.5 中根据服务器的环境配置调用数据库等的不同配置
    在基于 Eclipse 的 IDE 中安装和使用 Emmet(ZenCoding)
    【翻译】Emmet(Zen Coding)官方文档 之六 自定义 Emmet
    【翻译】Emmet(Zen Coding)官方文档 之二 缩写
    【翻译】Emmet(Zen Coding)官方文档 之七 一览表
    【翻译】Emmet(Zen Coding)官方文档 之三 CSS 缩写
    【翻译】Emmet(Zen Coding)官方文档 之四 动作
    【翻译】Emmet(Zen Coding)官方文档 之一 web 编程的必备工具
    Zend Framework 2 时区设置警告问题的解决
    【翻译】Emmet (Zen Coding) 元素类型
  • 原文地址:https://www.cnblogs.com/gaochundong/p/csharp_async_tcp_server.html
Copyright © 2011-2022 走看看