zoukankan      html  css  js  c++  java
  • golang-nsq高性能消息队列

    前言

    tips:如果本文对你有用,请爱心点个赞,提高排名,让这篇文章帮助更多的人。谢谢大家!比心❤~
    如果解决不了,可以在文末加我微信,进群交流。

    NSQ 是实时的分布式消息处理平台,其设计的目的是用来大规模地处理每天数以十亿计级别的消息。

    NSQ 具有分布式和去中心化拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。

    NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用 Go 和 Python 库。如果读者有兴趣构建自己的客户端的话,还可以参考官方提供的协议规范。

    网上有人翻译了国外的一篇文章:我们是如何使用NSQ处理7500亿消息的

    在这里插入图片描述

    安装和部署

    官网文档:https://nsq.io/overview/quick_start.html
    中文文档:http://wiki.jikexueyuan.com/project/nsq-guide/

    我是在ubuntu系统中按照官方操作进行部署测试。

    1. 安装nsq启动服务
      https://nsq.io/deployment/installing.html选择对应的版本,并解压。

      $ tar -zxvf nsq-1.2.0.linux-amd64.go1.12.9.tar.gz
      $ cd nsq-1.2.0.linux-amd64.go1.12.9/bin	
      $ sudo cp ~/Downloads/nsq-1.2.0.linux-amd64.go1.12.9/bin/ -r /usr/local/nsq/bin
      $ sudo vim /etc/profile
      $ source 
      
    2. 后台启动三个服务

      $ ./nsqlookupd > /dev/null 2>&1 &
      [1] 20076
      $ ./nsqd --lookupd-tcp-address=127.0.0.1:4160 > /dev/null 2>&1 &
      [2] 20420
      $ ./nsqadmin --lookupd-http-address=127.0.0.1:4161 > /dev/null 2>&1 &
      [3] 20620
      

      > -lookupd-tcp-address 为上面nsqlookupd的IP和tcp的端口4160
      > -lookupd-http-address 是http的端口也就是4161因为admin通过http请求来查询相关信息

    3. 基本概念

      nsqd:基本的节点

      nsqlookupd:汇总节点信息,提供查询和管理topic等服务
      nsqadmin:管理端展示UI界面,能有一个web页面去查看和操作

    4. 简单使用

      • 执行:curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'会创建一个test主题,并发送一个hello world消息
      • 外部通过:http://127.0.0.1:4171/进行访问可以看到NSQ的管理界面,非常的简洁,其中127.0.0.1为服务器IP
        在这里插入图片描述
        在这里插入图片描述
      • 使用./nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161消费test中刚才的消息,并输出到服务器/tmp目录中

    特性

    默认一开始消息不是持久化的
    nsq采用的方式时内存+硬盘的模式,当内存到达一定程度时就会将数据持久化到硬盘

    1. 如果将 --mem-queue-size 设置为 0,所有的消息将会存储到磁盘。
    2. 是即使服务器重启也会将当时在内存中的消息持久化
    3. 消息是没有顺序的
      这一点很关键,由于nsq使用内存+磁盘的模式,而且还有requeue的操作,所以发送消息的顺序和接收的顺序可能不一样
    4. 官方不推荐使用客户端发消息
      官方提供相应的客户端发送消息,但是HTTP可能更方便一些
    5. 没有复制
      nsq节点相对独立,节点与节点之间没有复制或者集群的关系。
    6. 没有鉴权相关模块
      当前release版本的nsq没有鉴权模块,只有版本v0.2.29+高于这个的才有
    7. 几个小点
      topic名称有长度限制,命名建议用下划线连接
      消息体大小有限制

    nsq优点&缺点

    优点:

    1. 部署极其方便,没有任何环境依赖,直接启动就行
    2. 轻量没有过多的配置参数,只需要简单的配置就可以直接使用
    3. 性能高
    4. 消息不存在丢失的情况

    缺点:

    1. 消息无顺序
    2. 节点之间没有消息复制
    3. 没有鉴权

    客户端

    官方提供了很多语言接入的客户端 https://nsq.io/clients/client_libraries.html
    针对消息生产者的客户端,官方还推荐直接使用post请求发送消息,如:
    curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test'
    表示向test主题发送hello world这个消息

    Golang的客户端

    deb安装

    $ curl https://raw.githubusercontent.com/golang/dep/master/install.sh | sh
    

    在这里插入图片描述

    依赖包下载:

    $ go get github.com/nsqio/go-nsq
    

    生产者:

    package main
    
    // 生产者
    import (
    	"fmt"
    
    	"github.com/nsqio/go-nsq"
    )
    
    var tcpNsqdAddr = "127.0.0.1:4150"
    
    func main() {
    	// 初始化配置
    	config := nsq.NewConfig()
    	for i := 0; i < 100; i++ {
    		// 创建100个生产者
    		tPro, err := nsq.NewProducer(tcpNsqdAddr, config)
    		if err != nil {
    			fmt.Printf("tPro new failed:%s", err)
    		}
    
    		// 主题
    		topic := "Insert"
    		// 主题内容
    		tCommand := "New data!"
    		// 发布消息
    		err = tPro.Publish(topic, []byte(tCommand))
    		if err != nil {
    			fmt.Printf("Publish failed:%s", err)
    		}
    	}
    }
    
    

    其中127.0.0.1:4150为发送消息的地址,消费者里面写的也是相同的地址就可以了。

    消费者:

    package main
    // 消费者
    import (
    	"fmt"
    	"sync"
    	"time"
    
    	"github.com/nsqio/go-nsq"
    )
    
    var tcpNsqdAddr = "127.0.0.1:4150"
    
    type NsqHandler struct {
    	// 消息数
    	msqCount int
    	// 标识id
    	nsqHandlerID string
    }
    
    func main() {
    	// 初始化配置
    	config := nsq.NewConfig()
    	// 创造消费者,参数一是订阅的主题,参数二是使用的通道
    	com, err := nsq.NewConsumer("Insert", "channel1", config)
    	if err != nil {
    		fmt.Println(err)
    	}
    
    	// 添加处理回调
    	com.AddHandler(&NsqHandler{nsqHandlerID: "One"})
    
    	// 连接对应的nsqd
    	err = com.ConnectToNSQD(tcpNsqdAddr)
    	if err != nil {
    		fmt.Println(err)
    	}
    
    	// 只是为了不结束进程,这里没有意义
    	var wg = &sync.WaitGroup{}
    	wg.Add(1)
    	wg.Wait()
    
    }
    
    // HandleMessage 实现HandleMessage方法
    // message是接收到的消息
    func (s *NsqHandler) HandleMessage(message *nsq.Message) error {
    	// 每接收到一条消息]+1
    	s.msqCount ++ 
    	// 打印输出信息和ID
    	fmt.Println(s.msqCount,s.nsqHandlerID)
    	// 打印消息的一些基本信息
    	fmt.Printf("msg.Timestamp=]%v,msg.nsqaddress=%s,msg.body=]%s", time.Unix(0,message.Timestamp).Format("2006-01-02 03:04:05"),message.NSQDAddress,string(message.Body))
    	return nil
    }
    
  • 相关阅读:
    ReflectionException: There is no getter for property named
    iframe发送post请求
    wget已安装但命令没找到
    linux性能观察命令
    ELK搭建
    python之中特性(attribute)与属性(property)有什么区别?
    Django中的日志详解
    创建fastdfs_nginx容器及nginx配置
    2. 顺序表 数据结构与算法(python)
    Ubuntu安装和卸载搜狗输入法
  • 原文地址:https://www.cnblogs.com/mengyilingjian/p/13410929.html
Copyright © 2011-2022 走看看