zoukankan      html  css  js  c++  java
  • golang nats[4] request reply模式

    请求响应模式

    无论是发布订阅模式还是queue模式,nats都不能保证消息一定发送到订阅方,除非订阅者发送一个响应给发布者。
    所以订阅者发送一个回执给发布者,就是请求响应模式。

    这种模式有什么用?

    nats要求订阅者一定要先完成订阅,发布消息后,订阅者才能收到消息,类似离线消息的模式nats不支持。就算先完成订阅,后发送消息,消息发送方也不知道是否有订阅者收到了消息,请求响应模式就是应对这种情况。

    基本流程

    A发送消息,B收到消息,发送回执给A。这就是request reply的基本流程。

    基本实现原理

    • A启用request模式发送消息(消息中包含了回执信息,replya主题),同步等待回执(有超时时间)。
    • B收到消息,在消息中取出回执信息=replay主题,对replay主题,主动发送普通消息(消息内容可自定义,比如server A上的service1收到msgid=xxxx的消息。)。
    • A在超时内收到消息,确认结束。
    • A在超时内未收到消息,超时结束。

    注意

    • 因为A发送的消息中包装了回执测相关信息,订阅者B收到消息后,也要主动发送回执,所以请求响应模式,对双方都有影响。
    • A发送消息后,等待B的回执,需要给A设置超时时间,超时后,不在等待回执,直接结束,效果和不需要回执的消息发送一样,不在关心是否有订阅者收到消息。

    两种模式

    request reply有两种模式:

    • one to one 默认模式

    1条消息,N个订阅者,消息发送方,仅会收到一条回执记录(因为消息发送方收到回执消息后,就自动断开了对回执消息的订阅。),即使N个订阅都都收到了消息。注意:pub/sub和queue模式的不同

    • one to many 非默认模式,需要自己实现

    1条消息,N个订阅者,消息发送方,可以自己设定一个数量限制N,接受到N个回执消息后,断开对回执消息的订阅。

    Server

    package main
    
    import (
        "github.com/nats-io/go-nats"
        "log"
        "flag"
    )
    
    const (
        //url   = "nats://192.168.3.125:4222"
        url = nats.DefaultURL
    )
    
    var (
        nc *nats.Conn
    
        encodeConn *nats.EncodedConn
        err        error
    )
    
    func init() {
        if nc, err = nats.Connect(url); checkErr(err) {
            //
            if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
                checkErr(err) {
            }
        }
    }
    
    func main() {
        var (
            servername = flag.String("servername", "Y", "name for server")
            queueGroup = flag.String("group", "", "group name for Subscribe")
            subj       = flag.String("subj", "yasenagat", "subject name")
        )
        flag.Parse()
    
        mode := "queue"
        if *queueGroup == "" {
            mode = "pub/sub"
        }
        log.Printf("Server[%v] Subscribe Subject[%v] in [%v]Mode", *servername, *subj, mode)
    
        startService(*subj, *servername+" worker1", *queueGroup)
        startService(*subj, *servername+" worker2", *queueGroup)
        startService(*subj, *servername+" worker3", *queueGroup)
    
        nc.Flush()
        select {}
    }
    
    //receive message
    func startService(subj, name, queue string) {
        go async(nc, subj, name, queue)
    }
    
    func async(nc *nats.Conn, subj, name, queue string) {
        replyMsg := name + " Received a msg"
        if queue == "" {
            nc.Subscribe(subj, func(msg *nats.Msg) {
                nc.Publish(msg.Reply, []byte(replyMsg))
                log.Println(name, "Received a message From Async : ", string(msg.Data))
            })
        } else {
            nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
                nc.Publish(msg.Reply, []byte(replyMsg))
                log.Println(name, "Received a message From Async : ", string(msg.Data))
            })
        }
    
    }
    
    func checkErr(err error) bool {
        if err != nil {
            log.Println(err)
            return false
        }
        return true
    }
    
    

    Client

    package main
    
    import (
        "github.com/nats-io/go-nats"
        "log"
        "github.com/pborman/uuid"
        "flag"
        "time"
    )
    
    const (
        //url   = "nats://192.168.3.125:4222"
        url = nats.DefaultURL
    )
    
    var (
        nc         *nats.Conn
        encodeConn *nats.EncodedConn
        err        error
    )
    
    func init() {
        if nc, err = nats.Connect(url); checkErr(err, func() {
    
        }) {
            //
            if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
                checkErr(err, func() {
    
                }) {
    
            }
        }
    }
    
    func main() {
        var (
            subj = flag.String("subj", "yasenagat", "subject name")
        )
        flag.Parse()
        log.Println(*subj)
        startClient(*subj)
    
        time.Sleep(time.Second)
    }
    
    //send message to server
    func startClient(subj string) {
        for i := 0; i < 3; i++ {
            id := uuid.New()
            log.Println(id)
            if msg, err := nc.Request(subj, []byte(id+" hello"), time.Second); checkErr(err, func() {
                // handle err
            }) {
                log.Println(string(msg.Data))
            }
        }
    }
    
    func checkErr(err error, errFun func()) bool {
        if err != nil {
            log.Println(err)
            errFun()
            return false
        }
        return true
    }
    
    

    pub/sub模式启动

    $ ./main
    2018/08/18 18:54:10 Server[Y] Subscribe Subject[yasenagat] in [pub/sub]Mode
    2018/08/18 18:54:26 Y worker2 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
    2018/08/18 18:54:26 Y worker1 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
    2018/08/18 18:54:26 Y worker3 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
    2018/08/18 18:54:26 Y worker2 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
    2018/08/18 18:54:26 Y worker1 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
    2018/08/18 18:54:26 Y worker3 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
    2018/08/18 18:54:26 Y worker2 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
    2018/08/18 18:54:26 Y worker1 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
    2018/08/18 18:54:26 Y worker3 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
    

    发送消息

    $ ./main
    2018/08/18 18:54:26 yasenagat
    2018/08/18 18:54:26 b035d7c2-e7e9-4337-bb8a-a23ec85fc31a
    2018/08/18 18:54:26 Y worker3 Received a msg
    2018/08/18 18:54:26 2d8dfe75-8fee-4b4c-8599-1824638dfa8c
    2018/08/18 18:54:26 Y worker2 Received a msg
    2018/08/18 18:54:26 fe9f773a-129b-4919-9bc4-c8a4571fef6e
    2018/08/18 18:54:26 Y worker2 Received a msg
    

    queue模式启动

    $ ./main -group=test
    2018/08/18 19:14:31 Server[Y] Subscribe Subject[yasenagat] in [queue]Mode
    2018/08/18 19:14:33 Y worker2 Received a message From Async :  4ecf2728-b3a7-4181-893a-aefde3bc8d2e hello Y worker2 Received a msg
    2018/08/18 19:14:33 Y worker3 Received a message From Async :  4e7f1363-9a47-4705-b87a-4aaeb80164f0 hello Y worker3 Received a msg
    2018/08/18 19:14:33 Y worker2 Received a message From Async :  38b1f74b-8a3b-46ba-a10e-62e50efbc127 hello Y worker2 Received a msg
    

    发送消息

    $ ./main
    2018/08/18 19:14:33 yasenagat
    2018/08/18 19:14:33 4ecf2728-b3a7-4181-893a-aefde3bc8d2e
    2018/08/18 19:14:33 Y worker2 Received a msg
    2018/08/18 19:14:33 4e7f1363-9a47-4705-b87a-4aaeb80164f0
    2018/08/18 19:14:33 Y worker3 Received a msg
    2018/08/18 19:14:33 38b1f74b-8a3b-46ba-a10e-62e50efbc127
    2018/08/18 19:14:33 Y worker2 Received a msg
    

    queue模式下,发送3条消息,3个订阅者有相同的queue,每条消息只有一个订阅者收到。

    pub/sub模式下,发送3条消息,3个订阅者都收到3条消息,一共9条。

    总结:

    回执主要解决:订阅者是否收到消息的问题、有多少个订阅者收到消息的问题。(不是具体业务是否执行完成的回执!)
    基于事件的架构模式可以构建于消息机制之上,依赖消息机制。异步调用的其中一种实现方式,就是基于事件模式。异步调用又是分布式系统中常见的任务处理方式。

    业务模式

    • 业务A发送eventA给事件中心,等待回执
    • 事件中心告知A收到了消息,开始对外发送广播
    • 订阅者B订阅了eventA主题
    • 事件中心对eventA主题发送广播,等待回执
    • B收到消息,告知事件中心,收到eventA,开始执行任务taskA
    • B异步执行完taskA,通知事件中心taskAComplete,等待回执
    • 事件中心发送回执给B,对外发送广播,taskAComplete
    • ........

    如果超时,未能收到回执,需要回执信息的确认方可以主动调用相关接口,查询任务执行状态,根据任务状态做后续的处理。



    作者:luckyase
    链接:https://www.jianshu.com/p/89f245ec7365
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    安装了windows mobile 5.0 pocket pc SDK
    落户这里
    NOIP 2021 游记
    log4net重复记日志
    Eclipse中properties文件中文显示编码、乱码问题
    查找包下已经实施的增强
    vs2010 编译 Ogre 1.8 源码
    Ogre 设计模式之Singleton
    23种设计模式的解析与C++实现及源码打包下载
    vs2010 编译 SALVIA源码
  • 原文地址:https://www.cnblogs.com/gao88/p/10007752.html
Copyright © 2011-2022 走看看