zoukankan      html  css  js  c++  java
  • NSQ(2)-生产者发布消息流程

    生产者发送消息源码剖析

    func (c *ProducerHandler) SendMsg() {
    	conf := nsq.NewConfig()
    	p, err := nsq.NewProducer("127.0.0.1:4150", conf)
    	if err != nil {
    		logs.Error("create producer failed, err: %+v
    ", err)
    		return
    	}
    
    	for {
    		message := "time: " + time.Now().Format("2006-01-02 15:04:05")
    		if err = p.Publish(topic, []byte(message)); err != nil {
    			logs.Error("publish message failed, err: %+v
    ", err)
    			return
    		}
    
    		time.Sleep(10 * time.Second)
    	}
    }
    
    • 首先通过nsq的NewProducer的方法构建了一个producer,注意在这里传入的adder是sqd的地址,接下来就是调用producer的publish将我们要发送的topic和消息发布出去。
    // Publish synchronously publishes a message body to the specified topic, returning
    // an error if publish failed
    func (w *Producer) Publish(topic string, body []byte) error {
    	return w.sendCommand(Publish(topic, body))
    }
    
    // Publish creates a new Command to write a message to a given topic
    func Publish(topic string, body []byte) *Command {
    	var params = [][]byte{[]byte(topic)}
    	return &Command{[]byte("PUB"), params, body}
    }
    
    • 调用Publish方法,将topic和要发布的消息封装成 command 结构体,通过sendCommand 方法进行发送。
    func (w *Producer) sendCommand(cmd *Command) error {
    	doneChan := make(chan *ProducerTransaction)
    	err := w.sendCommandAsync(cmd, doneChan, nil)
    	if err != nil {
    		close(doneChan)
    		return err
    	}
    	t := <-doneChan
    	return t.Error
    }
    
    • 声明一个 doneChan 来标识发送是否完成,同时通过sendCommandAsync核心方法进行发送。
    func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
    	args []interface{}) error {
    	// keep track of how many outstanding producers we're dealing with
    	// in order to later ensure that we clean them all up...
    	atomic.AddInt32(&w.concurrentProducers, 1)
    	defer atomic.AddInt32(&w.concurrentProducers, -1)
    
    	if atomic.LoadInt32(&w.state) != StateConnected {
    		err := w.connect()
    		if err != nil {
    			return err
    		}
    	}
    
    	t := &ProducerTransaction{
    		cmd:      cmd,
    		doneChan: doneChan,
    		Args:     args,
    	}
    
    	select {
    	case w.transactionChan <- t:
    	case <-w.exitChan:
    		return ErrStopped
    	}
    
    	return nil
    }
    
    • 记录正在发送的消息的数目, 之后检查produer是否和nsqd建立了连接,没有的话先建立连接,最后将传入的参数封装成ProducerTransaction放到transactionChan 中等待被发送。
    func (w *Producer) connect() error {
    	w.guard.Lock()
    	defer w.guard.Unlock()
    
    	if atomic.LoadInt32(&w.stopFlag) == 1 {
    		return ErrStopped
    	}
    
    	switch state := atomic.LoadInt32(&w.state); state {
    	case StateInit:
    	case StateConnected:
    		return nil
    	default:
    		return ErrNotConnected
    	}
    
    	w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)
    
    	logger, logLvl := w.getLogger()
    
    	w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
    	w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
    
    	_, err := w.conn.Connect()
    	if err != nil {
    		w.conn.Close()
    		w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
    		return err
    	}
    	atomic.StoreInt32(&w.state, StateConnected)
    	w.closeChan = make(chan int)
    	w.wg.Add(1)
    	go w.router()
    
    	return nil
    }
    
    • 在connect中,先检查producer的连接状态是否可以进行连接,依旧是通过NewConn方法创建conn,并且将producer进行了委托包装,创建成功后建立连接,最后启动了一个goroutine。
    • 连接方法w.conn.Connect()和consumer的是一样的,就是在建立完连接后会启动两个goroutine分别用来读和写。我们重点看一下router方法:
    func (w *Producer) router() {
    	for {
    		select {
    		case t := <-w.transactionChan:
    			w.transactions = append(w.transactions, t)
    			err := w.conn.WriteCommand(t.cmd)
    			if err != nil {
    				w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
    				w.close()
    			}
    		case data := <-w.responseChan:
    			w.popTransaction(FrameTypeResponse, data)
    		case data := <-w.errorChan:
    			w.popTransaction(FrameTypeError, data)
    		case <-w.closeChan:
    			goto exit
    		case <-w.exitChan:
    			goto exit
    		}
    	}
    
    exit:
    	w.transactionCleanup()
    	w.wg.Done()
    	w.log(LogLevelInfo, "exiting router")
    }
    
    • 在这个方法里就是监听多个chan,分别是:是否有需要发送的消息,是否有收到的响应,是否有错误,是否有退出消息,当成功或者失败发布消息的时候,data都会得到数据并调用popTransaction方法。
    func (w *Producer) popTransaction(frameType int32, data []byte) {
    	t := w.transactions[0]
    	w.transactions = w.transactions[1:]
    	if frameType == FrameTypeError {
    		t.Error = ErrProtocol{string(data)}
    	}
    	t.finish()
    }
    
    • 首先获取第一个transactions中的元素,如果是错误的响应,那么给他的Error上设置错误信息,最后调用finish方法。
    func (t *ProducerTransaction) finish() {
    	if t.doneChan != nil {
    		t.doneChan <- t
    	}
    }
    
    • 就是向发送消息时创建的doneChan中传入发送结果,那么用户就可以通过doneChan知道消息是否发送成功。
    所有博文均为原著,如若转载,请注明出处!
  • 相关阅读:
    JSON.stringify()学习
    【EntityFramework学习笔记】为什么要使用迁移
    使用Telerik的DataPager进行服务器端分页(改进版)
    使用Telerik的DataPager进行服务器端分页
    使用MVVM写的WPF分页控件
    【PostgreSQL】PostgreSQL的安装
    【Telerik】<telerik:RadGridView/>控件的使用
    【PostgreSQL】PostGreSQL数据库,时间数据类型
    WCF的同步和异步(以WPF连接为例)
    【WPF】绑定数据
  • 原文地址:https://www.cnblogs.com/zpcoding/p/14517499.html
Copyright © 2011-2022 走看看