zoukankan      html  css  js  c++  java
  • go标准库的学习-net/rpc

    参考:https://studygolang.com/pkgdoc

    导入方法:

    import "net/rpc"

    RPC(Remote Procedure Call Protocol)就是想实现函数调用模式的网络化,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。

    客户端就像调用本地函数一样,然后客户端把这些参数打包之后通过网络传给服务端,服务端解包到处理过程中执行,然后执行结果返回给客户端

    运行时一次客户机对服务器的RPC调用步骤有:

    • 调用客户端句柄,执行传送参数
    • 调用本地系统内核发送网络信息
    • 消息传送到远程主机
    • 服务器句柄得到消息并取得参数
    • 执行远程过程
    • 执行的过程将结果返回服务器句柄
    • 服务器句柄返回结果,调用远程系统内核
    • 消息传回本地主机
    • 客户句柄由内核接收消息
    • 客户接收句柄返回的数据

    Go标准包中已经提供了对RPC的支持,支持三个级别的RPC:TCP、HTTP、JSONRPC,下面将一一说明

    Go的RPC包与传统的RPC系统不同,他只支持Go开发的服务器与客户端之间的交互,因为在内部,它们采用了Gob来编码

    Go RPC的函数要满足下面的条件才能够被远程调用,不然会被忽略:

    • 函数必须是导出的,即首字母为大写
    • 必须有两个导出类型的参数
    • 第一个参数是接收的参数,第二个参数是返回给客户端的参数,第二个参数必须是指针类型的
    • 函数还要有一个error类型返回值。方法的返回值,如果非nil,将被作为字符串回传,在客户端看来就和errors.New创建的一样。如果返回了错误,回复的参数将不会被发送给客户端。

    举个例子,正确的RPC函数格式为:

    func (t *T) MethidName(argType T1, replyType *T2) error

    T、T1和T2类型都必须能被encoding/gob包编解码

    任何RPC都需要通过网络来传递数据,Go RPC可以利用HTTP和TCP来传递数据

    Constants

    const (
        // HandleHTTP使用的默认值
        DefaultRPCPath   = "/_goRPC_"
        DefaultDebugPath = "/debug/rpc"
    )

    Variables 

    var DefaultServer = NewServer()

    DefaultServer是*Server的默认实例,本包和Server方法同名的函数都是对其方法的封装。

    type Server

    type Server struct {
        // 内含隐藏或非导出字段
    }

    Server代表RPC服务端。

    func NewServer

    func NewServer() *Server

    NewServer创建并返回一个*Server。

    func (*Server) Register

    func (server *Server) Register(rcvr interface{}) error

    Register在server注册并公布rcvr的方法集中满足如下要求的方法:

    - 方法是导出的
    - 方法有两个参数,都是导出类型或内建类型
    - 方法的第二个参数是指针
    - 方法只有一个error接口类型的返回值

    如果rcvr不是一个导出类型的值,或者该类型没有满足要求的方法,Register会返回错误。Register也会使用log包将错误写入日志。客户端可以使用格式为"Type.Method"的字符串访问这些方法,其中Type是rcvr的具体类型。

    func (*Server) RegisterName

    func (server *Server) RegisterName(name string, rcvr interface{}) error

    RegisterName类似Register,但使用提供的name代替rcvr的具体类型名作为服务名。

    func Register

    func Register(rcvr interface{}) error

    Register在DefaultServer注册并公布rcvr的方法。

    其实就相当于调用NewServer函数生成一个*Server,然后再调用其的(*Server) Register函数

    func HandleHTTP

    func HandleHTTP()

    HandleHTTP函数注册DefaultServer的RPC信息HTTP处理器对应到DefaultRPCPath,和DefaultServer的debug处理器对应到DefaultDebugPath。HandleHTTP函数会注册到http.DefaultServeMux。之后,仍需要调用http.Serve(),一般会另开线程:"go http.Serve(l, nil)"

    其实就相当于调用NewServer函数生成一个*Server,然后再调用其的(*Server) HandleHTTP函数

    func DialHTTP

    func DialHTTP(network, address string) (*Client, error)

    DialHTTP在指定的网络和地址与在默认HTTP RPC路径监听的HTTP RPC服务端连接。

    func DialHTTPPath

    func DialHTTPPath(network, address, path string) (*Client, error)

    DialHTTPPath在指定的网络、地址和路径与HTTP RPC服务端连接。

    上面两个函数都是通过HTTP的方式和服务器建立连接,之间的区别之在于是否设置上下文路径。

    type Client

    type Client struct {
        codec ClientCodec
    
        reqMutex sync.Mutex // protects following
        request  Request
    
        mutex    sync.Mutex // protects following
        seq      uint64
        pending  map[uint64]*Call
        closing  bool // user has called Close
        shutdown bool // server has told us to stop
    }

    Client类型代表RPC客户端。同一个客户端可能有多个未返回的调用,也可能被多个go程同时使用。

    func (*Client) Call

    func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error

    Call调用指定的方法,等待调用返回,将结果写入reply,然后返回执行的错误状态。

    有三个参数,第一个要写调用函数的名字,第二个是要传递的参数,第三个是要返回的参数(这个注意是指针类型)

    举例:

    HTTP RPC

    利用HTTP的好处是可以直接复用net/http中的一些函数,下面举例说明:

    服务端:

    package main
    
    import (
        "fmt"
        "net/http"
        "net/rpc"
        "errors"
    )
    
    type Args struct{
        A, B int
    }
    
    type Quotient struct{
        Quo, Rem int
    }
    
    type Arith int
    
    func (t *Arith) Multiply(args *Args, reply *int) error{
        *reply = args.A * args.B
        return nil
    }
    
    func (t *Arith) Divide(args *Args, quo *Quotient) error{
        if args.B == 0{
            return errors.New("divide by zero")
        }
        quo.Quo = args.A / args.B
        quo.Rem = args.A % args.B
        return nil
    }
    
    func main() {
        arith := new(Arith)
        rpc.Register(arith)
        rpc.HandleHTTP()
    
        err := http.ListenAndServe(":1234", nil)
        if err != nil{
            fmt.Println(err.Error())
        }
    }

    客户端:

    package main
    
    import (
        "fmt"
        "net/rpc"
        "log"
        "os"
    )
    type Args struct{
        A, B int
    }
    
    type Quotient struct{
        Quo, Rem int
    }
    
    func main() {
        if len(os.Args) != 2{
            fmt.Println("Usage: ", os.Args[0], "server")
            os.Exit(1)
        }
        serverAddress := os.Args[1]
    
        client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
        if err != nil{
            log.Fatal("dialing : ", err)
        }
    
        //Synchronous call
        args := Args{17, 8}
        var reply int
        err = client.Call("Arith.Multiply", args, &reply)
        if err != nil{
            log.Fatal("arith error : ", err)
        }
        fmt.Printf("Arith: %d*%d = %d 
    ", args.A, args.B, reply)
    
        var quot Quotient
        err = client.Call("Arith.Divide", args, &quot)
        if err != nil{
            log.Fatal("arith error : ", err)
        }
        fmt.Printf("Arith: %d/%d = %d remainder %d
    ", args.A, args.B, quot.Quo, quot.Rem)   
    }

    客户端返回:

    userdeMBP:go-learning user$ go run test.go
    Usage:  /var/folders/2_/g5wrlg3x75zbzyqvsd5f093r0000gn/T/go-build438875911/b001/exe/test server
    exit status 1
    userdeMBP:go-learning user$ go run test.go 127.0.0.1
    Arith: 17*8 = 136 
    Arith: 17/8 = 2 remainder 1

    TCP RPC连接

    func Dial

    func Dial(network, address string) (*Client, error)

    Dial在指定的网络和地址与RPC服务端连接。

    func (*Server) ServeConn

    func (server *Server) ServeConn(conn io.ReadWriteCloser)

    ServeConn在单个连接上执行server。ServeConn会阻塞,服务该连接直到客户端挂起。调用者一般应另开线程调用本函数:"go server.ServeConn(conn)"。ServeConn在该连接使用gob(参见encoding/gob包)有线格式。要使用其他的编解码器,可调用ServeCodec方法。

    func ServeConn

    func ServeConn(conn io.ReadWriteCloser)

    ServeConn在单个连接上执行DefaultServer。ServeConn会阻塞,服务该连接直到客户端挂起。调用者一般应另开线程调用本函数:"go ServeConn(conn)"。ServeConn在该连接使用gob(参见encoding/gob包)有线格式。要使用其他的编解码器,可调用ServeCodec方法。

    其实就相当于调用NewServer函数生成一个*Server,然后再调用其的(*Server) ServeConn函数

    JSON RPC连接

    服务端:

    package main
    
    import (
        "fmt"
        "net"
        "net/rpc"
        "net/rpc/jsonrpc"
        "errors"
        "os"
    )
    
    type Args struct{
        A, B int
    }
    
    type Quotient struct{
        Quo, Rem int
    }
    
    type Arith int
    
    func (t *Arith) Multiply(args *Args, reply *int) error{
        *reply = args.A * args.B
        return nil
    }
    
    func (t *Arith) Divide(args *Args, quo *Quotient) error{
        if args.B == 0{
            return errors.New("divide by zero")
        }
        quo.Quo = args.A / args.B
        quo.Rem = args.A % args.B
        return nil
    }
    
    func main() {
        arith := new(Arith)
        rpc.Register(arith)
    
        tcpAddr, err := net.ResolveTCPAddr("tcp", ":1234")//jsonrpc是基于TCP协议的,现在他还不支持http协议
        if err != nil{
            fmt.Println(err.Error())
            os.Exit(1)
        }
    
        listener, err := net.ListenTCP("tcp", tcpAddr)
        if err != nil{
            fmt.Println(err.Error())
            os.Exit(1)
        }
        for{
            conn, err := listener.Accept()
            if err != nil{
                continue
            }
            jsonrpc.ServeConn(conn)
        }    
    }

    客户端:

    package main
    
    import (
        "fmt"
        "net/rpc/jsonrpc"
        "log"
        "os"
    )
    type Args struct{
        A, B int
    }
    
    type Quotient struct{
        Quo, Rem int
    }
    
    func main() {
        if len(os.Args) != 2{
            fmt.Println("Usage: ", os.Args[0], "server:port")
            os.Exit(1)
        }
        service := os.Args[1]
    
        client, err := jsonrpc.Dial("tcp", service)
        if err != nil{
            log.Fatal("dialing : ", err)
        }
    
        //Synchronous call
        args := Args{17, 8}
        var reply int
        err = client.Call("Arith.Multiply", args, &reply)
        if err != nil{
            log.Fatal("arith error : ", err)
        }
        fmt.Printf("Arith: %d*%d = %d 
    ", args.A, args.B, reply)
    
        var quot Quotient
        err = client.Call("Arith.Divide", args, &quot)
        if err != nil{
            log.Fatal("arith error : ", err)
        }
        fmt.Printf("Arith: %d/%d = %d remainder %d
    ", args.A, args.B, quot.Quo, quot.Rem)   
    }

    客户端返回:

    userdeMBP:go-learning user$ go run test.go 127.0.0.1:1234
    Arith: 17*8 = 136 
    Arith: 17/8 = 2 remainder 1

    type Request

    type Request struct {
        ServiceMethod string // 格式:"Service.Method"
        Seq           uint64 // 由客户端选择的序列号
        // 内含隐藏或非导出字段
    }

    Request是每个RPC调用请求的头域。它是被内部使用的,这里的文档用于帮助debug,如分析网络拥堵时。

    type Response

    type Response struct {
        ServiceMethod string // 对应请求的同一字段
        Seq           uint64 // 对应请求的同一字段
        Error         string // 可能的错误
        // 内含隐藏或非导出字段
    }

    Response是每个RPC调用回复的头域。它是被内部使用的,这里的文档用于帮助debug,如分析网络拥堵时。

    type ClientCodec

    type ClientCodec interface {
        // 本方法必须能安全的被多个go程同时使用
        WriteRequest(*Request, interface{}) error
        ReadResponseHeader(*Response) error
        ReadResponseBody(interface{}) error
        Close() error
    }

    ClientCodec接口实现了RPC会话的客户端一侧RPC请求的写入和RPC回复的读取。客户端调用WriteRequest来写入请求到连接,然后成对调用ReadRsponseHeader和ReadResponseBody以读取回复。客户端在结束该连接的事务时调用Close方法。ReadResponseBody可以使用nil参数调用,以强制回复的主体被读取然后丢弃。

    func NewClient

    func NewClient(conn io.ReadWriteCloser) *Client

    NewClient返回一个新的Client,以管理对连接另一端的服务的请求。它添加缓冲到连接的写入侧,以便将回复的头域和有效负载作为一个单元发送。

    func NewClientWithCodec

    func NewClientWithCodec(codec ClientCodec) *Client

    NewClientWithCodec类似NewClient,但使用指定的编解码器,以编码请求主体和解码回复主体。

    NewClient使用默认编码gobClientCodec,NewClientWithCodec使用自定义的其它编码。

    默认的gobClientCodec代码为:

    type gobClientCodec struct {
        rwc    io.ReadWriteCloser
        dec    *gob.Decoder
        enc    *gob.Encoder
        encBuf *bufio.Writer
    }
    
    //指定客户端将以什么样的编码方式将信息发送给服务端,在调用Go和Call函数是会自动使用WriteRequest方法发送信息 func (c
    *gobClientCodec) WriteRequest(r *Request, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { //Encode方法将request r编码后发送 return } if err = c.enc.Encode(body); err != nil {//Encode方法将body编码后发送 return } return c.encBuf.Flush() } func (c *gobClientCodec) ReadResponseHeader(r *Response) error { return c.dec.Decode(r)////Decode从输入流读取下一个之并将该值存入response r } func (c *gobClientCodec) ReadResponseBody(body interface{}) error { return c.dec.Decode(body) //Decode从输入流读取下一个之并将该值存入body } func (c *gobClientCodec) Close() error { return c.rwc.Close() }

    创建Client时将调用默认成对调用ReadRsponseHeader和ReadResponseBody以读取回复

    举例自定义编码:

    package main
    
    import (
        "log"
        "net/rpc"
        // "errors"
        "fmt"
    )
    type shutdownCodec struct {
        responded chan int
        closed    bool
    }
    
    func (c *shutdownCodec) WriteRequest(*rpc.Request, interface{}) error {//这是client用来发送请求的方法
        fmt.Println("call WriteRequest")
        return nil 
    }
    func (c *shutdownCodec) ReadResponseBody(interface{}) error{
        fmt.Println("call ReadResponseBody")
        return nil 
    }
    func (c *shutdownCodec) ReadResponseHeader(*rpc.Response) error {
        c.responded <- 1 //如果注释掉这里,则会一直卡在"wait response : "
        return nil
        // return errors.New("shutdownCodec ReadResponseHeader") //如果返回的是error,那么就不会去调用ReadResponseBody了
    }
    func (c *shutdownCodec) Close() error {
        c.closed = true
        return nil
    }
    
    func main() {
        codec := &shutdownCodec{responded: make(chan int)}
        client := rpc.NewClientWithCodec(codec)
        fmt.Println("wait response : ") //从返回结果可以看出来,NewClientWithCodec后会自动成对调用ReadResponseBody和ReadResponseHeader
        fmt.Println(<-codec.responded)
        fmt.Println(codec.closed) //false
        client.Close()
        if !codec.closed {
            log.Fatal("client.Close did not close codec")
        }
        fmt.Println(codec.closed) //true
    }

     

    type ServerCodec

    type ServerCodec interface {
        ReadRequestHeader(*Request) error
        ReadRequestBody(interface{}) error
        // 本方法必须能安全的被多个go程同时使用
        WriteResponse(*Response, interface{}) error
        Close() error
    }

    ServerCodec接口实现了RPC会话的服务端一侧RPC请求的读取和RPC回复的写入。服务端通过成对调用方法ReadRequestHeader和ReadRequestBody从连接读取请求,然后调用WriteResponse来写入回复。服务端在结束该连接的事务时调用Close方法。ReadRequestBody可以使用nil参数调用,以强制请求的主体被读取然后丢弃。

    默认的gobServerCodec:

    type gobServerCodec struct {
        rwc    io.ReadWriteCloser
        dec    *gob.Decoder
        enc    *gob.Encoder
        encBuf *bufio.Writer
        closed bool
    }
    
    func (c *gobServerCodec) ReadRequestHeader(r *Request) error {
        return c.dec.Decode(r) //解码并读取client发来的请求request
    }
    
    func (c *gobServerCodec) ReadRequestBody(body interface{}) error {
        return c.dec.Decode(body) //解码并读取client发来的请求body
    }
    
    //指定服务端将会以什么样的编码方式将数据返回给客户端,在调用ServeRequest和ServeCodec方法时会自动调用该WriteResponse函数 func (c
    *gobServerCodec) WriteResponse(r *Response, body interface{}) (err error) { if err = c.enc.Encode(r); err != nil { //对响应信息request编码 if c.encBuf.Flush() == nil { // Gob couldn't encode the header. Should not happen, so if it does, // shut down the connection to signal that the connection is broken. log.Println("rpc: gob error encoding response:", err) c.Close() } return } if err = c.enc.Encode(body); err != nil {//对响应信息body编码 if c.encBuf.Flush() == nil { // Was a gob problem encoding the body but the header has been written. // Shut down the connection to signal that the connection is broken. log.Println("rpc: gob error encoding body:", err) c.Close() } return } return c.encBuf.Flush() } func (c *gobServerCodec) Close() error { if c.closed { // Only call c.rwc.Close once; otherwise the semantics are undefined. return nil } c.closed = true return c.rwc.Close() }

    func (*Server) ServeCodec

    func (server *Server) ServeCodec(codec ServerCodec)

    ServeCodec类似ServeConn,但使用指定的编解码器,以编码请求主体和解码回复主体。

    func (*Server) ServeRequest

    func (server *Server) ServeRequest(codec ServerCodec) error

    ServeRequest类似ServeCodec,但异步的服务单个请求。它不会在调用结束后关闭codec。

    下面两个函数的不同在于他们使用在DefaultServer上:

    func ServeCodec

    func ServeCodec(codec ServerCodec)

    ServeCodec类似ServeConn,但使用指定的编解码器,以编码请求主体和解码回复主体。

    func ServeRequest

    func ServeRequest(codec ServerCodec) error

    ServeRequest类似ServeCodec,但异步的服务单个请求。它不会在调用结束后关闭codec。

    func (*Client) Close

    func (client *Client) Close() error

    func (*Server) Accept

    func (server *Server) Accept(lis net.Listener)

    Accept接收监听器l获取的连接,然后服务每一个连接。Accept会阻塞,调用者应另开线程:"go server.Accept(l)"

    举例:

    package main
    
    import (
        "log"
        "net"
        "net/rpc"
        "strings"
        "fmt"
    )
    type R struct {
        // Not exported, so R does not work with gob.
        // 所以这样运行的话会报错rpc: gob error encoding body: gob: type main.R has no exported fields
        // msg []byte 
        Msg []byte //改成这样
    }
    
    type S struct{}
    
    func (s *S) Recv(nul *struct{}, reply *R) error {
        *reply = R{[]byte("foo")}
        return nil
    }
    
    func main() {
        defer func() {
            err := recover()
            if err == nil {
                log.Fatal("no error")
            }
            if !strings.Contains(err.(error).Error(), "reading body EOF") {
                log.Fatal("expected `reading body EOF', got", err)
            }
        }()
        //服务端
        rpc.Register(new(S))
    
        listen, err := net.Listen("tcp", "127.0.0.1:1234")//端口为0表示任意端口
        if err != nil {
            panic(err)
        }
        go rpc.Accept(listen) //必须用并发监听,因为客户端和服务端写在了一起
    
        //客户端
        client, err := rpc.Dial("tcp", listen.Addr().String())
        if err != nil {
            panic(err)
        }
    
        var reply R
        err = client.Call("S.Recv", &struct{}{}, &reply)
        if err != nil {
            panic(err)
        }
    
        fmt.Printf("%q
    ", reply)
        
        client.Close()
        listen.Close()
        
    }

    返回:

    userdeMBP:go-learning user$ go run test.go
    {"foo"}
    2019/02/28 15:05:07 rpc.Serve: accept:accept tcp 127.0.0.1:1234: use of closed network connection
    2019/02/28 15:05:07 no error
    exit status 1

    type Call

    type Call struct {
        ServiceMethod string      // 调用的服务和方法的名称
        Args          interface{} // 函数的参数(下层为结构体指针)
        Reply         interface{} // 函数的回复(下层为结构体指针)
        Error         error       // 在调用结束后,保管错误的状态
        Done          chan *Call  // 对其的接收操作会阻塞,直到远程调用结束
    }

    Call类型代表一个执行中/执行完毕的RPC会话。

    func (*Client) Go

    func (client *Client) Go(serviceMethod string, args interface{}, reply interface{}, done chan *Call) *Call

    Go异步的调用函数。本方法Call结构体类型指针的返回值代表该次远程调用。通道类型的参数done会在本次调用完成时发出信号(通过返回本次Go方法的返回值)。如果done为nil,Go会申请一个新的通道(写入返回值的Done字段);如果done非nil,done必须有缓冲,否则Go方法会故意崩溃。

    举例:

    使用的仍是上面TCP RPC的服务端,客户端改为:

    package main
    
    import (
        "fmt"
        "net/rpc"
        "log"
        "os"
    )
    type Args struct{
        A, B int
    }
    
    type Quotient struct{
        Quo, Rem int
    }
    
    func main() {
        if len(os.Args) != 2{
            fmt.Println("Usage: ", os.Args[0], "server:port")
            os.Exit(1)
        }
        service := os.Args[1]
    
        client, err := rpc.Dial("tcp", service)
        if err != nil{
            log.Fatal("dialing : ", err)
        }
    
        //Synchronous call
        args := Args{17, 8}
        var reply int
        done := make(chan *rpc.Call, 1)
    
        call := client.Go("Arith.Multiply", args, &reply, done) //异步调用,只有当该方法执行完毕后done的值才不为nil
        if call.Error != nil {
            log.Fatal(err)
        }
    
        
        if resultCall := <-done; resultCall != nil {//如果不<-done,reply将不会有结果,reply将为0
            fmt.Printf("Arith: %d*%d = %d 
    ", args.A, args.B, reply)
            fmt.Printf("done : %#v
    ", resultCall)
            fmt.Printf("Multiply result : %#v
    ", *(resultCall.Reply.(*int)))//根据Call的Reply得到返回的值
        }
        
        var quot Quotient
        err = client.Call("Arith.Divide", args, &quot)
        if err != nil{
            log.Fatal("arith error : ", err)
        }
        fmt.Printf("Arith: %d/%d = %d remainder %d
    ", args.A, args.B, quot.Quo, quot.Rem)   
    
    }

    返回:

    userdeMBP:go-learning user$ go run test.go 127.0.0.1:1234
    Arith: 17*8 = 136 
    done : &rpc.Call{ServiceMethod:"Arith.Multiply", Args:main.Args{A:17, B:8}, Reply:(*int)(0xc000012100), Error:error(nil), Done:(chan *rpc.Call)(0xc0001142a0)}
    Multiply result : 136
    Arith: 17/8 = 2 remainder 1
  • 相关阅读:
    20180925-6 四则运算试题生成
    20180925-7 规格说明书-吉林市2日游
    20180925-5 代码规范,结对要求
    20180925-2 功能测试
    20180918-2 每周例行报告
    互评Alpha版本
    2017软工 — 每周PSP
    王者荣耀交流协会 — Alpha阶段中间产物
    王者荣耀交流协会第四次Scrum立会
    2017秋软工 —— 本周PSP
  • 原文地址:https://www.cnblogs.com/wanghui-garcia/p/10451028.html
Copyright © 2011-2022 走看看