zoukankan      html  css  js  c++  java
  • NSQ golang

    生产者

    package main
    
    import (
        "fmt"
        "github.com/nsqio/go-nsq"
    )
    
    func main() {
    
        nsqdTcpAddress := "172.16.30.129:4150"
        config := nsq.NewConfig()
        tPro, err := nsq.NewProducer(nsqdTcpAddress, config)
        if err != nil {
            fmt.Println(err)
        }
        // topic
        topic := "topic_test"
        // message
        tCommand := "new message!"
        //发布消息
        err = tPro.Publish(topic, []byte(tCommand))
        if err != nil {
            fmt.Println(err)
        }
    
    }

    消费者

    package main
    
    import (
    "github.com/nsqio/go-nsq"
    "fmt"
    "sync"
    )
    
    // 实现HandleMessage接口方法
    type NsqHandler struct {
        // 根据自身业务定义需要的字段信息
    }
    
    // message 消息处理
    func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
    
        // 处理消息,此处仅打印
        fmt.Println("消息内容:", string(message.Body))
        return nil
    }
    
    func main() {
    
        //初始化配置
        config := nsq.NewConfig()
        //创造消费者,指定topic,channel
        topicName := "topic_test"
        channelName := "test_channel"
        com, err := nsq.NewConsumer(topicName, channelName, config)
        if err != nil {
            fmt.Println(err)
        }
    
        //添加处理回调
        com.AddHandler(&NsqHandler{})
        //连接对应的nsqd
        nsqdTcpAddress := "172.16.30.129:4150"
        err = com.ConnectToNSQD(nsqdTcpAddress)
        if err != nil {
            fmt.Println(err)
        }
    
        //只是为了不结束此进程,这里没有意义
        var wg  = &sync.WaitGroup{}
        wg.Add(1)
        wg.Wait()
    }
  • 相关阅读:
    Django Form组件的扩展
    Python TCP与UDP的区别
    Python三次握手和四次挥手
    网络基础之网络协议
    Python 类方法、实例方法、静态方法的使用与及实例
    python深浅拷贝
    2021牛客寒假算法基础集训营1 题解
    01 Trie 专题
    MOTS:多目标跟踪和分割论文翻译
    牛客巅峰赛S2第6场题解
  • 原文地址:https://www.cnblogs.com/hcy-fly/p/11966458.html
Copyright © 2011-2022 走看看