zoukankan      html  css  js  c++  java
  • 异步SOCKET分包和组包的一种通用算法

    unit uPackage;
    // 应用协议
    // cxg 2016-9-23
    // 包=包头+包体

    interface

    uses
    SysUtils, Classes, PeachCtrl.Net.IocpTcpServer, System.Generics.Collections,
    Winapi.Windows, System.SyncObjs, PeachCtrl.Net.BlockingTcpClient, untLog,
    System.Math;

    const // 包长
    pack_len = 8192;

    const // 版本号
    ver_1 = 1;

    const // 命令分类
    cmd_qry_req = 1;
    cmd_qry_res = 2;
    cmd_post_req = 3;
    cmd_post_res = 4;
    cmd_up_file_req = 5;
    cmd_up_file_res = 6;
    cmd_down_file_req = 7;
    cmd_down_file_res = 8;
    cmd_data = 9;

    type
    THead = packed record // 包头
    cmd: Byte;
    len: Integer;
    packNo: Integer;
    packQty: Integer;
    ver: Byte;
    end;

    type
    TTask = record // 任务
    context: TCustomIocpTcpServer.TPerHandleData;
    body: TBytes;
    cmd: Byte;
    ver: Byte;
    end;

    PTTask = ^TTask;

    type
    TTaskClient = record
    Recved: Boolean;
    body: TBytes;
    end;

    var
    g_tmpList: TList<PTTask>; // 临时队列
    g_task_client: TTaskClient;
    g_iocp_handle: THandle;

    function ValidHead(AHead: THead): Boolean;

    function GetTask(AContext: TCustomIocpTcpServer.TPerHandleData): PTTask;

    procedure ServerProcessRecved(AContext: TCustomIocpTcpServer.TPerHandleData);

    procedure ClientProcessRecved(ARecvThread: TCustomBlockingTcpClient.TRecvThread);

    procedure ServerSendStream(const AStream: TStream; AContext: TCustomIocpTcpServer.TPerHandleData; cmd, ver: Byte);

    function StreamToBytes(aStream: TStream): TBytes;

    function BytesToStream(aBytes: TBytes): TStream;

    procedure QuerySQL(const aSQL: string; aClient: TBlockingTcpClient);

    procedure InitTaskClient;

    function GetBodyLen: Integer;

    function GetHeadLen: Integer;

    function GetPackQty(AStream: TStream): Integer;

    function GetLastPackLen(AStream: TStream): Integer;

    implementation

    var
    FCS: TCriticalSection;

    function GetLastPackLen(AStream: TStream): Integer;
    begin
    Result := AStream.Size mod (pack_len - SizeOf(THead));
    if Result = 0 then
    Result := pack_len
    else
    Result := Result + SizeOf(THead);
    end;

    function GetPackQty(AStream: TStream): Integer;
    begin
    Result := Ceil(AStream.Size / (pack_len - SizeOf(THead)));
    end;

    function GetHeadLen: Integer;
    begin
    Result := SizeOf(THead);
    end;

    function GetBodyLen: Integer;
    begin
    Result := pack_len - SizeOf(THead);
    end;

    procedure InitTaskClient;
    begin
    g_task_client.Recved := False;
    SetLength(g_task_client.body, 0);
    end;

    procedure QuerySQL(const aSQL: string; aClient: TBlockingTcpClient);
    var
    head: THead;
    headLen, bodyLen, packLen: integer;
    buf, body: TBytes;
    begin
    if aSQL = '' then
    Exit;
    try
    head.cmd := cmd_qry_req;
    headLen := SizeOf(THead);
    bodyLen := Length(aSQL);
    packLen := headLen + bodyLen;
    head.len := packLen;
    head.packNo := 1;
    head.packQty := 1;
    head.ver := 1;
    SetLength(buf, packLen);
    Move(head, buf[0], headLen);
    body := BytesOf(aSQL);
    Move(body[0], buf[headLen], bodyLen);
    aClient.SendBuffer(buf[0], packLen); // 发送请求
    except
    on e: Exception do
    begin
    Log.WriteLog('QuerySQL ' + e.Message);
    end;
    end;
    end;

    function BytesToStream(aBytes: TBytes): TStream;
    begin
    Result := TMemoryStream.Create;
    Result.Write(aBytes[0], length(aBytes));
    Result.Position := 0;
    end;

    function StreamToBytes(aStream: TStream): TBytes;
    begin
    try
    SetLength(Result, aStream.Size);
    aStream.Position := 0;
    aStream.Read(Result[0], aStream.Size);
    finally
    if aStream <> nil then
    aStream.Free;
    end;
    end;

    function ValidHead(AHead: THead): Boolean;
    begin
    Result := (AHead.cmd >= 1) and (AHead.len > SizeOf(THead)) and (AHead.packNo >= 1) and (AHead.packQty >= 1);
    end;

    function GetTask(AContext: TCustomIocpTcpServer.TPerHandleData): PTTask;
    var
    i: Integer;
    begin
    Result := nil;
    if AContext = nil then
    Exit;
    FCS.Enter;
    try
    try
    for i := 0 to g_tmpList.Count - 1 do
    begin
    if g_tmpList.Items[i].context = AContext then
    begin
    Result := g_tmpList.Items[i];
    Exit;
    end;
    end;
    except
    on e: Exception do
    begin
    Log.WriteLog('GetTask ' + e.Message);
    end;
    end;
    finally
    FCS.Leave;
    end;
    end;

    procedure ClientProcessRecved(ARecvThread: TCustomBlockingTcpClient.TRecvThread);
    var
    buf: TBytes;
    head: THead;
    bodyLen: Integer;
    headLen: Integer;
    begin
    headLen := GetHeadLen;
    if ARecvThread.RingBuffer.NoProcessBufLen < headLen then
    Exit;
    ARecvThread.RingBuffer.Peep(head, headLen); // 取包头
    if not ValidHead(head) then // 校验包头
    Exit;
    try
    if head.packQty = 1 then // 一批次只有一个包
    begin
    if ARecvThread.RingBuffer.NoProcessBufLen < head.len then
    Exit;
    InitTaskClient; // 初始化
    bodyLen := head.len - headLen;
    SetLength(g_task_client.body, bodyLen);
    ZeroMemory(g_task_client.body, bodyLen);
    SetLength(buf, head.len);
    ZeroMemory(buf, head.len);
    ARecvThread.RingBuffer.Pop(buf[0], head.len);
    Move(buf[headLen], g_task_client.body[0], bodyLen);
    SetLength(buf, 0);
    g_task_client.Recved := True; // 包都收齐了
    end
    else if head.packQty > 1 then // 一批次有多个包
    begin
    if head.packNo = 1 then // 首包
    begin
    if ARecvThread.RingBuffer.NoProcessBufLen < pack_len then
    Exit;
    InitTaskClient; // 初始化
    SetLength(g_task_client.body, head.len - head.packQty * headLen); // 一次分好缓存
    ZeroMemory(g_task_client.body, head.len - head.packQty * headLen);
    SetLength(buf, pack_len);
    ZeroMemory(buf, pack_len);
    ARecvThread.RingBuffer.Pop(buf[0], pack_len);
    bodyLen := pack_len - headLen;
    Move(buf[headLen], g_task_client.body[0], bodyLen);
    SetLength(buf, 0);
    end
    else if head.packNo = head.packQty then // 尾包
    begin
    if ARecvThread.RingBuffer.NoProcessBufLen < head.len then
    exit;
    SetLength(buf, head.len);
    ZeroMemory(buf, head.len);
    ARecvThread.RingBuffer.Pop(buf[0], head.len);
    bodyLen := pack_len - headLen;
    Move(buf[headLen], g_task_client.body[(head.packNo - 1) * bodyLen], head.len - headLen);
    SetLength(buf, 0);
    g_task_client.Recved := True;
    end
    else
    begin // 夹在首包和尾包中间的包
    if ARecvThread.RingBuffer.NoProcessBufLen < head.len then
    Exit;
    SetLength(buf, pack_len);
    ZeroMemory(buf, pack_len);
    ARecvThread.RingBuffer.Pop(buf[0], pack_len);
    bodyLen := pack_len - headLen;
    Move(buf[headLen], g_task_client.body[(head.packNo - 1) * bodyLen], bodyLen);
    SetLength(buf, 0);
    end;
    end;
    except
    on e: Exception do
    begin
    Log.WriteLog('ClientProcessRecved ' + e.Message);
    end;
    end;
    end;

    procedure ServerProcessRecved(AContext: TCustomIocpTcpServer.TPerHandleData);
    var
    pTask: PTTask;
    buf: TBytes;
    head: THead;
    bodyLen: Integer;
    headLen: Integer;
    begin
    if AContext = nil then
    Exit;
    headLen := GetHeadLen;
    if AContext.RingBuffer.NoProcessBufLen < headLen then // 校验
    Exit;
    AContext.RingBuffer.Peep(head, headLen); // 取包头
    if not ValidHead(head) then // 校验包头
    Exit;
    try
    bodyLen := GetBodyLen; // 包体长
    if head.packQty = 1 then // 一批次只有一个包
    begin
    if AContext.RingBuffer.NoProcessBufLen < head.len then // 校验
    Exit;
    New(pTask); // 新任务
    pTask.context := AContext;
    pTask.cmd := head.cmd;
    pTask.ver := head.ver;
    SetLength(pTask.body, head.len - headLen);
    SetLength(buf, head.len);
    AContext.RingBuffer.Pop(buf[0], head.len); // 包头数据
    Move(buf[headLen], pTask.body[0], head.len - headLen); // 包体数据
    SetLength(buf, 0);
    PostQueuedCompletionStatus(g_iocp_handle, 0, 0, POverlapped(pTask));
    end
    else if head.packQty > 1 then // 一批次有多个包
    begin
    if head.packNo = 1 then // 首包
    begin
    if AContext.RingBuffer.NoProcessBufLen < pack_len then // 校验
    Exit; // 新任务
    New(pTask);
    pTask.context := AContext;
    pTask.cmd := head.cmd;
    pTask.ver := head.ver;
    SetLength(pTask.body, head.len - head.packQty * headLen); // 一次分好缓存
    SetLength(buf, pack_len);
    AContext.RingBuffer.Pop(buf[0], pack_len);
    Move(buf[headLen], pTask.body[0], bodyLen);
    SetLength(buf, 0);
    FCS.Enter;
    g_tmpList.Add(pTask); // 提交临时队列
    end
    else if head.packNo > 1 then // 非首包
    begin
    if AContext.RingBuffer.NoProcessBufLen < head.len then
    Exit;
    pTask := GetTask(AContext);
    if pTask = nil then
    Exit;
    SetLength(buf, head.len);
    AContext.RingBuffer.Pop(buf[0], head.len);
    Move(buf[headLen], pTask.body[(head.packNo - 1) * bodyLen], bodyLen);
    SetLength(buf, 0);
    if head.packNo = head.packQty then // 包都收齐了
    begin
    PostQueuedCompletionStatus(g_iocp_handle, 0, 0, POverlapped(pTask));
    FCS.Enter;
    try
    g_tmpList.Delete(g_tmpList.IndexOf(pTask)); // 从临时队列中删除
    finally
    FCS.Leave;
    end;
    end;
    end;
    end;
    except
    on e: Exception do
    begin
    log.WriteLog('ServerProcessRecved ' + e.Message);
    end;
    end;
    end;

    procedure ServerSendStream(const AStream: TStream; AContext: TCustomIocpTcpServer.TPerHandleData; cmd, ver: Byte);
    var
    buf: TBytes;
    head: THead;
    headLen, bodyLen: integer;
    packQty, i, lastPackLen: Integer;
    begin
    if (AStream = nil) or (AContext = nil) then
    Exit;
    try
    headLen := GetHeadLen;
    bodyLen := GetBodyLen;
    packQty := GetPackQty(AStream);
    lastPackLen := GetLastPackLen(AStream);
    if packQty > 1 then
    begin
    for i := 1 to packQty do // 要分包传输
    begin
    if i = 1 then // 首包
    begin
    head.cmd := cmd;
    head.len := (packQty - 1) * pack_len + lastPackLen; // 总长
    head.packNo := 1;
    head.packQty := packQty;
    head.ver := ver;
    SetLength(buf, pack_len);
    ZeroMemory(buf, pack_len);
    Move(head, buf[0], headLen);
    AStream.Position := 0; // 指针定位
    AStream.Read(buf[headLen], bodyLen);
    AContext.SendBuffer(buf[0], pack_len);
    SetLength(buf, 0);
    end
    else if i = packQty then // 尾包
    begin
    head.cmd := cmd;
    head.len := lastPackLen;
    head.packNo := packQty;
    head.packQty := packQty;
    head.ver := ver;
    SetLength(buf, lastPackLen);
    ZeroMemory(buf, lastPackLen);
    Move(head, buf[0], headLen);
    AStream.Read(buf[headLen], lastPackLen - headLen);
    AContext.SendBuffer(buf[0], head.len);
    SetLength(buf, 0);
    end
    else // 夹在包头和包尾中间的包
    begin
    head.cmd := cmd;
    head.len := pack_len;
    head.packNo := i;
    head.packQty := packQty;
    head.ver := ver;
    SetLength(buf, pack_len);
    ZeroMemory(buf, pack_len);
    Move(head, buf[0], headLen);
    AStream.Read(buf[headLen], bodyLen);
    AContext.SendBuffer(buf[0], head.len);
    SetLength(buf, 0);
    end;
    end;
    end
    else if packQty = 1 then
    begin // 只要传输一包
    head.cmd := cmd;
    head.len := AStream.Size + headLen;
    head.packNo := 1;
    head.packQty := 1;
    head.ver := ver;
    SetLength(buf, head.len);
    ZeroMemory(buf, head.len);
    Move(head, buf[0], headLen);
    AStream.Position := 0;
    AStream.Read(buf[headLen], AStream.Size);
    AContext.SendBuffer(buf[0], head.len);
    SetLength(buf, 0);
    end;
    except
    on e: Exception do
    begin
    Log.WriteLog('uPackage.ServerSendStream ' + e.Message);
    Exit;
    end;
    end;
    end;

    initialization
    FCS := TCriticalSection.Create;

    finalization
    FCS.Free;

    end.

  • 相关阅读:
    python——简单的爬虫
    Python——文件读取与写入
    python—列表集合的交集并集差集
    python—turtle佩奇
    python——append用法
    python——列表平均数
    python回文数的判断
    python输入两个数字比较大小
    python———input()函数
    HTML---3
  • 原文地址:https://www.cnblogs.com/hnxxcxg/p/5899227.html
Copyright © 2011-2022 走看看