zoukankan      html  css  js  c++  java
  • NSQ(4)-nsqd执行流程

    nsqd执行流程源码剖析

    // Main launches nsqd's long-running services through n.waitGroup.Wrap and
    // then blocks until one of the network servers (TCP, HTTP, or HTTPS when
    // configured) hands back a result.
    //
    // The value returned is the first result reported by any of those servers,
    // which may be nil; a non-nil result is logged at LOG_FATAL before being
    // returned.
    func (n *NSQD) Main() error {
    	var reportOnce sync.Once
    	done := make(chan error)
    	// report logs and forwards a server result; sync.Once ensures that only
    	// the first call does so.
    	report := func(err error) {
    		reportOnce.Do(func() {
    			if err != nil {
    				n.logf(LOG_FATAL, "%s", err)
    			}
    			done <- err
    		})
    	}
    
    	// TCP server.
    	n.tcpServer.nsqd = n
    	n.waitGroup.Wrap(func() {
    		report(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    	})
    
    	// Plain HTTP server.
    	tlsRequired := n.getOpts().TLSRequired == TLSRequired
    	plainHTTP := newHTTPServer(n, false, tlsRequired)
    	n.waitGroup.Wrap(func() {
    		report(http_api.Serve(n.httpListener, plainHTTP, "HTTP", n.logf))
    	})
    
    	// HTTPS server, only when a TLS config and an HTTPS address are both set.
    	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
    		secureHTTP := newHTTPServer(n, true, true)
    		n.waitGroup.Wrap(func() {
    			report(http_api.Serve(n.httpsListener, secureHTTP, "HTTPS", n.logf))
    		})
    	}
    
    	// Background loops; statsd reporting is optional.
    	n.waitGroup.Wrap(n.queueScanLoop)
    	n.waitGroup.Wrap(n.lookupLoop)
    	if n.getOpts().StatsdAddress != "" {
    		n.waitGroup.Wrap(n.statsdLoop)
    	}
    
    	return <-done
    }
    
    • 在这个方法中并发启动了5个方法:
      • 启动TCP(服务于客户端)
      • 启动HTTP(提供API,可用于创建、删除topic和channel、生产数据、清空数据)
      • 启动queueScanLoop方法
      • 启动lookupLoop方法
      • 启动statsdLoop方法
    // queueScanLoop drives the scanning of channel in-flight and deferred
    // priority queues from a single goroutine, handing the actual work to a pool
    // of queueScanWorker (at most QueueScanWorkerPoolMax, default: 4) that
    // process channels concurrently.
    //
    // The approach is borrowed from Redis's probabilistic expiration algorithm:
    // every QueueScanInterval (default: 100ms) it picks up to
    // QueueScanSelectionCount (default: 20) random channels from a locally cached
    // list, which is refreshed every QueueScanRefreshInterval (default: 5s).
    //
    // A channel counts as "dirty" when either of its queues had work to do. While
    // more than QueueScanDirtyPercent (default: 25%) of a selection is dirty, the
    // loop scans another random selection right away instead of waiting for the
    // next tick.
    func (n *NSQD) queueScanLoop() {
    	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    	closeCh := make(chan int)
    
    	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
    
    	channels := n.channels()
    	n.resizePool(len(channels), workCh, responseCh, closeCh)
    
    scan:
    	for {
    		select {
    		case <-n.exitChan:
    			break scan
    		case <-refreshTicker.C:
    			// Re-read the channel list and size the worker pool to match.
    			channels = n.channels()
    			n.resizePool(len(channels), workCh, responseCh, closeCh)
    			continue
    		case <-workTicker.C:
    			if len(channels) == 0 {
    				continue
    			}
    		}
    
    		// Never select more channels than are cached.
    		num := n.getOpts().QueueScanSelectionCount
    		if len(channels) < num {
    			num = len(channels)
    		}
    
    		// Keep scanning fresh random selections for as long as the dirty
    		// ratio stays above the configured threshold.
    		for {
    			for _, idx := range util.UniqRands(num, len(channels)) {
    				workCh <- channels[idx]
    			}
    
    			dirtyCount := 0
    			for pending := num; pending > 0; pending-- {
    				if <-responseCh {
    					dirtyCount++
    				}
    			}
    
    			if float64(dirtyCount)/float64(num) > n.getOpts().QueueScanDirtyPercent {
    				continue
    			}
    			break
    		}
    	}
    
    	n.logf(LOG_INFO, "QUEUESCAN: closing")
    	close(closeCh)
    	workTicker.Stop()
    	refreshTicker.Stop()
    }
    
    • queueScanLoop管理一个queueScanWorker pool(默认大小为4),各个worker并发处理channel数据。
    • queueScanLoop的处理方法模仿了Redis的概率到期算法(probabilistic expiration algorithm),每过一个QueueScanInterval(默认100ms)间隔,进行一次概率选择,从所有的channel缓存中随机选择QueueScanSelectionCount(默认20)个channel,如果某个被选中channel存在InFlighting消息或者Deferred消息,则认为该channel为“脏”channel。如果被选中channel中“脏”channel的比例大于QueueScanDirtyPercent(默认25%),则不投入睡眠,直接进行下一次概率选择[个人理解:目的是为了让“脏”消息尽快重新投放]。
    • channel缓存每QueueScanRefreshInterval(默认5s)刷新一次
    // resizePool brings the number of queueScanWorker goroutines, tracked in
    // n.poolSize, in line with the ideal size for num channels.
    //
    // The ideal size is a quarter of num (truncated), raised to at least 1 and
    // otherwise capped at QueueScanWorkerPoolMax. Workers are added through
    // n.waitGroup.Wrap and removed by sending one value on closeCh for each
    // worker that should stop; the pool changes by one worker per iteration.
    func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    	target := int(float64(num) * 0.25)
    	switch {
    	case target < 1:
    		target = 1
    	case target > n.getOpts().QueueScanWorkerPoolMax:
    		target = n.getOpts().QueueScanWorkerPoolMax
    	}
    
    	for n.poolSize != target {
    		if n.poolSize > target {
    			// Too many workers: whichever worker receives this value exits.
    			closeCh <- 1
    			n.poolSize--
    			continue
    		}
    		// Too few workers: start one more.
    		n.waitGroup.Wrap(func() {
    			n.queueScanWorker(workCh, responseCh, closeCh)
    		})
    		n.poolSize++
    	}
    }
    
    • resizePool 动态调整QueueScanWorkerPool的大小,负责worker的创建与销毁。
    • worker的完美个数为channel总数的四分之一,但是不能大于QueueScanWorkerPoolMax。
    • 所有的worker都会监听同一个workCh、closeCh,如果worker过多,则只需要向closeCh写入一个“通知”,收到这个“通知”的worker就会被销毁。
    • 一次for循环只创建或销毁一个worker,直至worker数目达到idealPoolSize。
    // queueScanWorker is the body of one pool worker. It takes channels from
    // workCh, processes each channel's in-flight and deferred queues, and reports
    // on responseCh whether either queue had work to do. It returns once a value
    // is received from closeCh or closeCh is closed.
    func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    	for {
    		select {
    		case <-closeCh:
    			return
    		case ch := <-workCh:
    			ts := time.Now().UnixNano()
    			// Both queues are always processed, using the same timestamp;
    			// the results are only combined afterwards.
    			inFlightDirty := ch.processInFlightQueue(ts)
    			deferredDirty := ch.processDeferredQueue(ts)
    			responseCh <- inFlightDirty || deferredDirty
    		}
    	}
    }
    
    • Worker 调用processInFlightQueue处理channel中已超时的in-flight消息。
    • Worker 调用processDeferredQueue处理channel的deferred队列中的消息。
    • 只要两个queue中任意一个有处理的消息,则标记该channel为“dirty”。
    // processInFlightQueue repeatedly takes messages from the in-flight priority
    // queue via PeekAndShift(t) (presumably those whose timeout is at or before
    // t; confirm against the queue implementation) and puts each one back on the
    // channel with c.put.
    //
    // For every message taken it removes the in-flight record, increments
    // c.timeoutCount, and notifies the owning client if that client is still
    // registered in c.clients. It reports whether at least one message was taken
    // from the queue, and returns false immediately when the channel is exiting.
    func (c *Channel) processInFlightQueue(t int64) bool {
    	c.exitMutex.RLock()
    	defer c.exitMutex.RUnlock()
    
    	if c.Exiting() {
    		return false
    	}
    
    	foundWork := false
    	for {
    		c.inFlightMutex.Lock()
    		expired, _ := c.inFlightPQ.PeekAndShift(t)
    		c.inFlightMutex.Unlock()
    		if expired == nil {
    			return foundWork
    		}
    		// The queue yielded a message, so the channel is dirty even if the
    		// pop below fails.
    		foundWork = true
    
    		if _, err := c.popInFlightMessage(expired.clientID, expired.ID); err != nil {
    			return foundWork
    		}
    		atomic.AddUint64(&c.timeoutCount, 1)
    
    		c.RLock()
    		subscriber, found := c.clients[expired.clientID]
    		c.RUnlock()
    		if found {
    			subscriber.TimedOutMessage()
    		}
    		c.put(expired)
    	}
    }
    
    • 根据优先级(到期时间)从 inFlightPQ中 pop出该消息。
    • 将channel 中关联的inFlightMessages删除。
    • 然后把信息Put到该channel的memoryMsgChan中,供下次重新投放。
    //lookupLoop内容太多,这里分几个部分来说
    //part-1
    func (n *NSQD) lookupLoop() {
    	var lookupPeers []*lookupPeer
    	var lookupAddrs []string
    	connect := true
    
    	hostname, err := os.Hostname()
    	if err != nil {
    		n.logf(LOG_FATAL, "failed to get hostname - %s", err)
    		os.Exit(1)
    	}
    
    	// for announcements, lookupd determines the host automatically
    	ticker := time.Tick(15 * time.Second)
    
    • 首先创建了两个slice用来保存和nsqlookupd的连接和nsqlookup的地址,接下来获取本地主机的名字
    • 创建了一个每15s触发一次的定时器
    //lookupLoop内容太多,这里分几个部分来说
    //part-2
    	for {
    		if connect {
    			for _, host := range n.getOpts().NSQLookupdTCPAddresses {
    				if in(host, lookupAddrs) {
    					continue
    				}
    				n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
    				lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
    					connectCallback(n, hostname))
    				lookupPeer.Command(nil) // start the connection
    				lookupPeers = append(lookupPeers, lookupPeer)
    				lookupAddrs = append(lookupAddrs, host)
    			}
    			n.lookupPeers.Store(lookupPeers)
    			connect = false
    		}
    
    • 这是一个死循环,由于我们在启动nsqd的时候传入了NSQLookupd的TCP地址,所以一开始就会遍历我们传入的地址,如果之前处理过了会保存到lookupAddrs里面,如果没有继续向下执行,创建一个lookupPeer,执行回调函数进行建立连接,并保存当前信息,我们可以发现在创建lookupPeer的时候传入了一个回调函数,来看看它里面做了什么:
    // connectCallback builds the callback that is handed to newLookupPeer for the
    // given nsqd and hostname.
    //
    // The returned function sends an nsq.Identify command describing this nsqd
    // to the peer, decodes the JSON response into lp.Info, and then sends one
    // nsq.Register command per topic (for topics without channels) or per
    // channel found in n.topicMap. It stops at the first failed command; the
    // peer is closed when the identify command cannot be built, when the peer
    // answers E_INVALID, or when the response cannot be parsed.
    func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
    	return func(lp *lookupPeer) {
    		identity := map[string]interface{}{
    			"version":           version.Binary,
    			"tcp_port":          n.getOpts().BroadcastTCPPort,
    			"http_port":         n.getOpts().BroadcastHTTPPort,
    			"hostname":          hostname,
    			"broadcast_address": n.getOpts().BroadcastAddress,
    		}
    
    		identify, err := nsq.Identify(identity)
    		if err != nil {
    			lp.Close()
    			return
    		}
    
    		resp, err := lp.Command(identify)
    		if err != nil {
    			n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, identify, err)
    			return
    		}
    		if bytes.Equal(resp, []byte("E_INVALID")) {
    			n.logf(LOG_INFO, "LOOKUPD(%s): lookupd returned %s", lp, resp)
    			lp.Close()
    			return
    		}
    		if err = json.Unmarshal(resp, &lp.Info); err != nil {
    			n.logf(LOG_ERROR, "LOOKUPD(%s): parsing response - %s", lp, resp)
    			lp.Close()
    			return
    		}
    		n.logf(LOG_INFO, "LOOKUPD(%s): peer info %+v", lp, lp.Info)
    		if lp.Info.BroadcastAddress == "" {
    			n.logf(LOG_ERROR, "LOOKUPD(%s): no broadcast address", lp)
    		}
    
    		// Collect every registration while holding the read locks, and send
    		// them only after the locks are released so they are held briefly.
    		var registrations []*nsq.Command
    		n.RLock()
    		for _, t := range n.topicMap {
    			t.RLock()
    			if len(t.channelMap) == 0 {
    				registrations = append(registrations, nsq.Register(t.name, ""))
    			} else {
    				for _, ch := range t.channelMap {
    					registrations = append(registrations, nsq.Register(ch.topicName, ch.name))
    				}
    			}
    			t.RUnlock()
    		}
    		n.RUnlock()
    
    		for _, reg := range registrations {
    			n.logf(LOG_INFO, "LOOKUPD(%s): %s", lp, reg)
    			if _, err := lp.Command(reg); err != nil {
    				n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, reg, err)
    				return
    			}
    		}
    	}
    }
    
    • 保存当前nsqd的信息放到一个map中,构建IDENTIFY命令并赋值给cmd,将cmd发送给nsqlookupd并等待response响应,从响应中获取nsqlookupd的信息并保存到lookupPeer的Info中,接下来遍历自己的topicMap,为每一个topic(以及topic下的每一个channel)构造register请求并放到commands中,最后逐条发送commands里面的命令。
    • 总结一下这个回调函数的两个任务:
      • 向nsqlookupd发送自己的信息并从响应中获取当前nsqlookupd的信息保存起来。
      • 向nsqlookupd注册自己的所有topic。
    //lookupLoop内容太多,这里分几个部分来说
    //part-3
    select {
    		case <-ticker:
    			// send a heartbeat and read a response (read detects closed conns)
    			for _, lookupPeer := range lookupPeers {
    				n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)
    				cmd := nsq.Ping()
    				_, err := lookupPeer.Command(cmd)
    				if err != nil {
    					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
    				}
    			}
    		case val := <-n.notifyChan:
    			var cmd *nsq.Command
    			var branch string
    
    			switch val.(type) {
    			case *Channel:
    				// notify all nsqlookupds that a new channel exists, or that it's removed
    				branch = "channel"
    				channel := val.(*Channel)
    				if channel.Exiting() == true {
    					cmd = nsq.UnRegister(channel.topicName, channel.name)
    				} else {
    					cmd = nsq.Register(channel.topicName, channel.name)
    				}
    			case *Topic:
    				// notify all nsqlookupds that a new topic exists, or that it's removed
    				branch = "topic"
    				topic := val.(*Topic)
    				if topic.Exiting() == true {
    					cmd = nsq.UnRegister(topic.name, "")
    				} else {
    					cmd = nsq.Register(topic.name, "")
    				}
    			}
    
    			for _, lookupPeer := range lookupPeers {
    				n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
    				_, err := lookupPeer.Command(cmd)
    				if err != nil {
    					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
    				}
    			}
    		case <-n.optsNotificationChan:
    			var tmpPeers []*lookupPeer
    			var tmpAddrs []string
    			for _, lp := range lookupPeers {
    				if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
    					tmpPeers = append(tmpPeers, lp)
    					tmpAddrs = append(tmpAddrs, lp.addr)
    					continue
    				}
    				n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)
    				lp.Close()
    			}
    			lookupPeers = tmpPeers
    			lookupAddrs = tmpAddrs
    			connect = true
    		case <-n.exitChan:
    			goto exit
    		}
    	}
    
    • 第一个是定时器每15s触发一次,向所有的nsqlookupd发送心跳包。
    • notifyChan是用来传递有关topic和channel变化的信息的,当有topic或channel新增或者被移除的时候向notifyChan中添加消息,根据内容的类型(*Topic或*Channel)以及是否正在退出,构造register或者unregister命令向所有nsqlookupd发送。
    • optsNotificationChan是用来传递配置变化(nsqlookupd地址列表变化)的消息的:收到消息后关闭已不在配置中的连接,并在下一轮循环中连接新增的地址。
    • exitChan传递退出消息。
    • 总结一下lookupLoop的主要功能:
      • 与所有的nsqlookupd建立连接并注册topic。
      • 每15s向nsqlookupd发送心跳包。
      • 当topic发生变化的时候,向nsqlookupd发送register或者unregister命令。
      • 对nsqlookupd进行监听。
      • 监听退出消息。
    所有博文均为原著,如若转载,请注明出处!
  • 相关阅读:
    LeetCode 226. Invert Binary Tree
    LeetCode 221. Maximal Square
    LeetCode 217. Contains Duplicate
    LeetCode 206. Reverse Linked List
    LeetCode 213. House Robber II
    LeetCode 198. House Robber
    LeetCode 188. Best Time to Buy and Sell Stock IV (stock problem)
    LeetCode 171. Excel Sheet Column Number
    LeetCode 169. Majority Element
    运维工程师常见面试题
  • 原文地址:https://www.cnblogs.com/zpcoding/p/14517528.html
Copyright © 2011-2022 走看看