参考:
https://www.cnblogs.com/smartrui/p/12549562.html
https://blog.csdn.net/lldouble/article/details/82023125
https://blog.csdn.net/riba2534/article/details/106977687/
带你入门Go的消息队列NSQ
以前看到过NSQ这个东西,也一直没去看。今天刚好有时间就搭建了下,简单尝试了下这个Go语言下的消息队列NSQ,我这里简要记录下。
其实,NSQ国内用的是比较少的,我这里也是算了解这么个东西吧 ,稍微看下源码,学到东西而已。
NSQ简介
NSQ是一个基于Go语言的分布式实时消息平台, 它具有分布式、去中心化的拓扑结构,支持无限水平扩展。无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。另外,NSQ非常容易配置和部署, 且支持众多的消息协议。支持多种客户端,协议简单,如果有兴趣可参照协议自已实现一个也可以。
NSQ是一个基于Go语言的开源的分布式实时消息平台,他的代码托管在GitHub上。 NSQ可用于大规模系统的实时消息服务,它的设计目标是为在分布式环境下提供一个强大的去除中心化的分布式服务架构,可以每天处理数以亿计的实时消息。NSQ的优点是无单点故障、故障容错、高可用性和信息传递的高可靠性。NSQ安装部署简单,容易水平扩展,目前已有很多公司都是采用其作为自身企业内部的实时消息服务。而且它的灵活性很强,支持很多种协议。官方直接提供了拆箱可用的Go库和Python库。好,其他的就不废话了,随便搜索下NSQ相关的博文或者去NSQ的官方网站了解更详细的信息吧。
NSQ的几个组件
- nsqd:一个负责接收、排队、转发消息到客户端的守护进程
- nsqlookupd:管理拓扑信息, 用于收集nsqd上报的topic和channel,并提供最终一致性的发现服务的守护进程
- nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行相应的管理任务
- utilities:基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq
相关网址
NSQ官网:NSQ官网
GitHub: Github
NSQ官方客户端:NSQ官方客户端
NSQ文档:NSQ文档
NSQ协议:NSQ协议
NSQ安装
NSQ的安装方式有好几种,可以通过二进制、源码、Docker、Brew等方式安装
二进制安装,可以到安装地址 上面下载对应平台的Release包,然后解压就行了。
如果是Mac电脑,直接用Brew安装
brew install nsq
如果是Docker的安装,就参照下上面那个网址吧,按照步骤操作就行了,我没有用的这个安装方式。
我是用的源码的安装方式,因为二进制的那个放在S3上面,下起来好慢,于是直接把Github的源代码下载来,这里也有一个好处就是可以看源码来跟踪学习。还方便些。
下载后的目录结构如下所示:
NSQ 运行
如果用源码运行,而不是Make后将可执行文件放到bin目录这种,那么下载后解决完所有的依赖包后,cd 进入到 nsqio/nsq/apps/nsqd目录后,可以执行 go run ./
或 go run main.go options.go
否则会报如下错误
nsqio/nsq/apps/nsqd/main.go:44:13: undefined: nsqdFlagSet
nsqio/nsq/apps/nsqd/main.go:54:10: undefined: config
其实进入到apps目录执行,最终还是会到 nsqio/nsq/nsqd这个下面去执行业务处理代码的,apps这里仅仅是用go-srv这个包进行了一层服务包装而已,变成守护和一些入口参数等。
$ go run ./
[nsqd] 2020/03/22 00:55:27.597911 INFO: nsqd v1.2.1-alpha (built w/go1.11.2)
[nsqd] 2020/03/22 00:55:27.597980 INFO: ID: 809
[nsqd] 2020/03/22 00:55:27.598396 INFO: TOPIC(test): created
[nsqd] 2020/03/22 00:55:27.598449 INFO: TOPIC(test): new channel(test)
[nsqd] 2020/03/22 00:55:27.598535 INFO: TOPIC(test): new channel(lc)
[nsqd] 2020/03/22 00:55:27.598545 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2020/03/22 00:55:27.599714 INFO: TCP: listening on [::]:4150
[nsqd] 2020/03/22 00:55:27.599806 INFO: HTTP: listening on [::]:4151
看到上面的提示,表示启动成功了,它会分别开放TCP和HTTP的端口,4150,4151可以通过配置或flag参数的方式更改, 同时它也支持TLS/SSL.
HTTP测试
启动nsqd后,可以用http来测试发送一条消息,可使用CURL来操作。
$ curl -d '这是一条测试消息' 'http://127.0.0.1:4151/pub?topic=test&channel=lc'
OK
NSQ消息模式
我们知道消息一般有推和拉模式,NSQ的消息模式为推的方式,这种模式可以保证消息的及时性,当有消息时可以及时推送出去。但是要根椐客户端的消耗能力和节奏去控制,NSQ是通过更改RDY的值来实现的。当没有消息时为0, 服务端推送消息后,客户端比如调用 updateRDY()这个方法改成3, 那么服务端推送时,就会根椐这个值做流控了。
发送消息是通过连接的TCP发出去的,先发到Topic下面,再转到Channel下面,最后从通道 memoryMsgChan 中取出msg,然后发出。
github.com/nsqio/nsq/nsqd/protocol_v2.go
func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
var err error
var memoryMsgChan chan *Message
var backendMsgChan <-chan []byte
var subChannel *Channel
// NOTE: `flusherChan` is used to bound message latency for
// the pathological case of a channel on a low volume topic
// with >1 clients having >1 RDY counts
var flusherChan <-chan time.Time
var sampleRate int32
subEventChan := client.SubEventChan
identifyEventChan := client.IdentifyEventChan
outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
heartbeatChan := heartbeatTicker.C
msgTimeout := client.MsgTimeout
...
...
case msg := <-memoryMsgChan:
if sampleRate > 0 && rand.Int31n(100) > sampleRate {
continue
}
msg.Attempts++
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
client.SendingMessage()
err = p.SendMessage(client, msg)
if err != nil {
goto exit
}
flushed = false
case <-client.ExitChan:
goto exit
}
NSQ还支持延时消息的发送,比如订单在30分钟未支付做无效处理等场景,延时使用的是heap包的优级先队列,实现了里面的一些方法。通过判断当前时间和延时时间做对比,然后从延时队列里面弹出消息再发送到channel中,后续流程和普通消息一样,我看网上有 人碰到过说延时消息会有并发问题,最后还用的Redis的ZSET实现的,所以不确定这个延时的靠不靠谱,要求不高的倒是可以试试。
curl -d '这是一条延迟消息' 'http://127.0.0.1:4151/pub?topic=test&channel=lc&defer=3000'
defer参数,单位:毫秒
NSQ消费
消费消息时,channel类似于kafka里面的消费组的概念,比如同一个channel。那么只会被一个实例消费,不会多个实例都能消费到那条消息,所以可用于消息的负载均衡, 我看到网上有人有疑惑就是他指定topic,然后再用不同的channel去消费,说怎么能收到其它channel的消息,不能直接过滤消息,其实channel不是用来过滤的。
NSQ发送的消息可以确保至少被一个消费者消费,它的消费级别为至少消费一次,为了确保消息消费,如果客户端超时、重新放入队列或重连等,重复消费是不可避免的,所以客户端业务流程一定要做消息的幂等处理。
客户端回复FIN 或者 REQ 表示成功或者重发。如果客户端未能及时发送,则NSQ将重复发送消息给该客户端。
另外,NSQ不像 Kafka,我们是能到消息的有序的,但NSQ不行,客户端收到的消费为无序的。虽然每条消息有一个时间戳,但如果对顺序有要求的,那就要注意了。所以,NSQ更适合处理数据量大但是彼此间没有顺序关系的消息。
NSQ的Go客户端
NSQ是支持多种形式的客户端的,像HTTP或客户端库来操作,而且官方其实还建议使用HTTP的方式,HTTP的方式,直接发GET或POST请求就行了。
这里Go的话,可使用go-nsq这个库,地址为:go-nsq :go-nsq
go get https://github.com/nsqio/go-nsq
发送消息
package main
import (
"errors"
"fmt"
"github.com/nsqio/go-nsq"
"time"
)
func main() {
var (
p1 *producer
p2 *producer
)
p1 = &producer{}
p2 = &producer{}
p1.producer,_ = InitProducer("127.0.0.1:4150")
p2.producer,_ = InitProducer("127.0.0.1:4150")
defer p1.producer.Stop()
defer p2.producer.Stop()
//p1.publish("test","hello!!!")
p1.deferredPublish("test", 10 * time.Second,"这是一条延迟消息?")
fmt.Println("done")
}
type producer struct {
producer *nsq.Producer
}
func(p *producer) publish(topic string,message string) (err error){
if message == "" {
return errors.New("message is empty")
}
if err = p.producer.Publish(topic,[]byte(message)); err != nil {
fmt.Println(err)
return err
}
return nil
}
// 延迟消息
func(p *producer) deferredPublish(topic string,delay time.Duration, message string) (err error){
if message == "" {
return errors.New("message is empty")
}
if err = p.producer.DeferredPublish(topic,delay, []byte(message)); err != nil {
fmt.Println(err)
return err
}
return nil
}
func InitProducer(addr string) (p *nsq.Producer,err error){
var (
config *nsq.Config
)
config = nsq.NewConfig()
if p, err = nsq.NewProducer(addr, config); err != nil {
return nil, err
}
return p, nil
}
消费消息
package main
import (
"encoding/json"
"fmt"
"github.com/nsqio/go-nsq"
)
//nsqio消费测试
type MyTestHandler struct {
q *nsq.Consumer
messageReceive int
}
func (h *MyTestHandler) HandleMessage(message *nsq.Message) error {
type Data struct {
}
var (
data *Data
err error
)
data = &Data{}
if err = json.Unmarshal(message.Body, data) ;err != nil {
fmt.Printf("Id:%s, Msg:%s
", message.ID, string(message.Body))
err = nil
}
message.Finish()
return nil
}
func initConsuemr(topic string, channel string) {
var (
config *nsq.Config
h *MyTestHandler
err error
)
h = &MyTestHandler{
}
config = nsq.NewConfig()
if h.q, err = nsq.NewConsumer(topic, channel, config); err != nil {
fmt.Println(err)
return
}
h.q.AddHandler(h)
if err = h.q.ConnectToNSQD("127.0.0.1:4150"); err != nil {
fmt.Println(err)
}
//<-h.q.StopChan
fmt.Println("stop")
return
}
func main() {
initConsuemr("test","test")
initConsuemr("test","lc")
select{}
}
总的来说,NSQ的消费是有保障的,能保证消息的可靠性。可用多个 nsqd和nsqlookupd做分布式集群等,使用Go的channel能够高并发消费,高吞吐量,而且,部署方面也简单。
不过,给我的感觉还是不如Kafka和RocketMQ这些专业的消息队列,不过在某些场景下还是够用的。这个就得根椐自已的情况去取舍了,毕竟,没有好的架构,只有合适的架构。
NSQ基本概念
核心概念
在讨论NSQ如何在实践中使用前,先理解NSQ队列的架构原理是非常值得的。它的设计很简单,可以通过几个核心概念来理解。
Topic ——一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建topic。
Channels ——channel组与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。
Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。
Message s——消息构成了我们数据流的中坚力量,消费者可以选择结束消息,表明它们正在被正常处理,或者重新将他们排队待到后面再进行处理。每个消息包含传递尝试的次数,当消息传递超过一定的阀值次数时,我们应该放弃这些消息,或者作为额外消息进行处理。
NSQ在操作期间同样运行着两个程序:
Nsqd ——nsqd守护进程是NSQ的核心部分,它是一个单独的监听某个端口进来的消息的二进制程序。每个nsqd节点都独立运行,不共享任何状态。当一个节点启动时,它向一组nsqlookupd节点进行注册操作,并将保存在此节点上的topic和channel进行广播。
客户端可以发布消息到nsqd守护进程上,或者从nsqd守护进程上读取消息。通常,消息发布者会向一个单一的local nsqd发布消息,消费者从连接了的一组nsqd节点的topic上远程读取消息。如果你不关心动态添加节点功能,你可以直接运行standalone模式。
Nsqlookupd ——nsqlookupd服务器像consul或etcd那样工作,只是它被设计得没有协调和强一致性能力。每个nsqlookupd都作为nsqd节点注册信息的短暂数据存储区。消费者连接这些节点去检测需要从哪个nsqd节点上读取消息。
消息的生命周期
让我们观察一个关于nsq如何在实际中工作的更为详细的例子。
NSQ推荐通过他们相应的nsqd实例使用协同定位发布者,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取。更重要的是,发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。
首先,一个发布者向它的本地nsqd发送消息,要做到这点,首先要先打开一个连接,然后发送一个包含topic和消息主体的发布命令,在这种情况下,我们将消息发布到事件topic上以分散到我们不同的worker中。
事件topic会复制这些消息并且在每一个连接topic的channel上进行排队,在我们的案例中,有三个channel,它们其中之一作为档案channel。消费者会获取这些消息并且上传到S3。
每个channel的消息都会进行排队,直到一个worker把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。
Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现所有包含事件topic的nsqd节点。
然后每个worker向每个nsqd主机进行订阅操作,用于表明worker已经准备好接受消息了。这里我们不需要一个完整的连通图,但我们必须要保证每个单独的nsqd实例拥有足够的消费者去消费它们的消息,否则channel会被队列堆着。
从客户端库代码中抽取一部分,这里是一个关于如何处理我们的消息的一段代码:
如果因为某些原因第三方发生故障了,我们可以处理这些故障,在这个代码片中,我们有三种处理逻辑:
1、如果超过了某个尝试次数阀值,我们就将消息丢弃。
2、如果消息已经被处理成功了,我们就结束消息。
3、如果发生了错误,我们将需要传递的消息重新进行排队。
正如你所看到的,NSQ队列的行为既简单又明确。
在我们的案例中,我们在丢弃消息之前将容忍MAX_DELIVERY_ATTEMPTS * BACKOFF_TIME分钟的故障。
在Segment系统中,我们统计消息尝试的次数、消息丢弃数、消息重新排队数等等,然后结束某些消息以保证我们有一个好的服务质量。如果消息丢弃数超过了我们设置的阀值,我们将在任何时候对服务发出警报。
在实践中
在生产环境中,我们几乎在我们所有的实例中运行nsqd守护程序,发布者之间协同定位。NSQ在实际生产中运行良好有几个原因:
简单的协议 ——如果你的队列已经有了一个很好的客户端库,这个不是一个很大的问题,但如果你现在的客户端库存在bug或者过时了,一个简单的协议就能体现出优势了。
NSQ有一个快速的二进制协议,通过短短的几天工作量就可以很简单地实现这些协议,我们还自己创建了我们的纯JS驱动(当时只存在coffeescript驱动),这个纯JS驱动运行的很稳定可靠。
运行简单 ——NSQ没有复杂的水印设置或JVM级别的配置,相反,你可以配置保存到内存中的消息的数量和消息最大值,如果队列被消息填满了,消息会被保存到磁盘上。
分布式 ——因为NSQ没有在守护程序之间共享信息,所以它从一开始就是为了分布式操作而生。个别的机器可以随便宕机随便启动而不会影响到系统的其余部分,消息发布者可以在本地发布,即使面对网络分区。
这种“分布式优先”的设计理念意味着NSQ基本上可以永远不断地扩展,需要更高的吞吐量?那就添加更多的nsqd吧。
唯一的共享状态就是保存在lookup节点上,甚至它们不需要全局视图,配置某些nsqd注册到某些lookup节点上这是很简单的配置,唯一关键的地方就是消费者可以通过lookup节点获取所有完整的节点集。
清晰的故障事件——NSQ在组件内建立了一套明确关于可能导致故障的的故障权衡机制,这对消息传递和恢复都有意义。
我是最少意外原则的坚定信仰者,尤其是当它涉及到分布式系统时。系统发生故障,我们接收它,但我们不可能会指望系统以意外的形式发生故障,你最终会忽略这些故障案例,因为你甚至都不打算考虑它们为什么会发生。
虽然它们可能不像Kafka系统那样提供严格的保证级别,但NSQ简单的操作使故障情况非常明显。
UNIX-y工具 ——NSQ是一个很好的通用型工具,所以NSQ附带了很多实用的程序,这些程序是多用途和可组合的。
除了TCP协议,NSQ提供一个简单的CURL的HTTP接口用于维护操作,它从CLI附带了二进制文件管道,用tail跟踪队列的尾部,从一个队列使用管道到另外一个队列,还有HTTP发布订阅。
甚至还有一个用于监控和暂停队列的管理面板,包括一个动态的计数器在上面。
丢失了什么?
正如我所提到的,简单并不是没有折衷:
没有复制 ——不像其他的队列组件,NSQ并没有提供任何形式的复制和集群,也正是这点让它能够如此简单地运行,但它确实对于一些高保证性高可靠性的消息发布没有足够的保证。
我们可以通过降低文件同步的时间来部分避免,只需通过一个标志配置,通过EBS支持我们的队列。但是这样仍然存在一个消息被发布后马上死亡,丢失了有效的写入的情况。
基本消息路由 ——在NSQ中,topic和channel几乎是你所有能获得到的东西,没有关于路由和基于key的亲和力的观念。我们很乐意为各种用例提供支持,无论是根据条件去筛选消息,还是根据条件路由到某些节点上。取而代之的是,我们最终建立了路由worker,它们处于队列之间,扮演一个聪明的直通滤波器。
没有严格的顺序 ——虽然Kafka由一个有序的日志构成,但NSQ不是。消息可以在任何时间以任何顺序进入队列。在我们使用的案例中,这通常没有关系,因为所有的数据都被加上了时间戳,但它并不适合需要严格顺序的情况。
无数据重复删除功能 ——Aphyr已经在他的文章中广泛探讨了基于超时系统的危险性。NSQ同样也调入了这个陷阱,它使用了心跳检测机制去测试消费者是否存活还是死亡。我们之前已经写过关于很多原因会导致我们的worker无法完成心跳检测,所以在worker中必须有一个单独的步骤确保幂等性。
简单的工作原理
正如你所看到的,后面看到的所有好处的基本核心就是简单性,NSQ是一个简单的队列,这意味着它很容易进行故障推理和很容易发现bug。消费者可以自行处理故障事件而不会影响系统剩下的其余部分。
事实上,简单性是我们决定使用NSQ的首要因素,这方便与我们的许多其他软件一起维护,通过引入队列使我们得到了堪称完美的表现,通过队列甚至让我们增加了几个数量级的吞吐量。
今天,我们面临一个更加复杂的未来,我们越来越多的worker需要一套严格可靠性和顺序性保障,这已经超过了NSQ提供的简单功能。
我们计划在其他基础设施中用Kafka替换NSQ,在生产上从JVM中运行可以获取更多的好处。关于Kafka我们有一个明确的权衡,我们自己必须肩负起更多负责的运营。另一方面,它拥有一个可复制的、有序的日志可以提供给我们更好的服务。
但对于其他适合NSQ的worker,它为我们服务的相当好,我们期待着继续巩固它的坚实的基础。
NSQ使用入门
简介
消息队列是进程间通信或同一进程不同线程间进行通信一种方式,可以将服务异步化,对流量进行整形,削峰填谷,是高并发、大数据场景下不可或缺的中间件;使得消息生产者和消费者解耦,方便系统模块化设计。
NSQ是一个基于Go语言的分布式实时消息平台,它基于MIT开源协议发布,由bitly公司开源出来的一款简单易用的消息中间件。 官方和第三方还为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端libnsq、Java客户端nsq-java以及基于各种语言的众多第三方客户端功能库。
特性
-
Distributed NSQ提供了分布式的,去中心化,且没有单点故障的拓扑结构,稳定的消息传输发布保障,能够具有高容错和HA(高可用)特性。
-
Scalable易于扩展NSQ支持水平扩展,没有中心化的brokers。内置的发现服务简化了在集群中增加节点。同时支持pub-sub和load-balanced 的消息分发。
-
Ops FriendlyNSQ非常容易配置和部署,生来就绑定了一个管理界面。二进制包没有运行时依赖。官方有Docker image。
-
Integrated高度集成官方的 Go 和 Python库都有提供。而且为大多数语言提供了库。
-
消息不持久化,全在内存。可配置–mem-queue-size来超出阈值的消息写到硬盘
-
保证at least once投递。nsqd没挂的前提下,由于超时,连接断开,重新入队等原因,消息可能多次投递,client自己保证消费消息的操作是具有幂等性的。
-
值得注意的是,重要的是 nsqd 和 nsqlookupd 守护进程被设计成独立运行,没有相互之间的沟通或协调。
概念
Topic :一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建topic。
Channels :channel与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。
Messages:消息构成了我们数据流的中坚力量,消费者可以选择结束消息,表明它们正在被正常处理,或者重新将他们排队待到后面再进行处理。每个消息包含传递尝试的次数,当消息传递超过一定的阀值次数时,我们应该放弃这些消息,或者作为额外消息进行处理。
nsqd:nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。
nsqlookupd:nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。有两个接口:TCP 接口,nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。注:是消费者查询去哪里取消息,而不是生产者查询生产消息到哪里去
每个nsqd具有与nsqlookupd的长期TCP连接,在该连接上它定期推送其状态。此数据用于通知nsqlookupd将为消费者提供哪些nsqd地址。对于消费者,将公开HTTP /查找端点以进行轮询。
对于nsqlookupd,通过运行多个实例来实现高可用性。它们不直接相互通信,数据被认为最终是一致的。消费者轮询所有已配置的nsqlookupd实例并将响应联合起来。陈旧,不可访问或其他故障节点不会使系统停止运行。
nsqadmin:nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。 常用工具类:
nsq_to _file:消费指定的话题(topic)/通道(channel),并写到文件中,有选择的滚动和/或压缩文件。
nsq_to _http:消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。
nsq_to _nsq:消费者指定的话题/通道和重发布消息到目的地 nsqd 通过 TCP。
拓扑结构
NSQ推荐通过他们相应的nsqd实例使用协同定位发布者,这意味着即使面对网络分区,消息也会被保存在本地,直到它们被一个消费者读取。更重要的是,发布者不必去发现其他的nsqd节点,他们总是可以向本地实例发布消息。
首先,一个发布者向它的本地nsqd发送消息,要做到这点,首先要先打开一个连接,然后发送一个包含topic和消息主体的发布命令,在这种情况下,我们将消息发布到事件topic上以分散到我们不同的worker中。 事件topic会复制这些消息并且在每一个连接topic的channel上进行排队,在我们的案例中,有三个channel,它们其中之一作为档案channel。消费者会获取这些消息并且上传到S3。
每个channel的消息都会进行排队,直到一个worker把他们消费,如果此队列超出了内存限制,消息将会被写入到磁盘中。Nsqd节点首先会向nsqlookup广播他们的位置信息,一旦它们注册成功,worker将会从nsqlookup服务器节点上发现所有包含事件topic的nsqd节点。
然后每个worker向每个nsqd主机进行订阅操作,用于表明worker已经准备好接受消息了。这里我们不需要一个完整的连通图,但我们必须要保证每个单独的nsqd实例拥有足够的消费者去消费它们的消息,否则channel会被队列堆着。