zoukankan      html  css  js  c++  java
  • RPC和Protubuf(四)

     Protofbuf与RPC实例

    下面已网上最多的一个RPC实例进行分析。首先定义好proto文件,在proto目录下创建message.proto文件:

    syntax = "proto3";
    
    package proto;
    
    // 订单请求参数
    message OrderRequest {
        string orderId = 1;
        int64 timeStamp = 2;
    }
    
    // 订单信息
    message OrderInfo {
        string OrderId = 1;
        string OrderName = 2;
        string OrderStatus =3;
    }
    

      使用:protoc --go_out=. message.proto 生成相应的go文件,然后创建服务端代码:mes_server.go文件:

    package main
    
    import (
    	"errors"
    	"fmt"
    	"log"
    	"net"
    	"net/http"
    	"net/rpc"
    	"rpc_protobuf/proto"
    	"time"
    )
    
    // 订单服务
    type OrderService struct {
    }
    
    // 获取订单函数,根据接口定义的请求和响应返回相应的结果
    func (os *OrderService) GetOrderInfo(request *proto.OrderRequest, response *proto.OrderInfo) error {
    	// 最后返回的结果体满足proto的OrderInfo接口
    	orderMap := map[string]proto.OrderInfo{
    		"201907300001": proto.OrderInfo{OrderId: "201907300001", OrderName: "衣服", OrderStatus: "已付款"},
    		"201907310001": proto.OrderInfo{OrderId: "201907310001", OrderName: "零食", OrderStatus: "已付款"},
    		"201907310002": proto.OrderInfo{OrderId: "201907310002", OrderName: "食品", OrderStatus: "未付款"},
    	}
    	current := time.Now().Unix()
    	// 请求发起的时间,以及调用函数的时间进行比较
    	if request.TimeStamp > current {
    		fmt.Println("超时时间戳")
    		*response = proto.OrderInfo{OrderId: "0", OrderName: "", OrderStatus: "订单信息异常"}
    	} else {
    		// 取出数据
    		result := orderMap[request.OrderId]
    		if result.OrderId != "" {
    			*response = orderMap[request.OrderId]
    		} else {
    			return errors.New("server error")
    		}
    	}
    	return nil
    }
    
    func main() {
    	orderService := new(OrderService)
    	// 注册服务
    	//rpc.RegisterName("",orderService))  // 这个函数可以传入注册的服务空间
    	rpc.Register(orderService)
    	rpc.HandleHTTP()
    	log.Println("server starting....")
    	listen, err := net.Listen("tcp", ":1234")
    	if err != nil {
    		panic(err.Error())
    	}
    	// 针对监听到得每个请求,执行handler
    	http.Serve(listen, nil)
    }  

    有了服务端代码后,紧接着我们要开始写客户端代码:mes_client.go:

    package main
    
    import (
    	"fmt"
    	"net/rpc"
    	"rpc_protobuf/proto"
    	"time"
    )
    
    func  main() {
    	// 拨号
    	client, err := rpc.DialHTTP("tcp", ":1234")
    	if err != nil {
    		panic(err.Error())
    	}
    	timeStamp := time.Now().Unix()
    	// 根据接口提供的数据结构提交请求
    	request := proto.OrderRequest{OrderId:"201907310002", TimeStamp:timeStamp}
    	var response *proto.OrderInfo
    	// 调用远程服务,这里默认是结构体名字.方法
    	err = client.Call("OrderService.GetOrderInfo", request, &response)
    	if err != nil {
    		panic(err)
    	}
    	fmt.Println(*response)
    }
    

      

    RPC实现原理

    GO语言中的RPC最简单的使用时通过Client.Call() 方法进行通过同步阻塞调用,该方法实现如下:我们打开源码

    // Call invokes the named function, waits for it to complete, and returns its error status.
    func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) error {
    	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
    	return call.Error
    }
    
    // 这里通过client.Go返回返回一个对象,最后调用该Call,结构体对象的.Done的channel中读出一个call

    这一块可以自定义,改为异步调用,具体参考《GO 语言高级编程》P219

    基于RPC实现监控功能

    在很多系统中提供了监控(watch)功能的接口, 当系统满足某些条件时,Watch方法返回监控的结果,在这里可以尝试通过RPC实现基本的监控功能, 首先我们先定义好proto文件,如果是出现传输的数据是map,或者其他复杂的数据结构,我们需要重新定义proto文件,watch.proto文件:

    syntax = "proto3";
    
    package proto;
    
    // 设置的数组定义
    message SetList {
        int32 id = 1;
        repeated string val = 2;  // repeated 标识可以重复的元素,如果是一个Map需要嵌套
    }
    
    // 超时时间设置
    message SetTime {
        int32 time = 1;
    }
    
    // 设置值结构
    message KeyChanged {
        string keyChange = 1;
    }
    
    // 设置返回值
    message ReplyString {
        string reply = 1;
    }
    

    然后通过下面的命令开始生成go文件,进入到proto目录下使用命令: protoc --go_out=. watch.proto,紧接着我们继续完成wat_service.go:

    package main
    
    import (
       "fmt"
       "log"
       "math/rand"
       "net"
       "net/rpc"
       "rpc_protobuf/proto"
       "sync"
       "time"
    )
    
    // 定义一个内存性键值对数据库
    type KVStoreService struct {
       m      map[string]string
       filter map[string]func(key string)
       mu     sync.Mutex
    }
    
    // 结构体初始化函数,调用函数前进行初始化
    func NewKVStoreService() *KVStoreService {
       return &KVStoreService{
          m:      make(map[string]string),
          filter: make(map[string]func(key string)),
       }
    }
    
    // 通过Set函数设置结构体的设置键值性数据,注意参数都是接口中传来的
    func (p *KVStoreService) Set(kv *proto.SetList, reply *proto.ReplyString) error {
       p.mu.Lock()
       defer p.mu.Unlock()
       key, value := kv.Val[0], kv.Val[1]
       fmt.Println("exect:",p.m[key],value)
       if oldValue := p.m[key]; oldValue != value {
          for _, fn := range p.filter {
             fmt.Println("ok")
             fn(key)
          }
       }
       p.m[key] = value
       reply.Reply = "success reply"
       return nil
    }
    
    // 监控函数用于客户端异步启动后监听服务
    func (p *KVStoreService) Watch(timoutSencod *proto.SetTime, keyChanged *proto.KeyChanged) error {
       id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
       ch := make(chan string, 10)
    
       p.mu.Lock()
       p.filter[id] = func(key string) { ch <- key }
       fmt.Println("watch",p.m,p.filter,ch)
       p.mu.Unlock()
    
       select {
       case <-time.After(time.Duration(timoutSencod.Time) * time.Second):
          return fmt.Errorf("time out")
       case keys := <-ch:
          fmt.Println("key change:", keys)
          keyChanged.KeyChange = keys
          return nil
       }
    }
    
    func main() {
       kvstoreService := NewKVStoreService()
       kvstoreService.m["name"] = "wang"
    
       // 注册服务
       //rpc.RegisterName("",orderService))  // 这个函数可以传入注册的服务空间
       rpc.Register(kvstoreService)
    
       log.Println("server starting....")
       listen, err := net.Listen("tcp", ":9999")
       if err != nil {
          panic(err.Error())
       }
       conn, err := listen.Accept()
       // 针对监听到得每个请求,执行handler
       rpc.ServeConn(conn)
    }
    

      

    然后是写客户端代码:

    package main
    
    import (
    	"fmt"
    	"log"
    	"net/rpc"
    	"rpc_protobuf/proto"
    	"time"
    )
    
    // 先启动监控服务,之后调用Call
    func doClientWork(client *rpc.Client) {
    	go func() {
    		var keyChange =  &proto.KeyChanged{}
    		var times = &proto.SetTime{
    			Time:30,
    		}
    		err := client.Call("KVStoreService.Watch",times,keyChange)
    		if err != nil {
    			log.Fatal(err)
    		}
    		fmt.Println("watch:",keyChange.KeyChange)
    	}()
    	// 需要停顿下否则filter数据未写入
    	time.Sleep(time.Second)
    	var reply = &proto.ReplyString{}
    	var request = &proto.SetList{
    		Id: 1,
    		Val:  []string{"name","li"},
    	}
    	err := client.Call("KVStoreService.Set",request,reply)
    	fmt.Println(reply)
    	if err != nil {
    		log.Fatal(err)
    	}
    	time.Sleep(time.Second*3)
    }
    
    
    func  main() {
    	// 拨号
    	client, err := rpc.Dial("tcp", ":9999")
    	if err != nil {
    		panic(err.Error())
    	}
    	// 调用远程服务,这里默认是结构体名字.方法
    	doClientWork(client)
    	time.Sleep(2*time.Second)
    }
    

      

  • 相关阅读:
    1021 个位数统计 (15 分)
    10. HttpServletResponse接口
    9. HttpServletRequest接口
    11. jQuery 获取元素尺寸
    10. jQuery 对元素属性的操作~ 一篇就够.
    7. HttpServlet类
    6 .数据库-增删改
    6. GenericServlet类
    9. jQuery 的节点操作
    8.jQuery 的 基本绑定事件操作
  • 原文地址:https://www.cnblogs.com/double-W/p/12746516.html
Copyright © 2011-2022 走看看