zoukankan      html  css  js  c++  java
  • go笔记 NSQ (6) ( nsqd如何创建消费者以及消费消息)

      前面几章中可以看到,nsq进行消息消费的时候主要使用tcpServer去处理,也就是如下的方法

    func (p *tcpServer) Handle(clientConn net.Conn) {
    	p.ctx.nsqd.logf(LOG_INFO, "TCP: new client(%s)", clientConn.RemoteAddr())
    
    	// The client should initialize itself by sending a 4 byte sequence indicating
    	// the version of the protocol that it intends to communicate, this will allow us
    	// to gracefully upgrade the protocol away from text/line oriented to whatever...
    	buf := make([]byte, 4)
    	_, err := io.ReadFull(clientConn, buf)
    	if err != nil {
    		p.ctx.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
    		return
    	}
    	protocolMagic := string(buf)
    
    	p.ctx.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
    		clientConn.RemoteAddr(), protocolMagic)
    
    	var prot protocol.Protocol
    	switch protocolMagic {
    	case "  V2":
    		prot = &protocolV2{ctx: p.ctx}
    	default:
    		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
    		clientConn.Close()
    		p.ctx.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
    			clientConn.RemoteAddr(), protocolMagic)
    		return
    	}
    
    	err = prot.IOLoop(clientConn)
    	if err != nil {
    		p.ctx.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
    		return
    	}
    }
    

      在验证了协议魔数后,开启一个新的goroute去处理这个连接,也就是protocolV2.IOLoop来处理这个conn,一旦有消费者连接这个nsqd便会执行这个方法。具体的处理逻辑都在这个方法中。

    1.protocolV2.IOLoop

      

    func (p *protocolV2) IOLoop(conn net.Conn) error {
    	var err error
    	var line []byte
    	var zeroTime time.Time
    	//获取一个当前client的id   一般是在已有的client数量上原子性的+1
    	clientID := atomic.AddInt64(&p.ctx.nsqd.clientIDSequence, 1)
    	//创建一个client  
    	client := newClientV2(clientID, conn, p.ctx)
    	//当前client添加到上下文nsqd中
    	p.ctx.nsqd.AddClient(client.ID, client)
    
    	//client已经准备好接收消息时才接收消息
    	messagePumpStartedChan := make(chan bool)
    	//启动一个goroute来处理该client的接收的信号   例如订阅的topic有消息出现等
    	go p.messagePump(client, messagePumpStartedChan)
    	<-messagePumpStartedChan
    	//接收并处理client端发送过来的各种消息,例如身份验证,作为生产者生产消息,作为消费者消费消息
    	for {
    		//根据心跳间隔,设置连接超时时间 如果为0说明一直不会超时
    		if client.HeartbeatInterval > 0 {
    			client.SetReadDeadline(time.Now().Add(client.HeartbeatInterval * 2))
    		} else {
    			client.SetReadDeadline(zeroTime)
    		}
    
    		
    		//根据 间隔符来读取数据  这儿用的是换行符
    		line, err = client.Reader.ReadSlice('
    ')
    		if err != nil {
    			if err == io.EOF {
    				err = nil
    			} else {
    				err = fmt.Errorf("failed to read command - %s", err)
    			}
    			break
    		}
    
    		// 去掉最后一个换行符 
    		line = line[:len(line)-1]
    		//如果用的是回车 
      那要把这个
    也去掉
    		if len(line) > 0 && line[len(line)-1] == '
    ' {
    			line = line[:len(line)-1]
    		}
    		//将信息根据空格进行分隔
    		params := bytes.Split(line, separatorBytes)
    
    		p.ctx.nsqd.logf(LOG_DEBUG, "PROTOCOL(V2): [%s] %s", client, params)
    
    		var response []byte
    		//根据获取到的信息  分别进行不同的处理 
    		//处理结果通常会返回给上面client创建的goroute中处理
    		response, err = p.Exec(client, params)
    		.................//错误处理
    	}
    	//如果出错  或者断开连接  就会跳出上面的循环
    	p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting ioloop", client)
    	//关闭连接
    	conn.Close()
    	close(client.ExitChan)
    	//移除当前client  
    	if client.Channel != nil {
    		client.Channel.RemoveClient(client.ID)
    	}
    
    	p.ctx.nsqd.RemoveClient(client.ID)
    	return err
    }
    

      首先根据我们的提供的信息创建了client也就是连接客户端。然后下面两个核心操作  

          go p.messagePump(client, messagePumpStartedChan)  和  response, err = p.Exec(client, params)   

      前者主要是该client对接收到的信号进行处理,例如有需要消费的消息等,信号是nsqd端发出。后者是对该client接收到的信息处理,例如client身份验证,功能声明等,消息是client端发送过来的

      关于这个client我们可以看下其结构体,并做一些说明

    	c := &clientV2{
    		ID:  id, //id
    		ctx: ctx, //上下文  里面主要包含了nsqd
    
    		Conn: conn, //该client的tcp连接  
    
    		Reader: bufio.NewReaderSize(conn, defaultBufferSize), //该tcp连接的reader
    		Writer: bufio.NewWriterSize(conn, defaultBufferSize), //该tcp连接的writer
    
    		OutputBufferSize:    defaultBufferSize,//写数据缓冲区大小
    		OutputBufferTimeout: ctx.nsqd.getOpts().OutputBufferTimeout,//写数据缓冲超时时间
    
    		MsgTimeout: ctx.nsqd.getOpts().MsgTimeout,//消息超时时间
    
    		// ReadyStateChan has a buffer of 1 to guarantee that in the event
    		// there is a race the state update is not lost
    		ReadyStateChan: make(chan int, 1), //该client是否准备好
    		ExitChan:       make(chan int),//client断开信号
    		ConnectTime:    time.Now(),//第一次连接的时间
    		State:          stateInit,
    
    		ClientID: identifier, //client的验证信息
    		Hostname: identifier,//client的hostName
    
    		SubEventChan:      make(chan *Channel, 1), //subchan信号,如果该client为消费客户端且发送了sub信息,会接收到信号,接收到的是该client所订阅的topic下的某个channel
    		IdentifyEventChan: make(chan identifyEvent, 1),//验证chan客户端发送验证信息好会收到信号
    
    		// heartbeats are client configurable but default to 30s
    		HeartbeatInterval: ctx.nsqd.getOpts().ClientTimeout / 2, //心跳间隔
    
    		pubCounts: make(map[string]uint64),//如果为消息生产者则用来统计已生产消息数量
    	}
    	c.lenSlice = c.lenBuf[:]
    	return c
    }
    

      在本文中我们需要关注的就是这个SubEventChan,在客户端发送sub指令后该chan会收到信号,内容一般是其所订阅的channel

     

    2. response, err = p.Exec(client, params)

      我们先看nsqd是如何处理client端发送过来的信息的。

    func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    	if bytes.Equal(params[0], []byte("IDENTIFY")) {
    		return p.IDENTIFY(client, params)
    	}
    	err := enforceTLSPolicy(client, p, params[0])
    	if err != nil {
    		return nil, err
    	}
    	switch {
    	case bytes.Equal(params[0], []byte("FIN")):
    		return p.FIN(client, params)
    	case bytes.Equal(params[0], []byte("RDY")):
    		return p.RDY(client, params)
    	case bytes.Equal(params[0], []byte("REQ")):
    		return p.REQ(client, params)
    	case bytes.Equal(params[0], []byte("PUB")):
    		return p.PUB(client, params)
    	case bytes.Equal(params[0], []byte("MPUB")):
    		return p.MPUB(client, params)
    	case bytes.Equal(params[0], []byte("DPUB")):
    		return p.DPUB(client, params)
    	case bytes.Equal(params[0], []byte("NOP")):
    		return p.NOP(client, params)
    	case bytes.Equal(params[0], []byte("TOUCH")):
    		return p.TOUCH(client, params)
    	case bytes.Equal(params[0], []byte("SUB")):
    		return p.SUB(client, params)
    	case bytes.Equal(params[0], []byte("CLS")):
    		return p.CLS(client, params)
    	case bytes.Equal(params[0], []byte("AUTH")):
    		return p.AUTH(client, params)
    	}
    	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
    }
    

      可以看到都是根据信息通过空格分隔后的第一段消息来进行判断,选择指令对应的处理方式。这儿可以对部分指令做一个说明

    • IDENTIFY  client端的身份验证,一般是client端刚连上nsqd后,在协议魔数之后发送的东西,主要提供一些该client的信息,具体的信息可以查看https://nsq.io/clients/tcp_protocol_spec.html
    • FIN  丢弃某个消息
    • PUB   以生产者的身份生产消息 上节末尾有讲到
    • MPUB   以生产者的身份生产多条信息
    • SUB 以消费者的身份消费消息

      本文主要讲下以消费者的身份消费消息。

    2.1 protocolV2.SUB

      该方法主要是client作为消费者的时候来消费消息。

    func (p *protocolV2) SUB(client *clientV2, params [][]byte) ([]byte, error) {
    	
    	//一些检查
    	if atomic.LoadInt32(&client.State) != stateInit {
    		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot SUB in current state")
    	}
    
    	if client.HeartbeatInterval <= 0 {
    		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "cannot SUB with heartbeats disabled")
    	}
    
    	if len(params) < 3 {
    		return nil, protocol.NewFatalClientErr(nil, "E_INVALID", "SUB insufficient number of parameters")
    	}
    	//获取到topicName
    	topicName := string(params[1])
    	if !protocol.IsValidTopicName(topicName) {
    		return nil, protocol.NewFatalClientErr(nil, "E_BAD_TOPIC",
    			fmt.Sprintf("SUB topic name %q is not valid", topicName))
    	}
        //获取到channeName
    	channelName := string(params[2])
    	if !protocol.IsValidChannelName(channelName) {
    		return nil, protocol.NewFatalClientErr(nil, "E_BAD_CHANNEL",
    			fmt.Sprintf("SUB channel name %q is not valid", channelName))
    	}
    	//确认该client'是否有权限
    	if err := p.CheckAuth(client, "SUB", topicName, channelName); err != nil {
    		return nil, err
    	}
    	//获取到指定topic下的指定channel
    	//之所以轮询获取是防止在获取后topic或者channel就已经关闭了
    	var channel *Channel
    	for {
    		topic := p.ctx.nsqd.GetTopic(topicName)
    		channel = topic.GetChannel(channelName)
    		channel.AddClient(client.ID, client)
    
    		if (channel.ephemeral && channel.Exiting()) || (topic.ephemeral && topic.Exiting()) {
    			channel.RemoveClient(client.ID)
    			time.Sleep(1 * time.Millisecond)
    			continue
    		}
    		break
    	}
    	atomic.StoreInt32(&client.State, stateSubscribed)
    	//设置该client指定的获取洗脑的channel
    	client.Channel = channel
    	// 将该channel放入SubEventChan 中
    	client.SubEventChan <- channel
    
    	return okBytes, nil
    }
    

        可以看到最后根据sub信息获取到指定的channel,并将这个channel发送到了SubEventChan。这就和上一节讲到的东西串起来了,在上一节生产者的消息成功的传递到了topic,最后成功的分发到了topic下的每个channel的memoryMsgChan中。而此处channel会被放到所有订阅了该channel的client中,后面的处理其实就很容易想到,只要每个client都启动一个goroute,在for中获取其所属channel中memoryMsgChan的值,那每当有消息到达channel时,其下的client就总有一个能获取到该消息。   其实这正是我们1中所看到的另一个重要的方法 go p.messagePump(client, messagePumpStartedChan)中的操作。

    3.处理client信号

      我们直接看下go p.messagePump(client, messagePumpStartedChan)。方法的具体内容。

    func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    	
    	//声明错误
    	var err error
    	//内存消息chan  为topic下
    	var memoryMsgChan chan *Message
    	var backendMsgChan chan []byte
    	var subChannel *Channel
    	// NOTE: `flusherChan` is used to bound message latency for
    	// the pathological case of a channel on a low volume topic
    	// with >1 clients having >1 RDY counts
    	var flusherChan <-chan time.Time
    	var sampleRate int32
    	//获取client下的subEventChan 
    	subEventChan := client.SubEventChan
    	//获取识别  identifyEventChan 
    	identifyEventChan := client.IdentifyEventChan
    	//输出buffer缓冲超时
    	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    	//心跳超时
    	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
    	heartbeatChan := heartbeatTicker.C
    	//消息超时
    	msgTimeout := client.MsgTimeout
    
    	
    	flushed := true
    
    	//可以接收client端发送的信息了
    	close(startedChan)
    
    	for {
    		//还没准备好的操作
    		if subChannel == nil || !client.IsReadyForMessages() {
    			// the client is not ready to receive messages...
    			memoryMsgChan = nil
    			backendMsgChan = nil
    			flusherChan = nil
    			// force flush
    			client.writeLock.Lock()
    			err = client.Flush()
    			client.writeLock.Unlock()
    			if err != nil {
    				goto exit
    			}
    			flushed = true
    		//如果需要刷新的话  会进行操作  	
    		} else if flushed {
    			// last iteration we flushed...
    			// do not select on the flusher ticker channel
    			memoryMsgChan = subChannel.memoryMsgChan
    			backendMsgChan = subChannel.backend.ReadChan()
    			flusherChan = nil
    		} else {
    			// we're buffered (if there isn't any more data we should flush)...
    			// select on the flusher ticker channel, too
    			memoryMsgChan = subChannel.memoryMsgChan
    			backendMsgChan = subChannel.backend.ReadChan()
    			flusherChan = outputBufferTicker.C
    		}
    
    		select {
    		//如果接收到刷新信号  则会将flushed置为true
    		//下次循环的时候就会重置 memoryMsgChan  backendMsgChan 等
    		case <-flusherChan:
    			client.writeLock.Lock()
    			err = client.Flush()
    			client.writeLock.Unlock()
    			if err != nil {
    				goto exit
    			}
    			flushed = true 
    		//如果接收到了ReadyStateChan   这儿没有操作
    		case <-client.ReadyStateChan:
    		//如果接收到了subEventChan   
    		//***********************************************
    		//** 注意这儿就是当client端发送sub指令时最后会  *
    		//** 将其所属的channel发送到subEventChan中.     *
    		//***********************************************
    		case subChannel = <-subEventChan:
    			// you can't SUB anymore
    			subEventChan = nil
    		//当client刚连接时会发送IDENTIFY指令  最后会将验证信息发送到该chan	
    		case identifyData := <-identifyEventChan:
    			// you can't IDENTIFY anymore
    			identifyEventChan = nil
    
    			outputBufferTicker.Stop()
    			if identifyData.OutputBufferTimeout > 0 {
    				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
    			}
    
    			heartbeatTicker.Stop()
    			heartbeatChan = nil
    			if identifyData.HeartbeatInterval > 0 {
    				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
    				heartbeatChan = heartbeatTicker.C
    			}
    
    			if identifyData.SampleRate > 0 {
    				sampleRate = identifyData.SampleRate
    			}
    
    			msgTimeout = identifyData.MsgTimeout
    		//如果到了心跳验证的话  发送心跳验证信息	
    		case <-heartbeatChan:
    			err = p.Send(client, frameTypeResponse, heartbeatBytes)
    			if err != nil {
    				goto exit
    			}
    		//接收备份信息并发送给client	
    		case b := <-backendMsgChan:
    			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
    				continue
    			}
    			//解码消息
    			msg, err := decodeMessage(b)
    			if err != nil {
    				p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
    				continue
    			}
    			//添加消息尝试次数
    			msg.Attempts++
    
    			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
    			client.SendingMessage()
    			//发送消息
    			err = p.SendMessage(client, msg)
    			if err != nil {
    				goto exit
    			}
    			//下次循环时刷新 
    			flushed = false
    		//接收实时消息并发送给client	
    		case msg := <-memoryMsgChan:
    			if sampleRate > 0 && rand.Int31n(100) > sampleRate {
    				continue
    			}
    			//添加消息尝试次数
    			msg.Attempts++
    
    			subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
    			client.SendingMessage()
    			//发送消息
    			err = p.SendMessage(client, msg)
    			if err != nil {
    				goto exit
    			}
    			//下次循环时刷新 
    			flushed = false
    		//如果接收到该client关闭信息  执行关闭代码块	
    		case <-client.ExitChan:
    			goto exit
    		}
    	}
    //关闭代码块
    exit:
    	p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
    	heartbeatTicker.Stop()
    	outputBufferTicker.Stop()
    	if err != nil {
    		p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
    	}
    }
    

        可以看到其实处理过程也是比较简单的。就是监听多个信号,然后根据不同的信号,做不同的事儿。可以发现goroute+chan的方式可以说将生产者/消费者模型用到了极致,而且不通方法之间极大的松耦合,并且提高了并发性,而不用担心某一环耗时导致整个操作都僵住,这应该也就是go语言的核心魅力了,对于想开发高并发应用来说也主要就是玩转这两了。 这儿需要特殊说明下的是subEventChan,这个chan获取到的值也就是第2步最后我们塞到subEventChan的值,也就是该client所订阅的channel

    到此,其实整个消息的生产以及消费就算走完了,当然,目前还没涉及到nsqlookupd,这个后面会再讲到。

      可以看到nsq的生产消费相比与大众化的消息中间件,多了一个channel组件。channel组件的存在支撑了点对点消息传递和广播式的消息传递。nsq在获取到消息后会将消息发送到topic,由topic遍历发送到其所属的每个channel,而channl的消息将被其下的随机一个client获取。

      如果需要多个消费端都接收到某个topic下的消息,则可以在该topic下创建多个channel,多个消费者都订阅自己的channel,这样topic有消息则都可以接收到。

      如果需要多个消费端随机一个接收某个topic下的消息(比如需要负载均衡的场景),则可以多个消费端都只订阅topic下的唯一一个channel,nsq会将消息只发送到某个client

    可以再回顾下nsq官网的那种消息流转图,用来加深理解

     

     

  • 相关阅读:
    redhat 新装后不能联网
    [Linux 命令]df -h
    redhat安装VMware tools的方法
    linux 进入包含空格文件名的文件夹
    Redhat 使用中文安装后更换为英文的设定
    HibernateDaoSupport类的使用
    java中重载与重写的区别
    Servlet中Service方法
    持久化框架Hibernate 开发实例(二)
    持久化框架Hibernate 开发实例(一)
  • 原文地址:https://www.cnblogs.com/hetutu-5238/p/13043129.html
Copyright © 2011-2022 走看看