zoukankan      html  css  js  c++  java
  • Nginx accept锁分析

    nginx中accept锁主要是为了防止一个新连接的accept事件导致多个进程被唤醒的问题发生。

    nginx中的进程只有抢占到accept锁时才会将监听的fd放入epoll中,这样同一时间只有一个进程可以接受新连接的accept事件。工作进程抢占到accept锁后,将监听端口的fd放入epoll中,再调用epoll_wait()获取新的事件。

    如果调用epoll_wait获取到新的事件后立即处理这些事件,可能会导致read事件处理时间过长,其他进程无法获取新的连接的问题。为了避免这种情况的发生,nginx将read事件延后处理。维护两个队列:ngx_posted_accept_events存放accept事件,ngx_posted_events队列存放读事件。如果进程获取到accept锁,先处理ngx_posted_accept_events队列,再即释放accept锁,最后处理ngx_posted_events队列。

    这一部分的逻辑在nginx的ngx_event.c:ngx_process_events_and_timers()函数中:

    在epoll_wait之前先抢占accept锁,如果抢占成功,则在中flag增加NGX_POST_EVENTS选项,传入ngx_process_event函数。

        // 抢占accept锁
        if (ngx_use_accept_mutex) {
            if (ngx_accept_disabled > 0) {
                ngx_accept_disabled--;
    
            } else {
                // 获取锁错误
                if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
                    return;
                }
    
                if (ngx_accept_mutex_held) {
                    flags |= NGX_POST_EVENTS;
    
                } else {
                    // 抢占锁失败,设置延时
                    if (timer == NGX_TIMER_INFINITE
                        || timer > ngx_accept_mutex_delay)
                    {
                        timer = ngx_accept_mutex_delay;
                    }
                }
            }
        }

    ngx_trylock_accept_mutex函数在加锁时检查是否有未处理的accept事件,和epoll是否可用,同时调用ngx_shmtx_trylock加锁。如果获得锁将ngx_accept_mutex_held 置为1

    ngx_int_t
    ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
    {
        if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
    
            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "accept mutex locked");
    
            if (ngx_accept_mutex_held && ngx_accept_events == 0) {
                return NGX_OK;
            }
    
            if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
                ngx_shmtx_unlock(&ngx_accept_mutex);
                return NGX_ERROR;
            }
    
            ngx_accept_events = 0;
            ngx_accept_mutex_held = 1;
    
            return NGX_OK;
        }
    
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex lock failed: %ui", ngx_accept_mutex_held);
    
        if (ngx_accept_mutex_held) {
            if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }
    
            ngx_accept_mutex_held = 0;
        }
    
        return NGX_OK;
    }

    ngx_shmtx_trylock函数通过原子变量进行加锁:

    ngx_uint_t
    ngx_shmtx_trylock(ngx_shmtx_t *mtx)
    {
        return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
    }

    尝试获取accept锁后,进入事件处理函数ngx_process_events:

    (void) ngx_process_events(cycle, timer, flags);

    ngx_process_events是一个函数指针,使用epoll时,指向:ngx_epoll_process_events函数。首先,调用epoll_wait()获取evs,再对事件进行处理:

    // epoll_wait 
    events = epoll_wait(ep, event_list, (int) nevents, timer);
    
    err = (events == -1) ? ngx_errno : 0;

    拿到accept锁,即flags=NGX_POST_EVENTS时:不会直接处理事件,将accept事件放到ngx_posted_accept_events队列,read事件放到ngx_posted_events队列。如果没有拿到accept锁,则处理的全部是read事件,直接进行回调函数处理。

    没获取到accept锁时:如果处理的事件是可写事件,则将事件的ready置为1,供后续流程使用。如果获处理的事件是可读事件,直接调用写事件的回调函数(handler)。

      1     // 处理epoll_wait()返回的事件
      2     for (i = 0; i < events; i++) {
      3         c = event_list[i].data.ptr;
      4 
      5         instance = (uintptr_t) c & 1;
      6         c = (ngx_connection_t *) ((uintptr_t) c & (uintptr_t) ~1);
      7 
      8         rev = c->read;
      9 
     10         // 事件过期
     11         if (c->fd == -1 || rev->instance != instance) {
     12 
     13             /*
     14              * the stale event from a file descriptor
     15              * that was just closed in this iteration
     16              */
     17 
     18             ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
     19                            "epoll: stale event %p", c);
     20             continue;
     21         }
     22 
     23         revents = event_list[i].events;
     24 
     25         ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
     26                        "epoll: fd:%d ev:%04XD d:%p",
     27                        c->fd, revents, event_list[i].data.ptr);
     28 
     29         // 错误事件
     30         if (revents & (EPOLLERR|EPOLLHUP)) {
     31             ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
     32                            "epoll_wait() error on fd:%d ev:%04XD",
     33                            c->fd, revents);
     34 
     35             /*
     36              * if the error events were returned, add EPOLLIN and EPOLLOUT
     37              * to handle the events at least in one active handler
     38              */
     39 
     40             revents |= EPOLLIN|EPOLLOUT;
     41         }
     42 
     43 #if 0
     44         if (revents & ~(EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP)) {
     45             ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
     46                           "strange epoll_wait() events fd:%d ev:%04XD",
     47                           c->fd, revents);
     48         }
     49 #endif
     50 
     51         // 读事件活跃
     52         if ((revents & EPOLLIN) && rev->active) {
     53 
     54 #if (NGX_HAVE_EPOLLRDHUP)
     55             if (revents & EPOLLRDHUP) {
     56                 rev->pending_eof = 1;
     57             }
     58 #endif
     59 
     60             rev->ready = 1;
     61             rev->available = -1;
     62             // 延后处理,存放在队列中
     63             if (flags & NGX_POST_EVENTS) {
     64                 queue = rev->accept ? &ngx_posted_accept_events
     65                                     : &ngx_posted_events;
     66 
     67                 ngx_post_event(rev, queue);
     68 
     69             } else {
     70                 // 调用read回调 ngx_http_wait_request_handler
     71                 // 第一次调用 ngx_event_accept
     72                 rev->handler(rev);
     73             }
     74         }
     75 
     76         wev = c->write;
     77 
     78         // 写事件活跃
     79         if ((revents & EPOLLOUT) && wev->active) {
     80 
     81             if (c->fd == -1 || wev->instance != instance) {
     82 
     83                 /*
     84                  * the stale event from a file descriptor
     85                  * that was just closed in this iteration
     86                  */
     87 
     88                 ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
     89                                "epoll: stale event %p", c);
     90                 continue;
     91             }
     92 
     93             wev->ready = 1;
     94 #if (NGX_THREADS)
     95             wev->complete = 1;
     96 #endif
     97 
     98             // 延后处理,添加到队列
     99             if (flags & NGX_POST_EVENTS) {
    100                 ngx_post_event(wev, &ngx_posted_events);
    101 
    102             } else {
    103                 // 调用写事件的回调函数
    104                 wev->handler(wev);
    105             }
    106         }
    107     }
  • 相关阅读:
    电脑网络连接正常,无法连接浏览器,无法上网
    幂等性
    jvm问题解决
    Mybatis设计模式
    单进程单线程的Redis如何能够高并发
    分布式锁(Zookeeper)
    MyBatis 的 DAO 接口跟 XML 文件里面的 SQL 是如何建立关系的
    ArrayList、LinkedList、Vector、HashSet、Treeset、HashMap、TreeMap的区别和适用场景
    时间函数-线程安全
    socket
  • 原文地址:https://www.cnblogs.com/HadesBlog/p/14573062.html
Copyright © 2011-2022 走看看