生产者发送消息源码剖析
func (c *ProducerHandler) SendMsg() {
conf := nsq.NewConfig()
p, err := nsq.NewProducer("127.0.0.1:4150", conf)
if err != nil {
logs.Error("create producer failed, err: %+v
", err)
return
}
for {
message := "time: " + time.Now().Format("2006-01-02 15:04:05")
if err = p.Publish(topic, []byte(message)); err != nil {
logs.Error("publish message failed, err: %+v
", err)
return
}
time.Sleep(10 * time.Second)
}
}
- 首先通过nsq的NewProducer的方法构建了一个producer,注意在这里传入的adder是sqd的地址,接下来就是调用producer的publish将我们要发送的topic和消息发布出去。
// Publish synchronously publishes a message body to the specified topic, returning
// an error if publish failed
func (w *Producer) Publish(topic string, body []byte) error {
return w.sendCommand(Publish(topic, body))
}
// Publish creates a new Command to write a message to a given topic
func Publish(topic string, body []byte) *Command {
var params = [][]byte{[]byte(topic)}
return &Command{[]byte("PUB"), params, body}
}
- 调用Publish方法,将topic和要发布的消息封装成 command 结构体,通过sendCommand 方法进行发送。
func (w *Producer) sendCommand(cmd *Command) error {
doneChan := make(chan *ProducerTransaction)
err := w.sendCommandAsync(cmd, doneChan, nil)
if err != nil {
close(doneChan)
return err
}
t := <-doneChan
return t.Error
}
- 声明一个 doneChan 来标识发送是否完成,同时通过sendCommandAsync核心方法进行发送。
func (w *Producer) sendCommandAsync(cmd *Command, doneChan chan *ProducerTransaction,
args []interface{}) error {
// keep track of how many outstanding producers we're dealing with
// in order to later ensure that we clean them all up...
atomic.AddInt32(&w.concurrentProducers, 1)
defer atomic.AddInt32(&w.concurrentProducers, -1)
if atomic.LoadInt32(&w.state) != StateConnected {
err := w.connect()
if err != nil {
return err
}
}
t := &ProducerTransaction{
cmd: cmd,
doneChan: doneChan,
Args: args,
}
select {
case w.transactionChan <- t:
case <-w.exitChan:
return ErrStopped
}
return nil
}
- 记录正在发送的消息的数目, 之后检查produer是否和nsqd建立了连接,没有的话先建立连接,最后将传入的参数封装成ProducerTransaction放到transactionChan 中等待被发送。
func (w *Producer) connect() error {
w.guard.Lock()
defer w.guard.Unlock()
if atomic.LoadInt32(&w.stopFlag) == 1 {
return ErrStopped
}
switch state := atomic.LoadInt32(&w.state); state {
case StateInit:
case StateConnected:
return nil
default:
return ErrNotConnected
}
w.log(LogLevelInfo, "(%s) connecting to nsqd", w.addr)
logger, logLvl := w.getLogger()
w.conn = NewConn(w.addr, &w.config, &producerConnDelegate{w})
w.conn.SetLogger(logger, logLvl, fmt.Sprintf("%3d (%%s)", w.id))
_, err := w.conn.Connect()
if err != nil {
w.conn.Close()
w.log(LogLevelError, "(%s) error connecting to nsqd - %s", w.addr, err)
return err
}
atomic.StoreInt32(&w.state, StateConnected)
w.closeChan = make(chan int)
w.wg.Add(1)
go w.router()
return nil
}
- 在connect中,先检查producer的连接状态是否可以进行连接,依旧是通过NewConn方法创建conn,并且将producer进行了委托包装,创建成功后建立连接,最后启动了一个goroutine。
- 连接方法w.conn.Connect()和consumer的是一样的,就是在建立完连接后会启动两个goroutine分别用来读和写。我们重点看一下router方法:
func (w *Producer) router() {
for {
select {
case t := <-w.transactionChan:
w.transactions = append(w.transactions, t)
err := w.conn.WriteCommand(t.cmd)
if err != nil {
w.log(LogLevelError, "(%s) sending command - %s", w.conn.String(), err)
w.close()
}
case data := <-w.responseChan:
w.popTransaction(FrameTypeResponse, data)
case data := <-w.errorChan:
w.popTransaction(FrameTypeError, data)
case <-w.closeChan:
goto exit
case <-w.exitChan:
goto exit
}
}
exit:
w.transactionCleanup()
w.wg.Done()
w.log(LogLevelInfo, "exiting router")
}
- 在这个方法里就是监听多个chan,分别是:是否有需要发送的消息,是否有收到的响应,是否有错误,是否有退出消息,当成功或者失败发布消息的时候,data都会得到数据并调用popTransaction方法。
func (w *Producer) popTransaction(frameType int32, data []byte) {
t := w.transactions[0]
w.transactions = w.transactions[1:]
if frameType == FrameTypeError {
t.Error = ErrProtocol{string(data)}
}
t.finish()
}
- 首先获取第一个transactions中的元素,如果是错误的响应,那么给他的Error上设置错误信息,最后调用finish方法。
func (t *ProducerTransaction) finish() {
if t.doneChan != nil {
t.doneChan <- t
}
}
- 就是向发送消息时创建的doneChan中传入发送结果,那么用户就可以通过doneChan知道消息是否发送成功。