前言
NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,代码托管在GitHub,其当前最新版本是0.3.1版。NSQ可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。NSQ具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用Go和Python库。如果读者兴趣构建自己的客户端的话,还可以参考官方提供的协议规范。
NSQ是由四个重要组件构成:
- nsqd:一个负责接收、排队、转发消息到客户端的守护进程
- nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
- nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
- utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq
NSQ的主要特点如下:
- 具有分布式且无单点故障的拓扑结构 支持水平扩展,在无中断情况下能够无缝地添加集群节点
- 低延迟的消息推送,参见官方提供的性能说明文档
- 具有组合式的负载均衡和多播形式的消息路由
- 既擅长处理面向流(高吞吐量)的工作负载,也擅长处理面向Job的(低吞吐量)工作负载
- 消息数据既可以存储于内存中,也可以存储在磁盘中
- 实现了生产者、消费者自动发现和消费者自动连接生产者,参见nsqlookupd
- 支持安全传输层协议(TLS),从而确保了消息传递的安全性
- 具有与数据格式无关的消息结构,支持JSON、Protocol Buffers、MsgPacek等消息格式
- 非常易于部署(几乎没有依赖)和配置(所有参数都可以通过命令行进行配置)
- 使用了简单的TCP协议且具有多种语言的客户端功能库
- 具有用于信息统计、管理员操作和实现生产者等的HTTP接口
- 为实时检测集成了统计数据收集器StatsD
- 具有强大的集群管理界面,参见nsqadmin
为了达到高效的分布式消息服务,NSQ实现了合理、智能的权衡,从而使得其能够完全适用于生产环境中,具体内容如下:
- 支持消息内存队列的大小设置,默认完全持久化(值为0),消息即可持久到磁盘也可以保存在内存中
- 保证消息至少传递一次,以确保消息可以最终成功发送
- 收到的消息是无序的, 实现了松散订购
- 发现服务nsqlookupd具有最终一致性,消息最终能够找到所有Topic生产者
官方文档:https://nsq.io/overview/quick_start.html
一、Windows 环境静态方式安装
1、下载安装 nsq
下载最新版本:https://github.com/nsqio/nsq/releases/download/v1.0.0-compat/nsq-1.0.0-compat.windows-amd64.go1.8.tar.gz
直接解压(重命名)放在自己的磁盘位置(D盘)
2、配置环境变量path
3、新建CMD命令行
运行命令:nsqlookupd
4、再新建一个CMD命令行
运行命令:nsqd --lookupd-tcp-address=127.0.0.1:4160
5、再新建一个CMD命令行
运行命令:nsqadmin --lookupd-http-address=127.0.0.1:4161
6、再新建一个CMD命令行(windows上用git bash)发布一个带有初始化信息的topic,
运行命令:curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
7、再新建一个CMD命令行,开始 nsq_to_file(消费者开始消费这些消息)
运行命令:nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
注意以上信息保存在你本地C盘下
9、发送更多的信息
查看磁盘消费信息内容
10、打开admin后台,可以查看更多信息
地址栏运行:http://127.0.0.1:4171/
使用go编写一个客户端
1、安装官网客户端链接,这里以go 库为案例
(1)go-nsq扩展库地址:https://github.com/nsqio/go-nsq
(2)使用go get方式安装:go get -u github.com/nsqio/go-nsq
2、开始编写代码
(1)订阅:go_client_sub.go
package main
import (
"fmt"
"sync"
"github.com/nsqio/go-nsq"
)
type NSQHandler struct {
}
func (this *NSQHandler) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}
func testNSQ() {
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config:=nsq.NewConfig()
config.MaxInFlight=9
//建立多个连接
for i := 0; i<10; i++ {
consumer, err := nsq.NewConsumer("test", "nsq_to_Tinywan", config)
if nil != err {
fmt.Println("err", err)
return
}
consumer.AddHandler(&NSQHandler{})
err = consumer.ConnectToNSQD("127.0.0.1:4150")
if nil != err {
fmt.Println("err", err)
return
}
}
select{}
}()
waiter.Wait()
}
func main() {
testNSQ();
}
(2)发布:go_client_pub.go
package main
import (
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func main() {
nsqd := "127.0.0.1:4150"
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
producer.Publish("test", []byte("Hello Tinywan 0002"))
if err != nil {
panic(err)
}
}
(3)测试结果
3、查看nsqadmin 后台监控
代码订阅修改频道(channel)信息:consumer, err := nsq.NewConsumer("test", "nsq_to_Tinywan", config)
当然我们发送的消息同时发送给文件了,打开文件内容如下
打开的所有测试窗口如下所示
二、Linux 下使用docker 搭建环境
1、官方docker文档:https://nsq.io/deployment/docker.html
2、请提前在你的虚拟主机安装好 docker 服务
3、使用 docker pull ,从镜像仓库中拉取或者更新指定镜像
$ docker pull nsqio/nsq
Using default tag: latest
latest: Pulling from nsqio/nsq
709515475419: Pull complete
efd1c5a69d15: Pull complete
fa61d00bb52d: Pull complete
Digest: sha256:fad1937a88fec5b66fb9f4837b72ad3b70012692826aed5c6435f93c5a23b690
Status: Downloaded newer image for nsqio/nsq:latest
4、查看镜像
~$ docker images
REPOSITORY TAG IMAGE ID CREATED SIZE
ubuntu 17.10 14107f6d2c97 3 weeks ago 99.1MB
nginx latest b175e7467d66 3 weeks ago 109MB
hello-world latest f2a91732366c 5 months ago 1.85kB
nsqio/nsq latest 2714222e1b39 13 months ago 55.8MB
5、docker 运行 nsqlookupd
$ docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
[nsqlookupd] 2018/05/06 05:37:27.120708 nsqlookupd v1.0.0-compat (built w/go1.8)
[nsqlookupd] 2018/05/06 05:37:27.121254 TCP: listening on [::]:4160
[nsqlookupd] 2018/05/06 05:37:27.121291 HTTP: listening on [::]:4161
PS:如果官方的是lookupd 会导致 nsqadmin 连接失败,修改成了:nsq
lookupd
6、docker 运行 nsqd
$ docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=139.224.239.21 --lookupd-tcp-address=139.224.239.21:4160
[nsqd] 2018/05/06 05:32:10.163609 nsqd v1.0.0-compat (built w/go1.8)
[nsqd] 2018/05/06 05:32:10.163642 ID: 302
[nsqd] 2018/05/06 05:32:10.163677 NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2018/05/06 05:32:10.167041 TCP: listening on [::]:4150
[nsqd] 2018/05/06 05:32:10.167078 HTTP: listening on [::]:4151
说明:上面的http和tcp连接地址我都是写的公网IP地址,端口号默认不变
7、docker 运行 nsqadmin
$ docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=139.224.239.21:4161
[nsqadmin] 2018/05/06 05:38:41.011751 nsqadmin v1.0.0-compat (built w/go1.8)
[nsqadmin] 2018/05/06 05:38:41.011901 HTTP: listening on [::]:4171
使用 docker ps 命令列出已经运行中的容器
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED PORTS NAMES
6534ccee1e6a nsqio/nsq "/nsqlookupd" 2 seconds ago 4150-4151/tcp, 4170-4171/tcp, 0.0.0.0:4160-4161->4160-4161/tcp lookupd
1e642cf63071 nsqio/nsq "/nsqadmin --lookupd…" About a minute ago 4150-4151/tcp, 4160-4161/tcp, 4170/tcp, 0.0.0.0:4171->4171/tcp nsqadmin
9d5e0678077a nsqio/nsq "/nsqd --broadcast-a…" About a minute ago 4160-4161/tcp, 0.0.0.0:4150-4151->4150-4151/tcp, 4170-4171/tcp nsqd
8、使用Nginx 做一个域名代理,代理nsqadmin
server {
server_name nsqadmin.tinywan.com;
location / {
proxy_pass http://139.224.239.21:4171;
#Proxy Settings
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}
PS:这一步为非必须,只是为了好看而已,如果域名的话,配置个还是很好的
9、浏览器访问nsqadmin,以下表示配置ok
10、通过不同的方式生产一个新消息
(1)在Linux服务端通过curl 生产一个新消息
curl -d 'Hello Linux localhost Msg' 'http://127.0.0.1:4151/pub?topic=test'
(2)在windows本地环境,在git环境中通过curl 生产一个新消息
$ curl -d "hello windows MSG" "http://nsqadmin.tinywan.com:4151/pub?topic=test"
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 19 100 2 100 17 64 548 --:--:-- --:--:-- --:--:-- 548OK
(3)通过go语言脚本发送一条信息
package main
import (
"github.com/nsqio/go-nsq"
)
var producer *nsq.Producer
func main() {
nsqd := "139.224.239.21:4150"
producer, err := nsq.NewProducer(nsqd, nsq.NewConfig())
producer.Publish("test", []byte("Hello Go Client Msg"))
if err != nil {
panic(err)
}
}
11、开始消费消息
由于这里docker 消费消息需要挂在一个虚拟磁盘,而我没有,所以换是直接通过上面演示的go代码进行测试(消费消息)。
(1)通过以上的 go_client_sub.go 脚本去跑这个消费信息,注意修改host 为当前nsq服务器的IP,而不是本地
err = consumer.ConnectToNSQD("139.224.239.21:4150")
(2)修改要订阅的频道
consumer, err := nsq.NewConsumer("test", "nsq_to_Linux", config)
(3)开始运行脚本
D:Tinywango-socket>go run go_client_sub.go
2018/05/06 14:12:57 INF 1 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 2 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 3 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 4 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 5 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 6 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 7 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 8 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 9 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
2018/05/06 14:12:57 INF 10 [test/nsq_to_Linux] (139.224.239.21:4150) connecting to nsqd
receive 139.224.239.21:4150 message: Hello Linux localhost Msg
receive 139.224.239.21:4150 message: Hello windows MSG
receive 139.224.239.21:4150 message: Hello Go Client Msg
以上表示消费的不同的消息
12、在看看监控页面
PS:以上可以很清楚的看到接收到3条信息了
13、使用docker 需要注意
(1)docker 没有在后台进程跑的时候(没有加参数 -d),也就是测试的时候,如:docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
(2)如果使用 ctrl + c 退出后,继续启动提示错误:
docker run --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
docker: Error response from daemon: Conflict. The container name "/lookupd" is already in use by container "35bb6a3e51d367db846d376580d5b8e389d419e48e2f3c330c4e0dd093a218c3". You have to remove (or rename) that container to be able to reuse that name.
See 'docker run --help'.
PS:开启一个容器然后先停止、删除后才可以继续重启,请执行以下命令后重新运行命令
(3)停止:docker stop lookupd
(4)移除:docker rm lookupd
三、基于ThinkPHP5.0 的一个demo
<?php
namespace appindexcontroller;
use thinkController;
class NsqController extends Controller
{
public function index()
{
ini_set('memory_limit', '8M');
$nsqdAddr = [
"127.0.0.1:4151",
"127.0.0.1:4150"
];
$nsq = new Nsq();
$isTrue = $nsq->connectNsqd($nsqdAddr);
for ($i = 0; $i < 6; $i++) {
$nsq->publish("test", "Hi Tinywan");
}
$nsq->closeNsqdConnection();
halt($isTrue);
// Deferred publish
//function : deferredPublish(string topic,string message, int millisecond);
//millisecond default : [0 < millisecond < 3600000]
$deferred = new Nsq();
$isTrue = $deferred->connectNsqd($nsqdAddr);
for ($i = 0; $i < 20; $i++) {
$deferred->deferredPublish("test", "message daly", 3000);
}
$deferred->closeNsqdConnection();
}
public function nsqSubMessage()
{
$nsq_lookupd = new NsqLookupd("127.0.0.1:4161"); //the nsqlookupd http addr
$nsq = new Nsq();
$config = array(
"topic" => "test",
"channel" => "struggle",
"rdy" => 2, //optional , default 1
"connect_num" => 1, //optional , default 1
"retry_delay_time" => 5000, //optional, default 0 , if run callback failed, after 5000 msec, message will be retried
"auto_finish" => true, //default true
);
$nsq->subscribe($nsq_lookupd, $config, function ($msg, $bev) {
echo $msg->payload . "
";
echo $msg->attempts . "
";
echo $msg->messageId . "
";
echo $msg->timestamp . "
";
});
}
}
使用命令行模式,在console 显示信息
php think pay nsq
Connect succeed
nihao
1
09f0f1ae3bef9002
1530525939845171789
Hi Tinywan
1
09f0f1c2262f9000
1530525961228600358
Hi Tinywan
1
09f0f1c2266f9001
1530525961229073539
Hi Tinywan
1
09f0f1ec952f9000
1530526006791156404
Hi Tinywan
1
09f0f1ec956f9001
1530526006792159596
Hi Tinywan
1
09f0f1ec95af9000
1530526006792847311
四、集群搭建
1、主服务器IP地址(公网):59.110.213.20
(1)拉取NSQ镜像
docker pull nsqio/nsq
(2)查看nsq镜像
docker images
(3)启动nsqlookupd服务
docker run -d --name lookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd
(4)开启nsqadmin管理系统
docker run -d --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=59.110.213.20:4161
nsqadmin可以部署在任何一个安装有nsq服务的机器上,只需要指定唯一的 --lookupd-http-address 服务IP地址
(5)开启一个nsqd节点服务
docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=59.110.213.20 --lookupd-tcp-address=59.110.213.20:4160
- -d 表示守护进程
- --broadcast-address:当前服务器IP地址,
- --lookupd-tcp-address:指向的lookupd服务器IP地址
2、从服务器IP地址(公网):47.99.94.49
开启一个nsqd节点服务
docker run -d --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=47.99.94.49 --lookupd-tcp-address=59.110.213.20:4160
- -d 表示守护进程
- --broadcast-address:当前服务器IP地址,
- --lookupd-tcp-address:指向的lookupd服务器IP地址
3、测试主从
打开admin 管理平台
可看出已经部署的两个nsqd 节点以及广播地址
打开任意一个shell终端(本测试为Windows 系统的bash )
主
tinyw@DESKTOP-HH02AC6 MINGW64 ~/Desktop
$ curl -d 'Hello Nsq 20-2' 'http://59.110.213.20:4151/pub?topic=Tinywan_20'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 16 100 2 100 14 32 225 --:--:-- --:--:-- --:--:-- 258OK
从
tinyw@DESKTOP-HH02AC6 MINGW64 ~/Desktop
$ curl -d 'Hello Nsq 49-2' 'http://47.99.94.49:4151/pub?topic=Tinywan_49'
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 16 100 2 100 14 125 875 --:--:-- --:--:-- --:--:-- 1000OK
admin 测试结果
注意点:消费消息是根据节点Id去消费,单独分开的,在20上订阅的,在49上是没法消费的