zoukankan      html  css  js  c++  java
  • 微服务架构攀登之路(二)之RPC

    1. RPC 简介

    ⚫  远程过程调用(Remote Procedure Call,RPC)是一个计算机通信协议

    ⚫  该协议允许运行于一台计算机的程序调用另一台计算机的子程序,而程序员无需额外地为这个交互作用编程

    ⚫  如果涉及的软件采用面向对象编程,那么远程过程调用亦可称作远程调用或远程方法调用

    2. 流行 RPC 框架的对比

    3. golang 中如何实现 RPC

    ⚫  golang 中实现 RPC 非常简单,官方提供了封装好的库,还有一些第三方的库

    ⚫  golang 官方的 net/rpc 库使用 encoding/gob 进行编解码,支持 tcp 和 http 数据传输方式,由于其他语言不支持 gob 编解码方式,所以 golang 的 RPC 只支持 golang 开发的服务器与客户端之间的交互

    ⚫  官方还提供了net/rpc/jsonrpc 库实现RPC 方法,jsonrpc 采用JSON 进行数据编解码,因而支持跨语言调用,目前 jsonrpc 库是基于 tcp 协议实现的,暂不支持 http 传输方式

    ⚫  golang 的 RPC 必须符合 4 个条件才可以

    ◼         结构体字段首字母要大写,要跨域访问,所以大写

    ◼         函数名必须首字母大写(可以序列号导出的)

    ◼         函数第一个参数是接收参数,第二个参数是返回给客户端参数,必须是指针类型

    ◼         函数必须有一个返回值 error

    ⚫ 例题:golang 实现 RPC 程序,实现求矩形面积和周长

       server端

    package main
    
    import (
    	"fmt"
    	"log"
    	"net/http"
    	"net/rpc"
    )
    
    // 服务端,求矩形面积和周长
    
    // 声明矩形对象
    type Rect struct {
    }
    
    // 声明参数结构体,字段首字母大写
    type Params struct {
    	// 长和宽
    	Width, Height int
    }
    
    // 定义求矩形面积的方法
    func (r *Rect) Area(p Params, ret *int) error {
    	*ret = p.Width * p.Height
    	return nil
    }
    
    //定义求矩形周长的方法
    func (r *Rect) Perimeter(p Params, ret *int) error {
    	*ret = (p.Width + p.Height) * 2
    	return nil
    }
    
    func main() {
    	// 1. 注册服务
    	rect := new(Rect)
    	rpc.Register(rect)
    	// 2.把服务处理绑定到http协议上
    	rpc.HandleHTTP()
    	fmt.Println("------ rpc service is already on ------")
    	// 3. 监听服务,等待客户端调用求周长和面积的方法
    	err := http.ListenAndServe(":8080", nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    }
    

     客户端

    package main
    
    import (
    	"fmt"
    	"log"
    	"net/rpc"
    )
    
    type Params struct {
    	Width, Height int
    }
    
    // 调用服务
    func main(){
    	// 1. 连接远程RPC服务
    	rp, err := rpc.DialHTTP("tcp","182.254.179.186:8080")
    	if err != nil {
    		log.Fatal(err)
    	}
    	// 2.调用远程方法
    	// 定义接收服务器传回来的计算结果的值
    	ret := 0
    	err2 := rp.Call("Rect.Area",Params{50,100},&ret)
    	if err2!=nil {
    		log.Fatal(err2)
    	}
    	fmt.Println("面积",ret)
    
    	//求周长
    	err3:= rp.Call("Rect.Perimeter",Params{50,100},&ret)
    	if err3!=nil {
    		log.Fatal(err3)
    	}
    	fmt.Println("周长",ret)
    
    }

    ⚫ 练习:模仿前面例题,自己实现 RPC 程序,服务端接收 2 个参数,可以做乘法运算,也可以做商和余数的运算,客户端进行传参和访问,得到结果如下:

    package main
    
    import (
        "errors"
        "fmt"
        "log"
        "net/http"
        "net/rpc"
    )
    
    type Arith struct {
    }
    // 声明接收的参数结构体
    type ArithRequest struct {
        A,B int
    }
    // 声明返回客户端的参数结构体
    type ArithResponse struct {
        //乘积
        Pro int
        // 商
        Quo int
        // 余数
        Rem int
    }
    //乘法
    func (this *Arith)Multiply(req ArithRequest,res *ArithResponse)error {
        res.Pro = req.A * req.B
        return nil
    }
    // 商和余数
    func (this *Arith)Divide(req *ArithRequest, res *ArithResponse)error{
        if req.B == 0 {
            return errors.New("出书不能为0")
        }
        //商
        res.Quo = req.A / req.B
        //余数
        res.Rem = req.A % req.B
        return nil
    }
    
    func main()  {
        //注册服务
        rpc.Register(new(Arith))
        // 采用http作为rpc载体
        rpc.HandleHTTP()
        fmt.Println("------ rpc service is already on ------")
        // 监听服务,等待客户端调用响应的方法
        err := http.ListenAndServe(":8081",nil)
        if err!=nil {
            log.Fatal(err)
        }
    }
    server.go
    package main
    
    import (
        "fmt"
        "log"
        "net/rpc"
    )
    
    type Arith struct {
    }
    // 声明接收的参数结构体
    type ArithRequest struct {
        A,B int
    }
    // 声明返回客户端的参数结构体
    type ArithResponse struct {
        //乘积
        Pro int
        // 商
        Quo int
        // 余数
        Rem int
    }
    
    func main()  {
        conn, err := rpc.DialHTTP("tcp", "182.254.179.186:8081")
        //conn, err := rpc.DialHTTP("tcp", "127.0.0.1:8081")
        if err!= nil{
            log.Fatal(err)
        }
        req := ArithRequest{9, 2}
        var res ArithResponse
        err2 := conn.Call("Arith.Multiply",req, &res)
        if err2 != nil {
            log.Println(err2)
        }
        fmt.Printf("%d * %d = %d
    ", req.A,req.B,res.Pro)
        err3 := conn.Call("Arith.Divide",req, &res)
        if err3 != nil {
            log.Println(err3)
        }
        fmt.Printf("%d / %d = %d,%d", req.A,req.B,res.Quo,res.Rem)
    }
    client.go

    ⚫ 另外,net/rpc/jsonrpc 库通过 json 格式编解码,支持跨语言调用

    package main
    
    import (
        "errors"
        "fmt"
        "log"
        "net"
        "net/rpc"
        "net/rpc/jsonrpc"
    )
    
    type Arith struct {
    }
    
    // 声明接收的参数结构体
    type ArithRequest struct {
        A, B int
    }
    
    // 声明返回客户端的参数结构体
    type ArithResponse struct {
        //乘积
        Pro int
        // 商
        Quo int
        // 余数
        Rem int
    }
    
    //乘法
    func (this *Arith) Multiply(req ArithRequest, res *ArithResponse) error {
        res.Pro = req.A * req.B
        return nil
    }
    
    // 商和余数
    func (this *Arith) Divide(req *ArithRequest, res *ArithResponse) error {
        if req.B == 0 {
            return errors.New("出书不能为0")
        }
        //商
        res.Quo = req.A / req.B
        //余数
        res.Rem = req.A % req.B
        return nil
    }
    
    func main() {
        //注册服务
        rpc.Register(new(Arith))
        // 采用http作为rpc载体
        fmt.Println("------ jsonrpc service is already on ------")
        // 监听服务,等待客户端调用响应的方法
        listener, err := net.Listen("tcp","127.0.0.1:8081")
        if err != nil {
            log.Fatal(err)
        }
        //循环监听服务
        for {
            conn, err := listener.Accept()
            if err != nil {
                continue
            }
            //协程
            go func(conn net.Conn) {
                fmt.Println("a new client visit -----")
                jsonrpc.ServeConn(conn)
            }(conn)
        }
    }
    server.go
    package main
    
    import (
        "fmt"
        "log"
        "net/rpc/jsonrpc"
    )
    
    type Arith struct {
    }
    // 声明接收的参数结构体
    type ArithRequest struct {
        A,B int
    }
    // 声明返回客户端的参数结构体
    type ArithResponse struct {
        //乘积
        Pro int
        // 商
        Quo int
        // 余数
        Rem int
    }
    
    func main()  {
        conn, err := jsonrpc.Dial("tcp", "182.254.179.186:8081")
        //conn, err := jsonrpc.Dial("tcp", "127.0.0.1:8081")
        if err!= nil{
            log.Fatal(err)
        }
        req := ArithRequest{9, 2}
        var res ArithResponse
        err2 := conn.Call("Arith.Multiply",req, &res)
        if err2 != nil {
            log.Println(err2)
        }
        fmt.Printf("%d * %d = %d
    ", req.A,req.B,res.Pro)
        err3 := conn.Call("Arith.Divide",req, &res)
        if err3 != nil {
            log.Println(err3)
        }
        fmt.Printf("%d / %d = %d,%d", req.A,req.B,res.Quo,res.Rem)
    client.go

    4. RPC 调用流程

    ⚫  微服务架构下数据交互一般是对内 RPC,对外 REST

    ⚫   将业务按功能模块拆分到各个微服务,具有提高项目协作效率、降低模块耦合度、提高系统可用性等优点,但是开发门槛比较高,比如 RPC  框架的使用、后期的服务监控等工作

    ⚫   一般情况下,我们会将功能代码在本地直接调用,微服务架构下,我们需要将这个函数作为单独的服务运行,客户端通过网络调用

    5. 网络传输数据格式

    ⚫  成熟的 RPC 框架会有自定义传输协议,这里网络传输格式定义如下,前面是固定长度消息头,后面是变长消息体

     

    6. 实现 RPC 服务端

    ⚫  服务端接收到的数据需要包括什么?

    ◼  调用的函数名、参数列表

    ◼  一般会约定函数的第二个返回值是 error 类型

    ◼  通过反射实现

    ⚫ 服务端需要解决的问题是什么?

    ◼  Client 调用时只传过来函数名,需要维护函数名到函数之间的 map 映射

    ⚫  服务端的核心功能有哪些?

    ◼  维护函数名到函数反射值的 map

    ◼  client 端传函数名、参数列表后,服务端要解析为反射值,调用执行

    ◼  函数的返回值打包,并通过网络返回给客户端

    7. 实现 RPC 客户端

    ⚫ 客户端只有函数原型,使用reflect.MakeFunc()  可以完成原型到函数的调用

    ⚫  reflect.MakeFunc()是 Client 从函数原型到网络调用的关键

    8. 实现 RPC 通信测试

    ⚫ 给服务端注册一个查询用户的方法,客户端去 RPC 调用

    package main
    
    import (
        "encoding/binary"
        "io"
        "log"
        "net"
    )
    
    // 会话连接的结构体
    type Session struct {
        conn net.Conn
    }
    
    // 创建新连接
    func NewSession(conn net.Conn) *Session  {
        return &Session{conn:conn}
    }
    
    // 向连接中写数据
    func (s *Session)Write(data []byte)error{
        // 4字节头 + 数据长度的切片
        buf := make([]byte, 4+len(data))
        // 写入头部数据,记录数据长度
        // binary只认固定长度的类型,所以使用uint32,而不是直接写入
        binary.BigEndian.PutUint32(buf[:4],uint32(len(data)))
        // 写入
        copy(buf[4:], data)
        _, err := s.conn.Write(buf)
        if err != nil{
            log.Fatal(err)
        }
        return nil
    }
    
    // 从连接中读取数据
    func (s *Session)Read()([]byte, error)  {
        // 读取头部长度
        header := make([]byte,4)
        // 按头部长度,读取头部数据
        _,err := io.ReadFull(s.conn, header)
        if err != nil{
            log.Fatal(err)
        }
        // 读取数据长度
        datalen := binary.BigEndian.Uint32(header)
        // 按照数据长度去读取数据
        data := make([]byte, datalen)
        _,err = io.ReadFull(s.conn, data)
        if err != nil{
            log.Fatal(err)
        }
        return data,nil
    }
    session.go
    package main
    
    import (
        "bytes"
        "encoding/gob"
    )
    
    // 定义数据格式和编解码
    
    // 定义RPC交互的数据格式
    type RPCData struct {
        // 访问的函数
        Name string
        // 访问时传的参数
        Args []interface{}
    }
    
    // 编码
    func encode(data RPCData)([]byte,error)  {
        var buf bytes.Buffer
        // 得到字节数组的编码器
        bufEnc := gob.NewEncoder(&buf)
        // 对数据编码
        if err := bufEnc.Encode(data);err!=nil {
            return nil,err
        }
        return buf.Bytes(), nil
    }
    //解码
    func decode(b []byte)(RPCData,error)  {
        buf := bytes.NewReader(b)
        // 返回字节数组解码器
        bufDec := gob.NewDecoder(buf)
        var data RPCData
        // 对数据解码
        if err := bufDec.Decode(&data); err!= nil{
            return data,err
        }
        return data, nil
    }
    codec.go
    package main
    
    import (
        "fmt"
        "net"
        "reflect"
    )
    
    //声明服务器
    type Server struct {
        // 地址
        addr string
        // 服务端维护的函数名到函数反射值的map
        funcs map[string]reflect.Value
    }
    // 创建服务端对象
    func NewServer(addr string)*Server  {
        return &Server{addr,make(map[string]reflect.Value)}
    }
    
    // 服务端绑定注册方法
    // 将函数名与函数真正实现对应起来
    // 第一个参数为函数名,第二个传入真正的函数
    func (s *Server)Register(rpcName string, f interface{}){
        if _, ok := s.funcs[rpcName];ok{
            return
        }
        // map中没有值,则映射添加进map,便于调用
        fVal := reflect.ValueOf(f)
        s.funcs[rpcName] = fVal
    }
    //服务端等待调用
    func (s *Server)Run()  {
        // 监听
        lis, err := net.Listen("tcp",s.addr)
        if err!=nil {
            fmt.Printf("监听 %s err:%v", s.addr, err)
            return
        }
        for {
            //拿到连接
            conn,err := lis.Accept()
            if err != nil {
                fmt.Printf("accept err:%v",err)
            }
            // 创建会话
            srvSession := NewSession(conn)
            // RPC读取数据
            b, err := srvSession.Read()
            if err != nil {
                fmt.Printf("read err:%v", err)
            }
            // 对数据解码
            rpcData, err := decode(b)
            if err != nil {
                fmt.Printf("read err:%v", err)
            }
            // 根据读取到的数据的Name,得到调用的函数名
            f, ok := s.funcs[rpcData.Name]
            if !ok {
                fmt.Printf("函数 %s 不存在", rpcData.Name)
                return
            }
            // 解析遍历客户端出来的参数,放到一个数组中
            inArgs := make([]reflect.Value, 0, len(rpcData.Args))
            for _, arg := range rpcData.Args {
                inArgs = append(inArgs, reflect.ValueOf(arg))
            }
            // 反射调用方法,传入参数
            out := f.Call(inArgs)
            // 解析遍历结果,放到一个数组中
            outArgs := make([]interface{}, 0, len(out))
            for _, o := range out {
                outArgs = append(outArgs, o.Interface())
            }
            // 包装数据,返回给客户端
            respRPCData := RPCData{rpcData.Name, outArgs}
            // 编码
            respBytes, err := encode(respRPCData)
            if err != nil {
                fmt.Printf("encode err:%v", err)
                return
            }
            // 使用rpc写出数据
            err = srvSession.Write(respBytes)
            if err != nil {
                fmt.Printf("session write err:%v", err)
                return
            }
        }
    }
    server.go
    package main
    
    import (
        "net"
        "reflect"
    )
    
    // 声明客户端
    type Client struct {
        conn net.Conn
    }
    
    // 创建客户端对象
    func NewClient(conn net.Conn) *Client {
        return &Client{conn: conn}
    }
    
    // 实现通用的RPC客户端
    // 绑定RPC访问的方法
    // 传入访问的函数名
    
    // 函数具体实现在Server端,Client只有函数原型
    // 使用MakeFunc()完成原型到函数的调用
    
    //fPtr指向函数原型
    // xxx.callRPC("queryUser", &query)
    func (c *Client) callRPC(rpcName string, fPtr interface{}) {
        // 通过反射,获取fPtr未初始化的函数原型
        fn := reflect.ValueOf(fPtr).Elem()
        // 另一个函数,作用是对第一个函数参数操作
        f := func(args []reflect.Value) []reflect.Value {
            // 处理输入的参数
            inArgs := make([]interface{}, 0, len(args))
            for _, arg := range args {
                inArgs = append(inArgs, arg.Interface())
            }
            // 创建连接
            cliSession := NewSession(c.conn)
            // 编码数据
            reqRPC := RPCData{rpcName, inArgs}
            b, err := encode(reqRPC)
            if err != nil {
                panic(err)
            }
            // 写出数据
            err = cliSession.Write(b)
            if err != nil {
                panic(err)
            }
            // 读取响应数据
            respBytes, err := cliSession.Read()
            if err != nil {
                panic(err)
            }
            // 解码数据
            respRPC, err := decode(respBytes)
            if err != nil {
                panic(err)
            }
            // 处理服务端返回的数据
            outArgs := make([]reflect.Value, 0, len(respRPC.Args))
            for i, arg := range respRPC.Args {
                // 必须进行nil转换
                if arg == nil {
                    // 必须填充一个真正的类型,不能是nil
                    outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i)))
                    continue
                }
                outArgs = append(outArgs, reflect.ValueOf(arg))
            }
            return outArgs
        }
        // 参数1:一个未初始化函数的方法值,类型是reflect.Type
        // 参数2:另一个函数,作用是对第一个函数参数操作
        // 返回reflect.Value类型
        // MakeFunc 使用传入的函数原型,创建一个绑定 参数2的新函数
        v := reflect.MakeFunc(fn.Type(), f)
        // 为函数fPtr赋值
        fn.Set(v)
    }
    client.go
    • simple_rpc_test.go
    package main
    
    import (
    	"encoding/gob"
    	"fmt"
    	"net"
    	"testing"
    )
    
    // 用户查询
    
    //用于测试的结构体
    // 字段首字母必须大写
    type User struct {
    	Name string
    	Age int
    }
    
    // 用于测试的查询用户的方法
    func queryUser(uid int)(User,error)  {
    	user := make(map[int]User)
    	user[0] = User{"张三",20}
    	user[1] = User{"李四",21}
    	user[2] = User{"王五",22}
    	// 模拟查询用户
    	if u, ok := user[uid];ok {
    		return u,nil
    	}
    	return User{},fmt.Errorf("id %d not in user db",uid)
    }
    
    // 测试方法
    func TestRPC(t *testing.T)  {
    	// 需要对interface{}可能产生的类型进行注册
    	gob.Register(User{})
    	addr := "127.0.0.1:8080"
    	// 创建服务端
    	srv := NewServer(addr)
    	srv.Register("queryUser",queryUser)
    	// 服务端等待调用
    	go srv.Run()
    	// 客户端获取连接
    	conn, err := net.Dial("tcp",addr)
    	if err != nil {
    		t.Error(err)
    	}
    	// 创建客户端
    	cli := NewClient(conn)
    	// 声明函数原型
    	var query func(int)(User,error)
    	cli.callRPC("queryUser",&query)
    	// 得到查询结果
    	u,err := query(1)
    	if err!=nil {
    		t.Fatal(err)
    	}
    	fmt.Println(u)
    }

    • 测试读写
    package main
    
    import (
    	"fmt"
    	"log"
    	"net"
    	"sync"
    	"testing"
    )
    
    // 测试读写
    func TestSession_ReadWrite(t *testing.T) {
    	// 定义监听IP和端口
    	addr := "127.0.0.1:8000"
    	//定义传输的数据
    	my_data := "hello"
    	// 等待组
    	wg := sync.WaitGroup{}
    	// 协程1个读,1个写
    	wg.Add(2)
    	// 写数据协程
    	go func() {
    		defer wg.Done()
    		//创建rpc连接
    		listener,err := net.Listen("tcp",addr)
    		if err!=nil {
    			log.Fatal(err)
    		}
    		conn,_ := listener.Accept()
    		s := Session{conn:conn}
    		// 写数据
    		err = s.Write([]byte(my_data))
    		if err!=nil {
    			log.Fatal(err)
    		}
    	}()
    	// 读数据协程
    	go func() {
    		defer wg.Done()
    		conn,err := net.Dial("tcp",addr)
    		if err!=nil {
    			log.Fatal(err)
    		}
    		s := Session{conn:conn}
    		// 读数据
    		data, err := s.Read()
    		if err!=nil {
    			log.Fatal(err)
    		}
    		if string(data) != my_data {
    			t.Fatal(err)
    		}
    		fmt.Println(string(data))
    	}()
    	wg.Wait()
    }
    

  • 相关阅读:
    Django + Uwsgi + Nginx 的概念
    ubantu+nginx+uwsgi+django部署
    FileZilla以root用户登录Linux
    全文检索django-haystack+jieba+whoosh
    七牛云上传视频
    JWT登录与多方式登录
    vue绑定用户页面
    绑定微博用户接口
    vue微博回调空页面
    微博回调接口
  • 原文地址:https://www.cnblogs.com/zhangyafei/p/11930402.html
Copyright © 2011-2022 走看看