zoukankan      html  css  js  c++  java
  • DelphiIOCP学习笔记<三>====工作线程和Listener

    接第一次代码继续分析

    uses
      JwaWinsock2, Windows, SysUtils;
    
    const
      DATA_BUFSIZE = 1024;
    
      IO_TYPE_Accept = 1;
      IO_TYPE_Recv = 2;
    
    
    
    type
      //(1):单IO数据结构
      LPVOID = Pointer;
      LPPER_IO_OPERATION_DATA = ^PER_IO_OPERATION_DATA ;
      PER_IO_OPERATION_DATA = packed record
        Overlapped: OVERLAPPED;
        IO_TYPE: Cardinal;
        DataBuf: TWSABUF;
        Buffer: array [0..1024] of CHAR;
      end;

    刚开始结存iocp的时候可能无法理解为什么要申明这样一个结构。

    解释下,这个结构是GetQueuedCompletionStatus,PostQueuedCompletionStatus,WSARecv,WSASend,时需要用到一个POverlapped类型的参数。

    也许还会有疑惑,为什么不直接使用系统自带的类型呢?

    POverlapped = ^TOverlapped;
    _OVERLAPPED = record
      Internal: DWORD;
      InternalHigh: DWORD;
      Offset: DWORD;
      OffsetHigh: DWORD;
      hEvent: THandle;
    end;

    ///我可以解释下.是为了在PostQueuedCompletionStatus,WSARecv,WSASend尽可能多传递一下信息给GetQueuedCompletionStatus,所以一般都会扩展这一机构体

    //再啰嗦下。定义的结构体,Overlapped: OVERLAPPED;必须放在第一个.你懂的。

    //PostQueuedCompletionStatus,WSARecv,WSASend会触发工作线程的GetQueuedCompletionStatus返回<上一笔记有提到>

    下面片段是Listen过程

     //下面循环进行循环获取客户端的请求。
      while (TRUE) do
      begin
         //当客户端有连接请求的时候,WSAAccept函数会新创建一个套接字cSocket。这个套接字就是和客户端通信的时候使用的套接字。
         cSocket:= WSAAccept(sSocket, nil, nil, nil, 0);
    
         //判断cSocket套接字创建是否成功,如果不成功则退出。
         if (cSocket= SOCKET_ERROR) then
         begin
            closesocket(sSocket);
            exit;
         end;
    
         //将套接字、完成端口绑定在一起。
    
         //     最开始的时候没有明白为什么还要调用一次createIoCompletionPort
         //
         //     后来经过google,和测试
         //
         //     是将新的套接字(socket)加入到iocp端口<绑定>
         //     这样工作线程才能处理这个套接字(socket)的数据包
         //如果把下面注释掉,WSARecv这个套接字时,GetQueuedCompletionStatus无法处理到收到的数据包
          
         //      2013年4月19日 09:56:00 
         //      注意第三个参数也需要进行绑定, 否则在工作线程中GetQueuedCompletionStatus时completionKey会取不到cSocket值
         lvPerIOPort := CreateIoCompletionPort(cSocket, lvIOPort, cSocket, 0);
         if (lvPerIOPort = 0) then
         begin
            Exit;
         end;
    
         //初始化数据包
         PerIoData := LPPER_IO_OPERATION_DATA(GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA)));
         ZeroMemory(@PerIoData.Overlapped, sizeof(OVERLAPPED));
    
         //数据包中的IO类型:有连接请求
         PerIoData.IO_TYPE := IO_TYPE_Accept;
    
         //通知工作线程,有新的套接字连接<第三个参数>
         PostQueuedCompletionStatus(lvIOPort, 0, cSocket, POverlapped(PerIOData));
      end;

    下面是IOCP工作线程

    function ServerWorkerThread(pData:Pointer): Integer; stdcall;
    var
      CompletionPort:THANDLE;
      BytesTransferred:Cardinal;
      PerIoData:LPPER_IO_OPERATION_DATA;
      cSocket:TSocket;
      Flags:Cardinal;
      RecvBytes:Cardinal;
      lvResultStatus:BOOL;
      lvRet:Integer;
    
    begin
      CompletionPort:=THandle(pData);
      //得到创建线程是传递过来的IOCP
       while(TRUE) do
       begin
            //工作者线程会停止到GetQueuedCompletionStatus函数处,直到接受到数据为止
            lvResultStatus := GetQueuedCompletionStatus(CompletionPort,
              BytesTransferred,
              cSocket,
              POverlapped(PerIoData), INFINITE);
    if (lvResultStatus = False) then
            begin
              //当客户端连接断开或者客户端调用closesocket函数的时候,函数GetQueuedCompletionStatus会返回错误。如果我们加入心跳后,在这里就可以来判断套接字是否依然在连接。
              if cSocket<>0 then
              begin
                closesocket(cSocket);
              end;
              if PerIoData<>nil then
              begin
                GlobalFree(DWORD(PerIoData));
              end;
              continue;
            end;
    
            if PerIoData = nil then
            begin
              closesocket(cSocket);
              Break;
            end else  if (PerIoData<>nil) then
            begin
              ////shutdown(PerHandleData.Socket, 1);
              if PerIoData.IO_TYPE = IO_TYPE_Accept then  //连接请求
              begin
                GlobalFree(DWORD(PerIoData));
              end else if PerIoData.IO_TYPE = IO_TYPE_Recv then
              begin
                ////可以在这里处理数据……
    
                GlobalFree(DWORD(PerIoData));
              end;
    
              /////分配内存<可以加入内存池>
              PerIoData := LPPER_IO_OPERATION_DATA(GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA)));
              ZeroMemory(@PerIoData.Overlapped, sizeof(OVERLAPPED));
              Flags := 0;
    
              /////进入投递收取动作
              PerIoData.IO_TYPE := IO_TYPE_Recv;
              PerIoData.DataBuf.len:=DATA_BUFSIZE;
              ZeroMemory(@PerIoData.Buffer,sizeof(@PerIoData.Buffer));
              PerIoData.DataBuf.buf := @PerIoData.Buffer;
    
              /////异步收取数据
              WSARecv(cSocket,
                 @PerIoData.DataBuf,
                 1,
                 RecvBytes,
                 Flags,
                 @PerIoData^, nil);
              if (WSAGetLastError() <> ERROR_IO_PENDING) then
              begin
                closesocket(cSocket);
                if PerIoData <> nil then
                begin
                  GlobalFree(DWORD(PerIoData));
                end;
                Continue;
              end;
            end;
       end;
    end;
  • 相关阅读:
    elasticsearch 索引清理脚本及常用命令
    git 快速入门及常见用法
    第01章-成本,你真的算对过吗?
    windows mysql安装及常用命令
    centos7 systemctl配置开机自启动服务
    python pip手动安装二进制包
    centos7使用nginx+uwsgi部署python django项目
    python json.loads()、json.dumps()和json.dump()、json.load()区别
    比阿里云快2倍的InfluxDB集群,我们开源了
    为什么是InfluxDB | 写在《InfluxDB原理和实战》出版之际
  • 原文地址:https://www.cnblogs.com/DKSoft/p/3027780.html
Copyright © 2011-2022 走看看