zoukankan      html  css  js  c++  java
  • NSQ(4)-nsqd执行流程

    nsqd执行流程源码剖析

    func (n *NSQD) Main() error {
    	exitCh := make(chan error)
    	var once sync.Once
    	exitFunc := func(err error) {
    		once.Do(func() {
    			if err != nil {
    				n.logf(LOG_FATAL, "%s", err)
    			}
    			exitCh <- err
    		})
    	}
    
    	n.tcpServer.nsqd = n
    	n.waitGroup.Wrap(func() {
    		exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    	})
    
    	httpServer := newHTTPServer(n, false, n.getOpts().TLSRequired == TLSRequired)
    	n.waitGroup.Wrap(func() {
    		exitFunc(http_api.Serve(n.httpListener, httpServer, "HTTP", n.logf))
    	})
    
    	if n.tlsConfig != nil && n.getOpts().HTTPSAddress != "" {
    		httpsServer := newHTTPServer(n, true, true)
    		n.waitGroup.Wrap(func() {
    			exitFunc(http_api.Serve(n.httpsListener, httpsServer, "HTTPS", n.logf))
    		})
    	}
    
    	n.waitGroup.Wrap(n.queueScanLoop)
    	n.waitGroup.Wrap(n.lookupLoop)
    	if n.getOpts().StatsdAddress != "" {
    		n.waitGroup.Wrap(n.statsdLoop)
    	}
    
    	err := <-exitCh
    	return err
    }
    
    • 在这个方法中并发启动了5个方法:
      • 启动TCP(服务于客户端)
      • 启动HTTP(提供API,可用于创建、删除topic和channel、生产数据、清空数据)
      • 启动queueScanLoop方法
      • 启动lookupLoop方法
      • 启动statsdLoop方法
    // queueScanLoop runs in a single goroutine to process in-flight and deferred
    // priority queues. It manages a pool of queueScanWorker (configurable max of
    // QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
    //
    // It copies Redis's probabilistic expiration algorithm: it wakes up every
    // QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
    // (default: 20) channels from a locally cached list (refreshed every
    // QueueScanRefreshInterval (default: 5s)).
    //
    // If either of the queues had work to do the channel is considered "dirty".
    //
    // If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
    // the loop continues without sleep.
    func (n *NSQD) queueScanLoop() {
    	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    	closeCh := make(chan int)
    
    	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
    
    	channels := n.channels()
    	n.resizePool(len(channels), workCh, responseCh, closeCh)
    
    	for {
    		select {
    		case <-workTicker.C:
    			if len(channels) == 0 {
    				continue
    			}
    		case <-refreshTicker.C:
    			channels = n.channels()
    			n.resizePool(len(channels), workCh, responseCh, closeCh)
    			continue
    		case <-n.exitChan:
    			goto exit
    		}
    
    		num := n.getOpts().QueueScanSelectionCount
    		if num > len(channels) {
    			num = len(channels)
    		}
    
    	loop:
    		for _, i := range util.UniqRands(num, len(channels)) {
    			workCh <- channels[i]
    		}
    
    		numDirty := 0
    		for i := 0; i < num; i++ {
    			if <-responseCh {
    				numDirty++
    			}
    		}
    
    		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
    			goto loop
    		}
    	}
    
    exit:
    	n.logf(LOG_INFO, "QUEUESCAN: closing")
    	close(closeCh)
    	workTicker.Stop()
    	refreshTicker.Stop()
    }
    
    • queueScanLoop管理一个queueScanWorker pool(默认大小为4),各个worker并发处理channel数据。
    • queueScanLoop的处理方法模仿了Redis的概率到期算法(probabilistic expiration algorithm),每过一个QueueScanInterval(默认100ms)间隔,进行一次概率选择,从所有的channel缓存中随机选择QueueScanSelectionCount(默认20)个channel,如果某个被选中channel存在InFlighting消息或者Deferred消息,则认为该channel为“脏”channel。如果被选中channel中“脏”channel的比例大于QueueScanDirtyPercent(默认25%),则不投入睡眠,直接进行下一次概率选择[个人理解:目的是为了让“脏”消息尽快重新投放]。
    • channel缓存每QueueScanRefreshInterval(默认5s)刷新一次
    // resizePool adjusts the size of the pool of queueScanWorker goroutines
    //
    // 	1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
    //
    func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    	idealPoolSize := int(float64(num) * 0.25)
    	if idealPoolSize < 1 {
    		idealPoolSize = 1
    	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
    		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    	}
    	for {
    		if idealPoolSize == n.poolSize {
    			break
    		} else if idealPoolSize < n.poolSize {
    			// contract
    			closeCh <- 1
    			n.poolSize--
    		} else {
    			// expand
    			n.waitGroup.Wrap(func() {
    				n.queueScanWorker(workCh, responseCh, closeCh)
    			})
    			n.poolSize++
    		}
    	}
    }
    
    • resizePool 动态调整QueueScanWorkerPool的大小,负责worker的创建与销毁。
    • worker的完美个数为channel总数的四分之一,但是不能大于QueueScanWorkerPoolMax。
    • 所有的worker都会监听同一个workCh、closeCh,如果worker过多,则只需要向closeCh写入一个“通知”,收到这个“通知”的worker就会被销毁。
    • 一次for循环只创建或销毁一个worker,直至worker数目达到idealPoolSize。
    // queueScanWorker receives work (in the form of a channel) from queueScanLoop
    // and processes the deferred and in-flight queues
    func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    	for {
    		select {
    		case c := <-workCh:
    			now := time.Now().UnixNano()
    			dirty := false
    			if c.processInFlightQueue(now) {
    				dirty = true
    			}
    			if c.processDeferredQueue(now) {
    				dirty = true
    			}
    			responseCh <- dirty
    		case <-closeCh:
    			return
    		}
    	}
    }
    
    • Worker 对processInFlightQueue中消息进行处理。
    • Worker 对processDeferredQueue中消息进行处理。
    • 只要两个queue中任意一个有处理的消息,则标记该channel为“dirty”。
    func (c *Channel) processInFlightQueue(t int64) bool {
    	c.exitMutex.RLock()
    	defer c.exitMutex.RUnlock()
    
    	if c.Exiting() {
    		return false
    	}
    
    	dirty := false
    	for {
    		c.inFlightMutex.Lock()
    		msg, _ := c.inFlightPQ.PeekAndShift(t)
    		c.inFlightMutex.Unlock()
    
    		if msg == nil {
    			goto exit
    		}
    		dirty = true
    
    		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
    		if err != nil {
    			goto exit
    		}
    		atomic.AddUint64(&c.timeoutCount, 1)
    		c.RLock()
    		client, ok := c.clients[msg.clientID]
    		c.RUnlock()
    		if ok {
    			client.TimedOutMessage()
    		}
    		c.put(msg)
    	}
    
    exit:
    	return dirty
    }
    
    • 根据优先级(到期时间)从 inFlightPQ中 pop出该消息。
    • 将channel 中关联的inFlightMessages删除。
    • 然后把信息Put到该channel的memoryMsgChan中,供下次重新投放。
    //lookupLoop内容太多,这里分几个部分来说
    //part-1
    func (n *NSQD) lookupLoop() {
    	var lookupPeers []*lookupPeer
    	var lookupAddrs []string
    	connect := true
    
    	hostname, err := os.Hostname()
    	if err != nil {
    		n.logf(LOG_FATAL, "failed to get hostname - %s", err)
    		os.Exit(1)
    	}
    
    	// for announcements, lookupd determines the host automatically
    	ticker := time.Tick(15 * time.Second)
    
    • 首先创建了两个slice用来保存和nsqlookupd的连接和nsqlookup的地址,接下来获取本地主机的名字
    • 创建了一个每15s触发一次的定时器
    //lookupLoop内容太多,这里分几个部分来说
    //part-2
    	for {
    		if connect {
    			for _, host := range n.getOpts().NSQLookupdTCPAddresses {
    				if in(host, lookupAddrs) {
    					continue
    				}
    				n.logf(LOG_INFO, "LOOKUP(%s): adding peer", host)
    				lookupPeer := newLookupPeer(host, n.getOpts().MaxBodySize, n.logf,
    					connectCallback(n, hostname))
    				lookupPeer.Command(nil) // start the connection
    				lookupPeers = append(lookupPeers, lookupPeer)
    				lookupAddrs = append(lookupAddrs, host)
    			}
    			n.lookupPeers.Store(lookupPeers)
    			connect = false
    		}
    
    • 这是一个死循环,由于我们在启动nsqd的时候传入了NSQLookupd的TCP地址,所以一开始就会遍历我们传入的地址,如果之前处理过了会保存到lookupAddrs里面,如果没有继续向下执行,创建一个lookupPeer,执行回调函数进行建立连接,并保存当前信息,我们可以发现在创建lookupPeer的时候传入了一个回调函数,来看看它里面做了什么:
    func connectCallback(n *NSQD, hostname string) func(*lookupPeer) {
    	return func(lp *lookupPeer) {
    		ci := make(map[string]interface{})
    		ci["version"] = version.Binary
    		ci["tcp_port"] = n.getOpts().BroadcastTCPPort
    		ci["http_port"] = n.getOpts().BroadcastHTTPPort
    		ci["hostname"] = hostname
    		ci["broadcast_address"] = n.getOpts().BroadcastAddress
    
    		cmd, err := nsq.Identify(ci)
    		if err != nil {
    			lp.Close()
    			return
    		}
    		resp, err := lp.Command(cmd)
    		if err != nil {
    			n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
    			return
    		} else if bytes.Equal(resp, []byte("E_INVALID")) {
    			n.logf(LOG_INFO, "LOOKUPD(%s): lookupd returned %s", lp, resp)
    			lp.Close()
    			return
    		} else {
    			err = json.Unmarshal(resp, &lp.Info)
    			if err != nil {
    				n.logf(LOG_ERROR, "LOOKUPD(%s): parsing response - %s", lp, resp)
    				lp.Close()
    				return
    			} else {
    				n.logf(LOG_INFO, "LOOKUPD(%s): peer info %+v", lp, lp.Info)
    				if lp.Info.BroadcastAddress == "" {
    					n.logf(LOG_ERROR, "LOOKUPD(%s): no broadcast address", lp)
    				}
    			}
    		}
    
    		// build all the commands first so we exit the lock(s) as fast as possible
    		var commands []*nsq.Command
    		n.RLock()
    		for _, topic := range n.topicMap {
    			topic.RLock()
    			if len(topic.channelMap) == 0 {
    				commands = append(commands, nsq.Register(topic.name, ""))
    			} else {
    				for _, channel := range topic.channelMap {
    					commands = append(commands, nsq.Register(channel.topicName, channel.name))
    				}
    			}
    			topic.RUnlock()
    		}
    		n.RUnlock()
    
    		for _, cmd := range commands {
    			n.logf(LOG_INFO, "LOOKUPD(%s): %s", lp, cmd)
    			_, err := lp.Command(cmd)
    			if err != nil {
    				n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lp, cmd, err)
    				return
    			}
    		}
    	}
    }
    
    • 保存当前nsqd的信息放到一个map中,构建IDENTIFY命名返回给cmd,将cmd发送给nsqlookupd并等待response响应,从响应中中获取nsqlookupd并保存到lookupPeer的Info中,接下来便利自己的topicMap,为每一个topic构造register请求并放到commands中,最后逐条发送commands里面的命令。
    • 总结一下这个回调函数的两个任务:
      • 向nsqlookupd发送自己的信息并从响应中获取当前nsqlookupd的信息保存起来。
      • 向nsqlookupd注册自己的所有topic。
    //lookupLoop内容太多,这里分几个部分来说
    //part-2		
    select {
    		case <-ticker:
    			// send a heartbeat and read a response (read detects closed conns)
    			for _, lookupPeer := range lookupPeers {
    				n.logf(LOG_DEBUG, "LOOKUPD(%s): sending heartbeat", lookupPeer)
    				cmd := nsq.Ping()
    				_, err := lookupPeer.Command(cmd)
    				if err != nil {
    					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
    				}
    			}
    		case val := <-n.notifyChan:
    			var cmd *nsq.Command
    			var branch string
    
    			switch val.(type) {
    			case *Channel:
    				// notify all nsqlookupds that a new channel exists, or that it's removed
    				branch = "channel"
    				channel := val.(*Channel)
    				if channel.Exiting() == true {
    					cmd = nsq.UnRegister(channel.topicName, channel.name)
    				} else {
    					cmd = nsq.Register(channel.topicName, channel.name)
    				}
    			case *Topic:
    				// notify all nsqlookupds that a new topic exists, or that it's removed
    				branch = "topic"
    				topic := val.(*Topic)
    				if topic.Exiting() == true {
    					cmd = nsq.UnRegister(topic.name, "")
    				} else {
    					cmd = nsq.Register(topic.name, "")
    				}
    			}
    
    			for _, lookupPeer := range lookupPeers {
    				n.logf(LOG_INFO, "LOOKUPD(%s): %s %s", lookupPeer, branch, cmd)
    				_, err := lookupPeer.Command(cmd)
    				if err != nil {
    					n.logf(LOG_ERROR, "LOOKUPD(%s): %s - %s", lookupPeer, cmd, err)
    				}
    			}
    		case <-n.optsNotificationChan:
    			var tmpPeers []*lookupPeer
    			var tmpAddrs []string
    			for _, lp := range lookupPeers {
    				if in(lp.addr, n.getOpts().NSQLookupdTCPAddresses) {
    					tmpPeers = append(tmpPeers, lp)
    					tmpAddrs = append(tmpAddrs, lp.addr)
    					continue
    				}
    				n.logf(LOG_INFO, "LOOKUP(%s): removing peer", lp)
    				lp.Close()
    			}
    			lookupPeers = tmpPeers
    			lookupAddrs = tmpAddrs
    			connect = true
    		case <-n.exitChan:
    			goto exit
    		}
    	}
    
    • 第一个是定时器每15s触发一次,向所有的nsqlookupd发送心跳包。
    • notifyChan是用来传递有关topic的信息的,当有新的topic添加或者topic清除的时候向notifyChan中添加消息,根据内容的类型构造register或者unregister命令向所有nsqlookupd发送。
    • optsNotificationChan使用来传递nsqlookupd变化的消息的。
    • exitChan传递退出消息。
    • 总结一下lookupLoop的主要功能:
      • 与所有的nsqlookupd建立连接并注册topic。
      • 每15s向nsqlookupd发送心跳包。
      • 当topic发生变化的时候,向nsqlookupd发送register或者unregister命令。
      • 对nsqlookupd进行监听。
      • 监听退出消息。
    所有博文均为原著,如若转载,请注明出处!
  • 相关阅读:
    MySql 常用时间函数
    ORM执行原生SQL语句
    如何获取该变量(对象)是属于什么类型的
    预解析
    全局变量与局部变量
    函数可以作为参数使用,如果一个函数作为参数,那么我们说这个参数(函数)可以叫回调函数。
    函数的自调用,没有名字,声明的同时直接调用
    return之后的代码不执行
    js冒泡排序
    switch-case case后比较是严格模式
  • 原文地址:https://www.cnblogs.com/zpcoding/p/14517528.html
Copyright © 2011-2022 走看看