zoukankan      html  css  js  c++  java
  • NATS_13:NATS Streaming案例讲解

    启动服务

      首先启动 go-nats-streaming 服务。为了更加能说明问题,我们启动的时候不使用默认端口号

     $ nats-streaming-server -p 4242 -m 8222 -DV
    

     编写一个简单的应用

    package main
    
    import (
        "log"
        "github.com/nats-io/go-nats-streaming"
    )
    
    func main() {
        //stan.Connect(clusterID, clientID, ops ...Option)
        ns, _ := stan.Connect("test-cluster", "myID", stan.NatsURL("nats://localhost:4242"))
    
        // Simple Synchronous Publisher
        // does not return until an ack has been received from NATS Streaming
        ns.Publish("foo", []byte("Hello World"))
    
        // Simple Async Subscriber
        sub, _ := ns.Subscribe("foo", func(m *stan.Msg) {
            log.Printf("Received a message: %s
    ", string(m.Data))
        }, stan.StartWithLastReceived())
    
        log.Printf("subscribing to subject 'foo' 
    ")
    
        // Unsubscribe
        sub.Unsubscribe()
    
        // Close connection
        ns.Close()
    }

      上面的代码使用了订阅者启动参数的 StartWithLastReceived,这个函数的含义为:读取刚才发布者最近发布的消息内容。具体还有哪些启动参数,以下列出详情:

    // Subscribe starting with most recently published value
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, StartWithLastReceived())
    
    // Receive all stored values in order
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, DeliverAllAvailable())
    
    // Receive messages starting at a specific sequence number
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, StartAtSequence(22))
    
    // Subscribe starting at a specific time
    var startTime time.Time
    ...
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, StartAtTime(startTime))
    
    // Subscribe starting a specific amount of time in the past (e.g. 30 seconds ago)
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, StartAtTimeDelta(time.Duration(30)))
  • 相关阅读:
    Ubuntu12.04安装svn1.8
    [NOIP模拟测试3] 建造游乐园 题解(欧拉图性质)
    图论模板
    [bzoj3073] Journeys 题解(线段树优化建图)
    [bzoj3033]太鼓达人 题解(搜索)
    [NOIP2016]天天爱跑步 题解(树上差分) (码长短跑的快)
    [HNOI2015]菜肴制作 题解(贪心+拓扑)
    [SDOI2015]排序 题解 (搜索)
    [CQOI2011]放棋子 题解(dp+组合数学)
    [ZJOI2011]看电影(组合数学/打表+高精)
  • 原文地址:https://www.cnblogs.com/liang1101/p/6679648.html
Copyright © 2011-2022 走看看