zoukankan      html  css  js  c++  java
  • go笔记 NSQ (5) ( nsqd如何监听生产者的消息,select关键字使用)

      本节主要来探究nsq如何监听生产者的消息。

      通过上节我们得知nsq接收消息发送主要是靠下面这个http处理器   当然了也可以通过原生tcp的方式进行消息发送,由于具体处理流程类似,所以文末会有提到。

    router.Handle("POST", "/pub", http_api.Decorate(s.doPUB, http_api.V1))
    

      我们发送一个http请求例如如下就可以向指定topic生产一个消息

    $ curl -d "<message>" http://127.0.0.1:4151/pub?topic=name

      所以本文的内容主要看  s.doPUB是如何处理请求的。

    1.接收请求生成消息结构体

      

    func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
    	//检查消息是否过大
    	if req.ContentLength > s.ctx.nsqd.getOpts().MaxMsgSize {
    		return nil, http_api.Err{413, "MSG_TOO_BIG"}
    	}
    
    	//最大可阅读孩值+1
    	readMax := s.ctx.nsqd.getOpts().MaxMsgSize + 1
    	//获得请求体
    	body, err := ioutil.ReadAll(io.LimitReader(req.Body, readMax))
    	if err != nil {
    		return nil, http_api.Err{500, "INTERNAL_ERROR"}
    	}
    	if int64(len(body)) == readMax {
    		return nil, http_api.Err{413, "MSG_TOO_BIG"}
    	}
    	if len(body) == 0 {
    		return nil, http_api.Err{400, "MSG_EMPTY"}
    	}
    	//获得对应的topic  以及消息内容,如果topic没有会直接创建
    	reqParams, topic, err := s.getTopicFromQuery(req)
    	if err != nil {
    		return nil, err
    	}
    	//这儿判断消息是否是延时队列  如果是的话获得延时时间
    	var deferred time.Duration
    	if ds, ok := reqParams["defer"]; ok {
    		var di int64
    		di, err = strconv.ParseInt(ds[0], 10, 64)
    		if err != nil {
    			return nil, http_api.Err{400, "INVALID_DEFER"}
    		}
    		deferred = time.Duration(di) * time.Millisecond
    		if deferred < 0 || deferred > s.ctx.nsqd.getOpts().MaxReqTimeout {
    			return nil, http_api.Err{400, "INVALID_DEFER"}
    		}
    	}
    	//创建一个message结构体
    	msg := NewMessage(topic.GenerateID(), body)
    	msg.deferred = deferred
    	//将消息发送给topic下的channel
    	err = topic.PutMessage(msg)
    	if err != nil {
    		return nil, http_api.Err{503, "EXITING"}
    	}
    
    	return "OK", nil
    }
    

       其实代码主要分为几个部分

       1.检查消息是否满足要求,例如长度等

       2.根据消息获得对应topic(如果没有会创建)

       3.生成消息结构体,将消息发送到指定topic下的所有channel 

        我们主要看下 2和3的具体操作

    2.获取topic

      思路也是比较常规的思路,获取请求参数里指定的topic,然后去nsqd结构体下的topic map查看是否有对应的topic ,如果有则直接返回,如果没有就创建一个新的topic存入这个topic map,然后返回。看具体getTopicFromQuery

    代码,

      

    func (s *httpServer) getTopicFromQuery(req *http.Request) (url.Values, *Topic, error) {
    	//如果请求参数
    	reqParams, err := url.ParseQuery(req.URL.RawQuery)
    	if err != nil {
    		s.ctx.nsqd.logf(LOG_ERROR, "failed to parse request params - %s", err)
    		return nil, nil, http_api.Err{400, "INVALID_REQUEST"}
    	}
    	//查看是否有topic参数 如果没有直接返回错误信息
    	topicNames, ok := reqParams["topic"]
    	if !ok {
    		return nil, nil, http_api.Err{400, "MISSING_ARG_TOPIC"}
    	}
    	//获取到topicName
    	topicName := topicNames[0]
    	//如果topicName非法也会报错  比如长度大于1 小于64
    	if !protocol.IsValidTopicName(topicName) {
    		return nil, nil, http_api.Err{400, "INVALID_TOPIC"}
    	}
    	//去nsqd中查询topic
    	return reqParams, s.ctx.nsqd.GetTopic(topicName), nil
    }
    

       这儿主要获取到topicName,然后去nsqd中查找,再看nsqd.GetTopic方法。

    // GetTopic performs a thread safe operation
    // to return a pointer to a Topic object (potentially new)
    func (n *NSQD) GetTopic(topicName string) *Topic {
    	//读写锁,防止重复创建某个topic
    	//加读锁读,如果未读到
    	n.RLock()
    	t, ok := n.topicMap[topicName]
    	n.RUnlock()
    	if ok {
    		return t
    	}
    	//加写锁并在此检查是否存在
    	n.Lock()
    
    	t, ok = n.topicMap[topicName]
    	if ok {
    		n.Unlock()
    		return t
    	}
    	//定义个删除topic后的回调函数
    	deleteCallback := func(t *Topic) {
    		n.DeleteExistingTopic(t.name)
    	}
    	//新建一个topic  
    	t = NewTopic(topicName, &context{n}, deleteCallback)
    	//放入上下文即nsqd的topicMap中
    	n.topicMap[topicName] = t
    	//解锁
    	n.Unlock()
    
    	n.logf(LOG_INFO, "TOPIC(%s): created", t.name)
    	// topic is created but messagePump not yet started
    
    	// 如果topic已经是loading状态就直接返回
    	if atomic.LoadInt32(&n.isLoading) == 1 {
    		return t
    	}
    
    	//如果我们配置了lookupd  查看  lookupd在该topicName下是否有channel  如果有则获取到其下面的channel名字新建到当前topic下 
    	//这儿比如说已经有一个nsqd 关联nsqlookupd topic为xxx ,下面有channelA   channelB   这个时候如果我们又启动了一个nsqd  也是关联这个nsqlookupd
    	//并且也有生产者往这个nsqd发送topic为xxx的信息,为了保证集群的一致性,需要其下面也要有channelA   channelB
    	lookupdHTTPAddrs := n.lookupdHTTPAddrs()
    	if len(lookupdHTTPAddrs) > 0 {
    		channelNames, err := n.ci.GetLookupdTopicChannels(t.name, lookupdHTTPAddrs)
    		if err != nil {
    			n.logf(LOG_WARN, "failed to query nsqlookupd for channels to pre-create for topic %s - %s", t.name, err)
    		}
    		for _, channelName := range channelNames {
    			if strings.HasSuffix(channelName, "#ephemeral") {
    				continue // do not create ephemeral channel with no consumer client
    			}
    			t.GetChannel(channelName)
    		}
    	} else if len(n.getOpts().NSQLookupdTCPAddresses) > 0 {
    		n.logf(LOG_ERROR, "no available nsqlookupd to query for channels to pre-create for topic %s", t.name)
    	}
    
    	// 所有channel都添加  可以开始让topic接收消息
    	t.Start()
    	return t
    }
    

      这儿其实就是一个新建topic的过程,这儿需要注意的地方也就是两个,一个是新建topic结构体,二是去nsqlookupd同步该topic已有的channel。这儿我们主要看下新建topic结构体的操作,即这个NewTopic方法。

    func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
    	t := &Topic{
    		name:              topicName,//topicName
    		channelMap:        make(map[string]*Channel), //该topic下包含的chanel,所有channel都将保存topic消息的副本
    		memoryMsgChan:     make(chan *Message, ctx.nsqd.getOpts().MemQueueSize),//消息将先到达->memoryMsgChan,然后会轮流推送到所有的channel
    		startChan:         make(chan int, 1), //启动信号
    		exitChan:          make(chan int), //关闭信号
    		channelUpdateChan: make(chan int),//包含的channel修改信号  例如添加或者删除
    		ctx:               ctx,//nsqd上下文
    		paused:            0,
    		pauseChan:         make(chan int),//暂停信号
    		deleteCallback:    deleteCallback, //删除回调函数
    		idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),//id
    	}
    	//临时topic
    	if strings.HasSuffix(topicName, "#ephemeral") {
    		t.ephemeral = true
    		t.backend = newDummyBackendQueue()
    	} else {
    		dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
    			opts := ctx.nsqd.getOpts()
    			lg.Logf(opts.Logger, opts.logLevel, lg.LogLevel(level), f, args...)
    		}
    		//backend是消息如果已经达到了topic容纳消息的最长时的备份策略
    		//这儿是存在硬盘中
    		t.backend = diskqueue.New(
    			topicName,
    			ctx.nsqd.getOpts().DataPath,
    			ctx.nsqd.getOpts().MaxBytesPerFile,
    			int32(minValidMsgLength),
    			int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
    			ctx.nsqd.getOpts().SyncEvery,
    			ctx.nsqd.getOpts().SyncTimeout,
    			dqLogf,
    		)
    	}
    	//topic准备开始接收消息
    	t.waitGroup.Wrap(t.messagePump)
    	//通知nsqlookupd该nsqd新建了一个topic
    	t.ctx.nsqd.Notify(t)
    
    	return t
    }
    

        这段创建topic的代码比较核心,主要有几个地方需要注意。1是新建topic结构体   2是设置topic的backend  这关系到消息达到最大时的存储策略  3是topic开始进入准备接收消息状态

        关于topic结构体的一些核心变量有必要做一个说明

    • channelMap  用来存放该topic下的所有channel,当有消息推送到topic时,下面的所有channel都会收到信息
    • memoryMsgChan 消息推送chan,当有消息到来时会先到该chan,然后接收chan信息遍历推送到每个channel
    • startChan 开始信号,有信息进入该chan说明topic可以开始接收消息推送
    • exitChan 接收信号,当该topic不需要在接收消息推送(例如被删除),可以将信息设置到该chan
    • channelUpdateChan channel修改信号,例如该topic下有新创建channel,则可以将信息推入该chan,可以用来同步nsqlookupd等
    • ctx  即nsqd上下文
    • deleteCallback 即删除回调函数

      新建topic后,会设置其backend即信息消息超出chan最大值时的备份方法,这儿一般存在硬盘上,这个后面会说到。  

      t.waitGroup.Wrap(t.messagePump) 则是接收处理消息的方法。t.ctx.nsqd.Notify(t) 内部最后主要是会通知nsqlookupd做一些同步信息。

      我们主要看下接收处理消息的方法。

    3. topic消息处理

      

    func (t *Topic) messagePump() {
    	//消息体
    	var msg *Message
    	//消息buf
    	var buf []byte
    	//错误
    	var err error
    	//该topic下所有channel
    	var chans []*Channel
    	//该topic的memoryMsgChan  消息入口
    	var memoryMsgChan chan *Message
    	//超出最大消息队列的备份chan
    	var backendChan chan []byte
    
    	// do not pass messages before Start(), but avoid blocking Pause() or GetChannel()
    	for {
    		select {
    		case <-t.channelUpdateChan:
    			continue
    		case <-t.pauseChan:
    			continue
    		case <-t.exitChan:
    			goto exit
    		//如果startChan准备好了才开始接收消息推送	
    		case <-t.startChan:
    		}
    		break
    	}
    	//读取该topic中所有的channel
    	t.RLock()
    	for _, c := range t.channelMap {
    		chans = append(chans, c)
    	}
    	t.RUnlock()
    	if len(chans) > 0 && !t.IsPaused() {
    		//赋值memoryMsgChan backendChan
    		memoryMsgChan = t.memoryMsgChan
    		backendChan = t.backend.ReadChan()
    	}
    
    	//核心消息轮询处理
    	for {
    		select {
    		//接收到消息推送	
    		case msg = <-memoryMsgChan:
    		//接收到备份消息推送  例如磁盘
    		case buf = <-backendChan:
    			msg, err = decodeMessage(buf)
    			if err != nil {
    				t.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
    				continue
    			}
    		//如果channel有改动	  则重新获取该topic下的channel
    		case <-t.channelUpdateChan:
    			chans = chans[:0]
    			t.RLock()
    			for _, c := range t.channelMap {
    				chans = append(chans, c)
    			}
    			t.RUnlock()
    			//如果暂停了就直接为空
    			if len(chans) == 0 || t.IsPaused() {
    				memoryMsgChan = nil
    				backendChan = nil
    			} else {
    				//重新赋值memoryMsgChan 和backendChan
    				memoryMsgChan = t.memoryMsgChan
    				backendChan = t.backend.ReadChan()
    			}
    			continue
    		//如果是暂停信号 则和上面暂停时一个操作	
    		case <-t.pauseChan:
    			if len(chans) == 0 || t.IsPaused() {
    				memoryMsgChan = nil
    				backendChan = nil
    			} else {
    				memoryMsgChan = t.memoryMsgChan
    				backendChan = t.backend.ReadChan()
    			}
    			continue
    		//如果是exit信号说明该topic不在消费消息,直接goto到exit代码块	
    		case <-t.exitChan:
    			goto exit
    		}
    		//遍历该topic下的channel   并且发送消息
    		//注意如果为延时消息则会扔到延时队列里边去
    		for i, channel := range chans {
    			chanMsg := msg
    			// copy the message because each channel
    			// needs a unique instance but...
    			// fastpath to avoid copy if its the first channel
    			// (the topic already created the first copy)
    			if i > 0 {
    				chanMsg = NewMessage(msg.ID, msg.Body)
    				chanMsg.Timestamp = msg.Timestamp
    				chanMsg.deferred = msg.deferred
    			}
    			if chanMsg.deferred != 0 {
    				channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
    				continue
    			}
    			err := channel.PutMessage(chanMsg)
    			if err != nil {
    				t.ctx.nsqd.logf(LOG_ERROR,
    					"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
    					t.name, msg.ID, channel.name, err)
    			}
    		}
    	}
    
    exit:
    	t.ctx.nsqd.logf(LOG_INFO, "TOPIC(%s): closing ... messagePump", t.name)
    }
    

        这里面逻辑还是很简单的,就先轮询等待,直到接收到start信号,则开始处理消息信号。

    • 如果接收到正常的msg即消息信号或者backend信号即还未被消费的备份消息,则直接遍历该topic下的所有channel并发送消息 (延时消息会放置到延时队列)
    • 如果接收到channel改动信号比如新增或者删除,则重新赋值该topic下的所有channel
    • 如果接收到暂停信号,且是暂停命令,则将实时消息和备份消息chan都置为null,如果不是则将上下文中的chan重新赋值到该方法中
    • 如果接收到exit信号,则退出消息接收轮询,执行exit后的代码块

      其实这儿比较关注的点应该是两个,何时会接收到memoryMsgChan的值,每个channel具体是怎么推送消息到其下面的所有client也就是consumer的。 我们先看第一个问题,怎么接收memoryMsgChan,要明白这个问题,我们要回到菜单1中,创建好topic后的操作,代码中创建topic后会执行一个putMessage操作、

    	msg := NewMessage(topic.GenerateID(), body)
    	msg.deferred = deferred
    	err = topic.PutMessage(msg)
    

    4. 推送消息到topic的memoryMsgChan

      

    func (t *Topic) PutMessage(m *Message) error {
    	t.RLock()
    	defer t.RUnlock()
    	//检查topic是否已经被停了
    	if atomic.LoadInt32(&t.exitFlag) == 1 {
    		return errors.New("exiting")
    	}
    	//发送消息
    	err := t.put(m)
    	if err != nil {
    		return err
    	}
    	//已接收消息+1
    	atomic.AddUint64(&t.messageCount, 1)
    	//备份消息内容
    	atomic.AddUint64(&t.messageBytes, uint64(len(m.Body)))
    	return nil
    }
    

      这儿就是检查了一下,核心还是这个put方法。

    func (t *Topic) put(m *Message) error {
    	select {
    	//****************关键操作  消息放入到该topic的	memoryMsgChan中
    	case t.memoryMsgChan <- m:
    	//如果放入失败说明chan已经满了 此时需要放入磁盘
    	default:
    		b := bufferPoolGet()
    		//信息写入备份
    		err := writeMessageToBackend(b, m, t.backend)
    		bufferPoolPut(b)
    		//该nsqd处于不健康状态
    		t.ctx.nsqd.SetHealth(err)
    		if err != nil {
    			t.ctx.nsqd.logf(LOG_ERROR,
    				"TOPIC(%s) ERROR: failed to write message to backend - %s",
    				t.name, err)
    			return err
    		}
    	}
    	return nil
    }
    

      可以看到 这儿就很明显的看到会将消息写入memoryMsgChan ,而另一边topic中就会接收到这个消息并推送到其包含的所有channel。当然如果chan已经满了,就会执行default操作,即执行备份操作,一般是写入磁盘,关于备份磁盘有关的操作后面会专门讲到,这儿就不先说明了。

    5.topic如何将消息推送到其包含的所有channel

      通过三我们知道topic会遍历其包含的所有channel,然后将消息推送到channel。我们可以看下具体的推送细节即channel.PutMessage方法

    err := channel.PutMessage(chanMsg)
    

      

    func (c *Channel) PutMessage(m *Message) error {
    	c.RLock()
    	defer c.RUnlock()
    	if c.Exiting() {
    		return errors.New("exiting")
    	}
    	err := c.put(m)
    	if err != nil {
    		return err
    	}
    	atomic.AddUint64(&c.messageCount, 1)
    	return nil
    }
    

      和topic的那儿的操作很像啊,都是做了一个状态判断。然后执行其put方法。

    func (c *Channel) put(m *Message) error {
    	select {
    	case c.memoryMsgChan <- m:
    	default:
    		b := bufferPoolGet()
    		err := writeMessageToBackend(b, m, c.backend)
    		bufferPoolPut(b)
    		c.ctx.nsqd.SetHealth(err)
    		if err != nil {
    			c.ctx.nsqd.logf(LOG_ERROR, "CHANNEL(%s): failed to write message to backend - %s",
    				c.name, err)
    			return err
    		}
    	}
    	return nil
    }
    

      这儿也和topic很像,会将消息推送到Channel中的memoryMsgChan   如果长度过大,那么就会放入到备份队列中。我们发现channel的操作和topic非常像。那么也很容易可以得知,channel中所包含的消费者肯定也会轮询接收这个memoryMsgChan 的信号。当然具体的channe有关的内容属于消费者的范畴,本文主要讲述生产者。

      此时我们已经成功接收到生产者推送的消息,并将消息分发到topic下所有的channel。用图来表示的话大致可以如下

      

      到此生产者已经完成工作,在后面一章将会讲到消费者是如何接收channel中的消息。

      官网中信息传递图 nsqd的部分已经完成

    6. select使用

      本文很多地方用到select,该关键字一般用来操作接收多个chan事件时分别做出对应的处理,比如我们可以用如下demo来了解

    func main() {
    	read :=bufio.NewReader(os.Stdin)
    	ch1 := make(chan int ,1)
    	ch2 := make(chan int ,1)
    	ch3 := make(chan int ,1)
    
    	go func() {
    
    		for  {
    			select {
    			case <-ch1:
    				fmt.Println("接收到指令1")
    			case <-ch2:
    				fmt.Println("接收到指令2")
    			case <-ch3:
    				fmt.Println("接收到指令3")
    			default:
    
    			}
    		}
    	}()
    
    	for  {
    		s,_ :=read.ReadString('
    ')
    		str :=strings.ReplaceAll(s,"
    ","")
    		switch str {
    		case "1":
    			ch1 <- 1
    		case "2":
    			ch2 <- 1
    		case "3":
    			ch3 <- 1
    		default:
    			fmt.Println("未知指令")
    		}
    	}
    }
    

      

       其实就是可以监听多个chan信号,监听到其中某个可以执行对应的操作,一般和for结合使用。

    后记

      nsq不光能通过http的方式发送消息,也支持原生tcp协议监听端口监听生产者消息。具体可以看到nsqd.main方法中创建的tcpServer。其接收到新的套接字后会执行protocolV2.IOLoop(clientConn)方法。在该方法中最终可以走到如下代码

    response, err = p.Exec(client, params)
    

      在该方法中

    func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    	if bytes.Equal(params[0], []byte("IDENTIFY")) {
    		return p.IDENTIFY(client, params)
    	}
    	err := enforceTLSPolicy(client, p, params[0])
    	if err != nil {
    		return nil, err
    	}
    	switch {
    	case bytes.Equal(params[0], []byte("FIN")):
    		return p.FIN(client, params)
    	case bytes.Equal(params[0], []byte("RDY")):
    		return p.RDY(client, params)
    	case bytes.Equal(params[0], []byte("REQ")):
    		return p.REQ(client, params)
    	//接收生产者消息	
    	case bytes.Equal(params[0], []byte("PUB")):
    		return p.PUB(client, params)
    	case bytes.Equal(params[0], []byte("MPUB")):
    		return p.MPUB(client, params)
    	case bytes.Equal(params[0], []byte("DPUB")):
    		return p.DPUB(client, params)
    	case bytes.Equal(params[0], []byte("NOP")):
    		return p.NOP(client, params)
    	case bytes.Equal(params[0], []byte("TOUCH")):
    		return p.TOUCH(client, params)
    	case bytes.Equal(params[0], []byte("SUB")):
    		return p.SUB(client, params)
    	case bytes.Equal(params[0], []byte("CLS")):
    		return p.CLS(client, params)
    	case bytes.Equal(params[0], []byte("AUTH")):
    		return p.AUTH(client, params)
    	}
    	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
    }
    

      我们可以看这个p.PUB方法就是接收生产者消息的方法。内部的处理操作和http方式的一致,这儿就不在说明。

  • 相关阅读:
    Connected Components? Codeforces
    洛谷 P1344 [USACO4.4]追查坏牛奶Pollutant Control
    洛谷 P4174 [NOI2006]最大获利 && 洛谷 P2762 太空飞行计划问题 (最大权闭合子图 && 最小割输出任意一组方案)
    表达式求值
    费用流(自用,勿看)
    二分图??(自用,勿看)
    C 标准库
    Linux-socket使用
    C 标准库
    C 标准库
  • 原文地址:https://www.cnblogs.com/hetutu-5238/p/13038643.html
Copyright © 2011-2022 走看看