zoukankan      html  css  js  c++  java
  • DIOCP之注册编码解码器与ClientContext

    FTcpServer.registerCoderClass(TIOCPStreamDecoder, TIOCPStreamEncoder);//注册编码器与解码器
    FTcpServer.registerContextClass(TMyClientContext);//注册clientcontext

    (*
    * Unit owner: D10.Mofen
    * homePage: http://www.diocp.org
    * blog: http://www.cnblogs.com/dksoft
    *
    * 2015-02-22 08:29:43
    * DIOCP-V5 发布
    *
    * 2015-04-08 12:34:33
    * (感谢 suoler反馈bug和提供bug重现)
    * 异步处理逻辑请求后OnContextAction
    * 当连接已经关闭,但是请求还没有来得及处理,然后连接上下文已经归还到池,这个时候应该放弃处理任务。
    *)
    unit diocp.coder.tcpServer;

    interface

    // call DoContextAction procedure with qworker
    {.$DEFINE QDAC_QWorker}

    {$IFDEF DEBUG}
    {$DEFINE DEBUG_ON}
    {$ENDIF}

    uses
    diocp.tcp.server, utils.buffer, SysUtils, Classes,
    diocp.coder.baseObject, utils.queues, utils.locker
    {$IFDEF QDAC_QWorker}
    , qworker
    {$ELSE}
    , diocp.task
    {$ENDIF}
    ;

    type
    TDiocpCoderTcpServer = class;

    // Send request used by the coder server. It remembers the memory block
    // whose data it is sending so that the block can be released once the
    // request completes or is cancelled.
    TDiocpCoderSendRequest = class(TIocpSendRequest)
    private
    // Memory block attached by CheckStartPostSendBufferLink; released with
    // FreeMemBlock in ResponseDone / CancelRequest when it is not nil.
    FMemBlock:PMemoryBlock;
    protected
    // Releases FMemBlock (if any), then runs the inherited behaviour.
    procedure ResponseDone; override;
    // Releases FMemBlock (if any), then runs the inherited behaviour.
    procedure CancelRequest;override;
    end;

    /// <summary>
    /// Pooled request-task object. It carries the information recorded when
    /// an asynchronous task was queued, so that the task can be compared
    /// against the current context state and dropped if it is stale.
    /// </summary>
    TDiocpTaskObject = class(TObject)
    private
    // Server whose pool this object is returned to by Close.
    FOwner:TDiocpCoderTcpServer;
    /// <summary>
    /// Context DNA recorded before the task is queued; compared against the
    /// context's current DNA to decide whether the task must be abandoned.
    /// </summary>
    FContextDNA:Integer;
    // Decoded object to be processed
    FData: TObject;
    public
    /// <summary>
    /// Returns this object to the owner's task-object pool.
    /// </summary>
    procedure Close;
    end;

    // Client context that accumulates received bytes, decodes them into
    // objects with the server's registered decoder, queues the decoded objects
    // for asynchronous processing, and sends objects back through the
    // server's registered encoder.
    TIOCPCoderClientContext = class(diocp.tcp.server.TIOCPClientContext)
    private
    /// True while a job draining FRequestQueue has been posted / is running
    FIsProcessRequesting:Boolean;

    /// Queue of pending request tasks (TDiocpTaskObject instances)
    FRequestQueue:TSimpleQueue;

    /// BufferLink that is currently being sent
    FCurrentSendBufferLink: TBufferLink;

    // Queue of TBufferLink instances waiting to be sent
    FSendingQueue: TSimpleQueue;

    // Received data that has not yet been consumed by the decoder
    FRecvBuffers: TBufferLink;
    FStateINfo: String;
    function GetStateINfo: String;

    /// <summary>
    /// Executes a single queued request.
    /// </summary>
    function DoExecuteRequest(pvTaskObj: TDiocpTaskObject): HRESULT;

    /// <summary>
    /// Drains the request queue, returning each task object to the pool and
    /// freeing its decoded object.
    /// </summary>
    procedure ClearRequestTaskObject();

    {$IFDEF QDAC_QWorker}
    procedure OnExecuteJob(pvJob:PQJob);
    {$ELSE}
    procedure OnExecuteJob(pvTaskRequest: TIocpTaskRequest);
    {$ENDIF}
    protected
    // Appends received bytes to FRecvBuffers.
    procedure Add2Buffer(buf:PAnsiChar; len:Cardinal);
    // Clears the receive buffer (entirely, or only the already-read part).
    procedure ClearRecvedBuffer;
    // Runs the server's decoder over FRecvBuffers.
    function DecodeObject: TObject;
    procedure OnRecvBuffer(buf: Pointer; len: Cardinal; ErrCode: WORD); override;

    procedure RecvBuffer(buf:PAnsiChar; len:Cardinal); virtual;

    procedure DoCleanUp;override;
    protected
    /// <summary>
    /// Takes the next block to send from the current/queued BufferLink and
    /// posts it as a send request.
    /// </summary>
    procedure CheckStartPostSendBufferLink;

    /// <summary>
    /// After a send completes, continue by posting the next request.
    /// Only called from HandleResponse.
    /// </summary>
    procedure PostNextSendRequest; override;
    public
    constructor Create;override;

    destructor Destroy; override;

    /// <summary>
    /// Called when one complete data packet has been received and decoded.
    /// </summary>
    /// <param name="pvDataObject"> (TObject) </param>
    procedure DoContextAction(const pvDataObject:TObject); virtual;

    /// <summary>
    /// Writes an object back to the client; the object is encoded with the
    /// server's registered encoder before being queued for sending.
    /// </summary>
    /// <param name="pvDataObject"> object to write back </param>
    procedure WriteObject(const pvDataObject:TObject);

    /// <summary>
    /// received buffer
    /// </summary>
    property Buffers: TBufferLink read FRecvBuffers;

    /// <summary>
    /// Free-form state information
    /// </summary>
    property StateINfo: String read GetStateINfo write FStateINfo;
    end;

    // Event signature fired for every decoded object: receives the client
    // context the object arrived on and the decoded object itself.
    TOnContextAction = procedure(pvClientContext:TIOCPCoderClientContext;
    pvObject:TObject) of object;

    {$IF RTLVersion>22}
    // thanks: 麦子仲肥19183455
    // vcl for win64
    [ComponentPlatformsAttribute(pidWin32 or pidWin64)]
    {$IFEND}
    // TCP server that pairs a decoder and an encoder with its client
    // contexts and pools the task objects used for asynchronous processing.
    TDiocpCoderTcpServer = class(TDiocpTcpServer)
    private
    /// Pool of task objects used when posting asynchronous tasks
    FTaskObjectPool: TBaseQueue;

    // Coder instances created (and therefore owned) by RegisterCoderClass.
    FInnerEncoder: TIOCPEncoder;
    FInnerDecoder: TIOCPDecoder;

    // Coder instances currently in use (set by RegisterEncoder/RegisterDecoder).
    FEncoder: TIOCPEncoder;
    FDecoder: TIOCPDecoder;
    FLogicWorkerNeedCoInitialize: Boolean;
    FOnContextAction: TOnContextAction;

    // Takes a task object from the pool, creating one when the pool is empty.
    function GetTaskObject:TDiocpTaskObject;
    // Resets a task object and puts it back into the pool.
    procedure GiveBackTaskObject(pvObj:TDiocpTaskObject);
    public
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;
    /// <summary>
    /// Registers the decoder and encoder classes; instances are created and
    /// owned by the server.
    /// </summary>
    procedure RegisterCoderClass(pvDecoderClass:TIOCPDecoderClass;
    pvEncoderClass:TIOCPEncoderClass);

    /// <summary>
    /// register Decoder instance
    /// </summary>
    /// <param name="pvDecoder"> (TIOCPDecoder) </param>
    procedure RegisterDecoder(pvDecoder:TIOCPDecoder);

    /// <summary>
    /// register Encoder instance
    /// </summary>
    /// <param name="pvEncoder"> (TIOCPEncoder) </param>
    procedure RegisterEncoder(pvEncoder:TIOCPEncoder);

    published

    /// <summary>
    /// When True, the logic worker thread performs COM initialization
    /// (checkCoInitializeEx) before executing the logic.
    /// </summary>
    property LogicWorkerNeedCoInitialize: Boolean read FLogicWorkerNeedCoInitialize write FLogicWorkerNeedCoInitialize;

    /// <summary>
    /// Event fired when one complete data packet has been received
    /// (triggered from an IocpTask/QWorker thread).
    /// </summary>
    property OnContextAction: TOnContextAction read FOnContextAction write FOnContextAction;
    end;

    implementation

    uses
    utils.safeLogger;

    constructor TIOCPCoderClientContext.Create;
    begin
      inherited Create;

      // Accumulates received bytes until the decoder can consume them.
      FRecvBuffers := TBufferLink.Create();

      // Decoded requests waiting to be processed.
      FRequestQueue := TSimpleQueue.Create();

      // Encoded TBufferLink instances waiting to be sent.
      FSendingQueue := TSimpleQueue.Create();
    end;

    destructor TIOCPCoderClientContext.Destroy;
    begin
      // In debug mode, verify that nothing is still waiting to be sent.
      if IsDebugMode then
        Assert(FSendingQueue.size = 0);

      FSendingQueue.Free;
      FRecvBuffers.Free;

      // Drain pending request tasks before the queue itself goes away.
      ClearRequestTaskObject();
      FRequestQueue.Free;

      inherited Destroy;
    end;

    /// <summary>
    /// Resets all per-connection state of this context: the in-flight send
    /// buffer, the pending send queue, the pending request queue, the
    /// "processing" flag and the receive buffer. Then runs the inherited
    /// clean-up.
    /// </summary>
    procedure TIOCPCoderClientContext.DoCleanUp;
    begin
      // Release the BufferLink that is currently being sent AND clear the
      // field. WriteObject and CheckStartPostSendBufferLink both test
      // "FCurrentSendBufferLink = nil"; leaving a dangling pointer here would
      // make them use a freed object (or never start sending) the next time
      // this context instance is used.
      FreeAndNil(FCurrentSendBufferLink);

      // Free every BufferLink still waiting in the send queue.
      FSendingQueue.FreeDataObject;

      // Drop the requests that have been decoded but not yet processed.
      ClearRequestTaskObject;

      // No worker job is considered to be draining the request queue any more.
      FIsProcessRequesting := False;

      // Discard received data that was never decoded.
      FRecvBuffers.clearBuffer;

      inherited;
    end;

    procedure TIOCPCoderClientContext.Add2Buffer(buf:PAnsiChar; len:Cardinal);
    begin
      // Append the newly received bytes to this context's receive buffer.
      FRecvBuffers.AddBuffer(buf, len);
    end;

    /// <summary>
    /// Picks the next memory block to send - from the current BufferLink or,
    /// when that one is exhausted, from the next BufferLink in the sending
    /// queue - and posts it as a send request. Does nothing when there is no
    /// data to send.
    /// </summary>
    procedure TIOCPCoderClientContext.CheckStartPostSendBufferLink;
    var
      lvMemBlock: PMemoryBlock;
      lvValidCount, lvDataLen: Integer;
      lvSendRequest: TDiocpCoderSendRequest;
    begin
      lock();
      try
        // Nothing is being sent right now.
        if FCurrentSendBufferLink = nil then Exit;

        // Look at the first block of the current BufferLink.
        lvMemBlock := FCurrentSendBufferLink.FirstBlock;
        lvValidCount := FCurrentSendBufferLink.validCount;

        if (lvValidCount = 0) or (lvMemBlock = nil) then
        begin
          // The current BufferLink is exhausted: release it and move on to the
          // next queued one.
          FCurrentSendBufferLink.Free;
          FCurrentSendBufferLink := TBufferLink(FSendingQueue.DeQueue);

          // The sending queue is empty as well.
          if FCurrentSendBufferLink = nil then Exit;

          lvMemBlock := FCurrentSendBufferLink.FirstBlock;
          lvValidCount := FCurrentSendBufferLink.validCount;

          if (lvValidCount = 0) or (lvMemBlock = nil) then
          begin
            // The dequeued BufferLink carries no data. It has already left the
            // queue, so this field is the only reference to it: free it
            // instead of just dropping the pointer (which leaked the object).
            FreeAndNil(FCurrentSendBufferLink);
            Exit;
          end;
        end;

        // Send at most one memory block per request.
        if lvValidCount > Integer(lvMemBlock.DataLen) then
          lvDataLen := lvMemBlock.DataLen
        else
          lvDataLen := lvValidCount;
      finally
        unLock();
      end;

      if lvDataLen > 0 then
      begin
        // Detach the block from the BufferLink; from here on the send request
        // is responsible for it.
        FCurrentSendBufferLink.RemoveBlock(lvMemBlock);

        lvSendRequest := TDiocpCoderSendRequest(GetSendRequest);
        lvSendRequest.FMemBlock := lvMemBlock;
        lvSendRequest.SetBuffer(lvMemBlock.Memory, lvDataLen, dtNone);
        if InnerPostSendRequestAndCheckStart(lvSendRequest) then
        begin
          // Posted successfully: the memory block is released when the
          // response is handled.
        end else
        begin
          // Posting failed: detach the block from the request first so that
          // CancelRequest does not free it, then free it exactly once here.
          lvSendRequest.UnBindingSendBuffer;
          lvSendRequest.FMemBlock := nil;
          lvSendRequest.CancelRequest;

          FreeMemBlock(lvMemBlock);

          TDiocpCoderTcpServer(FOwner).ReleaseSendRequest(lvSendRequest);
        end;
      end;
    end;

    procedure TIOCPCoderClientContext.ClearRecvedBuffer;
    begin
      // With unread data still present only the already-read part is dropped;
      // otherwise the whole buffer is cleared.
      if FRecvBuffers.validCount <> 0 then
        FRecvBuffers.clearHaveReadBuffer
      else
        FRecvBuffers.clearBuffer;
    end;

    procedure TIOCPCoderClientContext.ClearRequestTaskObject;
    var
      lvPending: TDiocpTaskObject;
      lvDecoded: TObject;
    begin
      Lock;
      try
        lvPending := TDiocpTaskObject(FRequestQueue.DeQueue);
        while lvPending <> nil do
        begin
          // Remember the payload: Close hands the task back to the pool.
          lvDecoded := lvPending.FData;
          lvPending.Close;

          // Release the decoded object; failures while freeing are ignored.
          try
            lvDecoded.Free;
          except
          end;

          lvPending := TDiocpTaskObject(FRequestQueue.DeQueue);
        end;
      finally
        UnLock;
      end;
    end;

    procedure TIOCPCoderClientContext.DoContextAction(const pvDataObject:TObject);
    begin
      // Intentionally empty: virtual hook for descendants, invoked by
      // DoExecuteRequest for every decoded object.
    end;

    /// <summary>
    /// Executes one queued request: fires the server's OnContextAction event
    /// (when assigned) and then DoContextAction with the task's decoded object.
    /// </summary>
    /// <param name="pvTaskObj"> task holding the decoded object and the
    /// context DNA recorded when it was queued </param>
    /// <returns>
    /// S_OK when the logic was executed (even if it raised an Exception,
    /// which is logged). S_FALSE when the request was skipped because the
    /// context has no owner, the DNA no longer matches, or the context could
    /// not be locked.
    /// </returns>
    function TIOCPCoderClientContext.DoExecuteRequest(pvTaskObj: TDiocpTaskObject):
      HRESULT;
    var
      lvObj: TObject;
    begin
      Result := S_FALSE;

      // Check Self before anything that dereferences it; in the previous
      // order the Owner access came first, so this guard could never help.
      if Self = nil then Exit;

      // The connection is no longer attached to a server.
      if Owner = nil then Exit;

      lvObj := pvTaskObj.FData;

      // The context is no longer the connection the task was queued for.
      if ContextDNA <> pvTaskObj.FContextDNA then Exit;

      if not LockContext('处理逻辑', Self) then Exit;
      try
        try
          // Server-level event first, then the context-level virtual hook.
          if Assigned(TDiocpCoderTcpServer(Owner).FOnContextAction) then
            TDiocpCoderTcpServer(Owner).FOnContextAction(Self, lvObj);
          DoContextAction(lvObj);
        except
          on E: Exception do
          begin
            // Exceptions from the logic are logged and not propagated.
            FOwner.LogMessage('截获处理逻辑异常:' + e.Message);
          end;
        end;
        Result := S_OK;
      finally
        UnLockContext('处理逻辑', Self);
      end;
    end;

    function TIOCPCoderClientContext.DecodeObject: TObject;
    var
      lvServer: TDiocpCoderTcpServer;
    begin
      // Run the owning server's decoder over the accumulated receive buffer.
      lvServer := TDiocpCoderTcpServer(Owner);
      Result := lvServer.FDecoder.Decode(FRecvBuffers, Self);
    end;

    function TIOCPCoderClientContext.GetStateINfo: String;
    begin
      // Getter of the StateINfo property.
      Result := Self.FStateINfo;
    end;

    procedure TIOCPCoderClientContext.OnRecvBuffer(buf: Pointer; len: Cardinal;
      ErrCode: WORD);
    begin
      // Forward the raw bytes to the virtual RecvBuffer; ErrCode is not used.
      RecvBuffer(buf, len);
    end;

    procedure TIOCPCoderClientContext.PostNextSendRequest;
    begin
      // Let the base class continue first, then post the next block of the
      // current/queued BufferLink, if any.
      inherited PostNextSendRequest;
      Self.CheckStartPostSendBufferLink;
    end;

    {$IFDEF QDAC_QWorker}
    // QWorker job callback: drains FRequestQueue, executing one task at a time
    // through DoExecuteRequest for as long as the context stays Active.
    // NOTE(review): FIsProcessRequesting is reset only when the queue is found
    // empty; when the loop ends via Break after a failed DoExecuteRequest or
    // because Active became False, the flag is left as is (DoCleanUp resets it).
    procedure TIOCPCoderClientContext.OnExecuteJob(pvJob: PQJob);
    var
    lvTask:TDiocpTaskObject;
    lvObj:TObject;
    begin
    while (Self.Active) do
    begin
    // Take one task from the queue
    self.Lock;
    try
    lvTask := TDiocpTaskObject(FRequestQueue.DeQueue);
    if lvTask = nil then
    begin
    // Queue drained: allow RecvBuffer to post a new job
    FIsProcessRequesting := False;
    Break;
    end;
    finally
    self.UnLock;
    end;

    // Keep the payload reference; it is freed after the task is closed
    lvObj := lvTask.FData;
    try
    try
    // Execute the task; stop draining when it was not executed
    if DoExecuteRequest(lvTask) <> S_OK then
    begin
    Break;
    end;
    except
    end;
    finally
    // Return the task object to the pool
    lvTask.Close;
    try
    // Free the decoded object
    if lvObj <> nil then FreeAndNil(lvObj);
    except
    end;
    end;
    end;


    end;
    {$ELSE}

    // diocp.task callback: drains FRequestQueue, executing one task at a time
    // through DoExecuteRequest for as long as the context stays Active.
    // NOTE(review): FIsProcessRequesting is reset only when the queue is found
    // empty; when the loop ends via Break after a failed DoExecuteRequest or
    // because Active became False, the flag is left as is (DoCleanUp resets it).
    procedure TIOCPCoderClientContext.OnExecuteJob(pvTaskRequest: TIocpTaskRequest);
    var
    lvTask:TDiocpTaskObject;
    lvObj:TObject;
    begin

    while (Self.Active) do
    begin
    // Take one task from the queue
    self.Lock;
    try
    lvTask := TDiocpTaskObject(FRequestQueue.DeQueue);
    if lvTask = nil then
    begin
    // Queue drained: allow RecvBuffer to post a new job
    FIsProcessRequesting := False;
    Break;
    end;
    finally
    self.UnLock;
    end;

    // Keep the payload reference; it is freed after the task is closed
    lvObj := lvTask.FData;
    try
    try
    // Initialize COM on the worker thread when the server asks for it
    if TDiocpCoderTcpServer(Owner).FLogicWorkerNeedCoInitialize then
    pvTaskRequest.iocpWorker.checkCoInitializeEx();

    // Execute the task; stop draining when it was not executed
    if DoExecuteRequest(lvTask) <> S_OK then
    begin
    Break;
    end;
    except
    end;
    finally
    // Return the task object to the pool
    lvTask.Close;
    try
    // Free the decoded object
    if lvObj <> nil then FreeAndNil(lvObj);
    except
    end;
    end;
    end;

    // A commented-out earlier implementation used to follow here. It took a
    // single task from pvTaskRequest.TaskData and performed inline the same
    // owner / DNA / LockContext checks and event dispatch that now live in
    // DoExecuteRequest.
    end;
    {$ENDIF}

    /// <summary>
    /// Handles newly received bytes: appends them to the receive buffer,
    /// decodes as many complete packets as are available and queues each
    /// decoded object for asynchronous processing, posting a worker job when
    /// none is pending.
    /// </summary>
    /// <param name="buf"> received bytes </param>
    /// <param name="len"> number of bytes in buf </param>
    procedure TIOCPCoderClientContext.RecvBuffer(buf:PAnsiChar; len:Cardinal);
    var
      lvTaskObject: TDiocpTaskObject;
      lvDecodeObj: TObject;
      lvQueued: Boolean;
    begin
      Add2Buffer(buf, len);

      self.StateINfo := '接收到数据,准备进行解码';

      // Keep decoding in a loop: one receive may carry several packets, and
      // each of them must trigger its own logic processing.
      while True do
      begin
        // Run the registered decoder.
        lvDecodeObj := DecodeObject;

        if Integer(lvDecodeObj) = -1 then
        begin
          // The decoder reported a malformed packet: close the connection.
          DoDisconnect;
          Exit;
        end;

        // No complete packet left in the buffer.
        if lvDecodeObj = nil then Break;

        // Borrow a task object and attach the decoded object to it.
        lvTaskObject := TDiocpCoderTcpServer(Owner).GetTaskObject;
        lvTaskObject.FContextDNA := self.ContextDNA;
        lvTaskObject.FData := lvDecodeObj;

        // Tracks whether the request queue has taken ownership of the task,
        // which decides what the error handler below may release.
        lvQueued := False;
        try
          self.StateINfo := '解码成功,准备调用dataReceived进行逻辑处理';

          // Add the task to the request queue.
          self.Lock;
          try
            FRequestQueue.EnQueue(lvTaskObject);
            lvQueued := True;

            if not FIsProcessRequesting then
            begin
              FIsProcessRequesting := True;
              try
                {$IFDEF QDAC_QWorker}
                Workers.Post(OnExecuteJob, FRequestQueue);
                {$ELSE}
                iocpTaskManager.PostATask(OnExecuteJob, FRequestQueue);
                {$ENDIF}
              except
                // No job was posted: clear the flag so that the next queued
                // request posts one instead of waiting forever.
                FIsProcessRequesting := False;
                raise;
              end;
            end;
          finally
            self.UnLock();
          end;
        except
          on E: Exception do
          begin
            Owner.LogMessage('截获投递逻辑处理异常!' + e.Message);

            // Release the task only when it never reached the queue. A queued
            // task is still referenced by FRequestQueue and is released by
            // OnExecuteJob / ClearRequestTaskObject; closing it here as well
            // would put it into the pool while it is still queued.
            if not lvQueued then
            begin
              lvTaskObject.Close;
              // Close only clears the task's reference; the decoded object
              // itself has to be freed here.
              lvDecodeObj.Free;
            end;
          end;
        end;
      end;

      // Release receive-buffer memory that is no longer needed.
      ClearRecvedBuffer;
    end;

    procedure TIOCPCoderClientContext.WriteObject(const pvDataObject:TObject);
    var
      lvEncoded: TBufferLink;
      lvKickOff: Boolean;
    begin
      // Nothing can be written on an inactive or un-lockable context.
      if not Active then Exit;
      if not LockContext('WriteObject', Self) then Exit;

      try
        lvKickOff := False;
        lvEncoded := TBufferLink.Create;
        try
          // Encode the object with the server's registered encoder.
          TDiocpCoderTcpServer(Owner).FEncoder.Encode(pvDataObject, lvEncoded);

          lock();
          try
            // Refuse to queue beyond the server's configured limit.
            if FSendingQueue.size >= TDiocpCoderTcpServer(Owner).MaxSendingQueueSize then
              raise Exception.Create('Out of MaxSendingQueueSize!!!');

            FSendingQueue.EnQueue(lvEncoded);

            // When nothing is being sent, promote the head of the queue and
            // remember to start sending after the lock is released.
            if FCurrentSendBufferLink = nil then
            begin
              FCurrentSendBufferLink := TBufferLink(FSendingQueue.DeQueue);
              lvKickOff := True;
            end;
          finally
            unLock;
          end;
        except
          // Encoding or queueing failed: the buffer was never handed over.
          lvEncoded.Free;
          raise;
        end;

        if lvKickOff then
          CheckStartPostSendBufferLink;
      finally
        unLockContext('WriteObject', Self);
      end;
    end;

    constructor TDiocpCoderTcpServer.Create(AOwner: TComponent);
    begin
      inherited Create(AOwner);

      // Use the coder-aware context and send-request classes.
      FClientContextClass := TIOCPCoderClientContext;
      FIocpSendRequestClass := TDiocpCoderSendRequest;

      // Pool of reusable task objects for asynchronous processing.
      FTaskObjectPool := TBaseQueue.Create();
    end;

    destructor TDiocpCoderTcpServer.Destroy;
    begin
      // Only the coder instances created by RegisterCoderClass are owned here;
      // Free is a no-op for nil references.
      FInnerDecoder.Free;
      FInnerEncoder.Free;

      // Free the pooled task objects, then the pool itself.
      FTaskObjectPool.FreeDataObject;
      FTaskObjectPool.Free;

      inherited Destroy;
    end;

    function TDiocpCoderTcpServer.GetTaskObject: TDiocpTaskObject;
    begin
      // Reuse a pooled instance when one is available, otherwise create one.
      Result := TDiocpTaskObject(FTaskObjectPool.DeQueue);
      if not Assigned(Result) then
        Result := TDiocpTaskObject.Create;

      // Hand it out in a clean state, bound to this server.
      Result.FOwner := Self;
      Result.FData := nil;
      Result.FContextDNA := 0;
    end;

    procedure TDiocpCoderTcpServer.GiveBackTaskObject(pvObj: TDiocpTaskObject);
    begin
      // Clear every field before pooling; a nil FOwner is what
      // TDiocpTaskObject.Close asserts against to detect a double return.
      pvObj.FOwner := nil;
      pvObj.FData := nil;
      pvObj.FContextDNA := 0;

      FTaskObjectPool.EnQueue(pvObj);
    end;

    /// <summary>
    /// Creates one decoder and one encoder from the given classes, keeps them
    /// as server-owned instances and registers them as the active coders.
    /// Registration is all-or-nothing: when a precondition fails or a
    /// constructor raises, the server state is left untouched.
    /// </summary>
    /// <param name="pvDecoderClass"> decoder class to instantiate </param>
    /// <param name="pvEncoderClass"> encoder class to instantiate </param>
    /// <exception cref="Exception">
    /// Raised when a decoder or encoder class has already been registered.
    /// </exception>
    procedure TDiocpCoderTcpServer.RegisterCoderClass(
      pvDecoderClass:TIOCPDecoderClass; pvEncoderClass:TIOCPEncoderClass);
    var
      lvDecoder: TIOCPDecoder;
      lvEncoder: TIOCPEncoder;
    begin
      // Validate both preconditions before creating anything, so that a
      // failure cannot leave only one half of the pair registered.
      if FInnerDecoder <> nil then
      begin
        raise Exception.Create('已经注册了解码器类');
      end;

      if FInnerEncoder <> nil then
      begin
        raise Exception.Create('已经注册了编码器类');
      end;

      // Build both instances first; if the encoder cannot be created the
      // decoder is released again and nothing is registered.
      lvDecoder := pvDecoderClass.Create;
      try
        lvEncoder := pvEncoderClass.Create;
      except
        lvDecoder.Free;
        raise;
      end;

      FInnerDecoder := lvDecoder;
      RegisterDecoder(lvDecoder);

      FInnerEncoder := lvEncoder;
      RegisterEncoder(lvEncoder);
    end;

    { TDiocpCoderTcpServer }

    procedure TDiocpCoderTcpServer.RegisterDecoder(pvDecoder:TIOCPDecoder);
    begin
      // Make the given instance the active decoder.
      Self.FDecoder := pvDecoder;
    end;

    procedure TDiocpCoderTcpServer.RegisterEncoder(pvEncoder:TIOCPEncoder);
    begin
      // Make the given instance the active encoder.
      Self.FEncoder := pvEncoder;
    end;

    { TDiocpCoderSendRequest }

    procedure TDiocpCoderSendRequest.CancelRequest;
    begin
      // Release the attached memory block (if any) before the inherited
      // cancellation runs.
      if Assigned(FMemBlock) then
      begin
        FreeMemBlock(FMemBlock);
        FMemBlock := nil;
      end;

      inherited;
    end;

    procedure TDiocpCoderSendRequest.ResponseDone;
    begin
      // Release the attached memory block (if any) before the inherited
      // completion handling runs.
      if Assigned(FMemBlock) then
      begin
        FreeMemBlock(FMemBlock);
        FMemBlock := nil;
      end;

      inherited;
    end;

    { TDiocpTaskObject }

    procedure TDiocpTaskObject.Close;
    var
      lvServer: TDiocpCoderTcpServer;
    begin
      // FOwner is cleared when the object is given back, so a nil owner here
      // means the object has already been returned once.
      Assert(FOwner <> nil, '归还重复!');

      lvServer := FOwner;
      lvServer.GiveBackTaskObject(Self);
    end;

    end.

  • 相关阅读:
    HTTP 常见状态码
    SpringMVC 入门
    Maven 整合SSH框架
    SSH 框架整合总结
    Maven 整合SSH框架之pom.xml
    Maven 入门
    WebService 综合案例
    CXF 框架
    jQuery高级
    JavaScript补充:BOM(浏览器对象模型)
  • 原文地址:https://www.cnblogs.com/diocp/p/5843563.html
Copyright © 2011-2022 走看看