rabbitmq_1源码
带着疑惑rabbitmq如何用一条tcp连接的,下面就来解答
仓库地址: github.com/valinurovam/garagemq
-
定位main函数
// Start GarageMQ broker srv.Start()
-
Start()里面只有这个函数有用
go srv.listen()
-
Listen()函数
for { conn, err := srv.listener.AcceptTCP() if err != nil { if srv.status != Running { return } srv.stopWithError(err, "accepting connection") } log.WithFields(log.Fields{ "from": conn.RemoteAddr().String(), "to": conn.LocalAddr().String(), }).Info("accepting connection") glog.DEBUG.Println("读取tcp数据大小: ", srv.config.TCP.ReadBufSize, " 写大小: ", srv.config.TCP.WriteBufSize) conn.SetReadBuffer(srv.config.TCP.ReadBufSize) conn.SetWriteBuffer(srv.config.TCP.WriteBufSize) conn.SetNoDelay(srv.config.TCP.Nodelay) // 处理连接,放进server的map srv.acceptConnection(conn) }
-
处理连接函数,只是简单的定位连接表示,然后继续看连接处理函数
srv.connLock.Lock() defer srv.connLock.Unlock() connection := NewConnection(srv, conn) srv.connections[connection.id] = connection go connection.handleConnection()
-
handleConnection()
先读取8字节标识符
处理管道和连接的关系
conn.ctx, conn.cancelCtx = context.WithCancel(context.Background()) channel := NewChannel(0, conn) conn.channelsLock.Lock() conn.channels[channel.id] = channel conn.channelsLock.Unlock() channel.start() conn.wg.Add(1) go conn.handleOutgoing() conn.wg.Add(1) go conn.handleIncoming()
-
继续追踪管道start,类似于tcp连接读写监控
总结
初探之后,go仓库他没有做到连接复用,但是整体思想还是可以继续研究