zoukankan      html  css  js  c++  java
  • 面向连接的Socket Server的简单实现

    一、基本原理

    有时候我们需要实现一个公共的模块,需要对多个其他的模块提供服务,最常用的方式就是实现一个Socket Server,接受客户的请求,并返回给客户结果。

    这经常涉及到如果管理多个连接及如何多线程的提供服务的问题,常用的方式就是连接池和线程池,基本流程如下:

    SocketServer

    首先服务器端有一个监听线程,不断监听来自客户端的连接。

    当一个客户端连接到监听线程后,便建立了一个新的连接。

    监听线程将新建立的连接放入连接池进行管理,然后继续监听新来的连接。

    线程池中有多个服务线程,每个线程都监听一个任务队列,一个建立的连接对应一个服务任务,当服务线程发现有新的任务的时候,便用此连接向客户端提供服务。

    一个Socket Server所能够提供的连接数可配置,如果超过配置的个数则拒绝新的连接。

    当服务线程完成服务的时候,客户端关闭连接,服务线程关闭连接,空闲并等待处理新的任务。

    连接池的监控线程清除其中关闭的连接对象,从而可以建立新的连接。

    二、对Socket的封装

    Socket的调用主要包含以下的步骤:

    clip_image001

    调用比较复杂,我们首先区分两类Socket,一类是Listening Socket,一类是Connected Socket.

    Listening Socket由MySocketServer负责,一旦accept,则生成一个Connected Socket,又MySocket负责。

    MySocket主要实现的方法如下:

    int MySocket::write(const char * buf, int length)
    {
            int ret = 0;
            int left = length;
            int index = 0;
            while(left > 0)
            {
                    ret = send(m_socket, buf + index, left, 0);
                    if(ret == 0)
                            break;
                    else if(ret == -1)
                    {
                            break;
                    }
                    left -= ret;
                    index += ret;
            }
            if(left > 0)
                    return -1;
            return 0;
    }

    int MySocket::read(char * buf, int length)
    {
            int ret = 0;
            int left = length;
            int index = 0;
            while(left > 0)
            {
                    ret = recv(m_socket, buf + index, left, 0);
                    if(ret == 0)
                            break;
                    else if(ret == -1)
                            return -1;
                    left -= ret;
                    index += ret;
            }

            return index;
    }

    int MySocket::status()
    {
            int status;
            int ret;
            fd_set checkset;
            struct timeval timeout;

            FD_ZERO(&checkset);
            FD_SET(m_socket, &checkset);

            timeout.tv_sec = 10;
            timeout.tv_usec = 0;

            status = select((int)m_socket + 1, &checkset, 0, 0, &timeout);
            if(status < 0)
                    ret = -1;
            else if(status == 0)
                    ret = 0;
            else
                    ret = 0;
            return ret;
    }

    int MySocket::close()
    {
            struct linger lin;
            lin.l_onoff = 1;
            lin.l_linger = 0;
            setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
            ::close(m_socket);
            return 0;
    }

    MySocketServer的主要方法实现如下:

    int MySocketServer::init(int port)
    {
            if((m_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1)
            {
                    return -1;
            }

            struct sockaddr_in serverAddr;
            memset(&serverAddr, 0, sizeof(struct sockaddr_in));
            serverAddr.sin_addr.s_addr = htonl(INADDR_ANY);
            serverAddr.sin_family = AF_INET;
            serverAddr.sin_port = htons(port);

            if(bind(m_socket, (struct sockaddr *)&serverAddr, sizeof(serverAddr)) == -1)
            {
                    ::close(m_socket);
                    return -1;
            }

            if(listen(m_socket, SOMAXCONN) == -1)
            {
                    ::close(m_socket);
                    return -1;
            }

            struct linger lin;
            lin.l_onoff = 1;
            lin.l_linger = 0;

            setsockopt(m_socket, SOL_SOCKET, SO_LINGER, (const char *)&lin, sizeof(lin));
            m_port = port;
            m_inited = true;
            return 0;
    }

    MySocket * MySocketServer::accept()
    {
            int sock;
            struct sockaddr_in clientAddr;
            socklen_t clientAddrSize = sizeof(clientAddr);
            if((sock = ::accept(m_socket, (struct sockaddr *)&clientAddr, &clientAddrSize)) == -1)
            {
                    return NULL;
            }
            MySocket* socket = new MySocket(sock);
            return socket;
    }

    MySocket * MySocketServer::accept(int timeout)
    {
            struct timeval timeout;
            timeout.tv_sec = timeout;
            timeout.tv_usec = 0;

            fd_set checkset;
            FD_ZERO(&checkset);
            FD_SET(m_socket, &checkset);

            int status = (int)select((int)(m_socket + 1), &checkset, NULL, NULL, &timeout);
            if(status < 0)
                    return NULL;
            else if(status == 0)
                    return NULL;

            if(FD_ISSET(m_socket, &checkset))
            {
                    return accept();
            }
    }

    三、线程池的实现

    一个线程池一般有一个任务队列,启动的各个线程从任务队列中竞争任务,得到的线程则进行处理:list<MyTask *>  m_taskQueue;

    任务队列由锁保护,使得线程安全:pthread_mutex_t m_queueMutex

    任务队列需要条件变量来支持生产者消费者模式:pthread_cond_t m_cond

    如果任务列表为空,则线程等待,等待中的线程个数为:m_numWaitThreads

    需要一个列表来维护线程池中的线程:vector<MyThread *> m_threads

    每个线程需要一个线程运行函数:

    void * __thread_new_proc(void *p)
    {
        ((MyThread *)p)->run();
        return 0;
    }

    每个线程由MyThread类负责,主要函数如下:

    int MyThread::start()
    {

        pthread_attr_t  attr;
        pthread_attr_init(&attr);
        pthread_attr_setschedpolicy(&attr, SCHED_FIFO);

        int ret = pthread_create(&m_thread, &attr, thread_func, args);
        pthread_attr_destroy(&attr);

        if(ret != 0)
            return –1;

    }

    int MyThread::stop()
    {

        int ret = pthread_kill(m_thread, SIGINT);

        if(ret != 0)
            return –1;
    }

    int MyThread::join()

    {

        int ret = pthread_join(m_thread, NULL);

        if(ret != 0)

            return –1;

    }

    void MyThread::run()

    {

        while (false == m_bStop)

        {

            MyTask *pTask = m_threadPool->getNextTask();

            if (NULL != pTask)

            {

                pTask->process();

            }

        }

    }

    线程池由MyThreadPool负责,主要函数如下:

    int MyThreadPool::init()
    {

        pthread_condattr_t cond_attr;
        pthread_condattr_init(&cond_attr);
        pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED);
        int ret =  pthread_cond_init(&m_cond, &cond_attr);
        pthread_condattr_destroy(&cond_attr);

        if (ret_val != 0)
            return –1;

        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
        ret = pthread_mutex_init(&m_queueMutex, &attr);
        pthread_mutexattr_destroy(&attr);

        if (ret_val != 0)
            return –1;

        for (int i = 0; i< m_poolSize; ++i)
        {
            MyThread *thread = new MyThread(i+1, this);       
            m_threads.push_back(thread);
        }

        return 0;
    }

    int MyThreadPool::start()
    {
        int ret;
        for (int i = 0; i< m_poolSize; ++i)
        {       
           ret = m_threads[i]->start();
           if (ret != 0)
               break;      
        }

        ret = pthread_cond_broadcast(&m_cond);

        if(ret != 0)
            return –1;
        return 0;
    }

    void MyThreadPool::addTask(MyTask *ptask)
    {
        if (NULL == ptask)
            return;

        pthread_mutex_lock(&m_queueMutex);

        m_taskQueue.push_back(ptask);       

        if (m_waitingThreadCount > 0)
            pthread_cond_signal(&m_cond);

        pthread_mutex_unlock(&m_queueMutex);
    }

    MyTask * MyThreadPool::getNextTask()
    {
        MyTask *pTask = NULL; 

        pthread_mutex_lock(&m_queueMutex);

        while (m_taskQueue.begin() == m_taskQueue.end())
        { 
            ++m_waitingThreadCount;

            pthread_cond_wait(&n_cond, &m_queueMutex);

            --m_waitingThreadCount;      
        }   

        pTask = m_taskQueue.front();

        m_taskQueue.pop_front();

        pthread_mutex_unlock(&m_queueMutex);

        return pTask;  
    }

    其中每一个任务的执行由MyTask负责,其主要方法如下:

    void MyTask::process()

    {

        //用read从客户端读取指令

        //对指令进行处理

        //用write向客户端写入结果

    }

    四、连接池的实现

    每个连接池保存一个链表保存已经建立的连接:list<MyConnection *> * m_connections

    当然这个链表也需要锁来进行多线程保护:pthread_mutex_t m_connectionMutex;

    此处一个MyConnection也是一个MyTask,由一个线程来负责。

    线程池也作为连接池的成员变量:MyThreadPool * m_threadPool   

    连接池由类MyConnectionPool负责,其主要函数如下:

    void MyConnectionPool::addConnection(MyConnection * pConn)
    {

        pthread_mutex_lock(&m_connectionMutex);

        m_connections->push_back(pConn);

        pthread_mutex_unlock(&m_connectionMutex);

        m_threadPool->addTask(pConn);
    }

    MyConnectionPool也要启动一个背后的线程,来管理这些连接,移除结束的连接和错误的连接。

    void MyConnectionPool::managePool()
    {

        pthread_mutex_lock(&m_connectionMutex);

        for (list<MyConnection *>::iterator itr = m_connections->begin(); itr!=m_connections->end(); )
        {
            MyConnection *conn = *itr;       
            if (conn->isFinish())
            {
                delete conn;
                conn = NULL;
                list<MyConnection *>::iterator pos = itr++;
                m_connections->erase(pos);                        
            }
            else if (conn->isError())
            {

                //处理错误的连接
                ++itr;
            }
            else
            {
                ++itr;
            }
        }

        pthread_mutex_unlock(&m_connectionMutex);

    }

    五、监听线程的实现

    监听线程需要有一个MySocketServer来监听客户端的连接,每当形成一个新的连接,查看是否超过设置的最大连接数,如果超过则关闭连接,如果未超过设置的最大连接数,则形成一个新的MyConnection,将其加入连接池和线程池。

    MySocketServer *pServer = new MySocketServer(port);

    MyConnectionPool *pPool = new MyConnectionPool();

    while (!stopFlag)

    {

        MySocket * sock = pServer->acceptConnection(5);

        if(sock != null)

        {

            if(m_connections.size > maxConnectionSize)

            {

                sock.close();

            }

            MyTask *pTask = new MyConnection();

            pPool->addConnection(pTask);      

        }

    }

  • 相关阅读:
    angular 动态取到的html片段 在页面的展示
    angular 1.2.29版本下 动态添加多个表单、 校验全部、 提交 、ng-form方案
    解决JS浮点数(小数)计算加减乘除的BUG
    angular分页插件tm.pagination 解决触发二次请求的问题
    Powershell中显示隐藏文件
    canvas 弹幕效果
    安装指南【win10下安装fedora】
    centos7安装chrome的历程(fedora同)
    安装指南:Win10下安装CentOs7
    pushState与replaceState区别
  • 原文地址:https://www.cnblogs.com/forfuture1978/p/1824443.html
Copyright © 2011-2022 走看看