zoukankan      html  css  js  c++  java
  • 异步tcp通信——APM.Core 服务端概述

    为什么使用异步

      异步线程是由线程池负责管理,而多线程,我们可以自己控制,当然在多线程中我们也可以使用线程池。就拿网络扒虫而言,如果使用异步模式去实现,它使用线程池进行管理。异步操作执行时,会将操作丢给线程池中的某个工作线程来完成。当开始I/O操作的时候,异步会将工作线程还给线程池,这意味着获取网页的工作不会再占用任何CPU资源了。直到异步完成,即获取网页完毕,异步才会通过回调的方式通知线程池。可见,异步模式借助于线程池,极大地节约了CPU的资源。
      注:DMA(Direct Memory Access)直接内存存取,顾名思义DMA功能就是让设备可以绕过处理器,直接由内存来读取资料。通过直接内存访问的数据交换几乎可以不损耗CPU的资源。在硬件中,硬盘、网卡、声卡、显卡等都有直接内存访问功能。异步编程模型就是让我们充分利用硬件的直接内存访问功能来释放CPU的压力。
      两者的应用场景:
        计算密集型工作,采用多线程。
        IO密集型工作,采用异步机制。

    C#中实现异步tcp通信

      socket中仅仅需要将Blocking=false即可轻松实现异步,部分示例如下:

     1 /// <summary>
     2         /// 启动tcp监听
     3         /// </summary>
     4         public void Start()
     5         {
     6             if (!_isStarted)
     7             {
     8                 _isStarted = true;
     9                 _server = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    10 
    11                 #region socket配置            
    12                 LingerOption lingerOption = new LingerOption(true, 30);
    13                 _server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.Linger, lingerOption);
    14                 #endregion
    15 
    16                 _server.Blocking = false;
    17                 _server.ExclusiveAddressUse = false;
    18                 _server.Bind(new IPEndPoint(IPAddress.Any, this._port));
    19                 _server.Listen(1000000);
    20                 Parallel.For(0, 1000000, i =>
    21                 {
    22                     _server.BeginAccept(new AsyncCallback(ProcessAccept), _server);
    23                 });
    24             }
    25         }
    View Code

      tcp异步中处理接io操作最关键的参数:IAsyncResult,使用一般用begin开始,end结束。

      接收数据处理如下:

     1 /// <summary>
     2         /// 处理传入的连接请求
     3         /// </summary>
     4         private void ProcessAccept(IAsyncResult ar)
     5         {
     6             var s = (Socket)ar.AsyncState;
     7             var remote = s.EndAccept(ar);
     8             var user = new UserToken(this._maxBufferSize) { ID = remote.RemoteEndPoint.ToString(), Client = remote };
     9             remote.BeginReceive(user.ReceiveBuffer, 0, user.ReceiveBuffer.Length, SocketFlags.None, new AsyncCallback(ProcessReceive),
    10                 user);
    11             s.BeginAccept(new AsyncCallback(ProcessAccept), s);
    12         }
    View Code
     1 private void ProcessReceive(IAsyncResult ar)
     2         {
     3             var user = (UserToken)ar.AsyncState;
     4             var remote = user.Client;
     5             try
     6             {
     7                 if (remote.Connected)
     8                 {
     9                     var ns = remote.EndReceive(ar);
    10 
    11                     if (ns > 0)
    12                     {
    13                         var buffer = new byte[ns];
    14 
    15                         Buffer.BlockCopy(user.ReceiveBuffer, 0, buffer, 0, buffer.Length);
    16 
    17                         user.UnPackage(buffer, (p) =>
    18                         {
    19                             Interlocked.Increment(ref this._receiveCount);
    20                             this.RaiseOnOnReceived(user, p);
    21                         });
    22 
    23                         user.ClearReceiveBuffer();
    24 
    25                         buffer = null;
    26 
    27                         remote.BeginReceive(user.ReceiveBuffer, 0, user.ReceiveBuffer.Length, SocketFlags.None, new AsyncCallback(ProcessReceive), user);
    28                     }
    29                 }
    30                 else
    31                 {
    32                     this.RaiseOnDisConnected(user, new Exception("客户端已断开连接"));
    33                     this.CloseClient(user);
    34                 }
    35             }
    36             catch (SocketException sex)
    37             {
    38                 this.RaiseOnDisConnected(user, sex);
    39                 this.CloseClient(user);
    40             }
    41             catch (Exception ex)
    42             {
    43                 this.RaiseOnError(user, ex);
    44                 this.CloseClient(user);
    45             }
    46         }
    View Code

      发送数据处理如下:

     1 /// <summary>
     2         /// 发送信息
     3         /// </summary>
     4         /// <param name="remote"></param>
     5         /// <param name="data"></param>
     6         /// <param name="type"></param>
     7         /// <param name="auth"></param>
     8         private void SendAsync(UserToken remote, byte[] data, TransportType type = TransportType.Heart)
     9         {
    10             try
    11             {
    12                 using (var pakage = new TcpPackage(data, type, remote.Auth))
    13                 {
    14                     remote.Client.BeginSend(pakage.Data, 0, pakage.Data.Length, SocketFlags.None, new AsyncCallback(EndSend), remote);
    15                 }
    16 
    17             }
    18             catch (SocketException sex)
    19             {
    20                 this.RaiseOnDisConnected(remote, sex);
    21             }
    22             catch (Exception ex)
    23             {
    24                 this.RaiseOnError(remote, ex);
    25             }
    26         }
    View Code
    1 private void EndSend(IAsyncResult ar)
    2         {
    3             var remote = (UserToken)ar.AsyncState;
    4             remote.Client.EndSend(ar);
    5             Interlocked.Increment(ref this._sendCount);
    6         }
    View Code

      心跳、消息、文件等逻辑都可以基于发送逻辑来完成

    1 /// <summary>
    2         /// 回复心跳
    3         /// </summary>
    4         /// <param name="remote"></param>
    5         /// <param name="package"></param>
    6         private void ReplyHeart(UserToken remote, TcpPackage package)
    7         {
    8             this.SendAsync(remote, null, TransportType.Heart);
    9         }
    View Code
    1 /// <summary>
    2         /// 发送信息
    3         /// </summary>
    4         /// <param name="remote"></param>
    5         /// <param name="msg"></param>
    6         public void SendMsg(UserToken remote, byte[] msg)
    7         {
    8             this.SendAsync(remote, msg, TransportType.Message);
    9         }
    View Code
     1 /// <summary>
     2         /// 发送文件
     3         /// </summary>
     4         /// <param name="remote"></param>
     5         /// <param name="filePath"></param>
     6         public void SendFile(UserToken remote, string filePath)
     7         {
     8             using (var file = new TransferFileInfo()
     9             {
    10                 ID = remote.ID,
    11                 FileBytes = File.ReadAllBytes(filePath),
    12                 Name = filePath.Substring(filePath.LastIndexOf("\") + 1),
    13                 CreateTime = DateTime.Now.Ticks
    14             })
    15             {
    16                 var buffer = TransferFileInfo.Serialize(file);
    17                 this.SendAsync(remote, buffer, TransportType.File);
    18                 buffer = null;
    19             }
    20         }
    View Code
     1 /// <summary>
     2         /// 发送文件
     3         /// </summary>
     4         /// <param name="remote"></param>
     5         /// <param name="fileName"></param>
     6         /// <param name="file"></param>
     7         public void SendFile(UserToken remote, string fileName, byte[] file)
     8         {
     9             using (var fileInfo = new TransferFileInfo()
    10             {
    11                 ID = remote.ID,
    12                 FileBytes = file,
    13                 Name = fileName,
    14                 CreateTime = DateTime.Now.Ticks
    15             })
    16             {
    17                 var buffer = TransferFileInfo.Serialize(fileInfo);
    18                 this.SendAsync(remote, buffer, TransportType.File);
    19                 buffer = null;
    20             }
    21         }
    View Code

    异步tcp通信——APM.Core 服务端概述

    异步tcp通信——APM.Core 解包

    异步tcp通信——APM.Server 消息推送服务的实现

    异步tcp通信——APM.ConsoleDemo


    转载请标明本文来源:http://www.cnblogs.com/yswenli/
    更多内容欢迎star作者的github:https://github.com/yswenli/APM
    如果发现本文有什么问题和任何建议,也随时欢迎交流~

  • 相关阅读:
    生日蛋糕 (搜索)
    C语言中将二维数组作为函数参数来传递
    HDU 1052 Tian Ji -- The Horse Racing (贪心)(转载有修改)
    HDU 1789 Doing Homework again(排序,DP)
    10.6 ip:网络配置工具
    10.7 netstat:查看网络状态
    10.19 dig:域名查询工具
    10.21 nmap:网络探测工具和安全/端口扫描器
    10.20 host:域名查询工具
    LeetCode Add Two Numbers 两个数相加
  • 原文地址:https://www.cnblogs.com/yswenli/p/6265974.html
Copyright © 2011-2022 走看看