zoukankan      html  css  js  c++  java
  • golang消息队列nsq

    golang消息队列nsq

    1、NSQ下载链接
    http://nsq.io/deployment/installing.html

    win下安装

    追加I: sq-1.2.0.windows-amd64.go1.12.9in

    2. 打开命令窗口,运行:nsqlookupd
    
    3. 打开新的命令窗口,运行:nsqd --lookupd-tcp-address=127.0.0.1:4160 //此处4160是nsqd与lookupd进行tcp连接的端口
    
    4. 打开新的命令窗口,运行: nsqadmin --lookupd-http-address=127.0.0.1:4161 //此处4161是nsqadmin与lookupd进行http连接的端口
    

      http://127.0.0.1:4171/ 后台查看

    package main
    
    import (
        "github.com/nsqio/go-nsq"
        "fmt"
    )
    
    var (
        //nsqd的地址,使用了tcp监听的端口
        tcpNsqdAddrr = "127.0.0.1:4150"
    )
    
    func main() {
        //初始化配置
        config := nsq.NewConfig()
        for i := 0; i < 100; i++ {
            //创建100个生产者
            tPro, err := nsq.NewProducer(tcpNsqdAddrr, config)
            if err != nil {
                fmt.Println(err)
            }
            //主题
            topic := "Insert"
            //主题内容
            tCommand := "new data!"
            //发布消息
            err = tPro.Publish(topic, []byte(tCommand))
            if err != nil {
                fmt.Println(err)
            }
        }
    
    }

    运行如上代码

     生产了100个

    接下来消费

    package main
    
    import (
        "github.com/nsqio/go-nsq"
        "fmt"
        "sync"
        "time"
    )
    
    var (
        //nsqd的地址,使用了tcp监听的端口
        tcpNsqdAddrr = "127.0.0.1:4150"
    )
    
    //声明一个结构体,实现HandleMessage接口方法(根据文档的要求)
    type NsqHandler struct {
        //消息数
        msqCount int64
        //标识ID
        nsqHandlerID string
    }
    
    //实现HandleMessage方法
    //message是接收到的消息
    func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
        //没收到一条消息+1
        s.msqCount++
        //打印输出信息和ID
        fmt.Println(s.msqCount,s.nsqHandlerID)
        //打印消息的一些基本信息
        fmt.Printf("msg.Timestamp=%v, msg.nsqaddress=%s,msg.body=%s 
    ", time.Unix(0  , message.Timestamp).Format("2006-01-02 03:04:05") , message.NSQDAddress, string(message.Body))
        return nil
    }
    
    func main() {
         //这个是监听 队列
        //初始化配置
        config := nsq.NewConfig()
        //创造消费者,参数一时订阅的主题,参数二是使用的通道
        com, err := nsq.NewConsumer("Insert", "channel1", config)
        if err != nil {
            fmt.Println(err)
        }
        //添加处理回调
        com.AddHandler(&NsqHandler{nsqHandlerID: "One"})
        //连接对应的nsqd
        err = com.ConnectToNSQD(tcpNsqdAddrr)
        if err != nil {
            fmt.Println(err)
        }
    
        //只是为了不结束此进程,这里没有意义
        var wg = &sync.WaitGroup{}
        wg.Add(1)
        wg.Wait()
    }

    这个nsq 问题有点多 建议不要用

  • 相关阅读:
    js中for..of..和迭代器
    Python与其他语言的区别
    数据结构和算法部分总结
    MVC设计模式及SSH框架的介绍
    块级元素和行内元素以及display中block、inline和inline-block的区别
    线程安全和非线程安全
    Spring MVC拦截器入门
    Java反射的理解
    mybatis
    重定向和转发
  • 原文地址:https://www.cnblogs.com/newmiracle/p/12981033.html
Copyright © 2011-2022 走看看