zoukankan      html  css  js  c++  java
  • Nginx-rtmp之 ngx_rtmp_send.c 文件分析

    1. 简述

    1.1 RTMP 消息类型

    /*
     * RTMP message type ids. One of these is stored in the "type" field of
     * an ngx_rtmp_header_t and written into the chunk message header.
     * NGX_RTMP_MSG_MAX equals the largest id defined in this list.
     */
    #define NGX_RTMP_MSG_CHUNK_SIZE         1
    #define NGX_RTMP_MSG_ABORT              2
    #define NGX_RTMP_MSG_ACK                3
    #define NGX_RTMP_MSG_USER               4
    #define NGX_RTMP_MSG_ACK_SIZE           5
    #define NGX_RTMP_MSG_BANDWIDTH          6
    #define NGX_RTMP_MSG_EDGE               7
    #define NGX_RTMP_MSG_AUDIO              8
    #define NGX_RTMP_MSG_VIDEO              9
    #define NGX_RTMP_MSG_AMF3_META          15
    #define NGX_RTMP_MSG_AMF3_SHARED        16
    #define NGX_RTMP_MSG_AMF3_CMD           17
    #define NGX_RTMP_MSG_AMF_META           18
    #define NGX_RTMP_MSG_AMF_SHARED         19
    #define NGX_RTMP_MSG_AMF_CMD            20
    #define NGX_RTMP_MSG_AGGREGATE          22
    #define NGX_RTMP_MSG_MAX                22
    

    1.2 RTMP user control message types

    /*
     * Event type codes for NGX_RTMP_MSG_USER messages. NGX_RTMP_UCTL_START
     * writes the chosen code as the first two payload bytes, high byte first.
     */
    #define NGX_RTMP_USER_STREAM_BEGIN      0
    #define NGX_RTMP_USER_STREAM_EOF        1
    #define NGX_RTMP_USER_STREAM_DRY        2
    #define NGX_RTMP_USER_SET_BUFLEN        3
    #define NGX_RTMP_USER_RECORDED          4
    #define NGX_RTMP_USER_PING_REQUEST      6
    #define NGX_RTMP_USER_PING_RESPONSE     7
    #define NGX_RTMP_USER_UNKNOWN           8
    #define NGX_RTMP_USER_BUFFER_END        31
    

    2. 源码分析

    2.1 ngx_rtmp_send_ack_size:发送 ack_size 包

    /*
     * Create an ack-size message carrying ack_size and queue it on session s.
     * A NULL packet (creation failed) is turned into NGX_ERROR by
     * ngx_rtmp_send_shared_packet(); otherwise its result is returned.
     */
    ngx_int_t ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_ack_size(s, ack_size);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    
    send ack_size == 5000000

    2.2 ngx_rtmp_create_ack_size

    /*
     * Build a NGX_RTMP_MSG_ACK_SIZE message whose payload is ack_size written
     * as four bytes.
     *
     * All return paths live inside the NGX_RTMP_USER_* macros: NULL when the
     * shared buffer cannot be allocated (NGX_RTMP_USER_START), otherwise the
     * prepared chain (NGX_RTMP_USER_END).
     */
    ngx_chain_t *ngx_rtmp_create_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size)
    {
        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "create: ack_size=%uD", ack_size);
    
        {
            /* declares the header/chain/buffer locals and allocates the buffer */
            NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK_SIZE);
    
            /* payload: the 32-bit ack size */
            NGX_RTMP_USER_OUT4(ack_size);
    
            /* prepends the RTMP header and returns the chain */
            NGX_RTMP_USER_END(s);
        }
    }
    

    2.2.1 NGX_RTMP_USER_START

    /*
     * Opens the body of a message-building function.
     *
     * Declares the locals __h (message header), __l (chain), __b (buffer) and
     * __cscf (server config) in the enclosing scope, zeroes the header, sets
     * its type to tp and its chunk stream id to 2, and allocates one shared
     * buffer. On allocation failure the ENCLOSING function returns NULL.
     *
     * Every line except the last must end with a backslash; without the
     * continuations the macro expands to nothing and the statements below
     * it are no longer part of the definition.
     *
     * It intentionally is not wrapped in do { } while (0): the declared
     * locals must stay visible to the NGX_RTMP_USER_OUT* / NGX_RTMP_USER_END
     * macros that follow it.
     */
    #define NGX_RTMP_USER_START(s, tp)                                          \
        ngx_rtmp_header_t               __h;                                    \
        ngx_chain_t                    *__l;                                    \
        ngx_buf_t                      *__b;                                    \
        ngx_rtmp_core_srv_conf_t       *__cscf;                                 \
                                                                                \
        __cscf = ngx_rtmp_get_module_srv_conf(                                  \
                s, ngx_rtmp_core_module);                                       \
        /* start from an all-zero RTMP message header */                        \
        memset(&__h, 0, sizeof(__h));                                           \
        __h.type = (tp);                                                        \
        /* every message built with this macro goes out on chunk stream 2 */    \
        __h.csid = 2;                                                           \
        __l = ngx_rtmp_alloc_shared_buf(__cscf);                                \
        if (__l == NULL) {                                                      \
            return NULL;                                                        \
        }                                                                       \
        __b = __l->buf;
    

    该宏主要是初始化 RTMP 消息头,并为 ngx_chain_t 结构体指针分配内存,用来存储将要发送的数据。

    ngx_rtmp_alloc_shared_buf: 分配一个 shared 的 ngx_chain_t 结构体内存

    /*
     * Hand out one shared chain link with an attached buffer.
     *
     * A link is taken from the head of cscf->free when that list is not
     * empty; otherwise a single block is carved out of cscf->pool and laid
     * out as:
     *
     *   [refcount area: NGX_RTMP_REFCOUNT_BYTES][ngx_chain_t][ngx_buf_t][data]
     *
     * where the data area holds chunk_size + NGX_RTMP_MAX_CHUNK_HEADER bytes.
     *
     * In both cases pos/last are placed NGX_RTMP_MAX_CHUNK_HEADER bytes past
     * the start of the data so the chunk header can later be written in front
     * of the payload, and the reference count is set to 1.
     *
     * Returns the link, or NULL when the pool allocation fails.
     */
    ngx_chain_t *ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf)
    {
        ngx_chain_t                *cl;
        ngx_buf_t                  *buf;
        u_char                     *mem;
        size_t                      data_len;

        cl = cscf->free;

        if (cl != NULL) {
            /* recycle: unlink the head of the free list */
            cscf->free = cl->next;

        } else {
            /* room for one chunk payload plus the largest possible header */
            data_len = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;

            mem = ngx_pcalloc(cscf->pool, NGX_RTMP_REFCOUNT_BYTES
                    + sizeof(ngx_chain_t)
                    + sizeof(ngx_buf_t)
                    + data_len);
            if (mem == NULL) {
                return NULL;
            }

            /* the reference-count area precedes the chain link */
            mem += NGX_RTMP_REFCOUNT_BYTES;
            cl = (ngx_chain_t *)mem;
            mem += sizeof(ngx_chain_t);

            cl->buf = (ngx_buf_t *)mem;
            mem += sizeof(ngx_buf_t);

            cl->buf->start = mem;
            cl->buf->end = mem + data_len;
        }

        cl->next = NULL;

        buf = cl->buf;
        buf->memory = 1;

        /* leave headroom in front of the payload for the chunk header */
        buf->last = buf->start + NGX_RTMP_MAX_CHUNK_HEADER;
        buf->pos = buf->last;

        /* a freshly handed-out buffer starts with refcount 1 */
        ngx_rtmp_ref_set(cl, 1);

        return cl;
    }
    

    2.2.2 NGX_RTMP_USER_OUT4

    /*
     * Append the 32-bit value v to the buffer __b (declared by
     * NGX_RTMP_USER_START) as four bytes, most significant byte first, and
     * advance __b->last by 4.
     *
     * The bytes are extracted with shifts rather than by indexing the
     * in-memory representation of v, so the output does not depend on host
     * byte order and v may be any expression (it is evaluated four times, so
     * it must have no side effects).
     *
     * Every line except the last must end with a backslash to belong to the
     * macro definition.
     */
    #define NGX_RTMP_USER_OUT4(v)                                               \
        *(__b->last++) = (u_char) ((uint32_t) (v) >> 24);                       \
        *(__b->last++) = (u_char) ((uint32_t) (v) >> 16);                       \
        *(__b->last++) = (u_char) ((uint32_t) (v) >> 8);                        \
        *(__b->last++) = (u_char) ((uint32_t) (v));
    

    该宏主要是将 v 的值转换为大端字节序放置到通过 NGX_RTMP_USER_START 宏分配好缓存的内存中。

    2.2.3 NGX_RTMP_USER_END

    /*
     * Closes the body of a message-building function opened with
     * NGX_RTMP_USER_START: prepends the RTMP header described by __h to the
     * chain __l (no previous header is supplied) and makes the ENCLOSING
     * function return that chain.
     *
     * The first line must end with a backslash so that the return statement
     * is part of the macro definition.
     */
    #define NGX_RTMP_USER_END(s)                                                \
        ngx_rtmp_prepare_message(s, &__h, NULL, __l);                           \
        return __l;
    

    2.2.4 ngx_rtmp_prepare_message

    /*
     * Prepend RTMP chunk headers to the buffers of an outgoing message.
     *
     * s   - session; supplies the log and server config and is finalized on
     *       error.
     * h   - header of the message being sent (csid, type, msid, timestamp).
     * lh  - header of the previous message, or NULL. When it is usable a
     *       shorter header format (fmt 1..3) is selected.
     * out - chain holding the payload. Headers are written by moving each
     *       buf->pos backwards, so every buffer needs free room in front of
     *       pos.
     *
     * The first buffer receives the header selected by fmt; every following
     * buffer receives a copy of the basic header with the fmt bits set to 3.
     * Nothing is returned: when h->csid >= cscf->max_streams the session is
     * finalized and the chain is left untouched.
     */
    void ngx_rtmp_prepare_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
            ngx_rtmp_header_t *lh, ngx_chain_t *out)
    {
        ngx_chain_t                *l;
        u_char                     *p, *pp;
        ngx_int_t                   hsize, thsize, nbufs;
        uint32_t                    mlen, timestamp, ext_timestamp;
        /* header size per fmt value, including a 1-byte basic header */
        static uint8_t              hdrsize[] = { 12, 8, 4, 1 };
        /* header for continuation buffers: up to 3 basic-header bytes
         * plus an optional 4-byte extended timestamp */
        u_char                      th[7];
        ngx_rtmp_core_srv_conf_t   *cscf;
        uint8_t                     fmt;
        ngx_connection_t           *c;
    
        c = s->connection;
        cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
    
        /* reject chunk stream ids outside the configured range */
        if (h->csid >= (uint32_t)cscf->max_streams) {
            ngx_log_error(NGX_LOG_INFO, c->log, 0,
                    "RTMP out chunk stream too big: %D >= %D",
                    h->csid, cscf->max_streams);
            ngx_rtmp_finalize_session(s);
            return;
        }
    
        /* total payload length and number of buffers in the chain */
        /* detect packet size */
        mlen = 0;
        nbufs = 0;
        for(l = out; l; l = l->next) {
            mlen += (l->buf->last - l->buf->pos);
            ++nbufs;
        }
    
        /* start with the full header (fmt 0) */
        fmt = 0;
        
        /* a usable previous header lets fields be dropped from this one */
        
        /* previous header present, with a non-zero csid and the same msid */
        if (lh && lh->csid && h->msid == lh->msid) 
        {
            /* fmt 1: the message stream id is omitted */
            ++fmt;
            /* same type and same non-zero length as the previous message */
            if (h->type == lh->type && mlen && mlen == lh->mlen) {
                /* fmt 2: only the 3-byte timestamp field remains */
                ++fmt;
                if (h->timestamp == lh->timestamp) {
                    /* fmt 3: timestamp is unchanged too, so there is
                     * no message header at all */
                    ++fmt;
                }
            }
            /* with fmt > 0 the timestamp field carries the delta
             * to the previous message */
            timestamp = h->timestamp - lh->timestamp;
        } 
        else 
        {
            /* no usable previous header: send the absolute timestamp */
            timestamp = h->timestamp;
        }
    
        /* disabled: lh is not updated by this function */
        /*if (lh) {
            *lh = *h;
            lh->mlen = mlen;
        }*/
    
        /* header size for this fmt, assuming a 1-byte basic header */
        hsize = hdrsize[fmt];
    
        ngx_log_debug8(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                "RTMP prep %s (%d) fmt=%d csid=%uD timestamp=%uD "
                "mlen=%uD msid=%uD nbufs=%d",
                ngx_rtmp_message_type(h->type), (int)h->type, (int)fmt,
                h->csid, timestamp, mlen, h->msid, nbufs);
    
        /* values that do not fit below 0xffffff need the extended timestamp */
        ext_timestamp = 0;
        if (timestamp >= 0x00ffffff) {
            /* the real value goes into the extended field and the
             * 3-byte timestamp field is set to all ones */
            ext_timestamp = timestamp;
            timestamp = 0x00ffffff;
            /* the extended timestamp adds 4 bytes */
            hsize += 4;
        }
    
        /* csid >= 64 needs a 2-byte basic header */
        if (h->csid >= 64) {
            ++hsize;
            /* csid >= 320 needs a 3-byte basic header */
            if (h->csid >= 320) {
                ++hsize;
            }
        }
    
        /* move pos back so it points at the start of the header */
        /* fill initial header */
        out->buf->pos -= hsize;
        p = out->buf->pos;
    
        /* basic header */
        *p = (fmt << 6);
        if (h->csid >= 2 && h->csid <= 63) {
            /* 1-byte form: csid in the low 6 bits */
            *p++ |= (((uint8_t)h->csid) & 0x3f);
        } else if (h->csid >= 64 && h->csid < 320) {
            /* 2-byte form: low 6 bits stay 0, next byte is csid - 64 */
            ++p;
            *p++ = (uint8_t)(h->csid - 64);
        } else {
            /* 3-byte form: low 6 bits are 1, then csid - 64, low byte first.
             * NOTE(review): csid 0 and 1 also land here; three bytes are
             * written although hsize accounted for one - confirm callers
             * never pass csid < 2. */
            *p++ |= 1;
            *p++ = (uint8_t)(h->csid - 64);
            *p++ = (uint8_t)((h->csid - 64) >> 8);
        }
    
        /* create fmt3 header for successive fragments */
        thsize = p - out->buf->pos;
        ngx_memcpy(th, out->buf->pos, thsize);
        /* force the fmt bits of the copied basic header to 3 */
        th[0] |= 0xc0;
    
        /* message header */
        /* NOTE(review): the pp[] indexing below reads the in-memory bytes of
         * the values, so the emitted byte order depends on host endianness;
         * it yields most-significant-first output on a little-endian host. */
        if (fmt <= 2) {
            /* timestamp field, 3 bytes */
            pp = (u_char*)&timestamp;
            *p++ = pp[2];
            *p++ = pp[1];
            *p++ = pp[0];
            if (fmt <= 1) {
                /* message length (3 bytes) and message type (1 byte) */
                pp = (u_char*)&mlen;
                *p++ = pp[2];
                *p++ = pp[1];
                *p++ = pp[0];
                *p++ = h->type;
                if (fmt == 0) {
                    /* message stream id, copied in host memory order */
                    pp = (u_char*)&h->msid;
                    *p++ = pp[0];
                    *p++ = pp[1];
                    *p++ = pp[2];
                    *p++ = pp[3];
                }
            }
        }
        /* fmt 3 carries no message header */
    
        /* extended header */
        if (ext_timestamp) {
            pp = (u_char*)&ext_timestamp;
            *p++ = pp[3];
            *p++ = pp[2];
            *p++ = pp[1];
            *p++ = pp[0];
    
            /* This CONTRADICTS the standard
             * but that's the way flash client
             * wants data to be encoded;
             * ffmpeg complains */
            if (cscf->play_time_fix) {
                /* repeat the extended timestamp in continuation headers */
                ngx_memcpy(&th[thsize], p - 4, 4);
                thsize += 4;
            }
        }
    
        /* append headers to successive fragments */
        for(out = out->next; out; out = out->next) {
            out->buf->pos -= thsize;
            ngx_memcpy(out->buf->pos, th, thsize);
        }
    }
    

    2.3 ngx_rtmp_send_shared_packet

    /*
     * Queue an already prepared packet on session s and drop the caller's
     * reference to it.
     *
     * cl - packet chain; NULL (creation failed upstream) yields NGX_ERROR.
     *
     * Otherwise returns whatever ngx_rtmp_send_message() returned. The chain
     * is released with ngx_rtmp_free_shared_chain() regardless of that result.
     */
    static ngx_int_t ngx_rtmp_send_shared_packet(ngx_rtmp_session_t *s, ngx_chain_t *cl)
    {
        ngx_rtmp_core_srv_conf_t       *conf;
        ngx_int_t                       status;

        status = NGX_ERROR;

        if (cl != NULL) {
            conf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

            /* queue with priority 0 */
            status = ngx_rtmp_send_message(s, cl, 0);

            /* give up our own reference to the chain */
            ngx_rtmp_free_shared_chain(conf, cl);
        }

        return status;
    }
    

    2.3.1 ngx_rtmp_send_message

    /*
     * Put one prepared message into the session's outgoing ring s->out.
     *
     * out      - message chain; a reference is acquired when it is queued.
     * priority - values above 3 are treated as 3. A higher value makes the
     *            message be dropped at a lower queue fill level.
     *
     * Returns NGX_AGAIN when the message is dropped because the queue is too
     * full for this priority, NGX_OK otherwise. After queueing, sending is
     * started right away unless output is being corked or the write event is
     * already active.
     */
    ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out,
            ngx_uint_t priority)
    {
        ngx_uint_t                      pending;

        /* distance between read and write positions, plus this message */
        pending = (s->out_last - s->out_pos) % s->out_queue + 1;

        priority = (priority > 3) ? 3 : priority;

        /* drop packet?
         * Note we always leave 1 slot free */
        if (pending + priority * s->out_queue / 4 >= s->out_queue) {
            ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                    "RTMP drop message bufs=%ui, priority=%ui",
                    pending, priority);
            return NGX_AGAIN;
        }

        /* store at the write position, then advance it around the ring */
        s->out[s->out_last] = out;
        s->out_last++;
        s->out_last %= s->out_queue;

        /* the queue now holds its own reference */
        ngx_rtmp_acquire_shared_chain(out);

        ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                "RTMP send nmsg=%ui, priority=%ui #%ui",
                pending, priority, s->out_last);

        if (priority && s->out_buffer && pending < s->out_cork) {
            /* corked: leave the message queued for a later send */

        } else if (!s->connection->write->active) {
            /* write event not active: start sending now */
            ngx_rtmp_send(s->connection->write);
        }

        return NGX_OK;
    }
    

    2.3.2 ngx_rtmp_send

    /*
     * Write-event handler: pushes queued messages from s->out to the
     * connection until the queue is empty or the socket would block.
     *
     * wev - write event; wev->data is the connection, whose data is the
     *       session.
     *
     * Progress is kept in the session: out_pos (ring slot being sent),
     * out_chain (current link of that message) and out_bpos (next unsent
     * byte in the current buffer), so a later call resumes where this one
     * stopped.
     */
    static void ngx_rtmp_send(ngx_event_t *wev)
    {
        ngx_connection_t           *c;
        ngx_rtmp_session_t         *s;
        ngx_int_t                   n;
        ngx_rtmp_core_srv_conf_t   *cscf;
    
        c = wev->data;
        s = c->data;
    
        if (c->destroyed) {
            return;
        }
    
        /* the send timer fired: treat the client as timed out */
        if (wev->timedout) {
            ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
                    "client timed out");
            c->timedout = 1;
            ngx_rtmp_finalize_session(s);
            return;
        }
    
        if (wev->timer_set) {
            ngx_del_timer(wev);
        }
    
        /* nothing in flight but the ring is not empty: start the next message */
        if (s->out_chain == NULL && s->out_pos != s->out_last) {
            s->out_chain = s->out[s->out_pos];
            s->out_bpos = s->out_chain->buf->pos;
        }
    
        while (s->out_chain) {
            /* send the unsent part of the current buffer through the
             * connection's send handler */
            n = c->send(c, s->out_bpos, s->out_chain->buf->last - s->out_bpos);
    
            /* nothing could be written now: arm the timeout, wait for the
             * next write event and resume then */
            if (n == NGX_AGAIN || n == 0) {
                ngx_add_timer(c->write, s->timeout);
                if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
                    ngx_rtmp_finalize_session(s);
                }
                return;
            }
    
            /* any other negative result is treated as fatal */
            if (n < 0) {
                ngx_rtmp_finalize_session(s);
                return;
            }
    
            s->out_bytes += n;
            s->ping_reset = 1;
            ngx_rtmp_update_bandwidth(&ngx_rtmp_bw_out, n);
            s->out_bpos += n;
            /* current buffer fully sent */
            if (s->out_bpos == s->out_chain->buf->last) {
                /* move on to the next link of this message */
                s->out_chain = s->out_chain->next;
                /* no more links: the whole message has been sent */
                if (s->out_chain == NULL) {
                    cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
                    /* release the queue's reference to the sent message */
                    ngx_rtmp_free_shared_chain(cscf, s->out[s->out_pos]);
                    ++s->out_pos;
                    s->out_pos %= s->out_queue;
                    /* read position caught up with write position: queue empty */
                    if (s->out_pos == s->out_last) {
                        break;
                    }
                    s->out_chain = s->out[s->out_pos];
                }
                s->out_bpos = s->out_chain->buf->pos;
            }
        }
    
        /* queue drained: stop watching for write readiness */
        if (wev->active) {
            ngx_del_event(wev, NGX_WRITE_EVENT, 0);
        }
    
        /* run the events posted on s->posted_dry_events */
        ngx_event_process_posted((ngx_cycle_t *) ngx_cycle, &s->posted_dry_events);
    }
    

    2.3.3 ngx_rtmp_free_shared_chain

    /*
     * Drop one reference to the shared chain `in`.
     *
     * While ngx_rtmp_ref_put() reports a non-zero result nothing else
     * happens. Once it reports zero, the whole chain (from `in` to its last
     * link) is spliced onto the front of cscf->free so its links can be
     * reused.
     */
    void ngx_rtmp_free_shared_chain(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *in)
    {
        ngx_chain_t        *tail;

        if (ngx_rtmp_ref_put(in)) {
            return;
        }

        /* find the last link of the chain */
        tail = in;
        while (tail->next != NULL) {
            tail = tail->next;
        }

        /* prepend the whole chain to the free list */
        tail->next = cscf->free;
        cscf->free = in;
    }
    

    2.4 ngx_rtmp_send_bandwidth

    /*
     * Create a bandwidth message (ack_size plus limit_type) and queue it on
     * session s. A NULL packet is turned into NGX_ERROR by
     * ngx_rtmp_send_shared_packet(); otherwise its result is returned.
     */
    ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size,
                            uint8_t limit_type)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_bandwidth(s, ack_size, limit_type);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    

    2.4.1 ngx_rtmp_create_bandwidth

    /*
     * Build a NGX_RTMP_MSG_BANDWIDTH message whose payload is ack_size
     * (four bytes) followed by limit_type (one byte).
     *
     * All return paths live inside the NGX_RTMP_USER_* macros: NULL when the
     * shared buffer cannot be allocated, otherwise the prepared chain.
     */
    ngx_chain_t *ngx_rtmp_create_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size,
                              uint8_t limit_type)
    {
        ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "create: bandwidth ack_size=%uD limit=%d",
                       ack_size, (int)limit_type);
    
        {
            /* allocate the output buffer and initialise the header
             * fields (message type, csid) */
            NGX_RTMP_USER_START(s, NGX_RTMP_MSG_BANDWIDTH);
    
            /* payload */
            NGX_RTMP_USER_OUT4(ack_size);
            NGX_RTMP_USER_OUT1(limit_type);
    
            /* prepend the RTMP header and return the finished packet */
            NGX_RTMP_USER_END(s);
        }
    }
    

    2.4.2 NGX_RTMP_USER_OUT1

    /*
     * Append the low-order byte of v to the buffer __b (declared by
     * NGX_RTMP_USER_START) and advance __b->last by 1.
     *
     * A value cast is used instead of reading the first in-memory byte of v,
     * so the result does not depend on host byte order and v may be any
     * expression. The first line must end with a backslash so that the
     * statement is part of the macro definition.
     */
    #define NGX_RTMP_USER_OUT1(v)                                               \
        *(__b->last++) = (u_char) (v);
    
    send bandwidth

    2.5 ngx_rtmp_send_chunk_size

    /*
     * Create a chunk-size message carrying chunk_size and queue it on
     * session s. A NULL packet is turned into NGX_ERROR by
     * ngx_rtmp_send_shared_packet(); otherwise its result is returned.
     */
    ngx_int_t ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_chunk_size(s, chunk_size);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    

    2.5.1 ngx_rtmp_create_chunk_size

    /*
     * Build a NGX_RTMP_MSG_CHUNK_SIZE message whose payload is chunk_size
     * written as four bytes.
     *
     * All return paths live inside the NGX_RTMP_USER_* macros: NULL when the
     * shared buffer cannot be allocated, otherwise the prepared chain.
     */
    ngx_chain_t *ngx_rtmp_create_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size)
    {
        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "chunk_size=%uD", chunk_size);
    
        {
            /* declares the header/chain/buffer locals and allocates the buffer */
            NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE);
    
            /* payload: the 32-bit chunk size */
            NGX_RTMP_USER_OUT4(chunk_size);
    
            /* prepends the RTMP header and returns the chain */
            NGX_RTMP_USER_END(s);
        }
    }
    
    send chunk_size

    2.6 ngx_rtmp_send_amf

    /*
     * Serialize nelts AMF elements from elts into a message with header h
     * and queue it on session s. A NULL packet is turned into NGX_ERROR by
     * ngx_rtmp_send_shared_packet(); otherwise its result is returned.
     */
    ngx_int_t ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                      ngx_rtmp_amf_elt_t *elts, size_t nelts)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_amf(s, h, elts, nelts);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    

    2.6.1 ngx_rtmp_create_amf

    /*
     * Build a complete RTMP message from AMF elements.
     *
     * h     - header to put in front of the serialized data.
     * elts  - AMF elements to serialize.
     * nelts - number of entries in elts.
     *
     * Returns the prepared chain, or NULL when serialization produced no
     * chain or failed. On failure any partially built chain is released
     * here.
     */
    ngx_chain_t *ngx_rtmp_create_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
                        ngx_rtmp_amf_elt_t *elts, size_t nelts)
    {
        ngx_rtmp_core_srv_conf_t   *conf;
        ngx_chain_t                *pkt;
        ngx_int_t                   status;

        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "create: amf nelts=%ui", nelts);

        conf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

        /* serialize the AMF payload into a fresh chain */
        pkt = NULL;
        status = ngx_rtmp_append_amf(s, &pkt, NULL, elts, nelts);

        if (pkt == NULL) {
            return NULL;
        }

        if (status != NGX_OK) {
            /* the writer does not clean up after itself on error */
            ngx_rtmp_free_shared_chain(conf, pkt);
            return NULL;
        }

        /* put the RTMP header in front of the payload */
        ngx_rtmp_prepare_message(s, h, NULL, pkt);

        return pkt;
    }
    

    2.6.2 ngx_rtmp_append_amf

    /* AMF sender */
    
    /* NOTE: this function does not free shared bufs on error */
    /*
     * Serialize AMF elements into shared buffers, optionally continuing an
     * existing chain.
     *
     * first - in/out, may be NULL: head of the chain before and after the
     *         write.
     * last  - in/out, may be NULL: link the writer continues from and the
     *         link it ended on.
     * elts  - AMF elements to write.
     * nelts - number of entries in elts.
     *
     * Returns the result of ngx_rtmp_amf_write(). *first and *last are
     * updated whether or not the write succeeded, and buffers allocated
     * before an error are left for the caller to release.
     */
    ngx_int_t ngx_rtmp_append_amf(ngx_rtmp_session_t *s,
                        ngx_chain_t **first, ngx_chain_t **last,
                        ngx_rtmp_amf_elt_t *elts, size_t nelts)
    {
        ngx_rtmp_amf_ctx_t          ctx;
        ngx_int_t                   status;

        /* writer context: allocation callback, its argument, and the log */
        memset(&ctx, 0, sizeof(ctx));
        ctx.log = s->connection->log;
        ctx.alloc = ngx_rtmp_alloc_amf_buf;
        ctx.arg = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);

        if (first != NULL) {
            ctx.first = *first;
        }

        if (last != NULL) {
            ctx.link = *last;
        }

        status = ngx_rtmp_amf_write(&ctx, elts, nelts);

        /* report the resulting chain ends back, even after a failure */
        if (first != NULL) {
            *first = ctx.first;
        }

        if (last != NULL) {
            *last = ctx.link;
        }

        return status;
    }
    
    send amf

    2.7 ngx_rtmp_send_stream_begin

    /*
     * Create a "stream begin" user control message for msid and queue it on
     * session s. A NULL packet is turned into NGX_ERROR by
     * ngx_rtmp_send_shared_packet(); otherwise its result is returned.
     */
    ngx_int_t
    ngx_rtmp_send_stream_begin(ngx_rtmp_session_t *s, uint32_t msid)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_stream_begin(s, msid);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    

    2.7.1 ngx_rtmp_create_stream_begin

    /* User control messages */
    
    /*
     * Build a NGX_RTMP_MSG_USER message with event type
     * NGX_RTMP_USER_STREAM_BEGIN (two bytes) followed by msid (four bytes).
     *
     * All return paths live inside the macros: NULL when the shared buffer
     * cannot be allocated, otherwise the prepared chain.
     */
    ngx_chain_t *
    ngx_rtmp_create_stream_begin(ngx_rtmp_session_t *s, uint32_t msid)
    {
        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "create: stream_begin msid=%uD", msid);
    
        {
            /* allocates the buffer and writes the 2-byte event type */
            NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_BEGIN);
    
            /* event data: the message stream id */
            NGX_RTMP_USER_OUT4(msid);
    
            /* prepends the RTMP header and returns the chain */
            NGX_RTMP_USER_END(s);
        }
    }
    

    2.7.2 NGX_RTMP_UCTL_START

    /*
     * Opens the body of a user-control message builder: performs everything
     * NGX_RTMP_USER_START(s, type) does, then writes the 16-bit event type
     * utype into the buffer, high byte first.
     *
     * Every line except the last must end with a backslash to belong to the
     * macro definition.
     */
    #define NGX_RTMP_UCTL_START(s, type, utype)                                 \
        NGX_RTMP_USER_START(s, type);                                           \
        *(__b->last++) = (u_char)((utype) >> 8);                                \
        *(__b->last++) = (u_char)(utype);
    
    send stream begin

    2.8 ngx_rtmp_send_status

    /*
     * Create an "onStatus" message with the given code, level and
     * description and queue it on session s. A NULL packet is turned into
     * NGX_ERROR by ngx_rtmp_send_shared_packet(); otherwise its result is
     * returned.
     */
    ngx_int_t
    ngx_rtmp_send_status(ngx_rtmp_session_t *s, char *code, char* level, char *desc)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_status(s, code, level, desc);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    

    2.8.1 ngx_rtmp_create_status

    /*
     * Build an AMF command message of the form
     *
     *   "onStatus", <number>, null, { level, code, description }
     *
     * code, level, desc - strings placed into the info object; the pointers
     *                     are stored, not copied.
     *
     * The element tables are function-static and are patched on every call,
     * so the function is not reentrant. The number element points at a
     * static double that this function never assigns. Returns whatever
     * ngx_rtmp_create_amf() returns.
     */
    ngx_chain_t *
    ngx_rtmp_create_status(ngx_rtmp_session_t *s, char *code, char* level,
                           char *desc)
    {
        ngx_rtmp_header_t               hdr;
        static double                   trans_id;

        /* members of the info object; data pointers are filled in per call */
        static ngx_rtmp_amf_elt_t       info[] = {
            { NGX_RTMP_AMF_STRING, ngx_string("level"),       NULL, 0 },
            { NGX_RTMP_AMF_STRING, ngx_string("code"),        NULL, 0 },
            { NGX_RTMP_AMF_STRING, ngx_string("description"), NULL, 0 },
        };

        /* top-level elements of the command */
        static ngx_rtmp_amf_elt_t       reply[] = {
            { NGX_RTMP_AMF_STRING, ngx_null_string, "onStatus", 0 },
            { NGX_RTMP_AMF_NUMBER, ngx_null_string, &trans_id,  0 },
            { NGX_RTMP_AMF_NULL,   ngx_null_string, NULL,       0 },
            { NGX_RTMP_AMF_OBJECT, ngx_null_string, info,       sizeof(info) },
        };

        ngx_log_debug3(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "create: status code='%s' level='%s' desc='%s'",
                       code, level, desc);

        /* message header for an AMF command */
        memset(&hdr, 0, sizeof(hdr));
        hdr.msid = NGX_RTMP_MSID;
        hdr.csid = NGX_RTMP_CSID_AMF;
        hdr.type = NGX_RTMP_MSG_AMF_CMD;

        /* plug this call's strings into the shared info table */
        info[0].data = level;
        info[1].data = code;
        info[2].data = desc;

        return ngx_rtmp_create_amf(s, &hdr, reply,
                                   sizeof(reply) / sizeof(reply[0]));
    }
    
    send onStatus

    2.9 ngx_rtmp_send_recorded

    /*
     * Create a "recorded" user control message for msid and queue it on
     * session s. A NULL packet is turned into NGX_ERROR by
     * ngx_rtmp_send_shared_packet(); otherwise its result is returned.
     */
    ngx_int_t
    ngx_rtmp_send_recorded(ngx_rtmp_session_t *s, uint32_t msid)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_recorded(s, msid);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    

    2.9.1 ngx_rtmp_create_recorded

    /*
     * Build a NGX_RTMP_MSG_USER message with event type
     * NGX_RTMP_USER_RECORDED (two bytes) followed by msid (four bytes).
     *
     * All return paths live inside the macros: NULL when the shared buffer
     * cannot be allocated, otherwise the prepared chain.
     */
    ngx_chain_t *
    ngx_rtmp_create_recorded(ngx_rtmp_session_t *s, uint32_t msid)
    {
        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "create: recorded msid=%uD", msid);
    
        {
            /* allocates the buffer and writes the 2-byte event type */
            NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_RECORDED);
    
            /* event data: the message stream id */
            NGX_RTMP_USER_OUT4(msid);
    
            /* prepends the RTMP header and returns the chain */
            NGX_RTMP_USER_END(s);
        }
    }
    
    send record

    2.10 ngx_rtmp_send_sample_access

    /*
     * Create a "|RtmpSampleAccess" message and queue it on session s.
     * A NULL packet is turned into NGX_ERROR by
     * ngx_rtmp_send_shared_packet(); otherwise its result is returned.
     */
    ngx_int_t
    ngx_rtmp_send_sample_access(ngx_rtmp_session_t *s)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_sample_access(s);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    

    2.10.1 ngx_rtmp_create_sample_access

    /*
     * Build an AMF metadata message of the form
     *
     *   "|RtmpSampleAccess", <boolean>, <boolean>
     *
     * Both boolean elements point at the same static int, which holds 1.
     * Returns whatever ngx_rtmp_create_amf() returns.
     */
    ngx_chain_t *
    ngx_rtmp_create_sample_access(ngx_rtmp_session_t *s)
    {
        ngx_rtmp_header_t               hdr;

        static int                      granted = 1;

        static ngx_rtmp_amf_elt_t       elts[] = {
            { NGX_RTMP_AMF_STRING,  ngx_null_string, "|RtmpSampleAccess", 0 },
            { NGX_RTMP_AMF_BOOLEAN, ngx_null_string, &granted,            0 },
            { NGX_RTMP_AMF_BOOLEAN, ngx_null_string, &granted,            0 },
        };

        /* message header for AMF metadata */
        memset(&hdr, 0, sizeof(hdr));
        hdr.msid = NGX_RTMP_MSID;
        hdr.csid = NGX_RTMP_CSID_AMF;
        hdr.type = NGX_RTMP_MSG_AMF_META;

        return ngx_rtmp_create_amf(s, &hdr, elts,
                                   sizeof(elts) / sizeof(elts[0]));
    }
    
    send sample access

    2.11 ngx_rtmp_send_stream_eof

    /*
     * Create a "stream EOF" user control message for msid and queue it on
     * session s. A NULL packet is turned into NGX_ERROR by
     * ngx_rtmp_send_shared_packet(); otherwise its result is returned.
     */
    ngx_int_t
    ngx_rtmp_send_stream_eof(ngx_rtmp_session_t *s, uint32_t msid)
    {
        ngx_chain_t  *pkt;

        pkt = ngx_rtmp_create_stream_eof(s, msid);

        return ngx_rtmp_send_shared_packet(s, pkt);
    }
    

    2.11.1 ngx_rtmp_create_stream_eof

    /*
     * Build a NGX_RTMP_MSG_USER message with event type
     * NGX_RTMP_USER_STREAM_EOF (two bytes) followed by msid (four bytes).
     *
     * All return paths live inside the macros: NULL when the shared buffer
     * cannot be allocated, otherwise the prepared chain.
     */
    ngx_chain_t *
    ngx_rtmp_create_stream_eof(ngx_rtmp_session_t *s, uint32_t msid)
    {
        /* note: the debug text says "stream_end" for this EOF message */
        ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
                       "create: stream_end msid=%uD", msid);
    
        {
            /* allocates the buffer and writes the 2-byte event type */
            NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_EOF);
    
            /* event data: the message stream id */
            NGX_RTMP_USER_OUT4(msid);
    
            /* prepends the RTMP header and returns the chain */
            NGX_RTMP_USER_END(s);
        }
    }
    
    send stream eof

  • 相关阅读:
    LeetCode刷题--只出现一次的数
    Java进阶--多线程
    .NETCore :正则表达式
    .net core多线程:Thread
    .NETCore : Linq简介
    .NET Core:List,ArrayList和Dictionary
    .NET Core 装箱与拆箱
    .NET Core 泛型、逆变与协变
    .NET Core 类的生命周期
    .NET Core 类的多态与继承
  • 原文地址:https://www.cnblogs.com/jimodetiantang/p/8987867.html
Copyright © 2011-2022 走看看