zoukankan      html  css  js  c++  java
  • epoll + 多线程实现并发网络连接处理

    epoll + 多线程实现并发网络连接处理

    简介

    触发方式

      条件触发

      边沿触发

    主要的应用接口

    1 Epoll的创建

      根据man手册介绍, epoll_create(int size) 用来创建一个epoll实例,向内核申请支持size个句柄的资源(存储)。Size的大小不代表epoll支持的最大句柄个数,而隐射了内核扩展句柄存储的尺寸,也就是说当后面需要再向epoll中添加句柄遇到存储不够的时候,内核会按照size追加分配。在2.6以后的内核中,该值失去了意义,但必须大于0。

    epoll_create执行成功,返回一个非负的epoll描述句柄,用来指定该资源,否则返回-1。

    例子:

             int epoll_fd = epoll_create(1);

    2 Epoll的控制

             Epoll的控制主要通过epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)完成。控制对象是用户申请的句柄,即fd;Epfd指定所控制的epoll资源;op指对fd的动作,包括向epoll中添加一个句柄EPOLL_CTL_ADD,删除一个句柄EPOLL_CTL_DEL,修改epoll对一个存在句柄的监控模式EPOLL_CTL_MOD;event指出需要让epoll对fd的监控模式(收、发、触发方式等)。epoll_ctl执行成功返回0, 否则返回-1。在介绍该接口之前,我们先看看内核对epoll的事件类型的定义

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    typedef union epoll_data {
     
              void    *ptr;
              int      fd;
              uint32_t u32;
              uint64_t u64;
     
    } epoll_data_t;
     
    struct epoll_event {
     
             uint32_t   events;    /* Epoll events */
             epoll_data_t data;      /* User data variable */
     
    };

    该结构中我们主要看epoll_event。epoll_event->data涵盖了调用epoll_ctl增加或者修改某指定句柄时写入的信息,epoll_event->event,则包含了返回事件的位域。

    例子:

    2.1 向epoll中增加句柄

    2.1.1 增加新的常规句柄:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    struct epoll_event ev;
     
    if(边沿触发)
             ev.events = EPOLIN | EPOLLOUT |EPOLLLET
    else
     
    条件触发(默认)
     
             ev.events = EPOLIN | EPOLLOUT
             ev.data = newfd;
     
             epoll_ctl(epoll_fd, EPOLL_CTL_ADD, newfd, &ev);

     2.1.2 增加网络监听句柄

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    Struct epoll_event ev;
     
    if(边沿触发)
             ev.events = EPOLIN | EPOLLLET     (监听句柄只关心输入)
    else
     
    条件触发(默认)
     
             ev.events = EPOLIN;
             ev.data = newfd;
     
             epoll_ctl(epoll_fd, EPOLL_CTL_ADD, newfd, &ev);

     2.2 修改某个句柄的模式

    1
    2
    3
    4
    5
    6
    struct epoll_event newev;
     
    newev.events = NEWMOD;(新的触发方式可通过该接口修改)
    newev.data = oldfd;
     
    epoll_ctl(epoll_fd, EPOLL_CTL_MOD, oldfd, &newev);

     2.3 删除某个句柄

    1
    2
    3
    4
    5
    struct epoll_event newev;
     
    epoll_ctl(epoll_fd,EPOLL_CTL_DEL,oldfd,& newev);
     
    注:在2.6.9以前的内核中,当执行EPOLL_CTL_DEL时,event须非空,但在之后,event可空,一般为了兼容以前的内核版本,我们最好将event非空。

    3 Epoll的监控

             当向epoll中添加若干句柄后,就要进入监控状态,此时通过系统调用epoll_wait(int epfd, struct epoll_event *events,  int maxevents,  int timeout)完成。epoll_wait在执行的时候,在timeout内,将有动作的句柄的信息填充到event,event和maxevents决定了epoll监控句柄的上限。timeout的单位是微妙级别,当为-1时,除非内部句柄有动作,否则持续等待。epoll_wait执行成功返回有动作的句柄的总数,句柄信息在events中包含;如果在超时timeout内返回零,表示没有io请求的句柄;否则返回-1。

    例程

    下面是一个结合网上我做了修整的例子贴出来,简单总结一下epoll的用处。该例子是一个网络环回测试例程,服务器的地址默认,请求连接的端口号是11111

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    307
    308
    309
    310
    311
    312
    313
    314
    315
    316
    317
    318
    319
    320
    321
    322
    323
    324
    325
    326
    327
    328
    329
    330
    331
    332
    333
    334
    335
    336
    337
    338
    339
    340
    341
    342
    343
    344
    345
    346
    347
    348
    349
    350
    351
    352
    353
    354
    355
    356
    357
    358
    359
    360
    361
    362
    363
    364
    365
    366
    367
    368
    369
    370
    371
    #include "xs_epoll.h"   /* epoll releated include file */
    /* create an epoll 实例 */
    int xs_epoll_create(int __c)
    {
        XSOCKET __efd__ = -1;
     
        __efd__ = epoll_create(__c);
        if(__efd__ < 0){
            xs_dump_error_str(errno);
            return EXIT_ERROR;
        }
             
        return (__efd__);
    }
    int xs_epoll_ctl(XSOCKET __efd, int __method, XSOCKET __fd, struct epoll_event* __p)
    {
        int __result__ = -1;
     
        __result__ = (epoll_ctl(__efd, __method, __fd, __p) < 0);
        if(__result__ < 0){
            xs_dump_error_str(errno);
            return EXIT_ERROR;
        }
         
        return EXIT_OK;
    }
    int xs_epoll_wait(XSOCKET __efd, struct epoll_event* __ev, int __c, int __tw)
    {
        int __num__ = -1;
     
        __num__ = epoll_wait(__efd, __ev, __c, __tw);
        if(__num__ < 0)
            __num__ = EXIT_ERROR;
     
        return (__num__);
    }
    int xs_epoll_close(XSOCKET __f)
    {
        return xs_release_socket(__f);
    }
     
    /*
        Add fd to epollfd
        Register the target file descriptor fd on the epoll instance referred to by the file descriptor epfd
        and associate the event with the internal file linked to fd.
    */
    int xs_epoll_add(int __epollfd, int __fd, int __mod)
    {
        xs_epoll_event __xs_ev__;
         
        __xs_ev__.events = __mod;
        __xs_ev__.data.fd = __fd;
     
        if(xs_epoll_ctl(__epollfd, EPOLL_CTL_ADD, __fd, &__xs_ev__) < 0)
                return EXIT_ERROR; 
     
        return EXIT_OK;
    }
     
    /* 
        Remove  (deregister) the target file descriptor fd from the epoll instance referred to by epollfd. 
        The event is ignored and can be NULL.
        In kernel versions before 2.6.9, the EPOLL_CTL_DEL operation required a non-NULL pointer in event
        Since Linux 2.6.9, event can be specified as NULL when using EPOLL_CTL_DEL.
    */
    int xs_epoll_del(int __epollfd, int __fd)
    {
        xs_epoll_event __xs_ev__;
     
        /* We should better set event not NULL */
        __xs_ev__.events = __xs_ev__.events;
        __xs_ev__.data.fd = __fd;
     
        if(xs_epoll_ctl(__epollfd, EPOLL_CTL_DEL, __fd, &__xs_ev__) < 0)
                return EXIT_ERROR; 
     
        return EXIT_OK;
    }
     
    /* Change the event associated with the target file descriptor __fd. */
    int xs_epoll_mode(int __epollfd, int __fd, int __newmod)
    {
        xs_epoll_event __xs_ev__;
     
        __xs_ev__.events = __newmod;
        __xs_ev__.data.fd = __fd;
     
        if(xs_epoll_ctl(__epollfd, EPOLL_CTL_MOD, __fd, &__xs_ev__) < 0)
                return EXIT_ERROR; 
     
        return EXIT_OK;
    }
     
    /*
        xs_epoll_init: creates  an epoll "instance", requesting the kernel to allocate an
        event backing store dimensioned for size descriptors.
        Since Linux 2.6.8, the size argument is unused, but must be greater than zero.
        (The kernel  dynamically  sizes  the  required  data
           structures without needing this initial hint.)
    */<br>int xs_epoll_init(int size)
    {
        int epollfd = -1;
         
        if(size == 0)
            epollfd = xs_epoll_create(EPOLL_DEFAULT_SIZE);
        else epollfd = xs_epoll_create(size);
         
     
        if(epollfd < 0) return EXIT_ERROR;
     
        xs_logd("epoll create success -> fd : %d", epollfd);
     
        return epollfd;
    }
     
     
    //#define __VECTOR__
    #define xs_defult_thread_size   10
    Typedef pthread_cond_t  xs_pthread_cond_t;
    Typedef pthread_mutex_t xs_pthread_mutex_t;
    Typedef int         XSOCKET;
    #define EPOLL_DEFAULT_SIZE  10
    typedef struct block_queue 
    #ifdef __VECTOR__
        vector queue;
    #else
        int queue[xs_defult_thread_size];
        long size;   
    #endif
        xs_pthread_cond_t cond; 
        xs_pthread_mutex_t mutex; 
    }block_queue_t; 
    block_queue_t *bq; 
     
    typedef struct block_queue_param 
        void* func; 
        void* queue;    /* Point to block queue structure */
    }block_queue_param_t; 
    block_queue_param_t bqp;
     
    int g_xs_thread_count = xs_defult_thread_size;
    #define BUFFER_SIZE         1024
    #include <sys/resource.h>
    int g_epoll_fd = -1;
    xs_epoll_event xs_ev, xs_events[EPOLL_DEFAULT_SIZE];
    int g_epoll_size = EPOLL_DEFAULT_SIZE;
    int g_serv_fd = -1;
     
    static int xs_queue_init( block_queue_t *__q)
    {
        if(__q == NULL) return (-1);
         
    #ifdef __VECTOR__
        __q->queue = vector_init(xs_defult_thread_size);
    #else
        __q->size = 0;
    #endif
     
        xs_pthread_cond_init(&(__q->cond), NULL); 
        xs_pthread_mutex_init(&(__q->mutex), NULL); 
     
        return 0;
    }
     
    block_queue_t *xs_epoll_queue_create( void )
    {
        block_queue_t *__q;
     
        __q = xs_malloc(sizeof(block_queue_t));
     
        assert(__q);
     
        return ((xs_queue_init(__q) == 0) ? __q : NULL);
    }
    static inline void xs_network_epoll_loop(void* data)
    {
         int socket;
     
          socket = *(int *)data;
     
        xs_logd("%d !\n", socket);
        char buffer[BUFFER_SIZE];
        xs_pthread_t id = pthread_self();
        xs_logd("thread id is: %ld", id);
     
        /* We only send what recevied just now */
        int   length = xs_net_recv(socket, buffer, BUFFER_SIZE);
        if(length){
            xs_net_send(socket, buffer, strlen(buffer));
            memset(buffer, 0, BUFFER_SIZE);
        }
    }
     
    void *xs_handle_queue(void *param) 
        void(* func)(void* ); 
        int fd;
     
        block_queue_t* bque = ((block_queue_param_t*)param)->queue; 
        func = ((block_queue_param_t*)param)->func;
     
        xs_pthread_cond_init(&bque->cond,  NULL);
        xs_pthread_mutex_init(&bque->mutex,  NULL);
         
        for(;;) 
        
            if(xs_pthread_mutex_lock(&bque->mutex) == EXIT_OK) {
                 
                xs_pthread_cond_wait(&bque->cond, &bque->mutex); 
    #ifdef __VECTOR__
                if(bque->queue->active == 0){
                    xs_pthread_mutex_unlock(&bque->mutex);
                    continue;
                }else {
                    fd = *(int *)(vector_lookup(bque->queue, 0));
                }
    #else
                if(bque->size==0)  { 
                    xs_pthread_mutex_unlock(&bque->mutex);
                    continue;
                }else {    
                     
                    int i;
                    fd = bque->queue[0]; 
     
                    for(i = 0; i < bque->size - 1; ++i) 
                        bque->queue[i] = bque->queue[i + 1]; 
                     
                    bque->queue[bque->size-1] = 0; 
                    bque->size--; 
                
    #endif
                    xs_pthread_mutex_unlock(&bque->mutex); 
            }
             
            func((void *)&fd); 
        
     
    int xs_init_threads( void 
        int i = 0, ret; 
        xs_pthread_t child_thread[g_xs_thread_count]; 
        xs_pthread_attr_t child_thread_attr[g_xs_thread_count]; 
     
        bqp.func = (void*)xs_network_epoll_loop;
        bqp.queue = (void *)bq;
     
        for( i = 0; i < g_xs_thread_count; ++i)  { 
            ret = xs_pthread_attr_init(&child_thread_attr[i]); 
            if(ret != 0) xs_logd("error to init attr !\n");
            pthread_attr_setdetachstate(&child_thread_attr[i], PTHREAD_CREATE_DETACHED); 
            if( pthread_create(&child_thread[i],
                &child_thread_attr[i], xs_handle_queue, (void *)&bqp) < 0 ) { 
                    xs_logd("pthread_create Failed : %s - %m\n",strerror(errno)); 
                    return 1; 
            
            
        return 0;
    int xs_init_server(const char *name, short int port)
    {
        struct rlimit rt; 
        int server_socket = -1;
         
        server_socket = xs_create_server(name, port);
     
        rt.rlim_max = rt.rlim_cur = g_epoll_size; 
        if (setrlimit(RLIMIT_NOFILE, &rt) == -1) { 
            xs_logd("setrlimit - %m"); 
            exit(1); 
        
     
        return server_socket; 
    }
     
    static void xs_insert_queue(block_queue_t *bque, int *fd) 
        xs_pthread_mutex_lock(&bque->mutex); 
         
    #ifdef __VECTOR__
        vector_set(bque->queue, fd);
    #else
        if(bque->size == g_xs_thread_count) 
            return
     
        bque->queue[bque->size] = *fd; 
        bque->size++;
        if(bque->size > g_xs_thread_count)  { 
            fprintf(stderr,"Queue size over folow.%ld", bque->size); 
            exit (1); 
        
    #endif
     
        xs_pthread_cond_signal(&bque->cond); 
        xs_pthread_mutex_unlock(&bque->mutex); 
     
    }
     
    static inline void xs_handler(void* fd) 
        printf("handler:fd => %d\n", *(int *)(fd)); 
        xs_insert_queue(bq, fd);
     
    int xs_epoll_entry()
    {
        int nfds, n;
         
        g_serv_fd = xs_init_server(NULL, 11111);
        xs_logd("server thread [FD:%d] is ready for ...", g_serv_fd);
     
         
        bq = xs_epoll_queue_create();
        assert(bq);
         if(xs_init_threads() == 0) 
                xs_logd("Threads ready to use !"); 
     
        g_epoll_fd = xs_epoll_init(g_epoll_size);
        xs_epoll_add(g_epoll_fd, g_serv_fd, EPOLLIN | EPOLLET);
     
         for(;;) { 
                struct sockaddr_in local; 
                socklen_t length = sizeof(local); 
                int client = -1; 
                   
                nfds = xs_epoll_wait(g_epoll_fd, xs_events,
    EPOLL_DEFAULT_SIZE, epoll_wait_indefinite); 
     
                for(n = 0; n < nfds; ++n) { 
     
                if(xs_events[n].data.fd == g_serv_fd) { 
                    client = xs_net_accept(g_serv_fd, (struct sockaddr *)&local, &length); 
                    if(client < 0) { 
                        xs_logd("%s", strerror(errno)); 
                        continue
                    else {
                        xs_logd("add socket pool : %d", client);
                        set_nonblocking(client); 
                        xs_epoll_add(g_epoll_fd, client, EPOLLIN | EPOLLOUT | EPOLLET);
                        client = -1;
                    }
                
                else  /* It's a client fd that needed to process */
                    xs_handler((void *)&xs_events[n].data.fd);
                
            
     
        xs_pthread_mutex_destroy(&bq->mutex);
        xs_pthread_cond_destroy(&bq->cond);
        xs_close_socket(g_serv_fd);
        xs_epoll_close(g_epoll_fd);
    #ifdef __VECTOR__
        vector_free(bq->queue);
    #endif
     
        xs_free(bq);
        return 0;
    }
     
    int main(int argc, char *argv[])
    {
        argc = argc;
        argv = argv;
     
        xs_epoll_entry();
     
        return 0;
    }
    【本博客 http://www.cnblogs.com/iTsihang 中原创之博文,版权属作者所有,欢迎转载。转载之时请保留本段内容,否则作者将保留追究版权之权利】
     
  • 相关阅读:
    .net程序员书单
    脱敏小软件
    .NET处理HTTP请求
    WPF 按名称查找控件
    软件工程现行国标汇集
    企业应用架构模式读书笔记 第一章 分层
    mysql远程访问
    知道二叉树的先序和中序遍历,重建该二叉树
    微信小程序地图模块
    微信小程序蓝牙模块
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3095914.html
Copyright © 2011-2022 走看看