zoukankan      html  css  js  c++  java
  • http代理阅读3 发送mem处理

    每次客户端有可读数据触发时,优先检测是否还有数据没有发送,如果有则发送数据,然后在读取client数据

    //向后端发送请求的调用过程
    //ngx_http_upstream_send_request_body->ngx_output_chain->ngx_chain_writer
    static ngx_int_t
    ngx_http_upstream_send_request_body(ngx_http_request_t *r,
        ngx_http_upstream_t *u, ngx_uint_t do_write)
    {
        int                        tcp_nodelay;
        ngx_int_t                  rc;
        ngx_chain_t               *out, *cl, *ln;
        ngx_connection_t          *c;
        ngx_http_core_loc_conf_t  *clcf;
    
        ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                       "http upstream send request body");
    
        if (!r->request_body_no_buffering) {
    
           /* buffered request body */
        /* request_sent 标志位为 1 表示是否已经传递了 request_bufs 缓冲区 */
           if (!u->request_sent) { 
            /* 在第一次以 request_bufs 作为参数调用 ngx_output_chain 
                 * 方法后,request_sent 会置为 1 */
               u->request_sent = 1;
               out = u->request_bufs; //如果是fastcgi这里面为实际发往后端的数据(包括fastcgi格式头部+客户端包体等)
    
           } else {
               out = NULL;
           }
           /* 1. 调用 ngx_out_chain 方法向上游服务器发送 ngx_http_upstream_t 结构体
                   * 中的 request_bufs 链表,这个方法对于发送缓冲区构成的 ngx_chain_t 链表
                   * 非常有用,它会把未发送完成的链表缓冲区保存下来,这样就不用每次调用时
                   * 都携带上 request_bufs 链表。*/
           return ngx_output_chain(&u->output, out);
        }
    
        if (!u->request_sent) {
            u->request_sent = 1;
            out = u->request_bufs;
    
            if (r->request_body->bufs) {
                for (cl = out; cl->next; cl = out->next) { /* void */ }
                cl->next = r->request_body->bufs;
                r->request_body->bufs = NULL;
            }
    
            c = u->peer.connection;
            clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
    
            if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
                ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0, "tcp_nodelay");
    
                tcp_nodelay = 1;
    
                if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
                               (const void *) &tcp_nodelay, sizeof(int)) == -1)
                {
                    ngx_connection_error(c, ngx_socket_errno,
                                         "setsockopt(TCP_NODELAY) failed");
                    return NGX_ERROR;
                }
    
                c->tcp_nodelay = NGX_TCP_NODELAY_SET;
            }
    
            r->read_event_handler = ngx_http_upstream_read_request_handler;// 每次有请求数据来时,先发送后读取
    
        } else {
             /* 进入到这里表示之前的请求数据还未发送完全,现在是再次触发
             * 写事件时,将未发送完全的数据发送该上游服务器,这里将 out
             * 置为 NULL,表示下面的 ngx_output_chain 函数不需要在传递参数 */
            out = NULL;
        }
    
        for ( ;; ) {
    
            if (do_write) {
                rc = ngx_output_chain(&u->output, out);
    
                if (rc == NGX_ERROR) {
                    return NGX_ERROR;//send error return 
                
                }
    
                while (out) { // free 归还 free mem
                    ln = out;
                    out = out->next;
                    ngx_free_chain(r->pool, ln);
                }
    
                if (rc == NGX_OK && !r->reading_body) {
                    break;
                }
            }
    
            if (r->reading_body) {
                /* read client request body   也就是 请求缓存是否已经读玩 */
    
                rc = ngx_http_read_unbuffered_request_body(r);
    
                if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
                    return rc;
                }
    
                out = r->request_body->bufs;
                r->request_body->bufs = NULL;
            }
    
            /* stop if there is nothing to send */
    
            if (out == NULL) {
                rc = NGX_AGAIN;
                break;
            }
            //out != NULL 继续向上游发送数据
            do_write = 1;
        }
    
        if (!r->reading_body) {// 不懂------------不明白
            if (!u->store && !r->post_action && !u->conf->ignore_client_abort) {
                r->read_event_handler =
                                      ngx_http_upstream_rd_check_broken_connection;
            }
        }
    
        return rc;
    }

    发送状态保存 :发送需要多次才能完成,所以发送时需要有缓存以及状态机记录发送进展

    /* 
    函数目的是发送 in 中的数据,ctx 用来保存发送的上下文,因为发送通常情况下,不能一次完成。
    nginx 因为使用了 ET 模式,在网络编程事件管理上简单了,但是编程中处理事件复杂了,
    需要不停的循环做处理;事件的函数回调,次数也不确定,因此需要使用 context 上下文对象来保存发送到什么环节了。
    注意:这里的in实际上是已经指向数据内容部分,或者如果发送的数据需要从文件中读取,
    in中也会指定文件file_pos和file_last已经文件fd等,
       可以参考ngx_http_cache_send ngx_http_send_header ngx_http_output_filter */
    ngx_output_chain(ngx_output_chain_ctx_t *ctx, ngx_chain_t *in)  
    //in为需要发送的chain链,上面存储的是实际要发送的数据
    {//ctx为&u->output, in为u->request_bufs这里nginx filter的主要逻辑都在这个函数里面,
    //将in参数链表的缓冲块拷贝到
    //ctx->in,然后将ctx->in的数据拷贝到out,然后调用output_filter发送出去。
    
    //如果读取后端数据发往客户端,默认流程是
    //ngx_event_pipe->ngx_event_pipe_write_to_downstream->p->output_filter(p->output_ctx, p->out);走到这里
        off_t         bsize;
        ngx_int_t     rc, last;
        ngx_chain_t  *cl, *out, **last_out;
    
        ngx_uint_t sendfile = ctx->sendfile;
        ngx_uint_t aio = ctx->aio;
        ngx_uint_t directio = ctx->directio;
        
        ngx_log_debugall(ctx->pool->log, 0, "ctx->sendfile:%ui, ctx->aio:%ui, ctx->directio:%ui", sendfile, aio, directio);
        if (ctx->in == NULL && ctx->busy == NULL) //in是待发送的数据,busy是已经调用ngx_chain_writer但还没有发送完毕。
        {
            /*
             * the short path for the case when the ctx->in and ctx->busy chains
             * are empty, the incoming chain is empty too or has the single buf
             * that does not require the copy
             */
    // 要发送的 buf 只有一个,不需要复制
            if (in == NULL) { //如果要发送的数据为空,也就是啥也不用发送。那就直接调用output_filter的了。
                ngx_log_debugall(ctx->pool->log, 0, "ngx output chain, in = NULL");
                return ctx->output_filter(ctx->filter_ctx, in);
            }
    
            if (in->next == NULL //说明发送buf只有一个
    #if (NGX_SENDFILE_LIMIT)
                && !(in->buf->in_file && in->buf->file_last > NGX_SENDFILE_LIMIT)
    #endif
                && ngx_output_chain_as_is(ctx, in->buf)) //这个函数主要用来判断是否需要复制buf。返回1,表示不需要拷贝,否则为需要拷贝 
            {
                ngx_log_debugall(ctx->pool->log, 0, "only one chain buf to output_filter");
                return ctx->output_filter(ctx->filter_ctx, in);
            }
        }
    
        /* add the incoming buf to the chain ctx->in */
       
        if (in) {//拷贝一份数据到ctx->in里面,需要老老实实的进行数据拷贝了。将in参数里面的数据拷贝到ctx->in里面。换了个in
            if (ngx_output_chain_add_copy(ctx->pool, &ctx->in, in) == NGX_ERROR) {
                return NGX_ERROR;
            }
        }
    
        /* out为最终需要传输的chain,也就是交给剩下的filter处理的chain */  
        out = NULL;
        last_out = &out; //下面遍历ctx->in链中的数据并且添加到该last_out中,也就是添加到out链中
        last = NGX_NONE;
        //到现在了,in参数的缓冲链表已经放在了ctx->in里面了。下面准备发送吧。
    
        for ( ;; ) { //循环读取缓存中或者内存中的数据发送
            //结合ngx_http_xxx_create_request(ngx_http_fastcgi_create_request)阅读,ctx->in中的数据实际上是从ngx_http_xxx_create_request组成ngx_chain_t来的,数据来源在ngx_http_xxx_create_request
            while (ctx->in) {//遍历所有待发送的数据。将他们一个个拷贝到out指向的链表中
    // 遍历 ctx->in chain 列表,处理 in,只会处理一次,如果发送不完数据,下次再进入函数,ctx->in 就是空
                /*
                 * cycle while there are the ctx->in bufs
                 * and there are the free output bufs to copy in
                 */
    
                bsize = ngx_buf_size(ctx->in->buf);
                //这块内存大小为0,然后又不是special 可能有问题。 如果是special的buf,应该是从ngx_http_send_special过来的
                if (bsize == 0 && !ngx_buf_special(ctx->in->buf)) {
                    ngx_debug_point();
                    ctx->in = ctx->in->next;
                    continue;
                }
                /* 判断是否需要复制buf */    
                if (ngx_output_chain_as_is(ctx, ctx->in->buf)) {
                    //把ctx->in->buf从ctx->in上面取下来,然后加入到lst_out链表中
                    /* move the chain link to the output chain */
                    /* 如果不需要复制,则直接链接chain到out,然后继续循环 */ 
                    cl = ctx->in;
                    ctx->in = cl->next; //已经赋值的会从ctx->in上面摘掉
    
                    *last_out = cl;
                    last_out = &cl->next;
                    cl->next = NULL;
    
                    continue;
                }
    
    
    //注意从后端接收的数据到缓存文件中后,在filter模块中,有可能是新的buf数据指针了,因为ngx_http_copy_filter->ngx_output_chain中会重新分配内存读取缓存文件内容
    
                //如果是需要赋值buf(一般都是sendfile的时候),用户空间内存里面没有数据,所以需要开辟空间来把文件中的内容赋值一份出来
                
                /* 到达这里,说明我们需要拷贝buf,这里buf最终都会被拷贝进ctx->buf中, 因此这里先判断ctx->buf是否为空 */ 
                if (ctx->buf == NULL) { //每次拷贝数据前,先给ctx->buf分配空间,在下面的ngx_output_chain_get_buf函数中
    
                    /* 如果为空,则取得buf,这里要注意,一般来说如果没有开启directio的话,这个函数都会返回NGX_DECLINED */  
                    rc = ngx_output_chain_align_file_buf(ctx, bsize);
    
                    if (rc == NGX_ERROR) {
                        return NGX_ERROR;
                    }
    
                    if (rc != NGX_OK) {
    
                        if (ctx->free) {
    
                            /* get the free buf */
    
                            cl = ctx->free;
                            /* 得到free buf */    
                            ctx->buf = cl->buf;
                            ctx->free = cl->next;
                            /* 将要重用的chain链接到ctx->poll中,以便于chain的重用 */  
                            ngx_free_chain(ctx->pool, cl);
    
                        } else if (out || ctx->allocated == ctx->bufs.num) {//output_buffers 1 32768都用完了
                       /* 
                            如果已经等于buf的个数限制,则跳出循环,发送已经存在的buf。 这里可以看到如果out存在的话,nginx会跳出循环,然后发送out,
                            等发送完会再次处理,这里很好的体现了nginx的流式处理 
                            */ 
                            break;
    
                        } else if (ngx_output_chain_get_buf(ctx, bsize) != NGX_OK) {/* 上面这个函数也比较关键,它用来取得buf。接下来会详细看这个函数 */
                            //该函数获取到的内存保存到ctx->buf中
                            return NGX_ERROR;
                        }
                    }
                }
                
                /* 从原来的buf中拷贝内容或者从原来的文件中读取内容 
                创建新的 chain 对象追加到 ctx->in 列表中,这些对象指向输入 in 中的 buf 对象
                */  //注意如果是aio on或者aio thread=poll方式返回的是NGX_AGAIN    
                rc = ngx_output_chain_copy_buf(ctx); //把ctx->in->buf中的内容赋值给ctx->buf
    //ngx_output_chain_copy_bufc中tx->in中的内存数据或者缓存文件数据会拷贝到dst中,也就是ctx->buf,然后在ngx_output_chain_copy_buf函数
    //外层会重新把ctx->buf赋值给新的chain,然后write出去 ,见下面的创建新chain
    
                if (rc == NGX_ERROR) {
                    return rc;
                }
    
                if (rc == NGX_AGAIN) { 
                //AIO是异步方式,由内核自行发送出去,应用层不用管,读取文件中数据完毕后epoll会触发执行ngx_file_aio_event_handler中执行ngx_http_copy_aio_event_handler,表示内核已经读取完毕
                    if (out) {
                        break;
                    }
    
                    return rc;
                }
    
                /* delete the completed buf from the ctx->in chain */
    
                if (ngx_buf_size(ctx->in->buf) == 0) {//这个节点大小为0,移动到下一个节点。
                    ctx->in = ctx->in->next;
                }
    
                cl = ngx_alloc_chain_link(ctx->pool);
                if (cl == NULL) {
                    return NGX_ERROR;
                }
                //把ngx_output_chain_copy_buf中从原src拷贝的内容赋值给cl->buf,然后添加到lst_out的头部  也就是添加到out后面
                cl->buf = ctx->buf;
                cl->next = NULL;
                *last_out = cl;
                last_out = &cl->next;
                ctx->buf = NULL;
    
                //注意这里没有continue;直接往后走
            }
    
            if (out == NULL && last != NGX_NONE) {
    
                if (ctx->in) {
                    return NGX_AGAIN;
                }
    
                return last;
            }
    
            last = ctx->output_filter(ctx->filter_ctx, out); //  ngx_chain_writer
    
            if (last == NGX_ERROR || last == NGX_DONE) {
                return last;
            }
    
            ngx_chain_update_chains(ctx->pool, &ctx->free, &ctx->busy, &out,
                                    ctx->tag);
            last_out = &out;
        }
    }

    对发送缓存的发送处理:

    //向后端发送请求的调用过程ngx_http_upstream_send_request_body->ngx_output_chain->ngx_chain_writer
    ngx_int_t
    ngx_chain_writer(void *data, ngx_chain_t *in)
    {
        ngx_chain_writer_ctx_t *ctx = data;
    
        off_t              size;
        ngx_chain_t       *cl, *ln, *chain;
        ngx_connection_t  *c;
    
        c = ctx->connection;
        /*下面的循环,将in里面的每一个链接节点,添加到ctx->filter_ctx所指的链表中。并记录这些in的链表的大小。*/
        for (size = 0; in; in = in->next) {
    #if 1
            if (ngx_buf_size(in->buf) == 0 && !ngx_buf_special(in->buf)) {
                ngx_debug_point();
                continue;
            }
    #endif
    
            size += ngx_buf_size(in->buf);
            ngx_log_debug2(NGX_LOG_DEBUG_CORE, c->log, 0,
                           "chain writer buf fl:%d s:%uO",
                           in->buf->flush, ngx_buf_size(in->buf));
    
            cl = ngx_alloc_chain_link(ctx->pool);
            if (cl == NULL) {
                return NGX_ERROR;
            }
    
            cl->buf = in->buf; //把in->buf赋值给新的cl->buf,
            cl->next = NULL;
            //下面这两句实际上就是把cl添加到ctx->out链表头中,
            *ctx->last = cl; 
            ctx->last = &cl->next; //向后移动last指针,指向新的最后一个节点的next变量地址。再次循环走到这里的时候,调用ctx->last=cl会把新的cl添加到out的尾部
        }
        ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0,
                       "chain writer in: %p", ctx->out);
                       
        //遍历刚刚准备的链表,并统计其大小,这是啥意思?ctx->out为链表头,所以这里遍历的是所有的。
        for (cl = ctx->out; cl; cl = cl->next) {
    #if 1
            if (ngx_buf_size(cl->buf) == 0 && !ngx_buf_special(cl->buf)) {
                ngx_debug_point();
    
                continue;
            }
    #endif
            size += ngx_buf_size(cl->buf);
        }
    
        if (size == 0 && !c->buffered) {//啥数据都么有,不用发了都
            return NGX_OK;
        }
    
        //调用writev将ctx->out的数据全部发送出去。如果没法送完,则返回没发送完毕的部分。记录到out里面
        //在ngx_event_connect_peer连接上游服务器的时候设置的发送链接函数ngx_send_chain = ngx_writev_chain      。
        chain = c->send_chain(c, ctx->out, ctx->limit); //ngx_send_chain->ngx_writev_chain  到后端的请求报文是不会走filter过滤模块的,而是直接调用ngx_writev_chain->ngx_writev发送到后端
        ngx_log_debug1(NGX_LOG_DEBUG_CORE, c->log, 0,
                       "chain writer out: %p", chain);
        if (chain == NGX_CHAIN_ERROR) {
            return NGX_ERROR;
        }
    
        for (cl = ctx->out; cl && cl != chain; /* void */) { //把ctx->out中已经全部发送出去的in节点从out链表摘除放入free中,重复利用
            ln = cl;
            cl = cl->next;
            ngx_free_chain(ctx->pool, ln);
        }
    
        ctx->out = chain; //ctx->out上面现在只剩下还没有发送出去的in节点了
    
        if (ctx->out == NULL) { //说明已经ctx->out链中的所有数据已经全部发送完成
            ctx->last = &ctx->out;
    
            if (!c->buffered) { 
            //发送到后端的请求报文之前buffered一直都没有操作过为0,如果是应答给客户端的响应,则buffered可能在进入ngx_http_write_filter调用
            //c->send_chain()之前已经有赋值给,发送给客户端包体的时候会经过所有的filter模块走到这里
                return NGX_OK;
            }
        }
    
        return NGX_AGAIN; //如果上面的chain = c->send_chain(c, ctx->out, ctx->limit)后,out中还有数据则返回NGX_AGAIN等待再次事件触发调度
    }
    
    
    ngx_chain_t *
    ngx_writev_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit)
    {//调用writev一次发送多个缓冲区,如果没有发送完毕,则返回剩下的链接结构头部。
    //ngx_chain_writer调用这里,调用方式为 ctx->out = c->send_chain(c, ctx->out, ctx->limit);
    //第二个参数为要发送的数据
        ssize_t        n, sent;
        off_t          send, prev_send;
        ngx_chain_t   *cl;
        ngx_event_t   *wev;
        ngx_iovec_t    vec;
        struct iovec   iovs[NGX_IOVS_PREALLOCATE];
    
        wev = c->write;//拿到这个连接的写事件结构
    
        if (!wev->ready) {//连接还没准备好,返回当前的节点。
            return in;
        }
        /* the maximum limit size is the maximum size_t value - the page size */
        if (limit == 0 || limit > (off_t) (NGX_MAX_SIZE_T_VALUE - ngx_pagesize)) {
            limit = NGX_MAX_SIZE_T_VALUE - ngx_pagesize;//够大了,最大的整数
        }
    
        send = 0;
    
        vec.iovs = iovs;
        vec.nalloc = NGX_IOVS_PREALLOCATE;
    
        for ( ;; ) {
            prev_send = send; //prev_send为上一次调用ngx_writev发送出去的字节数
            /* create the iovec and coalesce the neighbouring bufs */
            //把in链中的buf拷贝到vec->iovs[n++]中,注意只会拷贝内存中的数据到iovec中,不会拷贝文件中的
            cl = ngx_output_chain_to_iovec(&vec, in, limit - send, c->log);
            if (cl == NGX_CHAIN_ERROR) {
                return NGX_CHAIN_ERROR;
            }
                ngx_debug_point();
                return NGX_CHAIN_ERROR;
            }
    
            send += vec.size; //为ngx_output_chain_to_iovec中组包的in链中所有数据长度和
    
            n = ngx_writev(c, &vec); 
            //我期望发送vec->size字节数据,但是实际上内核发送出去的很可能比vec->size小,n为实际发送出去的字节数,因此需要继续发送
    
            if (n == NGX_ERROR) {
                return NGX_CHAIN_ERROR;
            }
            sent = (n == NGX_AGAIN) ? 0 : n;//记录发送的数据大小。
            c->sent += sent;//递增统计数据,这个链接上发送的数据大小
            in = ngx_chain_update_sent(in, sent); //send是此次调用ngx_wrtev发送成功的字节数
            //ngx_chain_update_sent返回后的in链已经不包括之前发送成功的in节点了,这上面只包含剩余的数据       
            if (send - prev_send != sent) { //这里说明最多调用ngx_writev两次成功发送后,这里就会返回
                wev->ready = 0; //标记暂时不能发送数据了,必须重新epoll_add写事件
                return in;
            }
            if (send >= limit || in == NULL) { //数据发送完毕,或者本次发送成功的字节数比limit还多,则返回出去
                return in; //
            }
        }
    }
  • 相关阅读:
    (14) go 结构体
    (13) go map
    (12) go make初始化
    (11)go 数组和切片
    (10) go 错误
    (9) go 时间日期
    (8)go 字符串
    (7) go 函数
    (6) go 流程控制
    (5) go 格式化输入输出 类型转换
  • 原文地址:https://www.cnblogs.com/codestack/p/13897700.html
Copyright © 2011-2022 走看看