zoukankan      html  css  js  c++  java
  • golang nsq示例使用介绍

    消息队列废话不多说了,直切正题吧:
    直接去https://github.com/nsqio/nsq/releases下载编译好的执行文件,比如我下载的是nsq-1.2.0.linux-amd64.go1.12.9.tar.gz

    直接解压后(我的重命名了nsq-1.2.0):bin目录里就会出现一大堆nsq_…开头的可执行文件:

    进入bin目录

    cd nsq-1.2.0/bin

    启动nsqlookup

    ./nsqlookupd &

    启动nsqd

    ./nsqd --lookupd-tcp-address=127.0.0.1:4160 &

    运行 nsqadmin 管理

    ./nsqadmin --lookupd-http-address=0.0.0.0:4161 --http-address=10.20.23.29:8761&

    客户端写入消息

    curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=hello word'

    队列消息写入文件

    ./nsq_to_file --topic=test --output-dir=/tmp/log --lookupd-http-address=127.0.0.1:4161

    访问nsqadmin(以下是本机ip)

    http://10.20.23.29:8761/

    上面是结束了nsql的常用使用组件,我们再来回顾下上面的概念问题:

    Nsq服务端

    nsqlookupd

    看看官方的原话是怎么说:
    nsqlookupd是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息
    
    简单的说nsqlookupd就是中心管理服务,它使用tcp(默认端口4160)管理nsqd服务,使用http(默认端口4161)管理nsqadmin服务。同时为客户端提供查询功能
    
    总的来说,nsqlookupd具有以下功能或特性
    
    唯一性,在一个Nsq服务中只有一个nsqlookupd服务。当然也可以在集群中部署多个nsqlookupd,但它们之间是没有关联的
    去中心化,即使nsqlookupd崩溃,也会不影响正在运行的nsqd服务
    充当nsqd和naqadmin信息交互的中间件
    提供一个http查询服务,给客户端定时更新nsqd的地址目录

    nsqadmin

    官方原话:是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务
    
    总的来说,nsqadmin具有以下功能或特性
    
    提供一个对topic和channel统一管理的操作界面以及各种实时监控数据的展示,界面设计的很简洁,操作也很简单
    展示所有message的数量,恩…装X利器
    能够在后台创建topic和channel,这个应该不常用到
    nsqadmin的所有功能都必须依赖于nsqlookupd,nsqadmin只是向nsqlookupd传递用户操作并展示来自nsqlookupd的数据

    nsqadmin默认的访问地址是 http://127.0.0.1:4171/

    使用

    创建一个channer001主题,并发送一个how are you消息

    curl -d 'how are you' 'http://127.0.0.1:4151/pub?topic=channer001'

    nsqd

    官方原话:nsqd 是一个守护进程,负责接收,排队,投递消息给客户端
    
    简单的说,真正干活的就是这个服务,它主要负责message的收发,队列的维护。nsqd会默认监听一个tcp端口(4150)和一个http端口(4151)以及一个可选的https端口
    
    总的来说,nsqd 具有以下功能或特性
    
    对订阅了同一个topic,同一个channel的消费者使用负载均衡策略(不是轮询)
    只要channel存在,即使没有该channel的消费者,也会将生产者的message缓存到队列中(注意消息的过期处理)
    保证队列中的message至少会被消费一次,即使nsqd退出,也会将队列中的消息暂存磁盘上(结束进程等意外情况除外)
    限定内存占用,能够配置nsqd中每个channel队列在内存中缓存的message数量,一旦超出,message将被缓存到磁盘中
    topic,channel一旦建立,将会一直存在,要及时在管理台或者用代码清除无效的topic和channel,避免资源的浪费

    这是官方的图,第一个channel(meteics)因为有多个消费者,所以触发了负载均衡机制。后面两个channel由于没有消费者,所有的message均会被缓存在相应的队列里,直到消费者出现
    在这里插入图片描述

    使用Go操作NSQ

    官方提供了Go语言版的客户端:go-nsq,更多客户端支持请查看CLIENT LIBRARIES

    安装

    go get -u github.com/nsqio/go-nsq

    1.生产者

    一个简单的生产者示例代码如下:

    // gin-vue/product.go
    package main
    
    import (
        "log"
        "github.com/nsqio/go-nsq"
        "io/ioutil"
        "strconv"
    )
    
    var nullLogger = log.New(ioutil.Discard, "", log.LstdFlags)
    
    func sendMessage() {
        config := nsq.NewConfig() // 1. 创建生产者
        producer, err := nsq.NewProducer("127.0.0.1:4150", config)
        if err != nil {
            log.Fatalln("连接失败: (127.0.0.1:4150)", err)
        }
    
        errPing := producer.Ping() // 2. 生产者ping
        if errPing != nil {
            log.Fatalln("无法ping通: 127.0.0.1:4150", errPing)
        }
    
        producer.SetLogger(nullLogger, nsq.LogLevelInfo) // 3. 设置不输出info级别的日志
    
        for i := 0; i < 5; i++ { // 4. 生产者发布消息
            message := "消息发送测试 " + strconv.Itoa(i+10000)
            err2 := producer.Publish("one-test", []byte(message)) // 注意one-test 对应消费者consumer.go 保持一致
            if err2 != nil {
                log.Panic("生产者推送消息失败!")
            }
        }
    
        producer.Stop() // 5. 生产者停止执行
    }
    
    func main() {
        sendMessage()
    }

    2.消费者

    创建消费者的方式有两种,一种是通过http请求来发现nsqd生产者和配置的topic,另一种是直接使用tcp请求来连接本地实例 。

    示例代码如下所示:

    // gin-vue/consumer.go
    package main
    
    import (
        "log"
        "github.com/nsqio/go-nsq"
        "fmt"
    )
    
    func doConsumerTask() {
        // 1. 创建消费者
        config := nsq.NewConfig()
        q, errNewCsmr := nsq.NewConsumer("one-test", "ch-one-test", config)
        if errNewCsmr != nil {
            fmt.Printf("fail to new consumer!, topic=%s, channel=%s", "one-test", "ch-one-test")
        }
    
        // 2. 添加处理消息的方法
        q.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
            log.Printf("message: %v", string(message.Body))
            message.Finish()
            return nil
        }))
    
        // 3. 通过http请求来发现nsqd生产者和配置的topic(推荐使用这种方式)
        lookupAddr := []string{
            "127.0.0.1:4161",
        }
        err := q.ConnectToNSQLookupds(lookupAddr)
        if err != nil {
            log.Panic("[ConnectToNSQLookupds] Could not find nsqd!")
        }
    
        // 4. 接收消费者停止通知
        <-q.StopChan
    
        // 5. 获取统计结果
        stats := q.Stats()
        fmt.Sprintf("message received %d, finished %d, requeued:%s, connections:%s",
            stats.MessagesReceived, stats.MessagesFinished, stats.MessagesRequeued, stats.Connections)
    }
    
    func main() {
        doConsumerTask()
    }

    将上面的代码编译执行,然后在终端输入后效果如下:

    使用浏览器打开http://10.20.23.29:8761/可以查看到类似下面的页面: 在下面这个页面能看到当前的topic信息: 

    点击对应的topic进去查看详情: 


    未完待续........更多关于go-nsq的更多内容请阅读go-nsq的官方文档


  • 相关阅读:
    Study Plan The TwentySecond Day
    Study Plan The Nineteenth Day
    Study Plan The TwentySeventh Day
    Study Plan The Twentieth Day
    Study Plan The TwentyFirst Day
    python实现进程的三种方式及其区别
    yum makecache
    JSONPath 表达式的使用
    oracle执行cmd的实现方法
    php daodb插入、更新与删除数据
  • 原文地址:https://www.cnblogs.com/phpper/p/13408463.html
Copyright © 2011-2022 走看看