zoukankan      html  css  js  c++  java
  • 【GStreamer开发】GStreamer基础教程08——pipeline的快捷访问

    目标

          GStreamer建立的pipeline不需要完全关闭。有多种方法可以让数据在任何时候送到pipeline中或者从pipeline中取出。本教程会展示:

          如何把外部数据送到pipeline中

          如何把数据从pipeline中取出

          如何操作这些数据


    介绍

          有几种方法可以让应用通过pipeline和数据流交互。本教程讲述了最简单的一种,因为使用了专门为这个而创建的element。

          专门让应用可以往pipeline里面传入数据的element时appsrc,而appsink就正好相反,让应用可以从pipeline中获得数据。为了避免混淆,我们可以这么来理解,appsrc是一个普通的source element,不过它的数据都是来自外太空,而appsink是一个普通的sink element,数据从这里出去的就消失不见了。

          appsrc和appsink用得非常多,所以他们都自己提供API,你只要连接了gstreamer-app库,那么就可以访问到。在本教程里,我们会使用一种简单地方法通过信号来实现。

          appsrc可以有不同的工作模式:在pull模式,在需要时向应用请求数据;在push模式,应用根据自己的节奏把数据推送过来。而且,在push模式,如果已经有了足够的数据,应用可以在push时被阻塞,或者可以经由enough-data和need-data信号来控制。本教程中得例子就采用了这种信号控制的方式,其他没有提及的方法可以在appsrc的文档中查阅。


    Buffers

          通过pipeline传递的大块数据被称为buffers。因为本例子会制造数据同时也消耗数据,所以我们需要了解GstBuffer。

          Source Pads负责制造buffer,这些buffer被sink pad消耗掉。GStreamer在一个个element之间传递这些buffer。

          一个buffer只能简单地描述一小片数据,不要认为我们所有的buffer都是一样大小的。而且,buffer有一个时间戳和有效期,这个就描述了什么时候buffer里的数据需要渲染出来。时间戳是个非常复杂和精深的话题,但目前这个简单地解释也足够了。

          作为一个例子,一个filesrc会提供“ANY”属性的buffers并且没有时间戳信息。在demux(《GStreamer基础教程03——动态pipeline》)之后,buffers会有一些特定的cap了,比如"video/x-h264",在解码后,每一个buffer都会包含一帧有原始caps的视频帧(比如:video/x-raw-yuv),并且有非常明确地时间戳用来指示这一帧在什么时候显示。


    教程

          本教程是上一篇教程(《GStreamer基础教程07——多线程和Pad的有效性》)在两个方面的扩展:第一是用appsrc来取代audiotestsrc来生成音频数据;第二是在tee里新加了一个分支,这样流入audio sink和波形显示的数据同样复制了一份传给appsink。这个appsink就把信息回传给应用,应用就可以通知用户收到了数据或者做其他更复杂的工作。



    一个粗糙的波形发生器

    [objc] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. #include <gst/gst.h>  
    2. #include <string.h>  
    3.     
    4. #define CHUNK_SIZE 1024   /* Amount of bytes we are sending in each buffer */  
    5. #define SAMPLE_RATE 44100 /* Samples per second we are sending */  
    6. #define AUDIO_CAPS "audio/x-raw-int,channels=1,rate=%d,signed=(boolean)true,width=16,depth=16,endianness=BYTE_ORDER"  
    7.     
    8. /* Structure to contain all our information, so we can pass it to callbacks */  
    9. typedef struct _CustomData {  
    10.   GstElement *pipeline, *app_source, *tee, *audio_queue, *audio_convert1, *audio_resample, *audio_sink;  
    11.   GstElement *video_queue, *audio_convert2, *visual, *video_convert, *video_sink;  
    12.   GstElement *app_queue, *app_sink;  
    13.     
    14.   guint64 num_samples;   /* Number of samples generated so far (for timestamp generation) */  
    15.   gfloat a, b, c, d;     /* For waveform generation */  
    16.     
    17.   guint sourceid;        /* To control the GSource */  
    18.     
    19.   GMainLoop *main_loop;  /* GLib's Main Loop */  
    20. } CustomData;  
    21.     
    22. /* This method is called by the idle GSource in the mainloop, to feed CHUNK_SIZE bytes into appsrc. 
    23.  * The idle handler is added to the mainloop when appsrc requests us to start sending data (need-data signal) 
    24.  * and is removed when appsrc has enough data (enough-data signal). 
    25.  */  
    26. static gboolean push_data (CustomData *data) {  
    27.   GstBuffer *buffer;  
    28.   GstFlowReturn ret;  
    29.   int i;  
    30.   gint16 *raw;  
    31.   gint num_samples = CHUNK_SIZE / 2/* Because each sample is 16 bits */  
    32.   gfloat freq;  
    33.     
    34.   /* Create a new empty buffer */  
    35.   buffer = gst_buffer_new_and_alloc (CHUNK_SIZE);  
    36.     
    37.   /* Set its timestamp and duration */  
    38.   GST_BUFFER_TIMESTAMP (buffer) = gst_util_uint64_scale (data->num_samples, GST_SECOND, SAMPLE_RATE);  
    39.   GST_BUFFER_DURATION (buffer) = gst_util_uint64_scale (CHUNK_SIZE, GST_SECOND, SAMPLE_RATE);  
    40.     
    41.   /* Generate some psychodelic waveforms */  
    42.   raw = (gint16 *)GST_BUFFER_DATA (buffer);  
    43.   data->c += data->d;  
    44.   data->d -= data->c / 1000;  
    45.   freq = 1100 + 11000 * data->d;  
    46.   for (i = 0; i < num_samples; i++) {  
    47.     data->a += data->b;  
    48.     data->b -= data->a / freq;  
    49.     raw[i] = (gint16)(5500 * data->a);  
    50.   }  
    51.   data->num_samples += num_samples;  
    52.     
    53.   /* Push the buffer into the appsrc */  
    54.   g_signal_emit_by_name (data->app_source, "push-buffer", buffer, &ret);  
    55.     
    56.   /* Free the buffer now that we are done with it */  
    57.   gst_buffer_unref (buffer);  
    58.     
    59.   if (ret != GST_FLOW_OK) {  
    60.     /* We got some error, stop sending data */  
    61.     return FALSE;  
    62.   }  
    63.     
    64.   return TRUE;  
    65. }  
    66.     
    67. /* This signal callback triggers when appsrc needs data. Here, we add an idle handler 
    68.  * to the mainloop to start pushing data into the appsrc */  
    69. static void start_feed (GstElement *source, guint size, CustomData *data) {  
    70.   if (data->sourceid == 0) {  
    71.     g_print ("Start feeding ");  
    72.     data->sourceid = g_idle_add ((GSourceFunc) push_data, data);  
    73.   }  
    74. }  
    75.     
    76. /* This callback triggers when appsrc has enough data and we can stop sending. 
    77.  * We remove the idle handler from the mainloop */  
    78. static void stop_feed (GstElement *source, CustomData *data) {  
    79.   if (data->sourceid != 0) {  
    80.     g_print ("Stop feeding ");  
    81.     g_source_remove (data->sourceid);  
    82.     data->sourceid = 0;  
    83.   }  
    84. }  
    85.     
    86. /* The appsink has received a buffer */  
    87. static void new_buffer (GstElement *sink, CustomData *data) {  
    88.   GstBuffer *buffer;  
    89.     
    90.   /* Retrieve the buffer */  
    91.   g_signal_emit_by_name (sink, "pull-buffer", &buffer);  
    92.   if (buffer) {  
    93.     /* The only thing we do in this example is print a * to indicate a received buffer */  
    94.     g_print ("*");  
    95.     gst_buffer_unref (buffer);  
    96.   }  
    97. }  
    98.     
    99. /* This function is called when an error message is posted on the bus */  
    100. static void error_cb (GstBus *bus, GstMessage *msg, CustomData *data) {  
    101.   GError *err;  
    102.   gchar *debug_info;  
    103.     
    104.   /* Print error details on the screen */  
    105.   gst_message_parse_error (msg, &err, &debug_info);  
    106.   g_printerr ("Error received from element %s: %s ", GST_OBJECT_NAME (msg->src), err->message);  
    107.   g_printerr ("Debugging information: %s ", debug_info ? debug_info : "none");  
    108.   g_clear_error (&err);  
    109.   g_free (debug_info);  
    110.     
    111.   g_main_loop_quit (data->main_loop);  
    112. }  
    113.     
    114. int main(int argc, charchar *argv[]) {  
    115.   CustomData data;  
    116.   GstPadTemplate *tee_src_pad_template;  
    117.   GstPad *tee_audio_pad, *tee_video_pad, *tee_app_pad;  
    118.   GstPad *queue_audio_pad, *queue_video_pad, *queue_app_pad;  
    119.   gchar *audio_caps_text;  
    120.   GstCaps *audio_caps;  
    121.   GstBus *bus;  
    122.     
    123.   /* Initialize cumstom data structure */  
    124.   memset (&data, 0sizeof (data));  
    125.   data.b = 1/* For waveform generation */  
    126.   data.d = 1;  
    127.     
    128.   /* Initialize GStreamer */  
    129.   gst_init (&argc, &argv);  
    130.     
    131.   /* Create the elements */  
    132.   data.app_source = gst_element_factory_make ("appsrc""audio_source");  
    133.   data.tee = gst_element_factory_make ("tee""tee");  
    134.   data.audio_queue = gst_element_factory_make ("queue""audio_queue");  
    135.   data.audio_convert1 = gst_element_factory_make ("audioconvert""audio_convert1");  
    136.   data.audio_resample = gst_element_factory_make ("audioresample""audio_resample");  
    137.   data.audio_sink = gst_element_factory_make ("autoaudiosink""audio_sink");  
    138.   data.video_queue = gst_element_factory_make ("queue""video_queue");  
    139.   data.audio_convert2 = gst_element_factory_make ("audioconvert""audio_convert2");  
    140.   data.visual = gst_element_factory_make ("wavescope""visual");  
    141.   data.video_convert = gst_element_factory_make ("ffmpegcolorspace""csp");  
    142.   data.video_sink = gst_element_factory_make ("autovideosink""video_sink");  
    143.   data.app_queue = gst_element_factory_make ("queue""app_queue");  
    144.   data.app_sink = gst_element_factory_make ("appsink""app_sink");  
    145.     
    146.   /* Create the empty pipeline */  
    147.   data.pipeline = gst_pipeline_new ("test-pipeline");  
    148.     
    149.   if (!data.pipeline || !data.app_source || !data.tee || !data.audio_queue || !data.audio_convert1 ||  
    150.       !data.audio_resample || !data.audio_sink || !data.video_queue || !data.audio_convert2 || !data.visual ||  
    151.       !data.video_convert || !data.video_sink || !data.app_queue || !data.app_sink) {  
    152.     g_printerr ("Not all elements could be created. ");  
    153.     return -1;  
    154.   }  
    155.     
    156.   /* Configure wavescope */  
    157.   g_object_set (data.visual"shader"0"style"0NULL);  
    158.     
    159.   /* Configure appsrc */  
    160.   audio_caps_text = g_strdup_printf (AUDIO_CAPS, SAMPLE_RATE);  
    161.   audio_caps = gst_caps_from_string (audio_caps_text);  
    162.   g_object_set (data.app_source"caps", audio_caps, NULL);  
    163.   g_signal_connect (data.app_source"need-data", G_CALLBACK (start_feed), &data);  
    164.   g_signal_connect (data.app_source"enough-data", G_CALLBACK (stop_feed), &data);  
    165.     
    166.   /* Configure appsink */  
    167.   g_object_set (data.app_sink"emit-signals", TRUE, "caps", audio_caps, NULL);  
    168.   g_signal_connect (data.app_sink"new-buffer", G_CALLBACK (new_buffer), &data);  
    169.   gst_caps_unref (audio_caps);  
    170.   g_free (audio_caps_text);  
    171.     
    172.   /* Link all elements that can be automatically linked because they have "Always" pads */  
    173.   gst_bin_add_many (GST_BIN (data.pipeline), data.app_source, data.tee, data.audio_queue, data.audio_convert1, data.audio_resample,   
    174.       data.audio_sink, data.video_queue, data.audio_convert2, data.visual, data.video_convert, data.video_sink, data.app_queue,  
    175.       data.app_sinkNULL);  
    176.   if (gst_element_link_many (data.app_source, data.teeNULL) != TRUE ||  
    177.       gst_element_link_many (data.audio_queue, data.audio_convert1, data.audio_resample, data.audio_sinkNULL) != TRUE ||  
    178.       gst_element_link_many (data.video_queue, data.audio_convert2, data.visual, data.video_convert, data.video_sinkNULL) != TRUE ||  
    179.       gst_element_link_many (data.app_queue, data.app_sinkNULL) != TRUE) {  
    180.     g_printerr ("Elements could not be linked. ");  
    181.     gst_object_unref (data.pipeline);  
    182.     return -1;  
    183.   }  
    184.     
    185.   /* Manually link the Tee, which has "Request" pads */  
    186.   tee_src_pad_template = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (data.tee), "src%d");  
    187.   tee_audio_pad = gst_element_request_pad (data.tee, tee_src_pad_template, NULLNULL);  
    188.   g_print ("Obtained request pad %s for audio branch. ", gst_pad_get_name (tee_audio_pad));  
    189.   queue_audio_pad = gst_element_get_static_pad (data.audio_queue"sink");  
    190.   tee_video_pad = gst_element_request_pad (data.tee, tee_src_pad_template, NULLNULL);  
    191.   g_print ("Obtained request pad %s for video branch. ", gst_pad_get_name (tee_video_pad));  
    192.   queue_video_pad = gst_element_get_static_pad (data.video_queue"sink");  
    193.   tee_app_pad = gst_element_request_pad (data.tee, tee_src_pad_template, NULLNULL);  
    194.   g_print ("Obtained request pad %s for app branch. ", gst_pad_get_name (tee_app_pad));  
    195.   queue_app_pad = gst_element_get_static_pad (data.app_queue"sink");  
    196.   if (gst_pad_link (tee_audio_pad, queue_audio_pad) != GST_PAD_LINK_OK ||  
    197.       gst_pad_link (tee_video_pad, queue_video_pad) != GST_PAD_LINK_OK ||  
    198.       gst_pad_link (tee_app_pad, queue_app_pad) != GST_PAD_LINK_OK) {  
    199.     g_printerr ("Tee could not be linked ");  
    200.     gst_object_unref (data.pipeline);  
    201.     return -1;  
    202.   }  
    203.   gst_object_unref (queue_audio_pad);  
    204.   gst_object_unref (queue_video_pad);  
    205.   gst_object_unref (queue_app_pad);  
    206.     
    207.   /* Instruct the bus to emit signals for each received message, and connect to the interesting signals */  
    208.   bus = gst_element_get_bus (data.pipeline);  
    209.   gst_bus_add_signal_watch (bus);  
    210.   g_signal_connect (G_OBJECT (bus), "message::error", (GCallback)error_cb, &data);  
    211.   gst_object_unref (bus);  
    212.     
    213.   /* Start playing the pipeline */  
    214.   gst_element_set_state (data.pipeline, GST_STATE_PLAYING);  
    215.     
    216.   /* Create a GLib Main Loop and set it to run */  
    217.   data.main_loop = g_main_loop_new (NULL, FALSE);  
    218.   g_main_loop_run (data.main_loop);  
    219.     
    220.   /* Release the request pads from the Tee, and unref them */  
    221.   gst_element_release_request_pad (data.tee, tee_audio_pad);  
    222.   gst_element_release_request_pad (data.tee, tee_video_pad);  
    223.   gst_element_release_request_pad (data.tee, tee_app_pad);  
    224.   gst_object_unref (tee_audio_pad);  
    225.   gst_object_unref (tee_video_pad);  
    226.   gst_object_unref (tee_app_pad);  
    227.     
    228.   /* Free resources */  
    229.   gst_element_set_state (data.pipeline, GST_STATE_NULL);  
    230.   gst_object_unref (data.pipeline);  
    231.   return 0;  
    232. }  

    工作流程

          创建pipeline段的代码就是上一篇的教程中得例子的扩大版。包括初始或所有的element,连接有Always Pad的element然后手动连接tee element的Request Pad。

          下面我们关注一下appsrc和appsink这两个element的配置:

    [objc] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /* Configure appsrc */  
    2. audio_caps_text = g_strdup_printf (AUDIO_CAPS, SAMPLE_RATE);  
    3. audio_caps = gst_caps_from_string (audio_caps_text);  
    4. g_object_set (data.app_source"caps", audio_caps, NULL);  
    5. g_signal_connect (data.app_source"need-data", G_CALLBACK (start_feed), &data);  
    6. g_signal_connect (data.app_source"enough-data", G_CALLBACK (stop_feed), &data);  
          appsrc里面第一个需要关注的属性是caps。它说明了element准备生成的数据的类型,这样GStreamer就可以检查下游的element看看是否支持。这个属性必须是一个GstCaps对象,这个对象可以很方便的由gst_caps_from_string()来生成。

          然后我们把need-data和enough-data信号和回调连接起来,这样在appsrc内部的队列里面数据不足或将要满地时候会发送信号,我们就用这些信号来启动/停止我们的信号发生过程。

    [objc] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /* Configure appsink */  
    2. g_object_set (data.app_sink"emit-signals", TRUE, "caps", audio_caps, NULL);  
    3. g_signal_connect (data.app_sink"new-buffer", G_CALLBACK (new_buffer), &data);  
    4. gst_caps_unref (audio_caps);  
    5. g_free (audio_caps_text);  
          关于appsink的配置,我们连接了new-buffer的信号,这个信号在每次收到buffer的时候发出。当然,这个信号的发出需要emit-signals这个信号属性被开启(默认是关闭的)。

          启动pipeline,等到消息和最后的清理资源都和以前的没什么区别。让我们关注我们刚刚注册的回调吧。

    [objc] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /* This signal callback triggers when appsrc needs data. Here, we add an idle handler 
    2.  * to the mainloop to start pushing data into the appsrc */  
    3. static void start_feed (GstElement *source, guint size, CustomData *data) {  
    4.   if (data->sourceid == 0) {  
    5.     g_print ("Start feeding ");  
    6.     data->sourceid = g_idle_add ((GSourceFunc) push_data, data);  
    7.   }  
    8. }  
          这个函数在appsrc内部队列将要空的时候调用,在这里我们做的事情仅仅是用g_idle_add()方法注册一个GLib的idle函数,这个函数会给appsrc输入数据知道内部队列满为止。一个GLib的idle函数是一个GLib在主循环在“idle”时会调用的方法,也就是说,当时没有更高优先级的任务运行。

          这只是appsrc多种发出数据方法中的一个。特别需要指出的是,buffer不是必须要在主线程中用GLib方法来传递给appsrc的,你也不是一定要用need-data和enough-data信号来同步appsrc的(据说这样最方便)。

          我们记录下g_idle_add()的返回的sourceid,这样后面可以关掉它。

    [objc] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /* This callback triggers when appsrc has enough data and we can stop sending. 
    2.  * We remove the idle handler from the mainloop */  
    3. static void stop_feed (GstElement *source, CustomData *data) {  
    4.   if (data->sourceid != 0) {  
    5.     g_print ("Stop feeding ");  
    6.     g_source_remove (data->sourceid);  
    7.     data->sourceid = 0;  
    8.   }  
    9. }  
          这个函数当appsrc内部的队列满的时候调用,所以我们需要停止发送数据。这里我们简单地用g_source_remove()来把idle函数移走。

    [objc] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /* This method is called by the idle GSource in the mainloop, to feed CHUNK_SIZE bytes into appsrc. 
    2.  * The idle handler is added to the mainloop when appsrc requests us to start sending data (need-data signal) 
    3.  * and is removed when appsrc has enough data (enough-data signal). 
    4.  */  
    5. static gboolean push_data (CustomData *data) {  
    6.   GstBuffer *buffer;  
    7.   GstFlowReturn ret;  
    8.   int i;  
    9.   gint16 *raw;  
    10.   gint num_samples = CHUNK_SIZE / 2/* Because each sample is 16 bits */  
    11.   gfloat freq;  
    12.     
    13.   /* Create a new empty buffer */  
    14.   buffer = gst_buffer_new_and_alloc (CHUNK_SIZE);  
    15.     
    16.   /* Set its timestamp and duration */  
    17.   GST_BUFFER_TIMESTAMP (buffer) = gst_util_uint64_scale (data->num_samples, GST_SECOND, SAMPLE_RATE);  
    18.   GST_BUFFER_DURATION (buffer) = gst_util_uint64_scale (CHUNK_SIZE, GST_SECOND, SAMPLE_RATE);  
    19.     
    20.   /* Generate some psychodelic waveforms */  
    21.   raw = (gint16 *)GST_BUFFER_DATA (buffer);  
          这个函数给appsrc发送数据。它被GLib调用的次数和频率我们不加以控制,但我们会在它任务完成时关闭它(appsrc内部队列满)。

          这里第一步是用gst_buffer_new_and_alloc()方法和给定的大小创建一个新buffer(例子中是1024字节)。

          我们计算我们生成的采样数据的数据量,把数据存在CustomData.num_samples里面,这样我们可以用GstBuffer提供的GST_BUFFER_TIMESTAMP宏来生成buffer的时间戳。

          gst_util_uint64_scale是一个工具函数,用来缩放数据,确保不会溢出。

          这些给buffer的数据可以用GstBuffer提供的GST_BUFFER_DATA宏来访问。

          我们会跳过波形的生成部分,因为这不是本教程要讲述的内容。

    [objc] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /* Push the buffer into the appsrc */  
    2. g_signal_emit_by_name (data->app_source, "push-buffer", buffer, &ret);  
    3.   
    4. /* Free the buffer now that we are done with it */  
    5. gst_buffer_unref (buffer);  
          一旦我们的buffer已经准备好,我们把带着这个buffer的push-buffer信号传给appsrc,然后就调用gst_buffer_unref()方法,因为我们不会再用到它了。

    [objc] view plain copy
     在CODE上查看代码片派生到我的代码片
    1. /* The appsink has received a buffer */  
    2. static void new_buffer (GstElement *sink, CustomData *data) {  
    3.   GstBuffer *buffer;  
    4.     
    5.   /* Retrieve the buffer */  
    6.   g_signal_emit_by_name (sink, "pull-buffer", &buffer);  
    7.   if (buffer) {  
    8.     /* The only thing we do in this example is print a * to indicate a received buffer */  
    9.     g_print ("*");  
    10.     gst_buffer_unref (buffer);  
    11.   }  
    12. }  
          最后,这个函数在appsink收到buffer时被调用。我们使用了pull-buffer的信号来重新获得buffer,因为是例子,所以仅仅在屏幕上打印一些内容。我们可以用GstBuffer的GST_BUFFER_DATA宏来获得数据指针和用GST_BUFFER_SIZE宏来获得数据大小。请记住,这里的buffer不是一定要和我们在push_data函数里面创建的buffer完全一致的,在传输路径上得任何一个element都可能对buffer进行一些改变。(这个例子中仅仅是在appsrc和appsink中间通过一个tee element,所以buffer没有变化)。

        请不要忘记调用gst_buffer_unref()来释放buffer,就讲这么多吧。

  • 相关阅读:
    C# 2008核心编程(20130713)
    java 异常处理机制
    指定节点滚动到屏幕中间的js
    mysql 数据误删恢复
    《How Tomcat works》
    HashMap中 工具方法tableSizeFor的作用
    mysql 是如何保证在高并发的情况下autoincrement关键字修饰的列不会出现重复
    为什么java io流必须得关闭
    下载文件出现内存溢出问题
    当使用junit4 对spring框架中controller/service/mapper各层进行测试时,需要添加的配置
  • 原文地址:https://www.cnblogs.com/huty/p/8517316.html
Copyright © 2011-2022 走看看