简介
NSQ是1个分布式(distributed)、可扩展(scalable)、配置简单(Ops Friendly)、可集成(integrated)、实时( realtime )的消息传递平台。即消息中间件。
可以将原本耦合、同步执行的程序 解耦成 生产端+ 消息队列+消费端模型的异步程序,加上分布式的生产者和消费者架构就可以在一定程度上支撑大并发。
NSQ是go语言开发的消息队列,它的设计目标是为在多台计算机上运行的松散服务提供一个现代化的基础设施骨架。
官方和第三方为NSQ开发了众多客户端功能库,如官方提供的基于HTTP的nsqd、Go客户端go-nsq、Python客户端pynsq、基于Node.js的JavaScript客户端nsqjs、异步C客户端libnsq、Java客户端nsq-java以及基于各种语言的众多第三方客户端功能库。
组成
NSQ主要是由3个进程组成的:
- nsqd是一个接收、排队、然后转发消息到客户端的进程。
- nsqlookupd 管理拓扑信息并提供最终一致性的发现服务。
- nsqadmin用于实时查看集群的统计数据(并且执行各种各样的管理任务)
Topic 和 Channel
每个nsqd实例旨在一次处理多个数据流。这些数据流称为“topics”,
1个topic具有1个或N个“channels”,所有的channels都会收到topic消息的副本,实际上topic是通过channel来消费它产生的消息的。
如果说topic是工厂,channel就是各种不同的销售渠道,那么生产者就是工人们,消费者就是来自许多不同渠道的客户们。
topic 由生产者端指定,channel由消费者端指定
topic
和channel
都相互独立地缓冲数据,防止缓慢的消费者导致其他chennel
的积压(同样适用于topic
级别)。
channel
可以并且通常会连接多个客户端。假设所有连接的客户端都处于准备接收消息的状态,则每条消息将被传递到随机客户端。例如:
部署
建议使用 docker 部署,插拔方便,不影响主机器。
创建一个文件夹,如 mkdir docker-compose-test
, 在其中创建如下 docker-compose.yml 文件。
docker-compose.yml 如下:
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
# networks: # 本地部署可以删除地址绑定这三行,下同
# nsqnetwork:
# ipv4_address: 172.20.0.2
ports:
- "9910:4160" # Tcp端口 广播
- "9911:4161" # Http端口 客户端发现和管理操作
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
# networks:
# nsqnetwork:
# ipv4_address: 172.20.0.3
ports:
- "9912:4150" # tcp服务客户端
- "9913:4151" # 提供http端口
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
# networks:
# nsqnetwork:
# ipv4_address: 172.20.0.4
ports:
- "9914:4171"
保存退出后,使用如下命令启动
docker-compose up
后台启动则可以 使用命令
docker-compose up -d
访问本地网址 127.0.0.1:9914 即可看到页面板。
代码使用
相关客户端支持可参看此链接
因为业务相关,我们一般采用的是 HTTP 的方式生产消息。使用方式如下:
curl -d "<message>" http://127.0.0.1:9913/pub?topic=name
采用上述命令生产一个消息给 nsq 后,可在 nsq 面板(127.0.0.1:9914)看到这条消息。
主页面
点击,即可进入内部查看具体信息。包括 NSQD 节点,具体的channel,队列中的消息数,连接数等。
nsqadmin
channel 。上图最下一行提示没有客户端(消费者)连接这个 channel。启动客户端即可。
客户端测试代码:
# nsq_consume_yezi.py
import nsq
def handler(message):
print(message)
print(message.body)
return True
r = nsq.Reader(message_handler=handler, nsqd_tcp_addresses=['127.0.0.1:9912'], topic='yezi_topic', channel='ceshi01', lookupd_poll_interval=15)
nsq.run()
使用 python 启动此代码,
面板也显示了客户端的连接。
其他
列出所有的 NSQD 节点:
消息的统计:
lookup主机的列表: