zoukankan      html  css  js  c++  java
  • etcd学习(3)-grpc使用etcd做服务发现

    grpc通过etcd实现服务发现

    前言

    项目中使用etcd实现了grpc的服务户注册和服务发现,这里来看下如何实现的服务注册和服务发现

    先来看下使用的demo,demo中的代码discovery

    服务注册

    package discovery
    
    import (
    	"context"
    	"encoding/json"
    	"errors"
    	"net/http"
    	"strconv"
    	"strings"
    	"time"
    
    	clientv3 "go.etcd.io/etcd/client/v3"
    	"go.uber.org/zap"
    )
    
    // Register for grpc server
    type Register struct {
    	EtcdAddrs   []string
    	DialTimeout int
    
    	closeCh     chan struct{}
    	leasesID    clientv3.LeaseID
    	keepAliveCh <-chan *clientv3.LeaseKeepAliveResponse
    
    	srvInfo Server
    	srvTTL  int64
    	cli     *clientv3.Client
    	logger  *zap.Logger
    }
    
    // NewRegister create a register base on etcd
    func NewRegister(etcdAddrs []string, logger *zap.Logger) *Register {
    	return &Register{
    		EtcdAddrs:   etcdAddrs,
    		DialTimeout: 3,
    		logger:      logger,
    	}
    }
    
    // Register a service
    func (r *Register) Register(srvInfo Server, ttl int64) (chan<- struct{}, error) {
    	var err error
    
    	if strings.Split(srvInfo.Addr, ":")[0] == "" {
    		return nil, errors.New("invalid ip")
    	}
    
    	if r.cli, err = clientv3.New(clientv3.Config{
    		Endpoints:   r.EtcdAddrs,
    		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
    	}); err != nil {
    		return nil, err
    	}
    
    	r.srvInfo = srvInfo
    	r.srvTTL = ttl
    
    	if err = r.register(); err != nil {
    		return nil, err
    	}
    
    	r.closeCh = make(chan struct{})
    
    	go r.keepAlive()
    
    	return r.closeCh, nil
    }
    
    // Stop stop register
    func (r *Register) Stop() {
    	r.closeCh <- struct{}{}
    }
    
    // register 注册节点
    func (r *Register) register() error {
    	leaseCtx, cancel := context.WithTimeout(context.Background(), time.Duration(r.DialTimeout)*time.Second)
    	defer cancel()
    
    	leaseResp, err := r.cli.Grant(leaseCtx, r.srvTTL)
    	if err != nil {
    		return err
    	}
    	r.leasesID = leaseResp.ID
    	if r.keepAliveCh, err = r.cli.KeepAlive(context.Background(), leaseResp.ID); err != nil {
    		return err
    	}
    
    	data, err := json.Marshal(r.srvInfo)
    	if err != nil {
    		return err
    	}
    	_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
    	return err
    }
    
    // unregister 删除节点
    func (r *Register) unregister() error {
    	_, err := r.cli.Delete(context.Background(), BuildRegPath(r.srvInfo))
    	return err
    }
    
    // keepAlive
    func (r *Register) keepAlive() {
    	ticker := time.NewTicker(time.Duration(r.srvTTL) * time.Second)
    	for {
    		select {
    		case <-r.closeCh:
    			if err := r.unregister(); err != nil {
    				r.logger.Error("unregister failed", zap.Error(err))
    			}
    			if _, err := r.cli.Revoke(context.Background(), r.leasesID); err != nil {
    				r.logger.Error("revoke failed", zap.Error(err))
    			}
    			return
    		case res := <-r.keepAliveCh:
    			if res == nil {
    				if err := r.register(); err != nil {
    					r.logger.Error("register failed", zap.Error(err))
    				}
    			}
    		case <-ticker.C:
    			if r.keepAliveCh == nil {
    				if err := r.register(); err != nil {
    					r.logger.Error("register failed", zap.Error(err))
    				}
    			}
    		}
    	}
    }
    
    // UpdateHandler return http handler
    func (r *Register) UpdateHandler() http.HandlerFunc {
    	return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    		wi := req.URL.Query().Get("weight")
    		weight, err := strconv.Atoi(wi)
    		if err != nil {
    			w.WriteHeader(http.StatusBadRequest)
    			w.Write([]byte(err.Error()))
    			return
    		}
    
    		var update = func() error {
    			r.srvInfo.Weight = int64(weight)
    			data, err := json.Marshal(r.srvInfo)
    			if err != nil {
    				return err
    			}
    			_, err = r.cli.Put(context.Background(), BuildRegPath(r.srvInfo), string(data), clientv3.WithLease(r.leasesID))
    			return err
    		}
    
    		if err := update(); err != nil {
    			w.WriteHeader(http.StatusInternalServerError)
    			w.Write([]byte(err.Error()))
    			return
    		}
    		w.Write([]byte("update server weight success"))
    	})
    }
    
    func (r *Register) GetServerInfo() (Server, error) {
    	resp, err := r.cli.Get(context.Background(), BuildRegPath(r.srvInfo))
    	if err != nil {
    		return r.srvInfo, err
    	}
    	info := Server{}
    	if resp.Count >= 1 {
    		if err := json.Unmarshal(resp.Kvs[0].Value, &info); err != nil {
    			return info, err
    		}
    	}
    	return info, nil
    }
    

    来分析下上面的代码实现

    当启动一个grpc的时候我们注册到etcd中

    	etcdRegister := discovery.NewRegister(config.Etcd.Addrs, log.Logger)
    	node := discovery.Server{
    		Name: app,
    		Addr: utils.InternalIP() + config.Port.GRPC,
    	}
    
    	if _, err := etcdRegister.Register(node, 10); err != nil {
    		panic(fmt.Sprintf("server register failed: %v", err))
    	}
    

    调用服务注册的时候首先分配了一个租约

    func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
    	r := &pb.LeaseGrantRequest{TTL: ttl}
    	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
    	if err == nil {
    		gresp := &LeaseGrantResponse{
    			ResponseHeader: resp.GetHeader(),
    			ID:             LeaseID(resp.ID),
    			TTL:            resp.TTL,
    			Error:          resp.Error,
    		}
    		return gresp, nil
    	}
    	return nil, toErr(ctx, err)
    }
    

    然后通过KeepAlive保活

    // KeepAlive尝试保持给定的租约永久alive
    func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
    	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
    
    	l.mu.Lock()
    	// ensure that recvKeepAliveLoop is still running
    	select {
    	case <-l.donec:
    		err := l.loopErr
    		l.mu.Unlock()
    		close(ch)
    		return ch, ErrKeepAliveHalted{Reason: err}
    	default:
    	}
    	ka, ok := l.keepAlives[id]
    	if !ok {
    		// create fresh keep alive
    		ka = &keepAlive{
    			chs:           []chan<- *LeaseKeepAliveResponse{ch},
    			ctxs:          []context.Context{ctx},
    			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
    			nextKeepAlive: time.Now(),
    			donec:         make(chan struct{}),
    		}
    		l.keepAlives[id] = ka
    	} else {
    		// add channel and context to existing keep alive
    		ka.ctxs = append(ka.ctxs, ctx)
    		ka.chs = append(ka.chs, ch)
    	}
    	l.mu.Unlock()
    
    	go l.keepAliveCtxCloser(ctx, id, ka.donec)
    	// 使用once只在第一次调用
    	l.firstKeepAliveOnce.Do(func() {
    		// 500毫秒一次,不断的发送保持活动请求
    		go l.recvKeepAliveLoop()
    		// 删除等待太久没反馈的租约
    		go l.deadlineLoop()
    	})
    
    	return ch, nil
    }
    
    // deadlineLoop获取在租约TTL中没有收到响应的任何保持活动的通道
    func (l *lessor) deadlineLoop() {
    	for {
    		select {
    		case <-time.After(time.Second):
    			// donec 关闭,当 recvKeepAliveLoop 停止时设置 loopErr
    		case <-l.donec:
    			return
    		}
    		now := time.Now()
    		l.mu.Lock()
    		for id, ka := range l.keepAlives {
    			if ka.deadline.Before(now) {
    				// 等待响应太久;租约可能已过期
    				ka.close()
    				delete(l.keepAlives, id)
    			}
    		}
    		l.mu.Unlock()
    	}
    }
    
    func (l *lessor) recvKeepAliveLoop() (gerr error) {
    	defer func() {
    		l.mu.Lock()
    		close(l.donec)
    		l.loopErr = gerr
    		for _, ka := range l.keepAlives {
    			ka.close()
    		}
    		l.keepAlives = make(map[LeaseID]*keepAlive)
    		l.mu.Unlock()
    	}()
    
    	for {
    		// resetRecv 打开一个新的lease stream并开始发送保持活动请求。
    		stream, err := l.resetRecv()
    		if err != nil {
    			if canceledByCaller(l.stopCtx, err) {
    				return err
    			}
    		} else {
    			for {
    				// 接收lease stream的返回返回
    				resp, err := stream.Recv()
    				if err != nil {
    					if canceledByCaller(l.stopCtx, err) {
    						return err
    					}
    
    					if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
    						l.closeRequireLeader()
    					}
    					break
    				}
    				// 根据LeaseKeepAliveResponse更新租约
    				// 如果租约过期删除所有alive channels
    				l.recvKeepAlive(resp)
    			}
    		}
    
    		select {
    		case <-time.After(retryConnWait):
    			continue
    		case <-l.stopCtx.Done():
    			return l.stopCtx.Err()
    		}
    	}
    }
    
    // resetRecv 打开一个新的lease stream并开始发送保持活动请求。
    func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
    	sctx, cancel := context.WithCancel(l.stopCtx)
    	// 建立服务端和客户端连接的lease stream
    	stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
    	if err != nil {
    		cancel()
    		return nil, err
    	}
    
    	l.mu.Lock()
    	defer l.mu.Unlock()
    	if l.stream != nil && l.streamCancel != nil {
    		l.streamCancel()
    	}
    
    	l.streamCancel = cancel
    	l.stream = stream
    
    	go l.sendKeepAliveLoop(stream)
    	return stream, nil
    }
    
    // sendKeepAliveLoop 在给定流的生命周期内发送保持活动请求
    func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
    	for {
    		var tosend []LeaseID
    
    		now := time.Now()
    		l.mu.Lock()
    		for id, ka := range l.keepAlives {
    			if ka.nextKeepAlive.Before(now) {
    				tosend = append(tosend, id)
    			}
    		}
    		l.mu.Unlock()
    
    		for _, id := range tosend {
    			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
    			if err := stream.Send(r); err != nil {
    				// TODO do something with this error?
    				return
    			}
    		}
    
    		select {
    		// 每500毫秒执行一次
    		case <-time.After(500 * time.Millisecond):
    		case <-stream.Context().Done():
    			return
    		case <-l.donec:
    			return
    		case <-l.stopCtx.Done():
    			return
    		}
    	}
    }
    
    // 撤销给定的租约,所有附加到租约的key将过期并被删除  
    func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
    	r := &pb.LeaseRevokeRequest{ID: int64(id)}
    	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
    	if err == nil {
    		return (*LeaseRevokeResponse)(resp), nil
    	}
    	return nil, toErr(ctx, err)
    }
    

    总结:

    1、每次注册一个服务的分配一个租约;

    2、KeepAlive通过从客户端到服务器端的流化的keep alive请求和从服务器端到客户端的流化的keep alive应答来维持租约;

    3、KeepAlive会500毫秒进行一次lease stream的发送;

    4、然后接收到KeepAlive发送信息回执,处理更新租约,服务处于活动状态;

    5、如果在租约TTL中没有收到响应的任何保持活动的请求,删除租约;

    6、Revoke撤销一个租约,所有附加到租约的key将过期并被删除。

    服务发现

    我们只需实现grpc在resolver中提供了Builder和Resolver接口,就能完成gRPC客户端的服务发现和负载均衡

    // 创建一个resolver用于监视名称解析更新
    type Builder interface {
    	Build(target Target, cc ClientConn, opts BuildOption) (Resolver, error)
    	Scheme() string
    }
    
    • Build方法:为给定目标创建一个新的resolver,当调用grpc.Dial()时执行;

    • Scheme方法:返回此resolver支持的方案,可参考Scheme定义

    // 监视指定目标的更新,包括地址更新和服务配置更新
    type Resolver interface {
    	ResolveNow(ResolveNowOption)
    	Close()
    }
    
    • ResolveNow方法:被 gRPC 调用,以尝试再次解析目标名称。只用于提示,可忽略该方法;

    • Close方法:关闭resolver。

    接下来看下具体的实现

    package discovery
    
    import (
    	"context"
    	"time"
    
    	"go.uber.org/zap"
    
    	"go.etcd.io/etcd/api/v3/mvccpb"
    	clientv3 "go.etcd.io/etcd/client/v3"
    	"google.golang.org/grpc/resolver"
    )
    
    const (
    	schema = "etcd"
    )
    
    // Resolver for grpc client
    type Resolver struct {
    	schema      string
    	EtcdAddrs   []string
    	DialTimeout int
    
    	closeCh      chan struct{}
    	watchCh      clientv3.WatchChan
    	cli          *clientv3.Client
    	keyPrifix    string
    	srvAddrsList []resolver.Address
    
    	cc     resolver.ClientConn
    	logger *zap.Logger
    }
    
    // NewResolver create a new resolver.Builder base on etcd
    func NewResolver(etcdAddrs []string, logger *zap.Logger) *Resolver {
    	return &Resolver{
    		schema:      schema,
    		EtcdAddrs:   etcdAddrs,
    		DialTimeout: 3,
    		logger:      logger,
    	}
    }
    
    // Scheme returns the scheme supported by this resolver.
    func (r *Resolver) Scheme() string {
    	return r.schema
    }
    
    // Build creates a new resolver.Resolver for the given target
    func (r *Resolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    	r.cc = cc
    
    	r.keyPrifix = BuildPrefix(Server{Name: target.Endpoint, Version: target.Authority})
    	if _, err := r.start(); err != nil {
    		return nil, err
    	}
    	return r, nil
    }
    
    // ResolveNow resolver.Resolver interface
    func (r *Resolver) ResolveNow(o resolver.ResolveNowOptions) {}
    
    // Close resolver.Resolver interface
    func (r *Resolver) Close() {
    	r.closeCh <- struct{}{}
    }
    
    // start
    func (r *Resolver) start() (chan<- struct{}, error) {
    	var err error
    	r.cli, err = clientv3.New(clientv3.Config{
    		Endpoints:   r.EtcdAddrs,
    		DialTimeout: time.Duration(r.DialTimeout) * time.Second,
    	})
    	if err != nil {
    		return nil, err
    	}
    	resolver.Register(r)
    
    	r.closeCh = make(chan struct{})
    
    	if err = r.sync(); err != nil {
    		return nil, err
    	}
    
    	go r.watch()
    
    	return r.closeCh, nil
    }
    
    // watch update events
    func (r *Resolver) watch() {
    	ticker := time.NewTicker(time.Minute)
    	r.watchCh = r.cli.Watch(context.Background(), r.keyPrifix, clientv3.WithPrefix())
    
    	for {
    		select {
    		case <-r.closeCh:
    			return
    		case res, ok := <-r.watchCh:
    			if ok {
    				r.update(res.Events)
    			}
    		case <-ticker.C:
    			if err := r.sync(); err != nil {
    				r.logger.Error("sync failed", zap.Error(err))
    			}
    		}
    	}
    }
    
    // update
    func (r *Resolver) update(events []*clientv3.Event) {
    	for _, ev := range events {
    		var info Server
    		var err error
    
    		switch ev.Type {
    		case mvccpb.PUT:
    			info, err = ParseValue(ev.Kv.Value)
    			if err != nil {
    				continue
    			}
    			addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
    			if !Exist(r.srvAddrsList, addr) {
    				r.srvAddrsList = append(r.srvAddrsList, addr)
    				r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
    			}
    		case mvccpb.DELETE:
    			info, err = SplitPath(string(ev.Kv.Key))
    			if err != nil {
    				continue
    			}
    			addr := resolver.Address{Addr: info.Addr}
    			if s, ok := Remove(r.srvAddrsList, addr); ok {
    				r.srvAddrsList = s
    				r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
    			}
    		}
    	}
    }
    
    // sync 同步获取所有地址信息
    func (r *Resolver) sync() error {
    	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    	defer cancel()
    	res, err := r.cli.Get(ctx, r.keyPrifix, clientv3.WithPrefix())
    	if err != nil {
    		return err
    	}
    	r.srvAddrsList = []resolver.Address{}
    
    	for _, v := range res.Kvs {
    		info, err := ParseValue(v.Value)
    		if err != nil {
    			continue
    		}
    		addr := resolver.Address{Addr: info.Addr, Metadata: info.Weight}
    		r.srvAddrsList = append(r.srvAddrsList, addr)
    	}
    	r.cc.UpdateState(resolver.State{Addresses: r.srvAddrsList})
    	return nil
    }
    

    总结:

    1、watch会监听前缀的信息变更,有变更的通知,及时更新srvAddrsList的地址信息;

    2、sync会定时的同步etcd中的可用的服务地址到srvAddrsList中;

    3、使用UpdateState更新ClientConn的Addresses;

    4、然后grpc客户端就能根据配置的具体策略发送请求到grpc的server中。

    这里使用gRPC内置的负载均衡策略round_robin,根据负载均衡地址,以轮询的方式进行调用服务,来测试下服务的发现和简单的服务负载

    package discovery
    
    import (
    	"context"
    	"fmt"
    	"log"
    	"net"
    	"testing"
    	"time"
    
    	"go.uber.org/zap"
    	"google.golang.org/grpc/balancer/roundrobin"
    	"google.golang.org/grpc/resolver"
    
    	"etcd-learning/discovery/helloworld"
    
    	"google.golang.org/grpc"
    )
    
    var etcdAddrs = []string{"127.0.0.1:2379"}
    
    func TestResolver(t *testing.T) {
    	r := NewResolver(etcdAddrs, zap.NewNop())
    	resolver.Register(r)
    
    	// etcd中注册5个服务
    	go newServer(t, ":1001", "1.0.0", 1)
    	go newServer(t, ":1002", "1.0.0", 1)
    	go newServer(t, ":1003", "1.0.0", 1)
    	go newServer(t, ":1004", "1.0.0", 1)
    	go newServer(t, ":1006", "1.0.0", 10)
    
    	conn, err := grpc.Dial("etcd:///hello", grpc.WithInsecure(), grpc.WithBalancerName(roundrobin.Name))
    	if err != nil {
    		t.Fatalf("failed to dial %v", err)
    	}
    	defer conn.Close()
    
    	c := helloworld.NewGreeterClient(conn)
    
    	// 进行十次数据请求
    	for i := 0; i < 10; i++ {
    		resp, err := c.SayHello(context.Background(), &helloworld.HelloRequest{Name: "abc"})
    		if err != nil {
    			t.Fatalf("say hello failed %v", err)
    		}
    		log.Println(resp.Message)
    		time.Sleep(100 * time.Millisecond)
    	}
    
    	time.Sleep(10 * time.Second)
    }
    
    type server struct {
    	Port string
    }
    
    // SayHello implements helloworld.GreeterServer
    func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
    	return &helloworld.HelloReply{Message: fmt.Sprintf("Hello From %s", s.Port)}, nil
    }
    
    func newServer(t *testing.T, port string, version string, weight int64) {
    	register := NewRegister(etcdAddrs, zap.NewNop())
    	defer register.Stop()
    
    	listen, err := net.Listen("tcp", port)
    	if err != nil {
    		log.Fatalf("failed to listen %v", err)
    	}
    
    	s := grpc.NewServer()
    	helloworld.RegisterGreeterServer(s, &server{Port: port})
    
    	info := Server{
    		Name:    "hello",
    		Addr:    fmt.Sprintf("127.0.0.1%s", port),
    		Version: version,
    		Weight:  weight,
    	}
    
    	register.Register(info, 10)
    
    	if err := s.Serve(listen); err != nil {
    		log.Fatalf("failed to server %v", err)
    	}
    }
    

    这里注册了5个服务,端口号是1001到1006,循环调用10次

    === RUN   TestResolver
    2021/07/24 22:44:52 Hello From :1001
    2021/07/24 22:44:52 Hello From :1006
    2021/07/24 22:44:53 Hello From :1001
    2021/07/24 22:44:53 Hello From :1002
    2021/07/24 22:44:53 Hello From :1003
    2021/07/24 22:44:53 Hello From :1004
    2021/07/24 22:44:53 Hello From :1006
    2021/07/24 22:44:53 Hello From :1001
    2021/07/24 22:44:53 Hello From :1002
    2021/07/24 22:44:53 Hello From :1003
    

    发现每次的请求会发送到不同的服务中

    负载均衡

    集中式LB(Proxy Model)

    grpc

    在服务消费者和服务提供者之间有一个独立的LB,通常是专门的硬件设备如 F5,或者基于软件如LVSHAproxy等实现。LB上有所有服务的地址映射表,通常由运维配置注册,当服务消费方调用某个目标服务时,它向LB发起请求,由LB以某种策略,比如轮询(Round-Robin)做负载均衡后将请求转发到目标服务。LB一般具备健康检查能力,能自动摘除不健康的服务实例。

    该方案主要问题:

    1、单点问题,所有服务调用流量都经过LB,当服务数量和调用量大的时候,LB容易成为瓶颈,且一旦LB发生故障影响整个系统;

    2、服务消费方、提供方之间增加了一级,有一定性能开销。

    进程内LB(Balancing-aware Client)

    grpc

    针对第一个方案的不足,此方案将LB的功能集成到服务消费方进程里,也被称为软负载或者客户端负载方案。服务提供方启动时,首先将服务地址注册到服务注册表,同时定期报心跳到服务注册表以表明服务的存活状态,相当于健康检查,服务消费方要访问某个服务时,它通过内置的LB组件向服务注册表查询,同时缓存并定期刷新目标服务地址列表,然后以某种负载均衡策略选择一个目标服务地址,最后向目标服务发起请求。LB和服务发现能力被分散到每一个服务消费者的进程内部,同时服务消费方和服务提供方之间是直接调用,没有额外开销,性能比较好。

    该方案主要问题:

    1、开发成本,该方案将服务调用方集成到客户端的进程里头,如果有多种不同的语言栈,就要配合开发多种不同的客户端,有一定的研发和维护成本;

    2、另外生产环境中,后续如果要对客户库进行升级,势必要求服务调用方修改代码并重新发布,升级较复杂。

    独立 LB 进程(External Load Balancing Service)

    grpc

    该方案是针对第二种方案的不足而提出的一种折中方案,原理和第二种方案基本类似。

    不同之处是将LB和服务发现功能从进程内移出来,变成主机上的一个独立进程。主机上的一个或者多个服务要访问目标服务时,他们都通过同一主机上的独立LB进程做服务发现和负载均衡。该方案也是一种分布式方案没有单点问题,一个LB进程挂了只影响该主机上的服务调用方,服务调用方和LB之间是进程内调用性能好,同时该方案还简化了服务调用方,不需要为不同语言开发客户库,LB的升级不需要服务调用方改代码。

    该方案主要问题:部署较复杂,环节多,出错调试排查问题不方便。

    上面通过etcd实现服务发现,使用的及时第二种 进程内LB(Balancing-aware Client)。

    参考

    【Load Balancing in gRPC】https://github.com/grpc/grpc/blob/master/doc/load-balancing.md
    【文中的代码示例】https://github.com/boilingfrog/etcd-learning/tree/main/discovery

  • 相关阅读:
    LightOJ 1132 Summing up Powers(矩阵快速幂)
    hdu 3804 Query on a tree (树链剖分+线段树)
    LightOJ 1052 String Growth && uva 12045 Fun with Strings (矩阵快速幂)
    uva 12304 2D Geometry 110 in 1! (Geometry)
    LA 3263 That Nice Euler Circuit (2D Geometry)
    2013 SCAUCPC Summary
    poj 3321 Apple Tree (Binary Index Tree)
    uva 11796 Dog Distance (几何+模拟)
    uva 11178 Morley's Theorem (2D Geometry)
    动手动脑
  • 原文地址:https://www.cnblogs.com/ricklz/p/15059497.html
Copyright © 2011-2022 走看看