zoukankan      html  css  js  c++  java
  • Golang 网络编程

    TCP网络编程

    存在的问题:

    • 拆包:
      • 对发送端来说应用程序写入的数据远大于socket缓冲区大小,不能一次性将这些数据发送到server端就会出现拆包的情况。
      • 通过网络传输的数据包最大是1500字节,当TCP报文的长度 - TCP头部的长度 > MSS(最大报文长度时)将会发生拆包,MSS一般长(1460~1480)字节。
    • 粘包:
      • 对发送端来说:应用程序发送的数据很小,远小于socket的缓冲区的大小,导致一个数据包里面有很多不通请求的数据。
      • 对接收端来说:接收数据的方法不能及时的读取socket缓冲区中的数据,导致缓冲区中积压了不同请求的数据。

    解决方法:

    • 使用带消息头的协议,在消息头中记录数据的长度。
    • 使用定长的协议,每次读取定长的内容,不够的使用空格补齐。
    • 使用消息边界,比如使用 分隔 不同的消息。
    • 使用诸如 xml json protobuf这种复杂的协议。

    实验:使用自定义协议

    整体的流程:

    客户端:发送端连接服务器,将要发送的数据通过编码器编码,发送。

    服务端:启动、监听端口、接收连接、将连接放在协程中处理、通过解码器解码数据。

    	//###########################
    //######  Server端代码  ###### 
    //###########################
    
    func main() {
    	// 1. 监听端口 2.accept连接 3.开goroutine处理连接
    	listen, err := net.Listen("tcp", "0.0.0.0:9090")
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    	for{
    		conn, err := listen.Accept()
    		if err != nil {
    			fmt.Printf("Fail listen.Accept : %v", err)
    			continue
    		}
    		go ProcessConn(conn)
    	}
    }
    
    // 处理网络请求
    func ProcessConn(conn net.Conn) {
    	defer conn.Close()
    	for  {
    		bt,err:=coder.Decode(conn)
    		if err != nil {
    			fmt.Printf("Fail to decode error [%v]", err)
    			return
    		}
    		s := string(bt)
    		fmt.Printf("Read from conn:[%v]
    ",s)
    	}
    }
    
    //###########################
    //######  Clinet端代码  ###### 
    //###########################
    func main() {
    	conn, err := net.Dial("tcp", ":9090")
    	defer conn.Close()
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    
    	// 将数据编码并发送出去
    	coder.Encode(conn,"hi server i am here");
    }
    
    //###########################
    //######  编解码器代码  ###### 
    //###########################
    /**
     * 	解码:
     */
    func Decode(reader io.Reader) (bytes []byte, err error) {
    	// 先把消息头读出来
    	headerBuf := make([]byte, len(msgHeader))
    	if _, err = io.ReadFull(reader, headerBuf); err != nil {
    		fmt.Printf("Fail to read header from conn error:[%v]", err)
    		return nil, err
    	}
    	// 检验消息头
    	if string(headerBuf) != msgHeader {
    		err = errors.New("msgHeader error")
    		return nil, err
    	}
    	// 读取实际内容的长度
    	lengthBuf := make([]byte, 4)
    	if _, err = io.ReadFull(reader, lengthBuf); err != nil {
    		return nil, err
    	}
    	contentLength := binary.BigEndian.Uint32(lengthBuf)
    	contentBuf := make([]byte, contentLength)
    	// 读出消息体
    	if _, err := io.ReadFull(reader, contentBuf); err != nil {
    		return nil, err
    	}
    	return contentBuf, err
    }
    
    /**
     *  编码
     *  定义消息的格式: msgHeader + contentLength + content
     *  conn 本身实现了 io.Writer 接口
     */
    func Encode(conn io.Writer, content string) (err error) {
    	// 写入消息头
    	if err = binary.Write(conn, binary.BigEndian, []byte(msgHeader)); err != nil {
    		fmt.Printf("Fail to write msgHeader to conn,err:[%v]", err)
    	}
    	// 写入消息体长度
    	contentLength := int32(len([]byte(content)))
    	if err = binary.Write(conn, binary.BigEndian, contentLength); err != nil {
    		fmt.Printf("Fail to write contentLength to conn,err:[%v]", err)
    	}
    	// 写入消息
    	if err = binary.Write(conn, binary.BigEndian, []byte(content)); err != nil {
    		fmt.Printf("Fail to write content to conn,err:[%v]", err)
    	}
    	return err
    
    

    客户端的conn一直不被Close 有什么表现?

    四次挥手各个状态的如下:

    主从关闭方						被动关闭方
    established					established
    Fin-wait1					
    										closeWait
    Fin-wait2
    Tiem-wait						lastAck
    Closed							Closed
    

    如果客户端的连接手动的关闭,它和服务端的状态会一直保持established建立连接中的状态。

    MacBook-Pro% netstat -aln | grep 9090
    tcp4       0      0  127.0.0.1.9090         127.0.0.1.62348        ESTABLISHED
    tcp4       0      0  127.0.0.1.62348        127.0.0.1.9090         ESTABLISHED
    tcp46      0      0  *.9090                 *.*                    LISTEN
    

    服务端的conn一直不被关闭 有什么表现?

    客户端的进程结束后,会发送fin数据包给服务端,向服务端请求断开连接。

    服务端的conn不关闭的话,服务端就会停留在四次挥手的close_wait阶段(我们不手动Close,服务端就任务还有数据/任务没处理完,因此它不关闭)。

    客户端停留在 fin_wait2的阶段(在这个阶段等着服务端告诉自己可以真正断开连接的消息)。

    MacBook-Pro% netstat -aln | grep 9090
    tcp4       0      0  127.0.0.1.9090         127.0.0.1.62888        CLOSE_WAIT
    tcp4       0      0  127.0.0.1.62888        127.0.0.1.9090         FIN_WAIT_2
    tcp46      0      0  *.9090                 *.*                    LISTEN
    

    什么是binary.BigEndian?什么是binary.LittleEndian?

    对计算机来说一切都是二进制的数据,BigEndian和LittleEndian描述的就是二进制数据的字节顺序。计算机内部,小端序被广泛应用于现代性 CPU 内部存储数据;大端序常用于网络传输和文件存储。

    比如:

    一个数的二进制表示为 	 0x12345678
    BigEndian   表示为: 0x12 0x34 0x56 0x78 
    LittleEndian表示为: 0x78 0x56 0x34 0x12
    

    UDP网络编程

    思路:

    UDP服务器:1、监听 2、循环读取消息 3、回复数据。

    UDP客户端:1、连接服务器 2、发送消息 3、接收消息。

    // ################################
    // ######## UDPServer #########
    // ################################
    func main() {
    	// 1. 监听端口 2.accept连接 3.开goroutine处理连接
    	listen, err := net.Listen("tcp", "0.0.0.0:9090")
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    	for{
    		conn, err := listen.Accept()
    		if err != nil {
    			fmt.Printf("Fail listen.Accept : %v", err)
    			continue
    		}
    		go ProcessConn(conn)
    	}
    }
    
    // 处理网络请求
    func ProcessConn(conn net.Conn) {
    	defer conn.Close()
    	for  {
    		bt,err:= coder.Decode(conn)
    		if err != nil {
    			fmt.Printf("Fail to decode error [%v]", err)
    			return
    		}
    		s := string(bt)
    		fmt.Printf("Read from conn:[%v]
    ",s)
    	}
    }
    
    // ################################
    // ######## UDPClient #########
    // ################################
    func main() {
    
    	udpConn, err := net.DialUDP("udp", nil, &net.UDPAddr{
    		IP:   net.IPv4(127, 0, 0, 1),
    		Port: 9091,
    	})
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    
    	_, err = udpConn.Write([]byte("i am udp client"))
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    	bytes:=make([]byte,1024)
    	num, addr, err := udpConn.ReadFromUDP(bytes)
    	if err != nil {
    		fmt.Printf("Fail to read from udp error: [%v]", err)
    		return
    	}
    	fmt.Printf("Recieve from udp address:[%v], bytes:[%v], content:[%v]",addr,num,string(bytes))
    }
    

    Http网络编程

    思路整理:

    HttpServer:1、创建路由器。2、为路由器绑定路由规则。3、创建服务器、监听端口。 4启动读服务。

    HttpClient: 1、创建连接池。2、创建客户端,绑定连接池。3、发送请求。4、读取响应。

    func main() {
    	mux := http.NewServeMux()
    	mux.HandleFunc("/login", doLogin)
    	server := &http.Server{
    		Addr:         ":8081",
    		WriteTimeout: time.Second * 2,
    		Handler:      mux,
    	}
    	log.Fatal(server.ListenAndServe())
    }
    
    func doLogin(writer http.ResponseWriter,req *http.Request){
    	_, err := writer.Write([]byte("do login"))
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    }
    

    HttpClient端

    func main() {
    	transport := &http.Transport{
        // 拨号的上下文
    		DialContext: (&net.Dialer{
    			Timeout:   30 * time.Second, // 拨号建立连接时的超时时间
    			KeepAlive: 30 * time.Second, // 长连接存活的时间
    		}).DialContext,
        // 最大空闲连接数
    		MaxIdleConns:          100,  
        // 超过最大的空闲连接数的连接,经过 IdleConnTimeout时间后会失效
    		IdleConnTimeout:       10 * time.Second, 
        // https使用了SSL安全证书,TSL是SSL的升级版
        // 当我们使用https时,这行配置生效
    		TLSHandshakeTimeout:   10 * time.Second, 
    		ExpectContinueTimeout: 1 * time.Second,  // 100-continue 状态码超时时间
    	}
    
    	// 创建客户端
    	client := &http.Client{
    		Timeout:   time.Second * 10, //请求超时时间
    		Transport: transport,
    	}
    
    	// 请求数据
    	res, err := client.Get("http://localhost:8081/login")
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    	defer res.Body.Close()
    
    	bytes, err := ioutil.ReadAll(res.Body)
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    	fmt.Printf("Read from http server res:[%v]", string(bytes))
    }
    

    理解函数是一等公民

    点击查看在github中函数相关的笔记

    在golang中函数是一等公民,我们可以把一个函数当作普通变量一样使用。

    比如我们有个函数HelloHandle,我们可以直接使用它。

    func HelloHandle(name string, age int) {
    	fmt.Printf("name:[%v] age:[%v]", name, age)
    }
    
    func main() {
      HelloHandle("tom",12)
    }
    

    闭包

    如何理解闭包:闭包本质上是一个函数,而且这个函数会引用它外部的变量,如下例子中的f3中的匿名函数本身就是一个闭包。 通常我们使用闭包起到一个适配的作用。

    例1:

    // f2是一个普通函数,有两个入参数
    func f2() {
    	fmt.Printf("f2222")
    }
    
    // f1函数的入参是一个f2类型的函数
    func f1(f2 func()) {
    	f2()
    }
    
    func main() {
      // 由于golang中函数是一等公民,所以我们可以把f2同普通变量一般传递给f1
    	f1(f2)
    }
    

    例2: 在上例中更进一步。f2有了自己的参数, 这时就不能直接把f2传递给f1了。

    总不能傻傻的这样吧f1(f2(1,2)) ???

    而闭包就能解决这个问题。

    // f2是一个普通函数,有两个入参数
    func f2(x int, y int) {
    	fmt.Println("this is f2 start")
    	fmt.Printf("x: %d y: %d 
    ", x, y)
    	fmt.Println("this is f2 end")
    }
    
    // f1函数的入参是一个f2类型的函数
    func f1(f2 func()) {
    	fmt.Println("this is f1 will call f2")
    	f2()
    	fmt.Println("this is f1 finished call f2")
    }
    
    // 接受一个两个参数的函数, 返回一个包装函数
    func f3(f func(int,int) ,x,y int) func() {
    	fun := func() {
    		f(x,y)
    	}
    	return fun
    }
    
    func main() {
    	// 目标是实现如下的传递与调用
    	f1(f3(f2,6,6))
    }
    

    实现方法的回调:

    下面的例子中实现这样的功能:就好像是我设计了一个框架,定好了整个框架运转的流程(或者说是提供了一个编程模版),框架具体做事的函数你根据自己的需求自己实现,我的框架只是负责帮你回调你具体的方法。

    // 自定义类型,handler本质上是一个函数
    type HandlerFunc func(string, int)
    
    // 闭包
    func (f HandlerFunc) Serve(name string, age int) {
    	f(name, age)
    }
    
    // 具体的处理函数
    func HelloHandle(name string, age int) {
    	fmt.Printf("name:[%v] age:[%v]", name, age)
    }
    
    func main() {
      // 把HelloHandle转换进自定义的func中
    	handlerFunc := HandlerFunc(HelloHandle)
      // 本质上会去回调HelloHandle方法
    	handlerFunc.Serve("tom", 12)
      
      // 上面两行效果 == 下面这行
      // 只不过上面的代码是我在帮你回调,下面的是你自己主动调用
      HelloHandle("tom",12)
    }
    

    HttpServer源码阅读

    注册路由

    直观上看注册路由这一步,就是它要做的就是将在路由器url pattern和开发者提供的func关联起来。 很容易想到,它里面很可能是通过map实现的。

    
    func main() {
    	// 创建路由器
    	// 为路由器绑定路由规则
    	mux := http.NewServeMux()
    	mux.HandleFunc("/login", doLogin)
    	...
    }
    
    func doLogin(writer http.ResponseWriter,req *http.Request){
    	_, err := writer.Write([]byte("do login"))
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    }
    

    姑且将ServeMux当作是路由器。我们使用http包下的 NewServerMux 函数创建一个新的路由器对象,进而使用它的HandleFunc(pattern,func)函数完成路由的注册。

    跟进NewServerMux函数,可以看到,它通过new函数返回给我们一个ServeMux结构体。

    func NewServeMux() *ServeMux {
      return new(ServeMux) 
    }
    

    这个ServeMux结构体长下面这样:在这个ServeMux结构体中我们就看到了这个维护pattern和func的map

    type ServeMux struct {
    	mu    sync.RWMutex 
    	m     map[string]muxEntry
    	hosts bool // whether any patterns contain hostnames
    }
    

    这个muxEntry长下面这样:

    type muxEntry struct {
    	h       Handler
    	pattern string
    }
    
    type Handler interface {
    	ServeHTTP(ResponseWriter, *Request)
    }
    

    image-20200627161641153

    看到这里问题就来了,上面我们手动注册进路由器中的仅仅是一个有规定参数的方法,到这里怎么成了一个Handle了?我们也没有说去手动的实现Handler这个接口,也没有重写ServeHTTP函数啊, 在golang中实现一个接口不得像下面这样搞吗?**

    type Handle interface {
    	Serve(string, int, string)
    }
    
    type HandleImpl struct {
    
    }
    
    func (h HandleImpl)Serve(string, int, string){
    
    }
    

    带着这个疑问看下面的方法:

    	// 由于函数是一等公民,故我们将doLogin函数同普通变量一样当做入参传递进去。
     	mux.HandleFunc("/login", doLogin)
    
      func doLogin(writer http.ResponseWriter,req *http.Request){
        ...
    	}
    

    跟进去看 HandleFunc 函数的实现:

    首先:HandleFunc函数的第二个参数是接收的函数的类型和doLogin函数的类型是一致的,所以doLogin能正常的传递进HandleFunc中。

    其次:我们的关注点应该是下面的HandlerFunc(handler)

    // HandleFunc registers the handler function for the given pattern.
    func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
    	if handler == nil {
    		panic("http: nil handler")
    	}
    	mux.Handle(pattern, HandlerFunc(handler))
    }
    

    跟进这个HandlerFunc(handler) 看到下图,真相就大白于天下了。golang以一种优雅的方式悄无声息的为我们完成了一次适配。这么看来上面的HandlerFunc(handler)并不是函数的调用,而是doLogin转换成自定义类型。这个自定义类型去实现了Handle接口(因为它重写了ServeHTTP函数)以闭包的形式完美的将我们的doLogin适配成了Handle类型。

    image-20200625171922500

    在往下看Handle方法:

    第一:将pattern和handler注册进map中

    第二:为了保证整个过程的并发安全,使用锁保护整个过程。

    // Handle registers the handler for the given pattern.
    // If a handler already exists for pattern, Handle panics.
    func (mux *ServeMux) Handle(pattern string, handler Handler) {
    	mux.mu.Lock()
    	defer mux.mu.Unlock()
    
    	if pattern == "" {
    		panic("http: invalid pattern")
    	}
    	if handler == nil {
    		panic("http: nil handler")
    	}
    	if _, exist := mux.m[pattern]; exist {
    		panic("http: multiple registrations for " + pattern)
    	}
    
    	if mux.m == nil {
    		mux.m = make(map[string]muxEntry)
    	}
    	mux.m[pattern] = muxEntry{h: handler, pattern: pattern}
    
    	if pattern[0] != '/' {
    		mux.hosts = true
    	}
    
    

    启动服务

    概览图:

    image-20200627163736422

    和java对比着看,在java一组复杂的逻辑会被封装成一个class。在golang中对应的就是一组复杂的逻辑会被封装成一个结构体。

    对应HttpServer肯定也是这样,http服务器在golang的实现中有自己的结构体。它就是http包下的Server。

    它有一系列描述性属性。如监听的地址、写超时时间、路由器。

    	server := &http.Server{
    		Addr:         ":8081",
    		WriteTimeout: time.Second * 2,
    		Handler:      mux,
    	}
    	log.Fatal(server.ListenAndServe())
    

    我们看它启动服务的函数:server.ListenAndServe()

    实现的逻辑是使用net包下的Listen函数,获取给定地址上的tcp连接。

    再将这个tcp连接封装进 tcpKeepAliveListenner 结构体中。

    在将这个tcpKeepAliveListenner丢进Server的Serve函数中处理

    // ListenAndServe 会监听开发者给定网络地址上的tcp连接,当有请求到来时,会调用Serve函数去处理这个连接。
    // 它接收到所有连接都使用 TCP keep-alives相关的配置
    // 
    // 如果构造Server时没有指定Addr,他就会使用默认值: “:http”
    // 
    // 当Server ShutDown或者是Close,ListenAndServe总是会返回一个非nil的error。
    // 返回的这个Error是 ErrServerClosed
    func (srv *Server) ListenAndServe() error {
    	if srv.shuttingDown() {
    		return ErrServerClosed
    	}
    	addr := srv.Addr
    	if addr == "" {
    		addr = ":http"
    	}
      // 底层借助于tcp实现
    	ln, err := net.Listen("tcp", addr)
    	if err != nil {
    		return err
    	}
    	return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
    }
    
    // tcpKeepAliveListener会为TCP设置一个keep-alive 超时时长。
    // 它通常被 ListenAndServe 和 ListenAndServeTLS使用。
    // 它保证了已经dead的TCP最终都会消失。
    type tcpKeepAliveListener struct {
    	*net.TCPListener
    }
    

    接着去看看Serve方法,上一个函数中获取到了一个基于tcp的Listener,从这个Listener中可以不断的获取出新的连接,下面的方法中使用无限for循环完成这件事。conn获取到后将连接封装进httpConn,为了保证不阻塞下一个连接到到来,开启新的goroutine处理这个http连接。

    func (srv *Server) Serve(l net.Listener) error {
      // 如果有一个包裹了 srv 和 listener 的钩子函数,就执行它
    	if fn := testHookServerServe; fn != nil {
    		fn(srv, l) // call hook with unwrapped listener
    	}
    	
      // 将tcp的Listener封装进onceCloseListener,保证连接不会被关闭多次。
    	l = &onceCloseListener{Listener: l}
    	defer l.Close()
     
      // http2相关的配置
    	if err := srv.setupHTTP2_Serve(); err != nil {
    		return err
    	}
    
    	if !srv.trackListener(&l, true) {
    		return ErrServerClosed
    	}
    	defer srv.trackListener(&l, false)
    	
      // 如果没有接收到请求睡眠多久
    	var tempDelay time.Duration     // how long to sleep on accept failure
    	baseCtx := context.Background() // base is always background, per Issue 16220
    	ctx := context.WithValue(baseCtx, ServerContextKey, srv)
      // 开启无限循环,尝试从Listenner中获取连接。
    	for {
    		rw, e := l.Accept()
        // accpet过程中发生错屋
    		if e != nil {
    			select {
            // 如果从server的doneChan中可以获取内容,返回Server关闭了
    			case <-srv.getDoneChan():
    				return ErrServerClosed
    			default:
    			}
          // 如果发生了 net.Error 并且是临时的错误就睡5毫秒,再发生错误睡眠的时间*2,上线是1s
    			if ne, ok := e.(net.Error); ok && ne.Temporary() {
    				if tempDelay == 0 {
    					tempDelay = 5 * time.Millisecond
    				} else {
    					tempDelay *= 2
    				}
    				if max := 1 * time.Second; tempDelay > max {
    					tempDelay = max
    				}
    				srv.logf("http: Accept error: %v; retrying in %v", e, tempDelay)
    				time.Sleep(tempDelay)
    				continue
    			}
    			return e
    		}
        // 如果没有发生错误,清空睡眠的时间
    		tempDelay = 0
        // 将接收到连接封装进httpConn
    		c := srv.newConn(rw)
    		c.setState(c.rwc, StateNew) // before Serve can return
        // 开启一条新的协程处理这个连接
    		go c.serve(ctx)
    	}
    }
    

    处理请求

    c.serve(ctx)中就会去解析http相关的报文信息~,将http报文解析进Request结构体中。

    部分代码如下:

    		// 将 server 包裹为 serverHandler 的实例,执行它的 ServeHTTP 方法,处理请求,返回响应。
    		// serverHandler 委托给 server 的 Handler 或者 DefaultServeMux(默认路由器)
    		// 来处理 "OPTIONS *" 请求。
    		serverHandler{c.server}.ServeHTTP(w, w.req)
    
    // serverHandler delegates to either the server's Handler or
    // DefaultServeMux and also handles "OPTIONS *" requests.
    type serverHandler struct {
    	srv *Server
    }
    
    func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
      // 如果没有定义Handler就使用默认的
    	handler := sh.srv.Handler
    	if handler == nil {
    		handler = DefaultServeMux
    	}
    	if req.RequestURI == "*" && req.Method == "OPTIONS" {
    		handler = globalOptionsHandler{}
    	}
      // 处理请求,返回响应。
    	handler.ServeHTTP(rw, req)
    }
    

    image-20200625183225261

    可以看到,req中包含了我们前面说的pattern,叫做RequestUri,有了它下一步就知道该回调ServeMux中的哪一个函数。

    HttpClient源码阅读

    DemoCode

    func main() {
    	// 创建连接池
    	// 创建客户端,绑定连接池
    	// 发送请求
    	// 读取响应
    	transport := &http.Transport{
    		DialContext: (&net.Dialer{
    			Timeout:   30 * time.Second, // 连接超时
    			KeepAlive: 30 * time.Second, // 长连接存活的时间
    		}).DialContext,
        // 最大空闲连接数
    		MaxIdleConns:          100,             
        // 超过最大空闲连接数的连接会在IdleConnTimeout后被销毁
    		IdleConnTimeout:       10 * time.Second, 
    		TLSHandshakeTimeout:   10 * time.Second, // tls握手超时时间
    		ExpectContinueTimeout: 1 * time.Second,  // 100-continue 状态码超时时间
    	}
    
    	// 创建客户端
    	client := &http.Client{
    		Timeout:   time.Second * 10, //请求超时时间
    		Transport: transport,
    	}
    
    	// 请求数据,获得响应
    	res, err := client.Get("http://localhost:8081/login")
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    	defer res.Body.Close()
      // 处理数据
    	bytes, err := ioutil.ReadAll(res.Body)
    	if err != nil {
    		fmt.Printf("error : %v", err)
    		return
    	}
    	fmt.Printf("Read from http server res:[%v]", string(bytes))
    }
    

    整理思路

    http.Client的代码其实是很多的,全部很细的过一遍肯定也会难度,下面可能也是只能提及其中的一部分。

    首先明白一件事,我们编写的HttpClient是在干什么?(虽然这个问题很傻,但是总得问一下)是在发送Http请求。

    一般我们在开发的时候,更多的编写的是HttpServer的代码。是在处理Http请求, 而不是去发送Http请求,Http请求都是是前端通过ajax经由浏览器发送到后端的。

    其次,Http请求实际上是建立在tcp连接之上的,所以如果我们去看http.Client肯定能找到net.Dial("tcp",adds)相关的代码。

    那也就是说,我们要看看,http.Client是如何在和服务端建立连接、发送数据、接收数据的。

    重要的struct

    http.Client中有机几个比较重要的struct,如下

    http.Client结构体中封装了和http请求相关的属性,诸如 cookie,timeout,redirect以及Transport。

    type Client struct {
    	Transport RoundTripper
    	CheckRedirect func(req *Request, via []*Request) error
    	Jar CookieJar
    	Timeout time.Duration
    }
    

    Tranport实现了RoundTrpper接口:

     type RoundTripper interface {   
      // 1、RoundTrip会去执行一个简单的 Http Trancation,并为requestt返回一个响应
      // 2、RoundTrip不会尝试去解析response
      // 3、注意:只要返回了Reponse,无论response的状态码是多少,RoundTrip返回的结果:err == nil 
      // 4、RoundTrip将请求发送出去后,如果他没有获取到response,他会返回一个非空的err。
      // 5、同样,RoundTrip不会尝试去解析诸如重定向、认证、cookie这种更高级的协议。 
      // 6、除了消费和关闭请求体之外,RoundTrip不会修改request的其他字段
      // 7、RoundTrip可以在一个单独的gorountine中读取request的部分字段。一直到ResponseBody关闭之前,调用者都不能取消,或者重用这个request
      // 8、RoundTrip始终会保证关闭Body(包含在发生err时)。根据实现的不同,在RoundTrip关闭前,关闭Body这件事可能会在一个单独的goroutine中去做。这就意味着,如果调用者想将请求体用于后续的请求,必须等待知道发生Close
      // 9、请求的URL和Header字段必须是被初始化的。 
    	RoundTrip(*Request) (*Response, error)
    }
    

    看上面RoundTrpper接口,它里面只有一个方法RoundTrip,方法的作用就是执行一次Http请求,发送Request然后获取Response。

    RoundTrpper被设计成了一个支持并发的结构体。

    Transport结构体如下:

    type Transport struct {
    	idleMu     sync.Mutex
       // user has requested to close all idle conns
    	wantIdle   bool
      // Transport的作用就是用来建立一个连接,这个idleConn就是Transport维护的空闲连接池。
    	idleConn   map[connectMethodKey][]*persistConn // most recently used at end
    	idleConnCh map[connectMethodKey]chan *persistConn
    }
    

    其中的connectMethodKey也是结构体:

    type connectMethodKey struct {
      // proxy 代理的URL,当他不为空时,就会一直使用这个key 
      // scheme 协议的类型, http https
      // addr 代理的url,也就是下游的url
    	proxy, scheme, addr string
    }
    

    persistConn是一个具体的连接实例,包含连接的上下文。

    type persistConn struct {
      // alt可选地指定TLS NextProto RoundTripper。 
      // 这用于今天的HTTP / 2和以后的将来的协议。 如果非零,则其余字段未使用。
    	alt RoundTripper
    	t         *Transport
    	cacheKey  connectMethodKey
    	conn      net.Conn
    	tlsState  *tls.ConnectionState
      // 用于从conn中读取内容
    	br        *bufio.Reader       // from conn
      // 用于往conn中写内容
    	bw        *bufio.Writer       // to conn
    	nwrite    int64               // bytes written
      // 他是个chan,roundTrip会将readLoop中的内容写入到reqch中
    	reqch     chan requestAndChan 
      // 他是个chan,roundTrip会将writeLoop中的内容写到writech中
    	writech   chan writeRequest  
    	closech   chan struct{}       // closed when conn closed
    

    另外补充一个结构体:Request,他用来描述一次http请求的实例,它定义于http包request.go, 里面封装了对Http请求相关的属性

    type Request struct {
       Method string
       URL *url.URL
       Proto      string // "HTTP/1.0"
       ProtoMajor int    // 1
       ProtoMinor int    // 0
       Header Header
       Body io.ReadCloser
       GetBody func() (io.ReadCloser, error)
       ContentLength int64
       TransferEncoding []string
       Close bool
       Host string
       Form url.Values
       PostForm url.Values
       MultipartForm *multipart.Form
       Trailer Header
       RemoteAddr string
       RequestURI string
       TLS *tls.ConnectionState
       Cancel <-chan struct{}
       Response *Response
       ctx context.Context
    }
    

    这几个结构体共同完成如下图所示http.Client的工作流程

    image-20200627131720251

    流程

    我们想发送一次Http请求。首先我们需要构造一个Request,Request本质上是对Http协议的描述(因为大家使用的都是Http协议,所以将这个Request发送到HttpServer后,HttpServer能识别并解析它)。

    // 从这行代码开始往下看
    	res, err := client.Get("http://localhost:8081/login")
    
    // 跟进Get
    	req, err := NewRequest("GET", url, nil)
    	if err != nil {
    		return nil, err
    	}
    	return c.Do(req)
    
    // 跟进Do
    	func (c *Client) Do(req *Request) (*Response, error) {
    	return c.do(req)
     } 
    
    // 跟进do,do函数中有下面的逻辑,可以看到执行完send后已经拿到返回值了。所以我们得继续跟进send方法
      if resp, didTimeout, err = c.send(req, deadline); err != nil 
    
    // 跟进send方法,可以看到send中还有一send方法,入参分别是:request,tranpost,deadline
    // 到现在为止,我们没有看到有任何和服务端建立连接的动作发生,但是构造的req和拥有连接池的tranport已经见面了~
    	resp, didTimeout, err = send(req, c.transport(), deadline)
    
    // 继续跟进这个send方法,看到了调用了rt的RoundTrip方法。
    // 这个rt就是我们编写HttpClient代码时创建的,绑定在http.Client上的tranport实例。
    // 这个RoundTrip方法的作用我们在上面已经说过了,最直接的作用就是:发送request 并获取response。
    	resp, err = rt.RoundTrip(req)
    
    

    但是RoundTrip他是个定义在RoundTripper接口中的抽象方法,我们看代码肯定是要去看具体的实现嘛
    这里可以使用断点调试法:在上面最后一行上打上断点,会进入到他的具体实现中。从图中可以看到具体的实现在roundtrip中。

    image-20200627103402751

    RoundTrip中调用的函数是我们自定义的transport的roundTrip函数, 跟进去如下:

    紧接着我们需要一个conn,这个conn我们通过Transport可以获取到。conn的类型为persistConn。

    // roundTrip函数中又一个无限for循环
    for {
        // 检查请求的上下文是否关闭了
    		select {
    		case <-ctx.Done():
    			req.closeBody()
    			return nil, ctx.Err()
    		default:
    		}
    
        // 对传递进来的req进行了有一层的封装,封装后的这个treq可以被roundTrip修改,所以每次重试都会新建
    		treq := &transportRequest{Request: req, trace: trace}
    		cm, err := t.connectMethodForRequest(treq)
    		if err != nil {
    			req.closeBody()
    			return nil, err
    		}
    
        // 到这里真的执行从tranport中获取和对应主机的连接,这个连接可能是http、https、http代理、http代理的高速缓存, 但是无论如何我们都已经准备好了向这个连接发送treq
        // 这里获取出来的连接就是我们在上文中提及的persistConn
    		pconn, err := t.getConn(treq, cm)
    		if err != nil {
    			t.setReqCanceler(req, nil)
    			req.closeBody()
    			return nil, err
    		}
    
    		var resp *Response
    		if pconn.alt != nil {
    			// HTTP/2 path.
    			t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per host
    			t.setReqCanceler(req, nil)   // not cancelable with CancelRequest
    			resp, err = pconn.alt.RoundTrip(req)
    		} else {
          
          // 调用persistConn的roundTrip方法,发送treq并获取响应。
    			resp, err = pconn.roundTrip(treq)
    		}
    		if err == nil {
    			return resp, nil
    		}
    		if !pconn.shouldRetryRequest(req, err) {
    			// Issue 16465: return underlying net.Conn.Read error from peek,
    			// as we've historically done.
    			if e, ok := err.(transportReadFromServerError); ok {
    				err = e.err
    			}
    			return nil, err
    		}
    		testHookRoundTripRetried()
    
    		// Rewind the body if we're able to.  (HTTP/2 does this itself so we only
    		// need to do it for HTTP/1.1 connections.)
    		if req.GetBody != nil && pconn.alt == nil {
    			newReq := *req
    			var err error
    			newReq.Body, err = req.GetBody()
    			if err != nil {
    				return nil, err
    			}
    			req = &newReq
    		}
    	}
    

    整理思路:然后看上面代码中获取conn和roundTrip的实现细节。

    我们需要一个conn,这个conn可以通过Transport获取到。conn的类型为persistConn。但是不管怎么样,都得先获取出 persistConn,才能进一步完成发送请求再得到服务端到响应。

    然后关于这个persistConn结构体其实上面已经提及过了。重新贴在下面

    type persistConn struct {
      // alt可选地指定TLS NextProto RoundTripper。 
      // 这用于今天的HTTP / 2和以后的将来的协议。 如果非零,则其余字段未使用。
    	alt RoundTripper
      
      conn      net.Conn
    	t         *Transport
    	br        *bufio.Reader  // 用于从conn中读取内容
    	bw        *bufio.Writer  // 用于往conn中写内容
      // 他是个chan,roundTrip会将readLoop中的内容写入到reqch中
    	reqch     chan requestAndChan 
      // 他是个chan,roundTrip会将writeLoop中的内容写到writech中
      
    	nwrite    int64               // bytes written
    	cacheKey  connectMethodKey
    	tlsState  *tls.ConnectionState
    	writech   chan writeRequest  
    	closech   chan struct{}       // closed when conn closed
    

    跟进 t.getConn(treq, cm)代码如下:

    	// 先尝试从空闲缓冲池中取得连接
      // 所谓的空闲缓冲池就是Tranport结构体中的: idleConn map[connectMethodKey][]*persistConn 
      // 入参位置的cm如下:
      /* type connectMethod struct {
          // 代理的url,如果没有代理的话,这个值为nil
    			proxyURL     *url.URL 
    			
    			// 连接所使用的协议 http、https
    			targetScheme string
          
    	    // 如果proxyURL指定了http代理或者是https代理,并且使用的协议是http而不是https。
    	    // 那么下面的targetAddr就会不包含在connect method key中。
    	    // 因为socket可以复用不同的targetAddr值
    			targetAddr string
    	}*/
    	t.getIdleConn(cm);
    
    	// 空闲缓冲池有的空闲连接的话返回conn,否则进行如下的select
    	select {
        // todo 这里我还不确定是在干什么,目前猜测是这样的:每个服务器能打开的socket句柄是有限的
        // 每次来获取链接的时候,我们就计数+1。当整体的句柄在Host允许范围内时我们不做任何干涉~
    		case <-t.incHostConnCount(cmKey):
    			// count below conn per host limit; proceed
        
        // 重新尝试从空闲连接池中获取连接,因为可能有的连接使用完后被放回连接池了
    		case pc := <-t.getIdleConnCh(cm):
    			if trace != nil && trace.GotConn != nil {
    				trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})
    			}
    			return pc, nil
        // 请求是否被取消了
    		case <-req.Cancel:
    			return nil, errRequestCanceledConn
        // 请求的上下文是否Done掉了
    		case <-req.Context().Done():
    			return nil, req.Context().Err()
    		case err := <-cancelc:
    			if err == errRequestCanceled {
    				err = errRequestCanceledConn
    			}
    			return nil, err
    		}
    
    	// 开启新的gorountine新建连接一个连接
    	go func() {
        /**
        *	新建连接,方法底层封装了tcp client dial相关的逻辑
        *	conn, err := t.dial(ctx, "tcp", cm.addr())
        *	以及根据不同的targetScheme构建不同的request的逻辑。
        */
        // 获取到persistConn
    		pc, err := t.dialConn(ctx, cm)
        // 将persistConn写到chan中
    		dialc <- dialRes{pc, err}
    	}()
    
    	// 再尝试从空闲连接池中获取
      idleConnCh := t.getIdleConnCh(cm)
    	select {
      // 如果上面的go协程拨号成功了,这里就能取出值来
    	case v := <-dialc:
    		// Our dial finished.
    		if v.pc != nil {
    			if trace != nil && trace.GotConn != nil && v.pc.alt == nil {
    				trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})
    			}
    			return v.pc, nil
    		}
    		// Our dial failed. See why to return a nicer error
    		// value.
        // 将Host的连接-1
    		t.decHostConnCount(cmKey)
    		select {
        ...
    
    

    transport.dialConn

    下面代码中的cm长这样

    image-20200627121729925

    // dialConn是Transprot的方法
    // 入参:context上下文, connectMethod
    // 出参:persisnConn
    func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {
    	// 构建将要返回的 persistConn
      pconn := &persistConn{
    		t:             t,
    		cacheKey:      cm.key(),
    		reqch:         make(chan requestAndChan, 1),
    		writech:       make(chan writeRequest, 1),
    		closech:       make(chan struct{}),
    		writeErrCh:    make(chan error, 1),
    		writeLoopDone: make(chan struct{}),
    	}
    	trace := httptrace.ContextClientTrace(ctx)
    	wrapErr := func(err error) error {
    		if cm.proxyURL != nil {
    			// Return a typed error, per Issue 16997
    			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
    		}
    		return err
    	}
      
      // 判断cm中使用的协议是否是https
    	if cm.scheme() == "https" && t.DialTLS != nil {
    		var err error
    		pconn.conn, err = t.DialTLS("tcp", cm.addr())
    		if err != nil {
    			return nil, wrapErr(err)
    		}
    		if pconn.conn == nil {
    			return nil, wrapErr(errors.New("net/http: Transport.DialTLS returned (nil, nil)"))
    		}
    		if tc, ok := pconn.conn.(*tls.Conn); ok {
    			// Handshake here, in case DialTLS didn't. TLSNextProto below
    			// depends on it for knowing the connection state.
    			if trace != nil && trace.TLSHandshakeStart != nil {
    				trace.TLSHandshakeStart()
    			}
    			if err := tc.Handshake(); err != nil {
    				go pconn.conn.Close()
    				if trace != nil && trace.TLSHandshakeDone != nil {
    					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
    				}
    				return nil, err
    			}
    			cs := tc.ConnectionState()
    			if trace != nil && trace.TLSHandshakeDone != nil {
    				trace.TLSHandshakeDone(cs, nil)
    			}
    			pconn.tlsState = &cs
    		}
    	} else {
        // 如果不是https协议就来到这里,使用tcp向httpserver拨号,获取一个tcp连接。
    		conn, err := t.dial(ctx, "tcp", cm.addr())
    		if err != nil {
    			return nil, wrapErr(err)
    		}
        // 将获取到tcp连接交给我们的persistConn维护
    		pconn.conn = conn
        
        // 处理https相关逻辑
    		if cm.scheme() == "https" {
    			var firstTLSHost string
    			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
    				return nil, wrapErr(err)
    			}
    			if err = pconn.addTLS(firstTLSHost, trace); err != nil {
    				return nil, wrapErr(err)
    			}
    		}
    	}
    
    	// Proxy setup.
    	switch {
      // 如果代理URL为空,不做任何处理  
    	case cm.proxyURL == nil:
    		// Do nothing. Not using a proxy.
      //   
    	case cm.proxyURL.Scheme == "socks5":
    		conn := pconn.conn
    		d := socksNewDialer("tcp", conn.RemoteAddr().String())
    		if u := cm.proxyURL.User; u != nil {
    			auth := &socksUsernamePassword{
    				Username: u.Username(),
    			}
    			auth.Password, _ = u.Password()
    			d.AuthMethods = []socksAuthMethod{
    				socksAuthMethodNotRequired,
    				socksAuthMethodUsernamePassword,
    			}
    			d.Authenticate = auth.Authenticate
    		}
    		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
    			conn.Close()
    			return nil, err
    		}
    	case cm.targetScheme == "http":
    		pconn.isProxy = true
    		if pa := cm.proxyAuth(); pa != "" {
    			pconn.mutateHeaderFunc = func(h Header) {
    				h.Set("Proxy-Authorization", pa)
    			}
    		}
    	case cm.targetScheme == "https":
    		conn := pconn.conn
    		hdr := t.ProxyConnectHeader
    		if hdr == nil {
    			hdr = make(Header)
    		}
    		connectReq := &Request{
    			Method: "CONNECT",
    			URL:    &url.URL{Opaque: cm.targetAddr},
    			Host:   cm.targetAddr,
    			Header: hdr,
    		}
    		if pa := cm.proxyAuth(); pa != "" {
    			connectReq.Header.Set("Proxy-Authorization", pa)
    		}
    		connectReq.Write(conn)
    
    		// Read response.
    		// Okay to use and discard buffered reader here, because
    		// TLS server will not speak until spoken to.
    		br := bufio.NewReader(conn)
    		resp, err := ReadResponse(br, connectReq)
    		if err != nil {
    			conn.Close()
    			return nil, err
    		}
    		if resp.StatusCode != 200 {
    			f := strings.SplitN(resp.Status, " ", 2)
    			conn.Close()
    			if len(f) < 2 {
    				return nil, errors.New("unknown status code")
    			}
    			return nil, errors.New(f[1])
    		}
    	}
    
    	if cm.proxyURL != nil && cm.targetScheme == "https" {
    		if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
    			return nil, err
    		}
    	}
    
    	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
    		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
    			return &persistConn{alt: next(cm.targetAddr, pconn.conn.(*tls.Conn))}, nil
    		}
    	}
    
    	if t.MaxConnsPerHost > 0 {
    		pconn.conn = &connCloseListener{Conn: pconn.conn, t: t, cmKey: pconn.cacheKey}
    	}
      
      // 初始化persistConn的bufferReader和bufferWriter
    	pconn.br = bufio.NewReader(pconn) // 可以从上面给pconn维护的tcpConn中读数据
    	pconn.bw = bufio.NewWriter(persistConnWriter{pconn})// 可以往上面pconn维护的tcpConn中写数据 
      
      // 新开启两条和persistConn相关的go协程。
    	go pconn.readLoop()
    	go pconn.writeLoop()
    	return pconn, nil
    }
    

    上面的两条goroutine 和 br bw共同完成如下图的流程

    image-20200627131859112

    发送请求

    发送req的逻辑在http包的下的tranport包中的func (t *Transport) roundTrip(req *Request) (*Response, error) {}函数中。

    如下:

    	// 发送treq
    	resp, err = pconn.roundTrip(treq)
    
    	// 跟进roundTrip
      // 可以看到他将一个writeRequest结构体类型的实例写入了writech中
    	// 而这个writech会被上图中的writeLoop消费,借助bufferWriter写入tcp连接中,完成往服务端数据的发送。
    	pc.writech <- writeRequest{req, writeErrCh, continueCh}
    
  • 相关阅读:
    打印九九乘法表
    PAT (Basic Level) Practice (中文) 1091 N-自守数 (15分)
    PAT (Basic Level) Practice (中文)1090 危险品装箱 (25分) (单身狗进阶版 使用map+ vector+数组标记)
    PAT (Basic Level) Practice (中文) 1088 三人行 (20分)
    PAT (Basic Level) Practice (中文) 1087 有多少不同的值 (20分)
    PAT (Basic Level) Practice (中文)1086 就不告诉你 (15分)
    PAT (Basic Level) Practice (中文) 1085 PAT单位排行 (25分) (map搜索+set排序+并列进行排行)
    PAT (Basic Level) Practice (中文) 1083 是否存在相等的差 (20分)
    PAT (Basic Level) Practice (中文) 1082 射击比赛 (20分)
    PAT (Basic Level) Practice (中文) 1081 检查密码 (15分)
  • 原文地址:https://www.cnblogs.com/ZhuChangwu/p/13198872.html
Copyright © 2011-2022 走看看