zoukankan      html  css  js  c++  java
  • Windows socket I/O模型 之 select(2)

    在《Windows socket I/O模型 之 select(1)》中,我们只是在console中简单地模拟了select的处理方法。


    还有非常多特性不能改动。比方仅仅能写,不能读。

    没使用线程。也没有同步不同的读写线程。


    先谈谈个人眼下对select的理解。

    select就是监控一组套接字的变化情况。

    比方一个fd_set的变量(暂且定义为fdRead)里面有5个套接字。当你传给select后,如果仅仅有2个被触发。

    那么这个fdRead也就被改变了(select返回后,里面只剩下被触发的那2个套接字)。这就是为什么每次调用select之前,都需要从全局fd_set拷贝一份副本传给select的原因。


    然后我们根据套接字的变化情况做对应的处理就OK了。不过大并发量的情况还没有测试。


    今晚我改进了。以下大家见代码。

    。。。

    // Select_Server.cpp : Defines the entry point for the console application.
    // 服务端
    
    #include "stdafx.h"
    
    
    // Server configuration. Only INT_SERVER_PORT is referenced by the server
    // code shown here: the listener binds to INADDR_ANY rather than
    // STR_SERVER_IP, and the receive buffers use a literal 256 rather than
    // INT_DATABUFFER_SIZE.
    #define STR_SERVER_IP           "127.0.0.1"
    #define INT_SERVER_PORT         8001
    #define INT_DATABUFFER_SIZE     256
    
    
    // NOTE(review): g_soClient and g_lstSoClient are not referenced by the
    // server code shown here - presumably leftovers; confirm before removing.
    SOCKET g_soClient;
    typedef std::list<SOCKET>   LstSocket;
    LstSocket                   g_lstSoClient;
    
    // Listening socket, created in _tmain.
    SOCKET g_soServer;
    // Master set of sockets handed to select(): ThreadRead puts the listening
    // socket in it and adds every accepted client; ThreadWrite sends to the
    // client sockets it contains.
    fd_set g_fdSocketSet;
    
    // Critical section used by the worker threads to serialize console output
    // and, in ThreadRead, updates to g_fdSocketSet.
    CCriSec     g_criSec;
    
    // Server worker thread: accepts incoming connections and prints whatever
    // the connected clients send, multiplexing all sockets with select().
    //
    // g_fdSocketSet is the master set (listening socket plus every accepted
    // client). select() overwrites the set it is given with the sockets that
    // are ready, so every iteration works on a private copy of the master set.
    //
    // lpvParam is unused. Returns 0 once select() fails and the loop ends.
    DWORD WINAPI ThreadRead(LPVOID lpvParam)
    {
        FD_ZERO(&g_fdSocketSet);
        FD_SET(g_soServer, &g_fdSocketSet);

        while( TRUE ) {
            // Take the snapshot under the lock: the master set is shared with
            // the write thread.
            fd_set fdRead;
            {
                CCriSecLock lock(g_criSec);
                fdRead = g_fdSocketSet;
            }

            // Only readability is monitored. A connected socket is almost
            // always writable, so also passing a write set makes select()
            // return immediately and turns this loop into a busy spin.
            int iResult = select(0, &fdRead, NULL, NULL, NULL);
            if( iResult == SOCKET_ERROR ) {
                break;
            }

            // The listening socket is readable when a connection is pending.
            if( FD_ISSET(g_soServer, &fdRead) ) {
                sockaddr_in addrAccept;
                int iAcceptLen = sizeof(addrAccept);    // in/out: reset before every accept()
                SOCKET soClient = accept(g_soServer, (sockaddr*)&addrAccept, &iAcceptLen);
                if( soClient != INVALID_SOCKET ) {
                    CCriSecLock lock(g_criSec);
                    if( g_fdSocketSet.fd_count >= (u_int)FD_SETSIZE ) {
                        // FD_SET silently ignores a socket when the set is
                        // full; close it instead of leaking it.
                        printf("\n[%s:%d] rejected: too many clients.\n",
                            inet_ntoa(addrAccept.sin_addr), ntohs(addrAccept.sin_port));
                        closesocket(soClient);
                    } else {
                        printf("\n[%s:%d] has connected to server!\n",
                            inet_ntoa(addrAccept.sin_addr), ntohs(addrAccept.sin_port));
                        FD_SET(soClient, &g_fdSocketSet);
                    }
                }
            }

            // Every other socket left in fdRead has data, or a close/reset, pending.
            for( u_int i = 0; i < fdRead.fd_count; i++ ) {
                SOCKET soPeer = fdRead.fd_array[i];
                if( soPeer == g_soServer ) {
                    continue;
                }

                sockaddr_in name;
                memset(&name, 0, sizeof(name));
                int namelen = sizeof(name);
                getpeername(soPeer, (sockaddr *)&name, &namelen);

                // Read at most sizeof(buf) - 1 bytes so the zero-initialized
                // buffer is always NUL-terminated for printf("%s").
                char buf[256] = {0};
                int ret = recv(soPeer, buf, (int)sizeof(buf) - 1, 0);
                // Fetch the error code right away, before any other call can change it.
                int nErr = (ret == SOCKET_ERROR) ? WSAGetLastError() : 0;

                CCriSecLock lock(g_criSec);
                if( ret > 0 ) {
                    printf("\nRecv from [%s:%d] : %s\n",
                        inet_ntoa(name.sin_addr), ntohs(name.sin_port), buf);
                } else if( ret == 0 || nErr == WSAECONNRESET ) {
                    // 0 means the peer closed gracefully; WSAECONNRESET means
                    // it was reset. Either way the socket must leave the set,
                    // otherwise select() reports it readable forever.
                    FD_CLR(soPeer, &g_fdSocketSet);
                    closesocket(soPeer);
                    printf("\n[%s:%d] disconnect from server.\n",
                        inet_ntoa(name.sin_addr), ntohs(name.sin_port));
                } else if( nErr != WSAEWOULDBLOCK ) {
                    // Any other error: report it and drop the socket so the
                    // loop cannot spin on a broken connection.
                    printf("\nfdread failed with %d\n", nErr);
                    FD_CLR(soPeer, &g_fdSocketSet);
                    closesocket(soPeer);
                }
            }
        }

        return 0;
    }
    
    // Server worker thread: reads lines from stdin and sends each one to every
    // connected client. Typing "exit" ends the thread.
    //
    // lpvParam is unused. Returns 0 when "exit" is entered or stdin reaches
    // end-of-file.
    DWORD WINAPI ThreadWrite(LPVOID lpvParam)
    {
        std::string str;
        {
            CCriSecLock lock(g_criSec);
            std::cout << "Please input message to client: ";
        }
        while( getline(std::cin, str) ) {
            if( str.compare("exit") == 0 ) {
                {
                    CCriSecLock lock(g_criSec);
                    printf("close write thread\n");
                }
                break;
            }

            // ThreadRead adds and removes sockets concurrently, so copy the
            // master set under the lock and iterate over the copy.
            fd_set fdTargets;
            {
                CCriSecLock lock(g_criSec);
                fdTargets = g_fdSocketSet;
            }

            for( u_int i = 0; i < fdTargets.fd_count; i++ ) {
                SOCKET soPeer = fdTargets.fd_array[i];
                // Skip the listening socket by value instead of assuming it
                // always sits at index 0.
                if( soPeer == g_soServer ) {
                    continue;
                }

                if( send(soPeer, str.data(), (int)str.size(), 0) == SOCKET_ERROR ) {
                    int nErr = WSAGetLastError();
                    CCriSecLock lock(g_criSec);
                    printf("\nsend failed with %d\n", nErr);
                }
            }
        }

        return 0;
    }
    
    int _tmain(int argc, _TCHAR* argv[])
    {
        WORD dwVersion = MAKEWORD(2, 2);
        WSAData wsaData;
        WSAStartup(WINSOCK_VERSION,&wsaData);
    
        g_soServer = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
        if (INVALID_SOCKET == g_soServer) {
            printf("Failed to create socket!
    ");
            WSACleanup();
            return -1;
        }
    
        sockaddr_in addrServer;
        memset(&addrServer,0,sizeof(sockaddr_in));
        addrServer.sin_family = AF_INET;
        addrServer.sin_port = htons(INT_SERVER_PORT);
        addrServer.sin_addr.S_un.S_addr = htonl(INADDR_ANY);
    
        int iResult;
    
        bool bReuseAddr = true;
        iResult = setsockopt(g_soServer, SOL_SOCKET, SO_REUSEADDR, (char *)&bReuseAddr, sizeof(bReuseAddr));
        if(SOCKET_ERROR == iResult) {
            printf("Failed to set resueaddr socket!
    ");
            WSACleanup();
            return -1;
        }
    
        //设置非堵塞方式连接
        unsigned long cmd = 1;
        iResult = ioctlsocket(g_soServer, FIONBIO, &cmd);
    
        iResult = bind(g_soServer, (sockaddr *)&addrServer, sizeof(addrServer));
        if (SOCKET_ERROR == iResult) {
            printf("Failed to bind address!
    ");
            WSACleanup();
            return -1;
        }
    
        if (0 != listen(g_soServer, 5)) {
            printf("Failed to listen client!
    ");
            WSACleanup();
            return -1;
        }
    
        printf("Start server...
    ");
    
        HANDLE hWorkRead = CreateThread(NULL, 0, ThreadRead, NULL, 0, NULL);
        HANDLE hWorkWrite = CreateThread(NULL, 0, ThreadWrite, NULL, 0, NULL);
    
        ::WaitForSingleObject(hWorkRead, INFINITE);
        ::WaitForSingleObject(hWorkWrite, INFINITE);
    
        WSACleanup();
    
        return 0;
    }
    

    以下是client代码:

    // Select_Client.cpp : Defines the entry point for the console application.
    //
    
    #include "stdafx.h"
    
    
    
    // Client configuration: address and port of the server to connect to.
    // NOTE(review): INT_DATABUFFER_SIZE, STR_EXIT and STR_RECV are not
    // referenced by the client code shown here - confirm before removing.
    #define INT_SERVER_PORT 8001
    #define STR_SERVER_IP "127.0.0.1"
    #define INT_DATABUFFER_SIZE 256
    #define STR_EXIT "exit"
    #define STR_RECV "recv"
    
    // Socket connected to the server (created in main), and the master set
    // that ThreadWorker copies before every select() call.
    SOCKET g_soClient;
    fd_set g_fdSocketSet;
    
    // Critical section ThreadWorker holds while printing to the console.
    CCriSec     g_criSec;
    
    // Client worker thread: waits with select() until the server socket is
    // readable and prints whatever the server sent.
    //
    // lpvParam is unused. Returns 0 when the connection is closed or reset, or
    // when select()/recv() fails.
    DWORD WINAPI ThreadWorker(LPVOID lpvParam)
    {
        FD_ZERO(&g_fdSocketSet);
        FD_SET(g_soClient, &g_fdSocketSet);

        while( TRUE ) {
            // select() overwrites its argument with the ready sockets, so it
            // gets a fresh copy of the master set on every iteration. Only
            // readability is monitored: the socket is almost always writable,
            // which would make select() return immediately every time.
            fd_set fdRead = g_fdSocketSet;

            int iResult = select(0, &fdRead, NULL, NULL, NULL);
            if( iResult == SOCKET_ERROR ) {
                break;
            }
            if( iResult == 0 ) {
                printf("Time limit expired\n");
                continue;
            }

            // Test the set select() filled in, not the master set: the master
            // set always contains the socket, ready or not.
            if( !FD_ISSET(g_soClient, &fdRead) ) {
                continue;
            }

            sockaddr_in name;
            memset(&name, 0, sizeof(name));
            int namelen = sizeof(name);
            getpeername(g_soClient, (sockaddr *)&name, &namelen);

            // Read at most sizeof(buf) - 1 bytes so the zero-initialized
            // buffer is always NUL-terminated for printf("%s").
            char buf[256] = {0};
            int ret = recv(g_soClient, buf, (int)sizeof(buf) - 1, 0);
            // Fetch the error code right away, before any other call can change it.
            int nErr = (ret == SOCKET_ERROR) ? WSAGetLastError() : 0;

            CCriSecLock lock(g_criSec);
            if( ret > 0 ) {
                printf("\nRecv from [%s:%d] : %s\n",
                    inet_ntoa(name.sin_addr), ntohs(name.sin_port), buf);
            } else if( ret == 0 || nErr == WSAECONNRESET ) {
                // 0 means the server closed gracefully; WSAECONNRESET means
                // the connection was reset. Nothing more can arrive.
                FD_CLR(g_soClient, &g_fdSocketSet);
                printf("\n[%s:%d] is closed.\n",
                    inet_ntoa(name.sin_addr), ntohs(name.sin_port));
                break;
            } else {
                // Any other error ends the thread instead of looping on it.
                printf("fdread failed with %d\n", nErr);
                break;
            }
        }

        return 0;
    }
    
    void main(void)
    {
        WSAData wsaData;
        WSAStartup(WINSOCK_VERSION,&wsaData);
      
        g_soClient = socket(AF_INET,SOCK_STREAM, IPPROTO_TCP);
        if (INVALID_SOCKET == g_soClient) {
            printf("Failed to create client!
    ");
            WSACleanup();
        }
      
        sockaddr_in addrServer;
        addrServer.sin_addr.S_un.S_addr = inet_addr(STR_SERVER_IP);
        addrServer.sin_family = AF_INET;
        addrServer.sin_port = htons(INT_SERVER_PORT);
    
        int iResult;
    
        //设置非堵塞方式连接
        //unsigned long ul = 1;
        //iResult = ioctlsocket(g_soClient, FIONBIO, (unsigned long*)&ul);
        
        iResult = connect(g_soClient, (sockaddr *)&addrServer, sizeof(sockaddr_in));
        if (SOCKET_ERROR == iResult) {
            printf("Failed to connect server!(Error: %d)
    ", ::WSAGetLastError());
            WSACleanup();
            return;
        }
        
        HANDLE hWorker = CreateThread(NULL, 0, ThreadWorker, NULL, 0, NULL);
    
        std::string str;
        std::cout << "Please input message to server: ";
        while( getline(std::cin, str) ) {
            send(g_soClient, str.data(), str.size(), 0);
            std::cout << "Please input message to client: ";
        }
    
        closesocket(g_soClient);
        WSACleanup();
    }
    

    头文件

    // stdafx.h : include file for standard system include files,
    // or project specific include files that are used frequently, but
    // are changed infrequently
    //
    
    #pragma once
    
    #ifndef _WIN32_WINNT		// Allow use of features specific to Windows XP or later.                   
    #define _WIN32_WINNT 0x0501	// Change this to the appropriate value to target other versions of Windows.
    #endif						
    
    #include <stdio.h>
    #include <tchar.h>
    #include <string>
    #include <iostream>
    #include <WINSOCK2.H>
    
    #pragma comment(lib,"ws2_32.lib")
    
    
    
    // TODO: reference additional headers your program requires here
    #include "CriticalSection.h"
    
    

    // CriticalSection.h

    /*
     * Copyright: JessMA Open Source (ldcsaa@gmail.com)
     *
     * Version	: 2.3.2
     * Author	: Bruce Liang
     * Website	: http://www.jessma.org
     * Project	: https://github.com/ldcsaa
     * Blog		: http://www.cnblogs.com/ldcsaa
     * Wiki		: http://www.oschina.net/p/hp-socket
     * QQ Group	: 75375912
     *
     * Licensed under the Apache License, Version 2.0 (the "License");
     * you may not use this file except in compliance with the License.
     * You may obtain a copy of the License at
     *
     *      http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
     
    /****************************************************************************
    *																			*
    * CriticalSection.h 														*
    *																			*
    * Create by :																*
    * Kingfisher	2003-10-15													*
    * 																			*
    * Description: 																*
    * 封装Win32临界量对象和相互排斥量内核对象											*
    ****************************************************************************/
    
    #pragma once
    
    #include <windows.h>
    
    // Thin wrapper around a Win32 CRITICAL_SECTION: the section is initialized
    // in the constructor and deleted in the destructor; Lock()/Unlock() enter
    // and leave it.
    class CCriSec
    {
    private:
    	CRITICAL_SECTION    m_crisec;

    	// Copying is disabled: both members are declared private and no
    	// definition is provided here.
    	CCriSec(const CCriSec& cs);
    	CCriSec operator = (const CCriSec& cs);

    public:
    	CCriSec()
    	{
    		::InitializeCriticalSection(&m_crisec);
    	}

    	~CCriSec()
    	{
    		::DeleteCriticalSection(&m_crisec);
    	}

    	// Enters the critical section.
    	void Lock()
    	{
    		::EnterCriticalSection(&m_crisec);
    	}

    	// Leaves the critical section.
    	void Unlock()
    	{
    		::LeaveCriticalSection(&m_crisec);
    	}
    };
    
    // Wrapper around a heap-allocated Win32 CRITICAL_SECTION that can be
    // attached to and detached from the wrapper. Whatever pointer the wrapper
    // holds when it is destroyed (or when Attach() replaces it) is deleted
    // with DeleteCriticalSection followed by delete.
    class CCriSec2
    {
    public:
    	// bInitialize == TRUE allocates and initializes a new critical
    	// section; otherwise the wrapper starts out empty (NULL).
    	CCriSec2(BOOL bInitialize = TRUE)
    	: m_pcrisec(NULL)
    	{
    		if(!bInitialize)
    			return;

    		m_pcrisec = new CRITICAL_SECTION;
    		::InitializeCriticalSection(m_pcrisec);
    	}

    	~CCriSec2()
    	{
    		Reset();
    	}

    	// Releases the currently held section (if any) and takes over pcrisec.
    	void Attach(CRITICAL_SECTION* pcrisec)
    	{
    		Reset();
    		m_pcrisec = pcrisec;
    	}

    	// Hands the held section to the caller and leaves the wrapper empty.
    	CRITICAL_SECTION* Detach()
    	{
    		CRITICAL_SECTION* pReleased = m_pcrisec;
    		m_pcrisec = NULL;
    		return pReleased;
    	}

    	// Lock()/Unlock() pass m_pcrisec straight to the Win32 call; there is
    	// no NULL check.
    	void Lock()
    	{
    		::EnterCriticalSection(m_pcrisec);
    	}

    	void Unlock()
    	{
    		::LeaveCriticalSection(m_pcrisec);
    	}

    private:
    	// Copying is disabled: declared private, no definition provided here.
    	CCriSec2(const CCriSec2& cs);
    	CCriSec2 operator = (const CCriSec2& cs);

    	// Deletes and frees the held section, if any, and resets the pointer.
    	void Reset()
    	{
    		if(!m_pcrisec)
    			return;

    		::DeleteCriticalSection(m_pcrisec);
    		delete m_pcrisec;
    		m_pcrisec = NULL;
    	}

    private:
    	CRITICAL_SECTION*    m_pcrisec;
    };
    
    
    // Scope guard for any lock type exposing Lock() and Unlock(): the
    // constructor locks the object and the destructor unlocks it.
    //
    // The guard is non-copyable. A copy would share the same lock object and
    // call Unlock() a second time when it is destroyed.
    template<class CLockObj> class CLocalLock
    {
    public:
    	CLocalLock(CLockObj& obj) : m_lock(obj) {m_lock.Lock();}
    	~CLocalLock() {m_lock.Unlock();}
    private:
    	// Declared but intentionally not defined (same idiom as the lock
    	// classes in this header).
    	CLocalLock(const CLocalLock& other);
    	CLocalLock& operator = (const CLocalLock& other);
    private:
    	CLockObj& m_lock;
    };
    
    // Scope-guard aliases for the two critical-section wrappers declared above.
    typedef CLocalLock<CCriSec>		CCriSecLock;
    typedef CLocalLock<CCriSec2>	CCriSecLock2;
    


  • 相关阅读:
    ueditor后台配置项返回格式出错,上传功能将不能正常使用
    js控制多层单选,多选按钮,做隐藏操作
    js控制全屏及退出全屏
    springboot2.0jar包启动异常
    第九篇: 高可用的服务注册中心
    第八篇: 服务链路追踪(Spring Cloud Sleuth)
    第七篇: 消息总线(Spring Cloud Bus)
    第六篇: 分布式配置中心(Spring Cloud Config)
    第五篇: 路由网关(zuul)
    NodeJS
  • 原文地址:https://www.cnblogs.com/lytwajue/p/6834850.html
Copyright © 2011-2022 走看看