zoukankan      html  css  js  c++  java
  • NATS_13:NATS Streaming案例讲解

    启动服务

      首先启动 go-nats-streaming 服务。为了更加能说明问题,我们启动的时候不使用默认端口号

     $ nats-streaming-server -p 4242 -m 8222 -DV
    

     编写一个简单的应用

    package main
    
    import (
        "log"
        "github.com/nats-io/go-nats-streaming"
    )
    
    func main() {
        //stan.Connect(clusterID, clientID, ops ...Option)
        ns, _ := stan.Connect("test-cluster", "myID", stan.NatsURL("nats://localhost:4242"))
    
        // Simple Synchronous Publisher
        // does not return until an ack has been received from NATS Streaming
        ns.Publish("foo", []byte("Hello World"))
    
        // Simple Async Subscriber
        sub, _ := ns.Subscribe("foo", func(m *stan.Msg) {
            log.Printf("Received a message: %s
    ", string(m.Data))
        }, stan.StartWithLastReceived())
    
        log.Printf("subscribing to subject 'foo' 
    ")
    
        // Unsubscribe
        sub.Unsubscribe()
    
        // Close connection
        ns.Close()
    }

      上面的代码使用了订阅者启动参数的 StartWithLastReceived,这个函数的含义为:读取刚才发布者最近发布的消息内容。具体还有哪些启动参数,以下列出详情:

    // Subscribe starting with most recently published value
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, StartWithLastReceived())
    
    // Receive all stored values in order
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, DeliverAllAvailable())
    
    // Receive messages starting at a specific sequence number
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, StartAtSequence(22))
    
    // Subscribe starting at a specific time
    var startTime time.Time
    ...
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, StartAtTime(startTime))
    
    // Subscribe starting a specific amount of time in the past (e.g. 30 seconds ago)
    sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
        fmt.Printf("Received a message: %s
    ", string(m.Data))
    }, StartAtTimeDelta(time.Duration(30)))
  • 相关阅读:
    Bzoj1101 [POI2007]Zap
    Bzoj2393 Cirno的完美算数教室
    UVa10891 Game of Sum
    Bzoj4128 Matrix
    类的组合
    继承
    属性查找与绑定方法
    类与对象
    面向对象程序设计——基础概念
    修改个人信息的程序
  • 原文地址:https://www.cnblogs.com/liang1101/p/6679648.html
Copyright © 2011-2022 走看看