zoukankan      html  css  js  c++  java
  • nsq源码-消息接收和发送完整流程

    一、TCP Handler

    nsqd里面的Main函数。

    //nsqd.go
    func (n *NSQD) Main() error {
    	//...
    	n.waitGroup.Wrap(func() {
    			exitFunc(protocol.TCPServer(n.tcpListener, n.tcpServer, n.logf))
    		})
    	//...
    }
    
    //tcp_server.go
    func TCPServer(listener net.Listener, handler TCPHandler, logf lg.AppLogFunc) error {
    	//...
    	go func() {
    			handler.Handle(clientConn)
    			wg.Done()
    		}()
    	//...
    }
    

    n.tcpServer实现了TCPHandler接口
    在nsqd.New()里面创建的tcpServer

    //nsqd.go
    func New(opts *Options) (*NSQD, error) {
    	//...
    	n.tcpServer = &tcpServer{}
    	//...
    }
    

    看下tcpServer实现的Handle接口里做了什么

    func (p *tcpServer) Handle(clientConn net.Conn) {
    
    	//...
    	//从socket中读取数据
    	buf := make([]byte, 4)
    	_, err := io.ReadFull(clientConn, buf)
    	if err != nil {
    		p.nsqd.logf(LOG_ERROR, "failed to read protocol version - %s", err)
    		clientConn.Close()
    		return
    	}
    	protocolMagic := string(buf)
    
    	p.nsqd.logf(LOG_INFO, "CLIENT(%s): desired protocol magic '%s'",
    		clientConn.RemoteAddr(), protocolMagic)
    
    	var prot protocol.Protocol
    	switch protocolMagic {
    	case "  V2":
    		//这里是关键,创建了一个protocolV2对象
    		prot = &protocolV2{nsqd: p.nsqd}
    	default:
    		protocol.SendFramedResponse(clientConn, frameTypeError, []byte("E_BAD_PROTOCOL"))
    		clientConn.Close()
    		p.nsqd.logf(LOG_ERROR, "client(%s) bad protocol magic '%s'",
    			clientConn.RemoteAddr(), protocolMagic)
    		return
    	}
    
    	p.conns.Store(clientConn.RemoteAddr(), clientConn)
    
    	//开启protocolV2的IOLoop,这是一个客户端连接的“守护”协程
    	//接收消息和发送消息给客户端,都在这里面处理了
    	err = prot.IOLoop(clientConn)
    	if err != nil {
    		p.nsqd.logf(LOG_ERROR, "client(%s) - %s", clientConn.RemoteAddr(), err)
    	}
    
    	p.conns.Delete(clientConn.RemoteAddr())
    }
    
    //protocol_v2.go
    //因为这里是在Handler里启动的,所以这里其实是为每个客户端都启动了一个Loop
    func (p *protocolV2) IOLoop(conn net.Conn) error {
    	...
    	clientID := atomic.AddInt64(&p.nsqd.clientIDSequence, 1)
    	client := newClientV2(clientID, conn, p.nsqd)
    	p.nsqd.AddClient(client.ID, client)
    
    	...
    	// messagePump负责从channel的memoryMsgChan和
    	//backend.ReadChan()中读取消息并将消息推送给client
    
    	messagePumpStartedChan := make(chan bool)
    	go p.messagePump(client, messagePumpStartedChan)
    	<-messagePumpStartedChan
    
    	//下面这个for循环负责接收客户端消息,比如消费订阅,以及生产消息等
    	//主要逻辑在p.Exec()里
    	for {
    		...
    		//主要逻辑在这个Exec里面
    		response, err = p.Exec(client, params)
    		...
    	}
    
    	//...
    }
    

    二、接收消息

    上面已经看到处理客户端消息主要在protocolV2.Exec()里
    这段代码我觉得很好理解了,直接去protocolV2.PUB()
    看客户端生产消息的逻辑,其他的指令先不看

    func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) {
    	if bytes.Equal(params[0], []byte("IDENTIFY")) {
    		return p.IDENTIFY(client, params)
    	}
    	err := enforceTLSPolicy(client, p, params[0])
    	if err != nil {
    		return nil, err
    	}
    	switch {
    	...
            //这里就是客户端生产消息的指令处理了
    	case bytes.Equal(params[0], []byte("PUB")):
    		return p.PUB(client, params)
    	...
    	case bytes.Equal(params[0], []byte("SUB")):
    		return p.SUB(client, params)
    	}
    	...
    	return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0]))
    }
    
    func (p *protocolV2) PUB(client *clientV2, params [][]byte) ([]byte, error) {
    	var err error
    	...
    	bodyLen, err := readLen(client.Reader, client.lenSlice)
    	...
    	//读取消息主体
    	messageBody := make([]byte, bodyLen)
    	_, err = io.ReadFull(client.Reader, messageBody)
    	if err != nil {
    		return nil, protocol.NewFatalClientErr(err, "E_BAD_MESSAGE", "PUB failed to read message body")
    	}
    
    	...
    	//将消息丢到topic.PutMessage()
    	//PutMessage直接将消息丢到msgChan或者diskqueue了
    	//关于topic的部分,参考https://www.cnblogs.com/werben/p/14518283.html
    	topic := p.nsqd.GetTopic(topicName)
    	msg := NewMessage(topic.GenerateID(), messageBody)
    	err = topic.PutMessage(msg)
    	if err != nil {
    		return nil, protocol.NewFatalClientErr(err, "E_PUB_FAILED", "PUB failed "+err.Error())
    	}
    
    	client.PublishedMessage(topicName, 1)
    
    	return okBytes, nil
    }
    

    三、发送消息

    现在来搞清楚,服务器端又在哪里发送消息给consumer?
    在这protocolV2.messagePump()

    func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    	var err error
    	var memoryMsgChan chan *Message
    	var backendMsgChan <-chan []byte
    	var subChannel *Channel
    	var flusherChan <-chan time.Time
    	var sampleRate int32
    
    	//客户端执行Sub的时候,会将Channle丢到这个SubEventChan里
    	//可以去看protocolV2.SUB()函数
    	subEventChan := client.SubEventChan
    
    	//鉴权Identify对应的chan,只能鉴权一次 
    	identifyEventChan := client.IdentifyEventChan
    
    	//flushChan赋值outputBufferTicker,默认是250ms时间间隔Flush一次数据
    	outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    
    	heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
    	heartbeatChan := heartbeatTicker.C
    	msgTimeout := client.MsgTimeout
    	flushed := true
    	close(startedChan)
    
    	for {
    		// 检查订阅状态和消息是否可处理状态
    		if subChannel == nil || !client.IsReadyForMessages() {
    			// the client is not ready to receive messages...
    			memoryMsgChan = nil
    			backendMsgChan = nil
    			flusherChan = nil
    			// force flush
    			client.writeLock.Lock()
    			err = client.Flush()
    			client.writeLock.Unlock()
    			if err != nil {
    				goto exit
    			}
    			flushed = true
    		} else if flushed {
    			// last iteration we flushed...
    			// do not select on the flusher ticker channel
    			memoryMsgChan = subChannel.memoryMsgChan
    			backendMsgChan = subChannel.backend.ReadChan()
    			flusherChan = nil
    		} else {
    			//这个memoryMsgChan是channel将消息存在内存中的地方
    			memoryMsgChan = subChannel.memoryMsgChan
    			//这个backendMsgChan是channel将消息存在磁盘中的地方
    			backendMsgChan = subChannel.backend.ReadChan()
    
    			flusherChan = outputBufferTicker.C
    		}
    		fmt.Printf("werben subChannel nil: %t
    ", subChannel == nil)
    
    		select {
    		case <-flusherChan:
    			//这个flusherChan就是outputBufferTicker
    			//250ms时间间隔Flush一次数据
    			client.writeLock.Lock()
    			err = client.Flush()
    			client.writeLock.Unlock()
    			if err != nil {
    				goto exit
    			}
    			flushed = true
    		case <-client.ReadyStateChan:
    		case subChannel = <-subEventChan:
    			//客户端Sub的时候,会将channel传到这个subEventChan通道,
    			//参考protocolV2.SUB()函数
    			subEventChan = nil
    		case identifyData := <-identifyEventChan:
    			//客户端提交identify时出发,只能提交一次identify,
    			//参考函数protocolV2.IDENTIFY()
    
    			//感觉这里就是在收到这个消息时
    			//重新启动心跳和flush同步的ticker
    			identifyEventChan = nil
    
    			outputBufferTicker.Stop()
    			if identifyData.OutputBufferTimeout > 0 {
    				outputBufferTicker = time.NewTicker(identifyData.OutputBufferTimeout)
    			}
    
    			heartbeatTicker.Stop()
    			heartbeatChan = nil
    			if identifyData.HeartbeatInterval > 0 {
    				heartbeatTicker = time.NewTicker(identifyData.HeartbeatInterval)
    				heartbeatChan = heartbeatTicker.C
    			}
    
    			if identifyData.SampleRate > 0 {
    				sampleRate = identifyData.SampleRate
    			}
    
    			msgTimeout = identifyData.MsgTimeout
    		case <-heartbeatChan:
    			//心跳处理
    			err = p.Send(client, frameTypeResponse, heartbeatBytes)
    			if err != nil {
    				goto exit
    			}
    		case b := <-backendMsgChan:
    			...
    			//磁盘消息处理
    			client.SendingMessage()
    			...
    			err = p.SendMessage(client, msg)
    			...
    			flushed = false
    		case msg := <-memoryMsgChan:
    			//将内存消息发送给客户端
    			client.SendingMessage()
    			...
    			err = p.SendMessage(client, msg)
    			...
    			flushed = false
    		case <-client.ExitChan:
    			goto exit
    		}
    	}
    	exit:
    		...
    		//结束时候关闭心跳和flush同步的ticke
    		heartbeatTicker.Stop()
    		outputBufferTicker.Stop()
    		...
    }
    
  • 相关阅读:
    vscode conda 配置python环境(windows)
    Linux分区只能分两个,无法安装双系统(解决)
    离散傅里叶变换,逆变换(c语言)
    vscode 配置task.json,执行多条指令
    cmake出错:CMAKE_CXX_COMPILER设置后,提示没有设置,找不到make命令的可执行程序
    产生makefiles文件后,make命令不可用
    Kali 下载地址
    vc6 保存文件卡住
    fatal error LNK1169: one or more multiply defined symbols found
    Google Chrome 退出清除浏览数据
  • 原文地址:https://www.cnblogs.com/werben/p/14523875.html
Copyright © 2011-2022 走看看