生产者
package main import ( "fmt" "github.com/nsqio/go-nsq" ) func main() { nsqdTcpAddress := "172.16.30.129:4150" config := nsq.NewConfig() tPro, err := nsq.NewProducer(nsqdTcpAddress, config) if err != nil { fmt.Println(err) } // topic topic := "topic_test" // message tCommand := "new message!" //发布消息 err = tPro.Publish(topic, []byte(tCommand)) if err != nil { fmt.Println(err) } }
消费者
package main import ( "github.com/nsqio/go-nsq" "fmt" "sync" ) // 实现HandleMessage接口方法 type NsqHandler struct { // 根据自身业务定义需要的字段信息 } // message 消息处理 func (s *NsqHandler) HandleMessage(message *nsq.Message) error { // 处理消息,此处仅打印 fmt.Println("消息内容:", string(message.Body)) return nil } func main() { //初始化配置 config := nsq.NewConfig() //创造消费者,指定topic,channel topicName := "topic_test" channelName := "test_channel" com, err := nsq.NewConsumer(topicName, channelName, config) if err != nil { fmt.Println(err) } //添加处理回调 com.AddHandler(&NsqHandler{}) //连接对应的nsqd nsqdTcpAddress := "172.16.30.129:4150" err = com.ConnectToNSQD(nsqdTcpAddress) if err != nil { fmt.Println(err) } //只是为了不结束此进程,这里没有意义 var wg = &sync.WaitGroup{} wg.Add(1) wg.Wait() }