zoukankan      html  css  js  c++  java
  • gRPC-go源码(2):ClientConn

    摘要

    在上一篇文章中,我们聊了聊gRPC是怎么管理一条从ClientServer的连接的。

    我们聊到了gRPC拥有Resolver,用来解析地址;拥有Balancer,用来做负载均衡。

    在这一篇文章中,我们将从代码的角度来分析gRPC是怎么设计ResolverBalancer的,并会从头到尾的梳理一遍连接是怎么建立的。

    1 DialContext

    DialContext是客户端建立连接的入口函数,我们看看在这个函数里面做了哪些事情:

    func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    
    	// 1.创建ClientConn结构体
    	cc := &ClientConn{
    		target:            target,
    		...
    	}
    	
    	// 2.解析target
    	cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
    	
    	// 3.根据解析的target找到合适的resolverBuilder
    	resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
    	
    	// 4.创建Resolver
    	rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
    	
    	// 5.完事
    	return cc, nil
    }
    

    显而易见,在省略了亿点点细节之后,我们发现建立连接的过程其实也很简单,我们梳理一遍:

    因为gRPC没有提供服务注册,服务发现的功能,所以需要开发者自己编写服务发现的逻辑:也就是Resolver——解析器

    在得到了解析的结果,也就是一连串的IP地址之后,需要对其中的IP进行选择,也就是Balancer

    其余的就是一些错误处理、兜底策略等等,这些内容不在这一篇文章中讲解。

    2 Resolver的获取

    我们从Resolver开始讲起。

    cc.parsedTarget = grpcutil.ParseTarget(cc.target, cc.dopts.copts.Dialer != nil)
    

    关于ParseTarget的逻辑我们用简单一句话来概括:获取开发者传入的target参数的地址类型,在后续查找适合这种类型地址的Resolver

    然后我们来看查找Resolver的这部分操作,这部分代码比较简单,我在代码中加了一些注释:

    resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
    
    func (cc *ClientConn) getResolver(scheme string) resolver.Builder {
    	// 先查看是否在配置中存在resolver
    	for _, rb := range cc.dopts.resolvers {
    		if scheme == rb.Scheme() {
    			return rb
    		}
    	}
    	
    	// 如果配置中没有相应的resolver,再从注册的resolver中寻找
    	return resolver.Get(scheme)
    }
    
    // 可以看出,ResolverBuilder是从m这个map里面找到的
    func Get(scheme string) Builder {
    	if b, ok := m[scheme]; ok {
    		return b
    	}
    	return nil
    }
    

    看到这里我们可以推测:对于每个ResolverBuilder,是需要提前注册的

    我们找到Resolver的代码中,果然发现他在init()的时候注册了自己。

    func init() {
    	resolver.Register(&passthroughBuilder{})
    }
    
    // 注册Resolver,即是把自己加入map中
    func Register(b Builder) {
    	m[b.Scheme()] = b
    }
    

    至此,我们已经研究完了Resolver的注册和获取。

    3 ResolverWrapper的创建

    回到ClientConn的创建过程中,在获取到了ResolverBuilder之后,进行下一步的操作:

    rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
    

    gRPC为了实现插件式的Resolver,因此采用了装饰器模式,创建了一个ResolverWrapper

    我们看看在创建ResolverWrapper的细节:

    func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
    	ccr := &ccResolverWrapper{
    		cc:   cc,
    		done: grpcsync.NewEvent(),
    	}
      
      // 根据传入的Builder,创建resolver,并放入wrapper中
      ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
     	return ccr, nil
    }
    

    好,到了这里我们可以暂停一下。

    我们停下来思考一下我们需要实现的功能:为了解耦ResolverBalancer,我们希望能够有一个中间的部分,接收到Resolver解析到的地址,然后对它们进行负载均衡。因此,在接下来的代码阅读过程中,我们可以带着这个问题:ResolverBalancer的通信过程是什么样的?

    再看上面的代码,ClientConn的创建已经结束了。那么我们可以推测,剩下的逻辑就在rb.Build(cc.parsedTarget, ccr, rbo)这一行代码里面。

    4 Resolver的创建

    其实,Build并不是一个确定的方法,他是一个接口。

    type Builder interface {
    	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    }
    

    在创建Resolver的时候,我们需要在Build方法里面初始化Resolver的各种状态。并且,因为Build方法中有一个target的参数,我们会在创建Resolver的时候,需要对这个target进行解析。

    也就是说,创建Resolver的时候,会进行第一次的域名解析。并且,这个解析过程,是由开发者自己设计的。

    到了这里我们会自然而然的接着考虑,解析之后的结果应该保存为什么样的数据结构,又应该怎么去将这个结果传递下去呢?

    我们拿最简单的passthroughResolver来举例:

    func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    	r := &passthroughResolver{
    		target: target,
    		cc:     cc,
    	}
      // 创建Resolver的时候,进行第一次的解析
    	r.start()
    	return r, nil
    }
    
    // 对于passthroughResolver来说,正如他的名字,直接将参数作为结果返回
    func (r *passthroughResolver) start() {
    	r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint}}})
    }
    

    我们可以看到,对于一个Resolver,需要将解析出的地址,传入resolver.State中,然后调用r.cc.UpdateState方法。

    那么这个r.cc.UpdateState又是什么呢?

    他就是我们上面提到的ccResolverWrapper

    这个时候逻辑就很清晰了,gRPCClientConn通过调用ccResolverWrapper来进行域名解析,而具体的解析过程则由开发者自己决定。在解析完毕后,将解析的结果返回给ccResolverWrapper

    5 Balancer的选择

    我们因此也可以进行推测:在ccResolverWrapper中,会将解析出的结果以某种形式传递给Balancer

    我们接着往下看:

    func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
    	...
      // 将Resolver解析的最新状态保存下来
    	ccr.curState = s
      // 对状态进行更新
    	ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
    }
    

    关于poll方法这里就不提了,重点我们看ccr.cc.updateResolverState(ccr.curState, nil)这部分。

    这里的ccr.cc中的cc,就是我们创建的ClientConn对象。

    也就是说,此时Resolver解析的结果,最终又回到了ClientConn中。

    注意,对于updateResolverState方法,在源码中逻辑比较深,主要是为了处理各种情况。在这里我直接把核心的那部分贴出来,所以这部分的代码你可以理解为是伪代码实现,和原本的代码是有出入的。如果你希望看到具体的实现,你可以去阅读gRPC的源码。

    func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
      
      var newBalancerName string
      
      // 假设已经配置好了balancer,那么使用配置中的balancer
      if cc.sc != nil && cc.sc.lbConfig != nil {
        newBalancerName = cc.sc.lbConfig.name
      } 
      // 否则的话,遍历解析结果中的地址,来判断应该使用哪种balancer
      else {
        var isGRPCLB bool
        for _, a := range addrs {
          if a.Type == resolver.GRPCLB {
            isGRPCLB = true
            break
          }
        }
        if isGRPCLB {
          newBalancerName = grpclbName
        } else if cc.sc != nil && cc.sc.LB != nil {
          newBalancerName = *cc.sc.LB
        } else {
          newBalancerName = PickFirstBalancerName
        }
      }
      
      // 具体的balancer逻辑
      cc.switchBalancer(newBalancerName)
      
      // 使用balancerWrapper更新Client的状态
      bw := cc.balancerWrapper
      uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
    
    	return ret
    }
    

    我们再来康康switchBalancer到底做了什么:

    func (cc *ClientConn) switchBalancer(name string) {
    	...
    	builder := balancer.Get(name)
    	cc.curBalancerName = builder.Name()
    	cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
    }
    

    是不是有一种似曾相识的感觉?

    没错,这部分的代码,跟ResolverWrapper的创建过程很接近。都是获取到对应的Builder Name,然后通过name来获取对应的Builder,然后创建wrapper

    func newCCBalancerWrapper(cc *ClientConn, b balancer.Builder, bopts balancer.BuildOptions) 	*ccBalancerWrapper {
    	ccb := &ccBalancerWrapper{
    		cc:       cc,
    		scBuffer: buffer.NewUnbounded(),
    		done:     grpcsync.NewEvent(),
    		subConns: make(map[*acBalancerWrapper]struct{}),
    	}
    	go ccb.watcher()
    	ccb.balancer = b.Build(ccb, bopts)
    	return ccb
    }
    

    这里的ccb.watcher我们先不管他,这个是跟连接的状态有关的内容,我们将在下一篇文章在进行分析。

    同样的,Build具体的Balancer的过程,也是由开发者自己决定的。

    在Balancer的创建过程中,涉及到了连接的管理。我们同样的把这部分内容放在下一篇中。在这篇文章中我们的主线任务还是ResolverBalancer的交互是怎么样的。

    在创建完相应的BalancerWrapper之后,就来到了bw.updateClientConnState这行了。

    注意,这里的bw就是我们上面创建的balancer。也就是说这里又来到了真正的Balancer逻辑。

    但是这其中的代码我们在这篇文章中先不进行介绍,gRPC对于真正的HTTP/2连接的管理逻辑也比较的复杂,我们下篇文章见。

    6 小结

    到这里我们来总结一下:创建ClientConn的时候创建ResolverWrapper,由ClientConn通知ResolverWrapper进行域名解析。

    此时,ResolverWrapper会将这个请求交给真正的Resolver,由真正的Resolver来处理域名解析。

    解析完毕后,Resolver会将结果保存在ResolverWrapper中,ResolverWrapper再将这个结果返回给ClientConn

    ClientConn发现解析的结果发生了改变,那么他就会去通知BalancerWrapper,重新进行负载均衡。
    此时BalancerWrapper又会去让真正的Balancer做这件事,最终将结果返回给ClientConn

    我们画张图来展示这个过程:

    写在最后

    首先,谢谢你能看到这里。

    这是一篇纯源码解读的文章,作为上一篇纯理论文章的补充。建议两篇文章配合一起食用:)

    如果在这个过程中,你有任何的疑问,都可以留言给我,或者在公众号“红鸡菌”中找到我。

    在下一篇文章中,我将向你介绍Balancer中的具体细节,也就是gRPC的底层连接管理。同样的,我应该也会用一篇文章来介绍应该怎么设计,然后再用一篇文章来介绍具体的实现,我们下篇文章再见。

    再次感谢你的阅读!

  • 相关阅读:
    自考新教材-p240_2
    自考新教材-p243_5_(1)
    自考新教材-p242_4
    自考新教材-p233
    自考新教材-p230
    Spring入门(9)-AOP初探
    MongoDB的备份与恢复
    JVM基础知识(1)-JVM内存区域与内存溢出
    Spring入门(8)-基于Java配置而不是XML
    Spring入门(7)-自动检测Bean
  • 原文地址:https://www.cnblogs.com/hongjijun/p/14466720.html
Copyright © 2011-2022 走看看