(一) IO模型
I/O设备处理方式一般有两种 同步和异步
同步阻塞:这种方式就一直读写IO直道操作完成或者失败。
异步IO:使用overlapped I/O。overlapped I/O是WIN32的一项技术,你可以要求操作系统为你传送数据,并且在传送完毕时通知你。
(二)使用overlapped I/O:
先来看看OVERLAPPED 结构体有两种定义:
typedef struct _OVERLAPPED { DWORD Internal; DWORD InternalHigh; DWORD Offset; DWORD OffsetHigh; HANDLE hEvent; }OVERLAPPED typedef struct _OVERLAPPED { ULONG_PTR Internal; //操作系统保留,指出一个和系统相关的状态 ULONG_PTR InternalHigh; //指出发送或接收的数据长度 union { struct { DWORD Offset; //文件传送的字节偏移量的低位字 DWORD OffsetHigh; //文件传送的字节偏移量的高位字 }; PVOID Pointer; //指针,指向文件传送位置 }; HANDLE hEvent; //指定一个I/O操作完成后触发的事件 } OVERLAPPED, *LPOVERLAPPED;
1> 进行I/O操作时,指定overlapped方式使用CreateFile (),将其第6个参数指定为FILE_FLAG_OVERLAPPED,就是准备使用overlapped的方式构造或打开文件;
2> 如果采用 overlapped,那么ReadFile()、WriteFile()的第5个参数必须提供一个指针,指向一个OVERLAPPED结构。 OVERLAPPED用于记录了当前正在操作的文件一些相关信息。 详细的看下面代码例子:
int main() { BOOL rc; HANDLE hFile; DWORD numread; OVERLAPPED overlap; char buf[512]; char szPath=”c:\xxxxxxxx”; hFile = CreateFile( szPath, GENERIC_READ, FILE_SHARE_READ|FILE_SHARE_WRITE, NULL, OPEN_EXISTING, FILE_FLAG_OVERLAPPED, // 以overlapped打开文件 NULL ); // OVERLAPPED结构实始化为0 memset(&overlap, 0, sizeof(overlap)); //指定文件位置是1500; overlap.Offset = 1500; rc = ReadFile(hFile,buf,300,&numread,&overlap); //因为是overlapped操作,ReadFile会将读文件请求放入读队列之后立即返回(false),而不会等到文件读完才返回(true) if (rc) { //…………此处即得到数据了。 //文件真是被读完了,rc为true // 或当数据被放入cache中,或操作系统认为它可以很快速地取得数据,rc为true } else { if (GetLastError() == ERROR_IO_PENDING) {//当错误是ERROR_IO_PENDING,那意味着读文件的操作还在进行中 //等候,直到文件读完 WaitForSingleObject(hFile, INFINITE); rc = GetOverlappedResult(hFile,&overlap,&numread,FALSE); //上面二条语句完成的功能与下面一条语句的功能等价:一只阻塞等到得到数据才继续下面。 // GetOverlappedResult(hFile,&overlap,&numread,TRUE); } else { //出错了 } } CloseHandle(hFile); return EXIT_SUCCESS; }
注意:
如果多个操作同时访问同一 操作i/o,这时我们可以使用OVERLAPPED结构体中的event字段。这个envent事件必须是手动方式。在转移完成时处理一个事件设置为有信号状态,调用进程集这个成员在调用ReadFile、 WriteFile、TransactNamedPipe、 ConnectNamedPipe函数之后事件会设置成武信号状态,当完成I/O操作之后信号有设置为由信号状态。
(三) socket IOCP模型
socket通信也是一种I/O操作,既然文件I0有异步模型,那么socket也有异步模型,sokcet的异步操作模型由很多种,我们这里重点说明IOCP也叫完成端口。
使用普通socket通信和iocp通信流程图如下, 此图来自于博客http://blog.csdn.net/neicole/article/details/7549497。
//filename Iocp.h #pragma once #include <WinSock2.h> #include <stdio.h> #include <process.h> #define IP_SIZE 32 //ip地址长度 #define BUFFER_SIZE 1024 enum SOCKET_STATE { ACCEPT = 1, SEND, RECV }; /*传送给处理函数的参数*/ typedef struct tagPleData { SOCKET sSocket; CHAR szClientIP[IP_SIZE]; UINT uiClientPort; /* 其他信息 */ }PLEDATA, * LPPLEDATA; typedef struct tagIOData { OVERLAPPED oOverlapped; WSABUF wsBuffer; CHAR szBuffer[BUFFER_SIZE]; DWORD dSend; DWORD dRecv; SOCKET_STATE sState; }IOData, *LPIOData; typedef void (*ReadProc)(LPPLEDATA lpData, CHAR * RecvData); class Iocp { public: Iocp(const CHAR * host, UINT port); ~Iocp(void); static VOID ServerWorkThread( VOID * _this ); //监听完成端口线程 VOID SetReadProc(VOID * lprFun); //设置读取回掉函数 bool ListenEx(UINT backlog); static VOID AcceptEx(VOID * _this); public: ReadProc lpFun; //读取回调函数 HANDLE h_ComPlePort; //完成端口句柄 bool bIsListen; //是否是服务端socket SOCKADDR_IN m_SockAddr; //socket地址 SOCKET m_ListenSocketID; //socket CHAR m_Host[IP_SIZE]; //连接socketIp UINT m_Port; //连接IP };
//filenameIocp.cpp #include "Iocp.h" Iocp::Iocp(const CHAR * host, UINT port) { /*协商套接字版本*/ WSADATA wsaData; DWORD dwRet = WSAStartup( 0x0202, &wsaData ); if (0 != dwRet ) { WSACleanup(); throw 1; } m_ListenSocketID = INVALID_SOCKET ; memset( &m_SockAddr, 0, sizeof(SOCKADDR_IN) ) ; memset( m_Host, 0, IP_SIZE ) ; m_Port = 0 ; SYSTEM_INFO mySysInfo; GetSystemInfo( &mySysInfo ); iThreadNums = mySysInfo.dwNumberOfProcessors * 2 + 1; BOOL ret = FALSE ; bIsListen = TRUE; strncpy_s(m_Host, host, IP_SIZE - 1); m_SockAddr.sin_family = AF_INET; m_SockAddr.sin_addr.s_addr =inet_addr(host); m_SockAddr.sin_port = htons(port); /*创建监听套接字*/ m_ListenSocketID = WSASocket( AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED ); if( m_ListenSocketID== INVALID_SOCKET ) { throw 1; } /*设置套接字选项*/ CHAR opt = 1; ret = setsockopt( m_ListenSocketID , SOL_SOCKET , SO_REUSEADDR , (const CHAR * )&opt , sizeof(opt) ); if( ret != 0 ) { throw 1 ; } /*绑定套接字*/ if (SOCKET_ERROR == bind(m_ListenSocketID, (const struct sockaddr *)&m_SockAddr, sizeof(struct sockaddr))) { throw 1 ; } /*创建完成端口*/ h_ComPlePort = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, 0, 0 ); if ( h_ComPlePort == NULL ) { throw 1 ; } for ( DWORD i = 0; i < ( mySysInfo.dwNumberOfProcessors * 2 + 1 ); ++i ) { _beginthread(Iocp::ServerWorkThread, 0, (VOID *)this); } } Iocp::~Iocp(void) { WSACleanup(); } /************************************************* Function:AcceptEx Description:接受套接字的线程函数 Input: Output: Others: *************************************************/ VOID Iocp::AcceptEx(VOID * _this) { SOCKET acSocket; DWORD dwRecvBytes; Iocp * pTemp = (Iocp *)_this; SOCKADDR_IN sAddr; INT uiClientSize = sizeof(sAddr); //struct socketaddrin while (TRUE) { int x = 6; acSocket = WSAAccept( pTemp->m_ListenSocketID, (SOCKADDR *)&sAddr, &uiClientSize, NULL, 0 ); if ( acSocket == SOCKET_ERROR ) { return; } LPPLEDATA lpSocketData = (LPPLEDATA)malloc(sizeof(PLEDATA)); if ( NULL == lpSocketData ) { return; } lpSocketData->sSocket = acSocket; sprintf(lpSocketData->szClientIP, inet_ntoa(sAddr.sin_addr)); lpSocketData->uiClientPort = sAddr.sin_port; //将连接socket 加入完成端口中 if ( CreateIoCompletionPort( (HANDLE)acSocket, pTemp->h_ComPlePort, (ULONG_PTR)lpSocketData, 0 ) == NULL ) { return; } /*这里停止监听会有问题*/ if (pTemp->bIsListen = FALSE) { break; } LPIOData lpIoData = (LPIOData )malloc(sizeof(IOData)); if ( lpIoData == NULL ) { return; } #pragma region 投递线程事件 ZeroMemory( &( lpIoData->oOverlapped ), sizeof( lpIoData->oOverlapped) ); lpIoData->dSend = 0; lpIoData->dRecv = 0; lpIoData->wsBuffer.len = BUFFER_SIZE; lpIoData->wsBuffer.buf = lpIoData->szBuffer; lpIoData->sState = SEND; DWORD flags = 0; if ( WSARecv(acSocket, &(lpIoData->wsBuffer), 1, &dwRecvBytes, &flags, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR ) { if ( WSAGetLastError() != ERROR_IO_PENDING ) { return; } else { //return; printf("ERROR_IO_PENDING:ok "); } } #pragma endregion 投递线程事件 } } /************************************************* Function:ListenEx Description:监听函数 Input: Output: Others: *************************************************/ BOOL Iocp::ListenEx(UINT backlog) { if (SOCKET_ERROR == listen(m_ListenSocketID, backlog)) { return FALSE; } /*创建监听线程*/ if (-1 == _beginthread(Iocp::AcceptEx, 0, (VOID *)this)) { return FALSE; } return TRUE; } /************************************************* Function:ServerWorkThread Description:端口上的工作线程 Input: Output: Others: *************************************************/ VOID Iocp:: ServerWorkThread( VOID * _this ) { Iocp * lpTemp = (Iocp *)_this; HANDLE hPlePort = (HANDLE)lpTemp->h_ComPlePort; DWORD dwBytes; LPPLEDATA lpPleData = NULL; LPIOData lpIoData = NULL; DWORD sendBytes = 0; DWORD recvBytes = 0; DWORD dwFlag = 0; while (TRUE) { int x = 89; if ( GetQueuedCompletionStatus( hPlePort, &dwBytes, (PULONG_PTR)&lpPleData, (LPOVERLAPPED *)&lpIoData, INFINITE ) == 0 ) { return ; } if ( dwBytes == 0 || NULL == lpIoData) { printf("there is a socket away "); free( lpPleData ); free( lpIoData ); continue; } else { #pragma region 接受到数据 lpIoData->dRecv = dwBytes; lpIoData->szBuffer[lpIoData->dRecv] = 0; //printf("ServerWorkThread:R[%s] ", lpIoData->szBuffer); lpTemp->lpFun(lpPleData, lpIoData->szBuffer); #pragma endregion 接受到数据 #pragma region 再次投递 lpIoData->dRecv = 0; ZeroMemory( &(lpIoData->oOverlapped), sizeof( OVERLAPPED ) ); lpIoData->wsBuffer.len = BUFFER_SIZE; lpIoData->wsBuffer.buf = lpIoData->szBuffer; if ( WSARecv( lpPleData->sSocket, &(lpIoData->wsBuffer), 1, &recvBytes, &dwFlag, &(lpIoData->oOverlapped), NULL ) == SOCKET_ERROR ) { if ( WSAGetLastError() != ERROR_IO_PENDING ) { return ; } } #pragma endregion 再次投递 } } } VOID Iocp::SetReadProc(VOID * lprFun) { lpFun = (ReadProc)lprFun; }
//finename main.cpp #include <iostream> #include "Iocp.h" using namespace std; #pragma comment( lib, "Ws2_32.lib" ) //客户端的发送的数据会在这个函数通知 void OnRead(LPPLEDATA lpData, CHAR * lpRecvData) { SOCKET sSock = lpData->sSocket; printf("socket:IP[%s:%d] send data[%s] ",lpData->szClientIP, lpData->uiClientPort, lpRecvData); } void main() { Iocp server("127.0.0.1", 20000); server.SetReadProc((VOID *)OnRead); server.ListenEx(10); }