zoukankan      html  css  js  c++  java
  • diocp_tcp_client单元源码与注释

    (*
    * Unit owner: d10.天地弦
    * blog: http://www.cnblogs.com/dksoft
    * homePage: www.diocp.org
    *
    * 2015-02-22 08:29:43
    * DIOCP-V5 发布
    *
    * 1. 修复ex.tcpclient编码问题,发送大数据时,无法解码的bug
    * 2015-08-17 14:25:56
    *)
    unit diocp_tcp_client;

    {$I 'diocp.inc'}

    interface


    uses
    diocp_sockets, SysUtils, diocp_sockets_utils
    {$IFDEF UNICODE}, Generics.Collections{$ELSE}, Contnrs {$ENDIF}
    , Classes, Windows, utils_objectPool, diocp_res
    , diocp_core_rawWinSocket
    , utils_async
    , utils_fileWriter
    , utils_threadinfo
    , utils_buffer, utils_queues, SyncObjs;

    type
    // Client-side connection context. Extends TDiocpCustomContext with blocking
    // and asynchronous connect operations plus an optional auto-reconnect flag.
    TIocpRemoteContext = class(TDiocpCustomContext)
    private
    // GetTickCount value stored by SetSocketState when the state becomes
    // ssDisconnected; reset to 0 in OnConnected.
    FLastDisconnectTime:Cardinal;
    // Guard flag: set in PostConnectRequest while a connect request is
    // outstanding, cleared in OnConnecteExResponse.
    FIsConnecting: Boolean;
    // Value returned by Bind2IOCPHandle in ReCreateSocket.
    FBindingHandle:THandle;

    FAutoReConnect: Boolean;
    // Created in the constructor, freed in the destructor.
    FConnectExRequest: TIocpConnectExRequest;

    FHost: String;
    FOnASyncCycle: TNotifyContextEvent;
    FPort: Integer;
    function PostConnectRequest: Boolean;
    procedure ReCreateSocket;
    function CanAutoReConnect:Boolean;
    procedure CheckDestroyBindingHandle;
    protected
    // Response handler assigned to FConnectExRequest.OnResponse in the constructor.
    procedure OnConnecteExResponse(pvObject:TObject);

    procedure OnDisconnected; override;

    procedure OnConnected; override;

    procedure SetSocketState(pvState:TSocketState); override;

    procedure OnRecvBuffer(buf: Pointer; len: Cardinal; ErrCode: WORD); override;

    public
    /// <summary>
    /// Attempts a reconnect when the context is currently neither connecting
    /// nor connected and the owner is active; otherwise only writes a debug
    /// string (owner inactive) or does nothing (already connecting/connected).
    /// </summary>
    procedure CheckDoReConnect;

    constructor Create; override;
    destructor Destroy; override;
    /// <summary>
    /// Establishes the connection in blocking mode.
    /// State transition: ssDisconnected -> ssConnected/ssDisconnected
    /// </summary>
    procedure Connect; overload;

    // Blocking connect with a timeout; raises an exception if the connect
    // does not succeed within pvTimeOut.
    procedure Connect(pvTimeOut:Integer); overload;

    /// <summary>
    /// Requests an asynchronous connection.
    /// State transition: ssDisconnected -> ssConnecting -> ssConnected/ssDisconnected
    /// </summary>
    procedure ConnectASync;

    /// <summary>
    /// Auto-reconnect flag for this connection object.
    /// True: automatic reconnection is allowed.
    /// </summary>
    property AutoReConnect: Boolean read FAutoReConnect write FAutoReConnect;

    /// <summary>
    /// Invoked repeatedly from the ASync worker thread.
    /// </summary>
    property OnASyncCycle: TNotifyContextEvent read FOnASyncCycle write FOnASyncCycle;

    // Remote host name or IP address used by the connect methods.
    property Host: String read FHost write FHost;

    // Remote port used by the connect methods.
    property Port: Integer read FPort write FPort;

    end;

    // Remote context that splits the received byte stream into frames using an
    // optional start marker and an end marker, reporting each frame through
    // OnBufferAction.
    TDiocpExRemoteContext = class(TIocpRemoteContext)
    private
    FOnBufferAction: TOnContextBufferNotifyEvent;
    protected
    // Accumulates received bytes until a complete frame can be extracted.
    FCacheBuffer: TBufferLink;
    // End marker bytes and the number of them in use (set by SetEnd).
    FEndBuffer: array [0..254] of Byte;
    FEndBufferLen: Byte;
    // Start marker bytes and the number of them in use (set by SetStart).
    FStartBuffer: array [0..254] of Byte;
    FStartBufferLen: Byte;

    procedure DoCleanUp; override;
    procedure OnRecvBuffer(buf: Pointer; len: Cardinal; ErrCode: WORD); override;
    public
    constructor Create; override;
    destructor Destroy; override;
    // Copies pvBufferLen bytes from pvBuffer as the end marker.
    procedure SetEnd(pvBuffer:Pointer; pvBufferLen:Byte);
    // Copies pvBufferLen bytes from pvBuffer as the start marker.
    procedure SetStart(pvBuffer:Pointer; pvBufferLen:Byte);
    // Called once per decoded frame with a pointer to the payload and its length.
    property OnBufferAction: TOnContextBufferNotifyEvent read FOnBufferAction write FOnBufferAction;
    end;

    /// <summary>
    /// Note on thread safety of the context list:
    /// Add, ClearContexts, DoAutoReconnect and DoASyncCycle hold FListLocker
    /// while they touch the list.
    /// CheckContext, Count and Items read the list WITHOUT taking the lock,
    /// so it is best to populate the list before starting and to call
    /// ClearContexts only after stopping.
    /// </summary>
    TDiocpTcpClient = class(TDiocpCustom)
    private
    function GetCount: Integer;
    function GetItems(pvIndex: Integer): TIocpRemoteContext;
    private
    FDisableAutoConnect: Boolean;
    // GetTickCount value of the last periodic pass performed by DoASyncWork.
    FAutoConnectTick:Cardinal;

    private
    /// <summary>
    /// Walks the list and asks eligible contexts to reconnect.
    /// Called only from DoASyncWork, which runs it at most once per
    /// 5-second interval.
    /// </summary>
    procedure DoAutoReconnect(pvASyncWorker:TASyncWorker);

    /// <summary>
    /// Walks the list and fires each context's OnASyncCycle handler.
    /// Called only from DoASyncWork, which runs it at most once per
    /// 5-second interval.
    /// </summary>
    procedure DoASyncCycle(pvASyncWorker:TASyncWorker);
    protected
    procedure DoASyncWork(pvFileWritter: TSingleFileWriter; pvASyncWorker:
    TASyncWorker); override;
    procedure SetDisableAutoConnect(const Value: Boolean);
    private
    {$IFDEF UNICODE}
    FList: TObjectList<TIocpRemoteContext>;
    {$ELSE}
    FList: TObjectList;
    {$ENDIF}
    // Guards FList in Add, ClearContexts, DoAutoReconnect and DoASyncCycle.
    FListLocker: TCriticalSection;
    protected
    procedure DoAfterOpen;override;
    procedure DoAfterClose; override;
    public
    constructor Create(AOwner: TComponent); override;
    destructor Destroy; override;
    public
    /// <summary>
    /// Removes every connection that was created through Add.
    /// </summary>
    procedure ClearContexts;

    /// <summary>
    /// Creates a new connection object, adds it to the list and returns it.
    /// </summary>
    function Add: TIocpRemoteContext;

    /// <summary>
    /// Checks whether pvContext is an object in the current list.
    /// Returns nil when it is not; otherwise returns pvContext typed as
    /// TIocpRemoteContext.
    /// </summary>
    function CheckContext(pvContext:TObject): TIocpRemoteContext;


    // Builds a multi-line text report from the DataMoniter counters;
    // returns an empty string when DataMoniter is nil.
    function GetStateInfo: String;

    /// <summary>
    /// Total number of connection objects in the list.
    /// </summary>
    property Count: Integer read GetCount;

    /// <summary>
    /// When True, automatic reconnection is disabled for all connection objects.
    /// </summary>
    property DisableAutoConnect: Boolean read FDisableAutoConnect write
    SetDisableAutoConnect;

    /// <summary>
    /// Returns one connection by its position index.
    /// </summary>
    property Items[pvIndex: Integer]: TIocpRemoteContext read GetItems; default;

    end;

    implementation

    uses
    utils_safeLogger, diocp_winapi_winsock2, diocp_core_engine;

    // Messages used by the exceptions and log calls in this unit.
    resourcestring
    strCannotConnect = '当前状态下不能进行连接...';
    strConnectError = '建立连接失败, 错误代码:%d';
    strConnectTimeOut= '建立连接超时';


    const
    // Reconnect interval of 1 second, intended to avoid reconnecting so quickly
    // that OnDisconnected has not finished processing yet.
    // NOTE(review): this constant is not referenced in the visible part of the unit.
    RECONNECT_INTERVAL = 1000;


    /// <summary>
    ///   Returns the number of ticks elapsed from tick_start to tick_end, where
    ///   both values are GetTickCount readings. A single wrap-around of the
    ///   32-bit counter (roughly every 49.7 days) is handled.
    ///   Credit: contributed by a community member (7041779); originally copied
    ///   from the qsl code.
    /// </summary>
    /// <param name="tick_start">Earlier tick reading.</param>
    /// <param name="tick_end">Later tick reading (may be numerically smaller
    ///   than tick_start if the counter wrapped in between).</param>
    /// <returns>Elapsed ticks, modulo 2^32.</returns>
    function tick_diff(tick_start, tick_end: Cardinal): Cardinal;
    begin
      if tick_end >= tick_start then
        Result := tick_end - tick_start
      else
        // The counter wrapped: ticks remaining up to High(Cardinal), plus ONE
        // tick for the roll-over from High(Cardinal) to 0, plus the ticks
        // counted since 0. The intermediate sums never exceed High(Cardinal)
        // because tick_end < tick_start in this branch.
        Result := (High(Cardinal) - tick_start) + tick_end + 1;
    end;

    // Sets up a context with auto-reconnect off and no connect in progress, and
    // creates the ConnectEx request object whose completion is routed to
    // OnConnecteExResponse.
    constructor TIocpRemoteContext.Create;
    begin
      inherited Create;

      // Plain state flags first.
      FIsConnecting := false;
      FAutoReConnect := False;

      // The request object is owned by this context and released in Destroy.
      FConnectExRequest := TIocpConnectExRequest.Create(Self);
      FConnectExRequest.OnResponse := OnConnecteExResponse;
    end;

    // Releases the ConnectEx request object created in the constructor, then
    // runs the inherited destructor.
    destructor TIocpRemoteContext.Destroy;
    begin
    // Currently a no-op (its body is commented out).
    CheckDestroyBindingHandle;
    FreeAndNil(FConnectExRequest);
    inherited Destroy;
    end;

    // True only when this context allows auto-reconnect, the owner is active,
    // and the owning TDiocpTcpClient has not disabled auto-connect globally.
    // The checks run in the same order as a short-circuit "and" chain.
    function TIocpRemoteContext.CanAutoReConnect: Boolean;
    begin
      Result := False;
      if not FAutoReConnect then Exit;
      if not Owner.Active then Exit;
      Result := not TDiocpTcpClient(Owner).DisableAutoConnect;
    end;

    // Intentionally does nothing at present: the handle-closing code below is
    // commented out because, per the original author's note, it raises an
    // exception.
    procedure TIocpRemoteContext.CheckDestroyBindingHandle;
    begin
    // Causes an exception
    // if (FBindingHandle = 0) or (FBindingHandle = INVALID_SOCKET) then Exit;
    // CloseHandle(FBindingHandle);
    // FBindingHandle := 0;
    end;

    // Starts an asynchronous reconnect when the context is neither connecting
    // nor connected and the owner is active. When the owner is inactive only a
    // debug string is recorded.
    procedure TIocpRemoteContext.CheckDoReConnect;
    begin
      // Already connected, or a connect is in flight: nothing to do.
      if SocketState in [ssConnecting, ssConnected] then Exit;

      if not Owner.Active then
      begin
        AddDebugStrings('*(*)CheckDoReConnect::Check Owner is deactive!');
        Exit;
      end;

      AddDebugStrings('*(*)执行重连请求!');
      ConnectASync;
    end;

    // Blocking connect to Host:Port.
    // Raises Exception when the owner is not active or the context is not in
    // ssDisconnected, and an OS error when the socket connect call fails.
    procedure TIocpRemoteContext.Connect;
    var
      lvAddress: String;
    begin
      if not Owner.Active then
        raise Exception.CreateFmt(strEngineIsOff, [Owner.Name]);
      if SocketState <> ssDisconnected then
        raise Exception.Create(strCannotConnect);

      ReCreateSocket;

      // Resolve the host name; if resolution raises, fall back to using the
      // host text as given.
      try
        lvAddress := RawSocket.GetIpAddrByName(FHost);
      except
        lvAddress := FHost;
      end;

      if RawSocket.connect(lvAddress, FPort) then
        DoConnected
      else
        RaiseLastOSError;
    end;

    // Blocking connect to Host:Port with a timeout (pvTimeOut is passed through
    // to RawSocket.ConnectTimeOut).
    // Raises Exception when the owner is not active, the context is not in
    // ssDisconnected, or ConnectTimeOut reports failure.
    procedure TIocpRemoteContext.Connect(pvTimeOut: Integer);
    var
      lvAddress: String;
    begin
      if not Owner.Active then
        raise Exception.CreateFmt(strEngineIsOff, [Owner.Name]);
      if SocketState <> ssDisconnected then
        raise Exception.Create(strCannotConnect);

      ReCreateSocket;

      // Resolve the host name; if resolution raises, fall back to using the
      // host text as given.
      try
        lvAddress := RawSocket.GetIpAddrByName(FHost);
      except
        lvAddress := FHost;
      end;

      if RawSocket.ConnectTimeOut(lvAddress, FPort, pvTimeOut) then
        DoConnected
      else
        raise Exception.Create(strConnectTimeOut);
    end;

    // Recreates the socket and posts an asynchronous connect request.
    // Raises Exception when the owner is not active or the context is not in
    // ssDisconnected.
    procedure TIocpRemoteContext.ConnectASync;
    begin
      if not Owner.Active then
        raise Exception.CreateFmt(strEngineIsOff, [Owner.Name]);
      if SocketState <> ssDisconnected then
        raise Exception.Create(strCannotConnect);

      ReCreateSocket;

      // The Boolean result of PostConnectRequest is discarded here.
      PostConnectRequest;
    end;

    // Connected hook: runs the inherited handler, then clears the recorded
    // disconnect time.
    procedure TIocpRemoteContext.OnConnected;
    begin
    inherited;
    // Reset the last-disconnect timestamp
    FLastDisconnectTime := 0;
    end;

    // Completion handler for the ConnectEx request (pvObject is the request).
    // Clears the connecting flag, then either finishes the connect or reports
    // the error and moves to ssDisconnected. The owner reference taken in
    // PostConnectRequest is always released on exit.
    procedure TIocpRemoteContext.OnConnecteExResponse(pvObject: TObject);
    var
      lvRequest: TIocpConnectExRequest;
    begin
      try
        FIsConnecting := false;
        lvRequest := TIocpConnectExRequest(pvObject);

        if lvRequest.ErrorCode <> 0 then
        begin
          {$IFDEF DEBUG_ON}
          Owner.logMessage(strConnectError, [lvRequest.ErrorCode]);
          {$ENDIF}

          DoError(lvRequest.ErrorCode);

          SetSocketState(ssDisconnected);
        end else
        begin
          DoConnected;
        end;
      finally
        if Owner <> nil then Owner.DecRefCounter;
      end;
    end;

    // Disconnected hook: adds nothing of its own, only runs the inherited handler.
    procedure TIocpRemoteContext.OnDisconnected;
    begin
    inherited;
    end;

    // Receive hook: adds nothing of its own, only runs the inherited handler
    // with the same arguments.
    procedure TIocpRemoteContext.OnRecvBuffer(buf: Pointer; len: Cardinal;
    ErrCode: WORD);
    begin
    inherited;

    end;

    // Posts an asynchronous connect request for Host:Port.
    //
    // Returns True when the request was posted; in that case the owner
    // reference taken here is released later by OnConnecteExResponse.
    // Returns False when a connect is already in progress or the post failed;
    // the owner reference is released before returning.
    // Raises Exception when Host is empty. Exceptions from ReCreateSocket or
    // PostRequest are re-raised after the connecting flag has been cleared.
    function TIocpRemoteContext.PostConnectRequest: Boolean;
    begin
      Result := False;
      if FHost = '' then
      begin
        raise Exception.Create('请指定要建立连接的IP和端口信息!');
      end;

      if Owner <> nil then Owner.IncRefCounter;
      try
        // Only one connect may be outstanding: claim the flag atomically.
        if lock_cmp_exchange(False, True, FIsConnecting) = False then
        begin
          try
            if RawSocket.SocketHandle = INVALID_SOCKET then
            begin
              ReCreateSocket;
            end;

            if not FConnectExRequest.PostRequest(FHost, FPort) then
            begin
              FIsConnecting := false;
            end else
            begin
              Result := True;
            end;
          except
            // Without this reset an exception here would leave the flag set
            // forever and every later connect attempt would be rejected as
            // "already connecting".
            FIsConnecting := false;
            raise;
          end;
        end else
        begin
          sfLogger.logMessage('TIocpRemoteContext.PostConnectRequest:: 正在进行连接...');
        end;
      finally
        // No request in flight means no response will release the reference.
        if not Result then
        begin
          if Owner <> nil then Owner.DecRefCounter;
        end;
      end;
    end;

    // Creates a fresh overlapped TCP socket, binds it to 0.0.0.0 port 0, and
    // associates it with the owner's IOCP core. Raises an OS error when the
    // bind fails.
    procedure TIocpRemoteContext.ReCreateSocket;
    begin
      RawSocket.CreateTcpOverlappedSocket;

      if not RawSocket.bind('0.0.0.0', 0) then RaiseLastOSError;

      CheckDestroyBindingHandle;
      FBindingHandle := Owner.IocpEngine.IocpCore.Bind2IOCPHandle(RawSocket.SocketHandle, 0);
    end;

    // Applies the state change through the inherited setter and, on entering
    // ssDisconnected, stamps the moment of disconnection.
    procedure TIocpRemoteContext.SetSocketState(pvState: TSocketState);
    begin
      inherited SetSocketState(pvState);

      // Remember when the connection was last lost.
      if pvState = ssDisconnected then FLastDisconnectTime := GetTickCount;
    end;

    // Empties the context list while holding the list lock.
    procedure TDiocpTcpClient.ClearContexts;
    begin
      FListLocker.Acquire;
      try
        FList.Clear;
      finally
        FListLocker.Release;
      end;
    end;

    // Creates the client with an empty context list, its list lock, the default
    // context class, and auto-connect enabled (DisableAutoConnect = False).
    constructor TDiocpTcpClient.Create(AOwner: TComponent);
    begin
      inherited Create(AOwner);

      FListLocker := TCriticalSection.Create;

      {$IFDEF UNICODE}
      FList := TObjectList<TIocpRemoteContext>.Create();
      {$ELSE}
      FList := TObjectList.Create();
      {$ENDIF}

      // Class used by Add when it instantiates new contexts.
      FContextClass := TIocpRemoteContext;

      // Auto reconnect stays enabled by default.
      SetDisableAutoConnect(False);
    end;

    // Closes the client first, then empties and frees the context list and the
    // list lock before running the inherited destructor.
    destructor TDiocpTcpClient.Destroy;
    begin
    Close;
    FList.Clear;
    FList.Free;
    FListLocker.Free;
    inherited Destroy;
    end;

    // After-open hook: adds nothing of its own, only runs the inherited handler.
    procedure TDiocpTcpClient.DoAfterOpen;
    begin
    inherited;

    end;

    // After-close hook: adds nothing of its own, only runs the inherited handler.
    procedure TDiocpTcpClient.DoAfterClose;
    begin
    inherited;

    end;

    // Walks the context list under the list lock and asks every context that
    // has AutoReConnect set, and for which CheckActivityTimeOut(10000) returns
    // True, to reconnect. Stops early when the worker is terminated.
    //
    // A failure in one context (for example an empty Host or a failed bind,
    // both of which raise inside the reconnect path) is logged and does not
    // prevent the remaining contexts from being processed.
    procedure TDiocpTcpClient.DoAutoReconnect(pvASyncWorker:TASyncWorker);
    var
      i: Integer;
      lvContext: TIocpRemoteContext;
    begin
      FListLocker.Enter;
      try
        for i := 0 to FList.Count - 1 do
        begin
          if pvASyncWorker.Terminated then Break;

          lvContext := TIocpRemoteContext(FList[i]);
          if lvContext.FAutoReConnect and lvContext.CheckActivityTimeOut(10000) then
          begin
            try
              lvContext.CheckDoReConnect;
            except
              on E: Exception do
              begin
                // Isolate the failure to this context and keep going.
                sfLogger.logMessage('TDiocpTcpClient.DoAutoReconnect:: ' + E.Message);
              end;
            end;
          end;
        end;
      finally
        FListLocker.Leave;
      end;
    end;

    // Creates a new context (of FContextClass when one is set, otherwise a plain
    // TIocpRemoteContext), makes this client its owner, appends it to the list
    // and returns it. The whole operation runs under the list lock.
    function TDiocpTcpClient.Add: TIocpRemoteContext;
    begin
      FListLocker.Acquire;
      try
        if FContextClass <> nil then
          Result := TIocpRemoteContext(FContextClass.Create())
        else
          Result := TIocpRemoteContext.Create;

        Result.Owner := Self;
        FList.Add(Result);
      finally
        FListLocker.Release;
      end;
    end;

    // Returns pvContext typed as TIocpRemoteContext when it is present in the
    // list, otherwise nil. The list lock is not taken here.
    function TDiocpTcpClient.CheckContext(pvContext:TObject): TIocpRemoteContext;
    begin
      Result := nil;
      if FList.IndexOf(TIocpRemoteContext(pvContext)) <> -1 then
        Result := TIocpRemoteContext(pvContext);
    end;

    // Getter for Count: number of contexts in the list. The list lock is not
    // taken here.
    function TDiocpTcpClient.GetCount: Integer;
    begin
    Result := FList.Count;
    end;

    // Getter for Items: returns the context stored at pvIndex. The list lock is
    // not taken here.
    function TDiocpTcpClient.GetItems(pvIndex: Integer): TIocpRemoteContext;
    begin
    {$IFDEF UNICODE}
    // The generic list is already typed.
    Result := FList[pvIndex];
    {$ELSE}
    // The non-generic list stores untyped objects, so cast on the way out.
    Result := TIocpRemoteContext(FList[pvIndex]);
    {$ENDIF}

    end;

    // Builds a multi-line status report from the DataMoniter counters: active
    // state, receive/send post and response counts, byte totals, send-request
    // and send-queue counters, online contexts, worker count and run time.
    // Returns an empty string when DataMoniter is nil.
    function TDiocpTcpClient.GetStateInfo: String;
    var
      lvLines: TStrings;
    begin
      Result := '';
      if DataMoniter <> nil then
      begin
        lvLines := TStringList.Create;
        try
          // Engine state
          if not Active then
            lvLines.Add(strState_Off)
          else
            lvLines.Add(strState_Active);

          // Receive: posted, responded, still pending, response speed
          lvLines.Add(Format(strRecv_PostInfo,
            [DataMoniter.PostWSARecvCounter,
             DataMoniter.ResponseWSARecvCounter,
             DataMoniter.PostWSARecvCounter - DataMoniter.ResponseWSARecvCounter,
             DataMoniter.Speed_WSARecvResponse]));

          lvLines.Add(Format(strRecv_SizeInfo, [TransByteSize(DataMoniter.RecvSize)]));

          // Send: posted, responded, still pending, response speed
          lvLines.Add(Format(strSend_Info,
            [DataMoniter.PostWSASendCounter,
             DataMoniter.ResponseWSASendCounter,
             DataMoniter.PostWSASendCounter - DataMoniter.ResponseWSASendCounter,
             DataMoniter.Speed_WSASendResponse]));

          // Send-request object counters
          lvLines.Add(Format(strSendRequest_Info,
            [DataMoniter.SendRequestCreateCounter,
             DataMoniter.SendRequestOutCounter,
             DataMoniter.SendRequestReturnCounter]));

          // Send-queue counters
          lvLines.Add(Format(strSendQueue_Info,
            [DataMoniter.PushSendQueueCounter,
             DataMoniter.PostSendObjectCounter,
             DataMoniter.ResponseSendObjectCounter,
             DataMoniter.SendRequestAbortCounter]));

          lvLines.Add(Format(strSend_SizeInfo, [TransByteSize(DataMoniter.SentSize)]));

          lvLines.Add(Format(strOnline_Info, [OnlineContextCount, DataMoniter.MaxOnlineCount]));

          lvLines.Add(Format(strWorkers_Info, [WorkerCount]));

          lvLines.Add(Format(strRunTime_Info, [GetRunTimeINfo]));

          Result := lvLines.Text;
        finally
          lvLines.Free;
        end;
      end;
    end;

    // Periodic work for the ASync worker. Once more than 5000 ticks have passed
    // since the previous pass it runs the auto-reconnect scan (unless
    // auto-connect is disabled) and then the per-context OnASyncCycle pass, and
    // finally records the time of this pass.
    procedure TDiocpTcpClient.DoASyncWork(pvFileWritter: TSingleFileWriter;
      pvASyncWorker: TASyncWorker);
    begin
      // Throttle: skip until the interval has elapsed.
      if tick_diff(FAutoConnectTick, GetTickCount) <= 5000 then Exit;

      if not DisableAutoConnect then
        DoAutoReconnect(pvASyncWorker);

      DoASyncCycle(pvASyncWorker);
      FAutoConnectTick := GetTickCount;
    end;

    // Walks the context list under the list lock and fires OnASyncCycle for
    // every context that has a handler assigned. Stops early when the worker is
    // terminated.
    procedure TDiocpTcpClient.DoASyncCycle(pvASyncWorker:TASyncWorker);
    var
      lvIndex: Integer;
      lvItem: TIocpRemoteContext;
    begin
      FListLocker.Acquire;
      try
        for lvIndex := 0 to FList.Count - 1 do
        begin
          if pvASyncWorker.Terminated then Break;

          lvItem := TIocpRemoteContext(FList[lvIndex]);
          // Contexts without a handler are skipped.
          if not Assigned(lvItem.FOnASyncCycle) then Continue;

          lvItem.FOnASyncCycle(lvItem);
        end;
      finally
        FListLocker.Release;
      end;
    end;

    // Setter for DisableAutoConnect; stores the value only when it differs from
    // the current one.
    procedure TDiocpTcpClient.SetDisableAutoConnect(const Value: Boolean);
    begin
      if Value = FDisableAutoConnect then Exit;
      FDisableAutoConnect := Value;
    end;


    // Creates the context together with the cache buffer that accumulates
    // received bytes for frame decoding.
    constructor TDiocpExRemoteContext.Create;
    begin
    inherited Create;
    FCacheBuffer := TBufferLink.Create();
    end;

    // Frees the cache buffer, then runs the inherited destructor.
    destructor TDiocpExRemoteContext.Destroy;
    begin
    FreeAndNil(FCacheBuffer);
    inherited Destroy;
    end;

    // Clean-up hook: runs the inherited clean-up, then discards any bytes still
    // held in the cache buffer.
    procedure TDiocpExRemoteContext.DoCleanUp;
    begin
    inherited;
    FCacheBuffer.clearBuffer;
    end;

    // Receive hook that splits the incoming byte stream into frames.
    //
    // The received bytes are appended to FCacheBuffer. While data remains, the
    // loop optionally locates and skips the start marker (when one is
    // configured), searches for the end marker, and hands the bytes in front of
    // the end marker to OnBufferAction. Incomplete data is left in the cache
    // for the next call.
    //
    // Parameters:
    //   buf, len - the received bytes and their count.
    //   ErrCode  - not used by this override.
    // Note: this override does not call the inherited handler.
    procedure TDiocpExRemoteContext.OnRecvBuffer(buf: Pointer; len: Cardinal;
    ErrCode: WORD);
    var
    j:Integer;
    lvBuffer:array of byte;
    begin
    FCacheBuffer.AddBuffer(buf, len);
    while FCacheBuffer.validCount > 0 do
    begin
    // Mark the current read position so it can be restored when the data turns
    // out to be incomplete, leaving it for the next decode pass
    FCacheBuffer.markReaderIndex;

    if FStartBufferLen > 0 then
    begin
    // Not enough data for a start marker plus an end marker yet; leave the loop
    if FCacheBuffer.validCount < FStartBufferLen + FEndBufferLen then Break;

    j := FCacheBuffer.SearchBuffer(@FStartBuffer[0], FStartBufferLen);
    if j = -1 then
    begin // Start marker not found: everything cached is discarded
    FCacheBuffer.clearBuffer();
    Exit;
    end else
    begin
    FCacheBuffer.restoreReaderIndex;

    // Skip the bytes in front of the start marker and the marker itself
    // NOTE(review): assumes SearchBuffer returns the offset of the match
    // relative to the marked position - confirm against TBufferLink
    FCacheBuffer.Skip(j + FStartBufferLen);
    end;
    end;

    // Not enough data to hold an end marker yet; leave the loop
    if FCacheBuffer.validCount < FEndBufferLen then Break;

    j := FCacheBuffer.SearchBuffer(@FEndBuffer[0], FEndBufferLen);
    if j <> -1 then
    begin
    // j bytes precede the end marker: copy them out as the frame payload
    SetLength(lvBuffer, j);
    FCacheBuffer.readBuffer(@lvBuffer[0], j);
    if Assigned(FOnBufferAction) then
    begin
    FOnBufferAction(Self, @lvBuffer[0], j);
    end;
    // Consume the end marker itself
    FCacheBuffer.Skip(FEndBufferLen);
    end else
    begin // No end marker yet: rewind to the marked position and wait for more data
    FCacheBuffer.restoreReaderIndex;
    Break;
    end;
    end;
    // NOTE(review): presumably releases the bytes already consumed - confirm
    // against TBufferLink
    FCacheBuffer.clearHaveReadBuffer();
    end;

    // Stores the end-of-frame marker: copies pvBufferLen bytes from pvBuffer
    // into FEndBuffer and records the length. FEndBuffer holds 255 bytes, which
    // covers every value a Byte length can take.
    procedure TDiocpExRemoteContext.SetEnd(pvBuffer:Pointer; pvBufferLen:Byte);
    begin
      // Passing the static array itself addresses its first element.
      Move(pvBuffer^, FEndBuffer, pvBufferLen);
      FEndBufferLen := pvBufferLen;
    end;

    // Stores the start-of-frame marker: copies pvBufferLen bytes from pvBuffer
    // into FStartBuffer and records the length. FStartBuffer holds 255 bytes,
    // which covers every value a Byte length can take.
    procedure TDiocpExRemoteContext.SetStart(pvBuffer:Pointer; pvBufferLen:Byte);
    begin
      // Passing the static array itself addresses its first element.
      Move(pvBuffer^, FStartBuffer, pvBufferLen);
      FStartBufferLen := pvBufferLen;
    end;

    end.

  • 相关阅读:
    【Redis】集群NetCore实战
    【Redis】集群教程(Windows)
    【Redis】入门
    【SQL SERVER】索引
    【SQL SERVER】锁机制
    【SQL SERVER】数据内部存储结构简单探索
    Windows软件包管理工具
    Git常用命令记录
    【ASP.NET Core学习】远程过程调用
    CouchDB学习-API
  • 原文地址:https://www.cnblogs.com/diocp/p/5847331.html
Copyright © 2011-2022 走看看