zoukankan      html  css  js  c++  java
  • SRS流媒体服务器04 ---- st-thread框架

    1.使用st-thread

    我们用一个简单的demo研究一下st框架。

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include "st.h"
    
    static void *_thread(void *arg) {
        printf("thread: %lu
    ", pthread_self());
        return NULL;
    }
    
    int main(int argc, char *argv[]) {
        if (st_init() < 0) {
            perror("st_init");
            exit(1);
        }
    
        for (unsigned int i=0; i<10; i++) {
            if (st_thread_create(_thread, NULL, 0, 0) == NULL) {
                perror("st_thread_create");
                exit(1);
            }
        }
    
        st_thread_exit(NULL);
        return 0;
    }

    out:

    thread: 140727908825184
    thread: 140727908825184
    thread: 140727908825184
    thread: 140727908825184
    thread: 140727908825184
    thread: 140727908825184
    thread: 140727908825184
    thread: 140727908825184
    thread: 140727908825184
    thread: 140727908825184

     2.初始化st_init

    这里主要做了几件事:
    1. 选择io复用函数类型
    2. 配置系统相关设置:屏蔽SIGPIPE和配置文件描述符限制
    3. 初始化全局变量_st_this_vp
    4. 创建ide线程
    5. 创建primorial线程
    int st_init(void)
    {
      _st_thread_t *thread;
    
      if (_st_active_count) {
        /* Already initialized */
        return 0;
      }
        // 初始化空闲栈队列
      ST_INIT_CLIST(&_st_free_stacks);
    
      /* We can ignore return value here */
      // 选择使用哪种IO多路复用函数,eg:_st_select_eventsys
      st_set_eventsys(ST_EVENTSYS_DEFAULT);
    
      // 屏蔽SIGPIPE信号和调整文件描述符限制
      pthread_once(&io_once_control, (void (*)(void))_st_io_init);
    
      memset(&_st_this_vp, 0, sizeof(_st_vp_t));
    
      // 线程队列初始化
      ST_INIT_CLIST(&_ST_RUNQ);
      ST_INIT_CLIST(&_ST_IOQ);
      ST_INIT_CLIST(&_ST_ZOMBIEQ);
    #ifdef DEBUG
      ST_INIT_CLIST(&_ST_THREADQ);
    #endif
    
      // io多路复用函数对应的初始化,eg:_st_select_init
      if ((*_st_eventsys->init)() < 0)
        return -1;
    
      // 获取内存页大小,一般4096
      _st_this_vp.pagesize = getpagesize();
      // 获取时间,微妙
      _st_this_vp.last_clock = st_utime();
    
      // 创建ide线程:主要用来处理io事件和定时器
      _st_this_vp.idle_thread = st_thread_create(_st_idle_thread_start,
                             NULL, 0, 0);
      if (!_st_this_vp.idle_thread)
        return -1;
      _st_this_vp.idle_thread->flags = _ST_FL_IDLE_THREAD;
      _st_active_count--;
      // 从run队列移除ide线程:因为只有没有线程调用时,才会调用ide线程
      _ST_DEL_RUNQ(_st_this_vp.idle_thread);
    
      // 初始化primorial线程,primorial线程用来标记系统进程
      thread = (_st_thread_t *) calloc(1, sizeof(_st_thread_t) +
                       (ST_KEYS_MAX * sizeof(void *)));
      if (!thread)
        return -1;
      _st_this_vp.primorial_thread = thread;
      thread->private_data = (void **) (thread + 1);
      thread->state = _ST_ST_RUNNING;
      thread->flags = _ST_FL_PRIMORDIAL;
      // 当前运行的是primorial线程,当primorial线程退出时,整个进程也会终止。
      _ST_SET_CURRENT_THREAD(thread);
      _st_active_count++;
    #ifdef DEBUG
      _ST_ADD_THREADQ(thread);
    #endif
    
      return 0;
    }

    2.1 全局结构 _st_this_vp

    全局的相关信息都保存在_st_this_vp,主要保存了:

    1. primorial线程和系统进程共存,线程退出,进程也就退出了。
    2. idle线程:在没有线程运行会被调度,主要任务是用IO多路复用函数等待IO事件和处理定时器
    3. run线程队列:运行线程,我们的工作线程。
    4. io线程队列:当线程需要等待io事件时,会被放到io线程队列,当等待的io事件发生或者超时和中断时,会从队列移除加入到run队列中。
    5. zombie线程队列:线程结束时,如果设置了joinable,就会被加入到这个队列。
    6. sleep线程:处理定时器,基于二叉树保存定时器
    typedef struct _st_vp {
      _st_thread_t *primorial_thread;
      _st_thread_t *idle_thread;  /* Idle thread for this vp */
      st_utime_t last_clock;      /* The last time we went into vp_check_clock() */
    
      _st_clist_t run_q;          /* run queue for this vp */
      _st_clist_t io_q;           /* io queue for this vp */
      _st_clist_t zombie_q;       /* zombie queue for this vp */
    #ifdef DEBUG
      _st_clist_t thread_q;       /* all threads of this vp */
    #endif
      int pagesize;
    
      _st_thread_t *sleep_q;      /* sleep queue for this vp */
      int sleepq_size;          /* number of threads on sleep queue */
    
    #ifdef ST_SWITCH_CB
      st_switch_cb_t switch_out_cb;    /* called when a thread is switched out */
      st_switch_cb_t switch_in_cb;    /* called when a thread is switched in */
    #endif
    } _st_vp_t;

    2.2 线程结构 _st_thread_t

    线程的相关信息都保存在_st_thread,主要保存了:

    1. 线程状态
    2. 线程启动函数
    3. 线程所在的队列:run sleep和zombie等队列
    4. 线程堆栈:
    5. 线程上下文 :jmp_buf,线程的跳转主要通过setjmp/longjmp跳转,jmp_buf保存了线程的上下文信息
    struct _st_thread {
      int state;                  /* 线程状态 */
      int flags;                  /* Thread's flags */
    
      void *(*start)(void *arg);  /* 线程启动函数 */
      void *arg;                  /* 线程启动参数 */
      void *retval;               /* 线程启动函数返回值 */
    
      _st_stack_t *stack;           /* 记录堆栈信息 */
    
      _st_clist_t links;          /* 用于插入到 run/sleep/zombie 线程队列 */
      _st_clist_t wait_links;     /* 用于插入到 mutex/condvar 等待队列 */
    #ifdef DEBUG
      _st_clist_t tlink;          /* For putting on thread queue */
    #endif
    
      st_utime_t due;             /* Wakeup time when thread is sleeping */
      _st_thread_t *left;         /* 记录sleep定时器二叉树左节点 */
      _st_thread_t *right;          /* 记录sleep定时器二叉树右节点 */
      int heap_index;             /* 堆节点 */
    
      void **private_data;        /* 线程私有数据 */
    
      _st_cond_t *term;           /* joinable类型的线程 */
    
      jmp_buf context;            /* 线程上下文:线程的跳转主要通过setjmp/longjmp跳转,jmp_buf保存了线程的上下文信息 */
    };

    2.3 线程状态

    // 线程状态
    #define _ST_ST_RUNNING      0 // 执行中
    #define _ST_ST_RUNNABLE     1 // 可执行状态,等待调度
    #define _ST_ST_IO_WAIT      2 // 等待IO事件
    #define _ST_ST_LOCK_WAIT    3 // 等待互斥锁
    #define _ST_ST_COND_WAIT    4 // 等待条件变量
    #define _ST_ST_SLEEPING     5 // sleep
    #define _ST_ST_ZOMBIE       6 // 线程已结束,待其它线程调用st_thread_join收尸
    #define _ST_ST_SUSPENDED    7 // 暂停,只能调用st_thread_interrupt唤醒
    
    // 线程flag
    #define _ST_FL_PRIMORDIAL   0x01 // 原生线程,非创建的,没有分配私有栈
    #define _ST_FL_IDLE_THREAD  0x02 // 空闲线程,用于epoll,处理定时器
    #define _ST_FL_ON_SLEEPQ    0x04 // 线程sleep中,如调用st_usleep、st_cond_timedwait等
    #define _ST_FL_INTERRUPT    0x08 // 线程被st_thread_interrupt()中断
    #define _ST_FL_TIMEDOUT     0x10 // 定时器超时

    线程状态转换:

    3.创建线程 st_thread_create

    _st_thread_t *st_thread_create(void *(*start)(void *arg), void *arg,
                       int joinable, int stk_size)
    {
      _st_thread_t *thread;
      _st_stack_t *stack;
      void **ptds;
      char *sp;
    
      // 1.创建线程栈
      if (stk_size == 0)
        stk_size = ST_DEFAULT_STACK_SIZE;
      // 页对齐 eg: 64 * 1024 = 65536
      stk_size = ((stk_size + _ST_PAGE_SIZE - 1) / _ST_PAGE_SIZE) * _ST_PAGE_SIZE;
      stack = _st_stack_new(stk_size);
      if (!stack)
        return NULL;
    
      /* Allocate thread object and per-thread data off the stack */
      // 2.分配线程对象
      sp = stack->stk_top;
    
      sp = sp - (ST_KEYS_MAX * sizeof(void *));
      ptds = (void **) sp;
      sp = sp - sizeof(_st_thread_t);
      thread = (_st_thread_t *) sp;
    
      // 栈指针64位对齐
      if ((unsigned long)sp & 0x3f)
        sp = sp - ((unsigned long)sp & 0x3f);
      stack->sp = sp - _ST_STACK_PAD_SIZE;
    
      memset(thread, 0, sizeof(_st_thread_t));
      memset(ptds, 0, ST_KEYS_MAX * sizeof(void *));
    
      // 3.初始化线程
      thread->private_data = ptds;
      thread->stack = stack;
      thread->start = start;
      thread->arg = arg;
    
      // 4.初始化线程上下文
    #ifndef __ia64__
      _ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);
    #else
      _ST_INIT_CONTEXT(thread, stack->sp, stack->bsp, _st_thread_main);
    #endif
    
      /* If thread is joinable, allocate a termination condition variable */
      if (joinable) {
        thread->term = st_cond_new();
        if (thread->term == NULL) {
          _st_stack_free(thread->stack);
          return NULL;
        }
      }
    
      // 5.设置线程状态
      thread->state = _ST_ST_RUNNABLE;
      _st_active_count++;
      _ST_ADD_RUNQ(thread);
    #ifdef DEBUG
      _ST_ADD_THREADQ(thread);
    #endif
    
    #ifndef NVALGRIND
      thread->stack->valgrind_stack_id =
        VALGRIND_STACK_REGISTER(thread->stack->stk_top, thread->stack->stk_bottom);
    #endif
    
      return thread;
    }

    3.1线程堆栈 _st_stack_t

    typedef struct _st_stack {
      _st_clist_t links;          /* 空闲栈链表 */
      char *vaddr;                /* 内存分配的起始位置 */
      int  vaddr_size;            /* 栈 总大小 */
      int  stk_size;              /* 栈 可用部分大小 */
      char *stk_bottom;           /* 私有栈 结束位置 */
      char *stk_top;              /* 私有栈 起始位置 */
      void *sp;                   /* 栈指针 */
    #ifdef __ia64__
      void *bsp;                  /* Register stack backing store pointer */
    #endif
    #ifndef NVALGRIND
      /* id returned by VALGRIND_STACK_REGISTER */
      /* http://valgrind.org/docs/manual/manual-core-adv.html */
      unsigned int valgrind_stack_id;
    #endif
    } _st_stack_t;

    线程上下文 jmp_buf

    /* Calling environment, plus possibly a saved signal mask.  */
    struct __jmp_buf_tag
      {
        /* NOTE: The machine-dependent definitions of `__sigsetjmp'
           assume that a `jmp_buf' begins with a `__jmp_buf' and that
           `__mask_was_saved' follows it.  Do not move these members
           or add others before it.  */
        __jmp_buf __jmpbuf;         /* Calling environment.  */
        int __mask_was_saved;       /* Saved the signal mask?  */
        __sigset_t __saved_mask;    /* Saved signal mask.  */
      };
    
    
    __BEGIN_NAMESPACE_STD
    
    typedef struct __jmp_buf_tag jmp_buf[1];

    栈创建

    _st_stack_t *_st_stack_new(int stack_size)
    {
      _st_clist_t *qp;
      _st_stack_t *ts;
      int extra;
      // 如果_st_free_stacks非空,则从里面取
      for (qp = _st_free_stacks.next; qp != &_st_free_stacks; qp = qp->next) {
        ts = _ST_THREAD_STACK_PTR(qp);
        if (ts->stk_size >= stack_size) {
          /* Found a stack that is big enough */
          ST_REMOVE_LINK(&ts->links);
          _st_num_free_stacks--;
          ts->links.next = NULL;
          ts->links.prev = NULL;
          return ts;
        }
      }
    
      /* Make a new thread stack object. */
      if ((ts = (_st_stack_t *)calloc(1, sizeof(_st_stack_t))) == NULL)
        return NULL;
      // eg: _st_randomize_stacks:0 extra:0
      extra = _st_randomize_stacks ? _ST_PAGE_SIZE : 0;
      // eg: stack_size:65536 REDZONE:pagesize(4096)*2=8192
      //     vaddr_size = 16 * 4096 + 2*4096 = 18 *4096
      ts->vaddr_size = stack_size + 2*REDZONE + extra;
      // 申请内存,返回起始地址
      ts->vaddr = _st_new_stk_segment(ts->vaddr_size);
      if (!ts->vaddr) {
        free(ts);
        return NULL;
      }
      // 栈总大小
      ts->stk_size = stack_size;
      //
      ts->stk_bottom = ts->vaddr + REDZONE;
      ts->stk_top = ts->stk_bottom + stack_size;
    
    #ifdef DEBUG
      mprotect(ts->vaddr, REDZONE, PROT_NONE);
      mprotect(ts->stk_top + extra, REDZONE, PROT_NONE);
    #endif
    
      if (extra) {
        long offset = (random() % extra) & ~0xf;
    
        ts->stk_bottom += offset;
        ts->stk_top += offset;
      }
    
      return ts;
    }

    到此,我们可以分析出,栈空间布局:

    4.线程调度

     4.1 setjmp和longjmp函数

    这里线程的调度是基于 setjmp和longjmp,和goto类似,不过goto是函数内部跳转,这个跳转范围更大。所以讲解一下用法,

    #include <setjmp.h>
    int setjmp(jmp_buf  env);

    返回值:若直接调用则返回0,若从longjmp调用返回则返回非0值的longjmp中的val值

    void longjmp(jmp_buf env,int val);

    调用此函数则返回到语句setjmp所在的地方,其中env 就是setjmp中的 env,而val 则是使setjmp的返回值变为val。
    当检查到一个错误时,则以两个参数调用longjmp函数,第一个就是在调用setjmp时所用的env,第二个参数是具有非0值的val,它将成为从setjmp处返回的值。
    使用第二个参数的原因是对于一个setjmp可以有多个longjmp。

    我们看个demo

    #include <stdio.h>
    #include <setjmp.h>
     
    static jmp_buf buf;
     
    void second(void) {
        printf("second
    ");         // 打印
        longjmp(buf,1);             // 跳回setjmp的调用处 - 使得setjmp返回值为1
    }
     
    void first(void) {
        second();
        printf("first
    ");          // 不可能执行到此行
    }
     
    int main() {   
        if ( ! setjmp(buf) ) {
            first();                // 进入此行前,setjmp返回0
        } else {                    // 当longjmp跳转回,setjmp返回1,因此进入此行
            printf("main
    ");       // 打印
        }
     
        return 0;
    }

    out:

    second
    main

    注意到虽然first()子程序被调用,"first"不可能被打印。"main"被打印,因为条件语句if ( ! setjmp(buf) )被执行第二次。 

    使用setjmp和longjmp要注意以下几点:

    1、setjmp与longjmp结合使用时,它们必须有严格的先后执行顺序,也即先调用setjmp函数,之后再调用longjmp函数,以恢复到先前被保存的“程序执行点”。否则,如果在setjmp调用之前,执行longjmp函数,将导致程序的执行流变的不可预测,很容易导致程序崩溃而退出

    2.  longjmp必须在setjmp调用之后,而且longjmp必须在setjmp的作用域之内。具体来说,在一个函数中使用setjmp来初始化一个全局标号,然后只要该函数未曾返回,那么在其它任何地方都可以通过longjmp调用来跳转到 setjmp的下一条语句执行。实际上setjmp函数将发生调用处的局部环境保存在了一个jmp_buf的结构当中,只要主调函数中对应的内存未曾释放 (函数返回时局部内存就失效了),那么在调用longjmp的时候就可以根据已保存的jmp_buf参数恢复到setjmp的地方执行。

     4.2 线程上下文初始化

    可以看到线程上下文对象就是jmp_buf,我们看看怎么初始化的。

    _ST_INIT_CONTEXT(thread, stack->sp, _st_thread_main);

    展开如下:

    {
        if (setjmp(thread->context))
            _st_thread_main();
        thread->context[0].__jmpbuf[JB_RSP]
    }

     和上面的demo是不是很像,这里主要设置了setjmp返回0,那什么时候进入_st_thread_main呢?答案就在st_thread_exit

     4.3 线程退出st_thread_exit

    void st_thread_exit(void *retval)
    {
      _st_thread_t *thread = _ST_CURRENT_THREAD();
    
      printf("st_thread_exit current thread: %p
    ", thread);
      
      thread->retval = retval;
      _st_thread_cleanup(thread);
      _st_active_count--;
      if (thread->term) {
        /* Put thread on the zombie queue */
        thread->state = _ST_ST_ZOMBIE;
        _ST_ADD_ZOMBIEQ(thread);
    
        /* Notify on our termination condition variable */
        st_cond_signal(thread->term);
    
        /* Switch context and come back later */
        _ST_SWITCH_CONTEXT(thread);
    
        /* Continue the cleanup */
        st_cond_destroy(thread->term);
        thread->term = NULL;
      }
    
    #ifdef DEBUG
      _ST_DEL_THREADQ(thread);
    #endif
    
    #ifndef NVALGRIND
      if (!(thread->flags & _ST_FL_PRIMORDIAL)) {
        VALGRIND_STACK_DEREGISTER(thread->stack->valgrind_stack_id);
      }
    #endif
    
      if (!(thread->flags & _ST_FL_PRIMORDIAL)) {
        _st_stack_free(thread->stack);
      }
    
      // 启动线程调度
      _ST_SWITCH_CONTEXT(thread);
      free(thread);
      (*_st_eventsys->free)();
    }

     这里主要就是干了两件件事:

    1. 退出当前线程
    2. 线程调度:_ST_SWITCH_CONTEXT

    我们展开看一下_ST_SWITCH_CONTEXT,代码如下:

    if (!setjmp(_thread->context)) {
        _st_vp_schedule();
    }

    这里设置了当前线程的jmp_buf ,返回0,启动调度_st_vp_schedule

    void _st_vp_schedule(void)
    {
      _st_thread_t *thread;
      // 查找RUN队列,空就调度idle_thread
      if (_ST_RUNQ.next != &_ST_RUNQ) {
        // 从run队列取出next线程
        thread = _ST_THREAD_PTR(_ST_RUNQ.next);
        _ST_DEL_RUNQ(thread);
      } else {
        // 如果空就切换到idle线程
        thread = _st_this_vp.idle_thread;
      }
      ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
    
      // 切换到thread线程
      thread->state = _ST_ST_RUNNING;
      _ST_RESTORE_CONTEXT(thread);
    }

    这里展开一下_ST_RESTORE_CONTEXT

    _st_this_thread = (_thread)
    longjmp((_thread)->context, 1)

    就是设置了一下当前线程,然后通过longjmp跳转到_st_thread_main,执行线程的启动函数,到这里调度的基本原理就清楚了。

    总结一下:

    1. 每个线程通过jmp_buf 保存信息,然后把自己切出去,后面调度的时候,再通过longjmp跳转回来。
    2. 调度是非抢占式的,需要用户自己把握,如果,某个线程长时间占用CPU,别的线程就无法被调度。
    3. 所有的IO和sleep操作需要用st自己的接口,不然无法调度。

    参考:

      1.https://cloud.tencent.com/developer/article/1197338

  • 相关阅读:
    2019学期第十周编程总结
    2019学期第九周编程总结
    第七次作业
    第六次作业
    第五次作业
    jsp第四次作业
    3.10
    3.4
    3.3jsp作业
    最后一次安卓作业
  • 原文地址:https://www.cnblogs.com/vczf/p/14556736.html
Copyright © 2011-2022 走看看