zoukankan      html  css  js  c++  java
  • State Threads之Co-routine的调度

    1. 相关结构体

    1.1 _st_epoll_data

    /*
     * Global state of the epoll event backend.
     */
    static struct _st_epolldata {
        /* Per-descriptor bookkeeping, indexed by fd (see _ST_EPOLL_REVENTS and
         * the _ST_EPOLL_*_CNT macros) */
        _epoll_fd_data_t *fd_data;
        /* Buffer pre-allocated before epoll_wait() is called; epoll_wait()
         * copies the ready events into this array */
        struct epoll_event *evtlist;
        /* NOTE(review): presumably the number of entries allocated in fd_data */
        int fd_data_size;
        /* maxevents argument passed to epoll_wait(); normally equal to the
         * allocated length of evtlist */
        int evtlist_size;
        /* Number of descriptors currently registered with epfd; decremented
         * after every successful EPOLL_CTL_DEL */
        int evtlist_cnt;
        /* Size hint for the number of monitored descriptors (not used in the
         * code shown here) */
        int fd_hint;
        /* epoll instance handle (presumably from epoll_create()); used by
         * epoll_wait() and epoll_ctl() */
        int epfd;
        /* Id of the owning process; _st_epoll_dispatch() exits if it no longer
         * matches getpid() (presumably in a forked child) */
        pid_t pid;
    } *_st_epoll_data;
    

    1.2 _epoll_fd_data_t

    /* Per-descriptor epoll bookkeeping; one entry per fd in
     * _st_epoll_data->fd_data. */
    typedef struct _epoll_fd_data {
        int rd_ref_cnt;   /* pending pollfds on this fd requesting POLLIN  (-> EPOLLIN)  */
        int wr_ref_cnt;   /* pending pollfds on this fd requesting POLLOUT (-> EPOLLOUT) */
        int ex_ref_cnt;   /* pending pollfds on this fd requesting POLLPRI (-> EPOLLPRI) */
        int revents;      /* epoll events reported for this fd during the current
                           * dispatch pass; zero outside of dispatch */
    } _epoll_fd_data_t;
    

    1.3 _st_pollq_t

    /* #include <common.h> */
    /*
     * One pending poll request: a thread blocked on an array of pollfds.
     * Requests are linked into the I/O queue (_ST_IOQ) through `links` and are
     * woken by the event dispatcher once any of their descriptors reports an
     * event. The typedef name `_st_pollq_t` is what the rest of the code uses.
     */
    typedef struct _st_pollq {
        _st_clist_t links;          /* For putting on io queue */
        _st_thread_t *thread;       /* Polling thread */
        struct pollfd *pds;         /* Array of poll descriptors */
        int npds;                   /* Length of the array */
        int on_ioq;                 /* Is it on ioq? */
    } _st_pollq_t;
    

    2. idle 线程

    当每次要切换线程上下文的时候,若检测到 run 队列中没有可调度运行的线程,则会默认调度 idle 线程,该线程在 st_init() 函数中创建好。

    /*
     * Body of the idle thread (created in st_init()). The scheduler resumes it
     * whenever the run queue is empty. Each pass blocks in the event backend
     * until I/O is ready or the earliest sleep-queue deadline passes, moves
     * expired sleepers to the run queue, then yields to the scheduler.
     * `arg` is unused.
     */
    void *_st_idle_thread_start(void *arg)
    {
        _st_thread_t *me = _ST_CURRENT_THREAD();
        
        while (_st_active_count > 0) {
            /* Idle vp till I/O is ready or the smallest timeout expired */
            _ST_VP_IDLE();
            
            /* Check sleep queue for expired threads */
            _st_vp_check_clock();
            
            /* Mark ourselves runnable and switch away; execution resumes here
             * the next time the scheduler picks the idle thread */
            me->state = _ST_ST_RUNNABLE;
            _ST_SWITCH_CONTEXT(me);
        }
        
        /* The idle thread only exits once _st_active_count drops to zero,
         * i.e. (presumably) after every other thread has finished */
        /* No more threads */
        exit(0);
        
        /* NOTREACHED */
        return NULL;
    }
    

    该函数先调用 _ST_VP_IDLE(内部会调用 epoll_wait)等待 I/O 事件,并把 I/O 已就绪的线程放入 run 队列。然后调用 _st_vp_check_clock,把已超时的线程也放入 run 队列。最后将自身状态置为 RUNNABLE,并调用 _ST_SWITCH_CONTEXT 切换到其他线程。

    2.1 _ST_VP_IDLE

    /* Run one dispatch pass of the event system _st_eventsys points to. */
    #define _ST_VP_IDLE()                   (_st_eventsys->dispatch())
    

    这里 _st_eventsys 指向封装了所选事件监控机制(本文分析的是 epoll)的上下文结构体。使用 epoll 作为事件后端时,其 dispatch 成员指向 _st_epoll_dispatch,因此这里调用的就是 _st_epoll_dispatch 函数。

    /* #include <event.c> */
    /* epoll events reported for `fd` during the current dispatch pass */
    #define _ST_EPOLL_REVENTS(fd)    (_st_epoll_data->fd_data[fd].revents)
    
    /* epoll interest bits derived from the per-fd reference counts: a bit is
     * set while at least one pending pollfd on `fd` requests that event */
    #define _ST_EPOLL_READ_BIT(fd)   (_ST_EPOLL_READ_CNT(fd) ? EPOLLIN : 0)
    #define _ST_EPOLL_WRITE_BIT(fd)  (_ST_EPOLL_WRITE_CNT(fd) ? EPOLLOUT : 0)
    #define _ST_EPOLL_EXCEP_BIT(fd)  (_ST_EPOLL_EXCEP_CNT(fd) ? EPOLLPRI : 0)
    /* Combined epoll interest mask for `fd` (needs the line continuation) */
    #define _ST_EPOLL_EVENTS(fd) \
        (_ST_EPOLL_READ_BIT(fd)|_ST_EPOLL_WRITE_BIT(fd)|_ST_EPOLL_EXCEP_BIT(fd))
        
    /* #include <common.h> */
    /* Recover the enclosing _st_pollq_t from a pointer to its `links` member */
    #define _ST_POLLQUEUE_PTR(_qp) \
        ((_st_pollq_t *)((char *)(_qp) - offsetof(_st_pollq_t, links)))
    
    /*
     * epoll implementation of the idle step (_ST_VP_IDLE).
     *
     * Blocks in epoll_wait() until a registered descriptor is ready or the
     * earliest deadline in the sleep queue is reached. For every poll request
     * on the I/O queue with at least one ready descriptor:
     *   - the pollfd revents are filled in;
     *   - the request is taken off the I/O queue;
     *   - its thread is removed from the sleep queue (if present) and appended
     *     to the run queue.
     * Finally, each descriptor that fired is either re-armed with its remaining
     * interest mask (EPOLL_CTL_MOD), or removed from the epoll set when no
     * pollfd references it any more (EPOLL_CTL_DEL).
     */
    ST_HIDDEN void _st_epoll_dispatch(void)
    {
        st_utime_t min_timeout;
        _st_clist_t *q;
        _st_pollq_t *pq;
        struct pollfd *pds, *epds;
        struct epoll_event ev;      /* argument for epoll_ctl() in the last loop */
        int timeout, nfd, i, osfd, notify;
        int events, op;
        short revents;
        
        if (_ST_SLEEPQ == NULL) {
            /*
             * No thread is waiting on a timeout: let epoll_wait() block
             * (timeout -1) until an I/O event arrives.
             */
            timeout = -1;
        } else {
            /*
             * Wait no longer than the earliest deadline in the sleep queue.
             * If that deadline is already due relative to _ST_LAST_CLOCK, poll
             * without blocking (timeout 0). Otherwise wait for the remaining
             * time, divided by 1000 to get the milliseconds epoll_wait()
             * expects.
             */
            min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ?
                0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);
            timeout = (int) (min_timeout / 1000);
        }
        
        /* The epoll state belongs to the process that recorded `pid`;
         * presumably this trips in a forked child. */
        if (_st_epoll_data->pid != getpid()) {
            // WINLIN: remove it for bug introduced.
            // @see: https://github.com/ossrs/srs/issues/193
            exit(-1);
        }
        
        /* Check for I/O operations */
        nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist,
                         _st_epoll_data->evtlist_size, timeout);
        
        if (nfd > 0) {
            /* Record what fired for each ready descriptor. */
            for (i = 0; i < nfd; i++) {
                osfd = _st_epoll_data->evtlist[i].data.fd;
                _ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events;
                if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) {
                    /* Also set I/O bits on error */
                    _ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd);
                }
            }
            
            /*
             * Wake every poll request that has a ready descriptor.
             * NOTE(review): the loop keeps following q->next after
             * ST_REMOVE_LINK(&pq->links); this relies on ST_REMOVE_LINK
             * leaving the unlinked node's next pointer intact.
             */
            for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
                pq = _ST_POLLQUEUE_PTR(q);
                notify = 0;
                epds = pq->pds + pq->npds;
                
                for (pds = pq->pds; pds < epds; pds++) {
                    if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
                        pds->revents = 0;
                        continue;
                    }
                    osfd = pds->fd;
                    events = pds->events;
                    revents = 0;
                    /* Map the epoll events onto the poll() bits this pollfd
                     * asked for; POLLERR and POLLHUP are always reported. */
                    if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN)) {
                        revents |= POLLIN;
                    }
                    if ((events & POLLOUT) && (_ST_EPOLL_REVENTS(osfd) & EPOLLOUT)) {
                        revents |= POLLOUT;
                    }
                    if ((events & POLLPRI) && (_ST_EPOLL_REVENTS(osfd) & EPOLLPRI)) {
                        revents |= POLLPRI;
                    }
                    if (_ST_EPOLL_REVENTS(osfd) & EPOLLERR) {
                        revents |= POLLERR;
                    }
                    if (_ST_EPOLL_REVENTS(osfd) & EPOLLHUP) {
                        revents |= POLLHUP;
                    }
                    
                    pds->revents = revents;
                    if (revents) {
                        notify = 1;
                    }
                }
                if (notify) {
                    /* Take the poll request off the I/O queue */
                    ST_REMOVE_LINK(&pq->links);
                    pq->on_ioq = 0;
                    /*
                     * Here we will only delete/modify descriptors that
                     * didn't fire (see comments in _st_epoll_pollset_del()).
                     */
                    _st_epoll_pollset_del(pq->pds, pq->npds);
                    
                    /* If the thread also has a pending timeout, remove it
                     * from the sleep queue */
                    if (pq->thread->flags & _ST_FL_ON_SLEEPQ) {
                        _ST_DEL_SLEEPQ(pq->thread);
                    }
                    /* Mark the thread RUNNABLE and append it to the run queue */
                    pq->thread->state = _ST_ST_RUNNABLE;
                    _ST_ADD_RUNQ(pq->thread);
                }
            }
            
            for (i = 0; i < nfd; i++) {
                /* Delete/modify descriptors that fired */
                osfd = _st_epoll_data->evtlist[i].data.fd;
                _ST_EPOLL_REVENTS(osfd) = 0;
                events = _ST_EPOLL_EVENTS(osfd);
                op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                ev.events = events;
                ev.data.fd = osfd;
                if (epoll_ctl(_st_epoll_data->epfd, op, osfd, &ev) == 0 && op == EPOLL_CTL_DEL)
                {
                    _st_epoll_data->evtlist_cnt--;
                }
            }
        }
    }
    

    2.1.1 _st_epoll_pollset_del

    /* Per-fd reference counts of pending pollfds that request read, write and
     * exceptional (POLLPRI) events, respectively; usable as lvalues. */
    #define _ST_EPOLL_READ_CNT(fd)   (_st_epoll_data->fd_data[(fd)].rd_ref_cnt)
    #define _ST_EPOLL_WRITE_CNT(fd)  (_st_epoll_data->fd_data[(fd)].wr_ref_cnt)
    #define _ST_EPOLL_EXCEP_CNT(fd)  (_st_epoll_data->fd_data[(fd)].ex_ref_cnt)
    
    /*
     * Release the epoll interest held by the pollfd array pds[0..npds).
     *
     * For every entry, decrement the per-fd reference counts of the requested
     * events (POLLIN, POLLOUT, POLLPRI). If that changes the fd's combined
     * epoll interest mask, the fd is updated with epoll_ctl():
     *   - EPOLL_CTL_MOD re-registers it with the smaller mask;
     *   - EPOLL_CTL_DEL removes it when the mask is empty, and a successful
     *     delete also decrements evtlist_cnt.
     *
     * Descriptors whose _ST_EPOLL_REVENTS is non-zero are skipped. That only
     * happens while _st_epoll_dispatch() is running, and dispatch re-arms or
     * deletes the descriptors that fired by itself.
     *
     * It's more or less OK if deleting fails because a descriptor
     * will either be closed or deleted in dispatch function after
     * it fires.
     */
    ST_HIDDEN void _st_epoll_pollset_del(struct pollfd *pds, int npds)
    {
        struct epoll_event ev;
        int k;
        
        for (k = 0; k < npds; k++) {
            const int fd = pds[k].fd;
            const short want = pds[k].events;
            const int before = _ST_EPOLL_EVENTS(fd);
            int after;
            int op;
            
            if (want & POLLIN) {
                _ST_EPOLL_READ_CNT(fd)--;
            }
            if (want & POLLOUT) {
                _ST_EPOLL_WRITE_CNT(fd)--;
            }
            if (want & POLLPRI) {
                _ST_EPOLL_EXCEP_CNT(fd)--;
            }
            
            after = _ST_EPOLL_EVENTS(fd);
            
            /* Nothing changed for the kernel, or dispatch() owns this fd. */
            if (after == before || _ST_EPOLL_REVENTS(fd) != 0) {
                continue;
            }
            
            op = after ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
            ev.events = after;
            ev.data.fd = fd;
            if (epoll_ctl(_st_epoll_data->epfd, op, fd, &ev) == 0 &&
                op == EPOLL_CTL_DEL) {
                _st_epoll_data->evtlist_cnt--;
            }
        }
    }
    

    2.2 超时检测:_st_vp_check_clock

    /*
     * Refresh the scheduler clock and make runnable every sleeping thread
     * whose deadline has passed.
     *
     * The sleep queue is a min-heap on `due`, so its head (_ST_SLEEPQ) always
     * holds the earliest deadline. Threads are popped from the head while
     * `due <= now`. A popped thread that was waiting on a condition variable
     * additionally gets _ST_FL_TIMEDOUT set.
     */
    void _st_vp_check_clock(void)
    {
        _st_thread_t *trd;
        st_utime_t now;
        
        now = st_utime();
        /*
         * _ST_LAST_CLOCK caches the time of the last scheduling pass. Other
         * code, such as the timeout computation in the event dispatcher,
         * works relative to it instead of reading the clock again.
         */
        _ST_LAST_CLOCK = now;
        
        /* Refresh the cached wall-clock time (only when _st_curr_time is in use,
         * i.e. non-zero) at most every 999000 st_utime() units, roughly once a
         * second if st_utime() counts microseconds. */
        if (_st_curr_time && now - _st_last_tset > 999000) {
            _st_curr_time = time(NULL);
            _st_last_tset = now;
        }
        
        while (_ST_SLEEPQ != NULL) {
            trd = _ST_SLEEPQ;
            ST_ASSERT(trd->flags & _ST_FL_ON_SLEEPQ);
            /* Stop at the first thread whose deadline has not been reached */
            if (trd->due > now) {
                break;
            }
            _ST_DEL_SLEEPQ(trd);
            
            /* If thread is waiting on condition variable, set the time out flag */
            if (trd->state == _ST_ST_COND_WAIT) {
                trd->flags |= _ST_FL_TIMEDOUT;
            }
            
            /* Make thread runnable */
            ST_ASSERT(!(trd->flags & _ST_FL_IDLE_THREAD));
            trd->state = _ST_ST_RUNNABLE;
            _ST_ADD_RUNQ(trd);
        }
    }
    

    注意:调用 sleep 时传入的是相对时间,加入超时队列时则换算成绝对时间(到期时刻 due)。超时队列是一个用二叉树实现的最小堆,每次插入或删除超时对象都需要调整堆,以维持堆的性质。因此,带超时的调用如果过多,会带来明显的性能开销。

    ST 中所有的 timeout 都是用同一套机制实现的,包括 sleep、I/O 超时、条件变量(cond)超时等。

    所有的超时对象都放在超时队列,即 _ST_SLEEPQ 中。idle 线程(_st_idle_thread_start)会先通过 _st_epoll_dispatch 调用 epoll_wait 进行事件调度。epoll_wait 的最后一个参数是以毫秒为单位的超时时间,而超时队列中保存的是绝对时间。因此只需用队首(最早到期)元素的到期时间减去 _ST_LAST_CLOCK,再除以 1000 换算成毫秒,就得到 epoll_wait 最多应等待的时长;若队首元素已经到期,则超时时间取 0。

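    epoll_wait 返回后,_st_epoll_dispatch 会把 I/O 已就绪的线程放入 run 队列,随后回到 idle 线程调用 _st_vp_check_clock。该函数负责更新时钟(_ST_LAST_CLOCK)并找出已超时的线程。对每个超时线程,先用 _ST_DEL_SLEEPQ 将其从超时队列中移除(它调用 _st_del_sleep_q,后者再调用 heap_delete)。然后将其状态置为 RUNNABLE,并通过 _ST_ADD_RUNQ 放入 run 队列,这样该线程就被激活了。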

    2.2.1 _ST_DEL_SLEEPQ

    /* Remove thread `_thr` from the sleep (timeout) queue. */
    #define _ST_DEL_SLEEPQ(_thr)        (_st_del_sleep_q(_thr))
    

    2.2.2 _st_del_sleep_q

    /*
     * Take `trd` out of the timeout heap, then clear its _ST_FL_ON_SLEEPQ flag
     * so it is no longer considered to be on the sleep queue.
     */
    void _st_del_sleep_q(_st_thread_t *trd)
    {
        heap_delete(trd);
        trd->flags = trd->flags & ~_ST_FL_ON_SLEEPQ;
    }
    

    2.2.3 heap_delete

    /**
     * Delete "thread" from the timeout heap.
     *
     * The sleep queue is a binary min-heap keyed on `due`. It is kept as an
     * explicit tree of left/right links, rooted at _ST_SLEEPQ and holding
     * _ST_SLEEPQ_SIZE nodes. The walk below implies 1-based level-order
     * numbering: node k has children 2k (left) and 2k+1 (right). The bits of k
     * after its leading 1 therefore spell the path from the root
     * (0 = left, 1 = right).
     *
     * Steps:
     *   1. Unlink the last node (number _ST_SLEEPQ_SIZE) and shrink the heap.
     *   2. If that node is not `trd`, re-insert it at trd's position (it takes
     *      trd's heap_index). The node now occupying that slot adopts trd's
     *      children and is sifted down until neither child is due earlier.
     *   3. Clear trd's child links.
     *
     * NOTE(review): heap_insert() is not shown. This code assumes it places a
     * node at the position given by its heap_index and returns the link (child
     * pointer or &_ST_SLEEPQ) that now holds that slot.
     */
    static void heap_delete(_st_thread_t *trd)
    {
        _st_thread_t *t, **p;
        int bits = 0;
        int s, bit;
        
        /* First find and unlink the last heap element */
        p = &_ST_SLEEPQ;
        s = _ST_SLEEPQ_SIZE;
        /* bits = number of significant bits in the heap size */
        while (s) {
            s >>= 1;
            bits++;
        }
        
        /* Skip the leading 1 bit (the root) and follow the remaining bits of
         * the size, most significant first, down to the last node */
        for (bit = bits - 2; bit >= 0; bit--) {
            if (_ST_SLEEPQ_SIZE & (1 << bit)) {
                p = &((*p)->right);
            } else {
                p = &((*p)->left);
            }
        }
        
        t = *p;
        *p = NULL;
        --_ST_SLEEPQ_SIZE;
        if (t != trd) {
            /*
             * Insert the unlinked last element in place of the element we are
             * deleting. t is re-read from *p because the node left in that
             * slot is the one that must adopt trd's children. NOTE(review):
             * presumably heap_insert() may move nodes along the path, so this
             * need not be the node that was just unlinked.
             */
            t->heap_index = trd->heap_index;
            p = heap_insert(t);
            t = *p;
            t->left = trd->left;
            t->right = trd->right;
            
            /*
             * Reestablish the heap invariant: sift t down, swapping it with its
             * earlier-due child while that child is due sooner than t.
             */
            for (;;) {
                _st_thread_t *y; /* The younger child (earlier due) */
                int index_tmp;
                
                if (t->left == NULL) {
                    break;
                } else if (t->right == NULL) {
                    y = t->left;
                } else if (t->left->due < t->right->due) {
                    y = t->left;
                } else {
                    y = t->right;
                }
                
                if (t->due > y->due) {
                    /* Swap t and y: y moves into t's slot (*p), t takes y's old
                     * children, p follows t to its new slot, and the two
                     * heap_index values are exchanged */
                    _st_thread_t *tl = y->left;
                    _st_thread_t *tr = y->right;
                    *p = y;
                    if (y == t->left) {
                        y->left = t;
                        y->right = t->right;
                        p = &y->left;
                    } else {
                        y->left = t->left;
                        y->right = t;
                        p = &y->right;
                    }
                    t->left = tl;
                    t->right = tr;
                    index_tmp = t->heap_index;
                    t->heap_index = y->heap_index;
                    y->heap_index = index_tmp;
                } else {
                    break;
                }
            }
        }
        
        trd->left = trd->right = NULL;
    }
    

    注:ST 性能最高的情形是没有任何 timeout,全部依靠 epoll_wait 进行 I/O 调度。此时无需维护超时堆,调度开销基本只取决于 Linux 内核中 epoll 本身的性能。

    2.3 _ST_SWITCH_CONTEXT

    /*
     * Switch away from the current thread context by saving its state and
     * calling the thread scheduler.
     *
     * MD_SETJMP() (presumably a _setjmp wrapper) returns 0 when the context is
     * saved, so _st_vp_schedule() runs and resumes another thread. When this
     * thread is later resumed by _ST_RESTORE_CONTEXT, which longjmps with
     * value 1, MD_SETJMP() returns non-zero and execution continues after the
     * if statement. Every line ends in a continuation backslash so the whole
     * body belongs to the macro.
     */
    #define _ST_SWITCH_CONTEXT(_thread)       \
        ST_BEGIN_MACRO                        \
        ST_SWITCH_OUT_CB(_thread);            \
        if (!MD_SETJMP((_thread)->context)) { \
            _st_vp_schedule();                \
        }                                     \
        ST_DEBUG_ITERATE_THREADS();           \
        ST_SWITCH_IN_CB(_thread);             \
        ST_END_MACRO
    

    2.3.1 ST_SWITCH_OUT_CB

    /*
     * Optional user hooks run around every context switch; they are only
     * compiled in when ST_SWITCH_CB is defined. A hook is skipped for the idle
     * thread and for threads that are already zombies.
     *
     * Each expansion is wrapped in do { } while (0). The call site's trailing
     * ';' then completes a single statement, and the embedded `if` cannot
     * capture a following `else`.
     */
    #ifdef ST_SWITCH_CB
        #define ST_SWITCH_OUT_CB(_thread)                    \
            do {                                             \
                if (_st_this_vp.switch_out_cb != NULL &&     \
                    (_thread) != _st_this_vp.idle_thread &&  \
                    (_thread)->state != _ST_ST_ZOMBIE) {     \
                    _st_this_vp.switch_out_cb();             \
                }                                            \
            } while (0)
        #define ST_SWITCH_IN_CB(_thread)                     \
            do {                                             \
                if (_st_this_vp.switch_in_cb != NULL &&      \
                    (_thread) != _st_this_vp.idle_thread &&  \
                    (_thread)->state != _ST_ST_ZOMBIE) {     \
                    _st_this_vp.switch_in_cb();              \
                }                                            \
            } while (0)
    #else
        #define ST_SWITCH_OUT_CB(_thread)
        #define ST_SWITCH_IN_CB(_thread)
    #endif
    

    2.4 _st_vp_schedule

    /* Recover the _st_thread_t that embeds list node `_qp` as its `links`
     * member (used on run-queue nodes below). */
    #define _ST_THREAD_PTR(_qp) \
        ((_st_thread_t *)((char *)(_qp) - offsetof(_st_thread_t, links)))
    
    /*
     * Pick the next thread and resume it. Control does not come back to the
     * caller's frame; it transfers to the chosen thread via
     * _ST_RESTORE_CONTEXT.
     *
     * The run queue is a circular list headed by _ST_RUNQ, so it is empty when
     * the head's `next` points back at the head itself. If the queue is
     * non-empty its first thread is taken; otherwise the idle thread runs.
     * The chosen thread must be RUNNABLE; it is marked RUNNING before its
     * saved context is restored.
     */
    void _st_vp_schedule(void)
    {
        _st_thread_t *trd;
        
        if (_ST_RUNQ.next != &_ST_RUNQ) {
            /* Pull thread off of the run queue */
            trd = _ST_THREAD_PTR(_ST_RUNQ.next);
            _ST_DEL_RUNQ(trd);
        } else {
            /* If there are no threads to run, switch to the idle thread */
            trd = _st_this_vp.idle_thread;
        }
        ST_ASSERT(trd->state == _ST_ST_RUNNABLE);
        
        /* Resume the thread */
        trd->state = _ST_ST_RUNNING;
        _ST_RESTORE_CONTEXT(trd);
    }
    

    2.4.1 _ST_RESTORE_CONTEXT

    /* #include <common.h> */
    /* Make `_thread` the current thread */
    #define _ST_SET_CURRENT_THREAD(_thread) (_st_this_thread = (_thread))
    
    /* #include <md.h> */
    /* Context jump; POSIX _longjmp, i.e. longjmp without signal-mask handling */
    #define MD_LONGJMP(env, val) _longjmp(env, val)
    
    /*
     * Restore a thread context that was saved by _ST_SWITCH_CONTEXT or
     * initialized by _ST_INIT_CONTEXT.
     *
     * The macro makes `_thread` the current thread and jumps into its saved
     * context. The jump value 1 makes the MD_SETJMP() in _ST_SWITCH_CONTEXT
     * return non-zero. The argument must be `_thread` (the macro parameter),
     * not some other identifier that happens to be in scope at the call site.
     */
    #define _ST_RESTORE_CONTEXT(_thread)      \
        ST_BEGIN_MACRO                        \
        _ST_SET_CURRENT_THREAD(_thread);      \
        MD_LONGJMP((_thread)->context, 1);    \
        ST_END_MACRO
    

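    该宏先通过 _ST_SET_CURRENT_THREAD 把当前线程(_st_this_thread)设为要恢复的线程 _thread,然后调用 MD_LONGJMP 跳转到该线程上下文最近一次被保存的位置。对于曾被切换出去的线程,这个位置就是它上次在 _ST_SWITCH_CONTEXT 中调用 MD_SETJMP 的地方;对于新创建的线程,则是由 _ST_INIT_CONTEXT 初始化的上下文。由于 MD_LONGJMP 传入的值为 1,此时 MD_SETJMP 返回非 0,于是跳过 _st_vp_schedule,线程从 _ST_SWITCH_CONTEXT 之后继续执行。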

  • 相关阅读:
    LINUX内核分析第三周学习总结——构造一个简单的Linux系统MenuOS
    Linux第二周学习总结——操作系统是如何工作的
    通过汇编一个简单的C程序,分析汇编代码理解计算机是如何工作的
    期末总结
    实验报告(实验五)
    HTTPServletResponse
    Eclipse 枚举类报错
    出现Unreachable code问题的原因
    myeclipse自动保存修改代码
    ssh连接虚拟机失败解决办法
  • 原文地址:https://www.cnblogs.com/jimodetiantang/p/9035199.html
Copyright © 2011-2022 走看看