zoukankan      html  css  js  c++  java
  • Nginx-rtmp之监听端口的管理

    1. 概述

    监听端口属于 server 虚拟主机,它是由 server{} 块下的 listen 配置项决定的。

    每监听一个 TCP 端口,都将使用一个独立的 ngx_rtmp_conf_port_t 结构体来表示:

    /*
     * One listening TCP port.  ngx_rtmp_add_ports() keeps a single entry per
     * (family, port) pair and collects every address configured on that port
     * in "addrs".
     */
    typedef struct {
        /* socket address family (copied from sockaddr.sa_family) */
        int                     family;
        
        /* listening port, as copied from sin_port / sin6_port */
        in_port_t               port;
        
        /* every ngx_rtmp_conf_addr_t address configured on this port */
        ngx_array_t             addrs;       /* array of ngx_rtmp_conf_addr_t */
    } ngx_rtmp_conf_port_t;
    

    一个端口,可能对应着多个地址(当主机上有多个 IP 地址时),该地址用 ngx_rtmp_conf_addr_t 表示:

    /*
     * One concrete address bound to a listening port.  Filled in by
     * ngx_rtmp_add_ports() from the matching ngx_rtmp_listen_t.
     */
    typedef struct {
        /* points at the sockaddr storage of the originating ngx_rtmp_listen_t */
        struct sockaddr        *sockaddr;
        socklen_t               socklen;
    
        /* configuration context copied from the listen entry */
        ngx_rtmp_conf_ctx_t    *ctx;
    
        /* flags copied verbatim from the listen entry */
        unsigned                bind:1;
        unsigned                wildcard:1;
    #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
        unsigned                ipv6only:2;
    #endif
        /* the listen parser stores 1 for "on" and 2 for "off" */
        unsigned                so_keepalive:2;
        unsigned                proxy_protocol:1;
    #if (NGX_HAVE_KEEPALIVE_TUNABLE)
        /* the three so_keepalive=idle:intvl:cnt fields */
        int                     tcp_keepidle;
        int                     tcp_keepintvl;
        int                     tcp_keepcnt;
    #endif
    } ngx_rtmp_conf_addr_t;
    

    在 ngx_rtmp_core_main_conf_t 结构体中,保存着 nginx.conf 中所有需要监听的端口:

    typedef struct {
        ...
        /* Each element of this array is an ngx_rtmp_listen_t structure
         * (stored by value, not as a pointer); ngx_rtmp_core_listen()
         * appends one element per parsed listen directive. */
        ngx_array_t     listen;  /* ngx_rtmp_listen_t */
        ...
    }ngx_rtmp_core_main_conf_t;
    

    Nginx 每从 nginx.conf 中解析出一个 listen 配置项,就把该配置项的值组织成 ngx_rtmp_listen_t 结构体添加到
    ngx_rtmp_core_main_conf_t 的成员 listen 数组中,如下:

    /*
     * One parsed "listen" directive, as stored in the "listen" array of
     * ngx_rtmp_core_main_conf_t by ngx_rtmp_core_listen().
     */
    typedef struct {
        /* raw socket address bytes copied from the parsed URL */
        u_char                  sockaddr[NGX_SOCKADDRLEN];
        socklen_t               socklen;
    
        /* server ctx */
        ngx_rtmp_conf_ctx_t    *ctx;
    
        /* set by the "bind" parameter (and implied by ipv6only / so_keepalive) */
        unsigned                bind:1;
        /* copied from the parsed URL's wildcard flag */
        unsigned                wildcard:1;
    #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
        unsigned                ipv6only:2;
    #endif
        /* the parser stores 1 for "on" (or tuned values) and 2 for "off" */
        unsigned                so_keepalive:2;
        unsigned                proxy_protocol:1;
    #if (NGX_HAVE_KEEPALIVE_TUNABLE)
        /* the three so_keepalive=idle:intvl:cnt fields */
        int                     tcp_keepidle;
        int                     tcp_keepintvl;
        int                     tcp_keepcnt;
    #endif
    } ngx_rtmp_listen_t;
    

    对同一个端口 1935,可以同时监听 127.0.0.1:1935、192.168.56.101:1935 这两个地址,当一台物理机器具备多个 IP
    地址时这是很有用的。对应到 RTMP 的框架上,Nginx 是使用 ngx_rtmp_conf_addr_t 结构体来表示一个对应着具体地址
    的监听端口,因此,一个 ngx_rtmp_conf_port_t 将会对应多个 ngx_rtmp_conf_addr_t,而 ngx_rtmp_conf_addr_t 是
    以动态数组的形式保存在 ngx_rtmp_conf_port_t 中的 addrs 成员中的。

    2. 监听端口的解析与管理

    2.1 ngx_rtmp_core_listen:解析 listen 配置项

    /*
     * Handler for the "listen" directive.
     *
     * Parses the address/port given in value[1], rejects it if the same
     * address:port pair is already present in the core main conf "listen"
     * array, appends a new zeroed ngx_rtmp_listen_t and then applies the
     * optional parameters that follow:
     *
     *     bind
     *     ipv6only=on|off
     *     so_keepalive=on|off|[idle]:[intvl]:[cnt]
     *     proxy_protocol
     *
     * Returns NGX_CONF_OK on success and NGX_CONF_ERROR on a parse error,
     * a duplicate address:port pair, an unknown parameter or an allocation
     * failure.
     */
    static char *ngx_rtmp_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
    {
        size_t                      len, off;
        in_port_t                   port;
        ngx_str_t                  *value;
        ngx_url_t                   u;
        ngx_uint_t                  i;
        struct sockaddr            *sa;
        ngx_rtmp_listen_t          *ls;
        struct sockaddr_in         *sin;
        ngx_rtmp_core_main_conf_t  *cmcf;
    #if (NGX_HAVE_INET6)
        struct sockaddr_in6        *sin6;
    #endif
    
        /* arguments of the directive, e.g. "listen 1935" */
        value = cf->args->elts;
    
        ngx_memzero(&u, sizeof(ngx_url_t));
    
        /* hand the address/port argument (e.g. "1935") to the URL parser */
        u.url = value[1];
        u.listen = 1;
    
        if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
            if (u.err) {
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "%s in \"%V\" of the \"listen\" directive",
                                   u.err, &u.url);
            }
    
            return NGX_CONF_ERROR;
        }
    
        cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);
    
        ls = cmcf->listen.elts;
    
        /*
         * Compare the new address with every entry already stored in the
         * listen array; an identical family + address + port is an error.
         */
        for (i = 0; i < cmcf->listen.nelts; i++) {
    
            sa = (struct sockaddr *) ls[i].sockaddr;
    
            if (sa->sa_family != u.family) {
                continue;
            }
    
            /* locate the raw address bytes and the port for this family */
            switch (sa->sa_family) {
    
    #if (NGX_HAVE_INET6)
            case AF_INET6:
                off = offsetof(struct sockaddr_in6, sin6_addr);
                len = 16;
                sin6 = (struct sockaddr_in6 *) sa;
                port = sin6->sin6_port;
                break;
    #endif
    
            default: /* AF_INET */
                off = offsetof(struct sockaddr_in, sin_addr);
                len = 4;
                sin = (struct sockaddr_in *) sa;
                port = sin->sin_port;
                break;
            }
    
            if (ngx_memcmp(ls[i].sockaddr + off, (u_char *) &u.sockaddr + off, len)
                != 0)
            {
                continue;
            }
    
            if (port != u.port) {
                continue;
            }
    
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "duplicate \"%V\" address and port pair", &u.url);
            return NGX_CONF_ERROR;
        }
    
        ls = ngx_array_push(&cmcf->listen);
        if (ls == NULL) {
            return NGX_CONF_ERROR;
        }
    
        /* every optional flag below therefore starts out as 0 */
        ngx_memzero(ls, sizeof(ngx_rtmp_listen_t));
    
        ngx_memcpy(ls->sockaddr, (u_char *) &u.sockaddr, u.socklen);
    
        ls->socklen = u.socklen;
        ls->wildcard = u.wildcard;
        ls->ctx = cf->ctx;
    
        /* process the optional parameters that follow the address, if any */
        for (i = 2; i < cf->args->nelts; i++) {
    
            if (ngx_strcmp(value[i].data, "bind") == 0) {
                ls->bind = 1;
                continue;
            }
    
            if (ngx_strncmp(value[i].data, "ipv6only=o", 10) == 0) {
    #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
                u_char  buf[NGX_SOCKADDR_STRLEN];
    
                sa = (struct sockaddr *) ls->sockaddr;
    
                if (sa->sa_family == AF_INET6) {
    
                    if (ngx_strcmp(&value[i].data[10], "n") == 0) {
                        ls->ipv6only = 1;
    
                    } else if (ngx_strcmp(&value[i].data[10], "ff") == 0) {
                        /*
                         * 0 is what ngx_memzero() leaves when the parameter
                         * is absent, so an explicit "off" needs its own value;
                         * use 2, the same encoding so_keepalive uses below.
                         */
                        ls->ipv6only = 2;
    
                    } else {
                        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                           "invalid ipv6only flags \"%s\"",
                                           &value[i].data[9]);
                        return NGX_CONF_ERROR;
                    }
    
                    ls->bind = 1;
    
                } else {
                    /* not an IPv6 address: report it and ignore the flag */
                    len = ngx_sock_ntop(sa,
    #if (nginx_version >= 1005003)
                                        ls->socklen,
    #endif
                                        buf, NGX_SOCKADDR_STRLEN, 1);
    
                    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                       "ipv6only is not supported "
                                       "on addr \"%*s\", ignored", len, buf);
                }
    
                continue;
    #else
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "bind ipv6only is not supported "
                                   "on this platform");
                return NGX_CONF_ERROR;
    #endif
            }
    
            if (ngx_strncmp(value[i].data, "so_keepalive=", 13) == 0) {
    
                if (ngx_strcmp(&value[i].data[13], "on") == 0) {
                    ls->so_keepalive = 1;
    
                } else if (ngx_strcmp(&value[i].data[13], "off") == 0) {
                    ls->so_keepalive = 2;
    
                } else {
    
    #if (NGX_HAVE_KEEPALIVE_TUNABLE)
                    u_char     *p, *end;
                    ngx_str_t   s;
    
                    /*
                     * Value has the form [idle]:[intvl]:[cnt]; each of the
                     * three colon-separated fields may be empty.
                     */
                    end = value[i].data + value[i].len;
                    s.data = value[i].data + 13;
    
                    /* first field: idle time */
                    p = ngx_strlchr(s.data, end, ':');
                    if (p == NULL) {
                        p = end;
                    }
    
                    if (p > s.data) {
                        s.len = p - s.data;
    
                        ls->tcp_keepidle = ngx_parse_time(&s, 1);
                        if (ls->tcp_keepidle == (time_t) NGX_ERROR) {
                            goto invalid_so_keepalive;
                        }
                    }
    
                    /* second field: probe interval */
                    s.data = (p < end) ? (p + 1) : end;
    
                    p = ngx_strlchr(s.data, end, ':');
                    if (p == NULL) {
                        p = end;
                    }
    
                    if (p > s.data) {
                        s.len = p - s.data;
    
                        ls->tcp_keepintvl = ngx_parse_time(&s, 1);
                        if (ls->tcp_keepintvl == (time_t) NGX_ERROR) {
                            goto invalid_so_keepalive;
                        }
                    }
    
                    /* third field: probe count (everything that is left) */
                    s.data = (p < end) ? (p + 1) : end;
    
                    if (s.data < end) {
                        s.len = end - s.data;
    
                        ls->tcp_keepcnt = ngx_atoi(s.data, s.len);
                        if (ls->tcp_keepcnt == NGX_ERROR) {
                            goto invalid_so_keepalive;
                        }
                    }
    
                    /* at least one of the three fields must be non-zero */
                    if (ls->tcp_keepidle == 0 && ls->tcp_keepintvl == 0
                        && ls->tcp_keepcnt == 0)
                    {
                        goto invalid_so_keepalive;
                    }
    
                    ls->so_keepalive = 1;
    
    #else
    
                    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                       "the \"so_keepalive\" parameter accepts "
                                       "only \"on\" or \"off\" on this platform");
                    return NGX_CONF_ERROR;
    
    #endif
                }
    
                ls->bind = 1;
    
                continue;
    
    #if (NGX_HAVE_KEEPALIVE_TUNABLE)
            invalid_so_keepalive:
    
                ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                                   "invalid so_keepalive value: \"%s\"",
                                   &value[i].data[13]);
                return NGX_CONF_ERROR;
    #endif
            }
    
            if (ngx_strcmp(value[i].data, "proxy_protocol") == 0) {
                ls->proxy_protocol = 1;
                continue;
            }
    
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "the invalid \"%V\" parameter", &value[i]);
            return NGX_CONF_ERROR;
        }
    
        return NGX_CONF_OK;
    }
    

    2.2 ngx_rtmp_add_ports: 将监听端口添加到 ports 数组中

    /*
     * Record one parsed listen entry in the per-port table "ports".
     *
     * The entry's family and port are looked up in "ports"; when no matching
     * ngx_rtmp_conf_port_t exists a new one is appended and its "addrs" array
     * is initialised from cf->temp_pool.  The listen entry is then appended
     * to that port's "addrs" array as an ngx_rtmp_conf_addr_t.
     *
     * Returns NGX_OK on success, NGX_ERROR if an array push or init fails.
     */
    static ngx_int_t ngx_rtmp_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
        ngx_rtmp_listen_t *listen)
    {
        in_port_t              wanted;
        ngx_uint_t             n;
        struct sockaddr       *sa;
        ngx_rtmp_conf_port_t  *table, *entry;
        ngx_rtmp_conf_addr_t  *slot;
    
        sa = (struct sockaddr *) &listen->sockaddr;
    
        /* the port field lives in a family-specific sockaddr layout */
    #if (NGX_HAVE_INET6)
        if (sa->sa_family == AF_INET6) {
            wanted = ((struct sockaddr_in6 *) sa)->sin6_port;
    
        } else
    #endif
        {
            /* AF_INET */
            wanted = ((struct sockaddr_in *) sa)->sin_port;
        }
    
        /* search for a port entry with the same port and family */
        entry = NULL;
        table = ports->elts;
    
        for (n = 0; n < ports->nelts; n++) {
            if (table[n].port == wanted && table[n].family == sa->sa_family) {
                entry = &table[n];
                break;
            }
        }
    
        if (entry == NULL) {
    
            /* not in the list yet: append a new port entry */
    
            entry = ngx_array_push(ports);
            if (entry == NULL) {
                return NGX_ERROR;
            }
    
            entry->family = sa->sa_family;
            entry->port = wanted;
    
            if (ngx_array_init(&entry->addrs, cf->temp_pool, 2,
                               sizeof(ngx_rtmp_conf_addr_t))
                != NGX_OK)
            {
                return NGX_ERROR;
            }
        }
    
        /* one port may carry several IP addresses */
        slot = ngx_array_push(&entry->addrs);
        if (slot == NULL) {
            return NGX_ERROR;
        }
    
        /* address and context */
        slot->sockaddr = (struct sockaddr *) &listen->sockaddr;
        slot->socklen = listen->socklen;
        slot->ctx = listen->ctx;
    
        /* listen flags */
        slot->bind = listen->bind;
        slot->wildcard = listen->wildcard;
    #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
        slot->ipv6only = listen->ipv6only;
    #endif
        slot->so_keepalive = listen->so_keepalive;
        slot->proxy_protocol = listen->proxy_protocol;
    
    #if (NGX_HAVE_KEEPALIVE_TUNABLE)
        /* keepalive tuning values */
        slot->tcp_keepidle = listen->tcp_keepidle;
        slot->tcp_keepintvl = listen->tcp_keepintvl;
        slot->tcp_keepcnt = listen->tcp_keepcnt;
    #endif
    
        return NGX_OK;
    }
    

    2.3 ngx_rtmp_optimize_servers

    /*
     * Turn the per-port address lists in "ports" into listening sockets.
     *
     * For every ngx_rtmp_conf_port_t the addresses are sorted with
     * ngx_rtmp_cmp_conf_addrs, then an ngx_listening_t is created for each
     * address that has to be bound, and an ngx_rtmp_port_t is allocated from
     * cf->pool and attached to it as ls->servers.
     *
     * Returns NGX_CONF_OK on success, NGX_CONF_ERROR if creating a listening
     * structure, allocating the ngx_rtmp_port_t or adding addresses fails.
     */
    static char *ngx_rtmp_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports)
    {
        ngx_uint_t             i, p, last, bind_wildcard;
        ngx_listening_t       *ls;
        ngx_rtmp_port_t       *mport;
        ngx_rtmp_conf_port_t  *port;
        ngx_rtmp_conf_addr_t  *addr;
    
        port = ports->elts;
        for (p = 0; p < ports->nelts; p++) {
    
            /* NOTE(review): the check on addr[last - 1] below presumably relies
             * on ngx_rtmp_cmp_conf_addrs (not shown here) ordering a wildcard
             * address last -- confirm against the comparator */
            ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
                     sizeof(ngx_rtmp_conf_addr_t), ngx_rtmp_cmp_conf_addrs);
    
            /* addr/last describe the window of addresses still to process */
            addr = port[p].addrs.elts;
            last = port[p].addrs.nelts;
    
            /*
             * if there is the binding to the "*:port" then we need to bind()
             * to the "*:port" only and ignore the other bindings
             */
    
            if (addr[last - 1].wildcard) {
                addr[last - 1].bind = 1;
                bind_wildcard = 1;
    
            } else {
                bind_wildcard = 0;
            }
    
            i = 0;
    
            while (i < last) {
    
                /* with a wildcard bind, addresses without an explicit
                 * "bind" flag get no listening socket of their own */
                if (bind_wildcard && !addr[i].bind) {
                    i++;
                    continue;
                }
    
                /* create and initialise the ngx_listening_t structure */
                ls = ngx_create_listening(cf, addr[i].sockaddr, addr[i].socklen);
                if (ls == NULL) {
                    return NGX_CONF_ERROR;
                }
    
                ls->addr_ntop = 1;
                ls->handler = ngx_rtmp_init_connection;
                ls->pool_size = 4096;
    
                /* TODO: error_log directive */
                ls->logp = &cf->cycle->new_log;
                ls->log.data = &ls->addr_text;
                ls->log.handler = ngx_accept_log_error;
    
                /* carry the per-address socket options over to the listener */
                ls->keepalive = addr[i].so_keepalive;
    #if (NGX_HAVE_KEEPALIVE_TUNABLE)
                ls->keepidle = addr[i].tcp_keepidle;
                ls->keepintvl = addr[i].tcp_keepintvl;
                ls->keepcnt = addr[i].tcp_keepcnt;
    #endif
    
    #if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
                ls->ipv6only = addr[i].ipv6only;
    #endif
    
                mport = ngx_palloc(cf->pool, sizeof(ngx_rtmp_port_t));
                if (mport == NULL) {
                    return NGX_CONF_ERROR;
                }
    
                ls->servers = mport;
    
                /* the final address in the window takes all "last" remaining
                 * addresses; any earlier one takes a single address and the
                 * scan index is reset to the window start */
                if (i == last - 1) {
                    mport->naddrs = last;
    
                } else {
                    mport->naddrs = 1;
                    i = 0;
                }
    
                /* the helpers receive the window start (addr), not &addr[i] */
                switch (ls->sockaddr->sa_family) {
    #if (NGX_HAVE_INET6)
                case AF_INET6:
                    if (ngx_rtmp_add_addrs6(cf, mport, addr) != NGX_OK) {
                        return NGX_CONF_ERROR;
                    }
                    break;
    #endif
                default: /* AF_INET */
                    if (ngx_rtmp_add_addrs(cf, mport, addr) != NGX_OK) {
                        return NGX_CONF_ERROR;
                    }
                    break;
                }
    
                /* shrink the window by one element from the front */
                addr++;
                last--;
            }
        }
    
        return NGX_CONF_OK;
    }
    
  • 相关阅读:
    Express ejs 3.* layout.ejs
    old header
    mac 命令行 安装 需要管理员 权限
    Understanding the Debug Log
    insufficient_access_on_cross_reference_entity APEX / Salesforce
    custom list view
    exam help
    Backbone.js Wine Cellar 教程
    理解RESTful架构
    SpringCloud入门之应用程序上下文服务(Spring Cloud Context)详解
  • 原文地址:https://www.cnblogs.com/jimodetiantang/p/8971302.html
Copyright © 2011-2022 走看看