go-zero 服务注册和发现
在没有服务注册和发现的时候, 没新上一个服务, 或者没部署一个新的节点, 都要改所有调用方的配置文件, 简直就是配置噩梦, 还容易配置错误
分析一个go-zero的服务注册和发现,
接着上面的代码, go-zero实战, 看看rpc客户端怎么寻址到rpc服务端的
//logic调用的代码
regRsp, err := l.svcCtx.UserServiceRpc.Register(l.ctx, in)
//rpc/userserviceclient/userservice.go
func NewUserService(cli zrpc.Client) UserService {
return &defaultUserService{
cli: cli,
}
}
// 注册
func (m *defaultUserService) Register(ctx context.Context, in *RegisterRequest) (*RegisterResponse, error) {
//发起调用, 使用的是上面NewUserService里的zrpc.Client
client := userService.NewUserServiceClient(m.cli.Conn())
return client.Register(ctx, in)
}
//api/internal/svc/servicecontext.go 中调用的NewUserService
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
Model: model.NewUserinfoModel(sqlx.NewMysql(c.DataSource), c.Cache),
UserServiceRpc: userserviceclient.NewUserService(zrpc.MustNewClient(c.Rpc)), //初始化rpcClient
}
}
//先看一下zrpc.MustNewClient 这个方法, 传入的配置文件中的etcd 的hosts和服务key, 跟进去看下这个方法
//github.com/tal-tech/go-zero/zrpc/client.go
//这个方法没啥, 继续往下面走
func MustNewClient(c RpcClientConf, options ...ClientOption) Client {
cli, err := NewClient(c, options...)
if err != nil {
log.Fatal(err)
}
return cli
}
//方法中主要方法是 internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
func NewClient(c RpcClientConf, options ...ClientOption) (Client, error) {
var opts []ClientOption
if c.HasCredential() {
opts = append(opts, WithDialOption(grpc.WithPerRPCCredentials(&auth.Credential{
App: c.App,
Token: c.Token,
})))
}
if c.Timeout > 0 {
opts = append(opts, WithTimeout(time.Duration(c.Timeout)*time.Millisecond))
}
opts = append(opts, options...)
var client Client
var err error
if len(c.Endpoints) > 0 {
client, err = internal.NewClient(internal.BuildDirectTarget(c.Endpoints), opts...)
} else if err = c.Etcd.Validate(); err == nil {
client, err = internal.NewClient(internal.BuildDiscovTarget(c.Etcd.Hosts, c.Etcd.Key), opts...)
}
if err != nil {
return nil, err
}
return &RpcClient{
client: client,
}, nil
}
//先看一下internal.BuildDiscovTarget, 这个方法入参是etcd的hosts和 服务的key, 返回的是一个类似url的东西, 协议是DiscovScheme = "discov"
func BuildDiscovTarget(endpoints []string, key string) string {
return fmt.Sprintf("%s://%s/%s", resolver.DiscovScheme,
strings.Join(endpoints, resolver.EndpointSep), key)
}
//BuildDiscovTarget返回的url类似 : discov://127.0.0.1:2379/user-service, 传入NewClient中
//这个函数有两个核心逻辑 一个是grpc.WithBalancerName(p2c.Name)
// NewClient returns a Client.
func NewClient(target string, opts ...ClientOption) (Client, error) {
var cli client
opts = append([]ClientOption{WithDialOption(grpc.WithBalancerName(p2c.Name))}, opts...)
if err := cli.dial(target, opts...); err != nil {
return nil, err
}
return &cli, nil
}
//WithBalancerName 这方法看名字知道是负载均衡的作用, 通过balancerName获取的, 对应入参的p2c.Name
func WithBalancerName(balancerName string) DialOption {
builder := balancer.Get(balancerName)
if builder == nil {
panic(fmt.Sprintf("grpc.WithBalancerName: no balancer is registered for name %v", balancerName))
}
return newFuncDialOption(func(o *dialOptions) {
o.balancerBuilder = builder
})
}
//跟着p2c.Name进去, github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/balancer/p2c/p2c.go
//在这里注入的负载均衡, 核心的逻辑在Pick中, 大致是一个可选就选一个, 两个就选择连接数最小的, 两个以上就随机两个出来进行选择
//如何选择的逻辑在choose(c1, c2 *subConn)方法中, 基本上就两个选连接数小的那个,
func init() {
balancer.Register(newBuilder())
}
type p2cPickerBuilder struct{}
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}
//....
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
p.lock.Lock()
defer p.lock.Unlock()
var chosen *subConn
switch len(p.conns) {
case 0:
return nil, nil, balancer.ErrNoSubConnAvailable
case 1:
chosen = p.choose(p.conns[0], nil)
case 2:
chosen = p.choose(p.conns[0], p.conns[1])
default:
var node1, node2 *subConn
for i := 0; i < pickTimes; i++ {
a := p.r.Intn(len(p.conns))
b := p.r.Intn(len(p.conns) - 1)
if b >= a {
b++
}
node1 = p.conns[a]
node2 = p.conns[b]
if node1.healthy() && node2.healthy() {
break
}
}
chosen = p.choose(node1, node2)
}
atomic.AddInt64(&chosen.inflight, 1)
atomic.AddInt64(&chosen.requests, 1)
return chosen.conn, p.buildDoneFunc(chosen), nil
}
//继续返回去看NewClient方法中的dial方法, 传入的是target, 也就是那个url, discov://127.0.0.1:2379/user-service
if err := cli.dial(target, opts...); err != nil {
return nil, err
}
//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/client.go
//这个方法本身没什么, 主要就是调用grpc.DialContext()方法, 这里就进入了grpc的逻辑了, 相当于通过grpc dial 了discov://127.0.0.1:2379/user-service, 继续进去看
func (c *client) dial(server string, opts ...ClientOption) error {
options := c.buildDialOptions(opts...)
timeCtx, cancel := context.WithTimeout(context.Background(), dialTimeout)
defer cancel()
conn, err := grpc.DialContext(timeCtx, server, options...)
if err != nil {
service := server
if errors.Is(err, context.DeadlineExceeded) {
pos := strings.LastIndexByte(server, separator)
// len(server) - 1 is the index of last char
if 0 < pos && pos < len(server)-1 {
service = server[pos+1:]
}
}
return fmt.Errorf("rpc dial: %s, error: %s, make sure rpc service %q is alread started",
server, err.Error(), service)
}
c.conn = conn
return nil
}
//google.golang.org/grpc@v1.29.1/clientconn.go, 这个代码逻辑比较多, 我们找到我们关心的部分(服务发现), 就是如何解析discov://127.0.0.1:2379/user-service 成为一个ip:port, 通过分析发现cc.parsedTarget.Scheme, 也就是一开始拼接的discov字符串, 跟这getResolver方法进去
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
conns: make(map[*addrConn]struct{}),
dopts: defaultDialOptions(),
blockingpicker: newPickerWrapper(),
czData: new(channelzData),
firstResolveEvent: grpcsync.NewEvent(),
}
//.....
//发现是通过Scheme去获取的, 也就是discov,
resolverBuilder := cc.getResolver(cc.parsedTarget.Scheme)
//....
return cc, nil
}
//google.golang.org/grpc@v1.29.1/resolver/resolver.go
//发现get是用map中读的, map数据是Register方法注入的, 返回到DiscovScheme = "discov"定义的地方, 看看有没有调用Register方法
var (
// m is a map from scheme to resolver builder.
m = make(map[string]Builder)
// defaultScheme is the default scheme to use.
defaultScheme = "passthrough"
)
// registered with the same name, the one registered last will take effect.
func Register(b Builder) {
m[b.Scheme()] = b
}
// Get returns the resolver builder registered with the given scheme.
//
// If no builder is register with the scheme, nil will be returned.
func Get(scheme string) Builder {
if b, ok := m[scheme]; ok {
return b
}
return nil
}
//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/resolver/resolver.go
//注入的是discovBuilder, 继续看下discovBuilder的具体实现
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)
}
//github.com/tal-tech/go-zero@v1.1.7/zrpc/internal/resolver/discovbuilder.go
//具体逻辑看函数中的知识
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
//new一个服务发现的客户端, 里面基本上就是个etcd的封装, etcd的逻辑在NewSubscriber里面, 比较简单, 就写出来了
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
//拿到服务key的所有etcd中的数据
update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
//实时监听etcd数据变化, 然后通过update方法更新数据到grpc的client
sub.AddListener(update)
//初始化的时候调用一次
update()
return &nopResolver{cc: cc}, nil
}
func (d *discovBuilder) Scheme() string {
return DiscovScheme
}
//到这里客户端的服务发现就结束了
//服务注册的代码在rpc/userservice.go中, MustNewServer调用NewServer
func MustNewServer(c RpcServerConf, register internal.RegisterFn) *RpcServer {
server, err := NewServer(c, register)
if err != nil {
log.Fatal(err)
}
return server
}
//NewServer调用的是server, err = internal.NewRpcPubServer(c.Etcd.Hosts, c.Etcd.Key, c.ListenOn, internal.WithMetrics(metrics))
//这个方法进去就是调用discov.NewPublishe
pubClient := discov.NewPublisher(etcdEndpoints, etcdKey, pubListenOn)
//继续跟进去, 发现在KeepAlive()中会调用register方法, 用etcd的put方法注册到etcd中(client.Put)
func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, error) {
resp, err := client.Grant(client.Ctx(), TimeToLive)
if err != nil {
return clientv3.NoLease, err
}
lease := resp.ID
if p.id > 0 {
p.fullKey = makeEtcdKey(p.key, p.id)
} else {
p.fullKey = makeEtcdKey(p.key, int64(lease))
}
_, err = client.Put(client.Ctx(), p.fullKey, p.value, clientv3.WithLease(lease))
return lease, err
}
总结
- go-zero的注册发现代码比较容易懂, 比较简单, 可以作为初步阅读源码的练手项目
- 业务上基本上是够用的