zoukankan      html  css  js  c++  java
  • Golang实现RPC

    RPC

    一、简介

    远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议 ;

    该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额 外地为这个交互作用编程 ;

    如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方 法调用 ;

    二、Golang中实现RPC

    Golang中实现RPC非常简单,官方提供了封装好的库,还有一些第三方的库,官方的net/rpc库使用encoding/gob进行编码,所以Golang的RPC只支持Golang开发的服务器和客户端之间交互。

    官方还提供了net/rpc/jsonrpc库来实现RPC方法,jsonrpc采用了JSON进行数据编码解码,因而支持跨语言调用,目前jsonrpc库是基于tcp协议实现的,暂不支持http传输方式。

    • Golang的RPC必须符合四个条件才可以:
      • 结构体首字母必须大写,要跨域访问,所以大写
      • 函数名首字母必须大写
      • 函数第一个参数是接收参数,第二个参数是返回个客户端的参数,必须是指针类型
      • 函数有一个返回值error

    示例

    1、Golang实现RPC程序,实现求矩形面积和周长。

    • 服务端
    package main
    
    import (
    	"fmt"
    	"net/http"
    	"net/rpc"
    )
    
    // 服务端求矩形面积和周长
    
    // 声明矩形对象
    type Rect struct{}
    
    // 声明参数结构体,字段首字母大写
    type Params struct{
    	Width, Height int
    }
    
    // 定义求矩形面积的方法
    func (r *Rect)Area(p Params,ret *int) error {
    	*ret = p.Width * p.Height
    	return nil
    }
    
    // 定义求矩形周长的方法
    func (r *Rect) Perimeter(p Params, ret *int)error{
    	*ret = (p.Width + p.Height) * 2
    	return nil
    }
    
    func main() {
    	// 注册服务
    	rect := new(Rect)
    	rpc.Register(rect)
    	// 把服务处理绑定到http协议上
    	rpc.HandleHTTP()
    	// 监听服务,等待客户端调用求面试和周长的方法
    	if err := http.ListenAndServe(":8080",nil); err != nil {
    		fmt.Println(err)
    	}
    }
    
    • 客户端
    package main
    
    import (
    	"fmt"
    	"log"
    	"net/rpc"
    )
    
    type Params struct{
    	Width,Height int
    }
    
    // 调用服务
    func main() {
    	// 连接远程rpc服务
    	client, err := rpc.DialHTTP("tcp","127.0.0.1:8080")
    	if err != nil {
    		fmt.Println(err)
    	}
    	// 调用远程的方法
    	// 定义接受服务端传回来的计算结果变量
    	ret := 0
    	// 求面积
    	if err = client.Call("Rect.Area",Params{50,100},&ret); err != nil {
    		log.Fatal(err)
    	}
    	fmt.Println("面积:", ret)
    
    	// 求周长
    	if err = client.Call("Rect.Perimeter",Params{50,100},&ret);err !=nil{
    		log.Fatal(err)
    	}
    	fmt.Println("周长:",ret)
    }
    

    2 、服务端接收两个参数,可以做乘法运算,也可以做商和余数的运算,客户端进行传参和访问,得到结果如下:

    ​ 9 * 2 =18

    ​ 9 / 2, 商 = 4, 余数 = 1

    • 服务端
    package main
    
    import (
    	"errors"
    	"fmt"
    	"log"
    	"net"
    	"net/rpc"
    	"net/rpc/jsonrpc"
    )
    
    // 声明算术运算的结构体
    type Arith struct{}
    
    // 声明接收参数的结构体
    type ArithRequest struct{
    	A, B int
    }
    
    // 声明返回客户端参数结构体
    type ArithResponse struct{
    	Pro  int  // 乘积
    	Quo  int  // 商
    	Rem  int // 余数
    }
    
    
    // 乘法运算
    func (a *Arith) Multiply(req ArithRequest, resp *ArithResponse)error{
    	resp.Pro = req.A * req.B
    	fmt.Println(resp.Pro)
    	return nil
    }
    
    // 商和余数
    func (a *Arith) Divide(req ArithRequest, resp *ArithResponse) error {
    	if req.B == 0 {
    		return errors.New("除数不能为0")
    	}
    	resp.Quo = req.A / req.B
    	resp.Rem = req.A % req.B
    	return nil
    }
    
    // jsonRPC编码方式
    func main() {
    	// 注册服务
    	rpc.Register(new(Arith))
    	// 监听
    	lis,err := net.Listen("tcp","127.0.0.1:8080")
    	if err != nil {
    		log.Fatal(err)
    	}
    	// 循环监听服务
    	for {
    		conn, err := lis.Accept()
    		if err != nil{
    			continue
    		}
    		go func(conn net.Conn) {
    			fmt.Println("a new Client")
    			jsonrpc.ServeConn(conn)
    		}(conn)
    	}
    }
    
    • 客户端
    package main
    
    import (
    	"fmt"
    	"log"
    	"net/rpc/jsonrpc"
    )
    
    // 声明请求参数的结构体
    type ArithRequest struct{
    	A, B int
    }
    
    // 声明响应的结构体
    type ArithResponse struct{
    	Pro int
    	Quo int
    	Rem int
    }
    
    func main() {
    	// 连接远程的rpc
    	conn, err := jsonrpc.Dial("tcp","127.0.0.1:8080")
    	if err != nil{
    		log.Fatal(err)
    	}
    	req := ArithRequest{
    		A: 9,
    		B: 2,
    	}
    	resp := ArithResponse{}
    	
    	// 商和余数
    	if err = conn.Call("Arith.Divide",req,&resp); err != nil {
    		log.Fatal(err)
    	}
    	// 乘法
    	if err = conn.Call("Arith.Multiply", req, &resp); err != nil {
    		log.Fatal(err)
    	}
    
    	fmt.Printf("%d 和 %d, 积=%d, 商=%d,余数=%d
    ",req.A,req.B,resp.Pro,resp.Quo,resp.Rem)
    }
    

    三、RPC调用流程

    微服务架构下数据交互一般都是对内RPC,对外REST。

    将业务按功能模块拆分到各个微服务,具有提高项目协作效率,降低模块耦合度,提高系统系统可用性等优点,但是开发门槛较高,比如RPC框架的使用,后期服务监控等工作。

    一般情况下,我们会将功能代码在本地直接调用,微服务架构下,我们需要将这个函数作为单独的服务运行,客户端通过网络调用。

    四、网络传输数据格式

    成熟的RPC框架会有自定义传输协议,这里网络传输格式定义如下,前面是固定长度消息头,后面是变长消息体。

    • 编写连接会话
    package rpc
    
    import (
    	"encoding/binary"
    	"io"
    	"net"
    )
    
    // 编写会话中数据读写
    
    // 会话连接的结构体
    type Session struct {
    	conn net.Conn
    }
    
    
    // 创建新的连接
    func NewSession(conn net.Conn) *Session{
    	return &Session{
    		conn: conn,
    	}
    }
    
    // 向连接中写入数据
    func (s *Session) Write(data []byte)error{
    	// 4字节头+数据长度的切片
    	buf := make([]byte, 4 + len(data))
    	// 写入头部数据,记录数据长度
    	// binary只认固定长度的类型,所以使用了uint32,而不是直接写入
    	binary.BigEndian.PutUint32(buf[:4],uint32(len(data)))
    	// 写入数据
    	copy(buf[4:],data)
    	_, err := s.conn.Write(buf)
    	if err != nil {
    		return err
    	}
    	return nil
    }
    
    // 从连接中读数据
    func (s *Session)Read()([]byte, error){
    	// 读取头部长度
    	header := make([]byte,4)
    	// 按头部长度,读取头部数据
    	_, err := io.ReadFull(s.conn, header)
    	if err != nil {
    		return nil, err
    	}
    	// 读取数据长度
    	dataLen := binary.BigEndian.Uint32(header)
    	// 按数据长度去读取数据
    	data := make([]byte,dataLen)
    	_, err = io.ReadFull(s.conn, data)
    	if err != nil{
    		return nil ,err
    	}
    	return data, nil
    }
    
    • 测试会话读写
    package rpc
    
    import (
    	"fmt"
    	"net"
    	"sync"
    	"testing"
    )
    
    // 测试读写
    
    func TestSession_ReadWrite(t *testing.T) {
    	// 定义监听的ip和端口
    	addr := "127.0.0.1:8000"
    	// 定义传输的数据
    	my_data := "hello"
    	// 等待组
    	wg := sync.WaitGroup{}
    	// 协程 一个读,一个写
    	wg.Add(2)
    	// 写数据
    	go func(){
    		defer wg.Done()
    		// 创建tcp连接
    		lis, err := net.Listen("tcp",addr)
    		if err != nil{
    			t.Fatal(err)
    		}
    		conn, _ := lis.Accept()
    		s := Session{conn: conn}
    		// 写数据
    		if err := s.Write([]byte(my_data)); err != nil {
    			t.Fatal(err)
    		}
    	}()
    	// 读数据
    	go func() {
    		defer wg.Done()
    		conn, err := net.Dial("tcp",addr)
    		if err != nil {
    			t.Fatal(err)
    		}
    		s := Session{conn: conn}
    		// 读数据
    		data, err := s.Read()
    		if err != nil{
    			t.Fatal(err)
    		}
    		if string(data) != my_data{
    			t.Fatal(err)
    		}
    		fmt.Println(string(data))
    	}()
    	wg.Wait()
    }
    
    • 编写编解码
    package rpc
    
    import (
    	"bytes"
    	"encoding/gob"
    )
    
    // 定义数据格式和编解码
    
    // 定义RPC交互的数据格式
    
    type RPCData struct{
    	// 访问的函数
    	Name string
    	// 访问时穿的参数
    	Args []interface{}
    }
    
    // 编码
    func encode(data RPCData)([]byte,error){
    	var buf bytes.Buffer
    	// 得到字节数据的编码器
    	bufEnc := gob.NewEncoder(&buf)
    	// 对数据进行编码
    	if err := bufEnc.Encode(data); err != nil{
    		return nil, err
    	}
    	return buf.Bytes(),nil
    }
    
    
    // 解码
    func decode(b []byte)(RPCData,error){
    	buf := bytes.NewBuffer(b)
    	// 返回字节数组解码器
    	bufDec := gob.NewDecoder(buf)
    	var data RPCData
    	// 对数据解码
    	if err := bufDec.Decode(&data); err != nil {
    		return data, err
    	}
    	return data,nil
    }
    

    五、实现RPC服务端

    1、服务端接收到的数据需要包括什么?

    • 调用的函数名,参数列表
    • 一般会约定函数的第二个返回值是error类型
    • 通过反射实现

    2、服务端需要解决的问题是什么?

    • Client调用时只传过来函数名,需要维护函数名到函数之间的map映射

    3、服务端的核心功能有哪些?

    • 维护函数名导函数反射值的map
    • client端传函数名,参数列表后,服务端要解析为反反射值,调用执行
    • 函数的返回值打包,并通过网络返回个客户端
    package rpc
    
    import (
    	"fmt"
    	"net"
    	"reflect"
    )
    
    // 声明服务端
    type Server struct {
    	// 地址
    	addr string
    	// 服务端维护的函数名到函数反射值的map
    	funcs map[string]reflect.Value
    }
    
    // 创建服务端对象
    func NewServer(addr string)*Server{
    	return &Server{
    		addr: addr,
    		funcs: make(map[string]reflect.Value),
    	}
    }
    
    // 服务端绑定注册方法
    // 将函数名与函数真正实现对应起来
    // 第一个参数为函数名,第二个传入真正的函数
    func (s *Server) Register (rpcName string, f interface{}){
    	if _, ok := s.funcs[rpcName]; ok {
    		return
    	}
    	// map 中没有值,则将映射添加到map,便于调用
    	fVal := reflect.ValueOf(f)
    	s.funcs[rpcName] = fVal
    }
    
    // 服务端等待调用
    func (s *Server) Run(){
    	// 监听
    	lis, err := net.Listen("tcp",s.addr)
    	if err != nil {
    		fmt.Printf("监听 %s err:%v
    ",s.addr,err)
    		return
    	}
    	for {
    		// 拿到连接
    		conn ,err := lis.Accept()
    		if err != nil {
    			fmt.Printf("accept err :%v
    ",err)
    			return
    		}
    		// 创建会话
    		srvSession := NewSession(conn)
    		// RPC读取数据
    		b,err := srvSession.Read()
    		if err != nil {
    			fmt.Printf("read err:%v
    ",err)
    			return
    		}
    		// 对数据进行解码
    		rpcData, err := decode(b)
    		if err != nil {
    			fmt.Printf("decode err:%v
    ", err)
    			return
    		}
    		// 根据读取到数据的Name,得到调用的函数名
    		f, ok := s.funcs[rpcData.Name]
    		if !ok{
    			fmt.Printf("函数 %s 不存在", rpcData.Name)
    			return
    		}
    		// 解析遍历客户端出来的参数,放到一个数组中
    		inArgs := make([]reflect.Value,0,len(rpcData.Args))
    		for _, arg := range rpcData.Args{
    			inArgs = append(inArgs, reflect.ValueOf(arg))
    		}
    		// 反射调用方法, 传入参数
    		out := f.Call(inArgs)
    		// 解析遍历执行结果,放到一个数组中
    		outArgs := make([]interface{},0,len(out))
    		for _, o := range out{
    			outArgs = append(outArgs, o.Interface())
    		}
    
    		// 包装数据,返回给客户端
    		respRPCData := RPCData{rpcData.Name,outArgs}
    		// 编码
    		respBytes, err := encode(respRPCData)
    		if err != nil {
    			fmt.Printf("encode err:%v
    ",err)
    			return
    		}
    		// 使用rpc写出数据
    		err = srvSession.Write(respBytes)
    		if err != nil {
    			fmt.Printf("session write err :%v
    ",err)
    			return
    		}
    	}
    }
    

    六、实现RPC客户端

    • 客户端只有函数原型,使用reflect.MakeFunc()可以完成原型到函数的调用
    • reflect.MakeFunc()是Client从函数原型到网络调度的关键
    package rpc
    
    import (
    	"net"
    	"reflect"
    )
    
    // 声明客户端
    type Client struct{
    	conn net.Conn
    }
    
    // 创建客户端对象
    func NewClient(conn net.Conn) * Client {
    	return &Client{conn:conn}
    }
    
    
    // 实现通用的RPC客户端
    // 绑定RPC访问的方法
    // 传入访问的函数名
    
    
    // 函数具体实现在Server端,Client只有函数原型
    // 使用MakeFunc()完成原型到函数的调用
    
    
    // fPtr指向函数原型
    // xxx.callRPC("queryUser", &query)
    func (c *Client)callRPC(rpcName string, fPtr interface{}){
    	// 通过反射,获取fPtr未初始化的函数原型
    	fn := reflect.ValueOf(fPtr).Elem()
    	// 另一个函数,作用是对用是对第一个函数操作
    	// 完成与Server的交互
    	f := func(args []reflect.Value)[]reflect.Value{
    		// 处理输入的参数
    		inArgs := make([]interface{},0, len(args))
    		for _, arg := range args {
    			inArgs = append(inArgs,arg.Interface())
    		}
    		// 创建连接
    		cliSession := NewSession(c.conn)
    		// 编码数据
    		reqRPC := RPCData{Name: rpcName,Args:inArgs}
    		b, err := encode(reqRPC)
    		if err != nil {
    			panic(err)
    		}
    		// 写出数据
    		if err= cliSession.Write(b); err != nil {
    			panic(err)
    		}
    		// 读取响应数据
    		respBytes, err := cliSession.Read()
    		if err != nil{
    			panic(err)
    		}
    		// 解码数据
    		respRPC, err := decode(respBytes)
    		if err != nil {
    			panic(err)
    		}
    
    		// 处理服务端返回的数据
    		outArgs := make([]reflect.Value,0,len(respRPC.Args))
    		for i, arg := range respRPC.Args{
    			// 必须对nil进行处理
    			if arg == nil {
    				// 必须填充一个真正的类型,不能是nil
    				outArgs = append(outArgs,reflect.Zero(fn.Type().Out(i)))
    				continue
    			}
    			outArgs = append(outArgs,reflect.ValueOf(arg))
    
    		}
    		return outArgs
    	}
    
    	// 参数1: 一个未初始化函数的方法值,类型时reflect.Type
    	// 参数2: 另一个函数,作用是对第一个函数参数操作
    	// 返回reflect.Value 类型
    	// MakeFunc 使用传入的函数原型,创建一个绑定 参数2 的新函数
    	v := reflect.MakeFunc(fn.Type(), f)
    	// 为函数fPtr赋值
    	fn.Set(v)
    }
    

    七、实现RPC通信测试

    • 给服务端注册一个查询用户的方法,客户端去RPC调用。
    package rpc
    
    import (
    	"encoding/gob"
    	"fmt"
    	"net"
    	"testing"
    )
    
    // 用户查询
    
    // 用于测试的结构体
    type User struct{
    	Name string
    	Age int
    }
    
    // 用于测试的查询用户的方法
    func queryUser(uid int)(User, error){
    	user := make(map[int]User)
    	user[0] = User{"zs",20}
    	user[1] = User{"ls",20}
    	user[2] = User{"ws",20}
    	// 模拟查询用户
    	if u, ok := user[uid]; ok {
    		return u, nil
    	}
    	return User{}, fmt.Errorf("ud %d not in user db", uid)
    }
    
    
    // 测试方法
    func TestRPC(t *testing.T){
    	// 需要对interface{}可能产生的类型进行注册
    	gob.Register(User{})
    	addr := "127.0.0.1:8000"
    	// 创建服务端
    	srv := NewServer(addr)
    	// 将方法注册到服务端
    	srv.Register("queryUser", queryUser)
    	// 服务端等待调用
    	go srv.Run()
    	// 客户端获取连接
    	conn, err := net.Dial("tcp", addr)
    	if err != nil {
    		t.Error(err)
    	}
    	// 创建客户端
    	cli := NewClient(conn)
    	// 声明函数原型
    	var query func(int)(User,error)
    	cli.callRPC("queryUser", &query)
    	// 得到查询结果
    	u, err := query(1)
    	if err != nil {
    		t.Fatal(err)
    	}
    	fmt.Println(u)
    }
    
  • 相关阅读:
    ubuntu16.04 下安装opencv2.4.9
    ubuntu16.04下CMake学习
    Ubuntu 16.04上用CMake图形界面交叉编译树莓派的OpenCV3.0
    BMP图片的C++水印算法
    OpenCV 2 学习笔记(9): 定义ROI(regions of interest):给图像加入水印
    OpenCV入门教程
    免费、高性能的人脸检测库(二进制)
    DBCP与C3P0数据库连接池
    (android高仿系列)今日头条 --新闻阅读器 (三) 完结 、总结 篇
    hadoop之WordCount源代码分析
  • 原文地址:https://www.cnblogs.com/huiyichanmian/p/14407933.html
Copyright © 2011-2022 走看看