zoukankan      html  css  js  c++  java
  • tcpsock.v2 与 ecocache

      因为很不满意 tcpsock 的设计与实现,及有意专为譬如游戏服务器端开发设计一套 TCP 网络库,所以年初即有了 tcpsock.v2 的开发计划,于是粗略整理出了以下几条目标计划:

        1) TcpConn 的 ID 类型由 uint32 升级为 uint64
        2) 比较灵活的 Create / Config Options 支持
        3) 以队列的方式实现数据发送等逻辑
        4) 库代码最好不使用反射
        5) 对游戏服务器端开发友好(TCP ONLY)
        6) TcpServer 增加诸如 Iterate、Send、Kick 等常用接口
        7) 回调、超时、重连等逻辑的整理优化与支持等

      而截至目前,除了第 2 条, tcpsock.v2 基本初步实现了以上目标(譬如重连,可能在应用层实现更合适),但不少设计实现或还比较粗糙,我应该会在测试及使用中对之慢慢修补改进增强(不排除后期再开发一套全新的库)。其改动较大的部分设计实现,如下可见一斑(conn.go):

    // Copyright (C) 2018 ecofast(胡光耀). All rights reserved.
    // Use of this source code is governed by a BSD-style license.
    
    package tcpsock
    
    import (
    	"errors"
    	"net"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    const (
    	RecvBufLenMax = 4 * 1024
    	SendBufLenMax = 24 * 1024
    	TcpBufLenMax  = 4 * 1024
    )
    
    type queueNode struct {
    	buf  []byte
    	next *queueNode
    }
    
    type TcpConn struct {
    	id         uint64
    	owner      *tcpSock
    	conn       net.Conn
    	closeChan  chan struct{}
    	closeOnce  sync.Once
    	closedFlag int32
    	onClose    OnTcpDisconnect
    
    	mutex         sync.Mutex
    	firstSendNode *queueNode
    	lastSendNode  *queueNode
    
    	sendBuf [SendBufLenMax]byte
    	bufLen  int
    
    	onRead func(p []byte) (n int, err error)
    }
    
    func newTcpConn(id uint64, owner *tcpSock, conn net.Conn, onClose OnTcpDisconnect) *TcpConn {
    	return &TcpConn{
    		id:        id,
    		owner:     owner,
    		conn:      conn,
    		closeChan: make(chan struct{}),
    		onClose:   onClose,
    	}
    }
    
    func (self *TcpConn) ID() uint64 {
    	return self.id
    }
    
    func (self *TcpConn) Write(b []byte) (n int, err error) {
    	if self.closed() {
    		return 0, errors.New("connection closed")
    	}
    
    	cnt := len(b)
    	if cnt == 0 || cnt > SendBufLenMax {
    		return 0, errors.New("invalid data")
    	}
    
    	node := &queueNode{}
    	node.buf = make([]byte, cnt)
    	copy(node.buf, b)
    	self.mutex.Lock()
    	if self.lastSendNode != nil {
    		self.lastSendNode.next = node
    	}
    	if self.firstSendNode == nil {
    		self.firstSendNode = node
    	}
    	self.lastSendNode = node
    	self.mutex.Unlock()
    	return cnt, nil
    }
    
    func (self *TcpConn) Close() error {
    	self.closeOnce.Do(func() {
    		atomic.StoreInt32(&self.closedFlag, 1)
    		close(self.closeChan)
    		self.conn.Close()
    		if self.onClose != nil {
    			self.onClose(self)
    		}
    		self.clear()
    	})
    	return nil
    }
    
    func (self *TcpConn) closed() bool {
    	return atomic.LoadInt32(&self.closedFlag) == 1
    }
    
    func (self *TcpConn) RawConn() net.Conn {
    	return self.conn
    }
    
    func (self *TcpConn) run() {
    	startGoroutine(self.send, self.owner.waitGroup)
    	startGoroutine(self.recv, self.owner.waitGroup)
    }
    
    func startGoroutine(fn func(), wg *sync.WaitGroup) {
    	wg.Add(1)
    	go func() {
    		fn()
    		wg.Done()
    	}()
    }
    
    func (self *TcpConn) clear() {
    	self.mutex.Lock()
    	defer self.mutex.Unlock()
    	self.firstSendNode = nil
    	self.lastSendNode = nil
    	self.bufLen = 0
    }
    
    func (self *TcpConn) getSendBuf() []byte {
    	self.mutex.Lock()
    	defer self.mutex.Unlock()
    
    	for self.firstSendNode != nil {
    		node := self.firstSendNode
    		self.firstSendNode = node.next
    		copy(self.sendBuf[self.bufLen:], node.buf)
    		self.bufLen += len(node.buf)
    		if self.bufLen >= TcpBufLenMax {
    			break
    		}
    	}
    
    	if self.firstSendNode == nil {
    		self.lastSendNode = nil
    	}
    
    	l := self.bufLen
    	if l > TcpBufLenMax {
    		l = TcpBufLenMax
    	}
    	if l > 0 {
    		b := make([]byte, l)
    		copy(b, self.sendBuf[:l])
    		self.bufLen -= l
    		if self.bufLen > 0 {
    			copy(self.sendBuf[0:], self.sendBuf[l:])
    		}
    		if self.bufLen < 0 {
    			self.bufLen = 0
    		}
    		return b
    	}
    
    	return nil
    }
    
    func (self *TcpConn) send() {
    	defer func() {
    		recover()
    		self.Close()
    	}()
    
    	for {
    		if self.closed() {
    			return
    		}
    
    		select {
    		case <-self.owner.exitChan:
    			return
    
    		case <-self.closeChan:
    			return
    
    		default:
    			if b := self.getSendBuf(); b != nil {
    				if n, err := self.conn.Write(b); err != nil || n != len(b) {
    					// to do
    					//
    					return
    				}
    			} else {
    				time.Sleep(5 * time.Millisecond)
    			}
    		}
    	}
    }
    
    func (self *TcpConn) recv() {
    	defer func() {
    		recover()
    		self.Close()
    	}()
    
    	buf := make([]byte, RecvBufLenMax)
    	for {
    		select {
    		case <-self.owner.exitChan:
    			return
    
    		case <-self.closeChan:
    			return
    
    		default:
    		}
    
    		cnt, err := self.conn.Read(buf)
    		if err != nil || cnt == 0 {
    			return
    		}
    		if self.onRead != nil {
    			if n, err := self.onRead(buf[:cnt]); n != cnt || err != nil {
    				// to do
    				//
    				return
    			}
    		}
    	}
    }

      基于 tcpsock.v2,我编写了个简单的聊天室,包括服务器端及客户端机器人等,其设计及实现或许能部分演示如何使用 tcpsock.v2 开发基于 TCP 协议的服务器端程序。当然单就这个聊天室而言,性能上应还有明显的可提升空间(设计上的调整与优化)。

      相较 Redis / Memcached,可能 groupcache 并不怎么知名(当然它们的设计使用场景差异很大),它的作者其实正是写出 Memcached 的那个大牛。早前我在阅读 groupcache 部分代码的时候,有时却居然有一种恍若几年前阅读范哥编写的一些基础库代码的感觉,譬如 consistenthash,譬如 lru,看起来平实无华没有半点所谓奇技或淫巧,但我相信,没有深厚的功底,断难写出这样的高质量实现(不才深感学识距大牛隔了数条街)。

      去年初的时候,因兴趣练手之故,我有写过一个简单的基于 TCP 的文件下载服务器。而其实,它和 groupcache 的使用场景很相似,只是后者使用了 HTTP 且不太容易适配某些使用场景等。于是我想,能否部分参考 groupcache,也编写个基于 TCP 的 cacheserver 呢?于是便有了 ecocache 的开发计划。

      一开始,ecocache 只是单纯地想设计成类似 groupcache 的缓存服务器,但前者将是完整的程序并能定制提供数据获取等实现(譬如以动态库的方式),如此 ecocache 即既能作为譬如文件下载服务器,亦能在一些场合取代 Redis 等。但在开发过程中却总觉得,作为完整的程序,若 ecocache 提供以动态库的方式来定制实现数据获取,既增加了复杂度,也使得代码看上去挺 ugly。综合考虑之下,ecocache 对外将只提供譬如 GET、SET 等有限功能,且通讯协议也会足够简单:

    const (
    	CmSmDif = 127
    
    	CM_REGSVR = 1
    	SM_REGSVR = CmSmDif + CM_REGSVR
    
    	CM_DELSVR = 2
    	SM_DELSVR = CmSmDif + CM_DELSVR
    
    	CM_REQSVR = 3
    	SM_REQSVR = CmSmDif + CM_REQSVR
    
    	CM_PING = 4
    	SM_PING = CmSmDif + CM_PING
    
    	CM_GET = 5
    	SM_GET = CmSmDif + CM_GET
    
    	CM_SET = 6
    	SM_SET = CmSmDif + CM_SET
    )
    
    const (
    	SizeOfPacketHeadLen   = 4
    	SizeOfPacketHeadCmd   = 1
    	SizeOfPacketHeadRet   = 1
    	SizeOfPacketHeadParam = 2
    	SizeOfPacketHead      = SizeOfPacketHeadLen + SizeOfPacketHeadCmd + SizeOfPacketHeadRet + SizeOfPacketHeadParam
    )
    
    type Head struct {
    	Len   uint32
    	Cmd   uint8
    	Ret   uint8
    	Param uint16
    }
    
    type Body struct {
    	Content []byte
    }
    
    type Packet struct {
    	Head
    	Body
    }

      ecocache 的架构图基本如下:

      1) cacheserver 分布式部署,相互之间没有交互,且都与 load balancer 保持连接(当然 load balancer 并非必须)

      2) load balancer 以一致性哈希的方式为客户端提供一个可用的 cacheserver 地址

      3) 客户端连接至此 cacheserver,继而 GET、SET 等

      而其实,ecocache 更适合使用在只有或绝多都是 GET 的场景下,SET 的实现尚很粗糙原始。实际使用上,应先预热缓存。

      基本上,ecocache 也比较可能会慢慢修补改进。

  • 相关阅读:
    flume1.7.0的安装与使用
    获取top10
    editplus格式化xml文档
    LOG4J.PROPERTIES配置详解
    Oracle自增列
    javascript 传递引用类型参数
    {JavaScript}栈和堆内存,作用域
    JAVA中String与StringBuffer的区别
    Java中堆和栈的区别(转)
    JAVA错误:org.apache.jasper.JasperException: java.lang.ClassCastException:org.apache.catalina.util.DefaultAnnotationProcessor cannot be cast to org.apach
  • 原文地址:https://www.cnblogs.com/ecofast/p/9084160.html
Copyright © 2011-2022 走看看