zoukankan      html  css  js  c++  java
  • ICOPclient版本号,异步connect

    之前在网上看到一个服务端的ICOP模块,比較小巧,感觉还不错,后来在工作中,需要开发一个挂号的程序,监视大量server执行情况,初期连接数大概六七百,我就把这个ICOP模块改造成了一个client版本号。后来发现因为是同步的connect,有时候会卡在connect过程非常久,也不方便设置connect的超时,想到使用ConnectEx做异步连接,感觉ConnectEx过于繁琐,还得自己获取函数指针,必需要先调用bind等,断开连接要调用DisconnectEx。后来我自己想到一种办法,在调用connect之前,用ioctlsocket把socket先设置为非堵塞模式,然后在连接成功后再设置回堵塞模式,但这有一个问题,ICOP里面设置为非堵塞模式,怎么推断连接成功、失败、超时呢?我是这么做的,调用connect成功之后,投递事件,在connect事件里,调用getsockopt(clt->fd, SOL_SOCKET, 0x700C/*SO_CONNECT_TIME*/, (char*)&Connect_Time, &len)来检測连接时间,假设返回-1表示连接没有成功,然后推断是否超时,假设超时直接失败,否则断续投递事件,直到连接成功或者超时,以下直接上代码,关键代码段在:int connect()函数和case T::EV_CONNECT: 

    #ifndef iocptcpclient_h__
    #define iocptcpclient_h__
    #include <Winsock2.h>
    #include <windows.h>
    #include <MSTCPiP.h>
    
    #pragma comment(lib, "Ws2_32.lib")
    
    namespace iocp
    {
    	template<typename T>
    	class Scheduler
    	{
    	public:
    		void start();
    		void stop();
    		void push(T * clt);
    	public:
    		int scheds;
    		HANDLE iocp;
    	};
    
    	template<typename T>
    	void Scheduler<T>::start()
    	{
    		iocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, scheds);
    		if (NULL == iocp)
    		{
    			throw (int)::WSAGetLastError();
    		}
    	}
    
    	template<typename T>
    	void Scheduler<T>::stop()
    	{
    	}
    
    	template<typename T>
    	void Scheduler<T>::push(T * clt)
    	{
    		::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
    	}
    
    	template<typename T>
    	class Processor
    	{
    	public: 
    		void start();
    		void stop();
    	public:
    		static DWORD WINAPI run(LPVOID param);
    
    	public: 
    		int threads;
    		Scheduler<T> * scheder;
    	};
    
    	template<typename T>
    	void Processor<T>::start()
    	{
    		for (int i = 0; i < threads; i++)
    		{
    			DWORD tid;
    			HANDLE thd = ::CreateThread(NULL,
    				0,
    				(LPTHREAD_START_ROUTINE)run,
    				this,
    				0,
    				&tid);
    			if (NULL == thd)
    			{
    				throw (int)::GetLastError();
    			}
    			::CloseHandle(thd);
    		}
    	}
    
    	template<typename T>
    	void Processor<T>::stop()
    	{
    	}
    
    	template<typename T>
    	DWORD WINAPI Processor<T>::run(LPVOID param)
    	{
    		Processor<T>& procor = *(Processor<T> *)param;
    		Scheduler<T>& scheder = *procor.scheder;
    		HANDLE iocp = scheder.iocp;
    
    		DWORD ready;
    		ULONG_PTR key;
    		WSAOVERLAPPED * overlap;
    		while (true)
    		{
    			::GetQueuedCompletionStatus(iocp, &ready, &key, (LPOVERLAPPED *)&overlap, INFINITE);
    
    			T * clt = (T *)key;
    			switch (clt->event)
    			{
    			case T::EV_RECV:
    				{
    					if (0 >= ready)
    					{
    						clt->event = T::EV_DISCONNECT;
    						::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
    					}
    					else
    					{
    						clt->OnRecv(ready);
    					}
    				}
    				break;
    			case T::EV_CONNECT:
    				{
    					int Connect_Time;
    					int len = sizeof(Connect_Time);
    					int result = getsockopt(clt->fd, SOL_SOCKET, 0x700C/*SO_CONNECT_TIME*/, (char*)&Connect_Time, &len);
    					if (Connect_Time == -1){
    						if (GetTickCount() - clt->dwConnTime >= clt->maxConnTime){
    							clt->OnConnectFailed();
    							::closesocket(clt->fd);
    							clt->fd = INVALID_SOCKET;
    						}
    						else
    						{
    							Sleep(1);
    							::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)clt, NULL);
    						}
    					}
    					else
    					{
    						unsigned long ul = 0;
    						ioctlsocket(clt->fd, FIONBIO, &ul); //设置为堵塞模式*/
    						if (NULL == ::CreateIoCompletionPort((HANDLE)clt->fd, iocp, (ULONG_PTR)clt, 0))
    						{
    							clt->OnConnectFailed();
    							::closesocket(clt->fd);
    							clt->fd = INVALID_SOCKET;
    							//delete clt;
    						}
    						else
    						{
    							clt->OnConnect();
    						}
    					}
    				}
    				break;
    			case T::EV_DISCONNECT:
    				{
    					clt->OnDisconnect();
    					::closesocket(clt->fd);
    					clt->fd = INVALID_SOCKET;
    					//delete clt;
    				}
    				break;
    			case T::EV_SEND:
    				break;
    			}
    		}
    
    		return 0;
    	}
    
    	class Client
    	{
    	public:
    		enum EVENT
    		{
    			EV_CONNECT,
    			EV_DISCONNECT,
    			EV_RECV,
    			EV_SEND
    		};
    
    		Client(){
    			fd = INVALID_SOCKET;
    			maxConnTime = 5000;
    		}
    		virtual ~Client(){};
    
    		int connect(){
    			this->event = EV_CONNECT;
    			dwConnTime = GetTickCount();
    			struct sockaddr_in addr;
    			addr.sin_family = AF_INET;
    			addr.sin_addr.s_addr = ip;
    			addr.sin_port = htons(port);
    
    			DWORD dwError = 0, dwBytes = 0;
    			tcp_keepalive sKA_Settings = { 0 }, sReturned = { 0 };
    			sKA_Settings.onoff = 1;
    			sKA_Settings.keepalivetime = 30000;    // Keep Alive in 30 sec.
    			sKA_Settings.keepaliveinterval = 3000; // Resend if No-Reply
    			if (WSAIoctl(fd, SIO_KEEPALIVE_VALS, &sKA_Settings,
    				sizeof(sKA_Settings), &sReturned, sizeof(sReturned), &dwBytes,
    				NULL, NULL) != 0)
    			{
    				dwError = WSAGetLastError();
    			}
    			unsigned long ul = 1;
    			ioctlsocket(fd, FIONBIO, &ul); //设置为非堵塞模式
    			int ret = -1;
    			if (::connect(fd, (struct sockaddr *)&addr, sizeof(struct sockaddr)) == -1 && WSAGetLastError() == WSAEWOULDBLOCK)
    			{
    				ret = 0;
    			}
    			return ret;
    		}
    
    		void send(const char * buff, int len){
    			::send(fd, buff, len, 0);
    		}
    
    		void recv(char * buff, int len){
    			this->event = EV_RECV;
    
    			::memset(&overlap, 0, sizeof(overlap));
    			WSABUF buf;
    			buf.buf = buff;
    			buf.len = len;
    			DWORD ready = 0;
    			DWORD flags = 0;
    			if (0 != ::WSARecv(fd, &buf, 1, &ready, &flags, &overlap, NULL)
    				&& WSA_IO_PENDING != WSAGetLastError())
    			{
    				this->event = EV_DISCONNECT;
    				::PostQueuedCompletionStatus(iocp, 0, (ULONG_PTR)this, NULL);
    			}
    		}
    		void close(){
    			::shutdown(fd, SD_BOTH);
    		}
    
    		virtual void OnConnect(){};//连接成功
    
    		virtual void OnConnectFailed(){};//连接失败
    
    		virtual void OnDisconnect(){};	//连接断开
    
    		virtual void OnRecv(int len){};
    
    		virtual void OnSend(int len){};
    	public:
    		int ip;
    		int port;
    		void * srv;
    		HANDLE iocp;
    		EVENT event;
    		SOCKET fd;
    		DWORD maxConnTime;
    		DWORD dwConnTime;
    		WSAOVERLAPPED overlap;
    	};
    
    	template<typename T>
    	class TCPClt
    	{
    	public:
    		void start();
    		void stop();
    		bool addclt(T* clt, int ip, int port);
    	public:
    		int scheds;
    		int threads;
    
    		iocp::Scheduler<T> scheder;
    		iocp::Processor<T> procor;
    	};
    
    	template<typename T>
    	void TCPClt<T>::start()
    	{
    		WSADATA wsadata;
    		int wsaversion = WSAStartup(MAKEWORD(2, 2), &wsadata);
    
    		if (threads <= 0)
    		{
    			threads = 1;
    		}
    
    		scheder.scheds = scheds;
    		scheder.start();
    
    		procor.threads = threads;
    		procor.scheder = &scheder;
    		procor.start();
    	}
    
    	template<typename T>
    	void TCPClt<T>::stop()
    	{
    	}
    
    	template<typename T>
    	bool TCPClt<T>::addclt(T* clt, int ip, int port){
    		clt->ip = ip;
    		clt->port = port;
    		clt->iocp = scheder.iocp;
    		clt->fd = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
    		if (clt->fd == INVALID_SOCKET)
    		{
    			return false;
    		}
    		if (clt->connect() != 0)
    		{
    			closesocket(clt->fd);
    			return false;
    		}
    		scheder.push(clt);
    		return true;
    	}
    }
    #endif // iocptcpclient_h__


  • 相关阅读:
    Windows server 2016 解决“无法完成域加入,原因是试图加入的域的SID与本计算机的SID相同。”
    Windows Server 2016 辅助域控制器搭建
    Windows Server 2016 主域控制器搭建
    Net Framework 4.7.2 覆盖 Net Framework 4.5 解决办法
    SQL SERVER 2012更改默认的端口号为1772
    Windows下彻底卸载删除SQL Serever2012
    在Windows Server2016中安装SQL Server2016
    SQL Server 创建索引
    C#控制台或应用程序中两个多个Main()方法的设置
    Icon cache rebuilding with Delphi(Delphi 清除Windows 图标缓存源代码)
  • 原文地址:https://www.cnblogs.com/mengfanrong/p/3789843.html
Copyright © 2011-2022 走看看