zoukankan      html  css  js  c++  java
  • 高性能消息队列NSQ

    前言

    最近我再网上寻找使用golang实现的mq,因为我知道golang一般实现的应用部署起来很方便,所以我就找到了一个叫做nsq的mq,其实它并不能完全称为队列,但是它的轻量和性能的高效,让我真的大开眼界。

    如果你有兴趣,我觉得也可以了解一下:
    网上有人翻译了国外的一篇文章:
    我们是如何使用NSQ处理7500亿消息的

    安装和部署

    官网提供

    如果你有能力的话直接阅读官方的说明进行操作就可以了
    https://nsq.io/overview/quick_start.html

    如果看不懂我还找到了中文翻译过的:

    http://wiki.jikexueyuan.com/project/nsq-guide/

    简单部署

    下面是我使用的最快部署测试方式,使用服务器环境centos7.4,防火墙开放端口4160,4161,4151
    4171
    1、在下载页面下载对应版本(可能有的时候需要科学上网)
    https://nsq.io/deployment/installing.html
    这里使用linux版本
    nsq-1.1.0.linux-amd64.go1.10.3.tar.gz

    2、将包上传至服务器后解压;
    tar xvf nsq-1.1.0.linux-amd64.go1.10.3.tar.gz

    3、进入bin目录 cd nsq-1.1.0.linux-amd64.go1.10.3/bin

    4、后台启动三个服务
    nohup ./nsqlookupd > /dev/null 2>&1 &
    nohup ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
    nohup ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &

    简单使用

    1、使用
    curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
    会创建一个test主题,并发送一个hello world消息

    2、外部通过:http://127.0.0.1:4171/
    进行访问可以看到NSQ的管理界面,非常的简洁
    其中127.0.0.1为服务器IP

    3、使用
    ./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
    消费test中刚才的消息,并输出到服务器/tmp目录中

    特性

    官方给出的文档给出了很多特性的说明,针对于一个MQ来说,我认为下面几个特性是你必须知道的:

    • 默认一开始消息不是持久化的
      nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘
      1、如果将 --mem-queue-size 设置为 0,所有的消息将会存储到磁盘。
      2、但是即使服务器重启也会将当时在内存中的消息持久化

    • 消息是没有顺序的
      这一点很关键,由于nsq使用内存+磁盘的模式,而且还有requeue的操作,所以发送消息的顺序和接收的顺序可能不一样

    • 官方不推荐使用客户端发消息
      官方提供相应的客户端发送消息,但是HTTP可能更方便一些

    • 没有复制
      nsq节点相对独立,节点与节点之间没有复制或者集群的关系。

    • 没有鉴权相关模块
      当前release版本的nsq没有鉴权模块,只有版本v0.2.29+高于这个的才有

    • 几个小点
      topic名称有长度限制,命名建议用下划线连接;
      消息体大小有限制;

    优缺点

    优点:
    1、部署极其方便,没有任何环境依赖,直接启动就行
    2、轻量没有过多的配置参数,只需要简单的配置就可以直接使用
    3、性能高
    4、消息不存在丢失的情况

    缺点:
    1、消息无顺序
    2、节点之间没有消息复制
    3、没有鉴权

    多节点部署

    基本概念

    nsqd:基本的节点
    nsqlookupd:汇总节点信息,提供查询和管理topic等服务
    nsqadmin:管理端展示UI界面,能有一个web页面去查看和操作

    结构


    最简单的多节点部署可以是这样的一个结构

    部署步骤和命令

    PS:后台启动使用nohup即可,下面只是为了说明启动方式和命令参数

    第一步需要启动nsqlookupd
    ./nsqlookupd
    默认占用4161和4160两个端口
    使用-http-address和-tcp-address可以修改

    第二步启动两个nsqd
    ./nsqd -lookupd-tcp-address=192.168.1.102:4160 -broadcast-address=192.168.1.103 -data-path="/temp/nsq"
    其中
    -lookupd-tcp-address为上面nsqlookupd的IP和tcp的端口4160
    -broadcast-address我填写的是自己的IP,这个IP官网上写的是会注册到nsqlookupd
    -data-path为消息持久化的位置

    第三步启动nsqadmin
    ./nsqadmin -lookupd-http-address=192.168.4.102:4161
    同样需要指定-lookupd-http-address但是这次是http的端口也就是4161因为admin通过http请求来查询相关信息

    后续扩展

    上面只是最简单的两个节点的部署,如果后续想扩展就会如下

    其中nginx是可以不需要的,你可以果断选择同时向多个节点发送消息,或者当消息没有处理的时候重新进行发送,因为这样也是nsq设计之初的考虑。你也可以根据自己的需要设计你自己的架构。

    客户端

    官方提供了很多语言接入的客户端 https://nsq.io/clients/client_libraries.html
    针对消息生产者的客户端,官方还推荐直接使用post请求发送消息,如:
    curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
    表示向test主题发送hello world这个消息

    下面介绍两种客户端,一种是golang的客户端,一种是java的客户端

    Golang的客户端

    其中192.168.4.102:4150为发送消息的地址,消费者里面写的也是相同的地址就可以了。

    生产者:

    package main
    
    import (
        "github.com/nsqio/go-nsq"
        "time"
    )
    
    func main() {
        for i := 0 ; i < 10; i++  {
            sendMessage()
        }
        time.Sleep(time.Second * 10)
    }
    
    func sendMessage() {
        url := "192.168.4.102:4150"
        producer, err := nsq.NewProducer(url, nsq.NewConfig())
        if err != nil {
            panic(err)
        }
        err = producer.Publish("test", []byte("hello world"))
        if err != nil {
            panic(err)
        }
        producer.Stop()
    }

    消费者:

    package main
    
    import (
        "fmt"
        "github.com/nsqio/go-nsq"
        "sync"
    )
    
    func main() {
        testNSQ()
    }
    
    type NSQHandler struct {
    }
    
    func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
        fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
        return nil
    }
    
    func testNSQ() {
        url := "192.168.4.102:4150"
        
        waiter := sync.WaitGroup{}
        waiter.Add(1)
    
        go func() {
            defer waiter.Done()
            config:=nsq.NewConfig()
            config.MaxInFlight=9
    
            for i := 0; i<10; i++ {
                consumer, err := nsq.NewConsumer("test", "struggle", config)
                if nil != err {
                    fmt.Println("err", err)
                    return
                }
    
                consumer.AddHandler(&NSQHandler{})
                err = consumer.ConnectToNSQD(url)
                if nil != err {
                    fmt.Println("err", err)
                    return
                }
            }
            select{}
        }()
    
        waiter.Wait()
    }

    Java的客户端

    说实话java的客户端确实用的人比较少,因为我看到实际在github上面的星星和关注就比较少,所以客户端多多少少都存在一些问题。nsq-j和JavaNSQClient是官方排的考前的客户端。
    这里说一下nsq-j
    https://github.com/sproutsocial/nsq-j

    生产者

    Publisher publisher = new Publisher("192.168.4.102:4150");
    System.out.print(publisher);
    
    byte[] data = "Hello nsq".getBytes();
    publisher.publish("example_topic", data);
    publisher.publish("example_topic", data);
    
    // 注意这里需要这样关闭,不然的话就阻塞住了
    publisher.getClient().stop();

    消费者

    public class PubExample {
    
        public static void handleData(byte[] data) {
            System.out.println("Received:" + new String(data));
        }
    
        public static void main(String[] args) {
            Subscriber subscriber = new Subscriber("192.168.4.102:4161");
            subscriber.subscribe("test", "struggle", PubExample::handleData);
        }
    }

    需要注意的是其中192.168.4.102:4161这个是nsqlookupd的http地址和端口和生产者是不一样的

    java客户端是根据nsqlookupd来找到对应消费端口

    所以启动nsqlookupd的时候需要注意,启动nsqd需要加上参数--broadcast-address
    如:./nsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=192.168.4.102

    这样java消费者才能找到对应的地址否则会出现
    ERROR com.sproutsocial.nsq.Subscription - error connecting to:localhost.localdomain:4150
    java.net.UnknownHostException: localhost.localdomain
    这样类似的错误

    我建议的客户端

    官方也说了,发送消息其实不建议使用客户端,而建议使用http请求,所以我自己是使用okhttp进行消息的发送,案例如下:

    OkHttpClient client = new OkHttpClient();
    
    MediaType mediaType = MediaType.parse("application/json");
    RequestBody body = RequestBody.create(mediaType, "{"code": 1}");
    
    Request request = new Request.Builder()
    .url("http://192.168.4.102:4151/pub?topic=test")
    .post(body)
    .addHeader("Content-Type", "application/json")
    .build();
    
    Response response = client.newCall(request).execute();
    System.out.println(response);

    当然这里没有对client进行配置,这就涉及okhttp了,这里不再赘述

    至于消费端还是使用nsq-j的

    总结

    使用下来我们可以看到,nsq为了提供性能在一些方面是做出了妥协的,我们可以总结出下面几个方面供大家参考:
    1、暂时nsq的鉴权功能在高版本才支持,但是高版本没有release所以建议nsq在内网环境下使用,或者在一些安全的端口使用,避免被攻击
    2、部署节点在3个以上,nsq已经对于消息丢失做了很多的考虑,基本上不会出现丢失的情况,在你考虑幂等性的情况下,同时部署多个节点有利于消息进行处理
    3、如果对消息顺序有要求的情况下,nsq是不能使用的,因为nsq不能保证消息的顺序
    4、节点之间没有消息复制,所以即使多个节点部署,万一节点出现问题,还是有一段时间会出现消息无法接收到的情况,所以向多个节点同时发送消息也是一种解决方式
    5、因为nsq抛弃了一些东西,那么所带来的自然是方便,整体使用下来主要感受就是轻量,部署和配置都很方便,而且对于节点的监控能有界面

    希望后续nsq能在几个版本更新之后能给我们带来更加牛逼的表现。

  • 相关阅读:
    vue.js中英文api
    easyui combobox重复渲染问题
    大数据新兴思维
    机器学习技法 之 矩阵分解(Matrix Factorization)
    机器学习技法 之 终章(Final)
    CMake 中文简易手册
    线性判别分析(Linear Discriminat Analysis)
    梯度提升机(Gradient Boosting Machine)之 XGBoost
    机器学习技法 之 梯度提升决策树(Gradient Boosted Decision Tree)
    Host是什么?如何设置host文件?
  • 原文地址:https://www.cnblogs.com/linkstar/p/10341685.html
Copyright © 2011-2022 走看看