zoukankan      html  css  js  c++  java
  • libco之协程分析

    代码来源

    libco是微信网络框架svrkit/summer的协程库。在网络IO操作较多的服务下,协程能够帮助提高服务的并发。在进行网络io操作的时候,让出cpu,服务更多的请求。

    重要的数据结构

    更大的图,见https://drive.google.com/file/d/1hyxb-5kwo2ezX8iRj5wjh0dJNy5oZXjv/view?usp=sharing

    协程上下文

    上下文定义,实际上,在这里对于X86_64系统,定义了14个寄存器,关于应该保存的寄存器应该有多少,在这篇文章用户态调度要保存些什么中有写。

    struct coctx_t
    {
    #if defined(__i386__)
    void *regs[ 8 ];
    #else
    // 14个寄存器
    void *regs[ 14 ];
    #endif
    size_t ss_size;
    //
    char *ss_sp;
    };
    

    对于协程的抽象,有如下几个接口:

    // 将整个结构体初始化0
    int coctx_init( coctx_t *ctx )
    {
        memset( ctx,0,sizeof(*ctx));
        return 0;
    }
    
    typedef void* (*coctx_pfn_t)( void* s, void* s2 );
    
    // 真正的初始化协程上下文
    int coctx_make( coctx_t *ctx, coctx_pfn_t pfn, const void *s, const void *s1 )
    {
        // 栈顶的指针是从高地址到低地址
        char *sp = ctx->ss_sp + ctx->ss_size;
        sp = (char*) ((unsigned long)sp & -16LL  );
        // 初始化所有寄存器内容为0
        memset(ctx->regs, 0, sizeof(ctx->regs));
        ctx->regs[ kRSP ] = sp - 8;
        // 返回函数地址
        ctx->regs[ kRETAddr] = (char*)pfn;
        // 设置第一个参数
        ctx->regs[ kRDI ] = (char*)s;
        // 设置第二个参数
        ctx->regs[ kRSI ] = (char*)s1;
        return 0;
    },
    

    看看寄存器在ctx->regs的分配情况

    //-------------
    // 64 bit
    //low | regs[0]: r15 |
    //    | regs[1]: r14 |
    //    | regs[2]: r13 |
    //    | regs[3]: r12 |
    //    | regs[4]: r9  |
    //    | regs[5]: r8  | 
    //    | regs[6]: rbp |
    //    | regs[7]: rdi |
    //    | regs[8]: rsi |
    //    | regs[9]: ret |  //ret func addr
    //    | regs[10]: rdx |
    //    | regs[11]: rcx | 
    //    | regs[12]: rbx |
    //hig | regs[13]: rsp |
    

    我们看看各个寄存器分配使用情况:

    看看定义的协程接口:

    int    co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr, void *(*routine)(void*),void *arg );
    void    co_resume( stCoRoutine_t *co );
    void    co_yield( stCoRoutine_t *co );
    void    co_yield_ct(); //ct = current thread
    void    co_release( stCoRoutine_t *co );
    

    我们看一下协程定义的环境结构体:

    struct stCoRoutineEnv_t
    {
        stCoRoutine_t *pCallStack[ 128 ];  // 协程栈,栈顶元素指向当前正在运行的协程
        int iCallStackSize;  // 栈的大小
        stCoEpoll_t *pEpoll;
    
    //for copy stack log lastco and nextco
    stCoRoutine_t* pending_co; 
    stCoRoutine_t* occupy_co;
    };
    

    我们看看co_create的使用

    int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr, pfn_co_routine_t pfn,void *arg )
    {
        // 获取当前进程的环境,如果该进程没有,那么分配内存,
        if( !co_get_curr_thread_env() )  
        {
            // 这里是分配stCroutine_t的空间
            co_init_curr_thread_env();
        }
        // 这里应该初始化stCoRoutine的
        stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
        *ppco = co;
        return 0;
    }
    
    stCoRoutineEnv_t *co_get_curr_thread_env() {
        return g_arrCoEnvPerThread[ GetPid() ];
    }
    

    下面的代码是用来初始化一个协程

    struct stCoRoutine_t *co_create_env( stCoRoutineEnv_t * env, const stCoRoutineAttr_t* attr,  pfn_co_routine_t pfn,  void *arg )
    {
        stCoRoutineAttr_t at;
        if ( attr ) {
            memcpy( &at,attr,sizeof(at) );
        }
        if( at.stack_size <= 0 ) {
        // 默认使用 128 K 字节
        at.stack_size = 128 * 1024;
        }
        // 最多使用8M
        else if( at.stack_size > 1024 * 1024 * 8 ) {
            at.stack_size = 1024 * 1024 * 8;
        }
        
        // 这里应该是地址对齐
        if( at.stack_size & 0xFFF )  {
            at.stack_size &= ~0xFFF;
            at.stack_size += 0x1000;
        }
        // 创建协程结构体
        stCoRoutine_t *lp = (stCoRoutine_t*)malloc( sizeof(stCoRoutine_t) );
        memset( lp,0,(long)(sizeof(stCoRoutine_t))); 
        // 设置协程的环境
        lp->env = env;
        lp->pfn = pfn;
        lp->arg = arg;
        stStackMem_t* stack_mem = NULL;
        if( at.share_stack ) {
            stack_mem = co_get_stackmem( at.share_stack);
            at.stack_size = at.share_stack->stack_size;
        }
        else {
            stack_mem = co_alloc_stackmem(at.stack_size);
        }
        // 分配栈空间
        lp->stack_mem = stack_mem;
        // 修改上下文的栈地址
        lp->ctx.ss_sp = stack_mem->stack_buffer;
        // 修改栈大小
        lp->ctx.ss_size = at.stack_size;
    
        lp->cStart = 0;
        lp->cEnd = 0;
        lp->cIsMain = 0;
        lp->cEnableSysHook = 0;
        lp->cIsShareStack = at.share_stack != NULL;
    
        lp->save_size = 0;
        lp->save_buffer = NULL;
       return lp;
    }
    

    co_resume源码,这个是重点,表示切换协程

    void co_resume( stCoRoutine_t *co ) {
        // 获取当前协程所在线程的环境
        stCoRoutineEnv_t *env = co->env;
        // 获取当前运行的协程
        stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
        // 如果要切换的协程没有开始运行过
        if( !co->cStart ) {
          // 初始化协程的上下文
         coctx_make( &co->ctx, (coctx_pfn_t)CoRoutineFunc, co, 0 );
         co->cStart = 1;
      }
       // 设置当前运行的协程为co
       env->pCallStack[ env->iCallStackSize++ ] = co;
        // 切换上下文,执行co
        co_swap( lpCurrRoutine, co );
    }
    

    co_swap的定义

    void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co);
    

    实现:

    // 交换上下文,这是关键
    void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co) {
        // 获取当前线程的环境变量
        stCoRoutineEnv_t* env = co_get_curr_thread_env();
        // get curr stack sp
        char c;
        // 获取当前线程栈底指针
        curr->stack_sp= &c;
        
        if (!pending_co->cIsShareStack)
        {
            env->pending_co = NULL;
            env->occupy_co = NULL;
        }
        else 
        {
            // 设置下一个协程
            env->pending_co = pending_co;
            //get last occupy co on the same stack mem
            stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
            //set pending co to occupy thest stack mem;
            // 设置当前栈空间的协程为pending_co
            pending_co->stack_mem->occupy_co = pending_co;
            // 设置之前的协程,记录下来
            env->occupy_co = occupy_co;
            if (occupy_co && occupy_co != pending_co)
            {
                // 拷贝共享栈中的栈空间到自己的私有栈。
                save_stack_buffer(occupy_co);
            }
        }
    
        // swap context
        // 并执行pending_co->ctx
        coctx_swap(&(curr->ctx),&(pending_co->ctx) );
    
        // stack buffer may be overwrite, so get again;
        stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
        stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
        stCoRoutine_t* update_pending_co = curr_env->pending_co;
        // 切进来的协程执行完毕,要将之前在save_buffer中保存协程上下文恢复过来。
        if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co)
        {
            //resume stack buffer
            if (update_pending_co->save_buffer && update_pending_co->save_size > 0)
            {
                memcpy(update_pending_co->stack_sp, update_pending_co->save_buffer, update_pending_co->save_size);
            }
        }
    }
    

    栈空间的保存:

    void save_stack_buffer(stCoRoutine_t* occupy_co)
    {
        ///copy out
        stStackMem_t* stack_mem = occupy_co->stack_mem;
        int len = stack_mem->stack_bp - occupy_co->stack_sp;
        // 之前已经保存过,那么释放之前保存的上下文。
        if (occupy_co->save_buffer)
        {
            // 删除释放
            free(occupy_co->save_buffer), occupy_co->save_buffer = NULL;
        }
    
        occupy_co->save_buffer = (char*)malloc(len); //malloc buf;
        occupy_co->save_size = len;
       // 将当前栈空间的内容拷贝过来。注意这里的是将当前的栈空间的内容保存到
       // save_buffer只能,这里要更重的是stack_bp是什么时候改变的。 
        memcpy(occupy_co->save_buffer, occupy_co->stack_sp, len);
    }
    

    上下文切换

    对于这部分的代码实际上是汇编写的,我们看看

    .globl coctx_swap
    #if !defined( __APPLE__ ) && !defined( __FreeBSD__ )
    .type  coctx_swap, @function
    #endif
    coctx_swap:
    
    #if defined(__i386__)
        ..... 
    #elif defined(__x86_64__)
    leaq 8(%rsp),%rax
    leaq 112(%rdi),%rsp
    pushq %rax
    pushq %rbx
    pushq %rcx
    pushq %rdx
    
    pushq -8(%rax) //ret func addr
    
    pushq %rsi
    pushq %rdi
    pushq %rbp
    pushq %r8
    pushq %r9
    pushq %r12
    pushq %r13
    pushq %r14
    pushq %r15
    
    movq %rsi, %rsp
    popq %r15
    popq %r14
    popq %r13
    popq %r12
    popq %r9
    popq %r8
    popq %rbp
    popq %rdi
    popq %rsi
    popq %rax //ret func addr
    popq %rdx
    popq %rcx
    popq %rbx
    popq %rsp
    pushq %rax
    xorl %eax, %eax
    ret
    #endif
    

    leaq 用于把其第一个参数的值赋值给第二个寄存器参数。第一条语句用来把 8(%rsp) 的本身的值存入到 %rax 中,注意这里使用的并不是 8(%rsp) 指向的值,而是把 8(%rsp) 表示的地址赋值给了 %rax。这一地址是父函数栈帧中除返回地址外栈帧顶的位置

    在第二条语句leaq 112(%rdi), %rsp中,%rdi 存放的是coctx_swap第一个参数的值,这一参数是指向 coctx_t 类型的指针,表示当前要切出的协程,这一类型的定义如下:

    struct coctx_t
    {
    #if defined(__i386__)
    void *regs[ 8 ];
    #else
    // 14个寄存器
    void *regs[ 14 ];
    #endif
    size_t ss_size;
    //
    char *ss_sp;
    };
    

    因而 112(%rdi) 表示的就是第一个协程的 coctx_t 中 regs[14] 数组的下一个64位地址。而接下来的语句:

      pushq %rax  
      pushq %rbx
      pushq %rcx
      pushq %rdx
      pushq -8(%rax) //ret func addr
      pushq %rsi
      pushq %rdi
      pushq %rbp
      pushq %r8
      pushq %r9
      pushq %r12
      pushq %r13
      pushq %r14
      pushq %r15
    

    第一条语句 pushq %rax 用于把 %rax 的值放入到 regs[13] 中,resg[13] 用来存储第一个协程的 %rsp 的值。这时 %rax 中的值是第一个协程 coctx_swap 父函数栈帧除返回地址外栈帧顶的地址。由于 regs[] 中有单独的元素存储返回地址,栈中再保存返回地址是无意义的,因而把父栈帧中除返回地址外的栈帧顶作为要保存的 %rsp 值是合理的。当协程恢复时,把保存的 regs[13] 的值赋值给 %rsp 即可恢复本协程 coctx_swap 父函数堆栈指针的位置。第一条语句之后的语句就是用pushq 把各CPU 寄存器的值依次从 regs 尾部向前压入。即通过调整%rsp 把 regs[14] 当作堆栈,然后利用 pushq 把寄存器的值和返回地址存储到 regs[14] 整个数组中。regs[14] 数组中各元素与其要存储的寄存器对应关系如下:

    //-------------
    // 64 bit
    //low | regs[0]: r15 |
    //    | regs[1]: r14 |
    //    | regs[2]: r13 |
    //    | regs[3]: r12 |
    //    | regs[4]: r9  |
    //    | regs[5]: r8  | 
    //    | regs[6]: rbp |
    //    | regs[7]: rdi |
    //    | regs[8]: rsi |
    //    | regs[9]: ret |  //ret func addr, 对应 rax
    //    | regs[10]: rdx |
    //    | regs[11]: rcx | 
    //    | regs[12]: rbx |
    //hig | regs[13]: rsp |
    

    接下来的汇编语句:

      movq %rsi, %rsp
      popq %r15
      popq %r14
      popq %r13
      popq %r12
      popq %r9
      popq %r8
      popq %rbp
      popq %rdi
      popq %rsi
      popq %rax //ret func addr
      popq %rdx
      popq %rcx
      popq %rbx
      popq %rsp
    

    这里用的方法还是通过改变%rsp 的值,把某块内存当作栈来使用。第一句 movq %rsi, %rsp 就是让%rsp 指向 coctx_swap 第二个参数,这一参数表示要进入的协程。而第二个参数也是coctx_t 类型的指针,即执行完 movq 语句后,%rsp 指向了第二个参数 coctx_t 中 regs[0],而之后的pop 语句就是用 regs[0-13] 中的值填充cpu 的寄存器,这里需要注意的是popq 会使得 %rsp 的值增加而不是减少,这一点保证了会从 regs[0] 到regs[13] 依次弹出到 cpu 寄存器中。在执行完最后一句 popq %rsp 后,%rsp 已经指向了新协程要恢复的栈指针(即新协程之前调用 coctx_swap 时父函数的栈帧顶指针),由于每个协程都有一个自己的栈空间,可以认为这一语句使得%rsp 指向了要进入协程的栈空间。

    coctx_swap 中最后三条语句如下:

      pushq %rax
      xorl %eax, %eax
      ret
    

    pushq %rax 用来把%rax 的值压入到新协程的栈中,这时 %rax 是要进入的目标协程的返回地址,即要恢复的执行点。然后用 xorl 把 %rax 低32位清0以实现地址对齐。最后ret 语句用来弹出栈的内容,并跳转到弹出的内容表示的地址处,而弹出的内容正好是上面 pushq %rax 时压入的 %rax 的值,即之前保存的此协程的返回地址。即最后这三条语句实现了转移到新协程返回地址处执行,从而完成了两个协程的切换。可以看出,这里通过调整%rsp 的值来恢复新协程的栈,并利用了 ret 语句来实现修改指令寄存器 %rip 的目的,通过修改 %rip 来实现程序运行逻辑跳转。注意%rip 的值不能直接修改,只能通过 call 或 ret 之类的指令来间接修改。

    参考资料

  • 相关阅读:
    在ubuntu系统使用SSR
    Pandas库
    Numpy
    06-Python之标准库
    do{}while(0)
    inet_XX族函数
    大端小端
    c++ 强制类型转换
    auto类型推导
    const浅析
  • 原文地址:https://www.cnblogs.com/bofengqiye/p/8835018.html
Copyright © 2011-2022 走看看