zoukankan      html  css  js  c++  java
  • golang http/transport 代码分析

    请结合源码阅读,本文只是总结一下,源码里有详细的注释。基于:go1.12.4

    http.Client 表示一个HTTP客户端,用来处理HTTP相关的工作,例如cookies、redirect、timeout等,其内部包含一个Transport字段,类型为RoundTripper interface。

    // Client is the HTTP client type discussed in the text. This is an abridged
    // excerpt: only the Transport field is shown; the remaining fields are
    // elided at the "..." marker.
    type Client struct {
    	// Transport specifies the mechanism by which individual
    	// HTTP requests are made.
    	// If nil, DefaultTransport is used.
    	Transport RoundTripper
        ...
    }
    

    RoundTripper定义了执行一次http请求时,如何根据request返回response,它的实现必须支持并发,允许多个goroutine同时调用:

    // RoundTripper is the interface for executing a single HTTP transaction:
    // RoundTrip takes a Request and returns the corresponding Response or an
    // error.
    type RoundTripper interface {
    	RoundTrip(*Request) (*Response, error)
    }
    

    如果不给http.Client显式指定RoundTripper,则会使用默认的DefaultTransport。Transport用来保存多个请求过程中的一些状态,并缓存tcp连接,使后续请求可以重用这些连接,避免每次都新建。Transport需要同时支持http和https,并且需要支持http/1.1和http/2。DefaultTransport默认就支持http/2,如果需要显式指定则调用ConfigureTransport。

    transport必须实现interface中的roundTrip方法:

    
    // roundTrip implements a RoundTripper over HTTP.
    //
    // This is an abridged excerpt: the setup code (including the definitions of
    // ctx and trace) is elided at the "..." marker. The visible loop obtains a
    // connection with getConn, sends the request on it, and retries on a fresh
    // iteration when pconn.shouldRetryRequest allows it, rewinding the request
    // body through req.GetBody when that is available.
    func (t *Transport) roundTrip(req *Request) (*Response, error) {
        ...
    	for {
    		// Bail out before each attempt if the context is already done.
    		select {
    		case <-ctx.Done():
    			req.closeBody()
    			return nil, ctx.Err()
    		default:
    		}
    
    		// treq gets modified by roundTrip, so we need to recreate for each retry.
    		treq := &transportRequest{Request: req, trace: trace}
    		cm, err := t.connectMethodForRequest(treq)
    		if err != nil {
    			req.closeBody()
    			return nil, err
    		}
    
            // Obtain a connection.
    		// Get the cached or newly-created connection to either the
    		// host (for http or https), the http proxy, or the http proxy
    		// pre-CONNECTed to https server. In any case, we'll be ready
    		// to send it requests.
    		pconn, err := t.getConn(treq, cm)
    		if err != nil {
    			t.setReqCanceler(req, nil)
    			req.closeBody()
    			return nil, err
    		}
    
    		var resp *Response
    		if pconn.alt != nil {
    			// HTTP/2 path.
    			t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
    			t.setReqCanceler(req, nil)   // not cancelable with CancelRequest
    			resp, err = pconn.alt.RoundTrip(req)
    		} else {
                // Call this pconn's roundTrip method to obtain the response.
    			resp, err = pconn.roundTrip(treq)
    		}
    		if err == nil {
    			return resp, nil
    		}
    		if !pconn.shouldRetryRequest(req, err) {
    			// Issue 16465: return underlying net.Conn.Read error from peek,
    			// as we've historically done.
    			if e, ok := err.(transportReadFromServerError); ok {
    				err = e.err
    			}
    			return nil, err
    		}
    		testHookRoundTripRetried()
    
    		// Rewind the body if we're able to.
    		if req.GetBody != nil {
    			// Work on a shallow copy so the caller's Request is not modified.
    			newReq := *req
    			var err error
    			newReq.Body, err = req.GetBody()
    			if err != nil {
    				return nil, err
    			}
    			req = &newReq
    		}
    	}
    }
    

    roundTrip其实就是通过getConn获取一个连接persistConn,并调用其roundTrip方法返回response。其中getConn的实现如下:

    // getConn dials and creates a new persistConn to the target as
    // specified in the connectMethod. This includes doing a proxy CONNECT
    // and/or setting up TLS.  If this doesn't return an error, the persistConn
    // is ready to write requests to.
    //
    // An idle connection is preferred when one is immediately available;
    // otherwise a dial is started in a goroutine and the function returns
    // whichever arrives first: the dialed connection, an idle connection handed
    // over by another request, or a cancelation.
    func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {
    	req := treq.Request
    	trace := treq.trace
    	ctx := req.Context()
    	if trace != nil && trace.GetConn != nil {
    		trace.GetConn(cm.addr())
        }
        // First try to take an idle connection from the idleConn pool.
    	if pc, idleSince := t.getIdleConn(cm); pc != nil {
    		if trace != nil && trace.GotConn != nil {
    			trace.GotConn(pc.gotIdleConnTrace(idleSince))
    		}
    		// set request canceler to some non-nil function so we
    		// can detect whether it was cleared between now and when
    		// we enter roundTrip
    		t.setReqCanceler(req, func(error) {})
    		return pc, nil
    	}
    
    	type dialRes struct {
    		pc  *persistConn
    		err error
        }
    	dialc := make(chan dialRes)     // the dial goroutine reports its result asynchronously on this channel
    	cmKey := cm.key()               // key identifying the connection
    
    	// Copy these hooks so we don't race on the postPendingDial in
    	// the goroutine we launch. Issue 11136.
    	testHookPrePendingDial := testHookPrePendingDial
    	testHookPostPendingDial := testHookPostPendingDial
    
    	// handlePendingDial is called when we return without consuming our own
    	// dial result: a goroutine waits for the dial to finish, then either
    	// offers the new connection to the idle pool or, on failure, releases
    	// the per-host connection count.
    	handlePendingDial := func() {
    		testHookPrePendingDial()
    		go func() {
    			if v := <-dialc; v.err == nil {
    				t.putOrCloseIdleConn(v.pc)
    			} else {
    				t.decHostConnCount(cmKey)
    			}
    			testHookPostPendingDial()
    		}()
    	}
    
    	cancelc := make(chan error, 1)
    	t.setReqCanceler(req, func(err error) { cancelc <- err })
    
        // While waiting to increment the per-host connection count, also wait
        // for an idle connection and watch for cancelation.
    	if t.MaxConnsPerHost > 0 {
    		select {
    		case <-t.incHostConnCount(cmKey):
    			// count below conn per host limit; proceed
    		case pc := <-t.getIdleConnCh(cm):
    			if trace != nil && trace.GotConn != nil {
    				trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
    			}
    			return pc, nil
    		case <-req.Cancel:
    			return nil, errRequestCanceledConn
    		case <-req.Context().Done():
    			return nil, req.Context().Err()
    		case err := <-cancelc:
    			if err == errRequestCanceled {
    				err = errRequestCanceledConn
    			}
    			return nil, err
    		}
    	}
    
        // Start the dial asynchronously.
    	go func() {
    		pc, err := t.dialConn(ctx, cm)
    		dialc <- dialRes{pc, err}
    	}()
    
        // Wait on several event sources:
        // 1. our own dial completed
        // 2. another connection finished and an idle connection became reusable
        // 3. the request was canceled
        // Whichever of cases 1 and 2 succeeds first is returned directly.
        // In every case other than our own dial completing, handlePendingDial
        // must be called; it decides what happens to the dial result once it
        // eventually arrives.
    	idleConnCh := t.getIdleConnCh(cm)
    	select {
    	case v := <-dialc:  // the dial goroutine sends its result here when it finishes
    		// Our dial finished.
    		if v.pc != nil {
    			if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
    				trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
    			}
    			return v.pc, nil
    		}
    		// Our dial failed. See why to return a nicer error
    		// value.
    		t.decHostConnCount(cmKey)
    		select {
    		case <-req.Cancel:
    			// It was an error due to cancelation, so prioritize that
    			// error value. (Issue 16049)
    			return nil, errRequestCanceledConn
    		case <-req.Context().Done():
    			return nil, req.Context().Err()
    		case err := <-cancelc:
    			if err == errRequestCanceled {
    				err = errRequestCanceledConn
    			}
    			return nil, err
    		default:
    			// It wasn't an error due to cancelation, so
    			// return the original error message:
    			return nil, v.err
    		}
    	case pc := <-idleConnCh:        // an idle connection became available; return it directly
    		// Another request finished first and its net.Conn
    		// became available before our dial. Or somebody
    		// else's dial that they didn't use.
    		// But our dial is still going, so give it away
    		// when it finishes:
    		handlePendingDial()
    		if trace != nil && trace.GotConn != nil {
    			trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
    		}
    		return pc, nil
    	case <-req.Cancel:
    		handlePendingDial()
    		return nil, errRequestCanceledConn
    	case <-req.Context().Done():
    		handlePendingDial()
    		return nil, req.Context().Err()
    	case err := <-cancelc:
    		handlePendingDial()
    		if err == errRequestCanceled {
    			err = errRequestCanceledConn
    		}
    		return nil, err
    	}
    }
    

    getConn首先从空闲连接池中获取连接,如果没有,则新建连接。在新建过程中,如果连接池中有空闲连接则也复用空闲连接。
    继续看一下dialConn是如何建立连接的:

    // dialConn (simplified excerpt) dials a TCP connection to cm.addr(), wraps
    // it in a persistConn, starts that connection's readLoop and writeLoop
    // goroutines, and returns it. Dial errors are passed through wrapErr, which
    // turns them into a *net.OpError when a proxy URL is configured.
    //
    // NOTE(review): trace is declared but not used in this abridged version;
    // presumably the omitted parts of the original function use it.
    func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
        // Note the various channels initialized here.
    	pconn := &persistConn{
    		t:             t,
    		cacheKey:      cm.key(),
    		reqch:         make(chan requestAndChan, 1),    // used to send requests to readLoop
    		writech:       make(chan writeRequest, 1),      // used to send requests to writeLoop
    		closech:       make(chan struct{}),             // signals that the connection has been closed
    		writeErrCh:    make(chan error, 1),             // request write error passed on by writeLoop (see the persistConn.writeErrCh field comment)
    		writeLoopDone: make(chan struct{}),             // closed when writeLoop ends
    	}
    	trace := httptrace.ContextClientTrace(ctx)
    	wrapErr := func(err error) error {
    		if cm.proxyURL != nil {
    			// Return a typed error, per Issue 16997
    			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
    		}
    		return err
    	}
    
        conn, err := t.dial(ctx, "tcp", cm.addr())
        if err != nil {
            return nil, wrapErr(err)
        }
        pconn.conn = conn
    
    
        // When a per-host connection limit is set, wrap the conn in a
        // connCloseListener carrying the transport and the cache key.
    	if t.MaxConnsPerHost > 0 {
    		pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey}
        }
        // Wrap the connection in a buffered reader and writer, then start the
        // read and write goroutines.
    	pconn.br = bufio.NewReader(pconn)   
    	pconn.bw = bufio.NewWriter(persistConnWriter{pconn})
    	go pconn.readLoop()
    	go pconn.writeLoop()
    	return pconn, nil
    }
    
    

    可以看到首先调用dial函数,获取一个conn对象,然后封装为pconn,启动readLoop和writeLoop后将该pconn返回。
    以readLoop为例,看看是如何从一个pc中读取response的:

    // readLoop runs in its own goroutine (started by dialConn). Each iteration
    // peeks at the connection, receives the pending request from pc.reqch,
    // reads the response and delivers it on the request's ch channel. For a
    // response with a body it then waits until the caller has finished with the
    // body (or the request is canceled or the connection closes) before
    // deciding whether the connection can go back to the idle pool. When the
    // loop exits, the deferred function closes the connection and removes it
    // from the idle pool.
    func (pc *persistConn) readLoop() {
        // closeErr defaults to a failure value. The deferred function runs when
        // readLoop returns, closing the connection and removing it from the
        // idle pool. In normal operation this goroutine keeps looping and
        // talks to other goroutines over channels to serve requests.
    	closeErr := errReadLoopExiting // default value, if not changed below
    	defer func() {
    		pc.close(closeErr)
    		pc.t.removeIdleConn(pc)
    	}()
    
        // Try to return this connection to the idle pool; on failure, record
        // the error as the reason the connection will be closed.
    	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
    		if err := pc.t.tryPutIdleConn(pc); err != nil {
    			closeErr = err
    			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
    				trace.PutIdleConn(err)
    			}
    			return false
    		}
    		if trace != nil && trace.PutIdleConn != nil {
    			trace.PutIdleConn(nil)
    		}
    		return true
    	}
    
        // Enforces ordering: the connection is (potentially) returned to the
        // pool before the caller observes EOF on Response.Body.
    	// eofc is used to block caller goroutines reading from Response.Body
    	// at EOF until this goroutines has (potentially) added the connection
    	// back to the idle pool.
    	eofc := make(chan struct{})
    	defer close(eofc) // unblock reader on errors
    
    	// Read this once, before loop starts. (to avoid races in tests)
    	testHookMu.Lock()
    	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
    	testHookMu.Unlock()
    
    	alive := true
    	for alive {
    		pc.readLimit = pc.maxHeaderResponseSize()
    		_, err := pc.br.Peek(1)
    
    		// If Peek returned while no response is expected, hand the result
    		// to readLoopPeekFailLocked and exit the loop.
    		pc.mu.Lock()
    		if pc.numExpectedResponses == 0 {
    			pc.readLoopPeekFailLocked(err)
    			pc.mu.Unlock()
    			return
    		}
    		pc.mu.Unlock()
    
            // Receive the next request to handle (sent by persistConn.roundTrip).
    		rc := <-pc.reqch
    		trace := httptrace.ContextClientTrace(rc.req.Context())
    
    		var resp *Response
    		if err == nil {
                // Read the response.
    			resp, err = pc.readResponse(rc, trace)
    		} else {
    			err = transportReadFromServerError{err}
    			closeErr = err
    		}
    
    		if err != nil {
    			if pc.readLimit <= 0 {
    				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
    			}
    
    			// Report the error to the caller unless it has already gone away.
    			select {
    			case rc.ch <- responseAndError{err: err}:
    			case <-rc.callerGone:
    				return
    			}
    			return
    		}
    		pc.readLimit = maxInt64 // effictively no limit for response bodies
    
    		pc.mu.Lock()
    		pc.numExpectedResponses--
    		pc.mu.Unlock()
    
    		bodyWritable := resp.bodyIsWritable()
    		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0
    
    		if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
    			// Don't do keep-alive on error if either party requested a close
    			// or we get an unexpected informational (1xx) response.
    			// StatusCode 100 is already handled above.
    			alive = false
    		}
    
    		if !hasBody || bodyWritable {
    			pc.t.setReqCanceler(rc.req, nil)
    
    			// Put the idle conn back into the pool before we send the response
    			// so if they process it quickly and make another request, they'll
    			// get this same conn. But we use the unbuffered channel 'rc'
    			// to guarantee that persistConn.roundTrip got out of its select
    			// potentially waiting for this persistConn to close.
    			// but after
    			alive = alive &&
    				!pc.sawEOF &&
    				pc.wroteRequest() &&
    				tryPutIdleConn(trace)
    
    			if bodyWritable {
    				closeErr = errCallerOwnsConn
    			}
    
    			select {
    			case rc.ch <- responseAndError{res: resp}:
    			case <-rc.callerGone:
    				return
    			}
    
    			// Now that they've read from the unbuffered channel, they're safely
    			// out of the select that also waits on this goroutine to die, so
    			// we're allowed to exit now if needed (if alive is false)
    			testHookReadLoopBeforeNextRead()
    			continue
    		}
    
            // Wrap resp.Body in a bodyEOFSignal so that readLoop is told, via
            // waitForBodyRead, when the caller reaches EOF (true) or stops
            // early, by close or by a read error (false).
    		waitForBodyRead := make(chan bool, 2)
    		body := &bodyEOFSignal{
    			body: resp.Body,
    			earlyCloseFn: func() error {
    				waitForBodyRead <- false
    				<-eofc // will be closed by deferred call at the end of the function
    				return nil
    
    			},
    			fn: func(err error) error {
    				isEOF := err == io.EOF
    				waitForBodyRead <- isEOF
    				if isEOF {
    					<-eofc // see comment above eofc declaration
    				} else if err != nil {
    					if cerr := pc.canceled(); cerr != nil {
    						return cerr
    					}
    				}
    				return err
    			},
    		}
    
    		resp.Body = body
    		// Decompress here only when gzip was added on the request's behalf
    		// (rc.addedGzip) and the response is gzip-encoded.
    		if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
    			resp.Body = &gzipReader{body: body}
    			resp.Header.Del("Content-Encoding")
    			resp.Header.Del("Content-Length")
    			resp.ContentLength = -1
    			resp.Uncompressed = true
    		}
    
    		select {
            // Send the wrapped response back to the caller.
    		case rc.ch <- responseAndError{res: resp}:
    		case <-rc.callerGone:
    			return
    		}
    
    		// Before looping back to the top of this function and peeking on
    		// the bufio.Reader, wait for the caller goroutine to finish
    		// reading the response body. (or for cancelation or death)
    		select {
    		case bodyEOF := <-waitForBodyRead:
    			pc.t.setReqCanceler(rc.req, nil) // before pc might return to idle pool
    			alive = alive &&
    				bodyEOF &&
    				!pc.sawEOF &&
    				pc.wroteRequest() &&
    				tryPutIdleConn(trace)
    			if bodyEOF {
    				eofc <- struct{}{}      // all checks above are done; unblock the body reader waiting on eofc
    			}
    		case <-rc.req.Cancel:
    			alive = false
    			pc.t.CancelRequest(rc.req)
    		case <-rc.req.Context().Done():
    			alive = false
    			pc.t.cancelRequest(rc.req, rc.req.Context().Err())
    		case <-pc.closech:
    			alive = false
    		}
    
    		testHookReadLoopBeforeNextRead()
    	}
    }
    
    

    上面readLoop中从一个channel中读取出需要处理的request,然后调用readResponse读取响应并通过管道返回。那接收到的request是从哪里发送过来的呢?
    回到最开始的Transport.roundTrip函数里,它先调用getConn返回一个pconn后然后调用pconn.roundTrip方法,就是在这里面发送的,我们看看:

    // roundTrip sends one request over this persistConn and waits for its
    // response. This is an abridged excerpt: some code is elided at the "..."
    // marker. It hands the request to writeLoop through pc.writech and to
    // readLoop through pc.reqch, then selects on the write result, connection
    // close, the response-header timeout, the response itself, and cancelation.
    func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
    	testHookEnterRoundTrip()
    	// If the canceler can no longer be replaced, give the connection back
    	// and report the request as canceled.
    	if !pc.t.replaceReqCanceler(req.Request, pc.cancelRequest) {
    		pc.t.putOrCloseIdleConn(pc)
    		return nil, errRequestCanceled
    	}
    	pc.mu.Lock()
    	pc.numExpectedResponses++
    	headerFn := pc.mutateHeaderFunc
    	pc.mu.Unlock()
    
    	if headerFn != nil {
    		headerFn(req.extraHeaders())
    	}
    
        ...
    
    	var continueCh chan struct{}
    	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
    		continueCh = make(chan struct{}, 1)
    	}
    
    	if pc.t.DisableKeepAlives && !req.wantsClose() {
    		req.extraHeaders().Set("Connection", "close")
    	}
    
    	// gone is closed when this function returns; it is handed to readLoop
    	// as callerGone so readLoop can stop waiting on a departed caller.
    	gone := make(chan struct{})
    	defer close(gone)
    
    	// On any error return, clear the request's canceler.
    	defer func() {
    		if err != nil {
    			pc.t.setReqCanceler(req.Request, nil)
    		}
    	}()
    
    	const debugRoundTrip = false
    
        // Send the request via writech.
    	// Write the request concurrently with waiting for a response,
    	// in case the server decides to reply before reading our full
    	// request body.
    	startBytesWritten := pc.nwrite
    	writeErrCh := make(chan error, 1)
    	pc.writech <- writeRequest{req, writeErrCh, continueCh}
    
        resc := make(chan responseAndError)
        // Hand the in-flight request to readLoop, which receives it from reqch
        // and reads the response. requestAndChan.ch is the channel on which
        // the response is returned.
        // NOTE(review): requestedGzip is not defined in this excerpt;
        // presumably it is set in the elided section above.
    	pc.reqch <- requestAndChan{
    		req:        req.Request,
    		ch:         resc,
    		addedGzip:  requestedGzip,
    		continueCh: continueCh,
    		callerGone: gone,
    	}
    
    	var respHeaderTimer <-chan time.Time
    	cancelChan := req.Request.Cancel
    	ctxDoneChan := req.Context().Done()
    	for {
    		testHookWaitResLoop()
    		select {
    		case err := <-writeErrCh:   // write result reported for this request (nil on success)
    			if debugRoundTrip {
    				req.logf("writeErrCh resv: %T/%#v", err, err)
    			}
    			if err != nil {
    				pc.close(fmt.Errorf("write error: %v", err))
    				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
    			}
    			// The write succeeded: start the response-header timer, if configured.
    			if d := pc.t.ResponseHeaderTimeout; d > 0 {
    				if debugRoundTrip {
    					req.logf("starting timer for %v", d)
    				}
    				timer := time.NewTimer(d)
    				defer timer.Stop() // prevent leaks
    				respHeaderTimer = timer.C
    			}
    		case <-pc.closech:
    			if debugRoundTrip {
    				req.logf("closech recv: %T %#v", pc.closed, pc.closed)
    			}
    			return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
    		case <-respHeaderTimer:
    			if debugRoundTrip {
    				req.logf("timeout waiting for response headers.")
    			}
    			pc.close(errTimeout)
    			return nil, errTimeout
    		case re := <-resc:          // readLoop sends its result back on resc
    			if (re.res == nil) == (re.err == nil) {
    				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
    			}
    			if debugRoundTrip {
    				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
    			}
    			if re.err != nil {
    				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
    			}
    			return re.res, nil
    		case <-cancelChan:
    			pc.t.CancelRequest(req.Request)
    			// A nil channel is never ready, so this case will not fire again.
    			cancelChan = nil
    		case <-ctxDoneChan:
    			pc.t.cancelRequest(req.Request, req.Context().Err())
    			cancelChan = nil
    			ctxDoneChan = nil
    		}
    	}
    }
    

    该函数中会将request进行封装,然后分别通过channel发送给readLoop和writeLoop,并监听各个channel的事件,分别进行不同的处理。

    整体流程走完之后,我们回顾一下两个比较重要的结构体persistConn和Transport的成员:

    // persistConn wraps a connection, usually a persistent one
    // (but may be used for non-keep-alive requests as well)
    type persistConn struct {
    	// alt optionally specifies the TLS NextProto RoundTripper.
    	// This is used for HTTP/2 today and future protocols later.
    	// If it's non-nil, the rest of the fields are unused.
    	alt RoundTripper
    
    	t         *Transport
    	cacheKey  connectMethodKey        // key for this connection; also the key used in the idleConn map
    	conn      net.Conn                  // the wrapped conn
    	tlsState  *tls.ConnectionState  
    	br        *bufio.Reader       // from conn; bufio.Reader wrapping the connection
    	bw        *bufio.Writer       // to conn; bufio.Writer wrapping the connection
    	nwrite    int64               // bytes written
    	reqch     chan requestAndChan // written by roundTrip; read by readLoop (roundTrip hands each request to readLoop on this channel)
    	writech   chan writeRequest   // written by roundTrip; read by writeLoop (writeLoop receives write requests here and performs the write)
    	closech   chan struct{}       // closed when conn closed
    	isProxy   bool
    	sawEOF    bool  // whether we've seen EOF from conn; owned by readLoop
    	readLimit int64 // bytes allowed to be read; owned by readLoop
    	// writeErrCh passes the request write error (usually nil)
    	// from the writeLoop goroutine to the readLoop which passes
    	// it off to the res.Body reader, which then uses it to decide
    	// whether or not a connection can be reused. Issue 7569.
    	writeErrCh chan error
    
    	writeLoopDone chan struct{} // closed when write loop ends
    
    	// Both guarded by Transport.idleMu:
    	idleAt    time.Time   // time it last become idle
    	idleTimer *time.Timer // holding an AfterFunc to close it
    
    	mu                   sync.Mutex // guards following fields
    	numExpectedResponses int            // number of responses currently expected
    	closed               error // set non-nil when conn is closed, before closech is closed
    	canceledErr          error // set non-nil if conn is canceled
    	broken               bool  // an error has happened on this connection; marked broken so it's not reused.
    	reused               bool  // whether conn has had successful request/response and is being reused.
    	// mutateHeaderFunc is an optional func to modify extra
    	// headers on each outbound request before it's written. (the
    	// original Request given to RoundTrip is not modified)
    	mutateHeaderFunc func(Header)
    }
    
    // Transport holds the connection-pool state (idle connections, per-host
    // connection counts, request cancelers) together with the dial, TLS, proxy
    // and timeout configuration shown below. The field set is the one quoted by
    // the article (go1.12.4).
    type Transport struct {
    	idleMu     sync.Mutex       // mutex guarding the idle connection pool below
    	wantIdle   bool                                // user has requested to close all idle conns
    	idleConn   map[connectMethodKey][]*persistConn // idle connection pool; most recently used at end
    	idleConnCh map[connectMethodKey]chan *persistConn    // passes idle connections between goroutines: when the pool is empty but a request is waiting, a connection that becomes idle is delivered on this channel
    	idleLRU    connLRU
    
    	reqMu       sync.Mutex
    	reqCanceler map[*Request]func(error)
    
    	altMu    sync.Mutex   // guards changing altProto only
    	altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme
    
    	connCountMu          sync.Mutex
    	connPerHostCount     map[connectMethodKey]int
    	connPerHostAvailable map[connectMethodKey]chan struct{}
    
    	// Proxy specifies a function to return a proxy for a given
    	// Request. If the function returns a non-nil error, the
    	// request is aborted with the provided error.
    	//
    	// The proxy type is determined by the URL scheme. "http",
    	// "https", and "socks5" are supported. If the scheme is empty,
    	// "http" is assumed.
    	//
    	// If Proxy is nil or returns a nil *URL, no proxy is used.
    	Proxy func(*Request) (*url.URL, error)
    
    	// DialContext specifies the dial function for creating unencrypted TCP connections.
    	// If DialContext is nil (and the deprecated Dial below is also nil),
    	// then the transport dials using package net.
    	//
    	// DialContext runs concurrently with calls to RoundTrip.
    	// A RoundTrip call that initiates a dial may end up using
    	// a connection dialed previously when the earlier connection
    	// becomes idle before the later DialContext completes.
    	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)// used when creating new connections
    
    	// Dial specifies the dial function for creating unencrypted TCP connections.
    	//
    	// Dial runs concurrently with calls to RoundTrip.
    	// A RoundTrip call that initiates a dial may end up using
    	// a connection dialed previously when the earlier connection
    	// becomes idle before the later Dial completes.
    	//
    	// Deprecated: Use DialContext instead, which allows the transport
    	// to cancel dials as soon as they are no longer needed.
    	// If both are set, DialContext takes priority.
    	Dial func(network, addr string) (net.Conn, error)
    
    	// DialTLS specifies an optional dial function for creating
    	// TLS connections for non-proxied HTTPS requests.
    	//
    	// If DialTLS is nil, Dial and TLSClientConfig are used.
    	//
    	// If DialTLS is set, the Dial hook is not used for HTTPS
    	// requests and the TLSClientConfig and TLSHandshakeTimeout
    	// are ignored. The returned net.Conn is assumed to already be
    	// past the TLS handshake.
    	DialTLS func(network, addr string) (net.Conn, error)
    
    	// TLSClientConfig specifies the TLS configuration to use with
    	// tls.Client.
    	// If nil, the default configuration is used.
    	// If non-nil, HTTP/2 support may not be enabled by default.
    	TLSClientConfig *tls.Config
    
    	// TLSHandshakeTimeout specifies the maximum amount of time waiting to
    	// wait for a TLS handshake. Zero means no timeout.
    	TLSHandshakeTimeout time.Duration
    
    	// DisableKeepAlives, if true, disables HTTP keep-alives and
    	// will only use the connection to the server for a single
    	// HTTP request.
    	//
    	// This is unrelated to the similarly named TCP keep-alives.
    	DisableKeepAlives bool
    
    	// DisableCompression, if true, prevents the Transport from
    	// requesting compression with an "Accept-Encoding: gzip"
    	// request header when the Request contains no existing
    	// Accept-Encoding value. If the Transport requests gzip on
    	// its own and gets a gzipped response, it's transparently
    	// decoded in the Response.Body. However, if the user
    	// explicitly requested gzip it is not automatically
    	// uncompressed.
    	DisableCompression bool
    
    	// MaxIdleConns controls the maximum number of idle (keep-alive)
    	// connections across all hosts. Zero means no limit.
    	MaxIdleConns int
    
    	// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
    	// (keep-alive) connections to keep per-host. If zero,
    	// DefaultMaxIdleConnsPerHost is used.
    	MaxIdleConnsPerHost int
    
    	// MaxConnsPerHost optionally limits the total number of
    	// connections per host, including connections in the dialing,
    	// active, and idle states. On limit violation, dials will block.
    	//
    	// Zero means no limit.
    	//
    	// For HTTP/2, this currently only controls the number of new
    	// connections being created at a time, instead of the total
    	// number. In practice, hosts using HTTP/2 only have about one
    	// idle connection, though.
    	MaxConnsPerHost int
    
    	// IdleConnTimeout is the maximum amount of time an idle
    	// (keep-alive) connection will remain idle before closing
    	// itself.
    	// Zero means no limit.
    	IdleConnTimeout time.Duration
    
    	// ResponseHeaderTimeout, if non-zero, specifies the amount of
    	// time to wait for a server's response headers after fully
    	// writing the request (including its body, if any). This
    	// time does not include the time to read the response body.
    	ResponseHeaderTimeout time.Duration
    
    	// ExpectContinueTimeout, if non-zero, specifies the amount of
    	// time to wait for a server's first response headers after fully
    	// writing the request headers if the request has an
    	// "Expect: 100-continue" header. Zero means no timeout and
    	// causes the body to be sent immediately, without
    	// waiting for the server to approve.
    	// This time does not include the time to send the request header.
    	ExpectContinueTimeout time.Duration
    
    	// TLSNextProto specifies how the Transport switches to an
    	// alternate protocol (such as HTTP/2) after a TLS NPN/ALPN
    	// protocol negotiation. If Transport dials an TLS connection
    	// with a non-empty protocol name and TLSNextProto contains a
    	// map entry for that key (such as "h2"), then the func is
    	// called with the request's authority (such as "example.com"
    	// or "example.com:1234") and the TLS connection. The function
    	// must return a RoundTripper that then handles the request.
    	// If TLSNextProto is not nil, HTTP/2 support is not enabled
    	// automatically.
    	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper
    
    	// ProxyConnectHeader optionally specifies headers to send to
    	// proxies during CONNECT requests.
    	ProxyConnectHeader Header
    
    	// MaxResponseHeaderBytes specifies a limit on how many
    	// response bytes are allowed in the server's response
    	// header.
    	//
    	// Zero means to use a default limit.
    	MaxResponseHeaderBytes int64
    
    	// nextProtoOnce guards initialization of TLSNextProto and
    	// h2transport (via onceSetNextProtoDefaults)
    	nextProtoOnce sync.Once
    	h2transport   h2Transport // non-nil if http2 wired up
    }
    

    如此便是整个流程,其实还是很清晰的,最后总结一下:
    Transport用来建立连接,其中维护了一个空闲连接池idleConn map[connectMethodKey][]*persistConn,其中的每个成员都是一个persistConn对象。persistConn是个具体的连接实例,包含了连接的上下文,会启动两个goroutine分别执行readLoop和writeLoop。每当transport调用roundTrip的时候,就会从连接池中选择一个空闲的persistConn(没有则新建),然后调用其roundTrip方法,将读写请求通过channel分别发送到readLoop和writeLoop中,然后select各个channel的信息,包括连接关闭、请求超时、writeLoop出错、readLoop返回读取结果等。在writeLoop中发送请求,在readLoop中获取response并通过channel返回给roundTrip函数,之后再次将自己加入到idleConn中,等待下次请求到来。

    推荐阅读

    Go HTTP Client 持久连接

  • 相关阅读:
    初探Java设计模式2:结构型模式(代理模式,适配器模式等)
    初探Java设计模式1:创建型模式(工厂,单例等)
    Java集合详解8:Java集合类细节精讲,细节决定成败
    Java集合详解7:一文搞清楚HashSet,TreeSet与LinkedHashSet的异同
    MySQL教程67-使用DISTINCT过滤重复数据
    MySQL教程66-数据表查询语句
    MySQL教程65-MySQL操作表中数据
    MySQL教程64-MySQL 流程控制函数
    MySQL教程63-MySQL 聚合函数
    MySQL教程62-MySQL日期和时间函数
  • 原文地址:https://www.cnblogs.com/gaorong/p/10887021.html
Copyright © 2011-2022 走看看