1.0 协程库引言
协程对于上层语言还是比较常见的. 例如C# 中 yield retrun, lua 中 coroutine.yield 等来构建同步并发的程序.
本文就是探讨如何从底层实现开发级别的协程库. 在说协程之前, 简单温故一下进程和线程关系.
进程拥有一个完整的虚拟地址空间,不依赖于线程而独立存在. 线程是进程的一部分,没有自己的地址空间,
与进程内的其他线程一起共享分配给该进程的所有资源.进程和线程是1对多关系, 协程同线程关系也是类似.
一个线程中可以有多个协程. 协程同线程相比区别再于, 线程是操作系统控制调度(异步并发),
而线程是程序自身控制调度(同步串行). 简单总结协程特性如下:
1. 相比线程具有更优的性能(假定, 程序写的没有明显失误) , 省略了操作系统的切换操作
2. 相比线程具有更少的内存空间, 线程是操作系统对象很耗资源, 协程是用户态资源, 占用系统层资源很少.
3. 对比线程开发, 逻辑结构更复杂, 需要开发人员了解程序运行走向.
举个例子 数码宝贝例子 : 滚球兽 -> 亚古兽-> 暴龙兽-> 机械暴龙兽 -> 战斗暴龙兽
'类比协程进化史' if .. else / switch -> goto -> setjmp / logjump -> coroutine -> .......
协程开发是串行程序开发中构建异步效果的开发模型.
本文参照博文和资料记录
C 的 coroutine 库 : http://blog.codingnow.com/2012/07/c_coroutine.html
纤程 : http://blog.codingnow.com/2005/10/fiber.html
这里补充说明一下, 为什么需要再造轮子. 也是有''历史''原因额. 有一个腾讯写的libco协程库, 但是用的是汇编加cpp混编的.
而云风的coroutine是运行在linux 和 mac OS上的, window上没法跑. 因此需要一个支持linux 加 window上纯c运行的库.
这就是设计这个库的历史原因. 主要思想还是参照云风关于协程的理解, 我只是有幸站在绝顶高手的脚底下, 兴风作浪~~~~
一流高手和绝顶高手的差距在哪里? https://www.zhihu.com/question/43704220
2.0 协程库操作系统相关知识储备
2.1 window fiber 储备
window fiber也叫纤程. 官方说明是 "Microsoft公司给Windows添加了一种纤程,以便能够非常容易地将现有的UNIX服务器应用程序移植到Windows中".
这就是纤程概念的由来.
window核心编程中关于fiber介绍 http://www.cnblogs.com/wz19860913/archive/2008/08/26/1276816.html
Microsoft fiber desc https://msdn.microsoft.com/en-us/library/windows/desktop/ms682661(v=vs.85).aspx
而我们这里会详细解释其中关于window fiber常用api. 先浏览关于当前线程开启纤程相关接口说明.
// // Fiber creation flags // #define FIBER_FLAG_FLOAT_SWITCH 0x1 // context switch floating point /* * VS编译器特性约定 * 1. 其参数都是从右向左通过堆栈传递的 * 2. 函数调用在返回前要由被调用者清理堆栈(被调用函数弹出的时候销毁堆栈) */ #define WINAPI __stdcall /* * 将当前线程转成纤程, 返回转换成功的主纤程对象域 * lpParameter : 转换的时候传入到主线程中用户数据 * dwFlags : 附加参数, 默认填写 FIBER_FLAG_FLOAT_SWITCH * : 返回转换成功后的主纤程对象域 */ WINBASEAPI __out_opt LPVOID WINAPI ConvertThreadToFiberEx( __in_opt LPVOID lpParameter, __in DWORD dwFlags ); // 得到当前纤程中用户传入的数据, 就是上面 lpParameter __inline PVOID GetFiberData( void ) { return *(PVOID *) (ULONG_PTR) __readfsdword (0x10);} // 得到当前运行纤程对象 __inline PVOID GetCurrentFiber( void ) { return (PVOID) (ULONG_PTR) __readfsdword (0x10);} /* * 将当前纤程转换成线程, 对映ConvertThreadToFiberEx操作系列函数. 返回原始环境 * : 返回成功状态, TRUE标识成功 */ WINBASEAPI BOOL WINAPI ConvertFiberToThread(VOID);
下面是关于如何创建纤程并切换(启动)官方接口说明.
// 标识纤程执行体的注册函数声明, lpFiberParameter 可以通过 GetFiberData 得到 typedef VOID (WINAPI *PFIBER_START_ROUTINE)(LPVOID lpFiberParameter); typedef PFIBER_START_ROUTINE LPFIBER_START_ROUTINE; /* * 创建一个没有启动纤程对象并返回 * dwStackCommitSize : 当前纤程栈大小, 0标识默认大小 * dwStackReserveSize : 当前纤程初始化化保留大小, 0标识默认大小 * dwFlags : 纤程创建状态, 默认FIBER_FLAG_FLOAT_SWITCH, 支持浮点数操作 * lpStartAddress : 指定纤程运行的载体.等同于纤程执行需要指明执行函数 * lpParameter : 纤程执行的时候, 传入的用户数据, 在纤程中GetFiberData可以得到 * : 返回创建好的纤程对象 */ WINBASEAPI __out_opt LPVOID WINAPI CreateFiberEx( __in SIZE_T dwStackCommitSize, __in SIZE_T dwStackReserveSize, __in DWORD dwFlags, __in LPFIBER_START_ROUTINE lpStartAddress, __in_opt LPVOID lpParameter ); // 销毁一个申请的纤程资源和CreateFiberEx成对出现 WINBASEAPI VOID WINAPI DeleteFiber(__in LPVOID lpFiber); // 纤程跳转, 跳转到lpFiber指定的纤程 WINBASEAPI VOID WINAPI SwitchToFiber(__in LPVOID lpFiber);
我们通过上面api 写一个基础的演示demo , fiber_handle.c, 实践能补充猜想.
#include <Windows.h> #include <stdio.h> #include <stdlib.h> // fiber one run static void WINAPI _fiber_one_run(LPVOID pars) { LPVOID * fibers = pars; puts("_fiber_one_run start"); fibers[1] = GetCurrentFiber(); // 切换到主纤程中 SwitchToFiber(fibers[0]); puts("_fiber_one_run end"); SwitchToFiber(fibers[0]); } /* * test 纤程练习 */ int main(int argc, char * argv[]) { PVOID fibers[2]; // A pointer to a variable that is passed to the fiber. The fiber can retrieve this data by using the GetFiberData macro. fibers[0] = ConvertThreadToFiberEx(NULL, FIBER_FLAG_FLOAT_SWITCH); // 创建普通纤程, 当前还是在主纤程中 fibers[1] = CreateFiberEx(0, 0, FIBER_FLAG_FLOAT_SWITCH, _fiber_one_run, fibers); puts("main ConvertThreadToFiberEx start"); SwitchToFiber(fibers[1]); puts("main ConvertThreadToFiber SwitchToFiber"); SwitchToFiber(fibers[1]); puts("main ConvertThreadToFiber SwitchToFiber two"); DeleteFiber(fibers[1]); ConvertFiberToThread(); puts("main ConvertThreadToFiber SwitchToFiber two end"); return 0; }
示例演示结果
到这儿关于window 纤程部分储备完毕.
自己看一遍, 练习一遍, 基本上就能熟练掌握window fiber 对象了. 哎, 如果人如何NB. 我的猜测是
遇到更NB人 && 不懒
2.2 linux ucontext 储备
同样对于linux, 同样有一套机制ucp, 上下文记录机制. 翻译了其中用的api
#include <ucontext.h> /* * 得到当前程序运行此处上下文信息 * ucp : 返回当前程序上下文并保存在ucp指向的内存中 * : -1标识失败, 0标识成功 */ int getcontext(ucontext_t * ucp); /* * 设置到执行程序上下文对象中. * ucp : 准备跳转的上下文对象 * : 失败返回-1. 成功不返回 */ int setcontext(const ucontext_t * ucp); /* * 重新设置ucp上下文. * ucp : 待设置的上下文对象 * func : 新上下文执行函数体, 其实gcc认为声明是void * func(void) * argc : func 函数参数个数 * ... : 传入func中的参数 */ void makecontext(ucontext_t * ucp, void (* func)(), int argc, ...); /* * 保存当前上下文对象 oucp, 并且跳转到执行上下文件对象 ucp 中 * oucp : 保存当前上下文对象 * ucp : 执行的上下文对象 * : 失败返回-1, 成功不返回 */ int swapcontext (ucontext_t * oucp, ucontext_t * ucp);
相比window fiber确实很清爽. 扩充一下, 关于ucontext_t 一种结构实现
/* Userlevel context. */ typedef struct ucontext { unsigned long int uc_flags; struct ucontext * uc_link; // 下一个执行的序列, NULL不继续执行了 stack_t uc_stack; // 当前上下文, 堆栈信息 mcontext_t uc_mcontext; __sigset_t uc_sigmask; struct _libc_fpstate __fpregs_mem; } ucontext_t; /* Alternate, preferred interface. */ typedef struct sigaltstack { void * ss_sp; // 指向当前堆栈信息首地址 int ss_flags; size_t ss_size; // 当前堆栈大小 } stack_t;
上面加了中文注释的部分, 就是我们开发中需要用到的几个字段. 设置执行顺序, 指定当前上下文堆栈信息.
有了这些知识, 我们在linux上练练手, 采用官方 man 手册中提供的一段代码, 演示一下结果. ucontext_demo.c
#include <ucontext.h> #include <stdio.h> #include <stdlib.h> static ucontext_t uctx_main, uctx_func1, uctx_func2; #define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0) static void _func1(void) { printf("func1: started "); printf("func1: swapcontext(&uctx_func1, &uctx_func2) "); if (swapcontext(&uctx_func1, &uctx_func2) == -1) handle_error("swapcontext"); printf("func1: returning "); } static void _func2(void) { printf("func2: started "); printf("func2: swapcontext(&uctx_func2, &uctx_func1) "); if (swapcontext(&uctx_func2, &uctx_func1) == -1) handle_error("swapcontext"); printf("func2: returning "); } int main(int argc, char * argv[]) { char func1_stack[16384]; char func2_stack[16384]; if (getcontext(&uctx_func1) == -1) handle_error("getcontext"); uctx_func1.uc_stack.ss_sp = func1_stack; uctx_func1.uc_stack.ss_size = sizeof(func1_stack); uctx_func1.uc_link = &uctx_main; makecontext(&uctx_func1, _func1, 0); if (getcontext(&uctx_func2) == -1) handle_error("getcontext"); uctx_func2.uc_stack.ss_sp = func2_stack; uctx_func2.uc_stack.ss_size = sizeof(func2_stack); uctx_func2.uc_link = &uctx_func1; makecontext(&uctx_func2, _func2, 0); printf("main: swapcontext(&uctx_main, &uctx_func2) "); if (swapcontext(&uctx_main, &uctx_func2) == -1) handle_error("swapcontext"); printf("main: exiting "); return 0; }
参照下面编译操作
run 结果
通过上练test, 对于linux ucontext api 基本全部熟悉了.
上面代码埋了一个小坑, _func1, _func2都没有传参, 大家试试为上面函数传参结果会如何, x86和x64都试试.
恭喜, 到这里基本上操作系统提供上下文切换(高级 longjmp/setjmp)知识点都储备完毕, 后面就可以不用看了.
3.0 协程库封装
3.1 协程库统一接口封装
备注 : 协程,纤程,上下文 认为是一个概念.
到这里基本上就是开发级别封装库了, 还是存在相当大含金量的. 先提供统一接口 coroutine.h
#ifndef _H_COROUTINE #define _H_COROUTINE typedef enum costatus { // 纤程存在状态 CS_Dead = 0, // 纤程死亡状态 CS_Ready = 1, // 纤程已经就绪 CS_Running = 2, // 纤程正在运行 CS_Suspend = 3, // 纤程暂停等待 } costatus_e; typedef struct comng * comng_t; /* * 创建运行纤程的主体, 等同于纤程创建需要执行的函数体. * schedule : co_start 函数返回的结果 * ud : 用户自定义数据 */ typedef void (* co_f)(comng_t comng, void * ud); /* * 开启纤程系统, 并创建主纤程 * : 返回开启的纤程调度系统管理器 */ extern comng_t co_start(void); /* * 关闭开启的纤程系统 * comng : co_start 返回的纤程管理器 */ extern void co_close(comng_t comng); /* * 创建一个纤程对象,并返回创建纤程的id. 创建好的纤程状态是CS_Ready * comng : co_start 返回的纤程管理器 * func : 纤程运行的主体 * ud : 用户传入的数据, co_f 中 ud 会使用 * : 返回创建好的纤程标识id */ extern int co_create(comng_t comng, co_f func, void * ud); /* * 激活创建的纤程对象. * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 */ extern void co_resume(comng_t comng, int id); /* * 中断当前运行的的纤程, 并将CPU交给主纤程处理调度. * comng : 纤程管理器对象 */ extern void co_yield(comng_t comng); /* * 得到当前纤程运行的状态 * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 * : 返回状态具体参照 costatus_e */ extern costatus_e co_status(comng_t comng, int id); /* * 得到当前纤程系统中运行的纤程, 返回 < 0表示没有纤程在运行 * comng : 纤程管理器对象 * : 返回当前运行的纤程标识id, */ extern int co_running(comng_t comng); #endif // !_H_COROUTINE
核心思路是
co_create -> CS_Ready
co_resume -> CS_Running
co_yield -> CS_Suspend
协程运行完毕就是 CS_Dead. 主协程默认一直运行不参与状态变化中. 协调控制所有子协程.
这里我们先入为主的给出一个演示内容 main.c
#include <stdio.h> #include "coroutine.h" struct args { int n; }; static void _foo(void * comng, void * ud) { struct args * arg = ud; int start = arg->n; int i; for (i = 0;i<5;i++) { printf("coroutine %d : %d ", co_running(comng), start + i); co_yield(comng); } } static void _test(void * comng) { struct args arg1 = { 0 }; struct args arg2 = { 100 }; int co1 = co_create(comng, _foo, &arg1); int co2 = co_create(comng, _foo, &arg2); printf("main start "); while (co_status(comng, co1) && co_status(comng, co2)) { co_resume(comng, co1); co_resume(comng, co2); } printf("main end "); } /* * test coroutine demo */ int main(int argc, char * argv[]) { void * comng = co_start(); _test(comng); co_close(comng); return 0; }
演示结果
同样在window 上演示结果 也是如此
协程总的逻辑就是, 得到资源运行, 阻塞, 其它协程得到资源运行 这种定向跳转. 关于协程设计的总方针就是以上那些.
3.2 window实现封装
coroutine-window.c
#include "coroutine.h" #include <Windows.h> #include <string.h> #include <assert.h> #include <stdlib.h> // 纤程栈大小 #define _INT_STACK (1024 * 1024) // 默认初始化创建纤程数目 #define _INT_COROUTINE (16) /* * 单个纤程单元 coroutine , 还有纤程集管理器 comng */ struct coroutine; struct comng { PVOID main; // 纤程管理器中保存的临时纤程对象 int running; // 当前纤程管理器中运行的纤程id int nco; // 当前纤程集轮询中当前索引 int cap; // 纤程集容量, struct coroutine ** co; // 保存的纤程集 }; struct coroutine { PVOID ctx; // 操作系统纤程对象 co_f func; // 纤程执行的函数体 void * ud; // 纤程执行的额外参数 costatus_e status; // 当前纤程运行状态 struct comng * comng; // 当前纤程集管理器 }; /* * 开启纤程系统, 并创建主纤程 * : 返回开启的纤程调度系统管理器 */ inline comng_t co_start(void) { struct comng * comng = malloc(sizeof(struct comng)); assert(NULL != comng); comng->nco = 0; comng->running = -1; comng->co = calloc(comng->cap = _INT_COROUTINE, sizeof(struct coroutine *)); assert(NULL != comng->co); // 开启Window协程 comng->main = ConvertThreadToFiberEx(NULL, FIBER_FLAG_FLOAT_SWITCH); return comng; } // 销毁一个纤程 static inline void _co_delete(struct coroutine * co) { DeleteFiber(co->ctx); free(co); } /* * 关闭开启的纤程系统 * comng : co_start 返回的纤程管理器 */ void co_close(comng_t comng) { int i; for (i = 0; i < comng->cap; ++i) { struct coroutine * co = comng->co[i]; if (co) { _co_delete(co); comng->co[i] = NULL; } } free(comng->co); comng->co = NULL; free(comng); ConvertFiberToThread(); } // 创建一个纤程对象 static inline struct coroutine * _co_new(comng_t comng, co_f func, void * ud) { struct coroutine * co = malloc(sizeof(struct coroutine)); assert(co && comng && func); co->func = func; co->ud = ud; co->comng = comng; co->status = CS_Ready; return co; } /* * 创建一个纤程对象,并返回创建纤程的id. 创建好的纤程状态是CS_Ready * comng : co_start 返回的纤程管理器 * func : 纤程运行的主体 * ud : 用户传入的数据, co_f 中 ud 会使用 * : 返回创建好的纤程标识id */ int co_create(comng_t comng, co_f func, void * ud) { struct coroutine * co = _co_new(comng, func, ud); // 下面是普通情况, 可以找见 if (comng->nco < comng->cap) { int i; for (i = 0; i < comng->cap; ++i) { int id = (i + comng->nco) % comng->cap; if (NULL == comng->co[id]) { comng->co[id] = co; ++comng->nco; return id; } } assert(i == comng->cap); return -1; } // 需要重新分配空间, 构造完毕后返回 comng->co = realloc(comng->co, sizeof(struct coroutine *) * comng->cap * 2); assert(NULL != comng->co); memset(comng->co + comng->cap, 0, sizeof(struct coroutine *) * comng->cap); comng->cap <<= 1; comng->co[comng->nco] = co; return comng->nco++; } static inline VOID WINAPI _comain(LPVOID ptr) { struct comng * comng = ptr; int id = comng->running; struct coroutine * co = comng->co[id]; co->func(comng, co->ud); _co_delete(co); comng->co[id] = NULL; --comng->nco; comng->running = -1; } /* * 激活创建的纤程对象. * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 */ void co_resume(comng_t comng, int id) { struct coroutine * co; assert(comng->running == -1 && id >= 0 && id < comng->cap); co = comng->co[id]; if(NULL == co || co->status == CS_Dead) return; switch(co->status) { case CS_Ready: comng->running = id; co->status = CS_Running; co->ctx = CreateFiberEx(_INT_STACK, 0, FIBER_FLAG_FLOAT_SWITCH, _comain, comng); comng->main = GetCurrentFiber(); SwitchToFiber(co->ctx); break; case CS_Suspend: comng->running = id; co->status = CS_Running; comng->main = GetCurrentFiber(); SwitchToFiber(co->ctx); break; default: assert(0); } } /* * 中断当前运行的的纤程, 并将CPU交给主纤程处理调度. * comng : 纤程管理器对象 */ inline void co_yield(comng_t comng) { struct coroutine * co; int id = comng->running; assert(id >= 0); co = comng->co[id]; co->status = CS_Suspend; comng->running = -1; co->ctx = GetCurrentFiber(); SwitchToFiber(comng->main); } /* * 得到当前纤程运行的状态 * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 * : 返回状态具体参照 costatus_e */ inline costatus_e co_status(comng_t comng, int id) { assert(comng && id >=0 && id < comng->cap); return comng->co[id] ? comng->co[id]->status : CS_Dead; } /* * 得到当前纤程系统中运行的纤程, 返回 < 0表示没有纤程在运行 * comng : 纤程管理器对象 * : 返回当前运行的纤程标识id, */ inline int co_running(comng_t comng) { return comng->running; }
实现是非常四平八稳, 利用
struct comng { PVOID main; // 纤程管理器中保存的临时纤程对象 int running; // 当前纤程管理器中运行的纤程id int nco; // 当前纤程集轮询中当前索引 int cap; // 纤程集容量, struct coroutine ** co; // 保存的纤程集 };
comng :: co 中保存所有的协程对象, 不够就realloc, 够直接返回. 其中查询不是用的协程对象思路就是, 循环查找.
协程之间的跳转采用 先记录当前环境, 后跳转思路
co->ctx = GetCurrentFiber();
SwitchToFiber(comng->main);
思路还是主要参照云风大仙的, 实现起来还是很直白小巧的. 容易理解, 极力欢迎尝试. 写起来还是很爽的, 抄起来提高很快.
3.3 linux实现封装
coroutine-linux.c
#include "coroutine.h" #include <ucontext.h> #include <string.h> #include <assert.h> #include <stdlib.h> #include <stddef.h> #include <stdint.h> // 纤程栈大小 #define _INT_STACK (1024 * 1024) // 默认初始化创建纤程数目 #define _INT_COROUTINE (16) /* * 单个纤程单元 coroutine , 还有纤程集管理器 comng */ struct coroutine; struct comng { char stack[_INT_STACK]; ucontext_t main; // 纤程管理器中保存的临时纤程对象 int running; // 当前纤程管理器中运行的纤程id int nco; // 当前纤程集轮询中当前索引 int cap; // 纤程集容量, struct coroutine ** co; // 保存的纤程集 }; struct coroutine { char * stack; ucontext_t ctx; // 操作系统纤程对象 ptrdiff_t cap; ptrdiff_t size; co_f func; // 纤程执行的函数体 void * ud; // 纤程执行的额外参数 costatus_e status; // 当前纤程运行状态 struct comng * comng; // 当前纤程集管理器 }; /* * 开启纤程系统, 并创建主纤程 * : 返回开启的纤程调度系统管理器 */ inline comng_t co_start(void) { struct comng * comng = malloc(sizeof(struct comng)); assert(NULL != comng); comng->nco = 0; comng->running = -1; comng->co = calloc(comng->cap = _INT_COROUTINE, sizeof(struct coroutine *)); assert(NULL != comng->co); return comng; } // 销毁一个纤程 static inline void _co_delete(struct coroutine * co) { free(co->stack); free(co); } /* * 关闭开启的纤程系统 * comng : co_start 返回的纤程管理器 */ void co_close(comng_t comng) { int i; for (i = 0; i < comng->cap; ++i) { struct coroutine * co = comng->co[i]; if (co) { _co_delete(co); comng->co[i] = NULL; } } free(comng->co); comng->co = NULL; free(comng); } // 创建一个纤程对象 static inline struct coroutine * _co_new(comng_t comng, co_f func, void * ud) { struct coroutine * co = malloc(sizeof(struct coroutine)); assert(co && comng && func); co->func = func; co->ud = ud; co->comng = comng; co->status = CS_Ready; co->cap = 0; co->size = 0; co->stack = NULL; return co; } /* * 创建一个纤程对象,并返回创建纤程的id. 创建好的纤程状态是CS_Ready * comng : co_start 返回的纤程管理器 * func : 纤程运行的主体 * ud : 用户传入的数据, co_f 中 ud 会使用 * : 返回创建好的纤程标识id */ int co_create(comng_t comng, co_f func, void * ud) { struct coroutine * co = _co_new(comng, func, ud); // 下面是普通情况, 可以找见 if (comng->nco < comng->cap) { int i; for (i = 0; i < comng->cap; ++i) { int id = (i + comng->nco) % comng->cap; if (NULL == comng->co[id]) { comng->co[id] = co; ++comng->nco; return id; } } assert(i == comng->cap); return -1; } // 需要重新分配空间, 构造完毕后返回 comng->co = realloc(comng->co, sizeof(struct coroutine *) * comng->cap * 2); assert(NULL != comng->co); memset(comng->co + comng->cap, 0, sizeof(struct coroutine *) * comng->cap); comng->cap <<= 1; comng->co[comng->nco] = co; return comng->nco++; } static inline void _comain(uint32_t low32, uint32_t hig32) { uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hig32 << 32); struct comng * comng = (struct comng *)ptr; int id = comng->running; struct coroutine * co = comng->co[id]; co->func(comng, co->ud); _co_delete(co); comng->co[id] = NULL; --comng->nco; comng->running = -1; } /* * 激活创建的纤程对象. * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 */ void co_resume(comng_t comng, int id) { struct coroutine * co; uintptr_t ptr; assert(comng->running == -1 && id >= 0 && id < comng->cap); co = comng->co[id]; if(NULL == co || co->status == CS_Dead) return; switch(co->status) { case CS_Ready: comng->running = id; co->status = CS_Running; getcontext(&co->ctx); co->ctx.uc_stack.ss_sp = comng->stack; co->ctx.uc_stack.ss_size = _INT_STACK; co->ctx.uc_link = &comng->main; ptr = (uintptr_t)comng; makecontext(&co->ctx, (void (*)())_comain, 2, (uint32_t)ptr, (uint32_t)(ptr >> 32)); swapcontext(&comng->main, &co->ctx); break; case CS_Suspend: comng->running = id; co->status = CS_Running; // stack add is high -> low memcpy(comng->stack + _INT_STACK - co->size, co->stack, co->size); swapcontext(&comng->main, &co->ctx); break; default: assert(0); } } // 保存当前运行的堆栈信息 static void _save_stack(struct coroutine * co, char * top) { char dummy = 0; assert(top - &dummy <= _INT_STACK); if(co->cap < top - &dummy) { free(co->stack); co->cap = top - &dummy; co->stack = malloc(co->cap); assert(co->stack); } co->size = top - &dummy; memcpy(co->stack, &dummy, co->size); } /* * 中断当前运行的的纤程, 并将CPU交给主纤程处理调度. * comng : 纤程管理器对象 */ inline void co_yield(comng_t comng) { struct coroutine * co; int id = comng->running; assert(id >= 0); co = comng->co[id]; assert((char *)&co > comng->stack); _save_stack(co, comng->stack + _INT_STACK); co->status = CS_Suspend; comng->running = -1; swapcontext(&co->ctx, &comng->main); } /* * 得到当前纤程运行的状态 * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 * : 返回状态具体参照 costatus_e */ inline costatus_e co_status(comng_t comng, int id) { assert(comng && id >=0 && id < comng->cap); return comng->co[id] ? comng->co[id]->status : CS_Dead; } /* * 得到当前纤程系统中运行的纤程, 返回 < 0表示没有纤程在运行 * comng : 纤程管理器对象 * : 返回当前运行的纤程标识id, */ inline int co_running(comng_t comng) { return comng->running; }
对于linux上关于协程启动部分 static inline void _comain(uint32_t low32, uint32_t hig32)
函数声明方式, 主要为了解决gcc x64 编译接收的内存地址, 高地位顺序问题.
ptr = (uintptr_t)comng; makecontext(&co->ctx, (void (*)())_comain, 2, (uint32_t)ptr, (uint32_t)(ptr >> 32));
上面在实际调用中, 如果只用一个comng参数传过去, 到了_comain 中接收的 comng地址顺序就会错位. 以上就是linux上解决makecontext传地址错误的思路.
_save_stack 保存当前堆栈信息一个技巧性函数调用. 其它思路等同于window封装的那套库代码.
4.0 协程库融合
最终形态 coroutine.c
#include "coroutine.h" #include <string.h> #include <assert.h> #include <stdlib.h> // 纤程栈大小 #define _INT_STACK (1024 * 1024) // 默认初始化创建纤程数目 #define _INT_COROUTINE (16) /* * 单个纤程单元 coroutine , 还有纤程集管理器 comng */ struct coroutine; #if defined(__GNUC__) #include <ucontext.h> #include <stddef.h> #include <stdint.h> struct comng { char stack[_INT_STACK]; ucontext_t main; // 纤程管理器中保存的临时纤程对象 int running; // 当前纤程管理器中运行的纤程id int nco; // 当前纤程集轮询中当前索引 int cap; // 纤程集容量, struct coroutine ** co; // 保存的纤程集 }; struct coroutine { char * stack; ucontext_t ctx; // 操作系统纤程对象 ptrdiff_t cap; ptrdiff_t size; co_f func; // 纤程执行的函数体 void * ud; // 纤程执行的额外参数 costatus_e status; // 当前纤程运行状态 struct comng * comng; // 当前纤程集管理器 }; /* * 开启纤程系统, 并创建主纤程 * : 返回开启的纤程调度系统管理器 */ inline comng_t co_start(void) { struct comng * comng = malloc(sizeof(struct comng)); assert(NULL != comng); comng->nco = 0; comng->running = -1; comng->co = calloc(comng->cap = _INT_COROUTINE, sizeof(struct coroutine *)); assert(NULL != comng->co); return comng; } // 销毁一个纤程 static inline void _co_delete(struct coroutine * co) { free(co->stack); free(co); } // 创建一个纤程对象 static inline struct coroutine * _co_new(comng_t comng, co_f func, void * ud) { struct coroutine * co = malloc(sizeof(struct coroutine)); assert(co && comng && func); co->func = func; co->ud = ud; co->comng = comng; co->status = CS_Ready; co->cap = 0; co->size = 0; co->stack = NULL; return co; } static inline void _comain(uint32_t low32, uint32_t hig32) { uintptr_t ptr = (uintptr_t)low32 | ((uintptr_t)hig32 << 32); struct comng * comng = (struct comng *)ptr; int id = comng->running; struct coroutine * co = comng->co[id]; co->func(comng, co->ud); _co_delete(co); comng->co[id] = NULL; --comng->nco; comng->running = -1; } /* * 激活创建的纤程对象. * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 */ void co_resume(comng_t comng, int id) { struct coroutine * co; uintptr_t ptr; assert(comng->running == -1 && id >= 0 && id < comng->cap); co = comng->co[id]; if(NULL == co || co->status == CS_Dead) return; switch(co->status) { case CS_Ready: comng->running = id; co->status = CS_Running; getcontext(&co->ctx); co->ctx.uc_stack.ss_sp = comng->stack; co->ctx.uc_stack.ss_size = _INT_STACK; co->ctx.uc_link = &comng->main; ptr = (uintptr_t)comng; makecontext(&co->ctx, (void (*)())_comain, 2, (uint32_t)ptr, (uint32_t)(ptr >> 32)); swapcontext(&comng->main, &co->ctx); break; case CS_Suspend: comng->running = id; co->status = CS_Running; // stack add is high -> low memcpy(comng->stack + _INT_STACK - co->size, co->stack, co->size); swapcontext(&comng->main, &co->ctx); break; default: assert(0); } } // 保存当前运行的堆栈信息 static void _save_stack(struct coroutine * co, char * top) { char dummy = 0; assert(top - &dummy <= _INT_STACK); if(co->cap < top - &dummy) { free(co->stack); co->cap = top - &dummy; co->stack = malloc(co->cap); assert(co->stack); } co->size = top - &dummy; memcpy(co->stack, &dummy, co->size); } /* * 中断当前运行的的纤程, 并将CPU交给主纤程处理调度. * comng : 纤程管理器对象 */ inline void co_yield(comng_t comng) { struct coroutine * co; int id = comng->running; assert(id >= 0); co = comng->co[id]; assert((char *)&co > comng->stack); _save_stack(co, comng->stack + _INT_STACK); co->status = CS_Suspend; comng->running = -1; swapcontext(&co->ctx, &comng->main); } #endif #if defined(_MSC_VER) #include <Windows.h> #define inline __inline struct comng { PVOID main; // 纤程管理器中保存的临时纤程对象 int running; // 当前纤程管理器中运行的纤程id int nco; // 当前纤程集轮询中当前索引 int cap; // 纤程集容量, struct coroutine ** co; // 保存的纤程集 }; struct coroutine { PVOID ctx; // 操作系统纤程对象 co_f func; // 纤程执行的函数体 void * ud; // 纤程执行的额外参数 costatus_e status; // 当前纤程运行状态 struct comng * comng; // 当前纤程集管理器 }; /* * 开启纤程系统, 并创建主纤程 * : 返回开启的纤程调度系统管理器 */ inline comng_t co_start(void) { struct comng * comng = malloc(sizeof(struct comng)); assert(NULL != comng); comng->nco = 0; comng->running = -1; comng->co = calloc(comng->cap = _INT_COROUTINE, sizeof(struct coroutine *)); assert(NULL != comng->co); // 开启Window协程 comng->main = ConvertThreadToFiberEx(NULL, FIBER_FLAG_FLOAT_SWITCH); return comng; } // 销毁一个纤程 static inline void _co_delete(struct coroutine * co) { DeleteFiber(co->ctx); free(co); } // 创建一个纤程对象 static inline struct coroutine * _co_new(comng_t comng, co_f func, void * ud) { struct coroutine * co = malloc(sizeof(struct coroutine)); assert(co && comng && func); co->func = func; co->ud = ud; co->comng = comng; co->status = CS_Ready; return co; } static inline VOID WINAPI _comain(LPVOID ptr) { struct comng * comng = ptr; int id = comng->running; struct coroutine * co = comng->co[id]; co->func(comng, co->ud); _co_delete(co); comng->co[id] = NULL; --comng->nco; comng->running = -1; } /* * 激活创建的纤程对象. * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 */ void co_resume(comng_t comng, int id) { struct coroutine * co; assert(comng->running == -1 && id >= 0 && id < comng->cap); co = comng->co[id]; if(NULL == co || co->status == CS_Dead) return; switch(co->status) { case CS_Ready: comng->running = id; co->status = CS_Running; co->ctx = CreateFiberEx(_INT_STACK, 0, FIBER_FLAG_FLOAT_SWITCH, _comain, comng); comng->main = GetCurrentFiber(); SwitchToFiber(co->ctx); break; case CS_Suspend: comng->running = id; co->status = CS_Running; comng->main = GetCurrentFiber(); SwitchToFiber(co->ctx); break; default: assert(0); } } /* * 中断当前运行的的纤程, 并将CPU交给主纤程处理调度. * comng : 纤程管理器对象 */ inline void co_yield(comng_t comng) { struct coroutine * co; int id = comng->running; assert(id >= 0); co = comng->co[id]; co->status = CS_Suspend; comng->running = -1; co->ctx = GetCurrentFiber(); SwitchToFiber(comng->main); } #endif /* * 关闭开启的纤程系统 * comng : co_start 返回的纤程管理器 */ void co_close(comng_t comng) { int i; for (i = 0; i < comng->cap; ++i) { struct coroutine * co = comng->co[i]; if (co) { _co_delete(co); comng->co[i] = NULL; } } free(comng->co); comng->co = NULL; free(comng); } /* * 创建一个纤程对象,并返回创建纤程的id. 创建好的纤程状态是CS_Ready * comng : co_start 返回的纤程管理器 * func : 纤程运行的主体 * ud : 用户传入的数据, co_f 中 ud 会使用 * : 返回创建好的纤程标识id */ int co_create(comng_t comng, co_f func, void * ud) { struct coroutine * co = _co_new(comng, func, ud); // 下面是普通情况, 可以找见 if (comng->nco < comng->cap) { int i; for (i = 0; i < comng->cap; ++i) { int id = (i + comng->nco) % comng->cap; if (NULL == comng->co[id]) { comng->co[id] = co; ++comng->nco; return id; } } assert(i == comng->cap); return -1; } // 需要重新分配空间, 构造完毕后返回 comng->co = realloc(comng->co, sizeof(struct coroutine *) * comng->cap * 2); assert(NULL != comng->co); memset(comng->co + comng->cap, 0, sizeof(struct coroutine *) * comng->cap); comng->cap <<= 1; comng->co[comng->nco] = co; return comng->nco++; } /* * 得到当前纤程运行的状态 * comng : 纤程管理器对象 * id : co_create 创建的纤程对象 * : 返回状态具体参照 costatus_e */ inline costatus_e co_status(comng_t comng, int id) { assert(comng && id >=0 && id < comng->cap); return comng->co[id] ? comng->co[id]->status : CS_Dead; } /* * 得到当前纤程系统中运行的纤程, 返回 < 0表示没有纤程在运行 * comng : 纤程管理器对象 * : 返回当前运行的纤程标识id, */ inline int co_running(comng_t comng) { return comng->running; }
主要做的操作, 是通过 _MSC_VER 和 __GNUC__ 区分编译器, 执行相关操作.
无数的前戏到这里基本就是完工了. 精彩往往很短暂, 遇见都是幸运.
<<心愿>> http://music.163.com/#/song?id=379785
5.0 最后的话
All knowledge is, in final analysis, history.
All sciences are, in the abstract, mathematics.
All judgements are, in their rationale,
statistics.