zoukankan      html  css  js  c++  java
  • nsq源码-channel

    channel有下面几个重要的成员,其实跟topic还有点像,都有一个memoryMsgChan和diskqueue

    1. memoryMsgChan: 这是存放消息的内存,就是一个通道,通道的大小MemQueueSize,
      默认配置是10000,也就是如果堆积的消息超过10000就会使用磁盘了

    2. backend :就是diskqueue,这个就是磁盘存储消息的地方了,关于这个diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html

    3. clients : Consumer 这里关联的是客户端的订阅者

    4. deferredMessages和deferredPQ : 延迟消息存放的地方,其中deferredPQ,是一个优先级管理的队列,直接丢在内存

    5. inFlightMessages和inFlightPQ: 这个标识正在执行中的消息,也是直接丢在内存

    topic在messagePump()中处理消息的时候,通过下面两个函数,将消息投递到channel。

    //延时消息
    channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
    //非延时消息
    channel.PutMessage(chanMsg)
    

    在这里我们看不到Channel的“守护”协程,也就是我们只看到topic将msg投递到channel,channel将延时消息丢到队列中,但是找不到channel从队列中取出数据发送给client consumer的地方

    找到channel的两个函数, 看了下就是这两个函数将msg发送给client consumer的

    func (c *Channel) processDeferredQueue(t int64) bool
    func (c *Channel) processInFlightQueue(t int64) bool
    

    但是channel本身没有“守护”协程,一直运行来调用这两个函数,找这两个函数调用的地方,一直往上找调用者。最终找到“守护”协程在哪里了。

    原来在nsqd里面运行了一个queueScanLoop的“守护”协程。

    一、nsqd.queueScanLoop

    //nsqd.go
    
    func (n *NSQD) queueScanLoop() {
    	workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    	responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    	closeCh := make(chan int)
    
    	//定时执行loop的间隔时间,默认100ms
    	workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    	//刷新channel数量,重新调整协程池,默认时间是5s刷新一次
    	refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)
    
    	channels := n.channels()
    	n.resizePool(len(channels), workCh, responseCh, closeCh)
    
    	for {
    		select {
    		case <-workTicker.C:
    			if len(channels) == 0 {
    				continue
    			}
    		case <-refreshTicker.C:
    			channels = n.channels()
    			n.resizePool(len(channels), workCh, responseCh, closeCh)
    			continue
    		case <-n.exitChan:
    			goto exit
    		}
    
    		num := n.getOpts().QueueScanSelectionCount
    		if num > len(channels) {
    			num = len(channels)
    		}
    
    	loop:
    		//从 channels 中随机选择 num 个 channel
    		for _, i := range util.UniqRands(num, len(channels)) {
    			workCh <- channels[i]
    		}
    
    		//等待处理响应,记录失败次数
    		numDirty := 0
    		for i := 0; i < num; i++ {
    			if <-responseCh {
    				numDirty++
    			}
    		}
    
    		//queueScanLoop的处理方法模仿了Redis的概率到期算法
    		//(probabilistic expiration algorithm),
    		//每过一个QueueScanInterval(默认100ms)间隔,进行一次概率选择,
    		//从所有的channel缓存中随机选择QueueScanSelectionCount(默认20)个channel,
    		//如果某个被选中channel存在InFlighting消息或者Deferred消息,
    		//则认为该channel为“脏”channel。
    		//如果被选中channel中“脏”channel的比例大于QueueScanDirtyPercent(默认25%),
    		//则不投入睡眠,直接进行下一次概率选择
    		if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
    			goto loop
    		}
    	}
    
    exit:
    	n.logf(LOG_INFO, "QUEUESCAN: closing")
    	close(closeCh)
    	workTicker.Stop()
    	refreshTicker.Stop()
    }
    
    //协程池调整
    func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    
    	// 协程池大小 = 总channel数 / 4
    	idealPoolSize := int(float64(num) * 0.25)
    	if idealPoolSize < 1 {
    		idealPoolSize = 1
    	} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
    		//idealPoolSize 协程池大小,最大默认是4个
    		idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    	}
    	for {
    		if idealPoolSize == n.poolSize {
    			break
    		} else if idealPoolSize < n.poolSize {
    			// contract
    			// 协程池协程容量 < 当前已经开启的协程数量
    			// 说明开启的协程过多,需要关闭协程
    			// closeCh queueScanWorker会中断“守护”协程
    			// 关闭后,将当前开启的协程数量-1
    			closeCh <- 1
    			n.poolSize--
    		} else {
    			// expand
    			// 协程池协程容量 > 当前已经开启的协程数量
    			// 开启新的协程
    			n.waitGroup.Wrap(func() {
    				n.queueScanWorker(workCh, responseCh, closeCh)
    			})
    			n.poolSize++
    		}
    	}
    }
    
    // 真正调用channel.processInFlightQueue的地方
    func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    	for {
    		select {
    		case c := <-workCh:
    			//处理一次某个channel的消息
    			now := time.Now().UnixNano()
    			dirty := false
    			if c.processInFlightQueue(now) {
    				dirty = true
    			}
    			if c.processDeferredQueue(now) {
    				dirty = true
    			}
    			//如果这个channel有消息,则设置为dirty=true
    			responseCh <- dirty
    		case <-closeCh:
    			return
    		}
    	}
    }
    

    二、channel.processInFlightQueue

    现在搞清楚了processInFlightQueue调用的地方,那这个processInFlightQueue到底做了什么

    func (c *Channel) processInFlightQueue(t int64) bool {
    	c.exitMutex.RLock()
    	defer c.exitMutex.RUnlock()
    
    	if c.Exiting() {
    		return false
    	}
    
    	dirty := false
    	for {
    		c.inFlightMutex.Lock()
    		msg, _ := c.inFlightPQ.PeekAndShift(t)
    		c.inFlightMutex.Unlock()
    
    		if msg == nil {
    			goto exit
    		}
    
    		//只要有消息就标识这个通道是脏的
    		dirty = true
    
    		_, err := c.popInFlightMessage(msg.clientID, msg.ID)
    		if err != nil {
    			goto exit
    		}
    		atomic.AddUint64(&c.timeoutCount, 1)
    		c.RLock()
    		client, ok := c.clients[msg.clientID]
    		c.RUnlock()
    		if ok {
    			client.TimedOutMessage()
    		}
    		//循环将channel中存在inFlightPQ中消息put出去
    		//put到memoryMsgChan或者磁盘
    		c.put(msg)
    	}
    
    exit:
    	return dirty
    }
    
    func (c *Channel) put(m *Message) error {
    	select {
    	//将消息写入到memoryMsgChan中去
    	case c.memoryMsgChan <- m:
    	//如果memoryMsgChan满了则将消息写到磁盘中区
    	default:
    		err := writeMessageToBackend(m, c.backend)
    		c.nsqd.SetHealth(err)
    		if err != nil {
    			c.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
    				c.name, err)
    			return err
    		}
    	}
    	return nil
    }
    

    三、channel.processDeferredQueue

    接下来看看延时消息是怎么处理的
    延时消息看起来跟非延时消息没什么区别
    其实是有区别的,区别就在c.deferredPQ.PeekAndShift(t)这里

    func (c *Channel) processDeferredQueue(t int64) bool {
    	c.exitMutex.RLock()
    	defer c.exitMutex.RUnlock()
    
    	if c.Exiting() {
    		return false
    	}
    
    	dirty := false
    	for {
    		c.deferredMutex.Lock()
    		//区别在这个PeekAndShift里面
    		item, _ := c.deferredPQ.PeekAndShift(t)
    		c.deferredMutex.Unlock()
    
    		if item == nil {
    			goto exit
    		}
    		dirty = true
    
    		msg := item.Value.(*Message)
    		_, err := c.popDeferredMessage(msg.ID)
    		if err != nil {
    			goto exit
    		}
    		//put到memoryMsgChan或者磁盘
    		c.put(msg)
    	}
    
    exit:
    	return dirty
    }
    
    //pqueue.go 
    //max实参传入的是当前时间
    func (pq *PriorityQueue) PeekAndShift(max int64) (*Item, int64) {
    	if pq.Len() == 0 {
    		return nil, 0
    	}
    
    	item := (*pq)[0]
    	//存入延时消息时,Priority优先级就是消息的到期时间
    	//这里只有当前时间大于了消息的到期时间才回返回
    	if item.Priority > max {
    		return nil, item.Priority - max
    	}
    	heap.Remove(pq, 0)
    
    	return item, 0
    }
    
  • 相关阅读:
    ServiceStack支持跨域提交
    CookiesHelper
    poj 3669 线段树成段更新+区间合并
    poj2528 线段树+离散化
    hdu3308 线段树 区间合并
    hdu1542矩阵的并 线段树+扫描线
    hdu1255 矩阵的交 线段树+扫描线
    简单单点更新线段树
    树状数组模版
    hdu1873优先队列
  • 原文地址:https://www.cnblogs.com/werben/p/14522563.html
Copyright © 2011-2022 走看看