一、TCP Handler
nsqd里面的Main函数。
//nsqd.go
func (n *NSQD) Main() error {
//...
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})
//...
}
//tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
//...
go func() {
handler.Handle(clientConn)
wg.Done()
}()
//...
}
n.tcpServer实现了TCPHandler接口
在nsqd.New()里面创建的tcpServer
//nsqd.go
func New(opts *Options) (*NSQD, error) {
//...
n.tcpServer = &tcpServer{}
//...
}
看下tcpServer实现的Handle接口里做了什么
func (p *tcpServer) Handle(clientConn net.Conn) {
//...
//从socket中读取数据
buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)
if err != nil {
p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
clientConn.Close()
return
}
protocolMagic := string(buf)
p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
var prot protocol.Protocol
switch protocolMagic {
case " V2":
//这里是关键,创建了一个protocolV2对象
prot = &protocolV2{nsqd: p.nsqd}
default:
protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
clientConn.Close()
p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
clientConn.RemoteAddr(), protocolMagic)
return
}
p.conns.Store(clientConn.RemoteAddr(), clientConn)
//开启protocolV2的IOLoop,这是一个客户端连接的“守护”协程
//接收消息和发送消息给客户端,都在这里面处理了
err = prot.IOLoop(clientConn)
if err != nil {
p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
}
p.conns.Delete(clientConn.RemoteAddr())
}
//protocol_v2.go
//因为这里是在Handler里启动的,所以这里其实是为每个客户端都启动了一个Loop
func (p *protocolV2) IOLoop(conn net.Conn) error {
...
clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1)
client := newClientV2(clientID, conn, p.nsqd)
p.nsqd.AddClient(client.ID, client)
...
// messagePump负责从channel的memoryMsgChan和
//backend.ReadChan()中读取消息并将消息推送给client
messagePumpStartedChan := make(chan bool)
go p.messagePump(client, messagePumpStartedChan)
<-messagePumpStartedChan
//下面这个for循环负责接收客户端消息,比如消费订阅,以及生产消息等
//主要逻辑在p.Exec()里
for {
...
//主要逻辑在这个Exec里面
response, err = p.Exec(client, params)
...
}
//...
}
二、接收消息
上面已经看到处理客户端消息主要在protocolV2.Exec()里
这段代码我觉得很好理解了,直接去protocolV2.PUB()
看客户端生产消息的逻辑,其他的指令先不看
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
if bytes.Equal(params[0], []byte("IDENTIFY")) {
return p.IDENTIFY(client, params)
}
err := enforceTLSPolicy(client, p, params[0])
if err != nil {
return nil, err
}
switch {
...
//这里就是客户端生产消息的指令处理了
case bytes.Equal(params[0], []byte("PUB")):
return p.PUB(client, params)
...
case bytes.Equal(params[0], []byte("SUB")):
return p.SUB(client, params)
}
...
return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
}
func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
var err error
...
bodyLen, err := readLen(client.Reader, client.lenSlice)
...
//读取消息主体
messageBody := make([]byte, bodyLen)
_, err = io.ReadFull(client.Reader, messageBody)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
}
...
//将消息丢到topic.PutMessage()
//PutMessage直接将消息丢到msgChan或者diskqueue了
//关于topic的部分,参考https://www.cnblogs.com/werben/p/14518283.html
topic := p.nsqd.GetTopic(topicName)
msg := NewMessage(topic.GenerateID(), messageBody)
err = topic.PutMessage(msg)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
}
client.PublishedMessage(topicName, 1)
return okBytes, nil
}
三、发送消息
现在来搞清楚,服务器端又在哪里发送消息给consumer?
在这protocolV2.messagePump()
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
var err error
var memoryMsgChan chan *Message
var backendMsgChan <-chan []byte
var subChannel *Channel
var flusherChan <-chan time.Time
var sampleRate int32
//客户端执行Sub的时候,会将Channle丢到这个SubEventChan里
//可以去看protocolV2.SUB()函数
subEventChan := client.SubEventChan
//鉴权Identify对应的chan,只能鉴权一次
identifyEventChan := client.IdentifyEventChan
//flushChan赋值outputBufferTicker,默认是250ms时间间隔Flush一次数据
outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
heartbeatChan := heartbeatTicker.C
msgTimeout := client.MsgTimeout
flushed := true
close(startedChan)
for {
// 检查订阅状态和消息是否可处理状态
if subChannel == nil || !client.IsReadyForMessages() {
// the client is not ready to receive messages...
memoryMsgChan = nil
backendMsgChan = nil
flusherChan = nil
// force flush
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
} else if flushed {
// last iteration we flushed...
// do not select on the flusher ticker channel
memoryMsgChan = subChannel.memoryMsgChan
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = nil
} else {
//这个memoryMsgChan是channel将消息存在内存中的地方
memoryMsgChan = subChannel.memoryMsgChan
//这个backendMsgChan是channel将消息存在磁盘中的地方
backendMsgChan = subChannel.backend.ReadChan()
flusherChan = outputBufferTicker.C
}
fmt.Printf("werben subChannel nil: %t
", subChannel == nil)
select {
case <-flusherChan:
//这个flusherChan就是outputBufferTicker
//250ms时间间隔Flush一次数据
client.writeLock.Lock()
err = client.Flush()
client.writeLock.Unlock()
if err != nil {
goto exit
}
flushed = true
case <-client.ReadyStateChan:
case subChannel = <-subEventChan:
//客户端Sub的时候,会将channel传到这个subEventChan通道,
//参考protocolV2.SUB()函数
subEventChan = nil
case identifyData := <-identifyEventChan:
//客户端提交identify时出发,只能提交一次identify,
//参考函数protocolV2.IDENTIFY()
//感觉这里就是在收到这个消息时
//重新启动心跳和flush同步的ticker
identifyEventChan = nil
outputBufferTicker.Stop()
if identifyData.OutputBufferTimeout > 0 {
outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
}
heartbeatTicker.Stop()
heartbeatChan = nil
if identifyData.HeartbeatInterval > 0 {
heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
heartbeatChan = heartbeatTicker.C
}
if identifyData.SampleRate > 0 {
sampleRate = identifyData.SampleRate
}
msgTimeout = identifyData.MsgTimeout
case <-heartbeatChan:
//心跳处理
err = p.Send(client, frameTypeResponse, heartbeatBytes)
if err != nil {
goto exit
}
case b := <-backendMsgChan:
...
//磁盘消息处理
client.SendingMessage()
...
err = p.SendMessage(client, msg)
...
flushed = false
case msg := <-memoryMsgChan:
//将内存消息发送给客户端
client.SendingMessage()
...
err = p.SendMessage(client, msg)
...
flushed = false
case <-client.ExitChan:
goto exit
}
}
exit:
...
//结束时候关闭心跳和flush同步的ticke
heartbeatTicker.Stop()
outputBufferTicker.Stop()
...
}