zoukankan      html  css  js  c++  java
  • libdvbpsi源码分析(二)main函数

            从demo的dvbinfo.c中的main函数入口分析: 
            为了分析方便,此处将宏HAVE_SYS_SOCKET_H隔离的socket代码去掉,只关注libdvbpsi本身的实现。 

        1.数据结构的设计: 
    1.1、捕获器capture的数据结构设计如下:

    /*
     * State shared between the capture thread (dvbinfo_capture) and the
     * processing loop (dvbinfo_process).
     */
    typedef struct dvbinfo_capture_s
    {
    fifo_t *fifo; /* filled buffers: pushed by dvbinfo_capture, popped by dvbinfo_process */
    fifo_t *empty; /* consumed buffers handed back so the capture thread can reuse them */

    pthread_mutex_t lock; /* held while b_fifo_full is changed and fifo_full is waited on/signalled */
    pthread_cond_t fifo_full; /* signalled by dvbinfo_process once the fifo is below FIFO_THRESHOLD_SIZE */
    bool b_fifo_full; /* set by the capture thread when the fifo reaches FIFO_THRESHOLD_SIZE */

    size_t size; /* preferred capture size */

    params_t *params; /* program parameters and I/O callbacks */
    bool b_alive; /* cleared by main to stop the capture thread, and by the thread itself when it exits */
    } dvbinfo_capture_t;

    由数据结构可知,demo是本地的autotest,capture内部用两个进程内的FIFO缓冲队列(fifo与empty,并非操作系统的有名管道)在线程间传递数据。ts流文件由命令行参数指定,一个
    线程不断将读入buffer的数据push进fifo,同时主线程不断从fifo pop出数据并解析,直到capture线程结束且fifo被取空为止

    1.2、dvbinfo_capture_t中的params_t 结构体

    /*
     * Program parameters: command-line settings, the descriptors in use and
     * the read/write callbacks used by the capture and processing code.
     */
    typedef struct params_s
    {
    /* parameters */
    char *output; /* when non-NULL, dvbinfo_process copies every buffer to fd_out */
    char *input; /* input name given with -f/--file; main requires it to be set */

    int port; /* NOTE(review): presumably for the socket inputs, which are not shown here */
    char *mcast_interface; /* NOTE(review): presumably for the socket inputs, which are not shown here */

    bool b_udp;
    bool b_tcp;
    bool b_file; /* set by -f; the capture thread then waits on a full fifo instead of discarding */

    /* */
    int fd_in; /* descriptor passed to pf_read */
    int fd_out; /* descriptor passed to pf_write */

    int debug; /* 0 = off, 1 = "error", 2 = "warn", 3 = "debug" (set by -d) */
    bool b_verbose;
    bool b_monitor; /* run in daemon mode */

    /* statistics */
    bool b_summary; /* write summary */
    struct summary_s {
    int mode; /* one of: i_summary_mode */
    int64_t period; /* summary period in ms */
    char *file; /* summary file name */
    FILE *fd; /* summary file descriptor */
    } summary;

    /* read data from file or socket */
    ssize_t (*pf_read)(int fd, void *buf, size_t count);
    ssize_t (*pf_write)(int fd, const void *buf, size_t count);
    } params_t;

    2.main函数入口

    /*
     * Program entry point.
     *
     * Parses the command line (-d/--debug, -f/--file, -h/--help), opens the
     * input, starts the capture thread and runs dvbinfo_process() on the main
     * thread. When processing returns, the capture thread is stopped and joined
     * and all resources are released.
     *
     * Exits with EXIT_FAILURE when dvbinfo_process() returns a negative value
     * or the capture thread cannot be created, EXIT_SUCCESS otherwise.
     */
    int main(int argc, char **pp_argv)
    {
        dvbinfo_capture_t capture;
        params_t *param = NULL;
        /* getopt_long() returns an int. Storing it in a plain char breaks the
         * "!= -1" test on platforms where char is unsigned. */
        int c;

        if (argc == 1)
            usage();

        param = params_init();
        capture.params = param;
        capture.fifo = fifo_new();
        capture.empty = fifo_new();
        capture.b_fifo_full = false;
        pthread_mutex_init(&capture.lock, NULL);     /* protects b_fifo_full */
        pthread_cond_init(&capture.fifo_full, NULL); /* "fifo drained" notification */

        static const struct option long_options[] =
        {
            { "debug", required_argument, NULL, 'd' },
            { "help",  no_argument,       NULL, 'h' },
            /* - inputs - */
            { "file",  required_argument, NULL, 'f' },
            { NULL, 0, NULL, 0 }
        };

        /* Parse the command-line arguments. */
        while ((c = getopt_long(argc, pp_argv, "d:f:h", long_options, NULL)) != -1)
        {
            switch (c)
            {
                case 'd':
                    if (optarg)
                    {
                        param->debug = 0;
                        if (strncmp(optarg, "error", 5) == 0)
                            param->debug = 1;
                        else if (strncmp(optarg, "warn", 4) == 0)
                            param->debug = 2;
                        else if (strncmp(optarg, "debug", 5) == 0)
                            param->debug = 3;
                    }
                    break;

                case 'f':
                    if (optarg)
                    {
                        /* Keep a private copy of the input file name. */
                        if (asprintf(&param->input, "%s", optarg) < 0)
                        {
                            fprintf(stderr, "error: out of memory\n");
                            /* asprintf() leaves the pointer undefined on failure;
                             * reset it so params_free() cannot act on garbage. */
                            param->input = NULL;
                            params_free(param);
                            usage();
                        }
                        /* Plain files are read with read(2). */
                        param->pf_read = read;
                        param->b_file = true;
                    }
                    break;

                case ':':
                    fprintf(stderr, "Option %c is missing arguments\n", c);
                    params_free(param);
                    exit(EXIT_FAILURE);
                    break;

                case '?':
                    fprintf(stderr, "Unknown option %c found\n", c);
                    params_free(param);
                    exit(EXIT_FAILURE);
                    break;

                case 'h':
                default:
                    params_free(param);
                    usage();
                    break;
            }
        }

        if (param->input == NULL)
        {
            libdvbpsi_log(param, DVBINFO_LOG_ERROR, "No source given\n");
            params_free(param);
            usage(); /* exits application */
        }

        capture.size = 188; /* a TS packet is always 188 bytes */
        libdvbpsi_log(param, DVBINFO_LOG_INFO, "Examining: %s\n",
                      param->input);

        /* Capture thread */
        dvbinfo_open(param); /* open the TS input and obtain its descriptor */
        pthread_t handle;
        capture.b_alive = true;
        /* pthread_create() returns 0 on success and a positive error number on
         * failure, so the result must be compared against 0, not "< 0". */
        if (pthread_create(&handle, NULL, dvbinfo_capture, (void *)&capture) != 0)
        {
            libdvbpsi_log(param, DVBINFO_LOG_ERROR, "failed creating thread\n");
            dvbinfo_close(param);

            params_free(param);
            exit(EXIT_FAILURE);
        }

        /* Pop buffers from the fifo and decode the PSI tables. */
        int err = dvbinfo_process(&capture);

        /* Processing is finished: ask the capture thread to stop and wait for it. */
        capture.b_alive = false; /* stop thread */
        if (pthread_join(handle, NULL) != 0) /* same return convention as pthread_create() */
            libdvbpsi_log(param, DVBINFO_LOG_ERROR, "error joining capture thread\n");
        dvbinfo_close(param);

        /* cleanup */
        fifo_wake(capture.fifo);
        fifo_wake(capture.empty);

        fifo_free(capture.fifo);
        fifo_free(capture.empty);

        pthread_mutex_destroy(&capture.lock);
        pthread_cond_destroy(&capture.fifo_full);

        params_free(param);

        if (err < 0)
            exit(EXIT_FAILURE);
        else
            exit(EXIT_SUCCESS);
    }

    3.线程实例(routine)dvbinfo_capture的分析
    【注解】线程dvbinfo_capture不断地将ts流文件push进fifo, 同时并发地,dvbinfo_process不断地从fifo中取出数据并提供给每个specific decoder进行section的解析,完成table表的重建。线程dvbinfo_capture读到文件末尾(或分配buffer失败)时,会自行将capture.b_alive置为false并调用fifo_wake;dvbinfo_process在b_alive为false且fifo被取空后退出循环,随后main再次将capture.b_alive置为false、join该线程并释放资源。典型的“生产者——消费者”模式。

    static void *dvbinfo_capture(void *data)
    {
    dvbinfo_capture_t *capture = (dvbinfo_capture_t *)data;
    const params_t *param = capture->params;
    bool b_eof = false;

    while (capture->b_alive && !b_eof)
    {
    buffer_t *buffer;

    if (fifo_count(capture->empty) == 0)
    buffer = buffer_new(capture->size);
    else
    buffer = fifo_pop(capture->empty);

    if (buffer == NULL) /* out of memory */
    break;

    ssize_t size = param->pf_read(param->fd_in, buffer->p_data, buffer->i_size);
    if (size < 0) /* short read ? */
    {
    fifo_push(capture->empty, buffer);
    continue;
    }
    else if (size == 0)
    {
    fifo_push(capture->empty, buffer);
    b_eof = true;
    continue;
    }

    buffer->i_date = mdate();

    /* check fifo size */
    if (fifo_size(capture->fifo) >= FIFO_THRESHOLD_SIZE)
    {
    pthread_mutex_lock(&capture->lock);
    capture->b_fifo_full = true;
    pthread_mutex_unlock(&capture->lock);

    if (param->b_file)
    {
    /* wait till buffer becomes smaller again */
    pthread_mutex_lock(&capture->lock);
    while(capture->b_fifo_full)
    pthread_cond_wait(&capture->fifo_full, &capture->lock);
    pthread_mutex_unlock(&capture->lock);
    }
    else
    {
    libdvbpsi_log(capture->params, DVBINFO_LOG_ERROR,
    "error fifo full discarding buffer");
    fifo_push(capture->empty, buffer);
    continue;
    }
    }

    /* store buffer */
    fifo_push(capture->fifo, buffer);
    buffer = NULL;
    }

    capture->b_alive = false;
    fifo_wake(capture->fifo);
    return NULL;
    }

    4、dvbinfo_process负责ts流中各种table的解析

    /*
     * Processing loop (consumer): pops buffers from capture->fifo, optionally
     * copies them to param->fd_out, feeds them to libdvbpsi_process() and
     * periodically writes a summary file when param->b_summary is set.
     * Consumed buffers are handed back through capture->empty.
     *
     * The loop ends when the capture thread is no longer alive and the fifo is
     * empty, or when writing the output or processing a buffer fails.
     *
     * Returns 0 on success and -1 when the temporary summary name cannot be
     * built, libdvbpsi_init() fails, or a write/processing error occurred.
     */
    static int dvbinfo_process(dvbinfo_capture_t *capture)
    {
        int err = -1;
        bool b_error = false;
        params_t *param = capture->params;
        buffer_t *buffer = NULL;

        char *psz_temp = NULL;
        mtime_t deadline = 0;
        if (param->b_summary)
        {
            /* The summary is written to "<file>.part" and then renamed. */
            if (asprintf(&psz_temp, "%s.part", param->summary.file) < 0)
            {
                libdvbpsi_log(param, DVBINFO_LOG_ERROR, "Could not create temporary summary file %s\n",
                              param->summary.file);
                return err;
            }
            deadline = mdate() + param->summary.period;
        }

        /* Create the MPEG-TS PSI decoders. */
        ts_stream_t *stream = libdvbpsi_init(param->debug, &libdvbpsi_log, (void *)param);
        if (!stream)
            goto out;

        while (!b_error)
        {
            /* Done once the producer has stopped and the fifo is drained. */
            if (!capture->b_alive && (fifo_count(capture->fifo) == 0))
                break;

            /* Wait for data to arrive */
            buffer = fifo_pop(capture->fifo);
            if (buffer == NULL)
                continue;

            if (param->output)
            {
                /* pf_write returns ssize_t. Storing the result in a size_t hid
                 * the -1 error value and made the error check dead code. */
                ssize_t size = param->pf_write(param->fd_out, buffer->p_data, buffer->i_size);
                if (size < 0) /* error writing */
                {
                    libdvbpsi_log(param, DVBINFO_LOG_ERROR,
                                  "error (%d) writting to %s", errno, param->output);
                    b_error = true;
                    break;
                }
                else if ((size_t)size < buffer->i_size) /* short write, disk full? */
                {
                    libdvbpsi_log(param, DVBINFO_LOG_ERROR,
                                  "error writting to %s (disk full?)", param->output);
                    b_error = true;
                    break;
                }
            }

            if (!libdvbpsi_process(stream, buffer->p_data, buffer->i_size, buffer->i_date))
                b_error = true;

            /* summary statistics */
            if (param->b_summary)
            {
                if (mdate() >= deadline)
                {
                    FILE *fd = fopen(psz_temp, "w+");
                    if (fd)
                    {
                        libdvbpsi_summary(fd, stream, param->summary.mode);
                        fflush(fd);
                        fclose(fd);
                        unlink(param->summary.file);
                        rename(psz_temp, param->summary.file);
                    }
                    else
                    {
                        libdvbpsi_log(param, DVBINFO_LOG_ERROR,
                                      "failed opening summary file (disabling summary logging)\n");
                        param->b_summary = false;
                    }
                    deadline = mdate() + param->summary.period;
                }
            }

            /* reuse buffer */
            fifo_push(capture->empty, buffer);
            buffer = NULL;

            /* check fifo size */
            if (fifo_size(capture->fifo) < FIFO_THRESHOLD_SIZE)
            {
                pthread_mutex_lock(&capture->lock);
                capture->b_fifo_full = false;
                pthread_cond_signal(&capture->fifo_full);
                pthread_mutex_unlock(&capture->lock);
            }
        }

        if (b_error)
        {
            /* Leaving early: the producer may be blocked waiting for the fifo
             * to drain. Ask it to stop and release it so the caller's
             * pthread_join() cannot hang. */
            pthread_mutex_lock(&capture->lock);
            capture->b_alive = false;
            capture->b_fifo_full = false;
            pthread_cond_signal(&capture->fifo_full);
            pthread_mutex_unlock(&capture->lock);
        }
        else
        {
            /* Only a clean exit guarantees a fully drained fifo. */
            assert(fifo_count(capture->fifo) == 0);
            err = 0;
        }
        libdvbpsi_exit(stream);

    out:
        if (b_error)
            libdvbpsi_log(param, DVBINFO_LOG_ERROR, "error while processing\n" );

        /* A buffer is still held here only when the loop was left by a write error. */
        if (buffer) buffer_free(buffer);
        free(psz_temp);
        return err;
    }

  • 相关阅读:
    systemd 介绍
    Goland读取配置文件--viper包
    Goland日志记录
    进程查询端口占用
    Python zip() 函数
    Jenkins权限管理(角色权限)
    Django ORM查询总结
    如何查看windows计算机重启记录
    Django ORM迁移
    Django中文乱码解决
  • 原文地址:https://www.cnblogs.com/chunqiao/p/3793711.html
Copyright © 2011-2022 走看看