zoukankan      html  css  js  c++  java
  • nsq源码-topic

    topic的入口在哪里:GetTopic()

    GetTopic如果存在则直接返回,不存在则NewTopic()

    个人觉得Topic里面有两个重要的变量和一个函数,搞清楚这三个东西就差不多了

    1. memoryMsgChan: 这是存放消息的内存,就是一个通道,通道的大小MemQueueSize,
      默认配置是10000,也就是如果堆积的消息超过10000就会使用磁盘了

    2. backend :就是diskqueue,这个就是磁盘存储消息的地方了,这个diskqueue一定要搞懂,因为后面channel也会用到这个queue,关于这个diskqueue,请参考:https://www.cnblogs.com/werben/p/14517781.html

    3. messagePump : 这是topic的一个“守护”协程,看源码里的英文注释, messagePump selects over the in-memory and backend queue and writes messages to every channel for this topic,它将topic收到的消息,分发给每个channel。

    func (t *Topic) messagePump() {
    	var msg *Message
    	var buf []byte
    	var err error
    	var chans []*Channel
    	var memoryMsgChan chan *Message
    	var backendChan <-chan []byte
    
    	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
    	//这里就是要等到startChan完成后才能往下走,
    	for {
    		select {
    		case <-t.channelUpdateChan:
    			continue
    		case <-t.pauseChan:
    			continue
    		case <-t.exitChan:
    			goto exit
    		case <-t.startChan:
    		//也就是要等到topic执行完GetChannel()之后才会接着往下走
    		}
    		break
    	}
    	t.RLock()
    	//将所有channel通道放在chans中
    	for _, c := range t.channelMap {
    		chans = append(chans, c)
    	}
    	t.RUnlock()
    	if len(chans) > 0 && !t.IsPaused() {
    		memoryMsgChan = t.memoryMsgChan
    		//backendChan就是backend暴露给外部的readChan
    		//参考: https://www.cnblogs.com/werben/p/14517781.html
    		backendChan = t.backend.ReadChan()
    	}
    
    	// main message loop
    	//这里是守护协程的主体了,也就是这个for会一直跑
    	for {
    		select {
    		case msg = <-memoryMsgChan:
    			//如果topic有收到新消息
    		case buf = <-backendChan:
    			//如果消息是从diskqueue里来的,还要解码反序列化成msg
    			msg, err = decodeMessage(buf)
    			if err != nil {
    				t.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
    				continue
    			}
    		case <-t.channelUpdateChan:
    			//如果有新的channel通道
    			chans = chans[:0]
    			t.RLock()
    			for _, c := range t.channelMap {
    				chans = append(chans, c)
    			}
    			t.RUnlock()
    			if len(chans) == 0 || t.IsPaused() {
    				memoryMsgChan = nil
    				backendChan = nil
    			} else {
    				memoryMsgChan = t.memoryMsgChan
    				backendChan = t.backend.ReadChan()
    			}
    			continue
    		case <-t.pauseChan:
    			//如果channel通道暂停
    			if len(chans) == 0 || t.IsPaused() {
    				memoryMsgChan = nil
    				backendChan = nil
    			} else {
    				memoryMsgChan = t.memoryMsgChan
    				backendChan = t.backend.ReadChan()
    			}
    			continue
    		case <-t.exitChan:
    			goto exit
    		}
    
    		//遍历每一个channel通道,将消息投递过去
    		for i, channel := range chans {
    			chanMsg := msg
    			// copy the message because each channel
    			// needs a unique instance but...
    			// fastpath to avoid copy if its the first channel
    			// (the topic already created the first copy)
    			if i > 0 {
    				chanMsg = NewMessage(msg.ID, msg.Body)
    				chanMsg.Timestamp = msg.Timestamp
    				chanMsg.deferred = msg.deferred
    			}
    			if chanMsg.deferred != 0 {
    				//如果是延时消息则将延时消息丢给channel
    				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
    				continue
    			}
    			//将消息则将延时消息丢给channel
    			err := channel.PutMessage(chanMsg)
    			if err != nil {
    				t.nsqd.logf(LOG_ERROR,
    					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
    					t.name, msg.ID, channel.name, err)
    			}
    		}
    	}
    
    exit:
    	t.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
    }
    
  • 相关阅读:
    c#生成图片验证码
    关于Aspcms如何嵌入整个网站,以及网站导航所指向页面的内容显示
    web 验证控件
    MVC Link连接数据库增删改查方法的不同写法
    Mvc 翻页查询,代码很有用
    MVC添加分布视图做唯一验证
    MVc路由查询,路由到底有什么作用呢??
    MVC添加动态视图的参考代码。重点是添加部分视图的使用方法,非常有用的代码!!!!!!!!!!!!!!
    tyvj P1209
    bzoj 1051: [HAOI2006]受欢迎的牛 tarjan缩点
  • 原文地址:https://www.cnblogs.com/werben/p/14518283.html
Copyright © 2011-2022 走看看