NSQ介绍
实时的分布式消息处理平台,nsq设计的目的是用来大规模地处理每天数以十亿计级别的消息。
官网:https://nsq.io/overview/quick_start.html
client端:https://nsq.io/clients/client_libraries.html
GitHub:https://github.com/nsqio/nsq
组件构成
nsq有三个组件以及辅助的几个工具构成。
nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。
它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。
- 服务启动后有两个端口:一个给客户端,另一个是 HTTP API。还能够开启HTTPS。
- 同一台服务器启动多个nsqd,要注意端口和数据路径必须不同,包括:–lookupd-tcp-address、 -tcp-address、–data-path。
- 删除topic、channel需要http api调用。
nsqlookupd 是守护进程,负责管理拓扑信息并提供最终一致性的发现服务。
客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。
- 该服务运行后有两个端口:TCP 接口,nsqd 用它来广播;HTTP 接口,客户端用它来发现和管理。
- 在生产环境中,为了高可用,最好部署三个nsqlookupd服务。
nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。
运行后,能够通过4171端口查看并管理topic和channel。
utilities 常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq。

概念
Topic ——一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建topic。
Channels ——channel组与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。
Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。
Message s——消息构成了我们数据流的中坚力量,消费者可以选择结束消息,表明它们正在被正常处理,或者重新将他们排队待到后面再进行处理。每个消息包含传递尝试的次数,当消息传递超过一定的阀值次数时,我们应该放弃这些消息,或者作为额外消息进行处理。
Topic和Channel
官方有个非常漂亮的动态图,展示了一个topic对应多个channel的效果


部署
准备192.168.1.93、192.168.2.41两台部署
1、下载 https://nsq.io/deployment/installing.html
解压到/usr/local/nsq目录下,创建conf、data、logs目录

2、定义配置文件
nsqd.cfg
##是否启用详细记录
verbose = true
## unique part for message IDs, (int) in range [0,1024) (default is hash of hostname) (default 616)
# node_id = 5150
## <addr>:<port>客户端TCP地址,客户端通过这个地址连接nsqd并进行订阅,发布。注意订阅必须通过TCP连接实现。
tcp_address = "0.0.0.0:4150"
## <addr>:<port> 用来进行发布的http端口,经过测试这里的ip只能是nsqd所在机器网卡的任意一个ip,如果填写127.0.0.1只接受本机请求。
http_address = "0.0.0.0:4151"
## <addr>:<port> to listen on for HTTPS clients (default "0.0.0.0:4152")
# https_address = "0.0.0.0:4152"
## address that will be registered with lookupd (defaults to the OS hostname) (default "PROSNAKES.local")
broadcast_address = "192.168.1.93"
## cluster of nsqlookupd TCP 地址,可以设置多个。
nsqlookupd_tcp_addresses = [
"192.168.1.93:4160",
"192.168.2.41:4160"
]
## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"
## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"
## path to store disk-backed messages
data_path="/usr/local/nsq/data/"
## number of messages to keep in memory (per topic/channel) (default 10000)
mem_queue_size = 10000
## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600
## number of messages per diskqueue fsync
sync_every = 2500
## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"
## duration to wait before auto-requeing a message
msg_timeout = "60s"
## maximum duration before a message will timeout
max_msg_timeout = "15m"
## maximum size of a single message in bytes
max_msg_size = 1024768
## maximum requeuing timeout for a message
max_req_timeout = "1h"
## maximum size of a single command body
max_body_size = 5123840
## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"
## maximum RDY count for a client
max_rdy_count = 2500
## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536
## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"
## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"
## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"
## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"
## toggle sending memory and GC stats to statsd
statsd_mem_stats = true
## path to certificate file
tls_cert = ""
## path to private key file
tls_key = ""
## set policy on client certificate (require - client must provide certificate,
## require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"
## set custom root Certificate Authority
# tls_root_ca_file = ""
## require client TLS upgrades
tls_required = false
## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")
tls_min_version = ""
## enable deflate feature negotiation (client compression) (default true)
deflate = true
## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6
## enable snappy feature negotiation (client compression)
snappy = true
nsqlookupd.cfg
##enable verbose logging verbose = false ## <addr>:<port> nsqd通过这个tcp地址注册到nsqlookup tcp_address = "0.0.0.0:4160" ## <addr>:<port> 消费客户端通过这个地址获取可用的服务列表 http_address = "0.0.0.0:4161" ## address that will be registered with lookupd (defaults to the OS hostname) # broadcast_address = "" ## duration of time a producer will remain in the active list since its last ping inactive_producer_timeout = "300s" ## duration of time a producer will remain tombstoned if registration remains tombstone_lifetime = "45s"
nsqadmin.cfg
## 在浏览器中访问这个地址进入管理。
http_address = "0.0.0.0:4171"
## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
notification_http_endpoint = ""
## nsq发现服务地址
nsqlookupd_http_addresses = [
"192.168.1.93:4161"
]
3、启动进程
nohup nsqlookupd -config /usr/local/nsq/conf/nsqlookupd.cfg > /usr/local/nsq/logs/nsqlookupd.log 2>&1& nohup nsqd -config /usr/local/nsq/conf/nsqd.cfg > /usr/local/nsq/logs/nsqd.log 2>&1& nohup nsqadmin -config /usr/local/nsq/conf/nsqadmin.cfg > /usr/local/nsq/logs/nsqadmin.log 2>&1&
4、访问页面

GO的client
go get -u github.com/nsqio/go-nsq
使用样例可以参考这个包中的test内容。
python的client
#-*- coding:utf-8 -*-
import nsq
import json
import logging
logger = logging.getLogger("file")
from threading import Thread
import tornado.ioloop
class MQSUB(Thread):
def __init__(self, addr_list, channel):
super(MQSUB,self).__init__()
self.lookupd_http_addresses = addr_list
self.channel = channel
def message_handle(self, message):
msg_s = message.body
try:
msg = json.loads(msg_s)
print msg
logger.debug('msg:%s'%msg)
message.finish()
except Exception as e:
logger.error('Exception:%s' % e)
message.requeue(backoff=False, delay=10)
def sub(self, topic):
nsq.Reader(message_handler=self.message_handle,
lookupd_http_addresses=self.lookupd_http_addresses,
topic=topic, channel=self.channel, max_in_flight=9)
def run(self):
tornado.ioloop.IOLoop.instance().start()
#nsq.run()
if __name__ == '__main__':
address1 = '192.168.1.73:4161'
address2 = '192.168.1.74:4161'
addr_list = [address1,address2]
topic = 'idcinfo'
channel = 'async'
m_s = MQSUB(addr_list,channel)
m_s.sub(topic)
m_s.start()
服务开机自启动注册
/lib/systemd/system/nsqlookupd.service内容如下
[Unit] Description=nsqlookupd ConditionFileIsExecutable=/usr/local/nsq/bin/nsqlookupd [Service] StartLimitInterval=5 StartLimitBurst=10 ExecStart=/usr/bin/sh -c '/usr/local/nsq/bin/nsqlookupd -config /usr/local/nsq/conf/nsqlookupd.cfg > /usr/local/nsq/logs/nsqlookupd.log 2>&1' Restart=always RestartSec=120 LimitNOFILE=65535 [Install] WantedBy=multi-user.target
/lib/systemd/system/nsqd.service 内容如下
[Unit] Description=nsqd ConditionFileIsExecutable=/usr/local/nsq/bin/nsqd [Service] StartLimitInterval=5 StartLimitBurst=10 ExecStart=/usr/bin/sh -c '/usr/local/nsq/bin/nsqd -config /usr/local/nsq/conf/nsqd.cfg > /usr/local/nsq/logs/nsqd.log 2>&1' Restart=always RestartSec=120 LimitNOFILE=65535 [Install] WantedBy=multi-user.target
/lib/systemd/system/nsqadmin.service内容如下
[Unit] Description=nsqadmin ConditionFileIsExecutable=/usr/local/nsq/bin/nsqadmin [Service] StartLimitInterval=5 StartLimitBurst=10 ExecStart=/usr/bin/sh -c '/usr/local/nsq/bin/nsqadmin -config /usr/local/nsq/conf/nsqadmin.cfg > /usr/local/nsq/logs/nsqadmin.log 2>&1' Restart=always RestartSec=120 LimitNOFILE=65535 [Install] WantedBy=multi-user.target
nsqd API调用样例
package main
import (
"fmt"
"time"
"bytes"
"net/http"
"net"
"io/ioutil"
"github.com/gin-gonic/gin/json"
)
const (
nsqdMaxIdleConns int = 5
nsqdMaxIdleConnsPerHost int = 5
nsqdIdleConnTimeout int = 30
)
func nsqdHttpCommon(method, url string, data []byte) error{
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: nsqdMaxIdleConns,
MaxIdleConnsPerHost: nsqdMaxIdleConnsPerHost,
IdleConnTimeout: time.Duration(nsqdIdleConnTimeout) * time.Second,
},
Timeout: 6 * time.Second,
}
req, err := http.NewRequest(method, url, bytes.NewBuffer(data))
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
return err
}
fmt.Println(resp.StatusCode, string(body))
return nil
}
func nsqdPing(addr string){
url := fmt.Sprint("http://", addr , "/ping")
fmt.Println(url)
nsqdHttpCommon("GET", url, nil)
}
func nsqdInfo(addr string){
url := fmt.Sprint("http://", addr , "/info")
fmt.Println(url)
nsqdHttpCommon("GET", url, nil)
}
func stats(addr string){
url := fmt.Sprint("http://", addr , "/stats")
fmt.Println(url)
nsqdHttpCommon("GET", url, nil)
}
func pub(addr, topic string, data []byte){
url := fmt.Sprintf("http://%s/pub?topic=%s", addr, topic)
fmt.Println(url)
nsqdHttpCommon("POST", url, data)
}
func mpub(addr, topic string, data []byte){
url := fmt.Sprintf("http://%s/mpub?topic=%s", addr, topic)
fmt.Println(url)
nsqdHttpCommon("POST", url, data)
}
func nsqdTopicCreate(addr, topic string){
url := fmt.Sprint("http://", addr , "/topic/create?topic=", topic)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdTopicDelete(addr, topic string){
url := fmt.Sprint("http://", addr , "/topic/delete?topic=", topic)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdTopicEmpty(addr, topic string){
url := fmt.Sprint("http://", addr , "/topic/empty?topic=", topic)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdTopicPause(addr, topic string){
url := fmt.Sprint("http://", addr , "/topic/pause?topic=", topic)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdTopicUnPause(addr, topic string){
url := fmt.Sprint("http://", addr , "/topic/unpause?topic=", topic)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdChannelCreate(addr, topic, ch string){
url := fmt.Sprintf("http://%s/channel/create?topic=%s&channel=%s", addr, topic, ch)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdChannelDelete(addr, topic, ch string){
url := fmt.Sprintf("http://%s/channel/delete?topic=%s&channel=%s", addr, topic, ch)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdChannelEmpty(addr, topic, ch string){
url := fmt.Sprintf("http://%s/channel/empty?topic=%s&channel=%s", addr, topic, ch)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdChannelPause(addr, topic, ch string){
url := fmt.Sprintf("http://%s/channel/pause?topic=%s&channel=%s", addr, topic, ch)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func nsqdChannelUnPause(addr, topic, ch string){
url := fmt.Sprintf("http://%s/channel/unpause?topic=%s&channel=%s", addr, topic, ch)
fmt.Println(url)
nsqdHttpCommon("POST", url, nil)
}
func main(){
addr := "192.168.1.93:4151"
topic := "shhnwangjian"
var data []string
data = append(data, "1323", "fdsf", "fdsafds")
dataj, _ := json.Marshal(data)
fmt.Println(dataj)
nsqdPing(addr)
nsqdInfo(addr)
stats(addr)
pub(addr, topic, dataj)
mpub(addr, topic, dataj)
nsqdTopicCreate(addr, topic)
nsqdTopicDelete(addr, topic)
nsqdTopicEmpty(addr, topic)
nsqdTopicPause(addr, topic)
nsqdTopicUnPause(addr, topic)
nsqdChannelCreate(addr, topic, "ch1")
nsqdChannelDelete(addr, topic, "ch1")
nsqdChannelEmpty(addr, topic, "ch2")
nsqdChannelPause(addr, topic, "ch1")
nsqdChannelUnPause(addr, topic, "ch1")
}
nsqlookupd调用样例
package main
import (
"time"
"bytes"
"net"
"net/http"
"io/ioutil"
"fmt"
)
const (
MaxIdleConns int = 5
MaxIdleConnsPerHost int = 5
IdleConnTimeout int = 30
)
func httpCommon(method, url string, data []byte) error{
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
MaxIdleConns: MaxIdleConns,
MaxIdleConnsPerHost: MaxIdleConnsPerHost,
IdleConnTimeout: time.Duration(IdleConnTimeout) * time.Second,
},
Timeout: 6 * time.Second,
}
req, err := http.NewRequest(method, url, bytes.NewBuffer(data))
req.Header.Set("Content-Type", "application/json")
resp, err := client.Do(req)
if err != nil {
fmt.Println(err)
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
return err
}
fmt.Println(resp.StatusCode, string(body))
return nil
}
func lookup(addr, topic string) {
url := fmt.Sprint("http://", addr , "/lookup?topic=", topic)
fmt.Println(url)
httpCommon("GET", url, nil)
}
func topics(addr string){
url := fmt.Sprint("http://", addr , "/topics")
fmt.Println(url)
httpCommon("GET", url, nil)
}
func channles(addr, topic string){
url := fmt.Sprint("http://", addr , "/channels?topic=", topic)
fmt.Println(url)
httpCommon("GET", url, nil)
}
func ping(addr string){
url := fmt.Sprint("http://", addr , "/ping")
fmt.Println(url)
httpCommon("GET", url, nil)
}
func info(addr string){
url := fmt.Sprint("http://", addr , "/info")
fmt.Println(url)
httpCommon("GET", url, nil)
}
func nodes(addr string){
url := fmt.Sprint("http://", addr , "/nodes")
fmt.Println(url)
httpCommon("GET", url, nil)
}
func topicCreate(addr, topic string){
url := fmt.Sprint("http://", addr , "/topic/create?topic=", topic)
fmt.Println(url)
httpCommon("POST", url, nil)
}
func topicDelete(addr, topic string){
url := fmt.Sprint("http://", addr , "/topic/delete?topic=", topic)
fmt.Println(url)
httpCommon("POST", url, nil)
}
func channelCreate(addr, topic, ch string){
url := fmt.Sprintf("http://%s/channel/create?topic=%s&channel=%s", addr, topic, ch)
fmt.Println(url)
httpCommon("POST", url, nil)
}
func channelDelete(addr, topic, ch string){
url := fmt.Sprintf("http://%s/channel/delete?topic=%s&channel=%s", addr, topic, ch)
fmt.Println(url)
httpCommon("POST", url, nil)
}
func topicTombstone(addr, topic, node string){
url := fmt.Sprintf("http://%s/topic/tombstone?topic=%s&node=%s", addr, topic, node)
fmt.Println(url)
httpCommon("POST", url, nil)
}
func main(){
addr := "192.168.1.93:4161"
topic := "test"
lookup(addr, topic)
topics(addr)
channles(addr, topic)
ping(addr)
info(addr)
nodes(addr)
topicCreate(addr, topic)
topicDelete(addr, topic)
channelCreate(addr, topic, "ch1")
channelDelete(addr, topic, "ch1")
topicTombstone(addr, topic, "192.168.1.93:4151")
}
go-nsq发布消息样例
package main
import (
"github.com/nsqio/go-nsq"
"fmt"
"time"
)
var (
addrs = [...]string{"192.168.1.93:4150", "192.168.2.41:4150"}
)
func ProducerPing(addr string) bool {
config := nsq.NewConfig()
w, _ := nsq.NewProducer(addr, config)
err := w.Ping()
if err != nil {
fmt.Println("should connect on ping")
return false
}
return true
}
func ProducerPublish (addr string){
config := nsq.NewConfig()
w, _ := nsq.NewProducer(addr, config)
defer w.Stop()
for {
err := w.Publish("shhnwangjian", []byte(addr + "_publish_test_case_" + fmt.Sprint(time.Now().Unix())))
if err != nil {
fmt.Println(err)
}
time.Sleep(10 * time.Millisecond)
}
}
func main(){
for _, addr := range addrs{
if ProducerPing(addr){
ProducerPublish(addr)
}
}
}
go-nsq订阅消息样例
package main
import (
"github.com/nsqio/go-nsq"
"fmt"
"sync"
"time"
)
type NSQHandler struct {
}
func (this *NSQHandler) HandleMessage(message *nsq.Message) error {
fmt.Println("recv:", string(message.Body))
return nil
}
func consumer () {
waiter := sync.WaitGroup{}
waiter.Add(1)
go func() {
defer waiter.Done()
config := nsq.NewConfig()
config.LookupdPollInterval = 5 * time.Second
q, _ := nsq.NewConsumer("shhnwangjian", "ch1", config)
q.AddHandler(&NSQHandler{})
//addr := "192.168.1.93:4150"
// 建立一个nsqd连接
//err := q.ConnectToNSQD(addr)
//if err != nil {
// fmt.Println(err)
//}
//建立多个nsqd连接
err := q.ConnectToNSQDs([]string{"192.168.1.93:4150", "192.168.2.41:4150"})
if err != nil {
fmt.Println(err)
}
select {}
}()
waiter.Wait()
}
func main(){
consumer()
}
参考文章: