zoukankan      html  css  js  c++  java
  • (转)st(state-threads) coroutine调度

    目录(?)[-]
     

    st(state-threads) https://github.com/winlinvip/state-threads

    以及基于st的RTMP/HLS服务器:https://github.com/winlinvip/simple-rtmp-server

    st是实现了coroutine的一套机制,即用户态线程,或者叫做协程。将epoll(async,nonblocking socket)的非阻塞变成协程的方式,将所有状态空间都放到stack中,避免异步的大循环和状态空间的判断。

    关于st的详细介绍,参考翻译:http://blog.csdn.net/win_lin/article/details/8242653

    我将st进行了简化,去掉了其他系统,只考虑linux系统,以及i386/x86_64/arm/mips四种cpu系列,参考:https://github.com/winlinvip/simple-rtmp-server/tree/master/trunk/research/st
    本文介绍了coroutine的调度,主要涉及epoll和timeout超时队列。

    EPOLL和TIMEOUT

    普通EPOLL的使用,就是读可能没有读完,写没有写完,能读多少不知道,能写多少也不知道,因此需要在fd可写时继续写,在fd可读时继续读。这就是一个大的epoll_wait循环,处理所有醒来的fd,哪些是该读的,哪些是该写的。

    TIMEOUT是应用很广的业务需求,譬如设置fd的超时,sleep一定时间之类。epoll_wait中也提供了timeout,最后一个就是超时时间。

    如果结合之前讨论的coroutine的创建和跳转方法,就可以知道st如何使用epoll了。调试程序,设置断点在_st_epoll_dispatch:

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. (gdb) bt  
    2. #0  _st_epoll_dispatch () at event.c:304  
    3. #1  0x000000000040171c in _st_idle_thread_start (arg=0x0) at sched.c:222  
    4. #2  0x0000000000401b26 in _st_thread_main () at sched.c:327  
    5. #3  0x00000000004022c0 in st_thread_create (start=0x635ed0, arg=0x186a0, joinable=0, stk_size=4199587) at sched.c:600  

    可以看到是idle线程调用了epoll的epoll_wait方法,计算出timeout和各种激活的fd,然后把对应的coroutine放到活动队列,然后一个一个线程的切换。

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. ST_HIDDEN void _st_epoll_dispatch(void)  
    2. {  
    3.     if (_ST_SLEEPQ == NULL) {  
    4.         timeout = -1;  
    5.     } else {  
    6.         min_timeout = (_ST_SLEEPQ->due <= _ST_LAST_CLOCK) ? 0 : (_ST_SLEEPQ->due - _ST_LAST_CLOCK);  
    7.         timeout = (int) (min_timeout / 1000);  
    8.     }  
    9.   
    10.     if (_st_epoll_data->pid != getpid()) {  
    11.         // WINLIN: remove it for bug introduced.  
    12.         // @see: https://github.com/winlinvip/simple-rtmp-server/issues/193  
    13.         exit(-1);  
    14.     }  
    15.   
    16.     /* Check for I/O operations */  
    17.     nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist, _st_epoll_data->evtlist_size, timeout);  

    线程调度的核心,根据io或者timeout调度。

    调度其实就是idle线程做的,代码如下:

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. void *_st_idle_thread_start(void *arg)  
    2. {  
    3.     _st_thread_t *me = _ST_CURRENT_THREAD();  
    4.       
    5.     while (_st_active_count > 0) {  
    6.         /* Idle vp till I/O is ready or the smallest timeout expired */  
    7.         _ST_VP_IDLE();  
    8.           
    9.         /* Check sleep queue for expired threads */  
    10.         _st_vp_check_clock();  
    11.           
    12.         me->state = _ST_ST_RUNNABLE;  
    13.         _ST_SWITCH_CONTEXT(me);  
    14.     }  
    15.       
    16.     /* No more threads */  
    17.     exit(0);  
    18.       
    19.     /* NOTREACHED */  
    20.     return NULL;  
    21. }  

    可见是先_ST_VP_IDLE调用epoll_wait激活活动io的线程,然后在_st_vp_check_clock中检查超时的线程。

    TIME

    超时时,若使用相对时间,譬如st_usleep(100 * 1000),休眠100毫秒,最后传递给epoll_wait的时间就是100ms,即st使用相对时间:

    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. (gdb) f  
    2. #0  _st_epoll_dispatch () at event.c:308  
    3. 308         timeout = (int) (min_timeout / 1000);  
    4. (gdb) p min_timeout  
    5. $2 = 100000  

    使用相对时间就会有延迟的问题,譬如:
    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. st_usleep(100ms)  
    2. for (int i = 0; i < xxxx; i++) {  
    3.         // st没有控制权的运行时间,假设200ms  
    4. }  
    5. // st获取控制权  

    上面这段代码就会导致实际上st_usleep了有200毫秒,当然代码执行100毫秒已经是非常非常复杂的任务,是性能瓶颈了。这个其实可以忽略不计的。因此st的reference中说明如下:
    [html] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. Timeouts  
    2.   
    3. The timeout parameter to st_cond_timedwait() and the I/O functions, and the arguments to st_sleep() and st_usleep() specify a maximum time to wait since the last context switch not since the beginning of the function call.  

    超时是从线程切换算起,而不是从函数调用算起;也就是说st的超时总是有延时的啦。

    查看st_utime这个函数的实现,实际上默认是用gettimeofday,这个函数若频繁调用是有性能瓶颈的。实际上只有几个地方调用了这个函数:

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. sched.c:163:    _st_this_vp.last_clock = st_utime(); // st_init()  
    2. sched.c:478:    now = st_utime(); // _st_vp_check_clock()  
    3. stk.c:165:        srandom((unsigned int) st_utime()); // st_randomize_stacks()  
    4. sync.c:93:        _st_last_tset = st_utime(); // st_timecache_set()  

    真正调用较多的就只有_st_vp_check_clock,它实际上是在idle中调用:
    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. void *_st_idle_thread_start(void *arg)  
    2. {  
    3.     _st_thread_t *me = _ST_CURRENT_THREAD();  
    4.       
    5.     while (_st_active_count > 0) {  
    6.         /* Idle vp till I/O is ready or the smallest timeout expired */  
    7.         _ST_VP_IDLE();  
    8.           
    9.         /* Check sleep queue for expired threads */  
    10.         _st_vp_check_clock();  
    11.           
    12.         me->state = _ST_ST_RUNNABLE;  
    13.         _ST_SWITCH_CONTEXT(me);  
    14.     }  

    也就是说,这个实际上只会在每次调度时调用,实际上还是可以接受的。

    线程的超时是通过due字段设置,这个不管是sleep还是io,都是设置了这个字段:

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. sched.c:461:    trd->due = _ST_LAST_CLOCK + timeout;  

    实际上这个_ST_LAST_CLOCK就是每次调度时更新的时钟。可见,st只在每次调度时更新一次时钟,其他时候都是使用的相对时间。

    SLEEP时的参数是相对时间,添加任务时使用绝对时间,超时时会平衡二叉树,总之超时如果调用过多,是会有性能问题的。下面详细分析。

    TIMEOUT

    st所有的timeout,都是用同样的机制实现的。包括sleep,io的超时,cond超时等等。

    所有的超时对象都放在超时队列,即_ST_SLEEPQ。idle线程,即_st_idle_thread_start会先epoll_wait进行事件调度,即_st_epoll_dispatch。而在epoll_wait时最后一个参数就是超时的ms,超时队列使用绝对时间,所以只要比较超时队列的第一个元素和现在的差值,就可以知道了。

    epoll_wait事件会激活那些有io的线程,然后返回idle线程调用_st_vp_check_clock,这个就是更新绝对时间和找出超时的线程。_ST_DEL_SLEEPQ就是用来激活那些超时的线程,这个函数会调用_st_del_sleep_q,然后调用heap_delete。

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. static void heap_delete(_st_thread_t *trd)   
    2. {  
    3.     _st_thread_t *t, **p;  
    4.     int bits = 0;  
    5.     int s, bit;  
    6.       
    7.     /* First find and unlink the last heap element */  
    8.     p = &_ST_SLEEPQ;  
    9.     s = _ST_SLEEPQ_SIZE;  
    10.     while (s) {  
    11.         s >>= 1;  
    12.         bits++;  
    13.     }  
    14.       
    15.     for (bit = bits - 2; bit >= 0; bit--) {  
    16.         if (_ST_SLEEPQ_SIZE & (1 << bit)) {  
    17.             p = &((*p)->right);  
    18.         } else {  
    19.             p = &((*p)->left);  
    20.         }  
    21.     }  
    22.       
    23.     t = *p;  
    24.     *p = NULL;  
    25.     --_ST_SLEEPQ_SIZE;  
    26.     if (t != trd) {  
    27.         /* 
    28.         * Insert the unlinked last element in place of the element we are deleting 
    29.         */  
    30.         t->heap_index = trd->heap_index;  
    31.         p = heap_insert(t);  
    32.         t = *p;  
    33.         t->left = trd->left;  
    34.         t->right = trd->right;  
    35.           
    36.         /* 
    37.         * Reestablish the heap invariant. 
    38.         */  
    39.         for (;;) {  
    40.             _st_thread_t *y; /* The younger child */  
    41.             int index_tmp;  
    42.               
    43.             if (t->left == NULL) {  
    44.                 break;  
    45.             } else if (t->right == NULL) {  
    46.                 y = t->left;  
    47.             } else if (t->left->due < t->right->due) {  
    48.                 y = t->left;  
    49.             } else {  
    50.                 y = t->right;  
    51.             }  
    52.               
    53.             if (t->due > y->due) {  
    54.                 _st_thread_t *tl = y->left;  
    55.                 _st_thread_t *tr = y->right;  
    56.                 *p = y;  
    57.                 if (y == t->left) {  
    58.                     y->left = t;  
    59.                     y->right = t->right;  
    60.                     p = &y->left;  
    61.                 } else {  
    62.                     y->left = t->left;  
    63.                     y->right = t;  
    64.                     p = &y->right;  
    65.                 }  
    66.                 t->left = tl;  
    67.                 t->right = tr;  
    68.                 index_tmp = t->heap_index;  
    69.                 t->heap_index = y->heap_index;  
    70.                 y->heap_index = index_tmp;  
    71.             } else {  
    72.                 break;  
    73.             }  
    74.         }  
    75.     }  
    76.       
    77.     trd->left = trd->right = NULL;  
    78. }  

    可以看出来这个函数是比较复杂的,这个据st说是O(log N)复杂度的(参考timeout_heap.txt),但是如果频繁的调用,还是会比较成问题的。主要是频繁调用它时,意味着epoll_wait和epoll_ctl被频繁调用(因为有很多timeout嘛),所以实际上timeout使用过多,在st中是比较忌讳的。

    st最高性能时,就是没有timeout,全部使用epoll_wait进行io调度,这个时候完全就是linux的性能了,非常高。

    Deviation

    st的误差到底能到多少?测量发现(当然复杂度越高误差越大):

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. srs_trace("1. sleep...");  
    2. st_utime_t start = st_utime();  
    3. st_usleep(sleep_ms * 1000);  
    4. st_utime_t end = st_utime();  
    5.   
    6. srs_trace("2. sleep ok, sleep=%dus, deviation=%dus",   
    7.     (int)(sleep_ms * 1000), (int)(end - start - sleep_ms * 1000));  

    结果是:
    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. 1. sleep...  
    2. 2. sleep ok, sleep=100000us, deviation=147us  

    也就是说,系统空载时,误差为千分之一,完全可以忽略。

    系统繁忙时呢?做三十亿次空载循环运算后切换线程的测试:

    [cpp] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. st_mutex_t sleep_work_cond = NULL;  
    2. void* sleep_deviation_func(void* arg)  
    3. {  
    4.     st_mutex_lock(sleep_work_cond);  
    5.     srs_trace("2. work thread start.");  
    6.   
    7.     int64_t i;  
    8.     for (i = 0; i < 3000000000ULL; i++) {  
    9.     }  
    10.       
    11.     st_mutex_unlock(sleep_work_cond);  
    12.     srs_trace("3. work thread end.");  
    13.       
    14.     return NULL;  
    15. }  
    16.   
    17. int sleep_deviation_test()  
    18. {  
    19.     srs_trace("===================================================");  
    20.     srs_trace("sleep deviation test: start");  
    21.       
    22.     sleep_work_cond = st_mutex_new();  
    23.       
    24.     st_thread_create(sleep_deviation_func, NULL, 0, 0);  
    25.     st_mutex_lock(sleep_work_cond);  
    26.       
    27.     srs_trace("1. sleep...");  
    28.     st_utime_t start = st_utime();  
    29.       
    30.     // other thread to do some complex work.  
    31.     st_mutex_unlock(sleep_work_cond);  
    32.     st_usleep(1000 * 1000);  
    33.       
    34.     st_utime_t end = st_utime();  
    35.       
    36.     srs_trace("4. sleep ok, sleep=%dus, deviation=%dus",   
    37.         (int)(sleep_ms * 1000), (int)(end - start - sleep_ms * 1000));  
    38.   
    39.     st_mutex_lock(sleep_work_cond);  
    40.     srs_trace("sleep deviation test: end");  
    41.       
    42.     st_mutex_destroy(sleep_work_cond);  
    43.       
    44.     return 0;  
    45. }  

    这个时候st的误差是:
    [plain] view plaincopy在CODE上查看代码片派生到我的代码片
     
    1. sleep deviation test: start  
    2. 1. sleep...  
    3. 2. work thread start.  
    4. 3. work thread end.  
    5. 4. sleep ok, sleep=100000us, deviation=6560003us  
    6. sleep deviation test: end  

    查看io其他所有timeout的实现,都是一样的。所以st是有误差的,在一些性能有问题的程序中,会造成严重的调度问题(当然性能有问题应该解决性能问题)。

    st的timeout机制,总体来讲,是没有问题的,这就是结论。

    版权声明:本文为博主原创文章,未经博主允许不得转载。

    转自:http://blog.csdn.net/win_lin/article/details/41009137

    个人注解:

    1) 关于coroutine的理解:它是用户自己模拟出来的类似于线程的调度,而不是真正意义上的线程,它由用户自己管理调度,自己创建一段内存来模拟线程的堆栈。

    • coroutine创建的所谓的“线程”都不是真正的操作系统的线程,实际上是通过保存stack状态来模拟的。
    • 由于是假的线程,所以切换线程的开销极小,同时创建线程也是轻量级的,new_thread只是在内存新建了一个stack用于存放新coroutine的变量,也称作lua_State。

    2) setjmp()和longjmp()函数:作用类似于goto,解决goto它只能跳到所在函数内部的标号上,而不能将控制权转移到所在程序的任意地点(当然,除非你的所有代码都在main体中)。

      所以setjmp()和longjmp()函数,它们分别承担非局部标号和goto作用。

    3) 目前暂支持linux版本,但http://sourceforge.net/projects/state-threads/files/支持了window版本,只是这个版本比较老了而已。

  • 相关阅读:
    java 多线程 Future callable
    nginx Access-Control-Allow-Origin css跨域
    maven 项目调试本地代码
    tomcat -ROOT 与webapps 的关系,关于部署的一些问题
    需求分析,挖掘背后的原因
    js 短信验证码 计时器
    总结一些小问题
    基于synchronized 或 ReadWriteLock实现 简单缓存机制
    java cookie 工具类
    309. 最佳买卖股票时机含冷冻期
  • 原文地址:https://www.cnblogs.com/lihaiping/p/4755857.html
Copyright © 2011-2022 走看看