zoukankan      html  css  js  c++  java
  • ngx accept_mutex

    尝试获取锁,如果获取了锁,那么还要将当前监听端口全部注册到当前worker进程的epoll当中去  获取失败就需要确保此时ls-fd 没有被 epoll 监听

    ngx_int_t
    ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
    {
        if (ngx_shmtx_trylock(&ngx_accept_mutex)) {//
    
            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                           "accept mutex locked");
    
            //如果本来已经获得锁,则直接返回Ok   
            if (ngx_accept_mutex_held && ngx_accept_events == 0) {
                return NGX_OK;
            }
    
       //到达这里,说明重新获得锁成功,因此需要打开被关闭的listening句柄,调用ngx_enable_accept_events函数,将监听端口注册到当前worker进程的epoll当中去   
            if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
                ngx_shmtx_unlock(&ngx_accept_mutex);
                return NGX_ERROR;
            }
    
            ngx_accept_events = 0;
            ngx_accept_mutex_held = 1; ////表示当前获取了锁   
    
            return NGX_OK;
        }
    
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex lock failed: %ui", ngx_accept_mutex_held);
    
    //这里表示的是以前曾经获取过,但是这次却获取失败了,那么需要将监听端口从当前的worker进程的epoll当中移除,调用的是ngx_disable_accept_events函数   
    -----当前进程没有获取到锁,证明是由别的进程获取到了。如果ngx_accept_mutex_held的值为1,证明该锁原来是由
        //本进程持有,即监听socket原先是加入到了本进程的事件驱动机制当中的。因此这里在进入下一次事件驱动机制(select/
        // poll/eploll)之前,我们需要先disable掉
        if (ngx_accept_mutex_held) {
            if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }
    
            ngx_accept_mutex_held = 0;
        }
    
        return NGX_OK;
    }
    
    
    static ngx_int_t
    ngx_enable_accept_events(ngx_cycle_t *cycle)
    {
        ngx_uint_t         i;
        ngx_listening_t   *ls;
        ngx_connection_t  *c;
    
        ls = cycle->listening.elts;
    
        /*本进程ngx_enable_accept_events把所有listen加入本进程epoll中后,本进程获取到ngx_accept_mutex锁后,在执行accept事件的
        过程中如果如果其他进程也开始ngx_trylock_accept_mutex,如果之前已经获取到锁,并把所有的listen添加到了epoll中,这时会因为没法获取到
        accept锁,而把之前加入到本进程,但没有accept过的时间全部清除。和ngx_disable_accept_events配合使用
        最终只有一个进程能accept到同一个客户端连接
         */
        for (i = 0; i < cycle->listening.nelts; i++) { 
    
            c = ls[i].connection;
    
            //后面的ngx_add_event->ngx_epoll_add_event中把listening中的c->read->active置1, ngx_epoll_del_event中把listening中置read->active置0
            if (c == NULL || c->read->active) { //之前本进程已经添加过,不用再加入epoll事件中,避免重复
                continue;
            }
    
            char tmpbuf[256];
            
            snprintf(tmpbuf, sizeof(tmpbuf), "<%25s, %5d> epoll NGX_READ_EVENT(et) read add", NGX_FUNC_LINE);
            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, tmpbuf);
            if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) { //ngx_epoll_add_event
                return NGX_ERROR;
            }
        }
    
        return NGX_OK;
    }

    1

    /addr为共享内存ngx_shm_alloc开辟的空间中的一个128字节首地址 --cache 长度
    ngx_int_t
    ngx_shmtx_create(ngx_shmtx_t *mtx, ngx_shmtx_sh_t *addr, u_char *name)
    {
        mtx->lock = &addr->lock;    //直接执行共享内存空间addr中的lock区间中
    
        if (mtx->spin == (ngx_uint_t) -1) { //注意,当spin值为-1时,表示不能使用信号量,这时直接返回成功
            return NGX_OK;
        }
    mtx->lock 原子操作数,在加锁时,作为判断条件值。
    mtx->spin 用于判断mtx->lock的次数,nginx的锁不是盲目的轮询判断或者判断只判断一次,它是选取了一个spin的循环判断次数,超过次数,让出cpu或者等待信号处理。
        mtx->spin = 2048; //spin值默认为2048
    
    //同时使用信号量
    #if (NGX_HAVE_POSIX_SEM)
        mtx->wait = &addr->wait;
    
        /*
        int  sem init (sem_t  sem,  int pshared,  unsigned int value) ,
        其中,参数sem即为我们定义的信号量,而参数pshared将指明sem信号量是用于进程间同步还是用于线程间同步,当pshared为0时表示线程间同步,
        而pshared为1时表示进程间同步。由于Nginx的每个进程都是单线程的,因此将参数pshared设为1即可。参数value表示信号量sem的初始值。
         */
        //以多进程使用的方式初始化sem信号量,sem初始值为0
        if (sem_init(&mtx->sem, 1, 0) == -1) {
            ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_errno,
                          "sem_init() failed");
        } else {
            mtx->semaphore = 1; //在信号量初始化成功后,设置semaphore标志位为1
        }
    
    #endif
    
        return NGX_OK;
    }
    /*
    首先是判断mtx的lock域是否等于0,如果不等于,那么就直接返回false好了,如果等于的话,那么就要调用原子操作ngx_atomic_cmp_set了,
    它用于比较mtx的lock域,如果等于零,那么设置为当前进程的进程id号,否则返回false。
    */ 
    ngx_uint_t
    ngx_shmtx_trylock(ngx_shmtx_t *mtx)
    {
        return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
    }

    使用ngx_pid 对lock赋值,ngx_pid用于区分进程,不可能重复

    接下来是一个for循环判断,判断多次无果,才进行下一步休眠操作或等待操作,

    接下来是一个选择,在自旋锁直接使用ngx_sched_yield函数,让出cpu,等待下次判断,在共享内存锁多了一个信号量的选择。

    等待sem_post唤醒,此时阻塞。

    sem_wait和ngx_sched_yield的对比,sem_wait是一个等待通知的机制,sched_yield是一个循环遍历的机制,在自旋锁使用sched_yield是因为自旋时间断,快速循环遍历,在共享内存锁中,可能需要等待时间长,使用sem机制,避免cpu浪费,

    void
    ngx_shmtx_lock(ngx_shmtx_t *mtx)
    {
        ngx_uint_t         i, n;
    
        ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx lock");
        
        //一个死循环,不断的去看是否获取了锁,直到获取了之后才退出   
        //所以支持原子变量的  
        for ( ;; ) {
     //lock值是当前的锁状态。注意,lock一般是在共享内存中的,它可能会时刻变化,而val是当前进程的栈中变量,下面代码的执行中它可能与lock值不一致
            if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
                return;
            }
            //仅在多处理器状态下spin值才有意义,否则PAUSE指令是不会执行的
            if (ngx_ncpu > 1) {
                //循环执行PAUSE,检查锁是否已经释放
                for (n = 1; n < mtx->spin; n <<= 1) {
                    //随着长时间没有获得到锁,将会执行更多次PAUSE才会检查锁
                    for (i = 0; i < n; i++) {
                        ngx_cpu_pause();
                    }
    
                    //再次由共享内存中获得lock原子变量的值
                    if (*mtx->lock == 0// 使用ngx_pid 对lock赋值,ngx_pid用于区分进程,不可能重复,非常巧妙
                        && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid))
                    {
                        return;
                    }
                }
            }
    
    #if (NGX_HAVE_POSIX_SEM) //支持信号量时才继续执行
    
            if (mtx->semaphore) {//semaphore标志位为1才使用信号量
                (void) ngx_atomic_fetch_add(mtx->wait, 1);
    
                //重新获取一次可能虚共享内存中的lock原子变量
                if (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)) {
                    (void) ngx_atomic_fetch_add(mtx->wait, -1);
                    return;
                }
    
                ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                               "shmtx wait %uA", *mtx->wait);
    
                //如果没有拿到锁,这时Nginx进程将会睡眠,直到其他进程释放了锁
                /*
                    检查信号量sem的值,如果sem值为正数,则sem值减1,表示拿到了信号量互斥锁,同时sem wait方法返回o。如果sem值为0或
                    者负数,则当前进程进入睡眠状态,等待其他进程使用ngx_shmtx_unlock方法释放锁(等待sem信号量变为正数),到时Linux内核
                    会重新调度当前进程,继续检查sem值是否为正,重复以上流程
                   */
                while (sem_wait(&mtx->sem) == -1) {
                    ngx_err_t  err;
    
                    err = ngx_errno;
    
                    if (err != NGX_EINTR) {//当EINTR信号出现时,表示sem wait只是被打断,并不是出错
                        ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
                                      "sem_wait() failed while waiting on shmtx");
                        break;
                    }
                }
    
                ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0,
                               "shmtx awoke");
    
                continue; //循环检查lock锁的值,注意,当使用信号量后不会调用sched_yield
            }
    
    #endif
    
            ngx_sched_yield(); //在不使用信号量时,调用sched_yield将会使当前进程暂时“让出”处理器
        }
    }
    //判断锁的lock域与当前进程的进程id是否相等,如果相等的话,那么就将lock设置为0,然后就相当于释放了锁。
    void
    ngx_shmtx_unlock(ngx_shmtx_t *mtx)
    {
        if (mtx->spin != (ngx_uint_t) -1) {
            ngx_log_debug0(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, "shmtx unlock");
        }
    //ngx_atomic_cmp_set 将lock设置0,执行ngx_shmtx_wakeup。如果使用ngx_sched_yield休眠,ngx_shmtx_wakeup函数无意义,ngx_shmtx_wakeup主要唤醒sem。
        if (ngx_atomic_cmp_set(mtx->lock, ngx_pid, 0)) {
            ngx_shmtx_wakeup(mtx);
        }
    }
  • 相关阅读:
    LeetCode_441. Arranging Coins
    LeetCode_437. Path Sum III
    Spearman秩相关系数和Pearson皮尔森相关系数
    Spark MLlib 之 Basic Statistics
    Spark MLlib 之 Naive Bayes
    Spark MLlib Data Type
    maven 下载 源码和javadoc 命令
    Hadoop的数据输入的源码解析
    Spark相关错误汇总
    Spark External Datasets
  • 原文地址:https://www.cnblogs.com/codestack/p/13467142.html
Copyright © 2011-2022 走看看