目录
基本Tcp服务器
- Server模块:服务器配置信息、启动、停止、运行、添加路由、路由管理器、链接管理器等。
- 链接属性配置模块:启动链接、停止链接、获取链接对象等。
- 消息封装:消息id、长度、内容、及消息的封包与解包等。
- 基础路由模块:提供路由接口及基类,开启服务前必须继承基类实现重写基类中的方法再注册。
- 全局配置模块:配置服务器的信息,使用前必须在根目录下配置json文件。
- 多路由模块:使服务器根据数据不同类型做不同业务,提供路由注册、添加、与删除。
- 读写分离:服务端读写分离。
- 任务队列+工作池:服务端默认开启工作池,工作池中协程的数量与任务队列数量相同,任务队列为缓冲信道。
- 链接管理:提供链接管理功能,链接的添加、删除、查找、清理等。
简单使用
1.配置json文件,名字为server.json。
{ "Name":"Demo_Server", "Version": "tcp4", "Host":"127.0.0.1", "TcpPort":"7777", "MaxPackSize":"4096", "MaxConn":"5000" }
2.服务端继承重写路由基类方法,业务方法。
3.配置封包与解包。
package pack import ( "bytes" "encoding/binary" "fmt" "sync" ) type Message struct { Msglen uint32 Msgid uint32 Msgdata []byte } //封包函数 func Pack(len uint32,id uint32,data []byte)([]byte,error) { var bufferPool = sync.Pool{ New:func() interface{}{ return new(bytes.Buffer) }, } //获取一个存放bytes的缓冲区,存储字节序列 dataBuff := bufferPool.Get().(*bytes.Buffer) //将数据长度写入字节流 err := binary.Write(dataBuff,binary.LittleEndian,len) checkerr(err) //将id写入字节流 err = binary.Write(dataBuff,binary.LittleEndian,id) checkerr(err) //将数据内容写入字节流 err = binary.Write(dataBuff,binary.LittleEndian,data) checkerr(err) return dataBuff.Bytes(),nil } //解包函数 func Unpack(data []byte)(*Message,error){ //这里可以不需要额外创建一个数据缓冲 //创建一个io。Reader boolBuffer := bytes.NewReader(data) msg := &Message{} //读取数据长度和id err := binary.Read(boolBuffer, binary.LittleEndian, &msg.Msglen) checkerr(err) err = binary.Read(boolBuffer, binary.LittleEndian, &msg.Msgid) checkerr(err) //数据包限制 //if // //} return msg,nil } func checkerr(err error){ if err != nil{ fmt.Println("数据写入与读取失败") } }
4.创建服务端句柄。
5.注册路由。
6.运行服务器。
服务端示例:
MyDemo/tcptest/pack:为封包与解包目录
Zinx_Server/znet为服务源码目录
package main import ( "MyDemo/tcptest/pack" "Zinx_Server/znet" "fmt" ) type PingRouter struct{ znet.BaseRouter } //Test Handle func (this *PingRouter)Handle(request znet.IRequest){ var id uint32 =0 s := "hello client,i am Handle" //将字符串转为字节切片 data := []byte(s) //对数据进行封包 data,_ = pack.Pack(uint32(len(data)),id,data) _,err := request.GetConnecttion().GetTcpConnecttion().Write(data) if err != nil { fmt.Println("call back ping error") } } func main(){ //创建Server句柄 Server := znet.NerServer() //注册路由1:客户端数据类型为1,则由服务端注册的此条路由处理 //所有注册的类都应该继承BaseRouter类并重写,而BaseRouter类则实现了IRouter接口。 Server.AddRouter(1,&PingRouter{}) //注册路由2:客户端数据类型为2,则由服务端注册的此条路由处理 //Server.AddRouter(2,&test{}) Server.Run() }
客户端示例:
package main import ( "MyDemo/tcptest/pack" "fmt" "net" "time" ) func main() { //1.创建链接远程链接服务器,得到一个conn链接 conn, err := net.Dial("tcp", "127.0.0.1:7777") if err != nil { fmt.Println("client start err,exit!") return } i := 1 var id uint32 = 1 for { //2.调用链接Write写数据 s := "Hello Server ,i am clent2" //将字符串转为字节切片 data := []byte(s) //对数据进行封包 data,_ = pack.Pack(uint32(len(data)),id,data) _, err := conn.Write(data) if err != nil { fmt.Println("write conn err", err) return } buf := make([]byte, 512) _,err = conn.Read(buf) //解包tcp数据 msg,_ := pack.Unpack(buf) fmt.Printf("recv msg id:%d ", msg.Msgid) fmt.Printf("recv msg len:%d ", msg.Msglen) fmt.Printf("recv buf:%s ",string(buf[8:8+msg.Msglen])) if err != nil { fmt.Println("read buf err") return } //fmt.Printf("Server call back:%s,cnt = %d ", buf, cnt) i++ time.Sleep(5 *time.Second) } }
服务端运行:
源码
文件目录
server.go
package znet import ( "fmt" "net" ) type IServer interface { //启动 Start() //停止 Stop() //运行 Run() //路由功能:给当前服务注册一个路由方法,供客户端链接处理使用 AddRouter(index uint32,router IRouter) //得到路由管理器 GetRoutermanger()IRoutermanger //得到链接管理器 GetConnManager()IConnManager } //IServer接口的实现,定义一个Server的服务模块 type Server struct{ //服务器的名称 Name string //IP版本 IPversion string //服务器的IP IP string //服务器的端口 Port string //最大链接数 MaxConn int64 //路由管理器 Routermange IRoutermanger //链接管理器 ConnManager IConnManager //工作池 Pool *WorkerPool } //启动服务器 func (s *Server)Start(){ s.Pool.StartWorkPool() //在这里使用go协程让服务端阻塞接收客户端的消息为异步。 //1.获取一个tcp的addr addr, err := net.ResolveTCPAddr(s.IPversion, fmt.Sprintf("%s:%s", s.IP, s.Port)) if err != nil { fmt.Println("resolve tcp addr error:", err) return } //2.监听服务器的地址 listenner, err := net.ListenTCP(s.IPversion, addr) if err != nil { fmt.Printf("listen:%s err:%s", s.IPversion, err) return } fmt.Println("Start Zinx serversucc:", s.Name) var cid uint32 cid = 0 go func() { //3.阻塞的等待客户端链接,触处理客户端链接业务 for { //如果有客户端链接过来,阻塞会返回 conn, err := listenner.AcceptTCP() if err != nil { fmt.Println("Accept err", err) continue } //判断是否超过最大链接数 if int64(s.ConnManager.GetConnLen()) >= s.MaxConn{ err := conn.Close() if err != nil{ fmt.Println("拒绝用户失败!") } }else{ //将每个客户端链接与服务端路由模块进行绑定 //c:封装每个客户端链接信息,绑定链接、id、及服务器的路由管理器 Conn := NewConnection(s,conn,cid,s.Routermange) cid++ //启动链接进行业务处理 s.Pool.Put(Conn) } } }() } //停止服务器 func (s *Server)Stop(){ s.ConnManager.ClearConn() //将服务器的资源、状态或者已经开辟的链接信息回收 } //运行服务器 func (s *Server)Run(){ //启动Server的服务功能 s.Start() //做一些启动服务器之后的业务// //阻塞状态 select {} } //添加路由功能 func (s *Server)AddRouter(index uint32,router IRouter){ s.Routermange.AddRouter(index,router) fmt.Println("Add Router Succ!!!") } func (s *Server)GetRoutermanger()IRoutermanger{ return s.Routermange } func (s *Server)GetConnManager()IConnManager{ return s.ConnManager } //初始化Server模块方法 func NerServer() *Server{ config := NewConfig() s := &Server{ Name :config.Name, IPversion:config.Version, IP:config.Host, Port:config.TcpPort, MaxConn:config.MaxConn, Routermange:NewRoutermange(), ConnManager:NewConnManger(), Pool:NewWorkerPool(2,10), } return s }
connection.go
package znet import ( "fmt" "net" ) type IConnection interface { //启动链接:让当前链接开始工作 Start() //停止链接:结束当前链接的工作 Stop() //获取当前链接的绑定socket conn GetTcpConnecttion() *net.TCPConn //获取当前链接模块的ID GetConnID() uint32 //获取远程客户端tpc的状态、ip、port GetRemoteAddr() net.Addr //发送数据、将数据发送给远程的客户端 Send(data []byte) error } //链接模块 type Connection struct { //当前链接绑定的服务端 Server IServer //当前链接的socket tcp套接字 Conn *net.TCPConn //链接的ID ConnID uint32 //当前的链接状态 ConnStatus bool //告知当前链接已经退出的/停止的channel ExitChan chan bool //路由管理器 Routermange IRoutermanger //读协程与写协程之间数据信道 done chan IRequest } func (c *Connection)StartReader() { go c.Reader() go c.Write() } //读 func (c *Connection)Reader() { fmt.Println("Reader Goroutine is running") defer fmt.Println("connID=",c.ConnID,"Reader is exit,remote addr is",c.GetRemoteAddr().String()) for{ buf := make([]byte,1024) _,err := c.Conn.Read(buf) if err != nil{ fmt.Printf("recv buf err %s ",err) c.ExitChan <-true //通知Write,客户端断开链接 break } c.ExitChan <- false //如果客户端没有断开链接,向信道发送信息,继续执行解包 //将数据解包,得到结构体对象 // msg:封装数据长度、数据id(类型)、数据内容 msg := NewMessage(buf) //将当前链接信息和请求数据封装 req :=NewRequest(c,msg) //将解包后的数据传入信道,数据传入信道后会也会阻塞,直到信道中的数据被取走 c.done <- req } } //写 func (c *Connection)Write(){ fmt.Println("Writer Goroutine is running") for{ //信道阻塞,只有从信道中接收到信息才能往下执行 status := <- c.ExitChan if status{ c.Stop() break } else{ req := <-c.done //执行注册的路由方法,每次将req传入 go func(request IRequest){ //获取数据类型,查询路由表执行不同的路由 id := req.Getdata().Getmsgid() c.Routermange.IndexRouter(id).Handle(request) }(req) } } } //启动链接:让当前链接开始准备工作 func (c *Connection)Start(){ fmt.Println("Start Conn,ConnID:",c.ConnID) //TODO 启动从当前链接写数据的业务 go c.StartReader() } //停止链接:结束当前链接的工作 func (c *Connection)Stop(){ fmt.Println("Stop Conn,ConnID:",c.ConnID) if c.ConnStatus == true{ return } //设置状态 c.ConnStatus = false //关闭链接 c.Conn.Close() c.Server.GetConnManager().Remove(c.ConnID) //回收资源 close(c.ExitChan) close(c.done) } //获取当前链接的绑定socket conn func (c *Connection)GetTcpConnecttion() *net.TCPConn{ return c.Conn } //获取当前链接模块的ID func (c *Connection)GetConnID() uint32{ return c.ConnID } //获取远程客户端tpc的状态、ip、port func (c *Connection)GetRemoteAddr() net.Addr{ return c.Conn.RemoteAddr() } //发送数据、将数据发送给远程的客户端 func (c *Connection)Send(data []byte) error{ _,err := c.Conn.Write([]byte(fmt.Sprintf("Hello"))) return err } //此方法用来初始化每一个客户端链接 func NewConnection(Server IServer,conn *net.TCPConn,ConnID uint32,Routermange IRoutermanger)*Connection{ c := & Connection{ Server:Server, Conn:conn, ConnID:ConnID, ConnStatus:false, Routermange:Routermange, ExitChan:make(chan bool,1), done:make(chan IRequest), } Server.GetConnManager().Add(c.ConnID,c) return c }
connectmager.go
package znet import ( "errors" "sync" ) type IConnManager interface { Add(connID uint32,conn IConnection) Remove(connID uint32) GetConn(connID uint32)(IConnection,error) GetConnLen() int ClearConn() } type ConnManger struct { ConnMangerMap map[uint32]IConnection connLock *sync.RWMutex } //添加 func (M *ConnManger)Add(connID uint32,conn IConnection){ M.connLock.Lock() //加写锁 defer M.connLock.Unlock()//解写锁 M.ConnMangerMap[connID] = conn } //删除 func (M *ConnManger)Remove(connID uint32){ M.connLock.Lock() //加写锁 defer M.connLock.Unlock()//解写锁 delete(M.ConnMangerMap,connID) } //得到链接 func (M *ConnManger)GetConn(connID uint32)(IConnection,error){ M.connLock.RLock() //加读锁 defer M.connLock.RUnlock()//解读锁 if conn,ok := M.ConnMangerMap[connID];ok{ return conn,nil }else{ return nil,errors.New("connnection not Found!") } } //得到链接个数 func (M *ConnManger)GetConnLen()int{ return len(M.ConnMangerMap) } func (M *ConnManger)ClearConn(){ M.connLock.Lock() //加写锁 defer M.connLock.Unlock()//解写锁 for ConnID,conn := range M.ConnMangerMap{ //停止链接 conn.Stop() //删除链接 delete(M.ConnMangerMap,ConnID) } } func NewConnManger() *ConnManger{ return &ConnManger{ make(map[uint32]IConnection), new(sync.RWMutex), } }
datapack.go
package znet import ( "bytes" "encoding/binary" "fmt" ) type IDatapack interface { //获取包的头长度方法 GetHead() uint32 //封包 Pack(msg IMessage)([]byte,error) //解包 Unpack(data []byte)(Message,error) } type Datapack struct {} func (dp *Datapack)GetHead() uint32 { //4字节长度+4字节长度 return 8 } func (dp *Datapack)Pack(msg IMessage)([]byte,error) { //创建一个存放bytes的缓冲 dataBuff := bytes.NewBuffer([]byte{}) //将数据长度写入字节流 err := binary.Write(dataBuff,binary.LittleEndian,msg.Getmsglen()) checkerr(err) //将id写入字节流 err = binary.Write(dataBuff,binary.LittleEndian,msg.Getmsgid()) checkerr(err) //将数据内容写入字节流 err = binary.Write(dataBuff,binary.LittleEndian,msg.Getmsg()) checkerr(err) return dataBuff.Bytes(),nil } func (dp *Datapack)Unpack(data []byte)(*Message,error){ //这里可以不需要额外创建一个数据缓冲 //创建一个io。Reader boolBuffer := bytes.NewReader(data) msg := &Message{} //读取数据长度和id err := binary.Read(boolBuffer, binary.LittleEndian, &msg.Msglen) checkerr(err) err = binary.Read(boolBuffer, binary.LittleEndian, &msg.Msgid) checkerr(err) //数据包限制 //if // //} return msg,nil } func NewDatapack()*Datapack{ return &Datapack{} } func checkerr(err error){ if err != nil{ fmt.Println("数据写入与读取失败") } }
message.go
package znet import ( "fmt" ) type IMessage interface { //获取消息的内容 Getmsg() []byte //获取消息的长度 Getmsglen() uint32 //获取消息的ID Getmsgid() uint32 //设置消息的ID Setmsgid(id uint32) //设置消息的内容 Setmsg(data []byte) } type Message struct { //TLV格式数据 //消息长度 Msglen uint32 //消息ID Msgid uint32 //消息内容 data []byte } func (M *Message)Getmsg() []byte{ return M.data } func (M *Message)Getmsglen() uint32{ return M.Msglen } func (M *Message)Getmsgid() uint32{ return M.Msgid } func (M *Message)Setmsgid(id uint32){ M.Msgid = id } func (M *Message)Setmsg(data []byte){ M.data = data } func NewMessage(data []byte)*Message{ pack := NewDatapack() msg,_ := pack.Unpack(data) fmt.Printf("recv buf:%s ",string(data[8:8+msg.Msglen])) msg.data = data[8:8+msg.Msglen] return msg }
request.go
package znet type IRequest interface { //得到请求数据封装 Getdata() IMessage //得到当前链接封装 GetConnecttion() IConnection } type Request struct { //已经和客户端建立链接对象 conn IConnection //客户端请求数据 Msg IMessage } //得到请求数据 func (r *Request)Getdata()IMessage{ return r.Msg } //得到当前链接 func (r *Request)GetConnecttion() IConnection{ return r.conn } //初始化Request对象 func NewRequest(conn IConnection,Msg IMessage) *Request{ r := &Request{ conn:conn, Msg:Msg, } return r }
router.go
package znet type IRouter interface { //Hook机制:其主要思想是提前在可能增加功能的地方埋好(预设)一个钩子,这个钩子并没有实际的意义,当我们需要重新修改或者增加这个地方的逻辑的时候,把扩展的类或者方法挂载到这个点即可。 //在处理conn业务之前的钩子方法Hook PreHandle(request IRequest) //在处理conn业务主方法Hook Handle(request IRequest) //在处理conn业务之后的钩子方法Hook PostHandle(request IRequest) } //基类实现所有接口方法,但不是不具体写死,然后子类继承基类,重写基类方法 type BaseRouter struct{} //在处理conn业务之前的钩子方法Hook func (br *BaseRouter)PreHandle(request IRequest){} //在处理conn业务主方法Hook func (br *BaseRouter)Handle(request IRequest){} //在处理conn业务之后的钩子方法Hook func (br *BaseRouter)PostHandle(request IRequest){}
routermange.go
package znet type IRoutermanger interface { //返回索引对应的路由 IndexRouter(index uint32)IRouter //添加路由 AddRouter(index uint32, router IRouter) ////删除路由 DeleteRouter(index uint32) } type Routermange struct { //路由映射 Routermap map[uint32]IRouter //负责worker取任务的消息队列 //业务工作worker池的worker数量 } //返回索引对应的路由,服务端通过消息绑定的ID(类型),可以执行对应不同的注册路由函数 func (R * Routermange)IndexRouter(index uint32)IRouter{ return R.Routermap[index] } //添加路由 func (R * Routermange)AddRouter(index uint32,router IRouter){ R.Routermap[index] = router } ////删除路由 func (R * Routermange)DeleteRouter(index uint32){ delete(R.Routermap,index) } //初始化路由 func NewRoutermange()*Routermange{ return &Routermange{ make(map[uint32]IRouter), } }
config.go
package znet import ( "encoding/json" "fmt" "os" "strconv" ) type Config struct { Name string Version string Host string TcpPort string MaxPackSize int64 MaxConn int64 } func (c *Config)setname(Name string){ c.Name = Name } func (c *Config)setVersion(Version string){ c.Version = Version } func (c *Config)setHost(Host string){ c.Host = Host } func (c *Config)setTcpPort(TcpPort string){ c.TcpPort = TcpPort } func (c *Config)setMaxConn(MaxConn int64){ c.MaxConn = MaxConn } func (c *Config)setMaxPackSize(MaxPackSize int64){ c.MaxPackSize = MaxPackSize } func NewConfig() *Config { //打开json文件 srcFile,err := os.Open("D:/Go/test/src/Zinx_Server/znet/server.json") if err != nil{ fmt.Println("文件打开失败,err=",err) return nil } defer srcFile.Close() //创建接收数据的map类型数据 datamap := make(map[string]string) //创建解码器 decoder := json.NewDecoder(srcFile) //解码 err = decoder.Decode(&datamap) if err != nil{ fmt.Println("解码失败,err:",err) return nil } MaxConn,err := strconv.ParseInt(datamap["MaxConn"], 10, 64) MaxPackSize,err := strconv.ParseInt(datamap["MaxPackSize"],10,64) c := &Config{ datamap["Name"], datamap["Version"], datamap["Host"], datamap["TcpPort"], MaxPackSize, MaxConn, } return c }
WorkerPool.go
package znet //限制 //1.worker工作池的任务队列的最大值 //2.任务队列中任务的最大数量 //协程池 type WorkerPool struct { cap int tasksSize int TaskQueue []chan IConnection //信道集合 } //启动一个worker工作池,开启工作池只能发生一次 func (W *WorkerPool)StartWorkPool(){ //根据任务队列的大小,分别开启worker,每个worker用go来承载,每一个worker对应一个任务队列 for i:=0;i<W.cap;i++{ //为每个worker开辟缓冲信道(任务队列) W.TaskQueue[i] = make(chan IConnection,W.tasksSize) //启动worker,阻塞等待任务从channel中到来 go W.StartOneWorker(i,W.TaskQueue[i]) } } func (W *WorkerPool)StartOneWorker(id int,taskqueue chan IConnection){ for{ select { case request :=<- taskqueue: //如果有消息过来,则处理业务 request.Start() default: continue } } } func (W *WorkerPool)Put(Connection IConnection){ index := Connection.GetConnID()%uint32(W.cap) W.TaskQueue[index] <- Connection } func NewWorkerPool(cap int,len int)*WorkerPool{ return &WorkerPool{ cap:cap, tasksSize:len, TaskQueue:make([]chan IConnection,cap), } }