  • NSQ (3): How a Consumer Consumes Messages

    Source-code walkthrough of message consumption on the consumer side

    func (c *ConsumerHandler) HandlerMsg() {
    	conf := nsq.NewConfig()
    	consumer, err := nsq.NewConsumer(topic, "ch", conf)
    	if err != nil {
    		logs.Error("create consumer failed, err: %+v\n", err)
    		return
    	}
    
    	// register the message handler
    	handler := &MsgHandler{}
    	consumer.AddHandler(handler)
    
    	err = consumer.ConnectToNSQLookupd(lookupdAddr)
    	if err != nil {
    		logs.Error("consumer connect nsq failed, err: %+v\n", err)
    		return
    	}
    
    }
    
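    • When declaring a consumer, we call nsq's NewConsumer directly: the first argument is the topic, the second is the channel, and the third is the consumer configuration (here the defaults returned by nsq.NewConfig()). Once the consumer exists, we add our own handler to it, a type that implements the HandleMessage method of the Handler interface. Finally we connect to nsqlookupd.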
    func NewConsumer(topic string, channel string, config *Config) (*Consumer, error) {
    	config.assertInitialized()
    
    	if err := config.Validate(); err != nil {
    		return nil, err
    	}
    
    	if !IsValidTopicName(topic) {
    		return nil, errors.New("invalid topic name")
    	}
    
    	if !IsValidChannelName(channel) {
    		return nil, errors.New("invalid channel name")
    	}
    
    	r := &Consumer{
    		id: atomic.AddInt64(&instCount, 1),
    
    		topic:   topic,
    		channel: channel,
    		config:  *config,
    
    		logger:      log.New(os.Stderr, "", log.Flags()),
    		logLvl:      LogLevelInfo,
    		maxInFlight: int32(config.MaxInFlight),
    
    		incomingMessages: make(chan *Message),
    
    		rdyRetryTimers:     make(map[string]*time.Timer),
    		pendingConnections: make(map[string]*Conn),
    		connections:        make(map[string]*Conn),
    
    		lookupdRecheckChan: make(chan int, 1),
    
    		rng: rand.New(rand.NewSource(time.Now().UnixNano())),
    
    		StopChan: make(chan int),
    		exitChan: make(chan int),
    	}
    	r.wg.Add(1)
    	go r.rdyLoop()
    	return r, nil
    }
    
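    • Creating a consumer first validates the arguments, then initializes the fields of the Consumer struct, and finally starts a goroutine (rdyLoop) that periodically updates the RDY value, which controls how many messages the server may push to the client.
    // nsqd flow control will be covered in a dedicated post, so it is not analyzed further here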
    go r.rdyLoop()
    
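    • nsq delivers messages by push. Without flow control the server would keep pushing whether or not the client is busy, and a client that cannot keep up would eventually run into all kinds of performance problems. That is why nsq has the RDY state for flow control. Put simply, after connecting to nsqd the client tells it how many messages it is ready to accept. Conceptually, each message nsqd pushes uses up one unit of that allowance, and each message the client finishes and acknowledges with FIN frees one again (somewhat like the window mechanism TCP uses for flow control). In addition, once connected, a separate background goroutine keeps adjusting this RDY count.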
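    • The window idea described above can be sketched in a few lines. This is only a toy model under stated assumptions: rdyWindow and its methods are hypothetical names, not types from nsqd or go-nsq, and the real bookkeeping is more involved.

```go
package main

import "fmt"

// rdyWindow is a hypothetical stand-in for the per-client state a server
// could keep: the last RDY value and the number of unacknowledged messages.
type rdyWindow struct {
	rdy      int
	inFlight int
}

// setRDY records the allowance most recently announced by the client.
func (w *rdyWindow) setRDY(n int) { w.rdy = n }

// canPush reports whether one more message may be sent.
func (w *rdyWindow) canPush() bool { return w.inFlight < w.rdy }

// push marks one message as sent; it returns false if the window is full.
func (w *rdyWindow) push() bool {
	if !w.canPush() {
		return false
	}
	w.inFlight++
	return true
}

// fin marks one message as acknowledged, freeing one slot.
func (w *rdyWindow) fin() {
	if w.inFlight > 0 {
		w.inFlight--
	}
}

func main() {
	w := &rdyWindow{}
	w.setRDY(2)
	fmt.Println(w.push(), w.push(), w.push()) // the third push is refused
	w.fin()
	fmt.Println(w.push()) // a slot is free again
}
```

    With an allowance of 2, the third push is refused until one message has been acknowledged.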
    // AddHandler sets the Handler for messages received by this Consumer. This can be called
    // multiple times to add additional handlers. Handler will have a 1:1 ratio to message handling goroutines.
    func (r *Consumer) AddHandler(handler Handler) {
    	r.AddConcurrentHandlers(handler, 1)
    }
    
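    • Adding a handler to the consumer in turn calls AddConcurrentHandlers. Judging by the name, its second argument is the number of handler instances to run concurrently, and AddHandler passes 1. As the comment says, handlers map 1:1 to message-handling goroutines: each handler added this way gets exactly one goroutine.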
    func (r *Consumer) AddConcurrentHandlers(handler Handler, concurrency int) {
    	if atomic.LoadInt32(&r.connectedFlag) == 1 {
    		panic("already connected")
    	}
    
    	atomic.AddInt32(&r.runningHandlers, int32(concurrency))
    	for i := 0; i < concurrency; i++ {
    		go r.handlerLoop(handler)
    	}
    }
    
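    • concurrency states how many copies of the handler we passed in should run concurrently. The method first adds that number to the count of running handlers, then starts that many handlerLoop goroutines, one per handler instance. Note that it panics if the consumer is already connected, so handlers must be added before connecting.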
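    • To make the pattern concrete, here is a minimal stand-alone sketch of several goroutines sharing one unbuffered channel, with an atomic counter so that the last worker to stop signals completion. The pool type and its methods are hypothetical and only imitate the shape of the code above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pool is a hypothetical, simplified stand-in for the consumer.
type pool struct {
	incoming chan string   // unbuffered, shared by all workers
	running  int32         // number of workers still running
	handled  int64         // number of messages processed
	done     chan struct{} // closed by the last worker to exit
}

func newPool() *pool {
	return &pool{incoming: make(chan string), done: make(chan struct{})}
}

// addConcurrent starts n goroutines that all read from the same channel.
func (p *pool) addConcurrent(n int, handle func(string)) {
	atomic.AddInt32(&p.running, int32(n))
	for i := 0; i < n; i++ {
		go func() {
			for msg := range p.incoming {
				handle(msg)
				atomic.AddInt64(&p.handled, 1)
			}
			// The last worker to stop signals that the pool is done.
			if atomic.AddInt32(&p.running, -1) == 0 {
				close(p.done)
			}
		}()
	}
}

// count returns how many messages have been handled so far.
func (p *pool) count() int64 { return atomic.LoadInt64(&p.handled) }

func main() {
	p := newPool()
	p.addConcurrent(3, func(string) {})
	for i := 0; i < 10; i++ {
		p.incoming <- fmt.Sprintf("msg-%d", i)
	}
	close(p.incoming)
	<-p.done
	fmt.Println(p.count()) // 10
}
```

    Because the channel is unbuffered, each send blocks until some worker is free, which is what lets the number of workers bound the processing concurrency.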
    func (r *Consumer) handlerLoop(handler Handler) {
    	r.log(LogLevelDebug, "starting Handler")
    
    	for {
    		message, ok := <-r.incomingMessages
    		if !ok {
    			goto exit
    		}
    
    		if r.shouldFailMessage(message, handler) {
    			message.Finish()
    			continue
    		}
    
    		err := handler.HandleMessage(message)
    		if err != nil {
    			r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
    			if !message.IsAutoResponseDisabled() {
    				message.Requeue(-1)
    			}
    			continue
    		}
    
    		if !message.IsAutoResponseDisabled() {
    			message.Finish()
    		}
    	}
    
    exit:
    	r.log(LogLevelDebug, "stopping Handler")
    	if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
    		r.exit()
    	}
    }
    
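    • The handler function is an infinite loop. Each iteration blocks reading a message from the consumer's incomingMessages channel and then checks whether the message has exhausted its attempts. If it has, the message is finished and skipped; otherwise it is passed to our handler. Unless auto-response is disabled, a handler error requeues the message and success finishes it. When the channel is closed the loop exits, and the last handler to exit calls r.exit().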
    func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
    	// message passed the max number of attempts
    	if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
    		r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
    			message.ID, message.Attempts)
    
    		logger, ok := handler.(FailedMessageLogger)
    		if ok {
    			logger.LogFailedMessage(message)
    		}
    
    		return true
    	}
    	return false
    }
    
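    • A message is considered failed once its attempt count exceeds the configured MaxAttempts without having succeeded (the check is skipped when MaxAttempts is 0). If the handler also implements FailedMessageLogger, it is given a chance to log the message.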
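    • The branches of handlerLoop and shouldFailMessage can be condensed into one small decision function. The sketch below is an illustration only: decide, action, and errBoom are hypothetical names, and the real loop calls Finish or Requeue on the message instead of returning a label.

```go
package main

import (
	"errors"
	"fmt"
)

// action is a hypothetical label for what the loop does with a message.
type action string

const (
	giveUp  action = "finish without handling"
	finish  action = "finish"
	requeue action = "requeue"
	leave   action = "leave to the handler"
)

// errBoom is a sample handler error used in the demo.
var errBoom = errors.New("boom")

// decide mirrors the branches described above. handle is only invoked
// if the message has not exceeded maxAttempts.
func decide(attempts, maxAttempts uint16, autoResponseDisabled bool, handle func() error) action {
	if maxAttempts > 0 && attempts > maxAttempts {
		return giveUp
	}
	err := handle()
	if autoResponseDisabled {
		// The handler is responsible for responding itself.
		return leave
	}
	if err != nil {
		return requeue
	}
	return finish
}

func main() {
	ok := func() error { return nil }
	fail := func() error { return errBoom }

	fmt.Println(decide(1, 5, false, ok))   // finish
	fmt.Println(decide(1, 5, false, fail)) // requeue
	fmt.Println(decide(6, 5, false, ok))   // finish without handling
	fmt.Println(decide(6, 0, false, ok))   // finish
	fmt.Println(decide(1, 5, true, fail))  // leave to the handler
}
```

    Note the fourth case: with maxAttempts set to 0 the attempt check never triggers, so the message is handled normally.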
    // ConnectToNSQLookupd adds an nsqlookupd address to the list for this Consumer instance.
    //
    // If it is the first to be added, it initiates an HTTP request to discover nsqd
    // producers for the configured topic.
    //
    // A goroutine is spawned to handle continual polling.
    func (r *Consumer) ConnectToNSQLookupd(addr string) error {
    	if atomic.LoadInt32(&r.stopFlag) == 1 {
    		return errors.New("consumer stopped")
    	}
    	if atomic.LoadInt32(&r.runningHandlers) == 0 {
    		return errors.New("no handlers")
    	}
    
    	if err := validatedLookupAddr(addr); err != nil {
    		return err
    	}
    
    	atomic.StoreInt32(&r.connectedFlag, 1)
    
    	r.mtx.Lock()
    	for _, x := range r.lookupdHTTPAddrs {
    		if x == addr {
    			r.mtx.Unlock()
    			return nil
    		}
    	}
    	r.lookupdHTTPAddrs = append(r.lookupdHTTPAddrs, addr)
    	numLookupd := len(r.lookupdHTTPAddrs)
    	r.mtx.Unlock()
    
    	// if this is the first one, kick off the go loop
    	if numLookupd == 1 {
    		r.queryLookupd()
    		r.wg.Add(1)
    		go r.lookupdLoop()
    	}
    
    	return nil
    }
    
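    • The method first checks that the consumer has not been stopped and that at least one handler is running, validates the address, and marks the consumer as connected. It then walks lookupdHTTPAddrs and appends the address if it is not already present. If the list now has length 1, this is the first nsqlookupd connection and lookupdLoop has not been started yet, so it calls queryLookupd once and then starts the lookupdLoop goroutine.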
    // make an HTTP req to one of the configured nsqlookupd instances to discover
    // which nsqd's provide the topic we are consuming.
    //
    // initiate a connection to any new producers that are identified.
    func (r *Consumer) queryLookupd() {
    	retries := 0
    
    retry:
    	endpoint := r.nextLookupdEndpoint()
    
    	r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)
    
    	var data lookupResp
    	err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
    	if err != nil {
    		r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
    		retries++
    		if retries < 3 {
    			r.log(LogLevelInfo, "retrying with next nsqlookupd")
    			goto retry
    		}
    		return
    	}
    
    	var nsqdAddrs []string
    	for _, producer := range data.Producers {
    		broadcastAddress := producer.BroadcastAddress
    		port := producer.TCPPort
    		joined := net.JoinHostPort(broadcastAddress, strconv.Itoa(port))
    		nsqdAddrs = append(nsqdAddrs, joined)
    	}
    	// apply filter
    	if discoveryFilter, ok := r.behaviorDelegate.(DiscoveryFilter); ok {
    		nsqdAddrs = discoveryFilter.Filter(nsqdAddrs)
    	}
    	for _, addr := range nsqdAddrs {
    		err = r.ConnectToNSQD(addr)
    		if err != nil && err != ErrAlreadyConnected {
    			r.log(LogLevelError, "(%s) error connecting to nsqd - %s", addr, err)
    			continue
    		}
    	}
    }
    
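    • It first obtains the HTTP URL of an nsqlookupd to query and sends the request with apiRequestNegotiateV1, moving on to the next nsqlookupd on failure for up to three attempts in total. nsqlookupd replies with every nsqd that holds the topic the consumer wants. The rest of the function walks the returned producers, joins each broadcast address and TCP port into an nsqd address, appends it to nsqdAddrs, applies the optional discovery filter, and finally connects to every nsqd that remains.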
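    • The address-assembly step can be tried in isolation. The sketch below decodes a lookup response and joins host and port the same way. The struct names, the nsqdAddrs helper, and the sample JSON body are made up for illustration; only the two JSON keys are meant to match what nsqlookupd reports, and that is an assumption of this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
)

// producer holds the two fields of a lookup result that are needed
// to build an nsqd TCP address.
type producer struct {
	BroadcastAddress string `json:"broadcast_address"`
	TCPPort          int    `json:"tcp_port"`
}

// lookupResponse is a reduced, hypothetical view of the response body.
type lookupResponse struct {
	Producers []producer `json:"producers"`
}

// nsqdAddrs decodes a lookup response and returns "host:port" strings.
func nsqdAddrs(body []byte) ([]string, error) {
	var resp lookupResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, err
	}
	addrs := make([]string, 0, len(resp.Producers))
	for _, p := range resp.Producers {
		// JoinHostPort also brackets IPv6 hosts correctly.
		addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}
	return addrs, nil
}

func main() {
	// Illustrative body, not captured from a real server.
	body := []byte(`{"producers":[
		{"broadcast_address":"10.0.0.1","tcp_port":4150},
		{"broadcast_address":"::1","tcp_port":4150}]}`)
	addrs, err := nsqdAddrs(body)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(addrs) // [10.0.0.1:4150 [::1]:4150]
}
```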
    // return the next lookupd endpoint to query
    // keeping track of which one was last used
    func (r *Consumer) nextLookupdEndpoint() string {
    	r.mtx.RLock()
    	if r.lookupdQueryIndex >= len(r.lookupdHTTPAddrs) {
    		r.lookupdQueryIndex = 0
    	}
    	addr := r.lookupdHTTPAddrs[r.lookupdQueryIndex]
    	num := len(r.lookupdHTTPAddrs)
    	r.mtx.RUnlock()
    	r.lookupdQueryIndex = (r.lookupdQueryIndex + 1) % num
    
    	urlString := addr
    	if !strings.Contains(urlString, "://") {
    		urlString = "http://" + addr
    	}
    
    	u, err := url.Parse(urlString)
    	if err != nil {
    		panic(err)
    	}
    	if u.Path == "/" || u.Path == "" {
    		u.Path = "/lookup"
    	}
    
    	v, err := url.ParseQuery(u.RawQuery)
    	v.Add("topic", r.topic)
    	u.RawQuery = v.Encode()
    	return u.String()
    }
    
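    • nsqlookupd may run as several instances. When building the HTTP URL, the consumer picks which instance to query in round-robin order, tracked by its lookupdQueryIndex field. If the address has no path, /lookup is used, and the topic is added as a query parameter.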
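    • A stripped-down version of this round-robin selection, using only the standard library, might look as follows. picker and endpoint are hypothetical names, the host names are placeholders, and unlike the real consumer this sketch is not safe for concurrent use.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// picker cycles through a list of lookupd addresses.
type picker struct {
	addrs []string
	next  int // index of the address to use on the next call
}

// endpoint returns the lookup URL for topic on the next address in turn.
func (p *picker) endpoint(topic string) (string, error) {
	if len(p.addrs) == 0 {
		return "", errors.New("no lookupd addresses configured")
	}
	if p.next >= len(p.addrs) {
		p.next = 0
	}
	addr := p.addrs[p.next]
	p.next = (p.next + 1) % len(p.addrs)

	// Accept bare host:port as well as full URLs.
	if !strings.Contains(addr, "://") {
		addr = "http://" + addr
	}
	u, err := url.Parse(addr)
	if err != nil {
		return "", err
	}
	if u.Path == "" || u.Path == "/" {
		u.Path = "/lookup"
	}
	q := u.Query()
	q.Add("topic", topic)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	p := &picker{addrs: []string{"127.0.0.1:4161", "http://lookupd-b:4161/"}}
	for i := 0; i < 3; i++ {
		ep, err := p.endpoint("events")
		if err != nil {
			fmt.Println("error:", err)
			return
		}
		fmt.Println(ep)
	}
	// Prints:
	// http://127.0.0.1:4161/lookup?topic=events
	// http://lookupd-b:4161/lookup?topic=events
	// http://127.0.0.1:4161/lookup?topic=events
}
```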
    // ConnectToNSQD takes a nsqd address to connect directly to.
    //
    // It is recommended to use ConnectToNSQLookupd so that topics are discovered
    // automatically.  This method is useful when you want to connect to a single, local,
    // instance.
    func (r *Consumer) ConnectToNSQD(addr string) error {
    	if atomic.LoadInt32(&r.stopFlag) == 1 {
    		return errors.New("consumer stopped")
    	}
    
    	if atomic.LoadInt32(&r.runningHandlers) == 0 {
    		return errors.New("no handlers")
    	}
    
    	atomic.StoreInt32(&r.connectedFlag, 1)
    
    	logger, logLvl := r.getLogger()
    
    	conn := NewConn(addr, &r.config, &consumerConnDelegate{r})
    	conn.SetLogger(logger, logLvl,
    		fmt.Sprintf("%3d [%s/%s] (%%s)", r.id, r.topic, r.channel))
    
    	r.mtx.Lock()
    	_, pendingOk := r.pendingConnections[addr]
    	_, ok := r.connections[addr]
    	if ok || pendingOk {
    		r.mtx.Unlock()
    		return ErrAlreadyConnected
    	}
    	r.pendingConnections[addr] = conn
    	if idx := indexOf(addr, r.nsqdTCPAddrs); idx == -1 {
    		r.nsqdTCPAddrs = append(r.nsqdTCPAddrs, addr)
    	}
    	r.mtx.Unlock()
    
    	r.log(LogLevelInfo, "(%s) connecting to nsqd", addr)
    
    	cleanupConnection := func() {
    		r.mtx.Lock()
    		delete(r.pendingConnections, addr)
    		r.mtx.Unlock()
    		conn.Close()
    	}
    
    	resp, err := conn.Connect()
    	if err != nil {
    		cleanupConnection()
    		return err
    	}
    
    	if resp != nil {
    		if resp.MaxRdyCount < int64(r.getMaxInFlight()) {
    			r.log(LogLevelWarning,
    				"(%s) max RDY count %d < consumer max in flight %d, truncation possible",
    				conn.String(), resp.MaxRdyCount, r.getMaxInFlight())
    		}
    	}
    
    	cmd := Subscribe(r.topic, r.channel)
    	err = conn.WriteCommand(cmd)
    	if err != nil {
    		cleanupConnection()
    		return fmt.Errorf("[%s] failed to subscribe to %s:%s - %s",
    			conn, r.topic, r.channel, err.Error())
    	}
    
    	r.mtx.Lock()
    	delete(r.pendingConnections, addr)
    	r.connections[addr] = conn
    	r.mtx.Unlock()
    
    	// pre-emptive signal to existing connections to lower their RDY count
    	for _, c := range r.conns() {
    		r.maybeUpdateRDY(c)
    	}
    
    	return nil
    }
    
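    • Again it starts by checking the consumer's state. It then creates a Conn from the address we passed in, the configuration, and a consumerConnDelegate that wraps the consumer. This delegate plays a very important role, and we look at it closely at the end. The connection is not dialed right away: the method first looks up addr in pendingConnections and connections, and if it is found in either, a connection already exists or is in progress and ErrAlreadyConnected is returned. Otherwise the conn is added to pendingConnections, and an anonymous function cleanupConnection is defined to clean up if connecting fails. Only then is the connection actually established. On success a SUB command is built and sent to this nsqd over the conn, the conn is moved from pendingConnections to connections, and finally every conn of the consumer is checked to see whether its RDY value needs updating.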
    func (r *Consumer) maybeUpdateRDY(conn *Conn) {
    	inBackoff := r.inBackoff()
    	inBackoffTimeout := r.inBackoffTimeout()
    	if inBackoff || inBackoffTimeout {
    		r.log(LogLevelDebug, "(%s) skip sending RDY inBackoff:%v || inBackoffTimeout:%v",
    			conn, inBackoff, inBackoffTimeout)
    		return
    	}
    
    	count := r.perConnMaxInFlight()
    	r.log(LogLevelDebug, "(%s) sending RDY %d", conn, count)
    	r.updateRDY(conn, count)
    }
    
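    • Updating RDY mainly means sending nsqd the number of messages this connection may have in flight, derived from the consumer's maximum (perConnMaxInFlight). Nothing is sent while the consumer is in backoff.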
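    • The excerpt does not show how perConnMaxInFlight is computed. One plausible rule, assumed here purely for illustration, is to split the consumer-wide maximum evenly across connections while giving each connection at least 1 and never more than the overall maximum:

```go
package main

import (
	"fmt"
	"math"
)

// perConnMaxInFlight is a sketch of an even split. It is an assumption of
// this example, not a quotation of the library's implementation.
func perConnMaxInFlight(maxInFlight int32, numConns int) int64 {
	b := float64(maxInFlight)
	s := b / float64(numConns)
	// Clamp the share to the range [1, maxInFlight].
	return int64(math.Min(math.Max(1, s), b))
}

func main() {
	fmt.Println(perConnMaxInFlight(10, 1)) // 10
	fmt.Println(perConnMaxInFlight(10, 3)) // 3
	fmt.Println(perConnMaxInFlight(2, 5))  // 1
}
```

    Under this rule the sum of the per-connection values can exceed the overall maximum when there are more connections than slots, as in the third case.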
    // poll all known lookup servers every LookupdPollInterval
    func (r *Consumer) lookupdLoop() {
    	// add some jitter so that multiple consumers discovering the same topic,
    	// when restarted at the same time, dont all connect at once.
    	r.rngMtx.Lock()
    	jitter := time.Duration(int64(r.rng.Float64() *
    		r.config.LookupdPollJitter * float64(r.config.LookupdPollInterval)))
    	r.rngMtx.Unlock()
    	var ticker *time.Ticker
    
    	select {
    	case <-time.After(jitter):
    	case <-r.exitChan:
    		goto exit
    	}
    
    	ticker = time.NewTicker(r.config.LookupdPollInterval)
    
    	for {
    		select {
    		case <-ticker.C:
    			r.queryLookupd()
    		case <-r.lookupdRecheckChan:
    			r.queryLookupd()
    		case <-r.exitChan:
    			goto exit
    		}
    	}
    
    exit:
    	if ticker != nil {
    		ticker.Stop()
    	}
    	r.log(LogLevelInfo, "exiting lookupdLoop")
    	r.wg.Done()
    }
    
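    • The last step of ConnectToNSQLookupd starts this goroutine. After an initial random jitter it periodically (every LookupdPollInterval, or whenever lookupdRecheckChan is signaled) sends an HTTP request to nsqlookupd and refreshes the connections to nsqd, so that when a new nsqd starts serving the topic, the consumer connects to it at the next poll and begins receiving its messages.
    • That completes the consumer's startup flow, yet we have not seen how the consumer actually receives messages. I could not find it at first either. But remember that when we created the conn we passed in a delegate for the consumer? That is the key. Let's look at the consumer's delegate first: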
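    • The two ingredients of this loop, a jittered start and a select over tick, recheck, and exit channels, can be sketched on their own. All names below are hypothetical, and the 0.3 fraction and 60 second interval in main are illustrative values rather than settings taken from the excerpt.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitterDelay scales a uniform random number u in [0, 1) into a start-up
// delay of less than fraction*interval.
func jitterDelay(u, fraction float64, interval time.Duration) time.Duration {
	return time.Duration(int64(u * fraction * float64(interval)))
}

// pollLoop calls query on every tick and on every recheck signal
// until exit is closed.
func pollLoop(ticks <-chan time.Time, recheck, exit <-chan int, query func()) {
	for {
		select {
		case <-ticks:
			query()
		case <-recheck:
			query()
		case <-exit:
			return
		}
	}
}

// runDemo drives pollLoop with one tick and one recheck signal, stops it,
// and returns how many queries were made.
func runDemo() int {
	ticks := make(chan time.Time)
	recheck := make(chan int)
	exit := make(chan int)
	done := make(chan struct{})
	queries := 0
	go func() {
		pollLoop(ticks, recheck, exit, func() { queries++ })
		close(done)
	}()
	ticks <- time.Now()
	recheck <- 1
	close(exit)
	<-done // after this, reading queries is safe
	return queries
}

func main() {
	u := rand.New(rand.NewSource(time.Now().UnixNano())).Float64()
	fmt.Println(jitterDelay(u, 0.3, 60*time.Second)) // somewhere in [0s, 18s)
	fmt.Println(runDemo())                           // 2
}
```

    In a real loop the ticks channel would come from a time.Ticker; passing it in as a parameter just makes the sketch deterministic.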
    // keeps the exported Consumer struct clean of the exported methods
    // required to implement the ConnDelegate interface
    type consumerConnDelegate struct {
    	r *Consumer
    }
    
    func (d *consumerConnDelegate) OnResponse(c *Conn, data []byte)       { d.r.onConnResponse(c, data) }
    func (d *consumerConnDelegate) OnError(c *Conn, data []byte)          { d.r.onConnError(c, data) }
    func (d *consumerConnDelegate) OnMessage(c *Conn, m *Message)         { d.r.onConnMessage(c, m) }
    func (d *consumerConnDelegate) OnMessageFinished(c *Conn, m *Message) { d.r.onConnMessageFinished(c, m) }
    func (d *consumerConnDelegate) OnMessageRequeued(c *Conn, m *Message) { d.r.onConnMessageRequeued(c, m) }
    func (d *consumerConnDelegate) OnBackoff(c *Conn)                     { d.r.onConnBackoff(c) }
    func (d *consumerConnDelegate) OnContinue(c *Conn)                    { d.r.onConnContinue(c) }
    func (d *consumerConnDelegate) OnResume(c *Conn)                      { d.r.onConnResume(c) }
    func (d *consumerConnDelegate) OnIOError(c *Conn, err error)          { d.r.onConnIOError(c, err) }
    func (d *consumerConnDelegate) OnHeartbeat(c *Conn)                   { d.r.onConnHeartbeat(c) }
    func (d *consumerConnDelegate) OnClose(c *Conn)                       { d.r.onConnClose(c) }
    
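    • consumerConnDelegate has a single field, the consumer, and the names of its methods make it obvious what each one does and when it is called. Let's set it aside for now and look at how the conn is created: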
    // NewConn returns a new Conn instance
    func NewConn(addr string, config *Config, delegate ConnDelegate) *Conn {
    	if !config.initialized {
    		panic("Config must be created with NewConfig()")
    	}
    	return &Conn{
    		addr: addr,
    
    		config:   config,
    		delegate: delegate,
    
    		maxRdyCount:      2500,
    		lastMsgTimestamp: time.Now().UnixNano(),
    
    		cmdChan:         make(chan *Command),
    		msgResponseChan: make(chan *msgResponse),
    		exitChan:        make(chan int),
    		drainReady:      make(chan int),
    	}
    }
    
    • This only initializes a Conn struct and stores the ConnDelegate that was passed in, so we keep looking:
    // Connect dials and bootstraps the nsqd connection
    // (including IDENTIFY) and returns the IdentifyResponse
    func (c *Conn) Connect() (*IdentifyResponse, error) {
    	dialer := &net.Dialer{
    		LocalAddr: c.config.LocalAddr,
    		Timeout:   c.config.DialTimeout,
    	}
    
    	conn, err := dialer.Dial("tcp", c.addr)
    	if err != nil {
    		return nil, err
    	}
    	c.conn = conn.(*net.TCPConn)
    	c.r = conn
    	c.w = conn
    
    	_, err = c.Write(MagicV2)
    	if err != nil {
    		c.Close()
    		return nil, fmt.Errorf("[%s] failed to write magic - %s", c.addr, err)
    	}
    
    	resp, err := c.identify()
    	if err != nil {
    		return nil, err
    	}
    
    	if resp != nil && resp.AuthRequired {
    		if c.config.AuthSecret == "" {
    			c.log(LogLevelError, "Auth Required")
    			return nil, errors.New("Auth Required")
    		}
    		err := c.auth(c.config.AuthSecret)
    		if err != nil {
    			c.log(LogLevelError, "Auth Failed %s", err)
    			return nil, err
    		}
    	}
    
    	c.wg.Add(2)
    	atomic.StoreInt32(&c.readLoopRunning, 1)
    	go c.readLoop()
    	go c.writeLoop()
    	return resp, nil
    }
    
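    • As we can see, once the connection is established (magic bytes written, IDENTIFY done, and AUTH performed if required), two goroutines are started, one for reading and one for writing.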
    func (c *Conn) readLoop() {
    	delegate := &connMessageDelegate{c}
    	for {
    		if atomic.LoadInt32(&c.closeFlag) == 1 {
    			goto exit
    		}
    
    		frameType, data, err := ReadUnpackedResponse(c)
    		if err != nil {
    			if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
    				goto exit
    			}
    			if !strings.Contains(err.Error(), "use of closed network connection") {
    				c.log(LogLevelError, "IO error - %s", err)
    				c.delegate.OnIOError(c, err)
    			}
    			goto exit
    		}
    
    		if frameType == FrameTypeResponse && bytes.Equal(data, []byte("_heartbeat_")) {
    			c.log(LogLevelDebug, "heartbeat received")
    			c.delegate.OnHeartbeat(c)
    			err := c.WriteCommand(Nop())
    			if err != nil {
    				c.log(LogLevelError, "IO error - %s", err)
    				c.delegate.OnIOError(c, err)
    				goto exit
    			}
    			continue
    		}
    
    		switch frameType {
    		case FrameTypeResponse:
    			c.delegate.OnResponse(c, data)
    		case FrameTypeMessage:
    			msg, err := DecodeMessage(data)
    			if err != nil {
    				c.log(LogLevelError, "IO error - %s", err)
    				c.delegate.OnIOError(c, err)
    				goto exit
    			}
    			msg.Delegate = delegate
    			msg.NSQDAddress = c.String()
    
    			atomic.AddInt64(&c.messagesInFlight, 1)
    			atomic.StoreInt64(&c.lastMsgTimestamp, time.Now().UnixNano())
    
    			c.delegate.OnMessage(c, msg)
    		case FrameTypeError:
    			c.log(LogLevelError, "protocol error - %s", data)
    			c.delegate.OnError(c, data)
    		default:
    			c.log(LogLevelError, "IO error - %s", err)
    			c.delegate.OnIOError(c, fmt.Errorf("unknown frame type %d", frameType))
    		}
    	}
    
    exit:
    	atomic.StoreInt32(&c.readLoopRunning, 0)
    	// start the connection close
    	messagesInFlight := atomic.LoadInt64(&c.messagesInFlight)
    	if messagesInFlight == 0 {
    		// if we exited readLoop with no messages in flight
    		// we need to explicitly trigger the close because
    		// writeLoop won't
    		c.close()
    	} else {
    		c.log(LogLevelWarning, "delaying close, %d outstanding messages", messagesInFlight)
    	}
    	c.wg.Done()
    	c.log(LogLevelInfo, "readLoop exiting")
    }
    
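    • readLoop first creates a delegate for messages (connMessageDelegate), which, like the consumer's delegate, provides handlers for the relevant events. It then calls ReadUnpackedResponse in a loop to read frames from the conn and, depending on the frame type, passes the content to the corresponding consumer method through the delegate. Let's see what happens when a subscribed message arrives: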
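    • To show what reading a frame involves, here is a small stand-alone decoder. It assumes the frame layout of the NSQ TCP protocol as a 4-byte big-endian size, a 4-byte big-endian frame type, and then the payload, with the size covering the frame type plus payload. The function and type names are hypothetical and do not come from go-nsq.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// Frame types as assumed by this sketch.
const (
	frameTypeResponse int32 = 0
	frameTypeError    int32 = 1
	frameTypeMessage  int32 = 2
)

type frame struct {
	Type int32
	Data []byte
}

// encodeFrame builds [size][frame type][payload]; used here to make test input.
func encodeFrame(frameType int32, payload []byte) []byte {
	buf := make([]byte, 8+len(payload))
	binary.BigEndian.PutUint32(buf[0:4], uint32(len(payload)+4))
	binary.BigEndian.PutUint32(buf[4:8], uint32(frameType))
	copy(buf[8:], payload)
	return buf
}

// readFrame reads exactly one frame from r.
func readFrame(r io.Reader) (frame, error) {
	var size int32
	if err := binary.Read(r, binary.BigEndian, &size); err != nil {
		return frame{}, err
	}
	if size < 4 {
		return frame{}, fmt.Errorf("frame too short: %d bytes", size)
	}
	buf := make([]byte, size)
	if _, err := io.ReadFull(r, buf); err != nil {
		return frame{}, err
	}
	return frame{Type: int32(binary.BigEndian.Uint32(buf[:4])), Data: buf[4:]}, nil
}

// parseFrames decodes every frame in a captured byte stream.
func parseFrames(stream []byte) ([]frame, error) {
	r := bytes.NewReader(stream)
	var frames []frame
	for r.Len() > 0 {
		f, err := readFrame(r)
		if err != nil {
			return frames, err
		}
		frames = append(frames, f)
	}
	return frames, nil
}

func main() {
	stream := append(encodeFrame(frameTypeResponse, []byte("_heartbeat_")),
		encodeFrame(frameTypeError, []byte("E_INVALID"))...)
	frames, err := parseFrames(stream)
	if err != nil {
		fmt.Println("parse failed:", err)
		return
	}
	for _, f := range frames {
		fmt.Printf("%d %s\n", f.Type, f.Data)
	}
	// Prints:
	// 0 _heartbeat_
	// 1 E_INVALID
}
```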
    func (r *Consumer) onConnMessage(c *Conn, msg *Message) {
    	atomic.AddUint64(&r.messagesReceived, 1)
    	r.incomingMessages <- msg
    }
    
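    • It simply writes the message into the channel that the handlers are blocked on. At this point we can see that the messages a consumer processes are pushed to it by nsqd. So how does the server know what to push? Right after the connection to nsqd is established, the consumer sends it a subscribe command.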
    	cmd := Subscribe(r.topic, r.channel)
    	err = conn.WriteCommand(cmd)
    
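    • From this command nsqd learns what the consumer wants to subscribe to, and afterwards it decides how many messages to push based on the RDY value the consumer sends.
    All posts are original work. If you repost, please credit the source!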
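    • On the wire, these two commands are short text lines. The sketch below builds them by hand, assuming the line-oriented forms "SUB <topic> <channel>" and "RDY <count>", each terminated by a newline. subCommand and rdyCommand are hypothetical helpers and the topic and channel names are placeholders; in practice go-nsq builds and writes these commands for you.

```go
package main

import "fmt"

// subCommand returns the bytes of a subscribe command for topic and channel.
func subCommand(topic, channel string) []byte {
	return []byte(fmt.Sprintf("SUB %s %s\n", topic, channel))
}

// rdyCommand returns the bytes of a command announcing that the client
// is ready for count more messages.
func rdyCommand(count int) []byte {
	return []byte(fmt.Sprintf("RDY %d\n", count))
}

func main() {
	fmt.Printf("%q\n", subCommand("events", "ch")) // "SUB events ch\n"
	fmt.Printf("%q\n", rdyCommand(5))              // "RDY 5\n"
}
```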
  • Original article: https://www.cnblogs.com/zpcoding/p/14517513.html