zoukankan      html  css  js  c++  java
  • ETCD组件在grpc中的实践

    一、前言

    grpc不像go-micro那样内置可插拔的etcd库,要让grpc具备服务注册、服务发现及命名解析的功能,需要自行实现。因此本文基于etcd实现了Name Resolver。

    二、所需的grpc版本及高版本grpc、protobuf与etcd兼容问题

    grpc相关库:

    google.golang.org/grpc v1.26.0

    google.golang.org/grpc/balancer/roundrobin

    google.golang.org/grpc/resolver

     

    etcd相关库:

    go.etcd.io/etcd/clientv3

    github.com/coreos/etcd/mvcc/mvccpb

     

    此处需要注意的是,新版本grpc不兼容etcd相关库, 如果grpc版本大于1.26.0或者protobuf版本过高会出现以下问题:

    1. grpc版本过高,新版本不支持etcd  需降级

    # github.com/coreos/etcd/clientv3/balancer/picker
    ../../vendor/github.com/coreos/etcd/clientv3/balancer/picker/err.go:37:44: undefined: balancer.PickOptions
    ../../vendor/github.com/coreos/etcd/clientv3/balancer/picker/roundrobin_balanced.go:55:54: undefined: balancer.PickOptions
    # github.com/coreos/etcd/clientv3/balancer/resolver/endpoint
    ../../vendor/github.com/coreos/etcd/clientv3/balancer/resolver/endpoint/endpoint.go:114:78: undefined: resolver.BuildOption
    ../../vendor/github.com/coreos/etcd/clientv3/balancer/resolver/endpoint/endpoint.go:182:31: undefined: resolver.ResolveNowOption
    

    解决办法:在go.mod 加入:replace google.golang.org/grpc => google.golang.org/grpc v1.26.0

    2. grpc版本过低  protobuf版本过高

    # ut-blogger/api/protos/user
    ../../api/protos/user/user.pb.go:327:11: undefined: grpc.SupportPackageIsVersion6
    ../../api/protos/user/user.pb.go:338:5: undefined: grpc.ClientConnInterface
    

    解决办法:降级protoc-gen-go的版本 go get github.com/golang/protobuf/protoc-gen-go@v1.3.2 重新生成proto

    三、自定义实现naming

    package etcdservice
    
    import (
    	"context"
    	"go.etcd.io/etcd/clientv3"
    	"log"
    	"strings"
    	"time"
    )
    
    // Schema is the URL scheme reported by the resolver's Scheme method and the
    // first path segment of every key this package writes to etcd
    // ("/<Schema>/<name>/<addr>").
    const Schema = "grpcEtcd"
    
    // Register 注册地址到ETCD组件中 使用 ; 分割
    func Register(etcdAddr, name string, addr string, ttl int64) error {
    	var err error
    
    	if cli == nil {
    		cli, err = clientv3.New(clientv3.Config{
    			Endpoints:   strings.Split(etcdAddr, ";"),
    			DialTimeout: 15 * time.Second,
    		})
    		if err != nil {
    			log.Printf("connect to etcd err:%s", err)
    			return err
    		}
    	}
    
    	ticker := time.NewTicker(time.Second * time.Duration(ttl))
    
    	go func() {
    		for {
    			getResp, err := cli.Get(context.Background(), "/"+Schema+"/"+name+"/"+addr)
    			if err != nil {
    				log.Printf("getResp:%+v
    ", getResp)
    				log.Printf("Register:%s", err)
    			} else if getResp.Count == 0 {
    				err = withAlive(name, addr, ttl)
    				if err != nil {
    					log.Printf("keep alive:%s", err)
    				}
    			}
    
    			<-ticker.C
    		}
    	}()
    
    	return nil
    }
    
    // withAlive 创建租约
    func withAlive(name string, addr string, ttl int64) error {
    	leaseResp, err := cli.Grant(context.Background(), ttl)
    	if err != nil {
    		return err
    	}
    
    	log.Printf("key:%v
    ", "/"+Schema+"/"+name+"/"+addr)
    	_, err = cli.Put(context.Background(), "/"+Schema+"/"+name+"/"+addr, addr, clientv3.WithLease(leaseResp.ID))
    	if err != nil {
    		log.Printf("put etcd error:%s", err)
    		return err
    	}
    
    	ch, err := cli.KeepAlive(context.Background(), leaseResp.ID)
    	if err != nil {
    		log.Printf("keep alive error:%s", err)
    		return err
    	}
    
    	// 清空 keep alive 返回的channel
    	go func() {
    		for {
    			<-ch
    		}
    	}()
    
    	return nil
    }
    
    // UnRegister deletes the key "/<Schema>/<name>/<addr>" from etcd.
    //
    // It does nothing when the package-level etcd client has not been created.
    // A failed delete is logged, because the function has no error return.
    func UnRegister(name string, addr string) {
    	if cli == nil {
    		return
    	}

    	key := "/" + Schema + "/" + name + "/" + addr
    	if _, err := cli.Delete(context.Background(), key); err != nil {
    		log.Printf("unregister %s error:%s", key, err)
    	}
    }
    

    四、自定义实现resolver

    package etcdservice
    
    import (
    	"context"
    	"log"
    	"strings"
    	"time"
    
    	"go.etcd.io/etcd/clientv3"
    
    	"github.com/coreos/etcd/mvcc/mvccpb"
    	"google.golang.org/grpc/resolver"
    )
    
    // cli is the package-level etcd client used by Register, withAlive,
    // UnRegister and the resolver. Register and Build create it on first use when
    // it is still nil.
    // NOTE(review): no locking is visible around those nil checks and
    // assignments - confirm callers do not race on first use.
    var cli *clientv3.Client
    
    // etcdResolver is the value NewResolver returns as a resolver.Builder; Build
    // returns the same value as the resolver.Resolver.
    type etcdResolver struct {
    	// rawAddr is the ";"-separated etcd endpoint list passed to NewResolver.
    	rawAddr string
    	// cc is set by Build; watch pushes address updates to it.
    	cc      resolver.ClientConn
    }
    
    // NewResolver returns a resolver.Builder that remembers etcdAddr, a
    // ";"-separated list of etcd endpoints. No etcd connection is made here; the
    // address is only read later by Build.
    func NewResolver(etcdAddr string) resolver.Builder {
    	builder := new(etcdResolver)
    	builder.rawAddr = etcdAddr
    	return builder
    }
    
    // Build creates the package-level etcd client if it does not exist yet, stores
    // cc on the receiver and starts a goroutine watching the etcd prefix
    // "/<target.Scheme>/<target.Endpoint>/".
    //
    // The receiver itself is returned as the resolver. An error is returned only
    // when the etcd client cannot be created. opts is not used.
    func (r *etcdResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    	if cli == nil {
    		var err error
    		endpoints := strings.Split(r.rawAddr, ";")
    		cli, err = clientv3.New(clientv3.Config{
    			Endpoints:   endpoints,
    			DialTimeout: 15 * time.Second,
    		})
    		if err != nil {
    			return nil, err
    		}
    	}

    	r.cc = cc

    	prefix := "/" + target.Scheme + "/" + target.Endpoint + "/"
    	go r.watch(prefix)

    	return r, nil
    }
    
    // Scheme reports the URL scheme this resolver is registered under, which is
    // the package constant Schema.
    func (etcdResolver) Scheme() string {
    	return Schema
    }
    
    // ResolveNow only logs that it was called; it does not trigger a new lookup.
    func (etcdResolver) ResolveNow(rn resolver.ResolveNowOptions) {
    	log.Println("ResolveNow")
    }
    
    // Close only logs that it was called. It does not stop the watch goroutine
    // started by Build and does not close the etcd client.
    func (etcdResolver) Close() {
    	log.Println("Close")
    }
    
    // watch 监听resolve列表变化
    func (r *etcdResolver) watch(keyPrefix string) {
    	var addrList []resolver.Address
    
    	getResp, err := cli.Get(context.Background(), keyPrefix, clientv3.WithPrefix())
    	if err != nil {
    		log.Println(err)
    	} else {
    		for i := range getResp.Kvs {
    			addrList = append(addrList, resolver.Address{Addr: strings.TrimPrefix(string(getResp.Kvs[i].Key), keyPrefix)})
    		}
    	}
    
    	// 新版本etcd去除了NewAddress方法 以UpdateState代替
    	r.cc.UpdateState(resolver.State{Addresses: addrList})
    
    	rch := cli.Watch(context.Background(), keyPrefix, clientv3.WithPrefix())
    	for n := range rch {
    		for _, ev := range n.Events {
    			addr := strings.TrimPrefix(string(ev.Kv.Key), keyPrefix)
    			switch ev.Type {
    			case mvccpb.PUT:
    				if !exist(addrList, addr) {
    					addrList = append(addrList, resolver.Address{Addr: addr})
    					r.cc.UpdateState(resolver.State{Addresses: addrList})
    				}
    			case mvccpb.DELETE:
    				if s, ok := remove(addrList, addr); ok {
    					addrList = s
    					r.cc.UpdateState(resolver.State{Addresses: addrList})
    				}
    			}
    			log.Printf("%s %q : %q
    ", ev.Type, ev.Kv.Key, ev.Kv.Value)
    		}
    	}
    }
    
    // exist reports whether any entry of l has Addr equal to addr.
    func exist(l []resolver.Address, addr string) bool {
    	for _, entry := range l {
    		if entry.Addr == addr {
    			return true
    		}
    	}
    	return false
    }
    
    // remove drops the first entry of s whose Addr equals addr.
    //
    // The removal is done in place: the last element is moved into the freed
    // position and the slice is shortened by one, so order is not preserved and
    // the caller's backing array is modified. It returns the shortened slice and
    // true on a match, or nil and false when addr is not present.
    func remove(s []resolver.Address, addr string) ([]resolver.Address, bool) {
    	last := len(s) - 1
    	for idx, entry := range s {
    		if entry.Addr != addr {
    			continue
    		}
    		s[idx] = s[last]
    		return s[:last], true
    	}
    	return nil, false
    }
    

    五、服务端调用ETCD服务注册

    EtcdAddr、ServiceName和Ttl可以从配置文件读取,读取配置文件的方式很多,本文不在此阐述
           // Register this server's address in etcd.
    	go etcdservice.Register(s.o.EtcdAddr, s.o.ServiceName, addr, s.o.Ttl)

    	// On a termination signal, remove the address from etcd before exiting.
    	// SIGKILL is deliberately not listed: it cannot be caught, so passing it
    	// to signal.Notify has no effect.
    	ch := make(chan os.Signal, 1)
    	signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGHUP, syscall.SIGQUIT)
    	go func() {
    		sig := <-ch
    		etcdservice.UnRegister(s.o.ServiceName, addr)

    		// Exit with the signal number as the status code when it is available.
    		if i, ok := sig.(syscall.Signal); ok {
    			os.Exit(int(i))
    		} else {
    			os.Exit(0)
    		}

    	}()
    

    六、客户端grpc服务 etcd服务发现

    客户端调用grpc服务以serviceName的形式从etcd中获取服务节点,此处采用roundrobin的形式作为负载均衡

            // Register the etcd-backed resolver with gRPC.
    	r := etcdservice.NewResolver(o.EtcdAddr)
    	resolver.Register(r)

    	// Dial the server. The target is "<scheme>://<caller>/<callee>" and the
    	// default service config selects the round-robin load-balancing policy.
    	conn, err := grpc2.Dial(r.Scheme()+"://"+o.Caller+"/"+o.Callee, grpc2.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, roundrobin.Name)), grpc2.WithInsecure())
    	if err != nil {
    		// The dial error is logged here and also returned, wrapped, to the caller.
    		log.Println("连接服务器失败", err)
    		return nil, errors.Wrap(err, "notify client dial error")
    	}
    
  • 相关阅读:
    「Android」Android4.04 在线源代码查看
    「Android」Huawei U8825d new LowmemoryKiller config
    「Linux」Ubuntu12.10的Libpcap1.3安装过程
    NSString+NSMutableString+NSValue+NSAraay用法汇总
    hd2 刷android
    EXT分区教程
    how to use a Class Breaks Renderer in the ESRI ArcGIS iPhone API to display cities of varying population with different
    windows server 2008修改远程桌面连接数
    iphone开发常用代码
    手持GPS坐标系统的转换与应用
  • 原文地址:https://www.cnblogs.com/FG123/p/13618503.html
Copyright © 2011-2022 走看看