zoukankan      html  css  js  c++  java
  • Go学习笔记

    Go学习笔记 - 使用jsonrpc进行远程访问

    JSON-RPC


    JSON-RPC是一个轻量级的远程调用协议,简单易用。

    请求数据体:

    {
        "method": "getName",
        "params": ["1"],
        "id": 1
    }
    

    method: 远端的方法名

    params: 远程方法接收的参数列表

    id: 本次请求的标识码,远程返回时数据的标识码应与本次请求的标识码相同

    返回数据体:

    {
        "result": {"id": 1, "name": "name1"},
        "error": null,
        "id": 1
    }
    

    result: 远程方法返回值

    error: 错误信息

    id: 调用时所传来的id


    Go的rpc包

    net/rpc

    net/rpc包实现了最基本的rpc调用,它默认通过HTTP协议传输gob数据来实现远程调用。

    服务端实现了一个HTTP server,接收客户端的请求,在收到调用请求后,会反序列化客户端传来的gob数据,获取要调用的方法名,并通过反射来调用我们自己实现的处理方法,这个处理方法传入固定的两个参数,并返回一个error对象,参数分别为客户端的请求内容以及要返回给客户端的数据体的指针。

    net/rpc/jsonrpc

    net/rpc/jsonrpc包实现了JSON-RPC协议,即实现了net/rpc包的ClientCodec接口与ServerCodec,增加了对json数据的序列化与反序列化。


    Go JSON-RPC远程调用

    客户端与服务端双方传输数据,其中数据结构必须得让双方都能处理。

    首先定义rpc所传输的数据的结构,client端与server端都得用到。

    // 需要传输的对象
    type RpcObj struct {
        Id   int `json:"id"` // struct标签, 如果指定,jsonrpc包会在序列化json时,将该聚合字段命名为指定的字符串
        Name string `json:"name"`
    }
    
    // 需要传输的对象
    type ReplyObj struct {
        Ok  bool `json:"ok"`
        Id  int `json:"id"`
        Msg string `json:"msg"`
    }
    

    RpcObj 为传输的数据

    ReplyObj 为服务端返回的数据

    这两个结构体均可以在client和server端双向传递

    服务端

    引入两个包

    "net/rpc"
    "net/rpc/jsonrpc"
    

    net/rpc实现了go的rpc框架,而net/rpc/jsonrpc则具体实现了JSON-RPC协议,具有json数据的序列化与反序列化功能。

    实现处理器

    // server端的rpc处理器
    type ServerHandler struct {}
    
    // server端暴露的rpc方法
    func (serverHandler ServerHandler) GetName(id int, returnObj *RpcObj) error {
        log.Println("server	-", "recive GetName call, id:", id)
        returnObj.Id = id
        returnObj.Name = "名称1"
        return nil
    }
    
    // server端暴露的rpc方法
    func (serverHandler ServerHandler) SaveName(rpcObj RpcObj, returnObj *ReplyObj) error {
        log.Println("server	-", "recive SaveName call, RpcObj:", rpcObj)
        returnObj.Ok = true
        returnObj.Id = rpcObj.Id
        returnObj.Msg = "存储成功"
        return nil
    }
    

    ServerHandler结构可以不需要什么字段,只需要有符合net/rpcserver端处理器约定的方法即可。

    符合约定的方法必须具备两个参数和一个error类型的返回值

    第一个参数 为client端调用rpc时交给服务器的数据,可以是指针也可以是实体。net/rpc/jsonrpc的json处理器会将客户端传递的json数据解析为正确的struct对象。

    第二个参数 为server端返回给client端的数据,必须为指针类型。net/rpc/jsonrpc的json处理器会将这个对象正确序列化为json字符串,最终返回给client端。

    ServerHandler结构需要注册给net/rpc的HTTP处理器,HTTP处理器绑定后,会通过反射得到其暴露的方法,在处理请求时,根据JSON-RPC协议中的method字段动态的调用其指定的方法。

    // 新建Server
    server := rpc.NewServer()
    
    // 开始监听,使用端口 8888
    listener, err := net.Listen("tcp", ":8888")
    if err != nil {
        log.Fatal("server	-", "listen error:", err.Error())
    }
    defer listener.Close()
    
    log.Println("server	-", "start listion on port 8888")
    
    // 新建处理器
    serverHandler := &ServerHandler{}
    
    // 注册处理器
    server.Register(serverHandler)
    
    // 等待并处理链接
    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal(err.Error())
        }
    
        // 在goroutine中处理请求
        // 绑定rpc的编码器,使用http connection新建一个jsonrpc编码器,并将该编码器绑定给http处理器
        go server.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
    

    rpc server端大致的处理流程

    golang_jsonrpc_server

    客户端

    客户端必须确保存在服务端在传输的数据中所使用的struct,在这里,必须确保客户端也能使用RpcObjReplyObjstruct。

    client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超时时间
    if err != nil {
        log.Fatal("client	-", err.Error())
    }
    
    defer client.Close()
    

    首先,通过net包使用TCP协议连接至服务器,这里设定了超时时间。

    clientRpc := jsonrpc.NewClient(client)
    

    然后使用jsonrpc.NewClient通过之前的TCP链接建立一个rpcClient实例。

    对于net/rpc的客户端,在远程调用是有同步(Synchronous)和异步(Asynchronous)两种方式。不论那种方式,在源码中,请求总是在一个新的goroutine中执行,并且使用一个通道(chan)来存放服务器返回值。使用同步方式调用时,调用方法内部会等待chan的数据,并一直阻塞直到远程服务器返回。而使用异步方式时,客户端的调用方法会直接将chan返回,这样就可以适时的处理数据而不影响当前goroutine。

    下面是net/rpc/client中调用远程rpc的源码

    // Go invokes the function asynchronously.  It returns the Call structure representing
    // the invocation.  The done channel will signal when the call is complete by returning
    // the same Call object.  If done is nil, Go will allocate a new channel.
    // If non-nil, done must be buffered or Go will deliberately crash.
    func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call {
        call := new(Call)
        call.ServiceMethod = serviceMethod
        call.Args = args
        call.Reply = reply
        if done == nil {
            done = make(chan *Call, 10) // buffered.
        } else {
            // If caller passes done != nil, it must arrange that
            // done has enough buffer for the number of simultaneous
            // RPCs that will be using that channel.  If the channel
            // is totally unbuffered, it's best not to run at all.
            if cap(done) == 0 {
                log.Panic("rpc: done channel is unbuffered")
            }
        }
        call.Done = done
        client.send(call)
        return call
    }
    
    // Call invokes the named function, waits for it to complete, and returns its error status.
    func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
        call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
        return call.Error
    }
    

    同步调用的使用

    // 远程服务器返回的对象
    var rpcObj RpcObj
    // 请求数据,rpcObj对象会被填充
    clientRpc.Call("ServerHandler.GetName", 1, &rpcObj)
    
    // 远程返回的对象
    var reply ReplyObj
    // 传给远程服务器的对象参数
    saveObj := RpcObj{2, "对象2"}
    // 请求数据
    clientRpc.Call("ServerHandler.SaveName", saveObj, &reply)
    

    Call方法属于同步方式的调用。第一个参数为Server端JSON-RPC处理器的类名加方法名,第二个参数为提交给远端服务器的数据,第三个参数是服务器的返回数据,必须是指针。

    异步调用的使用

    // 传给远程的对象
    saveObj := RpcObj{i, "对象"}
    
    // 异步的请求数据
    divCall := clientRpc.Go("ServerHandler.SaveName", saveObj, &ReplyObj{}, nil)
    
    // 在一个新的goroutine中异步获取远程的返回数据,并不阻塞当前的goroutine
    go func() {
        reply := <-divCall.Done // 取出远程返回的数据
    }()
    

    Call方法属于同步方式的调用。第一个参数为Server端JSON-RPC处理器的类名加方法名,第二个参数为提交给远端服务器的数据,第三个参数是服务器的返回数据,必须是指针,第四个参数为一个通道,可以留空,留空的话它会帮忙建一个,并保存在divCall中。

    net/rpc/jsonrpc/client会把方法名与参数自动序列化为json格式,其结构如开头所述的JSON-RPC结构一样,并自动为JSON-RPC中的id赋值。而服务端返回的对象也会被正确的反序列化。

    rpc client端大致的处理流程

    golang_jsonrpc_client


    完整的程序

    package main
    
    import (
        "net/rpc"
        "net/rpc/jsonrpc"
        "net"
        "log"
    )
    
    // 需要传输的对象
    type RpcObj struct {
        Id   int `json:"id"` // struct标签, 如果指定,jsonrpc包会在序列化json时,将该聚合字段命名为指定的字符串
        Name string `json:"name"`
    }
    
    // 需要传输的对象
    type ReplyObj struct {
        Ok  bool `json:"ok"`
        Id  int `json:"id"`
        Msg string `json:"msg"`
    }
    
    // server端的rpc处理器
    type ServerHandler struct {}
    
    // server端暴露的rpc方法
    func (serverHandler ServerHandler) GetName(id int, returnObj *RpcObj) error {
        log.Println("server	-", "recive GetName call, id:", id)
        returnObj.Id = id
        returnObj.Name = "名称1"
        return nil
    }
    
    // server端暴露的rpc方法
    func (serverHandler ServerHandler) SaveName(rpcObj *RpcObj, returnObj *ReplyObj) error {
        log.Println("server	-", "recive SaveName call, RpcObj:", rpcObj)
        returnObj.Ok = true
        returnObj.Id = rpcObj.Id
        returnObj.Msg = "存储成功"
        return nil
    }
    
    // 开启rpc服务器
    func startServer() {
        // 新建Server
        server := rpc.NewServer()
    
        // 开始监听,使用端口 8888
        listener, err := net.Listen("tcp", ":8888")
        if err != nil {
            log.Fatal("server	-", "listen error:", err.Error())
        }
        defer listener.Close()
    
        log.Println("server	-", "start listion on port 8888")
    
        // 新建处理器
        serverHandler := &ServerHandler{}
    
        // 注册处理器
        server.Register(serverHandler)
    
        // 等待并处理链接
        for {
            conn, err := listener.Accept()
            if err != nil {
                log.Fatal(err.Error())
            }
    
            // 在goroutine中处理请求
            // 绑定rpc的编码器,使用http connection新建一个jsonrpc编码器,并将该编码器绑定给http处理器
            go server.ServeCodec(jsonrpc.NewServerCodec(conn))
        }
    }
    
    // 客户端以同步的方式向rpc服务器请求
    func callRpcBySynchronous() {
        // 连接至服务器
        client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超时时间
        if err != nil {
            log.Fatal("client	-", err.Error())
        }
    
        defer client.Close()
    
        // 建立rpc通道
        clientRpc := jsonrpc.NewClient(client)
    
    
        // 远程服务器返回的对象
        var rpcObj RpcObj
        log.Println("client	-", "call GetName method")
        // 请求数据,rpcObj对象会被填充
        clientRpc.Call("ServerHandler.GetName", 1, &rpcObj)
        log.Println("client	-", "recive remote return", rpcObj)
    
    
        // 远程返回的对象
        var reply ReplyObj
    
        // 传给远程服务器的对象参数
        saveObj := RpcObj{2, "对象2"}
    
        log.Println("client	-", "call SetName method")
        // 请求数据
        clientRpc.Call("ServerHandler.SaveName", saveObj, &reply)
    
        log.Println("client	-", "recive remote return", reply)
    }
    
    // 客户端以异步的方式向rpc服务器请求
    func callRpcByAsynchronous() {
        // 打开链接
        client, err := net.DialTimeout("tcp", "localhost:8888", 1000*1000*1000*30) // 30秒超时时间
        if err != nil {
            log.Fatal("client	-", err.Error())
        }
        defer client.Close()
    
        // 建立rpc通道
        clientRpc := jsonrpc.NewClient(client)
    
        // 用于阻塞主goroutine
        endChan := make(chan int, 15)
    
        // 15次请求
        for i := 1 ; i <= 15; i++ {
    
            // 传给远程的对象
            saveObj := RpcObj{i, "对象"}
    
            log.Println("client	-", "call SetName method")
            // 异步的请求数据
            divCall := clientRpc.Go("ServerHandler.SaveName", saveObj, &ReplyObj{}, nil)
    
            // 在一个新的goroutine中异步获取远程的返回数据
            go func(num int) {
                reply := <-divCall.Done
                log.Println("client	-", "recive remote return by Asynchronous", reply.Reply)
                endChan <- num
            }(i)
        }
    
        // 15个请求全部返回时此方法可以退出了
        for i := 1 ; i <= 15; i++ {
            _ = <-endChan
        }
    
    }
    
    func main() {
        go startServer()
        callRpcBySynchronous()
        callRpcByAsynchronous()
    }
    

    总结

    在使用net/rpc/jsonrpc时遇到这样一个问题:

    有多个client与一个server进行rpc调用,而这些client又处于不同的内网,在server端需要获取client端的公网IP。

    按照net/rpc的实现,在服务端处理器的自定义方法中只能获取被反序列化的数据,其他请求相关信息如client的IP只能在主goroutine的net.Listener.Accept中的Conn对象取得。

    按源码中的示例,每接收一个TCP请求都会在一个新的goroutine中处理,但是处理器的自定义方法都运行在不同的goroutine中,这些回调的方法没有暴露任何能获取conn的字段、方法。

    我是这样解决的,在server端rpc处理器struct中放一个聚合字段,用于存储ip地址的。

    处理器被注册与rpc server,全局只有一个,在每次接受到tcp请求后,开启一个goroutine,然后在goroutine内部立即加上排斥锁,然后再把请求的conn绑定给rpc server处理器,这样,即能保证handler字段的线程安全,又能及时的相应client的请求。

    ....
    ....
    ....
    mutex := &sync.Mutex{}
    
    // 等待链接
    for {
        // 相应请求
        conn, err := listener.Accept()
        if err != nil {
            log.Println(err.Error())
        }
    
        // 开启一个goroutine来处理请求,紧接着等待下一个请求。
        go func() {
            // 应用排斥锁
            mutex.Lock()
            // 记录ip地址
            reciveHandler.Ip = strings.Split(conn.RemoteAddr().String(), ":")[0]
            // 处理JSON-RPC调用
            server.ServeCodec(jsonrpc.NewServerCodec(conn))
            // 解锁
            mutex.Unlock()
        }()
    }
    ....
    ....
    ....
    
     
     
    分类: Go
  • 相关阅读:
    PTA —— 基础编程题目集 —— 函数题 —— 61 简单输出整数 (10 分)
    PTA —— 基础编程题目集 —— 函数题 —— 61 简单输出整数 (10 分)
    练习2.13 不用库函数,写一个高效计算ln N的C函数
    练习2.13 不用库函数,写一个高效计算ln N的C函数
    练习2.13 不用库函数,写一个高效计算ln N的C函数
    迷宫问题 POJ 3984
    UVA 820 Internet Bandwidth (因特网带宽)(最大流)
    UVA 1001 Say Cheese(奶酪里的老鼠)(flod)
    UVA 11105 Semiprime Hnumbers(H半素数)
    UVA 557 Burger(汉堡)(dp+概率)
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/3259267.html
Copyright © 2011-2022 走看看