zoukankan      html  css  js  c++  java
  • Gstreamer 数据流线程(GstTask / GstTaskPool)分析

    作者:fengcc 原创文章 转载请注明出处


    GStreamer 是一个基于流水线的多媒体框架,基于 GObject,以 C 语言写成。
    凭借 GStreamer,程序员可以很容易地创建各种多媒体功能组件,包括简单的音频回放,音频和视频播放,录音,流媒体和音频编辑。基于流水线设计,可以创建诸如视频编辑器、流媒体广播和媒体播放器等等的很多多媒体应用。


    GstTask/GstTaskPool — streaming threads

    Gstreamer 将GstElementGstPad中关于数据流处理的线程封装成 GstTask,并提供gst_task_start(),gst_task_pause(),gst_task_stop()等接口,使数据流的处理更加方便。例如:GstPad通常会启动一个GstTask从另一个 pad 上拉数据或者推数据到另一个 pad。

    工作模式

    下面根据它提供的几个接口来详细介绍GstTask内部的实现。


    (1)创建任务

    GstTask *
    gst_task_new (GstTaskFunction func,
                  gpointer user_data,
                  GDestroyNotify notify);
    

    创建一个任务,该任务稍后将会在新线程里重复调用func函数,以user_data 为参数。注意,此时还没有创建或者启动一个新的线程。

    GstTaskgst_task_class_init() 函数中会调用gst_task_pool_get_default()函数,该函数代码如下:

    GstTaskPool *
    gst_task_pool_get_default (void)
    {
      static GstTaskPool *pool = NULL;
    
      if (g_once_init_enter (&pool)) { //整个程序周期只会进入一次
        GstTaskPool *_pool = gst_task_pool_new ();
    
        gst_task_pool_prepare (_pool, NULL);
        g_once_init_leave (&pool, _pool);
      }
    
      return gst_object_ref (pool);
    }
    

    它利用g_once_init_enter()/g_once_init_leave()操作来确保整个程序的运行周期中只会创建一个默认GstTaskPool结构,GstTaskPool结构是对 glib 线程池的封装。之后再调用gst_task_pool_prepare()函数,该函数会调用GstTaskPooldefault_prepare()函数来创建一个默认的 glib 线程池,使用的是上面提到的g_thread_pool_new()接口。

    传递给g_thread_pool_new()的第一个参数为default_func函数:

    static void
    default_func (TaskData * tdata, GstTaskPool * pool)
    {
      GstTaskPoolFunction func;
      gpointer user_data;
    
      func = tdata->func;
      user_data = tdata->user_data;
      g_slice_free (TaskData, tdata);
    
      func (user_data);	//运行自定义的任务
    }
    

    它的做法很简单,拆解传进来的参数,调用相应的函数,从而让线程池中的每个线程运行自定义的任务。

    当我们调用gst_task_new()函数创建一个新的任务时,会触发GstTaskgst_task_init()函数,该函数会将GstTask结构体中的GstTaskPool *pool变量指向刚才创建的默认的GstTaskPool。所以,整个程序中所有的GstTask 依赖的都是同一个GstTaskPool结构,即所有的GstTask线程都运行在同一个线程池中。


    (2)启动任务

    gboolean
    gst_task_start (GstTask *task);
    

    或:

    gboolean
    gst_task_set_state (GstTask *task,
                        GstTaskState state);
    

    对于GstTask的任务,有三种状态:

    • GST_TASK_STARTED:任务已经启动,正在相应的线程中运行。
    • GST_TASK_STOPPED:任务已经停止,此时还没有启动相应的线程,或者线程已经运行结束退出。
    • GST_TASK_PAUSED:任务暂停。任务对应的线程没有退出,处于暂停状态,其实就是让线程阻塞在某个条件变量上。

    gst_task_start()实际上就是调用gst_task_set_state (task, GST_TASK_STARTED),将任务设置为开始状态。

    gst_task_set_state()会先记录任务的原始状态为old,再将任务设置为新的状态,然后根据old,执行相关的操作,关键代码如下:

    /* if the state changed, do our thing */
      old = GET_TASK_STATE (task);
      if (old != state) {
        SET_TASK_STATE (task, state);
    
        switch (old) {
          case GST_TASK_STOPPED:
            /* If the task already has a thread scheduled we don't have to do
             * anything. */
            if (G_UNLIKELY (!task->running) &&
                (!task->priv->scheduleable || (task->priv->should_schedule
                        && state == GST_TASK_STARTED)))
              res = start_task (task);
            break;
          case GST_TASK_PAUSED:
            /* when we are paused, signal to go to the new state */
            GST_TASK_SIGNAL (task);
            break;
          case GST_TASK_STARTED:
            /* if we were started, we'll go to the new state after the next
             * iteration. */
            break;
        }
      }
    
    • 如果旧状态是GST_TASK_STOPPED,则新状态肯定是 GST_TASK_STARTED,则调用start_task函数,该函数稍候会解释。
    • 如果旧状态是GST_TASK_PAUSED,则增加相应的条件变量,这样,任务对应的线程就会结束在此条件变量的阻塞,从而完成状态转换。
    • 如果旧状态是GST_TASK_STARTED,这个注释的解释没有看懂,还要继续琢磨一下,抱歉。

    start_task()函数中,以gst_task_func()函数为参数调用gst_task_pool_push()gst_task_pool_push()是对default_push()函数的封装。default_push()函数代码如下:

    static gpointer
    default_push (GstTaskPool * pool, GstTaskPoolFunction func,
        gpointer user_data, GError ** error)
    {
      TaskData *tdata;
    
      tdata = g_slice_new (TaskData);
      tdata->func = func;
      tdata->user_data = user_data;
    
      GST_OBJECT_LOCK (pool);
      if (pool->pool)
        g_thread_pool_push (pool->pool, tdata, error);
      else {
        g_slice_free (TaskData, tdata);
      }
      GST_OBJECT_UNLOCK (pool);
    
      return NULL;
    }
    

    funcuser_data封装成TaskData结构体,作为参数调用上面提到的g_thread_pool_push()函数将任务插入到线程池的任务列表中。TaskData结构体的拆解在上面提到的default_func()函数中,线程池中的每个线程启动时都会调用该函数拆解参数,然后线程便切换到gst_task_func()函数运行。

    注意:这里的funcgst_task_func()函数,user_data是任务对应的GstTask结构体。而使用gst_task_new()创建任务时传入的自定义函数和参数是分别保存在GstTasktask->functask->user_data成员变量中,将会在gst_task_func()中采用task->func (task->user_data)的方式调用。

    最后,每个线程中运行的其实都是gst_task_func()函数,该函数关键代码如下:

    while (G_LIKELY (GET_TASK_STATE (task) != GST_TASK_STOPPED)) {
        GST_OBJECT_LOCK (task);
    
        if (G_UNLIKELY (priv->scheduleable
                && GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
          GST_OBJECT_UNLOCK (task);
          break;
        }
    
        while (G_UNLIKELY (!priv->scheduleable
                && GST_TASK_STATE (task) == GST_TASK_PAUSED)) {
          g_rec_mutex_unlock (lock);
    
          GST_TASK_SIGNAL (task);
          GST_INFO_OBJECT (task, "Task going to paused");
          GST_TASK_WAIT (task);
          GST_INFO_OBJECT (task, "Task resume from paused");
          GST_OBJECT_UNLOCK (task);
          /* locking order.. */
          g_rec_mutex_lock (lock);
          GST_OBJECT_LOCK (task);
        }
    
        if (G_UNLIKELY (GET_TASK_STATE (task) == GST_TASK_STOPPED)) {
          GST_OBJECT_UNLOCK (task);
          break;
        } else {
          GST_OBJECT_UNLOCK (task);
        }
    
        // 调用用户自定义的函数。
        task->func (task->user_data);
    
        if (priv->scheduleable)
          break;
      }
    

    如果任务被设置成GST_TASK_STOPPED状态,则退出循环,结束线程运行。若任务为暂停状态,则在对应的条件变量上阻塞,否则,就一直循环调用用户自定义函数处理数据流。

  • 相关阅读:
    机器学习之朴素贝叶斯法
    机器学习之支持向量机(Support Vector Machine)
    梯度下降(Gradient Descent)
    Error #include nested too deeply
    error while loading shared libraries: libXXX.so.x: cannot open shared object file: No such file or directory .
    ubuntu su Authentication failure
    Log4cplus使用
    passing ‘const ’ as ‘this’ argument of ‘’ discards qualifiers 错误处理
    select()函数 timval问题
    ICE——1.Printer
  • 原文地址:https://www.cnblogs.com/fengcc/p/6130810.html
Copyright © 2011-2022 走看看