事件驱动与协程调度
协程的“阻塞”与线程的“非阻塞”
生产者消费者模型
1 /*
2 * Tencent is pleased to support the open source community by making Libco available.
3
4 * Copyright (C) 2014 THL A29 Limited, a Tencent company. All rights reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
19 #include <unistd.h>
20 #include <stdio.h>
21 #include <stdlib.h>
22 #include <queue>
23 #include "co_routine.h"
24 using namespace std;
25 struct stTask_t
26 {
27 int id;
28 };
29 struct stEnv_t
30 {
31 stCoCond_t* cond;
32 queue<stTask_t*> task_queue;
33 };
34 void* Producer(void* args)
35 {
36 co_enable_hook_sys();
37 stEnv_t* env= (stEnv_t*)args;
38 int id = 0;
39 while (true)
40 {
41 stTask_t* task = (stTask_t*)calloc(1, sizeof(stTask_t));
42 task->id = id++;
43 env->task_queue.push(task);
44 printf("%s:%d produce task %d
", __func__, __LINE__, task->id);
45 co_cond_signal(env->cond);
46 poll(NULL, 0, 1000);//等待1s
47 }
48 return NULL;
49 }
50 void* Consumer(void* args)
51 {
52 co_enable_hook_sys();
53 stEnv_t* env = (stEnv_t*)args;
54 while (true)
55 {
56 if (env->task_queue.empty())
57 {
58 co_cond_timedwait(env->cond, -1);
59 continue;
60 }
61 stTask_t* task = env->task_queue.front();
62 env->task_queue.pop();
63 printf("%s:%d consume task %d
", __func__, __LINE__, task->id);
64 free(task);
65 }
66 return NULL;
67 }
68 int main()
69 {
70 stEnv_t* env = new stEnv_t;
71 env->cond = co_cond_alloc();
72
73 stCoRoutine_t* consumer_routine;
74 co_create(&consumer_routine, NULL, Consumer, env);
75 co_resume(consumer_routine);
76
77 stCoRoutine_t* producer_routine;
78 co_create(&producer_routine, NULL, Producer, env);
79 co_resume(producer_routine);
80
81 co_eventloop(co_get_epoll_ct(), NULL, NULL);
82 return 0;
83 }
在 Producer 协程函数内我们会看到调用 poll 函数等待 1 秒,Consumer 中也会看到调用 co_cond_timedwait 函数等待生产者信号。注意,从协程的角度看,这些等待看起来都是同步的(synchronous),阻塞的(blocking);但从底层线程的角度来 看,则是非阻塞的(non-blocking)。
这跟 pthread 实现的原理是一样的。在 pthread 实现的消费者中,你可能用 pthread_cond_timedwait 函数去同步等待生产者的信号;在消费者中,你可能用 poll 或 sleep 函数去定时等待。从线程的角度看,这些函数都会让当前线程阻塞;但从内核的角度看,它本身并没有阻塞,内核可能要继续忙着调度别的线程运行。那么这里协程也是一样的道理,从协程的角度看,当前的程序阻塞了;但从它底下的线程来看,自己可能正忙着执行别的协程函数。在这个例子中,当 Consumer 协程调用 co_cond_timedwait 函数“阻塞”后,线程可能已经将 Producer 调度恢复执行,反之亦然。那么这个负责协程“调度”的线程在哪呢?它即是运行协程本身的这个线程。
主协程与协程的“调度”
还记得之前提过的“主协程”的概念吗?我们再次把它搬出来,这对我们理解协程 的“阻塞”与“调度”可能更有帮助。我们讲过,libco 程序都有一个主协程,即程序里首次调用 co_create() 显式创建第一个协程的协程。在生产者消费者例子中,即为 main 函数里调用 co_eventloop() 的这个协程。当 Consumer 或 Producer 阻塞后,CPU 将 yield 给主协程, 此时主协程在干什么呢?主协程在co_eventloop() 函数里头忙活。这个 co_eventloop() 即 “调度器”的核心所在。 需要补充说明的是,这里讲的“调度器”,严格意义上算不上真正的调度器,只是为了表述的方便。libco 的协程机制是非对称的,没有什么调度算法。在执行 yield 时, 当前协程只能将控制权交给调用者协程,没有任何可调度的余地。resume 灵活性稍强 一点,不过也还算不得调度。如果非要说有什么“调度算法”的话,那就只能说是“基 于 epoll/kqueue 事件驱动”的调度算法。“调度器”就是 epoll/kqueue 的事件循环。
我们知道,在 go 语言中,用户只需使用同步阻塞式的编程接口即可开发出高性能的服务器,epoll/kqueue 这样的 I/O 事件通知机制(I/O event notification mechanism)完全被隐藏了起来。在 libco 里也是一样的,你只需要使用普通 C 库函数 read()、write() 等等同步地读写数据就好了。那么 epoll 藏在哪呢?就藏在主协程的 co_eventloop() 中。 协程的调度与事件驱动是紧紧联系在一起的,因此与其说 libco 是一个协程库,还不如说它是一个网络库。在后台服务器程序中,一切逻辑都是围绕网络 I/O 转的,libco 这样的设计自有它的合理性。
stCoEpoll_t 结构与定时器
在分析 stCoRoutineEnv_t 结构(代码清单5)的时候,还有一个 stCoEpoll_t 类型的 pEpoll 指针成员没有讲到。作为 stCoRoutineEnv_t 的成员,这个结构也是一个全局性的资源,被同一个线程上所有协程共享。从命名也看得出来,stCoEpoll_t 是跟 epoll 的事件循环相关的。现在我们看一下它的内部字段:
1 struct stCoEpoll_t
2 {
3 int iEpollFd; // epoll 实例的⽂件描述符
4
5 /**
6 * 值为 10240 的整型常量。作为 epoll_wait() 系统调用的第三个参数,
7 * 即⼀次 epoll_wait 最多返回的就绪事件个数。
8 */
9 static const int _EPOLL_SIZE = 1024 * 10;
10
11 /**
12 * 该结构实际上是⼀个时间轮(Timingwheel)定时器,只是命名比较怪,让⼈摸不着头脑。
13 * 单级时间轮来处理其内部的超时事件。
14 */
15 struct stTimeout_t *pTimeout;
16
17 /**
18 * 该指针实际上是⼀个链表头。链表用于临时存放超时事件的 item。
19 */
20 struct stTimeoutItemLink_t *pstTimeoutList;
21
22 /**
23 * 也是指向⼀个链表。该链表用于存放 epoll_wait 得到的就绪事件和定时器超时事件。
24 */
25 struct stTimeoutItemLink_t *pstActiveList;
26
27 /**
28 * 对 epoll_wait()第⼆个参数的封装,即⼀次 epoll_wait 得到的结果集。
29 */
30 co_epoll_res *result;
31
32 };
- @iEpollFd: 显然是 epoll 实例的⽂件描述符。
- @_EPOLL_SIZE: 值为 10240 的整型常量。作为 epoll_wait() 系统调用的第三个参数,即⼀次 epoll_wait 最多返回的就绪事件个数。
- @pTimeout: 类型为 stTimeout_t 的结构体指针。该结构实际上是⼀个时间轮(Timing wheel)定时器,只是命名比较怪,让⼈摸不着头脑。
- @pstTimeoutList: 指向 stTimeoutItemLink_t 类型的结构体指针。该指针实际上是⼀个链表头。链表用于临时存放超时事件的 item。
- @pstActiveList: 指向 stTimeoutItemLink_t 类型的结构体指针。也是指向⼀个链表。 该链表用于存放 epoll_wait 得到的就绪事件和定时器超时事件。
- @result: 对 epoll_wait()第⼆个参数的封装,即⼀次 epoll_wait 得到的结果集。
我们知道,定时器是事件驱动模型的网络框架一个必不可少的功能。网络 I/O 的超时,定时任务,包括定时等待(poll 或 timedwait)都依赖于此。一般而言,使用定时功能时,我们首先向定时器中注册一个定时事件(Timer Event),在注册定时事件时需要指定这个事件在未来的触发时间。在到了触发时间点后,我们会收到定时器的通知。 网络框架里的定时器可以看做由两部分组成,第一部分是保存已注册 timer events 的数据结构,第二部分则是定时通知机制。保存已注册的 timer events,一般选用红黑树,比如 nginx;另外一种常见的数据结构便是时间轮,libco 就使用了这种结构。当然你也可以直接用链表来实现,只是时间复杂度比较高,在定时任务很多时会很容易成为框架的性能瓶颈。 定时器的第二部分,高精度的定时(精确到微秒级)通知机制,一般使用 getitimer/setitimer 这类接口,需要处理信号,是个比较麻烦的事。不过对一般的应用而言,精确到毫秒就够了。精度放宽到毫秒级时,可以顺便用 epoll/kqueue 这样的系统调用来完成定时通知;这样一来,网络 I/O 事件通知与定时事件通知的逻辑就能统一起来了。libco 内部也直接使用了 epoll 来进行定时,不同的只是保存 timer events 的用的是时间轮而已。
使用 epoll 加时间轮的实现定时器的算法如下:
Step 1 [epoll_wait] 调用 epoll_wait() 等待 I/O 就绪事件,最⼤等待时长设置为 1 毫 秒(即 epoll_wait() 的第 4 个参数)。
Step 2 就绪事件预处理,统计在超时时间到达由多少个事件发生,并设置事件被触发标志
Step 3 将就绪事件加入就绪事件链表
Step 4 [从时间轮取超时事件] 从时间轮取超时事件,放到 timeout 队列。
Step 5 将超时事件加入就绪事件链表
Step 6 处理就绪事件
挂起协程与恢复的执行
那么 协程究竟在什么时候需要 yield 让出 CPU,又在什么时候恢复执行呢?
先来看 yield,实际上在 libco 中共有 3 种调用 yield 的场景:
1. 用户程序中主动调用 co_yield_ct();
2. 程序调用了 poll() 或 co_cond_timedwait() 陷⼊“阻塞”等待;
3. 程序调用了 connect(), read(), write(), recv(), send() 等系统调用陷⼊“阻塞”等待。
相应地,重新 resume 启动一个协程也有 3 种情况:
1. 对应用户程序主动 yield 的情况,这种情况也有赖于用户程序主动将协程 co_resume() 启动;
2. poll() 的目标⽂件描述符事件就绪或超时,co_cond_timedwait() 等到了其他协程 的 co_cond_signal() 通知信号或等待超时;
3. read(), write() 等 I/O 接⼝成功读到或写⼊数据,或者读写超时。
在第一种情况下,即用户主动 yield 和 resume 协程,相当于 libco 的使用者承担了部 分的协程“调度”工作。这种情况其实也很常见,在 libco 源码包的example_echosvr.cpp例子中就有。这也是服务端使用 libco 的典型模型,属于手动“调度”协程的例子。
第二种情况,前面第3.1节中的生产者消费者就是个典型的例子。在那个例子中我们 看不到用户程序主动调用 yield,也只有在最初启动协程时调用了 resume。生产者和消费者协程是在哪里切换的呢?在 poll() 与 co_cond_timedwait() 函数中。首先来看消费者。 当消费者协程首先启动时,它会发现任务队列是空的,于是调用 co_cond_timedwait() 在条件变量 cond 上“阻塞”等待。同操作系统线程的条件等待原理一样,这里条件变量 stCoCond_t 类型内部也有一个“等待队列”。co_cond_timedwait() 函数内部会将当前协程挂入条件变量的等待队列上,并设置一个回调函数,该回调函数是用于未来“唤醒” 当前协程的(即 resume 挂起的协程)。此外,如果 wait 的 timeout 参数大于 0 的话,还要向当前执行环境的定时器上注册一个定时事件(即挂到时间轮上)。在这个例子中,消费者协程 co_cond_timedwait 的 timeout 参数为-1,即 indefinitly 地等待下去,直到等到生产者向条件变量发出 signal 信号。
然后我们再来看生产者。当生产者协程启动后,它会向任务队列里投放一个任务并调用 co_cond_signal() 通知消费者,然后再调用 poll() 在原地“阻塞”等待 1000 毫秒。 这里 co_cond_signal 函数内部其实也简单,就是将条件变量的等待队列里的协程拿出来,然后挂到当前执行环境的 pstActiveList。co_cond_signal 函数并没有立即 resume 条件变量上的等待协程,毕竟这还不到交出 CPU 的时机。那么什么时候交出 CPU 控制权,什么时候 resume 消费者协程呢?继续往下看,生产者在向消费者发出“信号”之后,紧接着便调用 poll() 进入了“阻塞”等待,等待 1 秒钟。这个 poll 函数内部实际上做了两件事。首先,将自己作为一个定时事件注册到当前执行环境的定时器,注册的时候设置了 1 秒钟的超时时间和一个回调函数(仍是一个用于未来 “唤醒”自己的回调)。然后,就调用 co_yield_env() 将 CPU 让给主协程了。
现在,CPU 控制权又回到了主协程手中。主协程此时要干什么呢?我们已经讲过, 主协程就是事件循环 co_eventloop() 函数。在 co_eventloop() 中,主协程周而复始地调用 epoll_wait(),当有就绪的 I/O 事件就处理 I/O 事件,当定时器上有超时的事件就处理超时事件,pstActiveList 队列中已有活跃事件就处理活跃事件。这里所谓的“处理事件”, 其实就是调用其他工作协程注册的各种回调函数而已。那么前面我们讲过,消费者协程和生产者协程的回调函数都是“唤醒”自己而已。工作协程调用 co_cond_timedwait() 或 poll() 陷入“阻塞”等待,本质上即是通过 co_yield_env 函数让出了 CPU;而主协程则负责在事件循环中“唤醒”这些“阻塞”的协程,所谓“唤醒”操作即调用工作协程注册的回调函数,这些回调内部使用 co_resume() 重新恢复挂起的工作协程。
最后,协程 yield 和 resume 的第三种情况,即调用 read(), write() 等 I/O 操作而陷入 “阻塞”和最后又恢复执行的过程。这种情况跟第二种过程基本相似。需要注意的是, 这里的“阻塞”依然是用户态实现的过程。我们知道,libco 的协程是在底层线程上串行执行的。如果调用 read 或 write 等系统调用陷入真正的阻塞(让当前线程被内核挂起)的话,那么不光当前协程被挂起了,其他协程也得不到执行的机会。因此,如果工作协程陷入真正的内核态阻塞,那么 libco 程序就会完全停止运转,后果是很严重的。
为了避免陷入内核态阻塞,我们必须得依靠内核提供的非阻塞 I/O 机制,将 socket 文件描述符设置为 non-blocking 的。为了让 libco 的使用者更方便,我们还得将这种 non-blocking 的过程给封装起来,伪装成“同步阻塞式”的调用(跟 co_cond_timedwait() 一样)。事实上,go 语言就是这么做的。而 libco 则将这个过程伪装得更加彻底,更加具有欺骗性。它通过dlsym机制 hook 了各种网络 I/O 相关的系统调用,使得用户可以以“同步”的方式直接使用诸如read()、write()和connect()等系统调用。因此,我们会看生产者消费者协程任务函数里第一句就调用了一个 co_enable_hook_sys() 的函数。调用了 co_enable_hook_sys 函数才会开启 hook 系统调用功能,并且需要事先将要读写的文件描述符设置为 non-blocking 属性,否则,工作协程就可能陷入真正的内核态阻塞,这一点在应用中要特别加以注意。
以 read() 为例,让我们再来分析一下这些“伪装”成同步阻塞式系统调用的内部原 理。首先,假如程序 accept 了一个新连接,那么首先我们将这个连接的 socket 文件描 述符设置为非阻塞模式,然后启动一个工作协程去处理这个连接。工作协程调用 read()试图从该新连接上读取数据。这时候由于系统 read() 函数已经被 hook,所以实际上会调用到 libco 内部准备好的read() 函数。这个函数内部实际上做了 4 件事:第一步将当 前协程注册到定时器上,用于将来处理 read() 函数的读超时。第二步,调用 epoll_ctl() 将自己注册到当前执行环境的 epoll 实例上。这两步注册过程都需要指定一个回调函数, 将来用于“唤醒”当前协程。第三步,调用 co_yield_env 函数让出 CPU。第四步要等到 该协程被主协程重新“唤醒”后才能继续。如果主协程 epoll_wait() 得知 read 操作的文 件描述符可读,则会执行原 read 协程注册的会回调将它唤醒(超时后同理,不过还要 设置超时标志)。工作协程被唤醒后,在调用原 glibc 内被 hook 替换掉的、真正的 read() 系统调用。这时候如果是正常 epoll_wait 得知文件描述符 I/O 就绪就会读到数据,如果是超时就会返回-1。总之,在外部使用者看来,这个 read() 就跟阻塞式的系统调用表现出几乎完全一致的行为了。所谓的hook机制就是用用户实现的函数代替系统函数,在真正需要调用系统函数的时候,再通过dlsym机制直接调用系统函数。
主协程事件循环源码简单分析
1 void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
2 {
3 if( !ctx->result )// 给结果集分配空间
4 {
5 // _EPOLL_SIZE:epoll结果集大小
6 ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
7 }
8 co_epoll_res *result = ctx->result;
9
10
11 for(;;)
12 {
13 //调用 epoll_wait() 等待 I/O 就绪事件,为了配合时间轮⼯作,这里的 timeout设置为 1 毫秒。
14 int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
15
16 /**
17 * 获取激活事件队列和定时超时事件的临时存放链表
18 * 不使用局部变量的原因是epoll循环并不是元素的唯一来源.例如条件变量相关(co_routine.cpp stCoCondItem_t)
19 */
20 stTimeoutItemLink_t *active = (ctx->pstActiveList);
21 stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
22
23 //初始化timeout
24 memset( timeout,0,sizeof(stTimeoutItemLink_t) );
25
26 /**
27 * 处理返回的结果集,如果pfnPrepare不为NULL,就直接调用注册的回调函数进行处理
28 * 否则,将其直接加入就绪事件的队列,pfnPrepare实际上就是OnPollPreparePfn函数
29 */
30 for(int i=0;i<ret;i++)
31 {
32 // 获取在co_poll_inner放入epoll_event中的stTimeoutItem_t结构体
33 stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
34 // 如果用户设置预处理回调的话就执行
35 if( item->pfnPrepare )
36 {
37 // 若是hook后的poll的话,会把此事件加入到active队列中,并更新一些状态
38 item->pfnPrepare( item,result->events[i],active );
39 }
40 else
41 {
42 AddTail( active,item );
43 }
44 }
45
46 //从时间轮上取出已超时的事件,放到 timeout 队列。
47 unsigned long long now = GetTickMS();
48 TakeAllTimeout( ctx->pTimeout,now,timeout );
49
50 //遍历 timeout 队列,设置事件已超时标志(bTimeout 设为 true)。
51 stTimeoutItem_t *lp = timeout->head;
52 // 遍历超时链表,设置超时标志,并加入active链表
53 while( lp )
54 {
55 //printf("raise timeout %p
",lp);
56 lp->bTimeout = true;
57 lp = lp->pNext;
58 }
59
60 //将 timeout 队列中事件合并到 active 队列。
61 Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
62
63 /**
64 * 遍历 active 队列,调用⼯作协程设置的 pfnProcess() 回调函数 resume挂起的⼯作协程,
65 * 处理对应的 I/O 或超时事件。
66 */
67 lp = active->head;
68 // 开始遍历active链表
69 while( lp )
70 {
71 // 在链表不为空的时候删除active的第一个元素 如果删除成功,那个元素就是lp
72 PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
73 //如果被设置为超时并且当前事件还没有到达实际设置的超时事件
74 if (lp->bTimeout && now < lp->ullExpireTime)
75 {
76 // 一种排错机制,在超时和所等待的时间内已经完成只有一个条件满足才是正确的
77 int ret = AddTimeout(ctx->pTimeout, lp, now);
78 if (!ret)//插入成功
79 {
80 //重新开始定时
81 lp->bTimeout = false;
82 lp = active->head;
83 continue;
84 }
85 }
86 if( lp->pfnProcess )
87 {
88 lp->pfnProcess( lp );
89 }
90
91 lp = active->head;
92 }
93 // 每次事件循环结束以后执行该函数, 用于终止协程,相当于终止回调函数
94 if( pfn )
95 {
96 if( -1 == pfn( arg ) )
97 {
98 break;
99 }
100 }
101
102 }
103 }
第 14 ⾏:调用 epoll_wait() 等待 I/O 就绪事件,为了配合时间轮⼯作,这里的 timeout 设置为 1 毫秒。
第 20~24 ⾏:active 指针指向当前执⾏环境的 pstActiveList 队列,注意这里面可能已经有“活跃”的待处理事件。timeout 指针指向 pstTimeoutList 列表,其实这个 timeout 完 全是个临时性的链表,pstTimeoutList 永远为空。
第 30~44 ⾏:处理就绪的⽂件描述符。如果用户设置了预处理回调,则调用 pfnPrepare 做预处理(38 ⾏);否则直接将就绪事件 item 直接加⼊ active 队列。实际上, pfnPrepare() 预处理函数内部也是会将就绪 item 加⼊ active 队列,最终都是加⼊到 active 队列等到 32~40 ⾏统⼀处理。
第 47~48 ⾏:从时间轮上取出已超时的事件,放到 timeout 队列。
第 51~58 ⾏:遍历 timeout 队列,设置事件已超时标志(bTimeout 设为 true)。
第 61 ⾏:将 timeout 队列中事件合并到 active 队列。
第 67~92 ⾏:遍历 active 队列,调用⼯作协程设置的 pfnProcess() 回调函数 resume 挂起的⼯作协程,处理对应的 I/O 或超时事件。
总结
1、从协程的角度看,当前的程序阻塞了;但从它底下的线程来看,自己可能正忙着执行别的协程函数。负责协程“调度”的线程是运行协程本身的这个线程。
2、libco 的协程机制是非对称的,没有什么调度算法。在执行 yield 时,当前协程只能将控制权交给调用者协程,没有任何可调度的余地。如果非要说有什么“调度算法”的话,那就只能说是“基 于 epoll/kqueue 事件驱动”的调度算法。“调度器”就是 epoll/kqueue 的事件循环,由主协程负责。
3、 协程的调度与事件驱动是紧紧联系在一起的,因此与其说 libco 是一个协程库,还不如说它是一个网络库,它的epoll被隐藏在 co_eventloop() 中。
4、网络框架里的定时器可以看做由两部分组成,第一部分是保存已注册 timer events 的数据结构,第二部分则是定时通知机制。libco使用的是是时间轮数据结构,实际上就是一个循环数组。定时通知机制是利用epoll的定时功能实现的。
5、协程让出CPU的集中情况
yield 的场景:
1. 用户程序中主动调用 co_yield_ct();
2. 程序调用了 poll() 或 co_cond_timedwait() 陷⼊“阻塞”等待;
3. 程序调用了 connect(), read(), write(), recv(), send() 等系统调用陷⼊“阻塞”等待。
resume 的场景:
1. 对应用户程序主动 yield 的情况,这种情况也有赖于用户程序主动将协程 co_resume() 起来;
2. poll() 的目标⽂件描述符事件就绪或超时,co_cond_timedwait() 等到了其他协程 的 co_cond_signal() 通知信号或等待超时;
3. read(), write() 等 I/O 接⼝成功读到或写⼊数据,或者读写超时。
6、所谓的hook机制就是用用户实现的函数代替系统函数,在真正需要调用系统函数的时候,再通过dlsym机制直接调用系统函数。
7、如果工作协程陷入真正的内核态阻塞,那么 libco 程序就会完全停止运转,后果是很严重的。因此需要事先将要读写的文件描述符设置为 non-blocking 属性,否则,工作协程就可能陷入真正的内核态阻塞,就会导致任何协程都无法获取CPU。