zoukankan      html  css  js  c++  java
  • frp源码剖析-frp中的mux模块

    前言

    frp几乎所有的连接处理都是构建在mux模块之上的,重要性不必多说,来看一下这是个啥吧

    ps: 安装方法

    go get "github.com/fatedier/golib/net/mux"
    

    该模块很小,不到300行,分为两个文件:mux.gorule.go
    因为rule.go文件相对简单一些,我们先来看这个。

    role.go文件

    首先看其中所命名的函数类型MatchFunc

    type MatchFunc func(data []byte) (match bool)
    

    该类型的函数用来判断data属于什么协议。

    那么具体如何判断呢,这里也实现了三个例子:

    var (
    	HttpsNeedBytesNum uint32 = 1
    	HttpNeedBytesNum  uint32 = 3
    	YamuxNeedBytesNum uint32 = 2
    )
    
    var HttpsMatchFunc MatchFunc = func(data []byte) bool {
    	if len(data) < int(HttpsNeedBytesNum) {
    		return false
    	}
    
    	if data[0] == 0x16 {
    		return true
    	} else {
    		return false
    	}
    }
    
    // From https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods
    var httpHeadBytes = map[string]struct{}{
    	"GET": struct{}{},
    	"HEA": struct{}{},
    	"POS": struct{}{},
    	"PUT": struct{}{},
    	"DEL": struct{}{},
    	"CON": struct{}{},
    	"OPT": struct{}{},
    	"TRA": struct{}{},
    	"PAT": struct{}{},
    }
    
    var HttpMatchFunc MatchFunc = func(data []byte) bool {
    	if len(data) < int(HttpNeedBytesNum) {
    		return false
    	}
    
    	_, ok := httpHeadBytes[string(data[:3])]
    	return ok
    }
    
    // From https://github.com/hashicorp/yamux/blob/master/spec.md
    var YamuxMatchFunc MatchFunc = func(data []byte) bool {
    	if len(data) < int(YamuxNeedBytesNum) {
    		return false
    	}
    
    	if data[0] == 0 && data[1] >= 0x0 && data[1] <= 0x3 {
    		return true
    	}
    	return false
    }
    

    这三个函数分别实现了区分HTTPS,HTTP以及go中特有的yamux(实际上这是一个库,可以参考Go中的I/O多路复用)。

    mux.go文件

    先来看其中的struct,第一个是Mux第二个是listener,这里先来看一下较为简单的listener

    listener结构体

    type listener struct {
    	mux *Mux
    
    	priority     int
    	needBytesNum uint32
    	matchFn      MatchFunc
    
    	c  chan net.Conn
    	mu sync.RWMutex
    }
    
    // Accept waits for and returns the next connection to the listener.
    func (ln *listener) Accept() (net.Conn, error) {
    	...
    }
    
    // Close removes this listener from the parent mux and closes the channel.
    func (ln *listener) Close() error {
    	...
    }
    
    func (ln *listener) Addr() net.Addr {
    	...
    }
    

    刚看到这个结构体我们可能很迷惑,不知道都是干啥的,而且网络编程中一般listener这种东西要绑定在一个套接字上,但很明显listener没有,不过其唯一跟套接字相关的可能是其c字段,其是一个由net包中的Conn接口组成的chanel;然后mu字段就是读写锁了,这个很简单;然后mux字段则是上面提到的两个结构体中的另一个结构体Mux的指针;接下来到了priority字段上,顾名思义,这个似乎跟优先级有关系,暂且存疑;needBytesNum则更有些蒙了,不过感觉其是跟读取byte的数量有关系,最后是matchFn

    好,初步认识了这个结构体的结构后,我们看看其方法。三个方法的listener实现了net模块中的Listener接口:

    // A Listener is a generic network listener for stream-oriented protocols.
    //
    // Multiple goroutines may invoke methods on a Listener simultaneously.
    type Listener interface {
    	// Accept waits for and returns the next connection to the listener.
    	Accept() (Conn, error)
    
    	// Close closes the listener.
    	// Any blocked Accept operations will be unblocked and return errors.
    	Close() error
    
    	// Addr returns the listener's network address.
    	Addr() Addr
    }
    

    然后先来分析其Accept方法:

    func (ln *listener) Accept() (net.Conn, error) {
    	conn, ok := <-ln.c
    	if !ok {
    		return nil, fmt.Errorf("network connection closed")
    	}
    	return conn, nil
    }
    

    该方法很简单,就是从c这个由Conn组成的channel中,获取Conn对象,好这里我们就明白了,这个listener和普通的不一样,他很特别,普通的listener监听的是套接字,而他监听的是channel,另外,肯定有某个地方在不停的往c这个channel中放Conn

    接下来是Close方法:

    func (ln *listener) Close() error {
    	if ok := ln.mux.release(ln); ok {
    		// Close done to signal to any RLock holders to release their lock.
    		close(ln.c)
    	}
    	return nil
    }
    

    我们暂且先把这个ln.mux.release(ln)放到一边,因为还不知道这个东西干了啥,暂且只需关注close(ln.c),我们知道这个函数是用来关闭channel的,go推荐由发送端调用,但这里似乎listener是一个消费端,可以看一下如何优雅的关闭Go Channel,看来重点在于ln.mux.release(ln)这里,我们暂且存疑[1],留待下面解决。

    最后是Addr方法:

    func (ln *listener) Addr() net.Addr {
    	if ln.mux == nil {
    		return nil
    	}
    	ln.mux.mu.RLock()
    	defer ln.mux.mu.RUnlock()
    	if ln.mux.ln == nil {
    		return nil
    	}
    	return ln.mux.ln.Addr()
    }
    

    在这里,mu字段就用上了,加读锁,然后返回mux字段中的ln字段的Addr方法。也就是这句return ln.mux.ln.Addr()

    Mux结构体

    字段以及相关函数

    Mux结构体则相对来说复杂很多,先来看一下他的字段定义:

    type Mux struct {
    	ln net.Listener
    
    	defaultLn *listener
    
    	// sorted by priority
    	lns             []*listener
    	maxNeedBytesNum uint32
    
    	mu sync.RWMutex
    }
    

    好,第一个字段ln是一个Listener接口;然后defaultLn是一个listener的指针;lns则是由listener的指针组成的切片,根据注释// sorted by priority,我们终于知道listenerpriority字段是干啥的了;接下来是maxNeedBytesNum字段,好奇怪,比起listenerneedBytesNum多了个“Max”,所以我们推测这个值取得是lns以及defaultLn字段中所有listenerneedBytesNum值最大的;最后的mu字段我们就不说了。

    需要注意的是:我们可能会发现Muxlistener存在相互引用,但在Go中我们倒也不用太担心,因为Go采用“标记-回收”或者其变种的垃圾回收算法,感兴趣可以参考Golang 垃圾回收剖析

    mux.go文件中定义了Mux的生成函数NewMux:

    func NewMux(ln net.Listener) (mux *Mux) {
    	mux = &Mux{
    		ln:  ln,
    		lns: make([]*listener, 0),
    	}
    	return
    }
    

    很简单,需要注意的是ln字段存储的一般不是listener这样的非常规Listener,一般是TCPListener这样具体的绑定了套接字的监听器。

    Mux方法

    接下来看Mux结构体的方法,首先看ListencopyLns

    // priority
    func (mux *Mux) Listen(priority int, needBytesNum uint32, fn MatchFunc) net.Listener {
        // 1
    	ln := &listener{
    		c:            make(chan net.Conn),
    		mux:          mux,
    		priority:     priority,
    		needBytesNum: needBytesNum,
    		matchFn:      fn,
    	}
    
    	mux.mu.Lock()
    	defer mux.mu.Unlock()
    	// 2
    	if needBytesNum > mux.maxNeedBytesNum {
    		mux.maxNeedBytesNum = needBytesNum
    	}
    
        // 3
    	newlns := append(mux.copyLns(), ln)
    	sort.Slice(newlns, func(i, j int) bool {
    		if newlns[i].priority == newlns[j].priority {
    			return newlns[i].needBytesNum < newlns[j].needBytesNum
    		}
    		return newlns[i].priority < newlns[j].priority
    	})
    	mux.lns = newlns
    	return ln
    }
    
    func (mux *Mux) copyLns() []*listener {
    	lns := make([]*listener, 0, len(mux.lns))
    	for _, l := range mux.lns {
    		lns = append(lns, l)
    	}
    	return lns
    }
    

    copyLns方法很简单,就是跟名字的含义一样,生成一个lns字段的副本并返回。

    Listen基本做了三步:

    1. 生成一个listener结构体实例,并获取互斥锁
    2. 根据情况更新needBytesNum字段
    3. 将新生成的listener实例按照优先级放入lns字段对应的slice中

    接下来是ListenHttpListenHttps方法:

    func (mux *Mux) ListenHttp(priority int) net.Listener {
    	return mux.Listen(priority, HttpNeedBytesNum, HttpMatchFunc)
    }
    
    func (mux *Mux) ListenHttps(priority int) net.Listener {
    	return mux.Listen(priority, HttpsNeedBytesNum, HttpsMatchFunc)
    }
    

    这两个差不多,所以放到一起说,基本都是专门写了一个方法让我们能方便的创建处理Http或者Httpslistener

    再来看DefaultListener方法:

    func (mux *Mux) DefaultListener() net.Listener {
    	mux.mu.Lock()
    	defer mux.mu.Unlock()
    	if mux.defaultLn == nil {
    		mux.defaultLn = &listener{
    			c:   make(chan net.Conn),
    			mux: mux,
    		}
    	}
    	return mux.defaultLn
    }
    

    这个方法很简单,基本就是有则返回没有则生成然后返回的套路。不过我们要注意defaultLn字段中的listener是不放入lns字段中的。

    接下来是Server方法:

    // Serve handles connections from ln and multiplexes then across registered listeners.
    func (mux *Mux) Serve() error {
    	for {
    		// Wait for the next connection.
    		// If it returns a temporary error then simply retry.
    		// If it returns any other error then exit immediately.
    		conn, err := mux.ln.Accept()
    		if err, ok := err.(interface {
    			Temporary() bool
    		}); ok && err.Temporary() {
    			continue
    		}
    
    		if err != nil {
    			return err
    		}
    
    		go mux.handleConn(conn)
    	}
    }
    

    一般来说,当我们调用NewMux函数以后,接下来就会调用Server方法,该方法基本上就是阻塞监听某个套接字,当有连接建立成功后立即另起一个goroutine调用handleConn方法;当连接建立失败根据err是否含有Temporary方法,如果有则执行并忽略错误,没有则返回错误。

    现在我们看看handleConn方法干了些啥:

    func (mux *Mux) handleConn(conn net.Conn) {
        // 1
    	mux.mu.RLock()
    	maxNeedBytesNum := mux.maxNeedBytesNum
    	lns := mux.lns
    	defaultLn := mux.defaultLn
    	mux.mu.RUnlock()
        
        // 2
    	sharedConn, rd := gnet.NewSharedConnSize(conn, int(maxNeedBytesNum))
    	data := make([]byte, maxNeedBytesNum)
    
    	conn.SetReadDeadline(time.Now().Add(DefaultTimeout))
    	_, err := io.ReadFull(rd, data)
    	if err != nil {
    		conn.Close()
    		return
    	}
    	conn.SetReadDeadline(time.Time{})
        // 3
    	for _, ln := range lns {
    		if match := ln.matchFn(data); match {
    			err = errors.PanicToError(func() {
    				ln.c <- sharedConn
    			})
    			if err != nil {
    				conn.Close()
    			}
    			return
    		}
    	}
    
    	// No match listeners
    	if defaultLn != nil {
    		err = errors.PanicToError(func() {
    			defaultLn.c <- sharedConn
    		})
    		if err != nil {
    			conn.Close()
    		}
    		return
    	}
    
    	// No listeners for this connection, close it.
    	conn.Close()
    	return
    }
    

    handleConn方法也不算复杂,大体可以分为三步:

    1. 获取当前状态
    2. conn中读取数据,注意:shareConnrd存在单向关系,如果从rd中读取数据的话,数据也会复制一份放到shareConn中,反过来就不成立了
    3. 读取到的数据会被遍历,最终选出与matchFunc匹配的最高优先级的listener,并将shareConn放入该listenerc字段中,如果没有匹配到则放到defaultLn中的c字段中,如果defaultLnnil的话就不处理,直接关闭conn

    最后来到了release方法了:

    func (mux *Mux) release(ln *listener) bool {
    	result := false
    	mux.mu.Lock()
    	defer mux.mu.Unlock()
    	lns := mux.copyLns()
    
    	for i, l := range lns {
    		if l == ln {
    			lns = append(lns[:i], lns[i+1:]...)
    			result = true
    			break
    		}
    	}
    	mux.lns = lns
    	return result
    }
    

    release方法意思很明确:把对应的listenerlns中移除,并把结果返回,整个过程有互斥锁,我们回到存疑1,尽管有互斥锁,但在这种情况下:当某个goroutine运行到handleConn已经执行到了第三阶段的开始状态(也就是还没有找到匹配的listener)时,且Go运行在多核状态下,当另一个goroutine运行完listenerClose方法时,这时就可能发生往一个已经关闭的channel中send数据,但请注意handleConn的第三步的这段代码:

    err = errors.PanicToError(func() { // 就是这里了
    	ln.c <- sharedConn
    })
    if err != nil {
    	conn.Close()
    }
    

    这个PanicToError是这样的:

    func PanicToError(fn func()) (err error) {
    	defer func() {
    		if r := recover(); r != nil {
    			err = fmt.Errorf("Panic error: %v", r)
    		}
    	}()
    
    	fn()
    	return
    }
    

    基本上就是执行了recover然后将错误打印出来,结合下面的对err的判断,就会将send失败的conn关闭。

    总结

    1. Mux中包含了一个初始监听器,基本上所有的事件(比如说新的连接建立,之所以叫事件是因为我实在想不出更精确的词语了)都起源于此
    2. listener实现了net.Listener接口,可以作为二级监听器使用(比如传给net/http.Server结构体的Server方法进行处理)。
    3. Mux包含了一个由listener组成的有序slice,当有事件产生时就会遍历这个slice找出合适的listener并将事件传给他。

    讲到这里基本上是完事了。整个mux模块还是比较简单的,起码是由一个个简单的东西组合而成。那么一起来意淫一下整体流程吧。

    假如我要实现这么一个网络程序:

    1. 绑定监听一个基于tcp的套接字
    2. 我们允许其应用层可支持多个(比如说支持http https这两个吧,尽管http和https可以说是一个协议。。),不同的应用层协议对应不同的处理函数

    就这么两个很简单的要求,不难吧。

    那么我们一起来实现吧:

    
    type HandleFunc func(c net.Conn) (n int, err error) 
    
    type MyServer struct {
        l net.Listener
        hFunc HandleFunc
    }
    
    func (h *MyServer) Server() (err error) {
        for {
            conn, err := h.l.Accept()
            if err != nil {
                return
            }
            go h.hFunc(conn)
        }
    }
    
    func HandleHttp(c net.Conn)(n int, err error){
        n, err = c.Write([]byte("Get Off! Don't you know that it is not safe?"))
    }
    
    func HandleHttps(c net.Conn)(n int, err error){
        n, err = c.Write([]byte("Get Off! Don't you know that this is more complicated than http?"))
    }
    
    
    func main() (err error){
        ln, err := net.Listen("tcp", "0.0.0.0:12345")
    	if err != nil {
    		err = fmt.Errorf("Create server listener error, %v", err)
    		return
    	}
    	muxer = mux.NewMux(ln)
    	
    	var lHttp, lHttps net.Listener
    	lHttp = muxer.ListenHttp(1)
    	httpServer := *MyServer{lHttp, HandleHttp}
    	
    	lHttps = muxer.ListenHttps(2)
    	httpsServer := *MyServer{lHttps, HandleHttps}
    	
    	go httpServer.Server()
    	go httpsServer.Server()
    
    	err = muxer.Serve()
    }
    
  • 相关阅读:
    1451. Rearrange Words in a Sentence
    1450. Number of Students Doing Homework at a Given Time
    1452. People Whose List of Favorite Companies Is Not a Subset of Another List
    1447. Simplified Fractions
    1446. Consecutive Characters
    1448. Count Good Nodes in Binary Tree
    709. To Lower Case
    211. Add and Search Word
    918. Maximum Sum Circular Subarray
    lua 时间戳和时间互转
  • 原文地址:https://www.cnblogs.com/MnCu8261/p/10639897.html
Copyright © 2011-2022 走看看