zoukankan      html  css  js  c++  java
  • DELPHI高性能大容量SOCKET并发(四):粘包、分包、解包

    粘包

    使用TCP长连接就会引入粘包的问题,粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着前一包数据的尾。粘包可能由发送方造成,也可能由接收方造成。TCP为提高传输效率,发送方往往要收集到足够多的数据后才发送一包数据,造成多个数据包的粘连。如果接收进程不及时接收数据,已收到的数据就放在系统接收缓冲区,用户进程读取数据时就可能同时读到多个数据包。

    粘包一般的解决办法是制定通讯协议,由协议来规定如何分包解包。

    分包

    在IOCPDemo例子程序中,我们分包的逻辑是先发一个长度,然后紧接着是数据包内容,这样就可以把每个包分开。

    应用层数据包格式如下:

    应用层数据包格式  
    数据包长度Len:Cardinal(4字节无符号整数) 数据包内容,长度为Len
    IOCPSocket分包处理主要代码,我们收到的数据都是在TSocketHandle.ProcessIOComplete方法中处理:
    [delphi] view plain copy
     
    1. procedure TSocketHandle.ProcessIOComplete(AIocpRecord: PIocpRecord;  
    2.   const ACount: Cardinal);  
    3. begin  
    4.   case AIocpRecord.IocpOperate of  
    5.     ioNone: Exit;  
    6.     ioRead: //收到数据  
    7.     begin  
    8.       FActiveTime := Now;  
    9.       ReceiveData(AIocpRecord.WsaBuf.buf, ACount);  
    10.       if FConnected then  
    11.         PreRecv(AIocpRecord); //投递请求  
    12.     end;  
    13.     ioWrite: //发送数据完成,需要释放AIocpRecord的指针  
    14.     begin  
    15.       FActiveTime := Now;  
    16.       FSendOverlapped.Release(AIocpRecord);  
    17.     end;  
    18.     ioStream:  
    19.     begin  
    20.       FActiveTime := Now;  
    21.       FSendOverlapped.Release(AIocpRecord);  
    22.       WriteStream; //继续发送流  
    23.     end;  
    24.   end;  
    25. end;  
    如果是收到数据,则调用ReceiveData函数,ReceiveData主要功能是把数据的写入流中,然后调用Process分包。FInputBuf是一个内存流(FInputBuf: TMemoryStream),内存流的每次写入会造成一次内存分配,如果要获得更高的效率,可以替换为内存池等更好的内存管理方式。还有一种更好的解决方案是规定每次发包的大小,如每个包最大不超过64K,哪么缓冲区的最大大小可以设置为128K(缓存两个数据包),这样就可以每次创建对象时一次分配好,减少内存分配次数,提高效率。(内存的分配和释放比内存的读写效率要低)
    [delphi] view plain copy
     
    1. procedure TSocketHandle.ReceiveData(AData: PAnsiChar; const ALen: Cardinal);  
    2. begin  
    3.   FInputBuf.Write(AData^, ALen);  
    4.   Process;  
    5. end;  
    Process则根据收到的数据进行分包逻辑,如果不够一个包,则继续等待接收数据,如果够一个或多个包,则循环调用Execute函数进行处理,代码如下:
    [delphi] view plain copy
     
    1. procedure TSocketHandle.Process;  
    2. var  
    3.   AData, ALast, NewBuf: PByte;  
    4.   iLenOffset, iOffset, iReserveLen: Integer;  
    5.   
    6.   function ReadLen: Integer;  
    7.   var  
    8.     wLen: Word;  
    9.     cLen: Cardinal;  
    10.   begin  
    11.     FInputBuf.Position := iOffset;  
    12.     if FLenType = ltWord then  
    13.     begin  
    14.       FInputBuf.Read(wLen, SizeOf(wLen));  
    15.       //wLen := ntohs(wLen);  
    16.       Result := wLen;  
    17.     end  
    18.     else  
    19.     begin  
    20.       FInputBuf.Read(cLen, SizeOf(cLen));  
    21.       //cLen := ntohl(cLen);  
    22.       Result := cLen;  
    23.     end;  
    24.   end;  
    25. begin  
    26.   case FLenType of  
    27.     ltWord, ltCardinal:  
    28.     begin  
    29.       if FLenType = ltWord then  
    30.         iLenOffset := 2  
    31.       else  
    32.         iLenOffset := 4;  
    33.       iReserveLen := 0;  
    34.       FPacketLen := 0;  
    35.       iOffset := 0;  
    36.       if FPacketLen <= then  
    37.       begin  
    38.         if FInputBuf.Size < iLenOffset then Exit;  
    39.         FInputBuf.Position := 0; //移动到最前面  
    40.         FPacketLen := ReadLen;  
    41.         iOffset := iLenOffset;  
    42.         iReserveLen := FInputBuf.Size - iOffset;  
    43.         if FPacketLen > iReserveLen then //不够一个包的长度  
    44.         begin  
    45.           FInputBuf.Position := FInputBuf.Size; //移动到最后,以便接收后续数据  
    46.           FPacketLen := 0;  
    47.           Exit;  
    48.         end;  
    49.       end;  
    50.       while (FPacketLen > 0) and (iReserveLen >= FPacketLen) do //如果数据够长,则处理  
    51.       begin //多个包循环处理  
    52.         AData := Pointer(Longint(FInputBuf.Memory) + iOffset); //取得当前的指针  
    53.         Execute(AData, FPacketLen);  
    54.         iOffset := iOffset + FPacketLen; //移到下一个点  
    55.         FPacketLen := 0;  
    56.         iReserveLen := FInputBuf.Size - iOffset;  
    57.         if iReserveLen > iLenOffset then //剩下的数据  
    58.         begin  
    59.           FPacketLen := ReadLen;  
    60.           iOffset := iOffset + iLenOffset;  
    61.           iReserveLen := FInputBuf.Size - iOffset;  
    62.           if FPacketLen > iReserveLen then //不够一个包的长度,需要把长度回退  
    63.           begin  
    64.             iOffset := iOffset - iLenOffset;  
    65.             iReserveLen := FInputBuf.Size - iOffset;  
    66.             FPacketLen := 0;  
    67.           end;  
    68.         end  
    69.         else //不够长度字节数  
    70.           FPacketLen := 0;  
    71.       end;  
    72.       if iReserveLen > then //把剩下的自己缓存起来  
    73.       begin  
    74.         ALast := Pointer(Longint(FInputBuf.Memory) + iOffset);  
    75.         GetMem(NewBuf, iReserveLen);  
    76.         try  
    77.           CopyMemory(NewBuf, ALast, iReserveLen);  
    78.           FInputBuf.Clear;  
    79.           FInputBuf.Write(NewBuf^, iReserveLen);  
    80.         finally  
    81.           FreeMemory(NewBuf);  
    82.         end;  
    83.       end  
    84.       else  
    85.       begin  
    86.         FInputBuf.Clear;  
    87.       end;  
    88.     end;  
    89.   else  
    90.     begin  
    91.       FInputBuf.Position := 0;  
    92.       AData := Pointer(Longint(FInputBuf.Memory)); //取得当前的指针  
    93.       Execute(AData, FInputBuf.Size);  
    94.       FInputBuf.Clear;  
    95.     end;  
    96.   end;  
    97. end;  

    解包

    由于我们应用层数据包既可以传命令也可以传数据,因而针对每个包我们进行解包,分出命令和数据分别处理,因而每个Socket服务对象都需要解包,我们解包的逻辑是放在TBaseSocket.DecodePacket中,命令和数据的包格式为:

    命令长度Len:Cardinal(4字节无符号整数) 命令 数据
    这里和第一版公布的代码不同,这版的代码对命令进行了编码,采用UTF-8编码,代码如下:
    [delphi] view plain copy
     
    1. function TBaseSocket.DecodePacket(APacketData: PByte;  
    2.   const ALen: Integer): Boolean;  
    3. var  
    4.   CommandLen: Integer;  
    5.   UTF8Command: UTF8String;  
    6. begin  
    7.   if ALen > then //命令长度为4字节,因而长度必须大于4  
    8.   begin  
    9.     CopyMemory(@CommandLen, APacketData, SizeOf(Cardinal)); //获取命令长度  
    10.     Inc(APacketData, SizeOf(Cardinal));  
    11.     SetLength(UTF8Command, CommandLen);  
    12.     CopyMemory(PUTF8String(UTF8Command), APacketData, CommandLen); //读取命令  
    13.     Inc(APacketData, CommandLen);  
    14.     FRequestData := APacketData; //数据  
    15.     FRequestDataLen := ALen - SizeOf(Cardinal) - CommandLen; //数据长度  
    16.     FRequest.Text := Utf8ToAnsi(UTF8Command); //把UTF8转为Ansi  
    17.     Result := True;  
    18.   end  
    19.   else  
    20.     Result := False;   
    21. end;  
    具体每个协议可以集成Execute方法,调用DecodePacket进行解包,然后根据命令进行协议逻辑处理,例如TSQLSocket主要代码如下:
    [delphi] view plain copy
     
    1. {* SQL查询SOCKET基类 *}  
    2. TSQLSocket = class(TBaseSocket)  
    3. private  
    4.   {* 开始事务创建TADOConnection,关闭事务时释放 *}  
    5.   FBeginTrans: Boolean;  
    6.   FADOConn: TADOConnection;  
    7. protected  
    8.   {* 处理数据接口 *}  
    9.   procedure Execute(AData: PByte; const ALen: Cardinal); override;  
    10.   {* 返回SQL语句执行结果 *}  
    11.   procedure DoCmdSQLOpen;  
    12.   {* 执行SQL语句 *}  
    13.   procedure DoCmdSQLExec;  
    14.   {* 开始事务 *}  
    15.   procedure DoCmdBeginTrans;  
    16.   {* 提交事务 *}  
    17.   procedure DoCmdCommitTrans;  
    18.   {* 回滚事务 *}  
    19.   procedure DoCmdRollbackTrans;  
    20. public  
    21.   procedure DoCreate; override;  
    22.   destructor Destroy; override;  
    23.   {* 获取SQL语句 *}  
    24.   function GetSQL: string;  
    25.   property BeginTrans: Boolean read FBeginTrans;  
    26. end;  
    Exceute是调用DecodePacket进行解包,然后获取命令分别调用不同的命令处理逻辑,代码如下:
    [delphi] view plain copy
     
    1. procedure TSQLSocket.Execute(AData: PByte; const ALen: Cardinal);  
    2. var  
    3.   sErr: string;  
    4. begin  
    5.   inherited;  
    6.   FRequest.Clear;  
    7.   FResponse.Clear;  
    8.   try  
    9.     AddResponseHeader;  
    10.     if ALen = then  
    11.     begin  
    12.       DoFailure(CIPackLenError);  
    13.       DoSendResult;  
    14.       Exit;  
    15.     end;  
    16.     if DecodePacket(AData, ALen) then  
    17.     begin  
    18.       FResponse.Clear;  
    19.       AddResponseHeader;  
    20.       case StrToSQLCommand(Command) of  
    21.         scLogin:  
    22.         begin  
    23.           DoCmdLogin;  
    24.           DoSendResult;  
    25.         end;  
    26.         scActive:  
    27.         begin  
    28.           DoSuccess;  
    29.           DoSendResult;  
    30.         end;  
    31.         scSQLOpen:  
    32.         begin  
    33.           DoCmdSQLOpen;  
    34.         end;  
    35.         scSQLExec:  
    36.         begin  
    37.           DoCmdSQLExec;  
    38.           DoSendResult;  
    39.         end;  
    40.         scBeginTrans:  
    41.         begin  
    42.           DoCmdBeginTrans;  
    43.           DoSendResult;  
    44.         end;  
    45.         scCommitTrans:  
    46.         begin  
    47.           DoCmdCommitTrans;  
    48.           DoSendResult;  
    49.         end;  
    50.         scRollbackTrans:  
    51.         begin  
    52.           DoCmdRollbackTrans;  
    53.           DoSendResult;  
    54.         end;  
    55.       else  
    56.         DoFailure(CINoExistCommand, 'Unknow Command');  
    57.         DoSendResult;  
    58.       end;  
    59.     end  
    60.     else  
    61.     begin  
    62.       DoFailure(CIPackFormatError, 'Packet Must Include  ');  
    63.       DoSendResult;  
    64.     end;  
    65.   except  
    66.     on E: Exception do //发生未知错误,断开连接  
    67.     begin  
    68.       sErr := RemoteAddress + ':' + IntToStr(RemotePort) + CSComma + 'Unknow Error: ' + E.Message;  
    69.       WriteLogMsg(ltError, sErr);  
    70.       Disconnect;  
    71.     end;  
    72.   end;  
    73. end;  

    更详细代码见示例代码的IOCPSocket单元。

    V1版下载地址:http://download.csdn.net/detail/sqldebug_fan/4510076,需要资源10分,有稳定性问题,可以作为研究稳定性用;

    V2版下载地址:http://download.csdn.net/detail/sqldebug_fan/5560185,不需要资源分,解决了稳定性问题和提高性能;免责声明:此代码只是为了演示IOCP编程,仅用于学习和研究,切勿用于商业用途。水平有限,错误在所难免,欢迎指正和指导。邮箱地址:fansheng_hx@163.com。

    http://blog.csdn.net/sqldebug_fan/article/details/7907765

  • 相关阅读:
    Win10下 Docker Flask实例
    4.1 线性映射的概念
    桥梁的基本组成和分类
    Qt5字符串编码转换学习
    在右键菜单中添加用Jupyter Notebook打开
    左右手(直角)坐标系叉乘计算公式
    __new__方法与单键实例
    向量组的秩
    从线性组合的角度理解三维运算
    Hexo使用小结
  • 原文地址:https://www.cnblogs.com/findumars/p/8196130.html
Copyright © 2011-2022 走看看