zoukankan      html  css  js  c++  java
  • bthread上下文的创建(转)

    本来是学习butil::get_object搜到的这篇文章,写得很详细,转过来备忘。

    转自:https://zhuanlan.zhihu.com/p/347499412

    在之前的文章有介绍过bthread上下文的切换(jump_stack,bthread栈的切换),其中涉及了汇编语言。本文来讲一讲与之对应的另外一个操作:上下文的创建(get_stack(),bthread栈的创建)。

    其实涉及到上下文创建的有两处,一处是TaskGroup初始化的时候,另外一个就是TaskGroup在死循环获取任务执行任务的时候,在jump_stack()之前会调用get_stack()。

    先看一下TaskGroup的初始化。

    TaskControl::create_group()

    TaskGroup* TaskControl::create_group() {
        TaskGroup* g = new (std::nothrow) TaskGroup(this);
        if (NULL == g) {
            LOG(FATAL) << "Fail to new TaskGroup";
            return NULL;
        }
        if (g->init(FLAGS_task_group_runqueue_capacity) != 0) {
            LOG(ERROR) << "Fail to init TaskGroup";
            delete g;
            return NULL;
        }
        if (_add_group(g) != 0) {
            delete g;
            return NULL;
        }
        return g;
    }

    在TaskGroup初始化的时候,会创建TaskGroup,并调用TaskGroup::init()初始化。

    TaskGroup::init()

    int TaskGroup::init(size_t runqueue_capacity) {
        if (_rq.init(runqueue_capacity) != 0) {
            LOG(FATAL) << "Fail to init _rq";
            return -1;
        }
        if (_remote_rq.init(runqueue_capacity / 2) != 0) {
            LOG(FATAL) << "Fail to init _remote_rq";
            return -1;
        }
        ContextualStack* stk = get_stack(STACK_TYPE_MAIN, NULL);
       ...

    可以看出gflag变量 FLAGS_task_group_runqueue_capacity 控制着TG中rq和remote_rq队列的容量(默认是4096),如果你想扩大TG中两个任务队列的大小,请修改task_group_runqueue_capacity这个gflags。当前这是题外话。

    重点关注一下get_stack()。初始化的时候调用getstack(),第二个参数是NULL。这便是get_stack()第一类调用的地方。另外一处是在TaskGroup::ending_sched()中。

    TaskGroup::ending_sched()

    TaskMeta* const cur_meta = g->_cur_meta;
        TaskMeta* next_meta = address_meta(next_tid);
        if (next_meta->stack == NULL) {
            if (next_meta->stack_type() == cur_meta->stack_type()) {
                next_meta->set_stack(cur_meta->release_stack());
            } else {
                ContextualStack* stk = get_stack(next_meta->stack_type(), task_runner);
                if (stk) {
                    next_meta->set_stack(stk);
                } else {
                    next_meta->attr.stack_type = BTHREAD_STACKTYPE_PTHREAD;
                    next_meta->set_stack(g->_main_stack);
                }
            }
        }
        sched_to(pg, next_meta);

    这里也会调用get_stack(),其第二个参数是task_runner而不是NULL了。这里会获取一个表示栈结构的stk,赋值给next_meta。在最后的sched_to()中会调用之前介绍过的jump_stack()

    get_stack()

    src/bthread/stack_inl.h中

    inline ContextualStack* get_stack(StackType type, void (*entry)(intptr_t)) {
        switch (type) {
        case STACK_TYPE_PTHREAD:
            return NULL;
        case STACK_TYPE_SMALL:
            return StackFactory<SmallStackClass>::get_stack(entry);
        case STACK_TYPE_NORMAL:
            return StackFactory<NormalStackClass>::get_stack(entry);
        case STACK_TYPE_LARGE:
            return StackFactory<LargeStackClass>::get_stack(entry);
        case STACK_TYPE_MAIN:
            return StackFactory<MainStackClass>::get_stack(entry);
        }
        return NULL;
    }

    根据栈类型的不同,调用不同的工厂函数去做实际的get_stack()操作。这里合法的栈类型公用4种,分别是:

    1. SmallStackClass
    2. NormalStackClass
    3. LargeStackClass
    4. MainStackClass

    而这4中类型又需要分成两类,MainStackClass自成一类,其余三个为一类。为什么这么说呢?

    因为SmallStackClass、NormalStackClass、LargeStackClass用到是StackFactory的通用模板:template<typename StackClass> struct StackFactory 而MainStackClass用到的是特化模板: template <> struct StackFactory<MainStackClass>

    StackFactory通用模板

    先看一下StackFactory的通用模板定义:

    template <typename StackClass> struct StackFactory {
    
        struct Wrapper : public ContextualStack {
            explicit Wrapper(void (*entry)(intptr_t)) {
                if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
                                           FLAGS_guard_page_size) != 0) {
                    storage.zeroize();
                    context = NULL;
                    return;
                }
                context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);
                stacktype = (StackType)StackClass::stacktype;
            }
            ~Wrapper() {
                if (context) {
                    context = NULL;
                    deallocate_stack_storage(&storage);
                    storage.zeroize();
                }
            }
        }; // end of struct Wrapper
        
        static ContextualStack* get_stack(void (*entry)(intptr_t)) {
            return butil::get_object<Wrapper>(entry);
        }
        
        static void return_stack(ContextualStack* sc) {
            butil::return_object(static_cast<Wrapper*>(sc));
        }
    };

    它包含两个成员函数,一是获取栈(get_statck),另外一个是归还栈(return_stack)。所谓的获取栈就是创建ContextualStack(子类)对象,然后做了初始化。“归还栈”则是“获取栈”的逆操作。

    另外StackFactory模板中有一内部类Wrapper,它是ContextualStack的子类。StackFactory成员函数get_stack()和return_stack()操作的其实就是Wrapper类型。

    Wrapper的构造函数接收一个参数entry,entry的类型是一个函数指针。void(*entry)(intptr_t)表示的是参数类型为intptr_t,返回值为void的函数指针。intptr_t 是和一个机器相关的整数类型,在64位机器上对应的是long,在32位机器上对应的是int。

    其实entry只有两个值,一种是NULL,另外一个就是 TaskGroup中的static函数:task_runner()。

     static void task_runner(intptr_t skip_remained);
     

    构造函数内会调用allocate_stack_storage()分配栈空间,接着对storage、context、stacktype的初始化。这三个是父类ContextualStack的成员。

    其中context的初始化会调用bthread_make_fcontext()函数。还记得在前面文章中解读过的bthread_jump_fcontext()吗?没错,这个就是和他一起定义的另外一个汇编语言实现的函数。这里先按下不表。

    Wrapper析构的时候会调用deallocate_stack_storage()释放占空间,并重置三个成员变量。

     

    StackFactory<MainStackClass> 特化模板

    再看一下MainStackClass的特化模板:

    template <> struct StackFactory<MainStackClass> {
        static ContextualStack* get_stack(void (*)(intptr_t)) {
            ContextualStack* s = new (std::nothrow) ContextualStack;
            if (NULL == s) {
                return NULL;
            }
            s->context = NULL;
            s->stacktype = STACK_TYPE_MAIN;
            s->storage.zeroize();
            return s;
        }
        
        static void return_stack(ContextualStack* s) {
            delete s;
        }
    };

    比较简洁,最大的区别就是它没有Wrapper,没有调用bthread_make_fcontext(),也就是没有分配上下文。

    ContextualStack类型

    好了,我们看下ContextualStack定义:

    struct ContextualStack {
        bthread_fcontext_t context;
        StackType stacktype;
        StackStorage storage;
    };

    bthread_fcontext_t其实是void*的别名。

    StackType是栈类型的枚举,所以 stacktype用来记录栈的类型。

    enum StackType {
        STACK_TYPE_MAIN = 0,
        STACK_TYPE_PTHREAD = BTHREAD_STACKTYPE_PTHREAD,
        STACK_TYPE_SMALL = BTHREAD_STACKTYPE_SMALL,
        STACK_TYPE_NORMAL = BTHREAD_STACKTYPE_NORMAL,
        STACK_TYPE_LARGE = BTHREAD_STACKTYPE_LARGE
    };

     

    StackStorage是具体表示栈信息的:

    struct StackStorage {
         int stacksize;
         int guardsize;
        // Assume stack grows upwards.
        // http://www.boost.org/doc/libs/1_55_0/libs/context/doc/html/context/stack.html
        void* bottom;
        unsigned valgrind_stack_id;
    
        // Clears all members.
        void zeroize() {
            stacksize = 0;
            guardsize = 0;
            bottom = NULL;
            valgrind_stack_id = 0;
        }
    };

     

    视线上移,重回StackFactory的通用模板,在Warpper的构造函数中有调用allocate_stack_storage()分配栈存储。我们看下:

    allocate_stack_storage()

    三种使用通用模板的栈类型,其主要差异就在于分配的栈大小不同了。

    allocate_stack_storage函数声明如下:

    // Allocate a piece of stack.
    int allocate_stack_storage(StackStorage* s, int stacksize, int guardsize);


    第一个参数是表示存储的指针s,表示栈大小的stacksize,表示保护页大小的guardsize。

    先看下它是如何被调用的:

    if (allocate_stack_storage(&storage, *StackClass::stack_size_flag,
                                           FLAGS_guard_page_size) != 0) {
    ...
    }

    保护页的大小guardsize是通过gflag定义的,对应FLAGS_guard_page_size 其默认值是4096。

    栈大小stacksize也就对应的三种栈类型中的stack_size_flag,也都是通过gflag定义:

    int* SmallStackClass::stack_size_flag = &FLAGS_stack_size_small;  // 默认值32768
    int* NormalStackClass::stack_size_flag = &FLAGS_stack_size_normal;// 默认值1048576
    int* LargeStackClass::stack_size_flag = &FLAGS_stack_size_large;  // 默认值8388608
    大家可以自己思考一下:为什么stack_size_flag要定义成int*指针类型,而不是直接定义成int类型?

    开始看allocate_stack_storage()的实现,它的定义代码很长,我们分段来看。

    int allocate_stack_storage(StackStorage* s, int stacksize_in, int guardsize_in) {
        
        const static int PAGESIZE = getpagesize();
        const int PAGESIZE_M1 = PAGESIZE - 1;
        const int MIN_STACKSIZE = PAGESIZE * 2;
        const int MIN_GUARDSIZE = PAGESIZE;

    在源文件定义中,参数二三的名称有调整,换成了stacksize_in和guardsize_in。它们就是刚才我们说的stacksize和guardsize(之所以改了个名字是因为下面还有变量会用到stacksize和guardsize这两个名字)。

    getpagesize()是<unistd.h>中的库函数,用来获取系统的一个分页的大小(所在内存的字节数)。上面共定义了4个页大小相关的变量。

    // Align stacksize
        const int stacksize =
            (std::max(stacksize_in, MIN_STACKSIZE) + PAGESIZE_M1) &
            ~PAGESIZE_M1;

    这里涉及到二进制运算,其实就是让内存大小按照页大小对齐(也就是页大小的整数倍)。可能理解计算过程会比较绕,不过我直接说一下结论就好。比如在我的Linux和Mac上页大小都是4096,然后经过上述运算stacksize的值基本上都是和传入的stacksize_in相同!这是因为三种栈的大小已经是4096的整数倍了。好了,不用纠结,我们继续。

    if (guardsize_in <= 0) {
        ...
        ...
        ...
    } else {

    因为我们的guardsize_in 默认是4096的(一般也没人去改它),我们直接忽略这个if里面的代码,直接看else。

     // Align guardsize
            const int guardsize =
                (std::max(guardsize_in, MIN_GUARDSIZE) + PAGESIZE_M1) &
                ~PAGESIZE_M1;

    和前面一样的计算过程,进行对齐。在我的Linux上计算之后的guardsize就是4096,等同于guardsize_in。这个毋庸置疑。

     

     const int memsize = stacksize + guardsize;
            void* const mem = mmap(NULL, memsize, (PROT_READ | PROT_WRITE),
                                   (MAP_PRIVATE | MAP_ANONYMOUS), -1, 0);
            if (MAP_FAILED == mem) {
                PLOG_EVERY_SECOND(ERROR) 
                    << "Fail to mmap size=" << memsize << " stack_count="
                    << s_stack_count.load(butil::memory_order_relaxed)
                    << ", possibly limited by /proc/sys/vm/max_map_count";
                // may fail due to limit of max_map_count (65536 in default)
                return -1;
            }

    用mmap分配一块内存,大小是stacksize,guardsize之和。

     void* aligned_mem = (void*)(((intptr_t)mem + PAGESIZE_M1) & ~PAGESIZE_M1);
            if (aligned_mem != mem) {
                LOG_ONCE(ERROR) << "addr=" << mem << " returned by mmap is not "
                    "aligned by pagesize=" << PAGESIZE;
            }

    这个是判断一下mmap返回的内存地址是不是按照页大小对齐的。如果不是就打一行ERROR日志。

    const int offset = (char*)aligned_mem - (char*)mem;
            if (guardsize <= offset ||
                mprotect(aligned_mem, guardsize - offset, PROT_NONE) != 0) {
                munmap(mem, memsize);
                PLOG_EVERY_SECOND(ERROR) 
                    << "Fail to mprotect " << (void*)aligned_mem << " length="
                    << guardsize - offset; 
                return -1;
            }

    计算offset,当不对齐的时候offset会大于0。接着如果offset大于保护页的大小,直接返回-1。如果offset小于保护页的大小,就调用mprotect()把多余的字节(guardsize - offset)设置成不可访问(PROT_NONE)。

    s_stack_count.fetch_add(1, butil::memory_order_relaxed);
    全局原子变量s_stack_count 加1。
    s->bottom = (char*)mem + memsize;
            s->stacksize = stacksize;
            s->guardsize = guardsize;

    给allocate_stack_storage()第一个参数s的三个字段赋值。

    s->bottom存储的是栈底部的地址,因为mem是开始的地址,memsize是长度,二者相加就到尾部了。

            if (RunningOnValgrind()) {
                s->valgrind_stack_id = VALGRIND_STACK_REGISTER(
                    s->bottom, (char*)s->bottom - stacksize);
            } else {
                s->valgrind_stack_id = 0;
            }

    如果当前是在运行Valgrind(检查内存泄漏的工具)则执行一些逻辑。这个是调试和分析时用的,可以忽略这段逻辑。

    接下来我们重新回到get_stack()这个函数上来,在StackFactory中:

        static ContextualStack* get_stack(void (*entry)(intptr_t)) {
            return butil::get_object<Wrapper>(entry);
        }
    butil::get_object()

    butil::get_object()是brpc实现的对象池相关函数。定义在butil/object_pool_inl.h 中,get_object()是一个模板函数,有三个重载,分别支持构造函数为0个参数、1个参数、2个参数的类对象。

    在我们这里的场景中,用到的是1个参数重载:

        template <typename A1>
        inline T* get_object(const A1& arg1) {
            LocalPool* lp = get_or_new_local_pool();
            if (BAIDU_LIKELY(lp != NULL)) {
                return lp->get(arg1);
            }
            return NULL;
        }

    BAIDU_LIKELY是一个宏,直接展开:

        template <typename A1>
        inline T* get_object(const A1& arg1) {
            LocalPool* lp = get_or_new_local_pool();
            if (__builtin_expect((bool)(lp != __null), true)) {
                return lp->get(arg1);
            }
            return NULL;
        }

    get_or_new_local_pool()是获取一个段内存区lp(这个是thread local的)

    下面的__builtin_expect()是gcc扩展函数,方便编译器做分支预测优化的。这里表示就是lp 大概率都不等于NULL,会比写普通的if (lp != __null)性能更好。但逻辑上是等价的:

           if (lp != __null) {
                return lp->get(arg1);
            }
    看下lp->get(arg1)的实现(还是butil/object_pool_inl.h 中)。这个get()也是有三个重载,分别支持0个参数,1个参数和2个参数。
            template <typename A1>
            inline T* get(const A1& a1) {
                BAIDU_OBJECT_POOL_GET((a1));
            }

    BAIDU_OBJECT_POOL_GET是一个复杂的宏。这个就是所谓对象池的主要逻辑了,我这里直接展开,然后添加一些注释。

    // 如果对象池中有剩余,则直接返回
    if (_cur_free.nfree) {
        BAIDU_OBJECT_POOL_FREE_ITEM_NUM_SUB1;
        return _cur_free.ptrs[--_cur_free.nfree];
    }
    // 对象池中无剩余,TODO
    if (_pool->pop_free_chunk(_cur_free)) {
        BAIDU_OBJECT_POOL_FREE_ITEM_NUM_SUB1;
        return _cur_free.ptrs[--_cur_free.nfree];
    }
    // 使用定位new,在指定内存位置去构造对象。
    // 在我们这个场景中就是构造Wrapper对象,a1就是传入的函数指针
    // 如果成功则直接把构造好的对象指针返回
    if (_cur_block && _cur_block->nitem < BLOCK_NITEM) {
        T *obj = new ((T *)_cur_block->items + _cur_block->nitem) T(a1);
        if (!ObjectPoolValidator<T>::validate(obj)) {
            obj->~T();
            return NULL;
        }
        ++_cur_block->nitem;
        return obj;
    }
    // 走到这说明构造对象失败了,则新建一个block
    // 还是用定位new,在指定位置构造对象
    _cur_block = add_block(&_cur_block_index);
    if (_cur_block != NULL) {
        T *obj = new ((T *)_cur_block->items + _cur_block->nitem) T(a1);
        if (!ObjectPoolValidator<T>::validate(obj)) {
            obj->~T();
            return NULL;
        }
        ++_cur_block->nitem;
        return obj;
    }
    return NULL;

    在上面代码中obj构造完成之后,返回之前。都会做一个if(!ObjectPoolValidator<T>::validate(obj))的验证。顾名思义是去验证一下obj是否是有效的。通用模板恒为true。

    template <typename T> struct ObjectPoolValidator {
        static bool validate(const T*) { return true; }
    };

    不同的类型可以自己实现特化的模板,比如我们的三种栈类型:

    template <> struct ObjectPoolValidator<
        bthread::StackFactory<bthread::LargeStackClass>::Wrapper> {
        inline static bool validate(
            const bthread::StackFactory<bthread::LargeStackClass>::Wrapper* w) {
            return w->context != NULL;
        }
    };
    template <> struct ObjectPoolValidator<
        bthread::StackFactory<bthread::NormalStackClass>::Wrapper> {
        inline static bool validate(
            const bthread::StackFactory<bthread::NormalStackClass>::Wrapper* w) {
            return w->context != NULL;
        }
    };
    
    template <> struct ObjectPoolValidator<
        bthread::StackFactory<bthread::SmallStackClass>::Wrapper> {
        inline static bool validate(
            const bthread::StackFactory<bthread::SmallStackClass>::Wrapper* w) {
            return w->context != NULL;
        }
    };

    一定要context不为NULL才是有效的。

    至此大部分基本讲完了。还剩一个重点没讲,那就是汇编实现的bthread_make_fcontext()!

    bthread_make_fcontext()

    先回顾一下它被调用的地方:

    context = bthread_make_fcontext(storage.bottom, storage.stacksize, entry);


    bthread_make_fcontext()作用是在当前栈顶创建一个上下文,用来执行第三个参数表示的函数entry。返回ContextualStack*类型上下文 。通过前文我们知道entry只有两种取值,一个是NULL,另外一个就是task_runner。

        static void task_runner(intptr_t skip_remained);

     

    看下bthread_make_fcontext()的定义吧,src/bthread/context.cpp中

    #if defined(BTHREAD_CONTEXT_PLATFORM_linux_x86_64) && defined(BTHREAD_CONTEXT_COMPILER_gcc)
    __asm (
    ".text
    "
    ".globl bthread_make_fcontext
    "
    ".type bthread_make_fcontext,@function
    "
    ".align 16
    "
    "bthread_make_fcontext:
    "
    "    movq  %rdi, %rax
    "
    "    andq  $-16, %rax
    "
    "    leaq  -0x48(%rax), %rax
    "
    "    movq  %rdx, 0x38(%rax)
    "
    "    stmxcsr  (%rax)
    "
    "    fnstcw   0x4(%rax)
    "
    "    leaq  finish(%rip), %rcx
    "
    "    movq  %rcx, 0x40(%rax)
    "
    "    ret 
    "
    "finish:
    "
    "    xorq  %rdi, %rdi
    "
    "    call  _exit@PLT
    "
    "    hlt
    "
    ".size bthread_make_fcontext,.-bthread_make_fcontext
    "
    ".section .note.GNU-stack,"",%progbits
    "
    );
    
    #endif

    bthread_make_fcontext()逻辑没有bthread_jump_fcontext()复杂。

    逐步来看汇编代码。

    movq  %rdi, %rax

    %rdi存储的是 第一个参数(也就是storage.bottom)复制到%rax寄存器中。

    andq  $-16, %rax

    %rax 存储的值减去16,表示对齐。设第一个参数为n(也就是storage.bottom),则这个命令表示 %rax=(8n+22)&-16 求得storage.bottom向下舍入16的最小的倍数,当n为奇数的时候为8n+8;当n为偶数的时候为8n+16; %rax 是用法作为返回值的,这里也就是通过storage.bottom计算出一个实际要返回的栈地址(不是直接返回storage.bottom)

    leaq  -0x48(%rax), %rax

    %rax存储地址减去72,再存入%rax寄存器中。

    movq  %rdx, 0x38(%rax)

    %rdx存储的是第三个参数(也就是函数指针变量entry)存入%rax指向地址+56的位置。

    stmxcsr  (%rax)
    fnstcw   0x4(%rax)

    保存MXCSR寄存器的值到%rax指向地址,保存当前FPU状态字到%rax+4的地址。(bthread_jump_fcontext 中也有类似操作)

    leaq  finish(%rip), %rcx

    计算finish标签的地址,存入%rcx。

    movq  %rcx, 0x40(%rax)

    把%rcx的值存入%rax+64指向的地址。

    finish:
        xorq  %rdi, %rdi
        call  _exit@PLT
        hlt

    xorq就是异或操作, xorq %rdi,%rdi 就是把%rdi寄存器清零。

    后面两句是退出和暂停。


    相关资料

    Swoole协程之旅-后篇-Swoole 官方文档手册-面试哥

    学习笔记 变长栈帧_qq_40065223的博客-CSDN博客

    联系方式:emhhbmdfbGlhbmcxOTkxQDEyNi5jb20=
  • 相关阅读:
    1052. 卖个萌 (20)
    1051. 复数乘法 (15)
    1050. 螺旋矩阵(25)
    1049. 数列的片段和(20)
    1048. 数字加密(20)
    1047. 编程团体赛(20)
    1046. 划拳(15)
    怎么用js代码改变单选框的选中状态
    Dom操作--全选反选
    Scoket简介
  • 原文地址:https://www.cnblogs.com/zl1991/p/15191429.html
Copyright © 2011-2022 走看看