zoukankan      html  css  js  c++  java
  • nginx学习 connection_peer

    现在还没有办法理清整个过程,分析的东西也比较局部。因此就当写在这自己记录一下,后续随着自己理解的加深可以更系统整理一下吧。

    // src/event/ngx_event_connect.c

    该文件主要只有一个函数 ngx_event_connect_peer(ngx_peer_connection_t *pc),该函数在以下几个地方被调用了:

    • http/ngx_http_upstream.c, line 1103
    • mail/ngx_mail_proxy_module.c, line 150
    • mail/ngx_mail_auth_http_module.c, line 194

    可见,这个函数主要作用是nginx自身建立socket连接,通常用于upstream等与后端连接/子请求等。

    先来看几个结构

     typedef struct {
          ngx_recv_pt        recv;
          ngx_recv_chain_pt  recv_chain;
          ngx_recv_pt        udp_recv;
          ngx_send_pt        send;
          ngx_send_chain_pt  send_chain;
          ngx_uint_t         flags;
     } ngx_os_io_t;

    ngx_os_io_t为os的io相关结构。

    然后来认识下ngx_event_actions_t这个nginx的事件结构体

    typedef struct {
          ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
          ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    
          ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
          ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags);
    
          ngx_int_t (*add_conn)(ngx_connection_t *c);
          ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags);
    
          ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait);
          ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags);
    
          ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer);
          void (*done)(ngx_cycle_t *cycle);
    } ngx_event_actions_t;

    主要有8个事件:增加事件、删除事件、启用事件、禁用事件、增加连接、删除连接、处理变化、处理事件。

    接下来看看这个主要的函数:

    ngx_int_t
    ngx_event_connect_peer(ngx_peer_connection_t *pc)
    {
        int                rc;
        ngx_int_t          event;
        ngx_err_t          err;
        ngx_uint_t         level;
        ngx_socket_t       s;
        ngx_event_t       *rev, *wev;
        ngx_connection_t  *c;
    
        rc = pc->get(pc, pc->data);  //回调函数,获取状态
        if (rc != NGX_OK) {
            return rc;
        }
    
        s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0); //创建socket
    
        ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, "socket %d", s);
    
        if (s == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_socket_n " failed");
            return NGX_ERROR;
        }
    
    
        c = ngx_get_connection(s, pc->log); //为socket分配一个空闲的连接
        //每个connection中注册了read/write事件
        //c->fd = s, 在c中记录了socket句柄s
        //c->log = pc->log, connection的日志指针使用peer_connection的指针
    
        if (c == NULL) { //获取connection失败时,关闭socket
            if (ngx_close_socket(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                              ngx_close_socket_n "failed");
            }
    
            return NGX_ERROR;
        }
    
    if (pc->rcvbuf) { //peer_connection的收数据长度非0 
      //setsockopt的作用是表明希望不经历由系统缓冲区到socket缓冲区的拷贝而影响程序的性能 
      //此处为了提高接收数据的性能 
      if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const void *) &pc->rcvbuf, sizeof(int)) == -1)
            {
                ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                              "setsockopt(SO_RCVBUF) failed");
                goto failed;
            }
        }
    
        if (ngx_nonblocking(s) == -1) { //设置socket为nonblocking的I/O
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_nonblocking_n " failed");
    
            goto failed;
        }
    
        //local结构体包含sockaddr、socklen_t、ngx_str_t name三个变量 
        if (pc->local) { 
            if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) { //socket bind操作
                ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno,
                              "bind(%V) failed", &pc->local->name);
    
                goto failed;
            }
        }
    
        //给socket对应的空闲连接分配操作函数句柄/指针
        c->recv = ngx_recv; //ngx_os_io_t的recv方法
        c->send = ngx_send; //send方法
        c->recv_chain = ngx_recv_chain; //recv链表
        c->send_chain = ngx_send_chain; //send链表
    
        c->sendfile = 1;
    
        c->log_error = pc->log_error;
    
        if (pc->sockaddr->sa_family != AF_INET) {
          c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED;
            c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED;
    
    #if (NGX_SOLARIS)
            /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */
            c->sendfile = 0;
    #endif
        }
    
        //记录connection的读写事件
        rev = c->read;
        wev = c->write;
    
        rev->log = pc->log;
        wev->log = pc->log;
    
        //当socket对应的connection并填充好后,peer_connection的connection也指向该connection
        pc->connection = c;
    
        //连接数+1,ngx_connection_counter为共享内存中记录的每个worker进程的总连接数?
        c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); 
    
        if (ngx_add_conn) {
            if (ngx_add_conn(c) == NGX_ERROR) { //添加一个add_connection事件
                goto failed;
            }
        }
    
        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0,
                       "connect to %V, fd:%d #%d", pc->name, s, c->number);
    
        rc = connect(s, pc->sockaddr, pc->socklen); //建立socket连接
    
        if (rc == -1) { //-1表示当前没有可用的客户端连接
            err = ngx_socket_errno;
    
    
            if (err != NGX_EINPROGRESS
    #if (NGX_WIN32)
                /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */
                && err != NGX_EAGAIN
    #endif
                )
            {
                if (err == NGX_ECONNREFUSED
    #if (NGX_LINUX)
                    /*
                     * Linux returns EAGAIN instead of ECONNREFUSED
                     * for unix sockets if listen queue is full
                     */
                    || err == NGX_EAGAIN
    #endif
                    || err == NGX_ECONNRESET
                    || err == NGX_ENETDOWN
                    || err == NGX_ENETUNREACH
                    || err == NGX_EHOSTDOWN
                    || err == NGX_EHOSTUNREACH)
                {
                    level = NGX_LOG_ERR;
    
                } else {
                    level = NGX_LOG_CRIT;
                }
    
                ngx_log_error(level, c->log, err, "connect() to %V failed",
                              pc->name);
    
                return NGX_DECLINED;
            }
        }
    
        if (ngx_add_conn) { // 句柄非空的话,之前添加过一个connection事件
            if (rc == -1) {
    
                /* NGX_EINPROGRESS */
    
                return NGX_AGAIN;
            }
    
            ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
    
            wev->ready = 1;
    
            return NGX_OK;
        }
    
        if (ngx_event_flags & NGX_USE_AIO_EVENT) { //epoll使用的AIO
    
            ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno,
                           "connect(): %d", rc);
    
            /* aio, iocp */
    
            if (ngx_blocking(s) == -1) {
                ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                              ngx_blocking_n " failed");
                goto failed;
            }
    
            /*
             * FreeBSD's aio allows to post an operation on non-connected socket.
             * NT does not support it.
             *
             * TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT
             */
    
            rev->ready = 1;
            wev->ready = 1;
    
            return NGX_OK;
        }
    
        if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
    
            /* kqueue */
    
            event = NGX_CLEAR_EVENT;
    
        } else {
    
            /* select, poll, /dev/poll */
    
            event = NGX_LEVEL_EVENT;
        }
    
        if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) { //添加一个read事件
            goto failed;
        }
    
        if (rc == -1) {
    
            /* NGX_EINPROGRESS */
    
            if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) { //没有可用连接时,添加一个write事件?
                goto failed;
            }
    
            return NGX_AGAIN;
        }
    
        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected");
    
        wev->ready = 1; //write ready
    
        return NGX_OK;
    
    failed:
    
        ngx_free_connection(c); //失败之后释放并关闭连接
        //将可用连接数+1,把当前连接给free_connection,给新来的连接使用
    
        if (ngx_close_socket(s) == -1) {
            ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno,
                          ngx_close_socket_n " failed");
        }
    
        return NGX_ERROR;
    }

    还不太理解代码中后面关于挂载read/write的情况。后续回头解决。

  • 相关阅读:
    Redis QPS测试
    go语言下载及安装
    企业级Docker镜像仓库Harbor部署与使用
    Linux格式化数据盘
    【一周一Q】如何快速复制有规律内容
    聊一聊职业能力之执行力
    面试那些事
    使用gitlab时候 fork仓库不会实时从主仓库更新解决方案
    从给定字符串中提取姓名
    测试Websocket建立通信,使用protobuf格式交换数据
  • 原文地址:https://www.cnblogs.com/xiaohuo/p/2555226.html
Copyright © 2011-2022 走看看