zoukankan      html  css  js  c++  java
  • Delphi编写系统服务:完成端口演示

     在开发大量Socket并发服务器,完成端口加重叠I/O是迄今为止最好的一种解决方案,下面是简单的介绍:
       “完成端口”模型是迄今为止最为复杂的一种I/O模型,特别适合需要同时管理为数众多的套接字,采用这种模型,往往可以达到最佳的系统性能。但是只适合Windows NT和Windows 2000及以上操作系统。因其设计的复杂性,只有在你的应用程序需要同时管理数百乃至上千套接字的时候,而且希望随着系统内安装的CPU数量增多,应用程序的性能也可以线性提升,才考虑采用“完成端口”模型。
        重叠I/O(Overlapped I/O)模型使应用程序达到更佳的系统性能。重叠模型的基本设计原理便是让应用程序使用一个重叠的数据结构,一次投递一个或多个Winsock I/O请求。针对哪些提交的请求,在它们完成之后,应用程序可为它们提供服务。该模型适用于除Windows CE之外的各种Windows平台。
        开发完成端口最具有挑战是线程个数和管理内存,创建一个完成端口后,就需要创建一个或多个“工作者线程”,以便在I/O请求投递给完成端口对象后,为完成端口提供服务。但是到底应创建多少个线程,这实际正是完成端口最为复杂的一个方面,一般采用的是为每一个CPU分配一个线程(有的是CPU个数加1,有的是CPU*2的线程个数)。内存分配效率低是因为应用程序在分配内存的时候,系统内核需要不停的Lock/UnLock,而且在多CPU的情况下,会成为整个程序性能的瓶颈,不能随CPU的个数增加而性能提高,一种比较好的做法一个一次分配多块内存。
        下面是我写一个的完成端口的演示程序,在我的电脑上测试可以达到链接5100个客服端,服务器性能还很好,由于我写的客服端占用资源比较的,最后直接重启了,具体见代码。演示程序主要的瓶颈在于发消息的这一块,在实际应用中应去掉。

    (配置:操作系统 Microsoft Windows XP Professional 操作系统 Service Pack 版本 Service Pack 2;CPU:Intel(R) Pentium(R)4 2.40GHz 2.40GHz;内存:2G;主板:华硕P4P800)。

    主要源代码:(Delphi 7编写),下载地址:http://download.csdn.net/source/818039

    {*******************************************************}
    {                                                       }
    {       高性能服务器,这个是一个演示DEMO                }
    {                                                       }
    {       联系邮箱:fansheng_hx@163.com                   }
    {                                                       }
    {*******************************************************}

    unit IOCPSvr;

    interface

    uses
     Windows, Messages, WinSock2, Classes, SysUtils, SyncObjs;

    const
      {* 每一次发送和接收的数据缓冲池大小 *}
      MAX_BUFSIZE = 4096;
      {* 关闭客户端通知消息 *}
      WM_CLIENTSOCKET = WM_USER + $2000;

    type
      {* Windows Socket 消息 *}
      TCMSocketMessage = packed record
        Msg: Cardinal;
        Socket: TSocket;
        SelectEvent: Word;
        SelectError: Word;
        Result: Longint;
      end;

      {* IOCP服务器运行轨迹 *}
      TSocketEvent = (seInitIOPort, seUninitIOPort, seInitThread, seUninitThread,
        seInitSocket, seUninitSocket, seConnect, seDisconnect, seListen, seAccept, seWrite, seRead);
    const
      CSSocketEvent: array[TSocketEvent] of string = ('InitIOPort', 'UninitIOPort', 'InitThread', 'UninitThread',
        'InitSocket', 'UninitSocket', 'Connect', 'Disconnect', 'Listen', 'Accept', 'Write', 'Read');
    type
      {* 产生错误类型 *}
      TErrorEvent = (eeGeneral, eeSend, eeReceive, eeConnect, eeDisconnect, eeAccept);

      {* 完成端口传递的结构体 *}
      TIOCPStruct = packed record
        Overlapped: OVERLAPPED;
        wsaBuffer: TWSABUF;
        Event: TSocketEvent; //读或写
        Buffer: array [0..MAX_BUFSIZE - 1] of Char;
        Assigned: Boolean;  //表示已经分配给某个客户端
        Active: Boolean;    //客服端内部使用,表示是否正在使用
      end;
      PIOCPStruct = ^TIOCPStruct;

      EMemoryBuffer = class(Exception);
      ESocketError = class(Exception);

      TMemoryManager = class;
      TServerSocket = class;
      TSymmetricalSocket = class;

      TMemoryManager = class
      private
        {* 管理内存使用 *}
        FList: TList;
        {* 分配和释放时候使用的锁 *}
        FLock: TCriticalSection;
        {* 服务器 *}
        FServerSocket: TServerSocket;
        function GetCount: Integer;
        function GetIOCPStruct(AIndex: Integer): PIOCPStruct;
      public
        constructor Create(AServerSocket: TServerSocket; ACount: Integer); overload;
        constructor Create(AServerSocket: TServerSocket); overload;
        destructor Destroy; override;

        {* 分配内存使用权 *}
        function Allocate: PIOCPStruct;
        {* 释放内存使用权 *}
        procedure Release(AValue: PIOCPStruct);
        property Server: TServerSocket read FServerSocket;
        property Count: Integer read GetCount;
        property Item[AIndex: Integer]: PIOCPStruct read GetIOCPStruct;
      end;

      {* 客服端链接服务器触发此事件,如果要拒绝链接,把AConnect := False *}
      TOnBeforeConnect = procedure(ASymmIP: string; AConnect: Boolean) of object;
      {* 链接完成之后触发此事件 *}
      TOnAfterConnect = procedure(ASymmetricalSocket: TSymmetricalSocket) of object;
      {* 断开连接触发事件 *}
      TOnAfterDisconnect = procedure(ASymmetricalSocket: TSymmetricalSocket) of object;
      {* 收到数据会触发此事件 *}
      TOnDataEvent = procedure(ASymmetricalSocket: TSymmetricalSocket; AData: Pointer;
        ACount: Integer) of object;
      {* 错误触发事件 *}
      TOnErrorEvent = procedure(AError: Integer; AErrorString: string; AInfo: string; var AHandleError: Boolean) of object;
      {* 服务器运行LOG *}
      TOnLog = procedure (ASocketEvent: TSocketEvent; AInfo: string) of object;

      {* 服务器,负责建立完成端口,管理内存和管理客服端,及Socket消息循环 *}
      TServerSocket = class
      private
        {* 内存管理 *}
        FMemory: TMemoryManager;
        {* 端口 *}
        FPort: Integer;
        {* 套接字 *}
        FSocket: TSocket;
        {* 完成端口句柄 *}
        FIOCPHandle: THandle;
        {* 消息循环句柄 *}
        FHandle: THandle;
        {* 对等的客服端 *}
        FClients: TList;
        {* 服务器运行线程 *}
        FThreads: TList;
        {* 监听线程 *}
        FAcceptThread: TThread;
        {* 表示是否激活 *}
        FActive: Boolean;
        {* 锁 *}
        FLock: TCriticalSection;
        {* 错误触发事件 *}
        FOnError: TOnErrorEvent;
        {* 书写LOG *}
        FOnLog: TOnLog;
        {* 接收连接事件 *}
        FOnBeforeConnect: TOnBeforeConnect;
        {* 连接成功之后的事件 *}
        FOnAfterConnect: TOnAfterConnect;
        {* 断开连接事件 *}
        FOnAfterDisconnect: TOnAfterDisconnect;
        {* 接收数据 *}
        FOnRead: TOnDataEvent;

        procedure WndProc(var AMsg: TMessage);
        {* 激活 *}
        procedure Open;
        {* 关闭 *}
        procedure Close;
        {* 设置激活/关闭 *}
        procedure SetActive(AValue: Boolean);
        {* 触发错误 *}
        function CheckError(AErrorCode: Integer = -1; AInfo: string = ''): Boolean;
        {* 触发LOG *}
        procedure DoLog(ASocketEvent: TSocketEvent; AInfo: string = '');
        {* 设置端口 *}
        procedure SetPort(AValue: Integer);
        {* 注册一个客服端,由于在另外一个线程中调用,需要加锁 *}
        procedure RegisterClient(ASocket: TSymmetricalSocket);
        {* 反注册一个客服端,由于在另外一个线程中调用,需要加锁 *}
        procedure UnRegisterClient(ASocket: TSymmetricalSocket);
        {* 通过Socket句柄查找对等的TSymmetricalSocket *}
        function FindSymmClient(ASocket: TSocket): TSymmetricalSocket;
        {* 客服端关闭消息 *}
        procedure WMClientClose(var AMsg: TCMSocketMessage); message WM_CLIENTSOCKET;
        {* 连接时触发的事件 *}
        function DoConnect(ASocket: TSocket): Boolean;
        {* 连接完成之后触发事件 *}
        procedure DoAfterConnect(ASymSocket: TSymmetricalSocket);
        {* 连接断开触发事件 *}
        procedure DoDisConnect(ASymSocket: TSymmetricalSocket);
        {* 接收数据触发的事件 *}
        procedure DoRead(ASymmetricalSocket: TSymmetricalSocket; AData: Pointer;
          ACount: Integer);
        {* 获得客服端个数 *}
        function GetClientCount: Integer;
        function GetClient(const AIndex: Integer): TSymmetricalSocket;
      public
        constructor Create;
        destructor Destroy; override;
        {* 接收一个客服端,被接收线程调用 *}
        procedure AcceptClient;

        property Port: Integer read FPort write SetPort;
        property Socket: TSocket read FSocket;
        property Handle: THandle read FHandle;
        property Active: Boolean read FActive write SetActive;
        property MemoryManager: TMemoryManager read FMemory;
        {* 事件 *}
        property OnError: TOnErrorEvent read FOnError write FOnError;
        property OnLog: TOnLog read FOnLog write FOnLog;
        property OnRead: TOnDataEvent read FOnRead write FOnRead;
        property OnBeforeConnect: TOnBeforeConnect read FOnBeforeConnect write FOnBeforeConnect;
        property OnAfterConnect: TOnAfterConnect read FOnAfterConnect write FOnAfterConnect;
        property OnAfterDisConnect: TOnAfterDisconnect read FOnAfterDisconnect write FOnAfterDisconnect;
        property ClientCount: Integer read GetClientCount;
        property Client[const AIndex: Integer]: TSymmetricalSocket read GetClient;
      end;

      {* 接收数据、发送数据及管理分配的内存 *}
      TSymmetricalSocket = class
      private
        FSocket: TSocket;
        FServer: TServerSocket;
        FAssignMemory: TList;
        FRemoteAddress, FRemoteHost: string;
        FRemotePort: Integer;

        {* 准备接收数据 *}
        function PrepareRecv(AIOCPStruct: PIOCPStruct = nil): Boolean;
        {* 获得完成端口内存块使用权 *}
        function Allocate: PIOCPStruct;
        {* 处理接收的数据 *}
        function WorkBlock(AIOCPStruct: PIOCPStruct; ACount: DWORD): Integer;
        {* 获得地方IP *}
        function GetRemoteIP: string;
        {* 获得远程机器名 *}
        function GetRemoteHost: string;
        {* 获得远程端口 *}
        function GetRemotePort: Integer;
      public
        constructor Create(ASvrSocket: TServerSocket; ASocket: TSocket);
        destructor Destroy; override;
        {* 发送数据 *}
        function Write(var ABuf; ACount: Integer): Integer;
        function WriteString(const AValue: string): Integer;

        property Socket: TSocket read FSocket;
        property RemoteAddress: string read GetRemoteIP;
        property RemoteHost: string read GetRemoteHost;
        property RemotePort: Integer read GetRemotePort;
      end;

      TSocketThread = class(TThread)
      private
        FServer: TServerSocket;
      public
        constructor Create(AServer: TServerSocket);
      end;

      TAcceptThread = class(TSocketThread)
      protected
        procedure Execute; override;
      end;

      TWorkThread = class(TSocketThread)
      protected
        procedure Execute; override;
      end;

    implementation

    uses
      RTLConsts;

    const
     SHUTDOWN_FLAG = $FFFFFFFF;

    { TMemoryManager }

    constructor TMemoryManager.Create(AServerSocket: TServerSocket;
      ACount: Integer);
    var
      i: Integer;
      pIOCPData: PIOCPStruct;
    begin
      inherited Create;
      FList := TList.Create;
      FLock := TCriticalSection.Create;
      for i := 1 to ACount do
      begin
        New(pIOCPData);
        FillChar(pIOCPData^, SizeOf(PIOCPStruct), 0);
        {* 下面两句其实由FillChar已经完成,在这写,只是为了强调 *}
        pIOCPData.Assigned := False;
        pIOCPData.Active := False;
        FList.Add(pIOCPData);
      end;
    end;

    function TMemoryManager.Allocate: PIOCPStruct;
    var
      i: Integer;
    begin
      FLock.Enter;
      try
        Result := nil;
        for i := 0 to FList.Count - 1 do
        begin
          Result := FList[i];
          if not Result.Assigned then
            Break;
        end;
        if (not Assigned(Result)) or (Result.Assigned) then
        begin
          New(Result);
          FList.Add(Result);
        end;
        FillChar(Result^, SizeOf(TIOCPStruct), 0);
        Result.Assigned := True;
        Result.Active := False;
      finally
        FLock.Leave;
      end;
    end;

    constructor TMemoryManager.Create(AServerSocket: TServerSocket);
    begin
      Create(AServerSocket, 200);
    end;

    destructor TMemoryManager.Destroy;
    var
      i: Integer;
    begin
      for i := 0 to FList.Count - 1 do
        FreeMem(FList[i]);
      FList.Clear;
      FList.Free;
      FLock.Free;
      inherited;
    end;

    function TMemoryManager.GetCount: Integer;
    begin
      Result := FList.Count;
    end;

    function TMemoryManager.GetIOCPStruct(AIndex: Integer): PIOCPStruct;
    begin
      Result := nil;
      if (AIndex >= FList.Count) or (AIndex < 0) then
        EMemoryBuffer.CreateFmt(SListIndexError, [AIndex])
      else
        Result := FList[AIndex];
    end;

    procedure TMemoryManager.Release(AValue: PIOCPStruct);
    begin
      FLock.Enter;
      try
        AValue.Assigned := False;
        AValue.Active := False;
      finally
        FLock.Leave;
      end;
    end;

    { TServerSocket }

    constructor TServerSocket.Create;
    begin
      FMemory := TMemoryManager.Create(Self);
      FClients := TList.Create;
      FThreads := TList.Create;
      FSocket := INVALID_SOCKET;
      FLock := TCriticalSection.Create;

      FPort := 6666;
      FAcceptThread := nil;
      FIOCPHandle := 0;
      FHandle := AllocateHWnd(WndProc);
    end;

    destructor TServerSocket.Destroy;
    begin
      //关闭完成端口
      SetActive(False);
      FThreads.Free;
      FClients.Free;
      DeallocateHWnd(FHandle);
      FMemory.Free;
      FLock.Free;
      inherited;
    end;

    procedure TServerSocket.Open;
    var
      SystemInfo: TSystemInfo;
      i: Integer;
      Thread: TThread;
      Addr: TSockAddr;
      WSData: TWSAData;
    begin
      try
        if WSAStartup($0202, WSData) <> 0 then
        begin
          raise ESocketError.Create('WSAStartup');
        end;
        DoLog(seInitIOPort);  //初始化完成端口
        FIOCPHandle := CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
        if FIOCPHandle = 0 then
          CheckError;

        DoLog(seInitThread); //初始化工作线程
        GetSystemInfo(SystemInfo);
        for i := 0 to SystemInfo.dwNumberOfProcessors * 2 -1 do
        begin
          Thread := TWorkThread.Create(Self);
          FThreads.Add(Thread);
        end;

        DoLog(seInitSocket); //建立套接字
        FSocket := WSASocket(PF_INET, SOCK_STREAM, 0, nil, 0, WSA_FLAG_OVERLAPPED);
        if FSocket = INVALID_SOCKET then CheckError;

        FillChar(Addr, SizeOf(TSockAddr), 0);
        Addr.sin_family := AF_INET;
        Addr.sin_port := htons(FPort);
        Addr.sin_addr.S_addr := htonl(INADDR_ANY);
        CheckError(bind(FSocket, @Addr, SizeOf(TSockAddr)), 'bind');

        DoLog(seListen);  //开始监听
        CheckError(listen(FSocket, 5), 'listen');
        FAcceptThread := TAcceptThread.Create(Self);
      except
        on E: Exception do
        begin
          Close;
          CheckError(GetLastError, E.Message);
        end;
      end;
    end;

    procedure TServerSocket.Close;
    var
      i: Integer;
      Thread: TThread;
    begin
      try
        WSACleanup;
        DoLog(seUninitSocket);
        FAcceptThread.Terminate;
        if FSocket <> INVALID_SOCKET then
        begin
          closesocket(FSocket);
          FSocket := INVALID_SOCKET;
        end;

        DoLog(seUninitThread);
        for i := FThreads.Count - 1 downto 0 do
        begin
          Thread := FThreads[i];
          Thread.Terminate;
          PostQueuedCompletionStatus(FIOCPHandle, 0, 0, Pointer(SHUTDOWN_FLAG))
        end;
        FThreads.Clear;

        for i := FClients.Count - 1 downto 0 do
        begin
          TSymmetricalSocket(FClients[i]).Free;
        end;
        FClients.Clear;

        DoLog(seUninitIOPort);
        CloseHandle(FIOCPHandle);
        FIOCPHandle := 0;
      except
        on E: Exception do
        begin
          Close;
          CheckError(-1, E.Message);
        end;
      end;
    end;

    procedure TServerSocket.SetActive(AValue: Boolean);
    begin
      if FActive = AValue then Exit;
      FActive := AValue;
      if FActive then
        Open
      else
        Close;
    end;

    procedure TServerSocket.WndProc(var AMsg: TMessage);
    begin
      try
        Dispatch(AMsg);
      except
        if Assigned(ApplicationHandleException) then
          ApplicationHandleException(Self);
      end;
    end;

    function TServerSocket.CheckError(AErrorCode: Integer; AInfo: string): Boolean;
    var
      HandleError: Boolean;
    begin
      Result := True;
      if AErrorCode = 0 then Exit;
      if AErrorCode = -1 then
        AErrorCode := WSAGetLastError;
      if AErrorCode = -1 then
        AErrorCode := GetLastError;
      if (AErrorCode <> WSAEWOULDBLOCK) and (AErrorCode <> ERROR_IO_PENDING) and
        (AErrorCode <> 0) then
      begin
        if Assigned(FOnError) then
        begin
          HandleError := False;
          FOnError(AErrorCode, SysErrorMessage(AErrorCode), AInfo, HandleError);
          if HandleError then Exit;
        end;
        raise ESocketError.CreateFmt(SWindowsSocketError,
           [SysErrorMessage(AErrorCode), AErrorCode, AInfo]);
      end;
    end;

    procedure TServerSocket.DoLog(ASocketEvent: TSocketEvent; AInfo: string);
    begin
      if Assigned(FOnLog) then FOnLog(ASocketEvent, AInfo);
    end;

    procedure TServerSocket.DoRead(ASymmetricalSocket: TSymmetricalSocket;
      AData: Pointer; ACount: Integer);
    begin
      if Assigned(FOnRead) then
        FOnRead(ASymmetricalSocket, AData, ACount);
    end;

    procedure TServerSocket.SetPort(AValue: Integer);
    begin
      if FActive then
        raise ESocketError.Create('IOCP is acitve, cann''t change port');
      FPort := AValue;
    end;

    procedure TServerSocket.RegisterClient(ASocket: TSymmetricalSocket);
    begin
      FLock.Enter;
      try
        if FClients.IndexOf(ASocket) = -1 then
        begin
          FClients.Add(ASocket);
          DoAfterConnect(ASocket);
          {* 注册关闭通知消息 *}
          WSAAsyncSelect(ASocket.Socket, FHandle, WM_CLIENTSOCKET, FD_CLOSE);
        end;
      finally
        FLock.Leave;
      end;
    end;

    procedure TServerSocket.UnRegisterClient(ASocket: TSymmetricalSocket);
    var
      iIndex: Integer;
    begin
      FLock.Enter;
      try
        iIndex := FClients.IndexOf(ASocket);
        if iIndex <> -1 then
        begin
          FClients.Delete(iIndex);
          DoDisConnect(ASocket);
        end;
      finally
        FLock.Leave;
      end;
    end;

    procedure TServerSocket.AcceptClient;
    var
      Addr: TSockAddrIn;
      iAddrLen: Integer;
      ClientWinSocket: TSocket;
      SymmSocket: TSymmetricalSocket;
    begin
      iAddrLen := SizeOf(TSockAddrIn);
      ClientWinSocket := WinSock2.WSAAccept(Socket, nil, nil, nil, 0);
      if ClientWinSocket <> INVALID_SOCKET then
      begin
        if (not Active) or (not DoConnect(ClientWinSocket)) then
        begin
          closesocket(ClientWinSocket);
          Exit;
        end;
        try
          DoLog(seAccept);
          SymmSocket := TSymmetricalSocket.Create(Self, ClientWinSocket);
          DoLog(seConnect);
        except
          closesocket(ClientWinSocket);
          CheckError;
          Exit;
        end;
        if CreateIoCompletionPort(ClientWinSocket, FIOCPHandle, DWORD(SymmSocket), 0) = 0 then
        begin
          CheckError(GetLastError, 'CreateIoCompletionPort');
          SymmSocket.Free;
        end
        else
          SymmSocket.PrepareRecv;
      end;
    end;

    procedure TServerSocket.DoAfterConnect(ASymSocket: TSymmetricalSocket);
    begin
      if Assigned(FOnAfterConnect) then FOnAfterConnect(ASymSocket);
    end;

    function TServerSocket.DoConnect(ASocket: TSocket): Boolean;
    var
      SockAddrIn: TSockAddrIn;
      Size: Integer;
    begin
      Result := True;
      if Assigned(FOnBeforeConnect) then
      begin
        Size := SizeOf(TSockAddrIn);
        CheckError(getpeername(ASocket, SockAddrIn, Size), 'getpeername');
        FOnBeforeConnect(inet_ntoa(SockAddrIn.sin_addr), Result);
      end;
    end;

    procedure TServerSocket.DoDisConnect(ASymSocket: TSymmetricalSocket);
    begin
      if Assigned(FOnAfterDisconnect) then FOnAfterDisconnect(ASymSocket);
    end;

    function TServerSocket.FindSymmClient(
      ASocket: TSocket): TSymmetricalSocket;
    var
      i: Integer;
    begin
      Result := nil;
      FLock.Enter;
      try
        for i := 0 to FClients.Count - 1 do
        begin
          Result := FClients[i];
          if ASocket = Result.Socket then
            Break
          else
            Result := nil;
        end;
      finally
        FLock.Leave;
      end;
    end;

    function TServerSocket.GetClient(const AIndex: Integer): TSymmetricalSocket;
    begin
      Result := FClients[AIndex];
    end;

    function TServerSocket.GetClientCount: Integer;
    begin
      Result := FClients.Count;
    end;

    procedure TServerSocket.WMClientClose(var AMsg: TCMSocketMessage);
    var
      ASymmSocket: TSymmetricalSocket;
    begin
      if AMsg.SelectEvent = FD_CLOSE then
      begin
        ASymmSocket := FindSymmClient(AMsg.Socket);
        if Assigned(ASymmSocket) then
          ASymmSocket.Free;
      end;
    end;

    { TSocketThread }

    constructor TSocketThread.Create(AServer: TServerSocket);
    begin
      FServer := AServer;
      inherited Create(False);
      FreeOnTerminate := True;
    end;

    { TAcceptThread }

    procedure TAcceptThread.Execute;
    begin
      inherited;
      while not Terminated and FServer.Active do
      begin
        FServer.AcceptClient;
      end;
    end;

    { TWorkThread }

    procedure TWorkThread.Execute;
    var
      ASymSocket: TSymmetricalSocket;
      AIOCPStruct: PIOCPStruct;
      iWorkCount: Cardinal;
    begin
      inherited;
      while (not Terminated) and (FServer.Active) do
      begin
        AIOCPStruct := nil;
        iWorkCount := 0;
        ASymSocket := nil;
        if not GetQueuedCompletionStatus(FServer.FIOCPHandle, iWorkCount,
          DWORD(ASymSocket), POVerlapped(AIOCPStruct), INFINITE) then
        begin
          if Assigned(ASymSocket) then
            FreeAndNil(ASymSocket);
          Continue;
        end;

        if Cardinal(AIOCPStruct) = SHUTDOWN_FLAG then Break; //退出标志
        if not FServer.Active then Break; //退出

        {* 客户可能超时 或是断开连接,I/O失败 应放在通知结束的后面 *}
        if iWorkCount = 0 then
        begin
          //FreeAndNil(ASymSocket);  //不在这儿释放,而是接收释放消息来释放
          Continue;
        end;
        FServer.DoLog(AIOCPStruct.Event);
        if ASymSocket.WorkBlock(AIOCPStruct, iWorkCount) = -1 then
        begin
          FreeAndNil(ASymSocket);
        end;
      end;
    end;

    { TSymmetricalSocket }

    constructor TSymmetricalSocket.Create(ASvrSocket: TServerSocket;
      ASocket: TSocket);
    begin
      FServer := ASvrSocket;
      FSocket := ASocket;
      FAssignMemory := TList.Create;
      FServer.RegisterClient(Self);
      //PrepareRecv;
    end;

    destructor TSymmetricalSocket.Destroy;
    var
      i: Integer;
      Linger: TLinger;
    begin
      FServer.UnRegisterClient(Self);
      FillChar(Linger, SizeOf(TLinger), 0);     //优雅关闭
      setsockopt(FSocket, SOL_SOCKET, SO_LINGER, @Linger, Sizeof(Linger));
      closesocket(FSocket);
      for i := FAssignMemory.Count - 1 downto 0 do
        FServer.MemoryManager.Release(FAssignMemory[i]);
      FAssignMemory.Free;
      inherited;
    end;

    function TSymmetricalSocket.Allocate: PIOCPStruct;
    var
      i: Integer;
    begin
      for i := 0 to FAssignMemory.Count - 1 do
      begin
        Result := FAssignMemory[i];
        if not Result.Active then
        begin
          Result.Active := True;
          Exit;
        end;
      end;
      Result := FServer.MemoryManager.Allocate;
      FAssignMemory.Add(Result);
      Result.Active := True;
    end;

    function TSymmetricalSocket.PrepareRecv(AIOCPStruct: PIOCPStruct = nil): Boolean;
    var
      iFlags, iTransfer: Cardinal;
      ErrCode: Integer;
    begin
      if not Assigned(AIOCPStruct) then
        AIOCPStruct := Allocate;
      iFlags := 0;
      AIOCPStruct.Event := seRead;
      FillChar(AIOCPStruct.Buffer, SizeOf(AIOCPStruct.Buffer), 0);
      FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
      AIOCPStruct.wsaBuffer.buf := @AIOCPStruct.Buffer;
      AIOCPStruct.wsaBuffer.len := MAX_BUFSIZE;
      Result := WSARecv(FSocket, @AIOCPStruct.wsaBuffer, 1, @iTransfer, @iFlags, @AIOCPStruct.Overlapped, nil) <> SOCKET_ERROR;
      if not Result then
      begin
        ErrCode := WSAGetLastError;
        Result := ErrCode = ERROR_IO_PENDING;
        if not Result then
          FServer.CheckError(ErrCode, 'WSARecv');
      end;
    end;

    function TSymmetricalSocket.WorkBlock(AIOCPStruct: PIOCPStruct;
      ACount: DWORD): Integer;
    var
      ErrCode: Integer;
      iSend, iFlag: Cardinal;
    begin
      Result := 0;
      try
        case AIOCPStruct.Event of
          seRead:  //接收数据
          begin
            FServer.DoRead(Self, @AIOCPStruct.Buffer[0], ACount);
            if PrepareRecv(AIOCPStruct) then
              Result := ACount;
          end;
          seWrite: //发送数据
          begin
            Dec(AIOCPStruct.wsaBuffer.len, ACount);
            if AIOCPStruct.wsaBuffer.len <= 0 then
            begin
              AIOCPStruct.Active := False;
            end
            else
            begin
              FillChar(AIOCPStruct.Overlapped, SizeOf(AIOCPStruct.Overlapped), 0);
              iFlag := 0;
              if SOCKET_ERROR = WSASend(FSocket, @AIOCPStruct.wsaBuffer, 1, @iSend,
                iFlag, @AIOCPStruct.Overlapped, nil) then
              begin
                ErrCode := WSAGetLastError;
                if ErrCode <> ERROR_IO_PENDING then
                  FServer.CheckError(ErrCode, 'WSASend');
              end
              else Result := iSend;
            end;
          end;
        end;
      except
        Result := 0;
      end;
    end;

    function TSymmetricalSocket.Write(var ABuf; ACount: Integer): Integer;
    var
      AIOCPStruct: PIOCPStruct;
      ErrCode: Integer;
      iFlag, iSend: Cardinal;
    begin
      Result := ACount;
      if Result = 0 then Exit;
      AIOCPStruct := Allocate;
      iFlag := 0;
      AIOCPStruct.Event := seWrite;
      FillChar(AIOCPStruct.Buffer[0], SizeOf(AIOCPStruct.Buffer), 0);
      CopyMemory(@AIOCPStruct.Buffer[0], @ABuf, ACount);
      AIOCPStruct.wsaBuffer.buf := @AIOCPStruct.Buffer[0];
      AIOCPStruct.wsaBuffer.len := Result;

      if SOCKET_ERROR = WSASend(FSocket, @AIOCPStruct.wsaBuffer, 1, @iSend, iFlag,
        @AIOCPStruct.Overlapped, nil) then
      begin
        ErrCode := WSAGetLastError;
        if ErrCode <> ERROR_IO_PENDING then
        begin
          Result := SOCKET_ERROR;
          FServer.CheckError(ErrCode, 'WSASend');
        end;
      end;
    end;

    function TSymmetricalSocket.WriteString(const AValue: string): Integer;
    begin
      Result := Write(Pointer(AValue)^, Length(AValue));
    end;

    function TSymmetricalSocket.GetRemoteIP: string;
    var
      SockAddrIn: TSockAddrIn;
      iSize: Integer;
      HostEnt: PHostEnt;
    begin
      if FRemoteAddress = '' then
      begin
        iSize := SizeOf(SockAddrIn);
        FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
        FRemoteAddress := inet_ntoa(SockAddrIn.sin_addr);
      end;
      Result := FRemoteAddress;
    end;

    function TSymmetricalSocket.GetRemotePort: Integer;
    var
      SockAddrIn: TSockAddrIn;
      iSize: Integer;
      HostEnt: PHostEnt;
    begin
      if FRemoteAddress = '' then
      begin
        iSize := SizeOf(SockAddrIn);
        FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
        FRemotePort := ntohs(SockAddrIn.sin_port);
      end;
      Result := FRemotePort;
    end;

    function TSymmetricalSocket.GetRemoteHost: string;
    var
      SockAddrIn: TSockAddrIn;
      iSize: Integer;
      HostEnt: PHostEnt;
    begin
      if FRemoteAddress = '' then
      begin
        iSize := SizeOf(SockAddrIn);
        FServer.CheckError(getpeername(FSocket, SockAddrIn, iSize), 'getpeername');
        HostEnt := gethostbyaddr(@SockAddrIn.sin_addr.S_addr, 4, PF_INET);
        if HostEnt <> nil then FRemoteHost := HostEnt.h_name;
      end;
      Result := FRemoteHost
    end;

    end.

    http://blog.sina.com.cn/s/blog_562349090100zufs.html

  • 相关阅读:
    CentOS安装部署Nodejs
    CentOS安装部署Git
    CentOS安装部署Mysql 5.7
    CentOS 7 安装Java环境(脚本一键式安装)
    Java技术 | 细谈Java中UUID的简单了解与使用
    Navicat Premium 版本 12.1 激活成永久
    Java技术 | 细谈 Java 8 中的 Base64
    安装部署Elastic Search
    从零开始搭建linux下laravel 5.5所需环境(二)
    从零开始搭建linux下laravel 5.5所需环境(一)
  • 原文地址:https://www.cnblogs.com/findumars/p/6435491.html
Copyright © 2011-2022 走看看