nsqd执行流程源码剖析
// Main starts every nsqd subsystem (TCP server, HTTP API, optional HTTPS,
// and the background loops) and blocks until the first one exits.
// It returns the first terminal error, or nil on a clean shutdown.
func (n *NSQD) Main() error {
exitCh := make(chan error)
var once sync.Once
// exitFunc forwards the FIRST terminal error (or nil) to exitCh exactly once;
// sync.Once makes later calls from the other subsystem goroutines no-ops.
exitFunc := func(err error) {
once.Do(func() {
if err != nil {
n.logf(LOG_FATAL, "%s", err)
}
exitCh <- err
})
}
n.tcpServer.nsqd = n
// TCP protocol server: serves client pub/sub traffic.
n.waitGroup.Wrap(func() {
exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
})
// HTTP API server: topic/channel administration, publishing, stats.
httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
})
// HTTPS server only when TLS is configured AND an HTTPS address is set.
if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
httpsServer := newHTTPServer(n, true, true)
n.waitGroup.Wrap(func() {
exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
})
}
// Background loops: in-flight/deferred queue scanning and lookupd liaison;
// statsd export only when a statsd address is configured.
n.waitGroup.Wrap(n.queueScanLoop)
n.waitGroup.Wrap(n.lookupLoop)
if n.getOpts().StatsdAddress != "" {
n.waitGroup.Wrap(n.statsdLoop)
}
// Block until one subsystem terminates; err may be nil on clean exit.
err := <-exitCh
return err
}
- 在这个方法中并发启动了5个方法:
- 启动TCP(服务于客户端)
- 启动HTTP(提供API,可用于创建、删除topic和channel、生产数据、清空数据)
- 启动queueScanLoop方法
- 启动lookupLoop方法
- 启动statsdLoop方法
// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
//
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// If either of the queues had work to do the channel is considered "dirty".
//
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.
func (n *NSQD) queueScanLoop() {
workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
closeCh := make(chan int)
workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
// seed the locally cached channel list and size the worker pool to match
channels := n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
for {
select {
case <-workTicker.C:
// scan tick: fall through to the random selection below
if len(channels) == 0 {
continue
}
case <-refreshTicker.C:
// refresh the cached channel list and re-size the worker pool
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:
goto exit
}
// select at most QueueScanSelectionCount random channels to scan
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)
}
// NOTE(review): upstream nsq places this label BEFORE the num computation;
// re-entering here skips recomputing num, which is equivalent as long as
// channels is unchanged within this iteration — confirm against upstream.
loop:
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i]
}
// collect one response per dispatched channel; true means the channel
// had in-flight or deferred work ("dirty")
numDirty := 0
for i := 0; i < num; i++ {
if <-responseCh {
numDirty++
}
}
// Redis-style probabilistic loop: if enough of the sample was dirty,
// scan again immediately instead of waiting for the next tick
if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop
}
}
exit:
n.logf(LOG_INFO, "QUEUESCAN: closing")
close(closeCh)
workTicker.Stop()
refreshTicker.Stop()
}
- queueScanLoop管理一个queueScanWorker pool(大小动态调整,上限由QueueScanWorkerPoolMax决定,默认上限为4),各个worker并发处理channel数据。
- queueScanLoop的处理方法模仿了Redis的概率到期算法(probabilistic expiration algorithm),每过一个QueueScanInterval(默认100ms)间隔,进行一次概率选择,从所有的channel缓存中随机选择QueueScanSelectionCount(默认20)个channel,如果某个被选中channel存在InFlight消息或者Deferred消息,则认为该channel为“脏”channel。如果被选中channel中“脏”channel的比例大于QueueScanDirtyPercent(默认25%),则不投入睡眠,直接进行下一次概率选择[个人理解:目的是为了让“脏”消息尽快重新投放]。
- channel缓存每QueueScanRefreshInterval(默认5s)刷新一次
// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
// 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
// resizePool grows or shrinks the queueScanWorker goroutine pool one worker
// at a time until it reaches the ideal size:
//
//	1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	// ideal worker count is 25% of the channel count, clamped to
	// [1, QueueScanWorkerPoolMax]
	target := int(float64(num) * 0.25)
	if target < 1 {
		target = 1
	}
	if limit := n.getOpts().QueueScanWorkerPoolMax; target > limit {
		target = limit
	}
	for n.poolSize != target {
		if n.poolSize > target {
			// contract: exactly one worker receives this and returns
			closeCh <- 1
			n.poolSize--
			continue
		}
		// expand: start another worker goroutine
		n.waitGroup.Wrap(func() {
			n.queueScanWorker(workCh, responseCh, closeCh)
		})
		n.poolSize++
	}
}
- resizePool 动态调整QueueScanWorkerPool的大小,负责worker的创建与销毁。
- worker的完美个数为channel总数的四分之一,但是不能大于QueueScanWorkerPoolMax。
- 所有的worker都会监听同一个workCh、closeCh,如果worker过多,则只需要向closeCh写入一个“通知”,收到这个“通知”的worker就会被销毁。
- 一次for循环只创建或销毁一个worker,直至worker数目达到idealPoolSize。
// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
// queueScanWorker receives channels to scan from workCh, processes each
// channel's in-flight and deferred queues, and reports on responseCh whether
// any work was done ("dirty"). It returns when closeCh is readable (pool
// contraction or shutdown).
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
	for {
		select {
		case <-closeCh:
			return
		case ch := <-workCh:
			now := time.Now().UnixNano()
			// dirty if either queue had at least one message to handle;
			// both queues are always processed
			dirty := ch.processInFlightQueue(now)
			if ch.processDeferredQueue(now) {
				dirty = true
			}
			responseCh <- dirty
		}
	}
}
- Worker 对processInFlightQueue中消息进行处理。
- Worker 对processDeferredQueue中消息进行处理。
- 只要两个queue中任意一个有处理的消息,则标记该channel为“dirty”。
// processInFlightQueue pops every message whose timeout deadline is <= t from
// the in-flight priority queue, notifies the owning client of the timeout,
// and re-queues the message for redelivery. It reports whether at least one
// message was processed ("dirty").
func (c *Channel) processInFlightQueue(t int64) bool {
	c.exitMutex.RLock()
	defer c.exitMutex.RUnlock()

	if c.Exiting() {
		return false
	}

	dirty := false
	for {
		c.inFlightMutex.Lock()
		msg, _ := c.inFlightPQ.PeekAndShift(t)
		c.inFlightMutex.Unlock()

		// nothing left past its deadline — done
		if msg == nil {
			return dirty
		}
		dirty = true

		// remove the message from the in-flight bookkeeping; on failure
		// stop scanning this round
		if _, err := c.popInFlightMessage(msg.clientID, msg.ID); err != nil {
			return dirty
		}
		atomic.AddUint64(&c.timeoutCount, 1)

		// notify the owning client (if still connected) of the timeout
		c.RLock()
		client, ok := c.clients[msg.clientID]
		c.RUnlock()
		if ok {
			client.TimedOutMessage()
		}

		// requeue for redelivery
		c.put(msg)
	}
}
- 根据优先级(到期时间)从 inFlightPQ中 pop出该消息。
- 将channel 中关联的inFlightMessages删除。
- 然后把消息Put到该channel的memoryMsgChan中,供下次重新投放。
//lookupLoop内容太多,这里分几个部分来说
//part-1
func (n *NSQD) lookupLoop() {
var lookupPeers []*lookupPeer
var lookupAddrs []string
connect := true
hostname, err := os.Hostname()
if err != nil {
n.logf(LOG_FATAL, "failed to get hostname - %s", err)
os.Exit(1)
}
// for announcements, lookupd determines the host automatically
ticker := time.Tick(15 * time.Second)
- 首先创建了两个slice用来保存和nsqlookupd的连接和nsqlookup的地址,接下来获取本地主机的名字
- 创建了一个每15s触发一次的定时器
//lookupLoop内容太多,这里分几个部分来说
//part-2
for {
if connect {
for _, host := range n.getOpts().NSQLookupdTCPAddresses {
if in(host, lookupAddrs) {
continue
}
n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
connectCallback(n, hostname))
lookupPeer.Command(nil) // start the connection
lookupPeers = append(lookupPeers, lookupPeer)
lookupAddrs = append(lookupAddrs, host)
}
n.lookupPeers.Store(lookupPeers)
connect = false
}
- 这是一个死循环。由于我们在启动nsqd时传入了NSQLookupd的TCP地址,循环开始时会遍历这些地址:如果某个地址已经存在于lookupAddrs中则跳过;否则创建一个lookupPeer(创建时传入了一个回调函数,连接建立后会被执行),调用Command(nil)触发连接的建立,并把该peer及其地址分别保存到lookupPeers和lookupAddrs中。来看看这个回调函数里面做了什么:
// connectCallback returns the hook run when a connection to an nsqlookupd
// peer is (re)established: it IDENTIFYs this nsqd to the peer and then
// re-registers every topic and channel with it.
func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
	return func(lp *lookupPeer) {
		// advertise this nsqd's identity to the lookupd peer
		ci := map[string]interface{}{
			"version":           version.Binary,
			"tcp_port":          n.getOpts().BroadcastTCPPort,
			"http_port":         n.getOpts().BroadcastHTTPPort,
			"hostname":          hostname,
			"broadcast_address": n.getOpts().BroadcastAddress,
		}

		cmd, err := nsq.Identify(ci)
		if err != nil {
			lp.Close()
			return
		}

		resp, err := lp.Command(cmd)
		if err != nil {
			n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
			return
		}
		if bytes.Equal(resp, []byte("E_INVALID")) {
			n.logf(LOG_INFO, "LOOKUPD(%s): lookupd returned %s", lp, resp)
			lp.Close()
			return
		}
		if err = json.Unmarshal(resp, &lp.Info); err != nil {
			n.logf(LOG_ERROR, "LOOKUPD(%s): parsing response - %s", lp, resp)
			lp.Close()
			return
		}
		n.logf(LOG_INFO, "LOOKUPD(%s): peer info %+v", lp, lp.Info)
		if lp.Info.BroadcastAddress == "" {
			n.logf(LOG_ERROR, "LOOKUPD(%s): no broadcast address", lp)
		}

		// build all the commands first so we exit the lock(s) as fast as possible
		var commands []*nsq.Command
		n.RLock()
		for _, topic := range n.topicMap {
			topic.RLock()
			if len(topic.channelMap) == 0 {
				commands = append(commands, nsq.Register(topic.name, ""))
			} else {
				for _, channel := range topic.channelMap {
					commands = append(commands, nsq.Register(channel.topicName, channel.name))
				}
			}
			topic.RUnlock()
		}
		n.RUnlock()

		// replay the registrations to the (re)connected peer
		for _, cmd := range commands {
			n.logf(LOG_INFO, "LOOKUPD(%s): %s", lp, cmd)
			if _, err := lp.Command(cmd); err != nil {
				n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
				return
			}
		}
	}
}
- 将当前nsqd的信息保存到一个map中,构建IDENTIFY命令赋给cmd,将cmd发送给nsqlookupd并等待response响应,从响应中获取nsqlookupd的信息并保存到lookupPeer的Info中;接下来遍历自己的topicMap,为每一个topic构造register请求并放到commands中,最后逐条发送commands里面的命令。
- 总结一下这个回调函数的两个任务:
- 向nsqlookupd发送自己的信息并从响应中获取当前nsqlookupd的信息保存起来。
- 向nsqlookupd注册自己的所有topic。
//lookupLoop内容太多,这里分几个部分来说
//part-3
select {
case <-ticker:
// send a heartbeat and read a response (read detects closed conns)
for _, lookupPeer := range lookupPeers {
n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)
cmd := nsq.Ping()
_, err := lookupPeer.Command(cmd)
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
}
}
case val := <-n.notifyChan:
var cmd *nsq.Command
var branch string
switch val.(type) {
case *Channel:
// notify all nsqlookupds that a new channel exists, or that it's removed
branch = "channel"
channel := val.(*Channel)
if channel.Exiting() == true {
cmd = nsq.UnRegister(channel.topicName, channel.name)
} else {
cmd = nsq.Register(channel.topicName, channel.name)
}
case *Topic:
// notify all nsqlookupds that a new topic exists, or that it's removed
branch = "topic"
topic := val.(*Topic)
if topic.Exiting() == true {
cmd = nsq.UnRegister(topic.name, "")
} else {
cmd = nsq.Register(topic.name, "")
}
}
for _, lookupPeer := range lookupPeers {
n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
_, err := lookupPeer.Command(cmd)
if err != nil {
n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
}
}
case <-n.optsNotificationChan:
var tmpPeers []*lookupPeer
var tmpAddrs []string
for _, lp := range lookupPeers {
if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
tmpPeers = append(tmpPeers, lp)
tmpAddrs = append(tmpAddrs, lp.addr)
continue
}
n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)
lp.Close()
}
lookupPeers = tmpPeers
lookupAddrs = tmpAddrs
connect = true
case <-n.exitChan:
goto exit
}
}
- 第一个是定时器每15s触发一次,向所有的nsqlookupd发送心跳包。
- notifyChan是用来传递有关topic的信息的,当有新的topic添加或者topic清除的时候向notifyChan中添加消息,根据内容的类型构造register或者unregister命令向所有nsqlookupd发送。
- optsNotificationChan是用来传递nsqlookupd配置变化的消息的。
- exitChan传递退出消息。
- 总结一下lookupLoop的主要功能:
- 与所有的nsqlookupd建立连接并注册topic。
- 每15s向nsqlookupd发送心跳包。
- 当topic发生变化的时候,向nsqlookupd发送register或者unregister命令。
- 对nsqlookupd进行监听。
- 监听退出消息。