topic的入口在哪里:GetTopic()
GetTopic如果存在则直接返回,不存在则NewTopic()
个人觉得Topic里面有两个重要的变量和一个函数,搞清楚这三个东西就差不多了
-
memoryMsgChan: 这是存放消息的内存,就是一个通道,通道的大小MemQueueSize,
默认配置是10000,也就是如果堆积的消息超过10000就会使用磁盘了 -
backend :就是diskqueue,这个就是磁盘存储消息的地方了,这个diskqueue一定要搞懂,因为后面channel也会用到这个queue,关于这个diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html
-
messagePump : 这是topic的一个“守护”协程,看源码里的英文注释,
messagePump selects over the in-memory and backend queue and writes messages to every channel for this topic
,它将topic收到的消息,分发给每个channel。
func (t *Topic) messagePump() {
var msg *Message
var buf []byte
var err error
var chans []*Channel
var memoryMsgChan chan *Message
var backendChan <-chan []byte
// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
//这里就是要等到startChan完成后才能往下走,
for {
select {
case <-t.channelUpdateChan:
continue
case <-t.pauseChan:
continue
case <-t.exitChan:
goto exit
case <-t.startChan:
//也就是要等到topic执行完GetChannel()之后才会接着往下走
}
break
}
t.RLock()
//将所有channel通道放在chans中
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) > 0 && !t.IsPaused() {
memoryMsgChan = t.memoryMsgChan
//backendChan就是backend暴露给外部的readChan
//参考: https://www.cnblogs.com/werben/p/14517781.html
backendChan = t.backend.ReadChan()
}
// main message loop
//这里是守护协程的主体了,也就是这个for会一直跑
for {
select {
case msg = <-memoryMsgChan:
//如果topic有收到新消息
case buf = <-backendChan:
//如果消息是从diskqueue里来的,还要解码反序列化成msg
msg, err = decodeMessage(buf)
if err != nil {
t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
continue
}
case <-t.channelUpdateChan:
//如果有新的channel通道
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.pauseChan:
//如果channel通道暂停
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
case <-t.exitChan:
goto exit
}
//遍历每一个channel通道,将消息投递过去
for i, channel := range chans {
chanMsg := msg
// copy the message because each channel
// needs a unique instance but...
// fastpath to avoid copy if its the first channel
// (the topic already created the first copy)
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
//如果是延时消息则将延时消息丢给channel
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
//将消息则将延时消息丢给channel
err := channel.PutMessage(chanMsg)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
}
exit:
t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
}