zoukankan      html  css  js  c++  java
  • ICOPclient版本号,异步connect

    之前在网上看到一个服务端的ICOP模块,比較小巧,感觉还不错,后来在工作中,需要开发一个挂号的程序,监视大量server执行情况,初期连接数大概六七百,我就把这个ICOP模块改造成了一个client版本号。后来发现因为是同步的connect,有时候会卡在connect过程非常久,也不方便设置connect的超时,想到使用ConnectEx做异步连接,感觉ConnectEx过于繁琐,还得自己获取函数指针,必需要先调用bind等,断开连接要调用DisconnectEx。后来我自己想到一种办法,在调用connect之前,用ioctlsocket把socket先设置为非堵塞模式,然后在连接成功后再设置回堵塞模式,但这有一个问题,ICOP里面设置为非堵塞模式,怎么推断连接成功、失败、超时呢?我是这么做的,调用connect成功之后,投递事件,在connect事件里,调用getsockopt(clt->fd, SOL_SOCKET, 0x700C/*SO_CONNECT_TIME*/, (char*)&Connect_Time, &len)来检測连接时间,假设返回-1表示连接没有成功,然后推断是否超时,假设超时直接失败,否则断续投递事件,直到连接成功或者超时,以下直接上代码,关键代码段在:int connect()函数和case T::EV_CONNECT: 

    #ifndef iocptcpclient_h__
    #define iocptcpclient_h__
    #include <Winsock2.h>
    #include <windows.h>
    #include <MSTCPiP.h>
    
    #pragma comment(lib, "Ws2_32.lib")
    
    namespace iocp
    {
    	template<typename T>
    	class Scheduler
    	{
    	public:
    		void start();
    		void stop();
    		void push(T * clt);
    	public:
    		int scheds;
    		HANDLE iocp;
    	};
    
    	template<typename T>
    	void Scheduler<T>::start()
    	{
    		iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, scheds);
    		if (NULL == iocp)
    		{
    			throw (int)::WSAGetLastError();
    		}
    	}
    
    	template<typename T>
    	void Scheduler<T>::stop()
    	{
    	}
    
    	template<typename T>
    	void Scheduler<T>::push(T * clt)
    	{
    		::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
    	}
    
    	template<typename T>
    	class Processor
    	{
    	public: 
    		void start();
    		void stop();
    	public:
    		static DWORD WINAPI run(LPVOID param);
    
    	public: 
    		int threads;
    		Scheduler<T> * scheder;
    	};
    
    	template<typename T>
    	void Processor<T>::start()
    	{
    		for (int i = 0; i < threads; i++)
    		{
    			DWORD tid;
    			HANDLE thd = ::CreateThread(NULL,
    				0,
    				(LPTHREAD_START_ROUTINE)run,
    				this,
    				0,
    				&tid);
    			if (NULL == thd)
    			{
    				throw (int)::GetLastError();
    			}
    			::CloseHandle(thd);
    		}
    	}
    
    	template<typename T>
    	void Processor<T>::stop()
    	{
    	}
    
    	template<typename T>
    	DWORD WINAPI Processor<T>::run(LPVOID param)
    	{
    		Processor<T>& procor = *(Processor<T> *)param;
    		Scheduler<T>& scheder = *procor.scheder;
    		HANDLE iocp = scheder.iocp;
    
    		DWORD ready;
    		ULONG_PTR key;
    		WSAOVERLAPPED * overlap;
    		while (true)
    		{
    			::GetQueuedCompletionStatus(iocp, &ready, &key, (LPOVERLAPPED *)&overlap, INFINITE);
    
    			T * clt = (T *)key;
    			switch (clt->event)
    			{
    			case T::EV_RECV:
    				{
    					if (0 >= ready)
    					{
    						clt->event = T::EV_DISCONNECT;
    						::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
    					}
    					else
    					{
    						clt->OnRecv(ready);
    					}
    				}
    				break;
    			case T::EV_CONNECT:
    				{
    					int Connect_Time;
    					int len = sizeof(Connect_Time);
    					int result = getsockopt(clt->fd, SOL_SOCKET, 0x700C/*SO_CONNECT_TIME*/, (char*)&Connect_Time, &len);
    					if (Connect_Time == -1){
    						if (GetTickCount() - clt->dwConnTime >= clt->maxConnTime){
    							clt->OnConnectFailed();
    							::closesocket(clt->fd);
    							clt->fd = INVALID_SOCKET;
    						}
    						else
    						{
    							Sleep(1);
    							::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
    						}
    					}
    					else
    					{
    						unsigned long ul = 0;
    						ioctlsocket(clt->fd, FIONBIO, &ul); //设置为堵塞模式*/
    						if (NULL == ::CreateIoCompletionPort((HANDLE)clt->fd, iocp, (ULONG_PTR)clt, 0))
    						{
    							clt->OnConnectFailed();
    							::closesocket(clt->fd);
    							clt->fd = INVALID_SOCKET;
    							//delete clt;
    						}
    						else
    						{
    							clt->OnConnect();
    						}
    					}
    				}
    				break;
    			case T::EV_DISCONNECT:
    				{
    					clt->OnDisconnect();
    					::closesocket(clt->fd);
    					clt->fd = INVALID_SOCKET;
    					//delete clt;
    				}
    				break;
    			case T::EV_SEND:
    				break;
    			}
    		}
    
    		return 0;
    	}
    
    	class Client
    	{
    	public:
    		enum EVENT
    		{
    			EV_CONNECT,
    			EV_DISCONNECT,
    			EV_RECV,
    			EV_SEND
    		};
    
    		Client(){
    			fd = INVALID_SOCKET;
    			maxConnTime = 5000;
    		}
    		virtual ~Client(){};
    
    		int connect(){
    			this->event = EV_CONNECT;
    			dwConnTime = GetTickCount();
    			struct sockaddr_in addr;
    			addr.sin_family = AF_INET;
    			addr.sin_addr.s_addr = ip;
    			addr.sin_port = htons(port);
    
    			DWORD dwError = 0, dwBytes = 0;
    			tcp_keepalive sKA_Settings = { 0 }, sReturned = { 0 };
    			sKA_Settings.onoff = 1;
    			sKA_Settings.keepalivetime = 30000;    // Keep Alive in 30 sec.
    			sKA_Settings.keepaliveinterval = 3000; // Resend if No-Reply
    			if (WSAIoctl(fd, SIO_KEEPALIVE_VALS, &sKA_Settings,
    				sizeof(sKA_Settings), &sReturned, sizeof(sReturned), &dwBytes,
    				NULL, NULL) != 0)
    			{
    				dwError = WSAGetLastError();
    			}
    			unsigned long ul = 1;
    			ioctlsocket(fd, FIONBIO, &ul); //设置为非堵塞模式
    			int ret = -1;
    			if (::connect(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr)) == -1 && WSAGetLastError() == WSAEWOULDBLOCK)
    			{
    				ret = 0;
    			}
    			return ret;
    		}
    
    		void send(const char * buff, int len){
    			::send(fd, buff, len, 0);
    		}
    
    		void recv(char * buff, int len){
    			this->event = EV_RECV;
    
    			::memset(&overlap, 0, sizeof(overlap));
    			WSABUF buf;
    			buf.buf = buff;
    			buf.len = len;
    			DWORD ready = 0;
    			DWORD flags = 0;
    			if (0 != ::WSARecv(fd, &buf, 1, &ready, &flags, &overlap, NULL)
    				&& WSA_IO_PENDING != WSAGetLastError())
    			{
    				this->event = EV_DISCONNECT;
    				::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)this, NULL);
    			}
    		}
    		void close(){
    			::shutdown(fd, SD_BOTH);
    		}
    
    		virtual void OnConnect(){};//连接成功
    
    		virtual void OnConnectFailed(){};//连接失败
    
    		virtual void OnDisconnect(){};	//连接断开
    
    		virtual void OnRecv(int len){};
    
    		virtual void OnSend(int len){};
    	public:
    		int ip;
    		int port;
    		void * srv;
    		HANDLE iocp;
    		EVENT event;
    		SOCKET fd;
    		DWORD maxConnTime;
    		DWORD dwConnTime;
    		WSAOVERLAPPED overlap;
    	};
    
    	template<typename T>
    	class TCPClt
    	{
    	public:
    		void start();
    		void stop();
    		bool addclt(T* clt, int ip, int port);
    	public:
    		int scheds;
    		int threads;
    
    		iocp::Scheduler<T> scheder;
    		iocp::Processor<T> procor;
    	};
    
    	template<typename T>
    	void TCPClt<T>::start()
    	{
    		WSADATA wsadata;
    		int wsaversion = WSAStartup(MAKEWORD(2, 2), &wsadata);
    
    		if (threads <= 0)
    		{
    			threads = 1;
    		}
    
    		scheder.scheds = scheds;
    		scheder.start();
    
    		procor.threads = threads;
    		procor.scheder = &scheder;
    		procor.start();
    	}
    
    	template<typename T>
    	void TCPClt<T>::stop()
    	{
    	}
    
    	template<typename T>
    	bool TCPClt<T>::addclt(T* clt, int ip, int port){
    		clt->ip = ip;
    		clt->port = port;
    		clt->iocp = scheder.iocp;
    		clt->fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
    		if (clt->fd == INVALID_SOCKET)
    		{
    			return false;
    		}
    		if (clt->connect() != 0)
    		{
    			closesocket(clt->fd);
    			return false;
    		}
    		scheder.push(clt);
    		return true;
    	}
    }
    #endif // iocptcpclient_h__


  • 相关阅读:
    拖拽模块move2
    拖拽模块move1
    String类和StringBuilder
    你真的会二分查找吗
    C++中关于new及动态内存分配的思考
    【转】Github 上传代码
    HDU4801·二阶魔方
    POJ2676,HDU4069解决数独的两种实现:DFS、DLX
    读书笔记
    SpringBoot-------实现多数据源Demo
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/3789843.html
Copyright © 2011-2022 走看看