zoukankan      html  css  js  c++  java
  • NATS_08:NATS客户端Go语言手动编写

    NATS客户端
        一个NATS客户端是基于NATS服务端来说既可以是一个生产数据的也可以是消费数据的。生产数据的叫生产者英文为 publishers,消费数据的叫消费者英文为 subscribers,其中消费者既可以是同步的也可以是异步的。NATS客户端与NATS服务端是通过点对点的方式进行连接通信的,客户端是不需要知道彼此的位置就可以相互通信的。

      目前Apcera也积极的为我们维护和提供了多个其他语言的客户端,我们可以直接下载使用。当然,我们也可以自己去写相关的客户端代码。

    Go语言版的客户端

      接下来我们就用 Go 语言来自己写客户端实现与 NATS 服务进行通信

     1. 异步的订阅者subscriber

      手动编写一个名为 async-sub.go 的源码,它是一个最基本的带有 debugging 日志的客户端代码,具体代码如下所示:

    package main
    
    import (
        "runtime"
        "log"
        "github.com/nats-io/go-nats"
    )
    
    func main() {
        // create server connection
        natsConnection, _ := nats.Connect(nats.DefaultURL)
        log.Println("connected to " + nats.DefaultURL)
    
        // subscribe to subject
        log.Printf("subscribing to subject 'foo' 
    ")
        natsConnection.Subscribe("foo", func(msg *nats.Msg) {
            //handle the message
            log.Printf("received message '%s
    ", string(msg.Data) + "'")
        })
        
        // keep the connection alive
        runtime.Goexit()
    }

    解释

      1. import packages

        我们需要引入有关 nats 相关服务包;而对于订阅者subscriber还必须引入go包中的runtime;我们还需要使用go中的log包,用于客户端的日志信息打印,方便我们随时查看问题。

      2. 创建连接

        NATS服务端口默认是运行在4222,客户端采用默认连接方式创建一个连接。其中nats.DefaultURL->nats://localhost:4222

      3. 订阅主题

        客户端订阅主题为“foo“的NATS消息。订阅者的方法会返回收到消息的有效信息。

      4. 消息处理

        订阅者实现异步消息处理程序来处理消息。在这个应用案例中,客户端只是将每个收到的消息进行日志的打印。没有显式的去编写消息处理程序代码,订阅是同步的要求是需要额外客户端代码来处理消息(参见下面的同步用户的例子)。

      5. 保持连接一直为激活状体啊

        这个 runtime.Goexit() 是保证在主程序执行完之后客户端程序一直为激活状态,换句话说,客户端并不会单一接收到消息后就终止运行了。

      6. 测试

        运行我们上面写到代码,如果你是在编译器中编写的代码,那么可以直接在编译器上运行;如果是记事本类进行编写的,则需要在终端运行:

        go run async-sub.go

        最终打印的结果为:

      2017/04/05 14:54:57 connected to nats://localhost:4222
      2017/04/05 14:54:57 subscribing to subject 'foo' 

     2. 简单发布者publisher

      这个客户端 pub-simple.go 在主题 "foo" 上发布一个简单的NATS消息为“Hello NATS“,那么订阅者客户端 async-sub.go 就应该会收到这条消息

    package main
    
    import (
        "log"
        "github.com/nats-io/go-nats"
        "time"
    )
    
    func main() {
        // create server connection and defer close
        natsConnection, _ := nats.Connect(nats.DefaultURL)
        defer natsConnection.Close()
        log.Println("connected to " + nats.DefaultURL)
    
        // publish messge on subject by name foo
        subject := "foo"
        natsConnection.Publish(subject, []byte("Hello NATS"))
        log.Printf("published message on subject " + subject)
    
        time.Sleep(30 * time.Second)
    }

      这里对应订阅主题是可以支持简单正则相关的定义,所以我们可以指定很多规则。这段代码和上面订阅者的代码类似,这里就不多做冗余讲解了,直接将运行结果贴出来:

    2017/04/05 16:00:03 connected to nats://localhost:4222
    2017/04/05 16:00:03 subscribing to subject 'foo' 
    2017/04/05 16:00:12 received message 'Hello NATS'

      除了一个简单的产生者发送消息(使用字符串的方式)案例,我们还可以使用内置的 msg 结构体,具体案例如下所示:

    package main
    
    import (
        "log"
        "github.com/nats-io/go-nats"
    )
    
    func main() {
        // create server connection and defer close
        natsConnection, _ := nats.Connect(nats.DefaultURL)
        defer natsConnection.Close()
        log.Println("connected to " + nats.DefaultURL)
    
        // msg structure
        msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World")}
        natsConnection.PublishMsg(msg)
    
        log.Println("published msg.Subject = " + msg.Subject, " | msg.Data = " + string(msg.Data))
    }

      运行结果和上面的类似。

     3. 同步订阅者客户端

      同步客户并没有实现与用户的消息处理程序。相反,接收订阅的客户端负责实现代码来处理消息。此时客户被阻塞无法接受更多的信息,直到客户端处理完返回消息。

      举例如下

    sub, err := natsConnection.SubscribeSync("foo")
    m, err := sub.NextMsg(timeout)

     4. 权限验证

      我们在启动NATS服务的时候想要设定一定的权限控制,这样当客户端连接的时候必须需要验证才能访问连接。目前具体的有两种方式,一种是配置文件配置,另外一种就是在启动时加上相应的权限内容。这里说一下命令行启动时如何开启权限验证:

    gnatsd -DV -m 8222 -user foo -pass bar

      其中 -DV 是 logging 的配置项,目的是为了更加详细的观察每一个客户端的连接详情,具体其他的可以参看我上一篇博客。这里只讲用到的:

      -D, --debug                   Enable debugging output

      -V,  --trace                    Trace the raw protocol

      -DV                               Debug and trace

      -user          User required for connections

      -pass          Password required for connections

      启动信息打印如下:

    [4037] 2017/04/05 17:20:47.015397 [INF] Starting nats-server version 0.9.6
    [4037] 2017/04/05 17:20:47.015478 [DBG] Go build version go1.8
    [4037] 2017/04/05 17:20:47.015489 [INF] Starting http monitor on 0.0.0.0:8222
    [4037] 2017/04/05 17:20:47.015589 [INF] Listening for client connections on 0.0.0.0:4222
    [4037] 2017/04/05 17:20:47.015624 [DBG] Server id is S1wbxzcmrpcNKURHXNEfvc
    [4037] 2017/04/05 17:20:47.015629 [INF] Server is ready

      如果我还直接用上面没有改动的 async-sub.go 代码直接运行,观察 NATS 服务器日志信息打印如下:

    [4037] 2017/04/05 17:23:41.289881 [DBG] ::1:52074 - cid:1 - Client connection created
    [4037] 2017/04/05 17:23:41.290715 [TRC] ::1:52074 - cid:1 - ->> [CONNECT {"verbose":false,"pedantic":false,"tls_required":false,"name":"","lang":"go","version":"1.2.2","protocol":1}]
    [4037] 2017/04/05 17:23:41.290805 [ERR] ::1:52074 - cid:1 - Authorization Error
    [4037] 2017/04/05 17:23:41.290817 [TRC] ::1:52074 - cid:1 - <<- [-ERR Authorization Violation]
    [4037] 2017/04/05 17:23:41.290839 [DBG] ::1:52074 - cid:1 - Client connection closed

      那么接下来我们就需要改动上面已经写过的客户端 订阅者 和 发布者 的代码

      1. async-sub.go 代码修改,其中修改的部分用红色字体标注

    package main
    
    import (
        "runtime"
        "log"
        "github.com/nats-io/go-nats"
    )
    
    func main() {
        // create server connection
        natsConnection, _ := nats.Connect("nats://foo:bar@localhost:4222")
        log.Println("connected to " + nats.DefaultURL)
    
        // subscribe to subject
        log.Printf("subscribing to subject 'foo' 
    ")
        natsConnection.Subscribe("foo", func(msg *nats.Msg) {
            //handle the message
            log.Printf("received message '%s
    ", string(msg.Data) + "'")
        })
    
        // keep the connection alive
        runtime.Goexit()
    }

      此时服务器后端日志信息打印如下内容:

    [4037] 2017/04/05 17:31:13.681407 [TRC] ::1:52104 - cid:2 - ->> [PING]
    [4037] 2017/04/05 17:31:13.681411 [TRC] ::1:52104 - cid:2 - <<- [PONG]
    [4037] 2017/04/05 17:31:13.681608 [TRC] ::1:52104 - cid:2 - ->> [SUB foo  1]

      2. pub-simple.go 代码修改,其中修改的部分用红色字体标注

    package main
    
    import (
        "log"
        "github.com/nats-io/go-nats"
    )
    
    func main() {
        // create server connection and defer close
        natsConnectionString := "nats://foo:bar@localhost:4222"
        natsConnection, _ := nats.Connect(natsConnectionString)
        defer natsConnection.Close()
        log.Println("connected to " + nats.DefaultURL)
    
        // msg structure
        msg := &nats.Msg{Subject: "foo", Reply: "bar", Data: []byte("Hello World")}
        natsConnection.PublishMsg(msg)
    
        log.Println("published msg.Subject = " + msg.Subject, " | msg.Data = " + string(msg.Data))
    }

      此时服务器终端打印日志信息如下:

    [4037] 2017/04/05 17:33:13.687547 [DBG] ::1:52104 - cid:2 - Client Ping Timer
    [4037] 2017/04/05 17:33:13.687584 [TRC] ::1:52104 - cid:2 - <<- [PING]
    [4037] 2017/04/05 17:33:13.687651 [TRC] ::1:52104 - cid:2 - ->> [PING]
    [4037] 2017/04/05 17:33:13.687660 [TRC] ::1:52104 - cid:2 - <<- [PONG]
    [4037] 2017/04/05 17:33:13.687854 [TRC] ::1:52104 - cid:2 - ->> [PONG]
    [4037] 2017/04/05 17:35:13.691474 [DBG] ::1:52104 - cid:2 - Client Ping Timer
    [4037] 2017/04/05 17:35:13.691504 [TRC] ::1:52104 - cid:2 - <<- [PING]
    [4037] 2017/04/05 17:35:13.691589 [TRC] ::1:52104 - cid:2 - ->> [PING]
    [4037] 2017/04/05 17:35:13.691599 [TRC] ::1:52104 - cid:2 - <<- [PONG]
    [4037] 2017/04/05 17:35:13.691658 [TRC] ::1:52104 - cid:2 - ->> [PONG]
    [4037] 2017/04/05 17:35:28.933556 [DBG] ::1:52117 - cid:3 - Client connection created
    [4037] 2017/04/05 17:35:28.933944 [TRC] ::1:52117 - cid:3 - ->> [CONNECT {"verbose":false,"pedantic":false,"user":"foo","pass":"bar","tls_required":false,"name":"","lang":"go","version":"1.2.2","protocol":1}]
    [4037] 2017/04/05 17:35:28.933986 [TRC] ::1:52117 - cid:3 - ->> [PING]
    [4037] 2017/04/05 17:35:28.933992 [TRC] ::1:52117 - cid:3 - <<- [PONG]
    [4037] 2017/04/05 17:35:28.934227 [TRC] ::1:52117 - cid:3 - ->> [PUB foo bar 11]
    [4037] 2017/04/05 17:35:28.934240 [TRC] ::1:52117 - cid:3 - ->> MSG_PAYLOAD: [Hello World]
    [4037] 2017/04/05 17:35:28.934270 [TRC] ::1:52104 - cid:2 - <<- [MSG foo 1 bar 11]
    [4037] 2017/04/05 17:35:28.935859 [DBG] ::1:52117 - cid:3 - Client connection closed

      从以上结果可以看出,前10行日志是订阅者自发的ping-pong操作,检测服务是否可以ping通,说白了就是心跳检测。后面开始有发布者开始创建连接,创建成功后会打印发布的消息日志,最后关闭响应的连接。

      以上就是使用 Go 语言模拟客户端 发布者/订阅者的实现

  • 相关阅读:
    session和cookie
    数据库备份
    使用pip安装.whl文件时出现is not a supported wheel on this platform的解决办法
    multiprocessing模块
    threading模块
    python中多线程相关
    python中实现单例模式
    Flask-SQLAlchemy相关与Flask-Migrate相关
    redis模块
    Flask-Login中装饰器@login_manager.user_loader的作用及原理
  • 原文地址:https://www.cnblogs.com/liang1101/p/6668861.html
Copyright © 2011-2022 走看看