zoukankan      html  css  js  c++  java
  • kcp-go源码解析

    概念

    ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.
    RTO:Retransmission TimeOut
    FEC:Forward Error Correction

    kcp简介

    kcp是一个基于udp实现快速、可靠、向前纠错的的协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟降低30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发。查看官方文档kcp

    kcp-go是用go实现了kcp协议的一个库,其实kcp类似tcp,协议的实现也很多参考tcp协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。
    kcp和tcp一样,也分客户端和监听端。

        +-+-+-+-+-+            +-+-+-+-+-+
        |  Client |            |  Server |
        +-+-+-+-+-+            +-+-+-+-+-+
            |------ kcp data ------>|     
            |<----- kcp data -------|     
    

    kcp协议

    layer model

    +----------------------+
    |      Session         |
    +----------------------+
    |      KCP(ARQ)        |
    +----------------------+
    |      FEC(OPTIONAL)   |
    +----------------------+
    |      CRYPTO(OPTIONAL)|
    +----------------------+
    |      UDP(Packet)     |
    +----------------------+
    

    KCP header

    KCP Header Format

          4           1   1     2 (Byte)
    +---+---+---+---+---+---+---+---+
    |     conv      |cmd|frg|  wnd  |
    +---+---+---+---+---+---+---+---+
    |     ts        |     sn        |
    +---+---+---+---+---+---+---+---+
    |     una       |     len       |
    +---+---+---+---+---+---+---+---+
    |                               |
    +             DATA              +
    |                               |
    +---+---+---+---+---+---+---+---+
    

    代码结构

    src/vendor/github.com/xtaci/kcp-go/
    ├── LICENSE
    ├── README.md
    ├── crypt.go    加解密实现
    ├── crypt_test.go
    ├── donate.png
    ├── fec.go      向前纠错实现
    ├── frame.png
    ├── kcp-go.png
    ├── kcp.go      kcp协议实现
    ├── kcp_test.go
    ├── sess.go     会话管理实现
    ├── sess_test.go
    ├── snmp.go     数据统计实现
    ├── updater.go  任务调度实现
    ├── xor.go      xor封装
    └── xor_test.go
    

    着重研究两个文件kcp.gosess.go

    kcp浅析

    kcp是基于udp实现的,所有udp的实现这里不做介绍,kcp做的事情就是怎么封装udp的数据和怎么解析udp的数据,再加各种处理机制,为了重传,拥塞控制,纠错等。下面介绍kcp客户端和服务端整体实现的流程,只是大概介绍一下函数流,不做详细解析,详细解析看后面数据流的解析。

    kcp client整体函数流

    和tcp一样,kcp要连接服务端需要先拨号,但是和tcp有个很大的不同是,即使服务端没有启动,客户端一样可以拨号成功,因为实际上这里的拨号没有发送任何信息,而tcp在这里需要三次握手。

    DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int)
    	V
    net.DialUDP("udp", nil, udpaddr)
    	V
    NewConn()
    	V
    newUDPSession() {初始化UDPSession}
    	V
    NewKCP() {初始化kcp}
    	V
    updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
    	V
    go sess.readLoop()
    	V
    go s.receiver(chPacket)
    	V
    s.kcpInput(data)
    	V
    s.fecDecoder.decodeBytes(data)
    	V
    s.kcp.Input(data, true, s.ackNoDelay)
    	V
    kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
    	V
    notifyReadEvent()
    
    

    客户端大体的流程如上面所示,先Dial,建立udp连接,将这个连接封装成一个会话,然后启动一个go程,接收udp的消息。

    kcp server整体函数流

    ListenWithOptions() 
        V
    net.ListenUDP()
        V
    ServerConn()
        V
    newFECDecoder()
        V
    go l.monitor() {从chPacket接收udp数据,写入kcp}
        V
    go l.receiver(chPacket) {从upd接收数据,并入队列}
        V
    newUDPSession()
        V
    updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务}
        V
    s.kcpInput(data)`
        V
    s.fecDecoder.decodeBytes(data)
        V
    s.kcp.Input(data, true, s.ackNoDelay)
        V
    kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲}
        V
    notifyReadEvent()
    

    服务端的大体流程如上图所示,先Listen,启动udp监听,接着用一个go程监控udp的数据包,负责将不同session的数据写入不同的udp连接,然后解析封装将数据交给上层。

    kcp 数据流详细解析

    不管是kcp的客户端还是服务端,他们都有io行为,就是读与写,我们只分析一个就好了,因为它们读写的实现是一样的,这里分析客户端的读与写。

    kcp client 发送消息

    s.Write(b []byte) 
    	V
    s.kcp.WaitSnd() {}
    	V
    s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue}
     	V
    s.kcp.flush(false) [flush data to output] {
    	if writeDelay==true {
    		flush
    	}else{
    		每隔`interval`时间flush一次
    	}
    }
     	V
    kcp.output(buffer, size) 
     	V
    s.output(buf)
     	V
    s.conn.WriteTo(ext, s.remote)
     	V
    s.conn..Conn.WriteTo(buf)
    

    读写都是在sess.go文件中实现的,Write方法:

    // Write implements net.Conn
    func (s *UDPSession) Write(b []byte) (n int, err error) {
    	for {
    	    ...
    
    		// api flow control
    		if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) {
    			n = len(b)
    			for {
    				if len(b) <= int(s.kcp.mss) {
    					s.kcp.Send(b)
    					break
    				} else {
    					s.kcp.Send(b[:s.kcp.mss])
    					b = b[s.kcp.mss:]
    				}
    			}
    
    			if !s.writeDelay {
    				s.kcp.flush(false)
    			}
    			s.mu.Unlock()
    			atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n))
    			return n, nil
    		}
    
            ...
    		// wait for write event or timeout
    		select {
    		case <-s.chWriteEvent:
    		case <-c:
    		case <-s.die:
    		}
    
    		if timeout != nil {
    			timeout.Stop()
    		}
    	}
    }
    

    假设发送一个hello消息,Write方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则kcp.Send(“hello”),而Send函数实现根据mss的值对数据分段,当然这里的发送的hello,长度太短,只分了一个段,并把它们插入发送的队列里。

    func (kcp *KCP) Send(buffer []byte) int {
        ...
    	for i := 0; i < count; i++ {
    		var size int
    		if len(buffer) > int(kcp.mss) {
    			size = int(kcp.mss)
    		} else {
    			size = len(buffer)
    		}
    		seg := kcp.newSegment(size)
    		copy(seg.data, buffer[:size])
    		if kcp.stream == 0 { // message mode
    			seg.frg = uint8(count - i - 1)
    		} else { // stream mode
    			seg.frg = 0
    		}
    		kcp.snd_queue = append(kcp.snd_queue, seg)
    		buffer = buffer[size:]
    	}
    	return 0
    }
    

    接着判断参数writeDelay,如果参数设置为false,则立马发送消息,否则需要任务调度后才会触发发送,发送消息是由flush函数实现的。

    // flush pending data
    func (kcp *KCP) flush(ackOnly bool) {
    	var seg Segment
    	seg.conv = kcp.conv
    	seg.cmd = IKCP_CMD_ACK
    	seg.wnd = kcp.wnd_unused()
    	seg.una = kcp.rcv_nxt
    
    	buffer := kcp.buffer
    	// flush acknowledges
    	ptr := buffer
    	for i, ack := range kcp.acklist {
    		size := len(buffer) - len(ptr)
    		if size+IKCP_OVERHEAD > int(kcp.mtu) {
    			kcp.output(buffer, size)
    			ptr = buffer
    		}
    		// filter jitters caused by bufferbloat
    		if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i {
    			seg.sn, seg.ts = ack.sn, ack.ts
    			ptr = seg.encode(ptr)
    
    		}
    	}
    	kcp.acklist = kcp.acklist[0:0]
    
    	if ackOnly { // flash remain ack segments
    		size := len(buffer) - len(ptr)
    		if size > 0 {
    			kcp.output(buffer, size)
    		}
    		return
    	}
    
    	// probe window size (if remote window size equals zero)
    	if kcp.rmt_wnd == 0 {
    		current := currentMs()
    		if kcp.probe_wait == 0 {
    			kcp.probe_wait = IKCP_PROBE_INIT
    			kcp.ts_probe = current + kcp.probe_wait
    		} else {
    			if _itimediff(current, kcp.ts_probe) >= 0 {
    				if kcp.probe_wait < IKCP_PROBE_INIT {
    					kcp.probe_wait = IKCP_PROBE_INIT
    				}
    				kcp.probe_wait += kcp.probe_wait / 2
    				if kcp.probe_wait > IKCP_PROBE_LIMIT {
    					kcp.probe_wait = IKCP_PROBE_LIMIT
    				}
    				kcp.ts_probe = current + kcp.probe_wait
    				kcp.probe |= IKCP_ASK_SEND
    			}
    		}
    	} else {
    		kcp.ts_probe = 0
    		kcp.probe_wait = 0
    	}
    
    	// flush window probing commands
    	if (kcp.probe & IKCP_ASK_SEND) != 0 {
    		seg.cmd = IKCP_CMD_WASK
    		size := len(buffer) - len(ptr)
    		if size+IKCP_OVERHEAD > int(kcp.mtu) {
    			kcp.output(buffer, size)
    			ptr = buffer
    		}
    		ptr = seg.encode(ptr)
    	}
    
    	// flush window probing commands
    	if (kcp.probe & IKCP_ASK_TELL) != 0 {
    		seg.cmd = IKCP_CMD_WINS
    		size := len(buffer) - len(ptr)
    		if size+IKCP_OVERHEAD > int(kcp.mtu) {
    			kcp.output(buffer, size)
    			ptr = buffer
    		}
    		ptr = seg.encode(ptr)
    	}
    
    	kcp.probe = 0
    
    	// calculate window size
    	cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd)
    	if kcp.nocwnd == 0 {
    		cwnd = _imin_(kcp.cwnd, cwnd)
    	}
    
    	// sliding window, controlled by snd_nxt && sna_una+cwnd
    	newSegsCount := 0
    	for k := range kcp.snd_queue {
    		if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 {
    			break
    		}
    		newseg := kcp.snd_queue[k]
    		newseg.conv = kcp.conv
    		newseg.cmd = IKCP_CMD_PUSH
    		newseg.sn = kcp.snd_nxt
    		kcp.snd_buf = append(kcp.snd_buf, newseg)
    		kcp.snd_nxt++
    		newSegsCount++
    		kcp.snd_queue[k].data = nil
    	}
    	if newSegsCount > 0 {
    		kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount)
    	}
    
    	// calculate resent
    	resent := uint32(kcp.fastresend)
    	if kcp.fastresend <= 0 {
    		resent = 0xffffffff
    	}
    
    	// check for retransmissions
    	current := currentMs()
    	var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64
    	for k := range kcp.snd_buf {
    		segment := &kcp.snd_buf[k]
    		needsend := false
    		if segment.xmit == 0 { // initial transmit
    			needsend = true
    			segment.rto = kcp.rx_rto
    			segment.resendts = current + segment.rto
    		} else if _itimediff(current, segment.resendts) >= 0 { // RTO
    			needsend = true
    			if kcp.nodelay == 0 {
    				segment.rto += kcp.rx_rto
    			} else {
    				segment.rto += kcp.rx_rto / 2
    			}
    			segment.resendts = current + segment.rto
    			lost++
    			lostSegs++
    		} else if segment.fastack >= resent { // fast retransmit
    			needsend = true
    			segment.fastack = 0
    			segment.rto = kcp.rx_rto
    			segment.resendts = current + segment.rto
    			change++
    			fastRetransSegs++
    		} else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit
    			needsend = true
    			segment.fastack = 0
    			segment.rto = kcp.rx_rto
    			segment.resendts = current + segment.rto
    			change++
    			earlyRetransSegs++
    		}
    
    		if needsend {
    			segment.xmit++
    			segment.ts = current
    			segment.wnd = seg.wnd
    			segment.una = seg.una
    
    			size := len(buffer) - len(ptr)
    			need := IKCP_OVERHEAD + len(segment.data)
    
    			if size+need > int(kcp.mtu) {
    				kcp.output(buffer, size)
    				current = currentMs() // time update for a blocking call
    				ptr = buffer
    			}
    
    			ptr = segment.encode(ptr)
    			copy(ptr, segment.data)
    			ptr = ptr[len(segment.data):]
    
    			if segment.xmit >= kcp.dead_link {
    				kcp.state = 0xFFFFFFFF
    			}
    		}
    	}
    
    	// flash remain segments
    	size := len(buffer) - len(ptr)
    	if size > 0 {
    		kcp.output(buffer, size)
    	}
    
    	// counter updates
    	sum := lostSegs
    	if lostSegs > 0 {
    		atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs)
    	}
    	if fastRetransSegs > 0 {
    		atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs)
    		sum += fastRetransSegs
    	}
    	if earlyRetransSegs > 0 {
    		atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs)
    		sum += earlyRetransSegs
    	}
    	if sum > 0 {
    		atomic.AddUint64(&DefaultSnmp.RetransSegs, sum)
    	}
    
    	// update ssthresh
    	// rate halving, https://tools.ietf.org/html/rfc6937
    	if change > 0 {
    		inflight := kcp.snd_nxt - kcp.snd_una
    		kcp.ssthresh = inflight / 2
    		if kcp.ssthresh < IKCP_THRESH_MIN {
    			kcp.ssthresh = IKCP_THRESH_MIN
    		}
    		kcp.cwnd = kcp.ssthresh + resent
    		kcp.incr = kcp.cwnd * kcp.mss
    	}
    
    	// congestion control, https://tools.ietf.org/html/rfc5681
    	if lost > 0 {
    		kcp.ssthresh = cwnd / 2
    		if kcp.ssthresh < IKCP_THRESH_MIN {
    			kcp.ssthresh = IKCP_THRESH_MIN
    		}
    		kcp.cwnd = 1
    		kcp.incr = kcp.mss
    	}
    
    	if kcp.cwnd < 1 {
    		kcp.cwnd = 1
    		kcp.incr = kcp.mss
    	}
    }
    

    flush函数非常的重要,kcp的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly,意思就是只发送ack,如果ackOnly为true的话,该函数只遍历ack列表,然后发送,就完事了。 如果不是,也会发送真实数据。 在发送数据前先进行windSize探测,如果开启了拥塞控制nc=0,则每次发送前检测服务端的winsize,如果服务端的winsize变小了,自身的winsize也要更着变小,来避免拥塞。如果没有开启拥塞控制,就按设置的winsize进行数据发送。
    接着循环每个段数据,并判断每个段数据的是否该重发,还有什么时候重发:
    1. 如果这个段数据首次发送,则直接发送数据。 2. 如果这个段数据的当前时间大于它自身重发的时间,也就是RTO,则重传消息。 3. 如果这个段数据的ack丢失累计超过resent次数,则重传,也就是快速重传机制。这个resent参数由resend参数决定。 4. 如果这个段数据的ack有丢失且没有新的数据段,则触发ER,ER相关信息ER

    最后通过kcp.output发送消息hello,output是个回调函数,函数的实体是sess.go的:

    func (s *UDPSession) output(buf []byte) {
    	var ecc [][]byte
    
    	// extend buf's header space
    	ext := buf
    	if s.headerSize > 0 {
    		ext = s.ext[:s.headerSize+len(buf)]
    		copy(ext[s.headerSize:], buf)
    	}
    
    	// FEC stage
    	if s.fecEncoder != nil {
    		ecc = s.fecEncoder.Encode(ext)
    	}
    
    	// encryption stage
    	if s.block != nil {
    		io.ReadFull(rand.Reader, ext[:nonceSize])
    		checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:])
    		binary.LittleEndian.PutUint32(ext[nonceSize:], checksum)
    		s.block.Encrypt(ext, ext)
    
    		if ecc != nil {
    			for k := range ecc {
    				io.ReadFull(rand.Reader, ecc[k][:nonceSize])
    				checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:])
    				binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum)
    				s.block.Encrypt(ecc[k], ecc[k])
    			}
    		}
    	}
    
    	// WriteTo kernel
    	nbytes := 0
    	npkts := 0
    	// if mrand.Intn(100) < 50 {
    	for i := 0; i < s.dup+1; i++ {
    		if n, err := s.conn.WriteTo(ext, s.remote); err == nil {
    			nbytes += n
    			npkts++
    		}
    	}
    	// }
    
    	if ecc != nil {
    		for k := range ecc {
    			if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil {
    				nbytes += n
    				npkts++
    			}
    		}
    	}
    	atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts))
    	atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes))
    }
    

    output函数才是真正的将数据写入内核中,在写入之前先进行了fec编码,fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码以后的hello就不是和原来的hello一样了,至少多了几个字节。 fec编码器有两个重要的参数reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShardsparityShards,这两个参数决定了fec的冗余度,冗余度越大抗丢包性就越强。

    kcp的任务调度器

    其实这里任务调度器是一个很简单的实现,用一个全局变量updater来管理session,代码文件为updater.go。其中最主要的函数

    func (h *updateHeap) updateTask() {
    	var timer <-chan time.Time
    	for {
    		select {
    		case <-timer:
    		case <-h.chWakeUp:
    		}
    
    		h.mu.Lock()
    		hlen := h.Len()
    		now := time.Now()
    		if hlen > 0 && now.After(h.entries[0].ts) {
    			for i := 0; i < hlen; i++ {
    				entry := heap.Pop(h).(entry)
    				if now.After(entry.ts) {
    					entry.ts = now.Add(entry.s.update())
    					heap.Push(h, entry)
    				} else {
    					heap.Push(h, entry)
    					break
    				}
    			}
    		}
    		if hlen > 0 {
    			timer = time.After(h.entries[0].ts.Sub(now))
    		}
    		h.mu.Unlock()
    	}
    }
    

    任务调度器实现了一个堆结构,每当有新的连接,session都会插入到这个堆里,接着for循环每隔interval时间,遍历这个堆,得到entry然后执行entry.s.update()。而entry.s.update()会执行s.kcp.flush(false)来发送数据。

    总结

    这里简单介绍了kcp的整体流程,详细介绍了发送数据的流程,但未介绍kcp接收数据的流程,其实在客户端发送数据后,服务端是需要返回ack的,而客户端也需要根据返回的ack来判断数据段是否需要重传还是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。

  • 相关阅读:
    今天开通我的博客
    在ArcGIS中,利用“行政单元面积权重法”实现人口数据格网化
    ArcGIS 下的水文分析
    常用计数器的verilog实现(binary、gray、onehot、LFSR、环形、扭环形)
    简单组合逻辑电路的verilog实现(包括三态门、38译码器、83优先编码器、8bit奇偶校验器)
    乘法器的verilog实现(并行、移位相加、查找表)
    简单时序逻辑电路的verilog实现,包括D触发器、JK触发器、锁存器、寄存器、
    简单ALU(算术逻辑单元)的verilog实现
    ubuntu下安装virtualbox 错误及解决办法
    C++单例模式对象的控制释放分析
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/34c07c3577c85e9ae5c3477d7cab5f52.html
Copyright © 2011-2022 走看看