现在还没有办法理清整个过程,分析的东西也比较局部。因此就当写在这自己记录一下,后续随着自己理解的加深可以更系统整理一下吧。
// src/event/ngx_event_connect.c
该文件主要只有一个函数 ngx_event_connect_peer(ngx_peer_connection_t *pc),该函数在以下几个地方被调用了:
- http/ngx_http_upstream.c, line 1103
- mail/ngx_mail_proxy_module.c, line 150
- mail/ngx_mail_auth_http_module.c, line 194
可见,这个函数主要作用是nginx自身建立socket连接,通常用于upstream等与后端连接/子请求等。
先来看几个结构
typedef struct { ngx_recv_pt recv; ngx_recv_chain_pt recv_chain; ngx_recv_pt udp_recv; ngx_send_pt send; ngx_send_chain_pt send_chain; ngx_uint_t flags; } ngx_os_io_t;
ngx_os_io_t为os的io相关结构。
然后来认识下ngx_event_actions_t这个nginx的事件结构体
typedef struct { ngx_int_t (*add)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*del)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*enable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*disable)(ngx_event_t *ev, ngx_int_t event, ngx_uint_t flags); ngx_int_t (*add_conn)(ngx_connection_t *c); ngx_int_t (*del_conn)(ngx_connection_t *c, ngx_uint_t flags); ngx_int_t (*process_changes)(ngx_cycle_t *cycle, ngx_uint_t nowait); ngx_int_t (*process_events)(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags); ngx_int_t (*init)(ngx_cycle_t *cycle, ngx_msec_t timer); void (*done)(ngx_cycle_t *cycle); } ngx_event_actions_t;
主要有8个事件:增加事件、删除事件、启用事件、禁用事件、增加连接、删除连接、处理变化、处理事件。
接下来看看这个主要的函数:
ngx_int_t ngx_event_connect_peer(ngx_peer_connection_t *pc) { int rc; ngx_int_t event; ngx_err_t err; ngx_uint_t level; ngx_socket_t s; ngx_event_t *rev, *wev; ngx_connection_t *c; rc = pc->get(pc, pc->data); //回调函数,获取状态 if (rc != NGX_OK) { return rc; } s = ngx_socket(pc->sockaddr->sa_family, SOCK_STREAM, 0); //创建socket ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, "socket %d", s); if (s == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_socket_n " failed"); return NGX_ERROR; } c = ngx_get_connection(s, pc->log); //为socket分配一个空闲的连接 //每个connection中注册了read/write事件 //c->fd = s, 在c中记录了socket句柄s //c->log = pc->log, connection的日志指针使用peer_connection的指针 if (c == NULL) { //获取connection失败时,关闭socket if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_close_socket_n "failed"); } return NGX_ERROR; } if (pc->rcvbuf) { //peer_connection的收数据长度非0 //setsockopt的作用是表明希望不经历由系统缓冲区到socket缓冲区的拷贝而影响程序的性能 //此处为了提高接收数据的性能 if (setsockopt(s, SOL_SOCKET, SO_RCVBUF, (const void *) &pc->rcvbuf, sizeof(int)) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, "setsockopt(SO_RCVBUF) failed"); goto failed; } } if (ngx_nonblocking(s) == -1) { //设置socket为nonblocking的I/O ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_nonblocking_n " failed"); goto failed; } //local结构体包含sockaddr、socklen_t、ngx_str_t name三个变量 if (pc->local) { if (bind(s, pc->local->sockaddr, pc->local->socklen) == -1) { //socket bind操作 ngx_log_error(NGX_LOG_CRIT, pc->log, ngx_socket_errno, "bind(%V) failed", &pc->local->name); goto failed; } } //给socket对应的空闲连接分配操作函数句柄/指针 c->recv = ngx_recv; //ngx_os_io_t的recv方法 c->send = ngx_send; //send方法 c->recv_chain = ngx_recv_chain; //recv链表 c->send_chain = ngx_send_chain; //send链表 c->sendfile = 1; c->log_error = pc->log_error; if (pc->sockaddr->sa_family != AF_INET) { c->tcp_nopush = NGX_TCP_NOPUSH_DISABLED; c->tcp_nodelay = NGX_TCP_NODELAY_DISABLED; #if (NGX_SOLARIS) /* Solaris's sendfilev() supports AF_NCA, AF_INET, and AF_INET6 */ c->sendfile = 0; #endif } //记录connection的读写事件 rev = c->read; wev = c->write; rev->log = pc->log; wev->log = pc->log; //当socket对应的connection并填充好后,peer_connection的connection也指向该connection pc->connection = c; //连接数+1,ngx_connection_counter为共享内存中记录的每个worker进程的总连接数? c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); if (ngx_add_conn) { if (ngx_add_conn(c) == NGX_ERROR) { //添加一个add_connection事件 goto failed; } } ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connect to %V, fd:%d #%d", pc->name, s, c->number); rc = connect(s, pc->sockaddr, pc->socklen); //建立socket连接 if (rc == -1) { //-1表示当前没有可用的客户端连接 err = ngx_socket_errno; if (err != NGX_EINPROGRESS #if (NGX_WIN32) /* Winsock returns WSAEWOULDBLOCK (NGX_EAGAIN) */ && err != NGX_EAGAIN #endif ) { if (err == NGX_ECONNREFUSED #if (NGX_LINUX) /* * Linux returns EAGAIN instead of ECONNREFUSED * for unix sockets if listen queue is full */ || err == NGX_EAGAIN #endif || err == NGX_ECONNRESET || err == NGX_ENETDOWN || err == NGX_ENETUNREACH || err == NGX_EHOSTDOWN || err == NGX_EHOSTUNREACH) { level = NGX_LOG_ERR; } else { level = NGX_LOG_CRIT; } ngx_log_error(level, c->log, err, "connect() to %V failed", pc->name); return NGX_DECLINED; } } if (ngx_add_conn) { // 句柄非空的话,之前添加过一个connection事件 if (rc == -1) { /* NGX_EINPROGRESS */ return NGX_AGAIN; } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected"); wev->ready = 1; return NGX_OK; } if (ngx_event_flags & NGX_USE_AIO_EVENT) { //epoll使用的AIO ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, ngx_socket_errno, "connect(): %d", rc); /* aio, iocp */ if (ngx_blocking(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_blocking_n " failed"); goto failed; } /* * FreeBSD's aio allows to post an operation on non-connected socket. * NT does not support it. * * TODO: check in Win32, etc. As workaround we can use NGX_ONESHOT_EVENT */ rev->ready = 1; wev->ready = 1; return NGX_OK; } if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { /* kqueue */ event = NGX_CLEAR_EVENT; } else { /* select, poll, /dev/poll */ event = NGX_LEVEL_EVENT; } if (ngx_add_event(rev, NGX_READ_EVENT, event) != NGX_OK) { //添加一个read事件 goto failed; } if (rc == -1) { /* NGX_EINPROGRESS */ if (ngx_add_event(wev, NGX_WRITE_EVENT, event) != NGX_OK) { //没有可用连接时,添加一个write事件? goto failed; } return NGX_AGAIN; } ngx_log_debug0(NGX_LOG_DEBUG_EVENT, pc->log, 0, "connected"); wev->ready = 1; //write ready return NGX_OK; failed: ngx_free_connection(c); //失败之后释放并关闭连接 //将可用连接数+1,把当前连接给free_connection,给新来的连接使用 if (ngx_close_socket(s) == -1) { ngx_log_error(NGX_LOG_ALERT, pc->log, ngx_socket_errno, ngx_close_socket_n " failed"); } return NGX_ERROR; }
还不太理解代码中后面关于挂载read/write的情况。后续回头解决。