zoukankan      html  css  js  c++  java
  • 多路复用构建高性能服务器

    通过多路复用构建高性能服务器是一种常见的模型,单个I/O多路复用线程+一组工作线程,I/O线程负责协调分配任务,而实际工作交给工作线程处理。这种模型的好处在于高效并发和充分利用多线程的处理能力。

    以memcached的构架图为例

    memcached的主线程用epoll监听到EPOLLIN事件,并且触发该事件的fd是服务器listen的fd,就accept该连接请求,返回的fd主线程并不处理,而是通过CQ队列发送给工作线程去处理,工作线程又维护了一个epoll多路复用队列,子线程的epoll轮询和响应请求。 

      这种架构包含两大块:

    1.多路复用

     #include<iostream>

     #include<stdlib.h>

    #include<sys/epoll.h>
    #include<sys/socket.h>
    #include<netinet/in.h>
    #include<sys/types.h>
    #include<fcntl.h>

    using namespace std;
    const int PORT = 8888;
    const int MAX_CLIENT_NUM = 10000;
    const int MAX_LEN = 2000;

    bool setfdnoblock(int fd)
    {
        int flg = fcntl(fd, F_GETFL);
        if(flg < 0)
        {
            cout << "get fd flag failed" << endl;
            return false;
        }
        if(fcntl(fd, F_SETFL, O_NONBLOCK | flg) < 0)
        {
            return false;
        }
        return true;
    }

    int CreateTcpServer(int port, int listennum)
    {
        int fd;
        fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

        sockaddr_in TcpServer;
        bzero(&TcpServer, sizeof(TcpServer));
        TcpServer.sin_family = AF_INET;
        TcpServer.sin_port = htons(8888);
        TcpServer.sin_addr.s_addr = htonl(INADDR_ANY);

        int iRet = bind(fd, (struct sockaddr*)&TcpServer, sizeof(TcpServer));
        if(-1 == iRet)
        {
        cout << "server bind error!" << endl;
        return -1;
        }
        if(listen(fd, listennum) == -1)
        {
            cout << "server listen error" << endl;
            return -1;
        }
        return fd;
    }

    int main()
    {
        int Serverfd = CreateTcpServer(PORT, MAX_CLIENT_NUM);
        if(Serverfd == -1)
        {
            cout << "server create failed" << endl;
        }
        else
        {
           cout << "serverfd is :" << Serverfd << endl;
        }

        int Epollfd = epoll_create(MAX_CLIENT_NUM);
        if(Epollfd == -1)
        {
            cout << "epoll_create failed" << endl;
        }
        epoll_event ev, events[MAX_CLIENT_NUM];
        int nfds = 0;
        int client = 0;
        char buff[MAX_LEN];
        sockaddr_in CliAddr;
        unsigned int iCliSize = sizeof(CliAddr);
        ev.events = EPOLLIN|EPOLLOUT;
        ev.data.fd = Serverfd;
        if(!setfdnoblock(Serverfd))
        {
            cout << "set serverfd no_block failed" << endl;
        }
        if(epoll_ctl(Epollfd, EPOLL_CTL_ADD, Serverfd, &ev))
        {
            cout << "epoll add serverfd error" << endl;
        }
        while(1)
        {
            nfds = epoll_wait(Epollfd, events, MAX_CLIENT_NUM, 100000);
            if(nfds == -1)
            {
                cout << "error occur, exit" << endl;
                return -1;
            }
            else if( nfds == 0)
            {
                cout << "epoll_wait return zero" << endl;
            }
            else
            {
                for(int i = 0; i < nfds; i++)
                {
                    cout << "events[i].data.fd is :" << events[i].data.fd << endl;
                    if(events[i].data.fd == Serverfd)
                    {
                        cout << " Serverfd received event" << endl;
                        client = accept(Serverfd, (struct sockaddr*)&CliAddr, &iCliSize);
                        if(client == -1)
                        {
                            cout << "accept error" << endl;
                            return -1;
                        }
                        ev.data.fd = client;
                        if(!setfdnoblock(client))
                        {
                            cout << "set client fd no_block error" << endl;
                        }
                        if(epoll_ctl(Epollfd, EPOLL_CTL_ADD, client, &ev))
                        {
                            cout << "epoll add client error" << endl;
                        }
                        else
                        {
                            cout << "success add client" << endl;
                        }
                    }
                    else if(events[i].events&EPOLLIN)
                    {
                        cout << "recv client msg" << endl;
                        if(events[i].data.fd < 0)
                        {
                            cout << " event[i].data.fd is smaller than zero" << endl;
                            continue;
                        }
                        if(read(events[i].data.fd, buff, MAX_LEN) == -1)
                        {
                            perror("clifd read");
                        }
                        else
                        {
                            cout << "read client msg suc" << endl;
                            printf("%s",buff);
                        }
                        char resp[] = "recv a client msg, this is resp msg";
                        write(events[i].data.fd, resp, strlen(resp)+1);
                        //read and mod
                    }
                    else if(events[i].events&EPOLLOUT)
                    {
                        //send and mod
                    }
                }
            }
        }
    }

    例子中epoll listen和accept新连接,并响应新连接的请求。

    2.工作线程or线程池

    进程的线程数量并不是越多越好,也不是越少越好,需要根据机器逐步调优。

    工作线程的工作原理, 

    1.I/O线程把收到的请求放入队列,并通知工作线程处理,队列和通知机制可以是传统的加锁消息队列、信号量,也可以是memcached+libevent的实现:CQ队列装消息,线程管道通知工作线程。

    2.I/O线程没有新的任务分配,工作线程阻塞或等待一段时间。 

    线程池用到的比较少,不做评价。 

  • 相关阅读:
    传智博客.NET培训第13季 Ajax教程(共十三季) 学习资源
    一些sql语句的常用总结(重要)
    处理oracle的死锁
    Adroid 总结--android ListView美化,个性化更改的属性
    如何远程备份sql server数据库
    VSS (Visual Source Safe 2005) 用法详解
    php插入代码数据库
    PHP之PHP文件引用详解
    需要引入库:vue-resource
    axios调用详解
  • 原文地址:https://www.cnblogs.com/learn-my-life/p/5339127.html
Copyright © 2011-2022 走看看