zoukankan      html  css  js  c++  java
  • Go NSQ分布式消息队列

     下载NSQ:https://nsq.io/deployment/installing.html

    以下步骤将在本地计算机上运行一个小型NSQ群集,并逐步完成将消息发布,使用和归档到磁盘的过程。

    1. 请按照INSTALLING文档中的说明进行操作

    2. 在一个shell中,执行nsqlookupd

      $ ./nsqlookupd
    3. 在另一个shell中,执行nsqd

      $ ./nsqd --lookupd-tcp-address=127.0.0.1:4160
    4. 在另一个shell中,开始nsqadmin

      $ nsqadmin --lookupd-http-address=127.0.0.1:4161
    5. 发布初始消息(也在集群中创建主题):

      $ curl -d 'hello world 1' 'http://127.0.0.1:4151/pub?topic=test'
    6. 最后,在另一个shell中,开始nsq_to_file

      $ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
    7. 将更多消息发布到nsqd

      $ curl -d 'hello world 2' 'http://127.0.0.1:4151/pub?topic=test'
      $ curl -d 'hello world 3' 'http://127.0.0.1:4151/pub?topic=test'
      
    8. 为了验证事情是否按预期进行,请在打开的网络浏览器http://127.0.0.1:4171/中查看nsqadmin用户界面并查看统计信息。

             另外,检查test.*.log写入的日志文件(的内容/tmp

      

    这里重要的一课是,nsq_to_file没有明确告知(客户端)test 主题的产生地,它从中检索该信息,nsqlookupd并且尽管连接定时,也不会丢失任何消息。

    Go操作NSQ

    go get -u github.com/nsqio/go-nsq

    生产者

    // nsq_producer/main.go
    package main
    
    import (
    	"bufio"
    	"fmt"
    	"os"
    	"strings"
    
    	"github.com/nsqio/go-nsq"
    )
    
    // NSQ Producer Demo
    
    var producer *nsq.Producer
    
    // 初始化生产者
    func initProducer(str string) (err error) {
    	config := nsq.NewConfig()
    	producer, err = nsq.NewProducer(str, config)
    	if err != nil {
    		fmt.Printf("create producer failed, err:%v
    ", err)
    		return err
    	}
    	return nil
    }
    
    func main() {
    	nsqAddress := "127.0.0.1:4150"
    	err := initProducer(nsqAddress)
    	if err != nil {
    		fmt.Printf("init producer failed, err:%v
    ", err)
    		return
    	}
    
    	reader := bufio.NewReader(os.Stdin) // 从标准输入读取
    	for {
    		data, err := reader.ReadString('
    ')
    		if err != nil {
    			fmt.Printf("read string from stdin failed, err:%v
    ", err)
    			continue
    		}
    		data = strings.TrimSpace(data)
    		if strings.ToUpper(data) == "Q" { // 输入Q退出
    			break
    		}
    		// 向 'topic_demo' publish 数据
    		err = producer.Publish("topic_demo", []byte(data))
    		if err != nil {
    			fmt.Printf("publish msg to nsq failed, err:%v
    ", err)
    			continue
    		}
    	}
    }

    消费者

    // nsq_consumer/main.go
    package main
    
    import (
    	"fmt"
    	"os"
    	"os/signal"
    	"syscall"
    	"time"
    
    	"github.com/nsqio/go-nsq"
    )
    
    // NSQ Consumer Demo
    
    // MyHandler 是一个消费者类型
    type MyHandler struct {
    	Title string
    }
    
    // HandleMessage 是需要实现的处理消息的方法
    func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
    	fmt.Printf("%s recv from %v, msg:%v
    ", m.Title, msg.NSQDAddress, string(msg.Body))
    	return
    }
    
    // 初始化消费者
    func initConsumer(topic string, channel string, address string) (err error) {
    	config := nsq.NewConfig()
    	config.LookupdPollInterval = 15 * time.Second
    	c, err := nsq.NewConsumer(topic, channel, config)
    	if err != nil {
    		fmt.Printf("create consumer failed, err:%v
    ", err)
    		return
    	}
    	consumer := &MyHandler{
    		Title: "沙河1号",
    	}
    	c.AddHandler(consumer)
    
    	// if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD
    	if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询
    		return err
    	}
    	return nil
    
    }
    
    func main() {
    	err := initConsumer("topic_demo", "first", "127.0.0.1:4161")
    	if err != nil {
    		fmt.Printf("init consumer failed, err:%v
    ", err)
    		return
    	}
    	c := make(chan os.Signal)        // 定义一个信号的通道
    	signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c
    	<-c                              // 阻塞
    }
  • 相关阅读:
    题解 P2647 【最大收益】
    CF817E Choosing The Commander
    P2922 [USACO08DEC]Secret Message G
    洛谷月赛 P7107 天选之人
    如何在考场上快速用C++写高级对拍器
    题解 CF527D 【Clique Problem】
    P6768 [USACO05MAR]Ombrophobic Bovines 发抖的牛
    [USACO06NOV]Corn Fields G
    Orz 教主的比赛题解
    JZOI 4311 统一天下
  • 原文地址:https://www.cnblogs.com/staff/p/13273419.html
Copyright © 2011-2022 走看看