zoukankan      html  css  js  c++  java
  • DIOCP开源项目详解编码器和解码器和如何在传输中加入压缩和解压功能

    >>>>>>DIOCP讨论群:320641073

    >>>>>>SVN源码和DEMO下载:https://code.google.com/p/diocp/

    网络带宽有限,对数据进行压缩再进行传送可以有效的利用带宽和提高传输的效率。

    DIOCP中对数据进行压缩传送,需要修改编码器和解码器,先说说这两个东西的的用法和功能。

          

              举个例子:我们要把一台电脑快递回老家给正在上学的小弟使用,那么老家就是服务端(S),电脑就是我们要发送的对象(O),快递就是TCP传输过程。

    在这个过程中,发送一个对象(电脑)用到客户端的编码器,接收对象(电脑)用到服务端的解码器。

             

              在之前编写的DIOCP例子中都使用了JSonStream对象进行传输,这个TJSonStream类,主要有两个部分的数据,第一部分包含JSon字符串数据,第二部分包含Stream流数据。在三层数据保存例子中我们把客户端的请求放在JSon中。服务端接收数据后通过服务端的解码器还原成JSonStream对象。然后进行逻辑的处理,服务端回写对象时,同过服务端的编码器对回写的JSonStream进行编码发送,客户端通过客户端的解码器接收并还原成JSonStream对象。在服务端回写CDS数据包时将xml字符串数据写在JSonStream.Stream中,如果对Stream对象进行压缩,在做压缩中调试程序时发现一个70K的数据包进行一下压缩,数据包可以变成7K了,对文本压缩效果还是很不错的。

            *当然我们允许自己定义协议和编写编码和解码器,我们可以定义自己的TStrStream类或者TXMLStream等等,然后编写相应的编码器和解码器就行了。

            下面分析下代码

    客户端代码:

    var
      lvJSonStream, lvRecvObject:TJsonStream;
      lvStream:TStream;
      lvData:AnsiString;
      l, j, x:Integer;
    begin
      lvJSonStream := TJsonStream.Create;
      try
        lvJSonStream.JSon := SO();
        lvJSonStream.JSon.I['cmdIndex'] := 1001;   //打开一个SQL脚本,获取数据
        lvJSonStream.Json.S['sql'] := mmoSQL.Lines.Text;
    
        FClientSocket.sendObject(lvJSonStream);
      finally
        lvJSonStream.Free;
      end;

    首先是建立一个TJSonStream对象,然后设定信息,因为是发生SQL所以没有Stream数据。后面是用FClientSocket.sendObject(lvJSonStream);//用Socket进行发送,

    procedure TD10ClientSocket.sendObject(pvObject:TObject);
    begin
      if FCoder = nil then raise Exception.Create('没有注册对象编码和解码器(registerCoder)!');
    
      if not Active then Exit;
    
      FCoder.Encode(Self, pvObject);
    end;

    可以看出这里调用注册的编码器,调用Encode函数

    客户端编码器的Encode函数如下

    procedure TJSonStreamClientCoder.Encode(pvSocket: TClientSocket; pvObject:
        TObject);
    var
      lvJSonStream:TJsonStream;
      lvJSonLength:Integer;
      lvStreamLength:Integer;
      sData, lvTemp:String;
      lvStream:TStream;
      lvTempBuf:PAnsiChar;
    
      lvBytes, lvTempBytes:TBytes;
      
      l:Integer;
      lvBufBytes:array[0..1023] of byte;
    begin
      if pvObject = nil then exit;
      lvJSonStream := TJsonStream(pvObject);
      
      //是否压缩流
      if (lvJSonStream.Stream <> nil) then
      begin
        if lvJSonStream.Json.O['config.stream.zip'] <> nil then
        begin
          if lvJSonStream.Json.B['config.stream.zip'] then
          begin
            //压缩流
            TZipTools.compressStreamEx(lvJSonStream.Stream);
          end;
        end else if lvJSonStream.Stream.Size > 0 then
        begin
          //压缩流
          TZipTools.compressStreamEx(lvJSonStream.Stream);
          lvJSonStream.Json.B['config.stream.zip'] := true;
        end;
      end;
    
      sData := lvJSonStream.JSon.AsJSon(True);
    
      lvBytes := TNetworkTools.ansiString2Utf8Bytes(sData);
    
      lvJSonLength := Length(lvBytes);
      lvStream := lvJSonStream.Stream;
    
      lvJSonLength := TNetworkTools.htonl(lvJSonLength);
    
      if pvSocket.sendBuffer(@lvJSonLength, SizeOf(lvJSonLength)) = SOCKET_ERROR then Exit;
    
    
      if lvStream <> nil then
      begin
        lvStreamLength := lvStream.Size;
      end else
      begin
        lvStreamLength := 0;
      end;
    
      lvStreamLength := TNetworkTools.htonl(lvStreamLength);
      if pvSocket.sendBuffer(@lvStreamLength, SizeOf(lvStreamLength)) = SOCKET_ERROR then Exit;
    
      //json bytes
      if pvSocket.sendBuffer(@lvBytes[0], Length(lvBytes)) = SOCKET_ERROR then Exit;
    
      if lvStream.Size > 0 then
      begin
        lvStream.Position := 0;
        repeat
          l := lvStream.Read(lvBufBytes, SizeOf(lvBufBytes));
          if pvSocket.sendBuffer(@lvBufBytes[0], l) = SOCKET_ERROR then Exit;
        until (l = 0);
      end;
    end;

    该部分完成的功能有

    1.判断Stream数据是否需要压缩。

    2.发送Json数据长度和Stream数据长度

    3.发送Json数据

    4.发送Stream数据

    说明:

    lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
    lvStreamLength := TNetworkTools.ntohl(lvStreamLength);

    lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);

    这三行代码需要说明下,是为了兼容java,netty做服务端方便解码,当然我们也可以不进行转换。直接发送也是可以的。只要配合服务端就行了。协议是自己定义的。

    接下来是服务端IOCP队列中会收到接收数据的信号。

    function TIOCPObject.processIOQueued: Integer;
    var
      BytesTransferred:Cardinal;
      lvResultStatus:BOOL;
      lvRet:Integer;
      lvIOData:POVERLAPPEDEx;
    
      lvDataObject:TObject;
    
      lvClientContext:TIOCPClientContext;
    begin
      Result := IOCP_RESULT_OK;
    
      //工作者线程会停止到GetQueuedCompletionStatus函数处,直到接受到数据为止
      lvResultStatus := GetQueuedCompletionStatus(FIOCoreHandle,
     
      .......
        if lvIOData.IO_TYPE = IO_TYPE_Accept then  //连接请求
        begin
          TIODataMemPool.instance.giveBackIOData(lvIOData);
          PostWSARecv(lvClientContext);
        end else if lvIOData.IO_TYPE = IO_TYPE_Recv then
        begin
          //加入到套接字对应的缓存中,处理逻辑
          lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
            lvIOData.Overlapped.InternalHigh);
    
          TIODataMemPool.instance.giveBackIOData(lvIOData);
    
          //继续投递接收请求
          PostWSARecv(lvClientContext);
        end;    
      .........
    end;

    //加入到套接字对应的缓存中,处理逻辑
    lvClientContext.RecvBuffer(lvIOData.DataBuf.buf,
      lvIOData.Overlapped.InternalHigh);

    //这里会调用解码器尝试进行解码

    procedure TIOCPClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal);
    var
      lvObject:TObject;
    begin
      FCS.Enter;
      try
        //加入到套接字对应的缓存
        FBuffers.AddBuffer(buf, len);
    
        //调用注册的解码器<进行解码>
        lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);
        if lvObject <> nil then
        try
          try
            //解码成功,调用业务逻辑的处理方法
            dataReceived(lvObject);
          except
            on E:Exception do
            begin
              TIOCPFileLogger.logErrMessage('截获处理逻辑异常!' + e.Message);
            end;
          end; 
          //清理掉这一次分配的内存<如果没有可用的内存块>清理
          if FBuffers.validCount = 0 then
          begin
            FBuffers.clearBuffer;
          end;
        finally
          lvObject.Free;
        end;
      finally
        FCS.Leave;
      end;
    end;

    我们在之前的Demo中使用的是TIOCPJSonStreamDecoder解码器

    function TIOCPJSonStreamDecoder.Decode(const inBuf: TBufferLink): TObject;
    var
      lvJSonLength, lvStreamLength:Integer;
      lvData:String;
      lvBuffer:array of Char;
      lvBufData:PAnsiChar;
      lvStream:TMemoryStream;
      lvJsonStream:TJsonStream;
      lvBytes:TBytes;
      lvValidCount:Integer;
    begin
      Result := nil;
    
      //如果缓存中的数据长度不够包头长度,解码失败<json字符串长度,流长度>
      lvValidCount := inBuf.validCount;
      if (lvValidCount < SizeOf(Integer) + SizeOf(Integer)) then
      begin
        Exit;
      end;
    
      //记录读取位置
      inBuf.markReaderIndex;
      inBuf.readBuffer(@lvJSonLength, SizeOf(Integer));
      inBuf.readBuffer(@lvStreamLength, SizeOf(Integer));
    
      lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
      lvStreamLength := TNetworkTools.ntohl(lvStreamLength);
    
      //如果缓存中的数据不够json的长度和流长度<说明数据还没有收取完毕>解码失败
      lvValidCount := inBuf.validCount;
      if lvValidCount < (lvJSonLength + lvStreamLength) then
      begin
        //返回buf的读取位置
        inBuf.restoreReaderIndex;
        exit;
      end else if (lvJSonLength + lvStreamLength) = 0 then
      begin
        //两个都为0<两个0>客户端可以用来作为自动重连使用
        TIOCPFileLogger.logDebugMessage('接收到一次[00]数据!');
        Exit;
      end;
    
    
    
      //解码成功
      lvJsonStream := TJsonStream.Create;
      Result := lvJsonStream;
    
      //读取json字符串
      if lvJSonLength > 0 then
      begin
        SetLength(lvBytes, lvJSonLength);
        ZeroMemory(@lvBytes[0], lvJSonLength);
        inBuf.readBuffer(@lvBytes[0], lvJSonLength);
    
        lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);
    
        lvJsonStream.Json := SO(lvData);
      end else
      begin
        TFileLogger.instance.logMessage('接收到一次JSon为空的一次数据请求!', 'IOCP_ALERT_');
      end;
    
    
      //读取流数据 
      if lvStreamLength > 0 then
      begin
        GetMem(lvBufData, lvStreamLength);
        try
          inBuf.readBuffer(lvBufData, lvStreamLength);
          lvJsonStream.Stream.Size := 0;
          lvJsonStream.Stream.WriteBuffer(lvBufData^, lvStreamLength);
    
          //解压流
          if lvJsonStream.Json.B['config.stream.zip'] then
          begin
            //解压
            TZipTools.unCompressStreamEX(lvJsonStream.Stream);
          end;
        finally
          FreeMem(lvBufData, lvStreamLength);
        end;
      end;
    end;

    //服务端解码器中有三行代码来配合客户端的编码流


    lvJSonLength := TNetworkTools.ntohl(lvJSonLength);
    lvStreamLength := TNetworkTools.ntohl(lvStreamLength);   
    lvData := TNetworkTools.Utf8Bytes2AnsiString(lvBytes);

    /////

    服务端解码器主要完成的功能有

    0.判断接收到的数据是否可以进行解码,如果不可以退出,解码不成功。

    1.接收json长度,流数据长度

    2.接收json数据,接收流数据存入JsonStream.json中,

    3.根据json中config.stream.zip进行判断流数据是否需要解压.放入JsonStream.stream中
    4.解码成功返回JsonStream对象。

    解码完成后可以看到

    lvObject := TIOCPContextFactory.instance.FDecoder.Decode(FBuffers);

    if lvObject <> nil then

    try //解码成功,调用业务逻辑的处理方法

        dataReceived(lvObject);

    ………

    解码成功调用dataReceived,进行逻辑的处理。

    总结:

       服务端的解码器配套客户端的编码器,服务端的编码器配套客户端的解码器。

  • 相关阅读:
    生成函数解决多重集合的计数问题
    kmp板子
    poj1001
    【题解】洛谷P1315 [NOIP2011TG] 观光公交(前缀和+贪心)
    【题解】洛谷P1941 [NOIP2014TG] 飞扬的小鸟(背包DP)
    【题解】洛谷P2679 [NOIP2015TG] 子串(DP+滚动数组)
    【题解】洛谷P1514 [NOIP2010TG] 引水入城(DFS+DP)
    【题解】洛谷P1052 [NOIP2005TG] 过河(DP+离散化)
    [arc063F]Snuke's Coloring 2-[线段树+观察]
    [agc001E]BBQ Hard[组合数性质+dp]
  • 原文地址:https://www.cnblogs.com/DKSoft/p/3107933.html
Copyright © 2011-2022 走看看