zoukankan      html  css  js  c++  java
  • linux tcp server

    这里分析两种模型

    A: 来源于网络,http://bbs.chinaunix.net/thread-4067753-1-1.html,号称50万QPS

    B: 本人自己写的,我觉得性能上比上述的模型要好

    ——————————————————————————————————————————

    A:

    #define _GNU_SOURCE

    #include <stdlib.h>

    #include <stdio.h>

    #include <string.h>

    #include <sched.h>

    #include <pthread.h>

    #include <sys/epoll.h>

    #include <sys/socket.h>

    #include <netinet/in.h>

    #include <arpa/inet.h>

    #include <signal.h>

    #include <unistd.h>

    #include <fcntl.h>

    #include <errno.h>

    #include <time.h>

    typedef struct connection_st {

        int sock;

        int index; /* which epoll fd this conn belongs to*/

        int using;

    #define BUF_SIZE 4096

        int roff;

        char rbuf[BUF_SIZE];

        int woff;

        char wbuf[BUF_SIZE];

    }*connection_t;

    #define CONN_MAXFD 65536

    struct connection_st g_conn_table[CONN_MAXFD] = {0};

    static sig_atomic_t shut_server = 0;

    void shut_server_handler(int signo) {

        shut_server = 1;

    }

    #define EPOLL_NUM 8

    int epfd[EPOLL_NUM];

    int lisSock;

    #define WORKER_PER_GROUP 1

    #define NUM_WORKER (EPOLL_NUM * WORKER_PER_GROUP)

    pthread_t worker[NUM_WORKER]; /* echo group has 6 worker threads */

    int sendData(connection_t conn, char *data, int len) {

        if (conn->woff){

            if (conn->woff + len > BUF_SIZE) {

                return -1;

            }

            memcpy(conn->wbuf + conn->woff, data, len);

            conn->woff += len;

            return 0;

        } else {

            int ret = write(conn->sock, data, len);

            if (ret > 0){

                if (ret == len) {

                    return 0;

                }

                int left = len - ret;

                if (left > BUF_SIZE) return -1;

               

                memcpy(conn->wbuf, data + ret, left);

                conn->woff = left;

            } else {

                if (errno != EINTR && errno != EAGAIN) {

                    return -1;

                }

                if (len > BUF_SIZE) {

                    return -1;

                }

                memcpy(conn->wbuf, data, len);

                conn->woff = len;

            }

        }

        return 0;

    }

    int handleReadEvent(connection_t conn) {

        if (conn->roff == BUF_SIZE) {

            return -1;

        }

       

        int ret = read(conn->sock, conn->rbuf + conn->roff, BUF_SIZE - conn->roff);

        if (ret > 0) {

            conn->roff += ret;

           

            int beg, end, len;

            beg = end = 0;

            while (beg < conn->roff) {

                char *endPos = (char *)memchr(conn->rbuf + beg, ' ', conn->roff - beg);

                if (!endPos) break;

                end = endPos - conn->rbuf;

                len = end - beg + 1;

               

                /*echo*/

                if (sendData(conn, conn->rbuf + beg, len) == -1) return -1;

                beg = end + 1;

                printf("request_finish_time=%ld ", time(NULL));

            }

            int left = conn->roff - beg;

            if (beg != 0 && left > 0) {

                memmove(conn->rbuf, conn->rbuf + beg, left);

            }

            conn->roff = left;

        } else if (ret == 0) {

            return -1;

        } else {

            if (errno != EINTR && errno != EAGAIN) {

                return -1;

            }

        }

        return 0;

    }

    int handleWriteEvent(connection_t conn) {

        if (conn->woff == 0) return 0;

        int ret = write(conn->sock, conn->wbuf, conn->woff);

        if (ret == -1) {

            if (errno != EINTR && errno != EAGAIN) {

                return -1;

            }

        } else {

            int left = conn->woff - ret;

            if (left > 0) {

                memmove(conn->wbuf, conn->wbuf + ret, left);

            }

            conn->woff = left;

        }

        return 0;

    }

    void closeConnection(connection_t conn) {

        struct epoll_event evReg;

        conn->using = 0;

        conn->woff = conn->roff = 0;

        epoll_ctl(epfd[conn->index], EPOLL_CTL_DEL, conn->sock, &evReg);

        close(conn->sock);

    }

    void *workerThread(void *arg) {

        int epfd = *(int *)arg;

       

        struct epoll_event event;

        struct epoll_event evReg;

        /* only handle connected socket */

        while (!shut_server) {

            int numEvents = epoll_wait(epfd, &event, 1, 1000);

            

            if (numEvents > 0) {

                int sock = event.data.fd;

                connection_t conn = &g_conn_table[sock];

                   

                if (event.events & EPOLLOUT) {

                    if (handleWriteEvent(conn) == -1) {

                        closeConnection(conn);

                        continue;

                    }

                }

                if (event.events & EPOLLIN) {

                    if (handleReadEvent(conn) == -1) {

                        closeConnection(conn);

                        continue;

                    }

                }

                   

                evReg.events = EPOLLIN | EPOLLONESHOT;

                if (conn->woff > 0) evReg.events |= EPOLLOUT;

                evReg.data.fd = sock;

                epoll_ctl(epfd, EPOLL_CTL_MOD, conn->sock, &evReg);

            }

        }

        return NULL;

    }

    void *listenThread(void *arg) {

        int lisEpfd = epoll_create(5);

        struct epoll_event evReg;

        evReg.events  = EPOLLIN;

        evReg.data.fd = lisSock;

        epoll_ctl(lisEpfd, EPOLL_CTL_ADD, lisSock, &evReg);

       

        struct epoll_event event;

        int rrIndex = 0; /* round robin index */

       

        /* only handle listen socekt */

        while (!shut_server) {

            int numEvent = epoll_wait(lisEpfd, &event, 1, 1000);

            if (numEvent > 0) {

                int sock = accept(lisSock, NULL, NULL);

                if (sock > 0) {

                    g_conn_table[sock].using = 1;

                       

                    int flag;

                    flag = fcntl(sock, F_GETFL);

                    fcntl(sock, F_SETFL, flag | O_NONBLOCK);

                       

                    evReg.data.fd = sock;

                    evReg.events = EPOLLIN | EPOLLONESHOT;

                               

                    /* register to worker-pool's epoll,

                     * not the listen epoll */

                    g_conn_table[sock].index= rrIndex;

                    epoll_ctl(epfd[rrIndex], EPOLL_CTL_ADD, sock, &evReg);

                    rrIndex = (rrIndex + 1) % EPOLL_NUM;

                }

            }

        }

        close(lisEpfd);

        return NULL;

    }

    int main(int argc, char *const argv[]) {

        int c;

        for (c = 0; c < CONN_MAXFD; ++c) {

            g_conn_table[c].sock = c;

        }

        struct sigaction act;

        memset(&act, 0, sizeof(act));

        act.sa_handler = shut_server_handler;

        sigaction(SIGINT, &act, NULL);

        sigaction(SIGTERM, &act, NULL);

        /* create 2 different epoll fd */

       

        lisSock = socket(AF_INET, SOCK_STREAM, 0);

       

        int reuse = 1;

        setsockopt(lisSock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

       

        int flag;

        flag = fcntl(lisSock, F_GETFL);

        fcntl(lisSock, F_SETFL, flag | O_NONBLOCK);

        struct sockaddr_in lisAddr;

        lisAddr.sin_family = AF_INET;

        lisAddr.sin_port = htons(9876);

        lisAddr.sin_addr.s_addr = htonl(INADDR_ANY);

       

        if (bind(lisSock, (struct sockaddr *)&lisAddr, sizeof(lisAddr)) == -1) {

            perror("bind");

            return -1;

        }

        listen(lisSock, 4096);

       

        pthread_t lisTid;

        pthread_create(&lisTid, NULL, listenThread, NULL);

        int epi;

        for (epi = 0; epi < EPOLL_NUM; ++ epi) {

            epfd[epi] = epoll_create(20);

        }

        int i;

        cpu_set_t mask;

        for (i = 0; i < EPOLL_NUM; ++i) {

            int j;

            for (j = 0; j < WORKER_PER_GROUP; ++j) {

                pthread_create(worker + (i * WORKER_PER_GROUP + j), NULL, workerThread, epfd + i);

                CPU_ZERO(&mask);

                CPU_SET(i, &mask);

                if (pthread_setaffinity_np(*(worker + (i * WORKER_PER_GROUP + j)), sizeof(mask), &mask) < 0)

                {

                    fprintf(stderr, "set thread affinity failed ");

                }

            }

        }

       

        for (i = 0; i < NUM_WORKER; ++i) {

            pthread_join(worker[i], NULL);

        }

        pthread_join(lisTid, NULL);

       

        struct epoll_event evReg;

        for (c = 0; c < CONN_MAXFD; ++c) {

            connection_t conn = g_conn_table + c;

            if (conn->using) {

                epoll_ctl(epfd[conn->index], EPOLL_CTL_DEL, conn->sock, &evReg);

                close(conn->sock);

            }

        }   

        for (epi = 0; epi < EPOLL_NUM; ++epi) {

            close(epfd[epi]);

        }

        close(lisSock);

        return 0;

    }

    B:

    #define _GNU_SOURCE

    #include <stdio.h>

    #include <stdlib.h>

    #include <string.h>

    #include <sys/types.h>

    #include <sys/socket.h>

    #include <netdb.h>

    #include <unistd.h>

    #include <fcntl.h>

    #include <sys/epoll.h>

    #include <error.h>

    #include <errno.h>

    #include <signal.h>

    #include <sched.h>

    #include "thread-pool.h"

    #define MAXEVENTS 64

    static int make_socket_non_blocking(int sfd)

    {

        int flags, s;

        flags = fcntl(sfd, F_GETFL, 0);

        if (-1==flags)

        {

            perror("fcntl");

             return -1;

        }

        flags |= O_NONBLOCK;

        s = fcntl(sfd, F_SETFL, flags);

        if (-1==s)

        {

            perror("fcntl");

             return -1;

        }

        return 0;

    }

    static int create_and_bind(char *port)

    {

        struct addrinfo hints;

        struct addrinfo *result, *rp;

        int s, sfd;

        memset(&hints, 0, sizeof(struct addrinfo));

        hints.ai_family = AF_UNSPEC;//return IPv4 and IPv6 choices

        hints.ai_socktype = SOCK_STREAM;//we want a TCP socket

        hints.ai_flags = AI_PASSIVE;//all interfaces

        s = getaddrinfo(NULL, port, &hints, &result);

        if (0!=s)

        {

            fprintf(stderr, "getaddrinfo:%s ", gai_strerror(s));

             return -1;

        }

        for(rp=result; NULL!=rp; rp=rp->ai_next)

        {

            sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);

             if (-1==sfd)

            {

                 continue;

             }

             s = bind(sfd, rp->ai_addr, rp->ai_addrlen);

             if (0==s)

             {

                 //we managed to bind successfully

                 break;

             }

             close(sfd);

        }

        if (NULL==rp)

        {

             fprintf(stderr, "could not bind");

                 return -1;

        }

        freeaddrinfo(result);

        return sfd;

    }

    int run = 1;

    void SignalHandler(int iSignNum)

    {

        printf("capture signal number:%d ", iSignNum);

        run = 0;

    }

    void *handler(void *arg)

    {

        int s;

        int fd = *((int*)arg);

        /*we have data on the fd waiting to be read. read and

        display it. we must read whatever data is available

        completely, as we are running in edge-triggered mode

        and won't get notification again for the same data.*/

        int done = 0;

        while(1)

        {

            ssize_t count;

            char buf[512];

            count = read(fd, buf, sizeof(buf));

            if (-1==count)

            {

                /*if errno==EAGAIN, that means we have read all

                data. so go back to the main loop*/

                if (errno==EAGAIN||errno==EWOULDBLOCK)

                {

                    done = 1;

                }

                else

                {

                    fprintf(stderr, "fd:%d ", fd);

                    perror("read client data");

                }

                break;

            }

            else if (0==count)

            {

                /*end of file. the remote has closed the connection*/

                done = 1;

                break;

            }

            //write the buffer to standard output

            s = write(1, buf, count);

            if (-1==s)

            {

                perror("write");

                abort();

            }

        }

        if (done)

        {

            write(fd, "fine, thank you", strlen("fine, thank you")+1);

            printf("closed connection on descriptor %d ", fd);

            /*closing the descriptor will make epoll remove it

            from the set of descriptors which are monitored.*/

            close(fd);

        }

    }

    int main(int argc, char *argv[])

    {

        int sfd, s;

        int efd;

        struct epoll_event event;

        struct epoll_event *events;

        if (2!=argc)

        {

            fprintf(stderr, "Usage:%s [port] ", argv[0]);

                 exit(EXIT_FAILURE);

        }

        // init thread-pool

        unsigned count = 1;

        count = sysconf(_SC_NPROCESSORS_ONLN);

        pool_init(count);

        thread_pool *pool = (thread_pool*)pool_instance();

        // wait thread to run

        sleep(5);

        // thread cpu affinity

        cpu_set_t mask;

        cpu_set_t get;

        int thread_ccore = 0;

        for(thread_ccore=0; thread_ccore<count; thread_ccore++)

        {

            CPU_ZERO(&mask);

            CPU_SET(thread_ccore, &mask);

            if (pthread_setaffinity_np(pool->threadid[thread_ccore], sizeof(mask), &mask) < 0)

            {

                fprintf(stderr, "set thread affinity failed ");

            }

            CPU_ZERO(&get);

            if (pthread_getaffinity_np(pool->threadid[thread_ccore], sizeof(get), &get) < 0)

            {

                fprintf(stderr, "get thread affinity failed ");

            }

            if (CPU_ISSET(thread_ccore, &get))

            {

                printf("thread %ld is running in processor %d ", pool->threadid[thread_ccore], thread_ccore);

            }

        }

        // listen

        sfd = create_and_bind(argv[1]);

        if (-1==sfd)

        {

            abort();

        }

        s = make_socket_non_blocking(sfd);

        if (-1==s)

        {

            abort();

        }

        s = listen(sfd, SOMAXCONN);

        if (-1==s)

        {

                 perror("listen");

                 abort();

        }

        efd = epoll_create1(0);

        if (-1==efd)

        {

            perror("epoll_create");

            abort();

        }

        event.data.fd = sfd;

        event.events = EPOLLIN|EPOLLET;

        s = epoll_ctl(efd, EPOLL_CTL_ADD, sfd, &event);

        if (-1==s)

        {

            perror("epoll_ctl");

            abort();

        }

       

        //buffer where events are returned

        events = calloc(MAXEVENTS, sizeof event);

        //the event loop

        while(1)

        {

            signal(SIGINT, SignalHandler);

            if (!run)

            {

                break;

            }

            int n, i;

           

            n = epoll_wait(efd, events, MAXEVENTS, -1);

            for(i=0; i<n; i++)

            {

                if ((events[i].events&EPOLLERR)||

                    (events[i].events&EPOLLHUP)||

                    (!(events[i].events&EPOLLIN)))

                {

                    /*an error has occured on this fd, or the socet is not

                     ready for reading (whe were we notified then?) */

                    fprintf(stderr, "epoll error ");

                    close(events[i].data.fd);

                    continue;

                }

                else if (sfd!=events[i].data.fd)

                {

                    pool_add_job(handler, (void*)&(events[i].data.fd));

                }

                else

                {

                    /*we have a notification on the listening socket, which

                     means one or more incoming connections*/

                    while(1)

                    {

                        struct sockaddr in_addr;

                        socklen_t in_len;

                        int infd;

                        char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                        in_len = sizeof in_addr;

                        infd = accept(sfd, &in_addr, &in_len);

                        if (-1==infd)

                        {

                            if ((errno==EAGAIN)||

                                (errno==EWOULDBLOCK))

                            {

                                //we have processed all incoming connections

                                break;

                            }

                            else

                            {

                                perror("accept");

                                break;

                            }

                        }

                        s = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf,

                                        sbuf, sizeof sbuf,

                                        NI_NUMERICHOST|NI_NUMERICSERV);

                        if (0==s)

                        {

                            printf("accepted connection on descriptor %d"

                                    "(host=%s, port=%s) ", infd, hbuf, sbuf);

                        }

                        /*make the incoming socket non-blocking and add it to the

                         list of fds to monitor*/

                        s = make_socket_non_blocking(infd);

                        if (-1==s)

                        {

                            abort();

                        }

                        event.data.fd = infd;

                        event.events = EPOLLIN|EPOLLET|EPOLLONESHOT;

                        s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);

                        if (-1==s)

                        {

                            perror("epoll_ctl");

                            abort();

                        }

                    }

                }//else

            }

        }

        free(events);

        close(sfd);

        close(efd);

        pool_destroy();

        printf("process exit ");

    }

    ————————————————————————————————————————————————————————————————————

    本机环境,CPU八核(虚拟机)

    8 工作者 线程

    A: 测试

    //LoadRunner

    #include "lrs.h"


    Action()
    {
        lrs_create_socket("socket0", "TCP", "RemoteHost=10.20.61.117:9876",  LrsLastArg);

        lr_think_time(7);

        lrs_send("socket0", "buf0", LrsLastArg);

        lrs_receive("socket0", "buf1", LrsLastArg);

        lrs_close_socket("socket0");

        return 0;
    }

    B: 测试

    #include "lrs.h"


    Action()
    {
        lrs_create_socket("socket0", "TCP", "RemoteHost=10.20.61.117:8093",  LrsLastArg);

        lr_think_time(6);

        lrs_send("socket0", "buf0", LrsLastArg);

        lrs_receive("socket0", "buf1", LrsLastArg);

        lrs_close_socket("socket0");

        return 0;
    }

     Finally:

    A模式:多epoll, 多线程。accept后,将socket fd分配给各个epoll fd,各个线程epoll_wait各自的epoll fd,不设置锁。

                 以大多数开发者的想法,这种不设置锁的多线程应该高效。但其实不然!!!!

                 首先,这个模型里,各个线程没有休眠,再有,connnect结构占用内容偏高。

         结果,造成系统响应迟钝,退出缓慢,网络吞吐并不高。

                 

         

    B模式:单epoll,启用工作者线程池

         大多数开发者看见了线程池有锁,就认为效率低下。其实不然!!!!

                 有人分析过,内核锁的效率不是应用效率的主障碍!!!!!!!

                 首先,这个模型里,cpu和内存占用极低,所有耗时都费在了应该费时的I/O上。

                 结果,系统响应极快,退出正常,网络吞吐是上个模型的2.5倍

                 

                 

    有时候,新生事务是要比老事务先进的多的。因为A模型实在2013年提出的!!!

  • 相关阅读:
    python的基本操作while循环体
    python中类的神奇方法应用案例
    Python中类的神奇方法
    python 中类的初始化方法
    Python中类的创建和self的作用
    投掷骰子的游戏,键值对
    字典常用方法
    剪刀石头布
    math 模块
    PYTHON内置模块
  • 原文地址:https://www.cnblogs.com/woodzcl/p/7987249.html
Copyright © 2011-2022 走看看