zoukankan      html  css  js  c++  java
  • golang RPC通信读写超时设置

    golang RPC通信中,有时候就怕读写hang住。

    那是否可以设置读写超时呢?

    1.方案一: 设置连接的读写超时

    1.1 client

    RPC通信基于底层网络通信,可以通过设置connection的读写超时时间,达到RPC读写超时的目的。更多细节可参考golang网络通信超时设置.

    下面以client端的读超时为例,介绍设置方法。

    server端和client端代码如下。
    server
    一个简单的json RPC server。

    package main
    
    import (
        "fmt"
        "log"
        "net"
        "net/rpc"
        "net/rpc/jsonrpc"
        "time"
    )
    
    type Counter struct {
        Sum int
    }
    
    func (this *Counter) Add(i int, r *int) error {
        //sleep
        time.Sleep(3*time.Second)
    
        if i < 0 {
            return fmt.Errorf("data format incorrect")
        }
    
        this.Sum += i
        *r = this.Sum
        log.Printf("i: %v
    ", i)
        return nil
    }
    
    func NewJsonRpcSocketServer() {
    
        rpc.Register(new(Counter))
    
        l, err := net.Listen("tcp", ":3333")
        if err != nil {
            log.Printf("Listener tcp err: %s", err)
            return
        }
    
        for {
            log.Println("waiting...")
            conn, err := l.Accept()
            if err != nil {
                log.Printf("accept connection err: %s
    ", conn)
                continue
            }
    
            go jsonrpc.ServeConn(conn)
        }
    
    }
    
    func main() {
        NewJsonRpcSocketServer()
    }
    

    client

    json RPC client。

    连接建立后,会设置connection的读超时时间。

    package main
    
    import (
        "log"
        "net"
        "net/rpc/jsonrpc"
        "time"
    )
    
    
    func main() {
    
         NewJsonRpcSocketClient()
    }
    
    func NewJsonRpcSocketClient() {
        timeout := time.Second*30
        conn, err := net.DialTimeout("tcp", "127.0.0.1:3333", timeout)
        if err != nil {
            log.Printf("create client err:%s
    ", err)
            return
        }
        defer conn.Close()
    
        // timeout
        readAndWriteTimeout := 2*time.Second
        err = conn.SetDeadline(time.Now().Add(readAndWriteTimeout))
        if err != nil {
            log.Println("SetDeadline failed:", err)
        }
    
        client := jsonrpc.NewClient(conn)
    
        var reply int
    
        err = client.Call("Counter.Add", 2, &reply)
        if err != nil {
            log.Println("error:", err, "reply:", reply)
            return
        }
    
        log.Printf("reply: %d, err: %v
    ", reply, err)
    
    }
    
    

    client输出:

    2019/05/12 22:52:57 error: read tcp 127.0.0.1:55226->127.0.0.1:3333: i/o timeout reply: 0
    

    1.2 server

    通常情况下,RPC server端的代码如下:

        server := rpc.NewServer()
        ... ....
    	for {
    		conn, err := l.Accept()
    		if err != nil {
    			log.Println("listener accept fail:", err)
    			time.Sleep(time.Duration(100) * time.Millisecond)
    			continue
    		}
    		
    		// timeout
    		timeout := 10*time.Second
    		conn.SetDeadline(time.Now().Add(timeout))
    		
    		go server.ServeCodec(jsonrpc.NewServerCodec(conn))
    	}
    

    这样,如果设置超时,只有效的影响一次。

    对于server来说,都是多次读写。所以,暂时没有方法设置。

    2.方案二:定时器

    通过定时器的方式,如果RPC调用在指定时间内没有完成,触发定时器,返回超时错误,关闭连接。

    2.1 client端

    func  RpcCall(method string, args interface{}, reply interface{}) error {
        ... ...
    
    	timeout := time.Duration(10 * time.Second)
    	done := make(chan error, 1)
    
    	go func() {
    		err := rpcClient.Call(method, args, reply)
    		done <- err
    	}()
    
    	select {
    	case <-time.After(timeout):
    		log.Printf("[WARN] rpc call timeout %v => %v", rpcClient, RpcServer)
    		rpcClient.Close()
            return fmt.Errorf("timeout")
    	case err := <-done:
    		if err != nil {
    		    rpcClient.Close()
    			return err
    		}
    	}
    
    	return nil
    }
    

    2.2 server端

    无论是gobServerCodec 或者 jsonrpc ,都是实现了ServerCodec接口。

    gobServerCodec文件路径:/usr/local/go/src/net/rpc/server.go

    jsonrpc文件路径:/usr/local/go/src/net/rpc/server.go

    要给server端RPC读写加上超时机制,需要重新定义ServerCodec接口,加上超时的控制。

    // A ServerCodec implements reading of RPC requests and writing of
    // RPC responses for the server side of an RPC session.
    // The server calls ReadRequestHeader and ReadRequestBody in pairs
    // to read requests from the connection, and it calls WriteResponse to
    // write a response back. The server calls Close when finished with the
    // connection. ReadRequestBody may be called with a nil
    // argument to force the body of the request to be read and discarded.
    type ServerCodec interface {
    	ReadRequestHeader(*Request) error
    	ReadRequestBody(interface{}) error
    	// WriteResponse must be safe for concurrent use by multiple goroutines.
    	WriteResponse(*Response, interface{}) error
    
    	Close() error
    }
    
    

    目前,已经有现成的实现方式,可参考Golang标准库RPC实践及改进

    3.参考

    Does RPC have a timeout mechanism?

    net/rpc: expected Timeout based alternatives to functions for rpc.Dial, rpc.DialHTTP, rpc.DialHTTPPath [proposal]. #15236

  • 相关阅读:
    storm学习途径
    Spark中的RDD操作简介
    【转】Hadoop web页面的授权设定
    源码安装ipython,并在ipython中整合spark
    Spark调优
    在Java中调用C
    查看Mysql表空间大小 优化空间
    update A inner join
    kibana 汉化
    xcode-select --install不能下载该软件的解决办法
  • 原文地址:https://www.cnblogs.com/lanyangsh/p/10853903.html
Copyright © 2011-2022 走看看