zoukankan      html  css  js  c++  java
  • gRPC接入etcd

      本文讲解gRPC接入etcd,实现服务注册与服务发现。
    需要先安装Go语言的etcd客户端包:

    go get go.etcd.io/etcd/clientv3
    

    然后就可以开始操作一波了。
    说明:
    以下代码需要根据实际代码位置对import语句内容进行微调。
    我的目录结构:
    $GOPATH/src/go-git/etcd-demo:

    一. 协议制定(proto/greet.proto)

    syntax = "proto3";
    
    option go_package = ".;greet";
    
    service Greet {
        rpc Morning(GreetRequest)returns(GreetResponse){}
        rpc Night(GreetRequest)returns(GreetResponse){}
    }
    
    message GreetRequest {
        string name = 1;
    }
    
    message GreetResponse {
        string message = 1;
        string from = 2;
    }
    

    生成代码:(proto子目录下执行)

    protoc --go_out=plugins=grpc:. *.proto
    

    执行完成,proto子目录生成文件greet.pb.go。

    二. 服务端(server/main.go)
    服务端主要有以下步骤:
    监听网络端口
    创建gRPC句柄,注册gRPC服务
    将服务地址注册到etcd
    监听并处理服务请求

    这里主要介绍一下将服务地址注册到etcd的过程(双保险):
    一方面,由于服务端无法保证自身是一直可用的,所以与etcd的租约是有时间期限的,租约一旦过期,服务端存储在etcd上的服务地址信息就会消失。
    另一方面,服务端可用时又必须保证调用方能发现自己,即保证自己在etcd上的服务地址信息不消失,所以需要发送心跳检测,一旦发现etcd上没有自己的服务地址时,请求重新添加(续租)。

    代码逻辑:

    /**
    * etcd demo server
    * author: JetWu
    * date: 2020.05.01
     */
    package main
    
    import (
        "flag"
        "fmt"
        proto "go-git/etcd-demo/proto"
        "net"
        "os"
        "os/signal"
        "strings"
        "syscall"
        "time"
    
        "go.etcd.io/etcd/clientv3"
        "golang.org/x/net/context"
        "google.golang.org/grpc"
    )
    
    const schema = "ns"
    
    var host = "127.0.0.1" //服务器主机
    var (
        Port        = flag.Int("Port", 3000, "listening port")                           //服务器监听端口
        ServiceName = flag.String("ServiceName", "greet_service", "service name")        //服务名称
        EtcdAddr    = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
    )
    var cli *clientv3.Client
    
    //rpc服务接口
    type greetServer struct{}
    
    func (gs *greetServer) Morning(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
        fmt.Printf("Morning 调用: %s
    ", req.Name)
        return &proto.GreetResponse{
            Message: "Good morning, " + req.Name,
            From:    fmt.Sprintf("127.0.0.1:%d", *Port),
        }, nil
    }
    
    func (gs *greetServer) Night(ctx context.Context, req *proto.GreetRequest) (*proto.GreetResponse, error) {
        fmt.Printf("Night 调用: %s
    ", req.Name)
        return &proto.GreetResponse{
            Message: "Good night, " + req.Name,
            From:    fmt.Sprintf("127.0.0.1:%d", *Port),
        }, nil
    }
    
    //将服务地址注册到etcd中
    func register(etcdAddr, serviceName, serverAddr string, ttl int64) error {
        var err error
    
        if cli == nil {
            //构建etcd client
            cli, err = clientv3.New(clientv3.Config{
                Endpoints:   strings.Split(etcdAddr, ";"),
                DialTimeout: 15 * time.Second,
            })
            if err != nil {
                fmt.Printf("连接etcd失败:%s
    ", err)
                return err
            }
        }
    
        //与etcd建立长连接,并保证连接不断(心跳检测)
        ticker := time.NewTicker(time.Second * time.Duration(ttl))
        go func() {
            key := "/" + schema + "/" + serviceName + "/" + serverAddr
            for {
                resp, err := cli.Get(context.Background(), key)
                //fmt.Printf("resp:%+v
    ", resp)
                if err != nil {
                    fmt.Printf("获取服务地址失败:%s", err)
                } else if resp.Count == 0 { //尚未注册
                    err = keepAlive(serviceName, serverAddr, ttl)
                    if err != nil {
                        fmt.Printf("保持连接失败:%s", err)
                    }
                }
                <-ticker.C
            }
        }()
    
        return nil
    }
    
    //保持服务器与etcd的长连接
    func keepAlive(serviceName, serverAddr string, ttl int64) error {
        //创建租约
        leaseResp, err := cli.Grant(context.Background(), ttl)
        if err != nil {
            fmt.Printf("创建租期失败:%s
    ", err)
            return err
        }
    
        //将服务地址注册到etcd中
        key := "/" + schema + "/" + serviceName + "/" + serverAddr
        _, err = cli.Put(context.Background(), key, serverAddr, clientv3.WithLease(leaseResp.ID))
        if err != nil {
            fmt.Printf("注册服务失败:%s", err)
            return err
        }
    
        //建立长连接
        ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
        if err != nil {
            fmt.Printf("建立长连接失败:%s
    ", err)
            return err
        }
    
        //清空keepAlive返回的channel
        go func() {
            for {
                <-ch
            }
        }()
        return nil
    }
    
    //取消注册
    func unRegister(serviceName, serverAddr string) {
        if cli != nil {
            key := "/" + schema + "/" + serviceName + "/" + serverAddr
            cli.Delete(context.Background(), key)
        }
    }
    
    func main() {
        flag.Parse()
    
        //监听网络
        listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", *Port))
        if err != nil {
            fmt.Println("监听网络失败:", err)
            return
        }
        defer listener.Close()
    
        //创建grpc句柄
        srv := grpc.NewServer()
        defer srv.GracefulStop()
    
        //将greetServer结构体注册到grpc服务中
        proto.RegisterGreetServer(srv, &greetServer{})
    
        //将服务地址注册到etcd中
        serverAddr := fmt.Sprintf("%s:%d", host, *Port)
        fmt.Printf("greeting server address: %s
    ", serverAddr)
        register(*EtcdAddr, *ServiceName, serverAddr, 5)
    
        //关闭信号处理
        ch := make(chan os.Signal, 1)
        signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
        go func() {
            s := <-ch
            unRegister(*ServiceName, serverAddr)
            if i, ok := s.(syscall.Signal); ok {
                os.Exit(int(i))
            } else {
                os.Exit(0)
            }
        }()
    
        //监听服务
        err = srv.Serve(listener)
        if err != nil {
            fmt.Println("监听异常:", err)
            return
        }
    }
    

    三. 客户端(client/main.go)
    客户端首先需要实现接口resolver.Resolver,其中方法Build()用于创建一个etcd解析器,grpc.Dial()会同步调用该方法,解析器需要根据key前缀监听etcd中服务地址列表的变化并更新本地列表。然后注册解析器,创建gRPC句柄,使用轮询负载均衡请求服务。

    代码逻辑:

    /**
    * etcd demo client
    * author: JetWu
    * date: 2020.05.02
     */
     package main
    
    import (
        "flag"
        "fmt"
        proto "go-git/etcd-demo/proto"
        "log"
        "strings"
        "time"
    
        "github.com/coreos/etcd/mvcc/mvccpb"
        "go.etcd.io/etcd/clientv3"
        "golang.org/x/net/context"
        "google.golang.org/grpc"
        "google.golang.org/grpc/resolver"
    )
    
    const schema = "ns"
    
    var (
        ServiceName = flag.String("ServiceName", "greet_service", "service name")        //服务名称
        EtcdAddr    = flag.String("EtcdAddr", "127.0.0.1:2379", "register etcd address") //etcd的地址
    )
    
    var cli *clientv3.Client
    
    //etcd解析器
    type etcdResolver struct {
        etcdAddr   string
        clientConn resolver.ClientConn
    }
    
    //初始化一个etcd解析器
    func newResolver(etcdAddr string) resolver.Builder {
        return &etcdResolver{etcdAddr: etcdAddr}
    }
    
    func (r *etcdResolver) Scheme() string {
        return schema
    }
    
    //watch有变化以后会调用
    func (r *etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
        log.Println("ResolveNow")
        fmt.Println(rn)
    }
    
    //解析器关闭时调用
    func (r *etcdResolver) Close() {
        log.Println("Close")
    }
    
    //构建解析器 grpc.Dial()同步调用
    func (r *etcdResolver) Build(target resolver.Target, clientConn resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
        var err error
    
        //构建etcd client
        if cli == nil {
            cli, err = clientv3.New(clientv3.Config{
                Endpoints:   strings.Split(r.etcdAddr, ";"),
                DialTimeout: 15 * time.Second,
            })
            if err != nil {
                fmt.Printf("连接etcd失败:%s
    ", err)
                return nil, err
            }
        }
    
        r.clientConn = clientConn
    
        go r.watch("/" + target.Scheme + "/" + target.Endpoint + "/")
    
        return r, nil
    }
    
    //监听etcd中某个key前缀的服务地址列表的变化
    func (r *etcdResolver) watch(keyPrefix string) {
        //初始化服务地址列表
        var addrList []resolver.Address
    
        resp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
        if err != nil {
            fmt.Println("获取服务地址列表失败:", err)
        } else {
            for i := range resp.Kvs {
                addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(resp.Kvs[i].Key), keyPrefix)})
            }
        }
    
        r.clientConn.NewAddress(addrList)
    
        //监听服务地址列表的变化
        rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
        for n := range rch {
            for _, ev := range n.Events {
                addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
                switch ev.Type {
                case mvccpb.PUT:
                    if !exists(addrList, addr) {
                        addrList = append(addrList, resolver.Address{Addr: addr})
                        r.clientConn.NewAddress(addrList)
                    }
                case mvccpb.DELETE:
                    if s, ok := remove(addrList, addr); ok {
                        addrList = s
                        r.clientConn.NewAddress(addrList)
                    }
                }
            }
        }
    }
    
    func exists(l []resolver.Address, addr string) bool {
        for i := range l {
            if l[i].Addr == addr {
                return true
            }
        }
        return false
    }
    
    func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
        for i := range s {
            if s[i].Addr == addr {
                s[i] = s[len(s)-1]
                return s[:len(s)-1], true
            }
        }
        return nil, false
    }
    
    func main() {
        flag.Parse()
    
        //注册etcd解析器
        r := newResolver(*EtcdAddr)
        resolver.Register(r)
    
        //客户端连接服务器(负载均衡:轮询) 会同步调用r.Build()
        conn, err := grpc.Dial(r.Scheme()+"://author/"+*ServiceName, grpc.WithBalancerName("round_robin"), grpc.WithInsecure())
        if err != nil {
            fmt.Println("连接服务器失败:", err)
        }
        defer conn.Close()
    
        //获得grpc句柄
        c := proto.NewGreetClient(conn)
        ticker := time.NewTicker(1 * time.Second)
        for range ticker.C {
            fmt.Println("Morning 调用...")
            resp1, err := c.Morning(
                context.Background(),
                &proto.GreetRequest{Name: "JetWu"},
            )
            if err != nil {
                fmt.Println("Morning调用失败:", err)
                return
            }
            fmt.Printf("Morning 响应:%s,来自:%s
    ", resp1.Message, resp1.From)
    
            fmt.Println("Night 调用...")
            resp2, err := c.Night(
                context.Background(),
                &proto.GreetRequest{Name: "JetWu"},
            )
            if err != nil {
                fmt.Println("Night调用失败:", err)
                return
            }
            fmt.Printf("Night 响应:%s,来自:%s
    ", resp2.Message, resp2.From)
        }
    }
    

      

    四. 运行验证

    启动etcd,使用3个不同端口运行三个服务端:

    启动客户端:

    可以看到,客户端使用轮询的方式对三个服务端进行请求,从而实现负载均衡。

  • 相关阅读:
    AdvComboBox
    带有可选选项的输入文本框(组合框)
    使用JavaScript为整个网站创建通用的Twitter按钮
    高速绘图控件
    Outlook样式分组列表控件
    CComboBox的禁用项目
    一个自定义的WPF滑块按钮
    23个设计模式
    MVC执行流程
    SQL注入面试
  • 原文地址:https://www.cnblogs.com/wujuntian/p/12838041.html
Copyright © 2011-2022 走看看