zoukankan      html  css  js  c++  java
  • go-zero服务注册和发现

    go-zero 服务注册和发现

    在没有服务注册和发现的时候, 没新上一个服务, 或者没部署一个新的节点, 都要改所有调用方的配置文件, 简直就是配置噩梦, 还容易配置错误
    分析一个go-zero的服务注册和发现,

    接着上面的代码, go-zero实战, 看看rpc客户端怎么寻址到rpc服务端的

    //logic调用的代码
    regRsp, err := l.svcCtx.UserServiceRpc.Register(l.ctx, in)
    
    //rpc/userserviceclient/userservice.go
    
    
    func NewUserService(cli zrpc.Client) UserService {
       return &defaultUserService{
       	cli: cli,
       }
    }
    
    // 注册
    func (m *defaultUserService) Register(ctx context.Context, in *RegisterRequest) (*RegisterResponse, error) {
       //发起调用, 使用的是上面NewUserService里的zrpc.Client
       client := userService.NewUserServiceClient(m.cli.Conn())
       return client.Register(ctx, in)
    }
    
    //api/internal/svc/servicecontext.go 中调用的NewUserService
    func NewServiceContext(c config.Config) *ServiceContext {
       return &ServiceContext{
       	Config:         c,
       	Model:          model.NewUserinfoModel(sqlx.NewMysql(c.DataSource), c.Cache),
       	UserServiceRpc: userserviceclient.NewUserService(zrpc.MustNewClient(c.Rpc)), //初始化rpcClient
       }
    }
    
    //先看一下zrpc.MustNewClient 这个方法, 传入的配置文件中的etcd 的hosts和服务key, 跟进去看下这个方法
    //github.com/tal-tech/go-zero/zrpc/client.go
    //这个方法没啥, 继续往下面走
    func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
       cli, err := NewClient(c, options...)
       if err != nil {
       	log.Fatal(err)
       }
    
       return cli
    }
    
    
    //方法中主要方法是 internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
    func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
       var opts []ClientOption
       if c.HasCredential() {
       	opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
       		App:   c.App,
       		Token: c.Token,
       	})))
       }
       if c.Timeout > 0 {
       	opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
       }
       opts = append(opts, options...)
    
       var client Client
       var err error
       if len(c.Endpoints) > 0 {
       	client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), opts...)
       } else if err = c.Etcd.Validate(); err == nil {
       	client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
       }
       if err != nil {
       	return nil, err
       }
    
       return &RpcClient{
       	client: client,
       }, nil
    }
    
    //先看一下internal.BuildDiscovTarget, 这个方法入参是etcd的hosts和 服务的key, 返回的是一个类似url的东西, 协议是DiscovScheme = "discov"
    func BuildDiscovTarget(endpoints []string, key string) string {
       return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
       	strings.Join(endpoints, resolver.EndpointSep), key)
    }
    
    
    //BuildDiscovTarget返回的url类似 : discov://127.0.0.1:2379/user-service, 传入NewClient中
    //这个函数有两个核心逻辑 一个是grpc.WithBalancerName(p2c.Name)
    
    // NewClient returns a Client.
    func NewClient(target string, opts ...ClientOption) (Client, error) {
       var cli client
       opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, opts...)
       if err := cli.dial(target, opts...); err != nil {
       	return nil, err
       }
    
       return &cli, nil
    }
    
    //WithBalancerName 这方法看名字知道是负载均衡的作用, 通过balancerName获取的, 对应入参的p2c.Name
    func WithBalancerName(balancerName string) DialOption {
       builder := balancer.Get(balancerName)
       if builder == nil {
       	panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
       }
       return newFuncDialOption(func(o *dialOptions) {
       	o.balancerBuilder = builder
       })
    }
    
    //跟着p2c.Name进去, github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/balancer/p2c/p2c.go
    //在这里注入的负载均衡, 核心的逻辑在Pick中, 大致是一个可选就选一个, 两个就选择连接数最小的, 两个以上就随机两个出来进行选择
    //如何选择的逻辑在choose(c1, c2 *subConn)方法中, 基本上就两个选连接数小的那个, 
    func init() {
       balancer.Register(newBuilder())
    }
    
    type p2cPickerBuilder struct{}
    
    func newBuilder() balancer.Builder {
       return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
    }
    
    //....
    func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
       conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
       p.lock.Lock()
       defer p.lock.Unlock()
    
       var chosen *subConn
       switch len(p.conns) {
       case 0:
       	return nil, nil, balancer.ErrNoSubConnAvailable
       case 1:
       	chosen = p.choose(p.conns[0], nil)
       case 2:
       	chosen = p.choose(p.conns[0], p.conns[1])
       default:
       	var node1, node2 *subConn
       	for i := 0; i < pickTimes; i++ {
       		a := p.r.Intn(len(p.conns))
       		b := p.r.Intn(len(p.conns) - 1)
       		if b >= a {
       			b++
       		}
       		node1 = p.conns[a]
       		node2 = p.conns[b]
       		if node1.healthy() && node2.healthy() {
       			break
       		}
       	}
    
       	chosen = p.choose(node1, node2)
       }
    
       atomic.AddInt64(&chosen.inflight, 1)
       atomic.AddInt64(&chosen.requests, 1)
       return chosen.conn, p.buildDoneFunc(chosen), nil
    }
    
    //继续返回去看NewClient方法中的dial方法, 传入的是target, 也就是那个url, discov://127.0.0.1:2379/user-service
       if err := cli.dial(target, opts...); err != nil {
       	return nil, err
       }
    
    
    //github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/client.go
    //这个方法本身没什么, 主要就是调用grpc.DialContext()方法, 这里就进入了grpc的逻辑了, 相当于通过grpc dial 了discov://127.0.0.1:2379/user-service, 继续进去看
    func (c *client) dial(server string, opts ...ClientOption) error {
       options := c.buildDialOptions(opts...)
       timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
       defer cancel()
       conn, err := grpc.DialContext(timeCtx, server, options...)
       if err != nil {
       	service := server
       	if errors.Is(err, context.DeadlineExceeded) {
       		pos := strings.LastIndexByte(server, separator)
       		// len(server) - 1 is the index of last char
       		if 0 < pos && pos < len(server)-1 {
       			service = server[pos+1:]
       		}
       	}
       	return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is alread started",
       		server, err.Error(), service)
       }
    
       c.conn = conn
       return nil
    }
    
    //google.golang.org/grpc@v1.29.1/clientconn.go, 这个代码逻辑比较多, 我们找到我们关心的部分(服务发现), 就是如何解析discov://127.0.0.1:2379/user-service 成为一个ip:port, 通过分析发现cc.parsedTarget.Scheme, 也就是一开始拼接的discov字符串, 跟这getResolver方法进去
    func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
       cc := &ClientConn{
       	target:            target,
       	csMgr:             &connectivityStateManager{},
       	conns:             make(map[*addrConn]struct{}),
       	dopts:             defaultDialOptions(),
       	blockingpicker:    newPickerWrapper(),
       	czData:            new(channelzData),
       	firstResolveEvent: grpcsync.NewEvent(),
       }
       //.....
       //发现是通过Scheme去获取的, 也就是discov, 
       resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
       
       //....
       return cc, nil
    }
    
    //google.golang.org/grpc@v1.29.1/resolver/resolver.go
    //发现get是用map中读的, map数据是Register方法注入的, 返回到DiscovScheme = "discov"定义的地方, 看看有没有调用Register方法
    var (
       // m is a map from scheme to resolver builder.
       m = make(map[string]Builder)
       // defaultScheme is the default scheme to use.
       defaultScheme = "passthrough"
    )
    // registered with the same name, the one registered last will take effect.
    func Register(b Builder) {
       m[b.Scheme()] = b
    }
    
    // Get returns the resolver builder registered with the given scheme.
    //
    // If no builder is register with the scheme, nil will be returned.
    func Get(scheme string) Builder {
       if b, ok := m[scheme]; ok {
       	return b
       }
       return nil
    }
    
    
    //github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/resolver/resolver.go
    //注入的是discovBuilder, 继续看下discovBuilder的具体实现
    func RegisterResolver() {
       resolver.Register(&dirBuilder)
       resolver.Register(&disBuilder)
    }
    
    //github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/resolver/discovbuilder.go
    //具体逻辑看函数中的知识
    
    func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
       resolver.Resolver, error) {
       hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
       	return r == EndpointSepChar
       })
       //new一个服务发现的客户端, 里面基本上就是个etcd的封装, etcd的逻辑在NewSubscriber里面, 比较简单, 就写出来了
       sub, err := discov.NewSubscriber(hosts, target.Endpoint)
       if err != nil {
       	return nil, err
       }
       //拿到服务key的所有etcd中的数据
       update := func() {
       	var addrs []resolver.Address
       	for _, val := range subset(sub.Values(), subsetSize) {
       		addrs = append(addrs, resolver.Address{
       			Addr: val,
       		})
       	}
       	cc.UpdateState(resolver.State{
       		Addresses: addrs,
       	})
       }
       //实时监听etcd数据变化, 然后通过update方法更新数据到grpc的client
       sub.AddListener(update)
       //初始化的时候调用一次
       update()
    
       return &nopResolver{cc: cc}, nil
    }
    
    func (d *discovBuilder) Scheme() string {
       return DiscovScheme
    }
    //到这里客户端的服务发现就结束了
    
    //服务注册的代码在rpc/userservice.go中, MustNewServer调用NewServer
    func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
       server, err := NewServer(c, register)
       if err != nil {
       	log.Fatal(err)
       }
    
       return server
    }
    
    //NewServer调用的是server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, c.ListenOn, internal.WithMetrics(metrics))
    //这个方法进去就是调用discov.NewPublishe
       	pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, pubListenOn)
    //继续跟进去, 发现在KeepAlive()中会调用register方法, 用etcd的put方法注册到etcd中(client.Put)
    func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
       resp, err := client.Grant(client.Ctx(), TimeToLive)
       if err != nil {
       	return clientv3.NoLease, err
       }
    
       lease := resp.ID
       if p.id > 0 {
       	p.fullKey = makeEtcdKey(p.key, p.id)
       } else {
       	p.fullKey = makeEtcdKey(p.key, int64(lease))
       }
       _, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
    
       return lease, err
    }
    
    

    总结

    1. go-zero的注册发现代码比较容易懂, 比较简单, 可以作为初步阅读源码的练手项目
    2. 业务上基本上是够用的
  • 相关阅读:
    RTSP协议转RTMP协议的行业视频接入网关EasyRTSPLive如何实现音频转码的
    RTSP协议转RTMP协议的行业视频接入网关EasyRTSPLive之跨平台ini配置及通道的方法
    GB/T28181协议EasyGBS播放1080p视频直播会花屏
    国标GB/T28181协议下播放器起播慢或者延迟高如何解决?
    EasyGBS查找大华设备的录像列表时失败
    ffmpeg增加h264编解码功能模块方法
    EasyNVR控制台运行出现invalid license关于计算机保护软件类似于360、腾讯云管家等限制相关问题
    摄像机经过多级路由转换无法被EasyNVR拉流问题处理方法
    使用EasyNVR软件对接海康摄像头对接失败问题解析
    GB/T28181协议使用EasyNVR降低播放延迟方法
  • 原文地址:https://www.cnblogs.com/leescre/p/14799586.html
Copyright © 2011-2022 走看看