zoukankan      html  css  js  c++  java
  • Consul 入门-gRPC 服务注册与发现

    前言

    假如我有钱,我想买一个降噪耳机,我应该哪里买? 答案很简单,可以去京东或者线下实体店。
    那如果把这个问题映射到微服务架构中:我打开京东,选中某款耳机进入详情页浏览,我可以看到这款耳机的价格、库存、规格、评价等。以我的理解,这个链路应该是这样的:

    暂定这个系统由3个微服务组成:商品详情服务、库存服务、评价服务。

    • 商品详情服务:聚合端上用户看到的所有信息
    • 库存服务:维护商品的库存信息、规格信息、价格信息
    • 评价服务:维护用户对商品的评价

    微服务的目的是为了基于松耦合高内聚将单体服务进行拆分,然后将个服务进行多副本部署(我们甚至不知道它会被部署到哪里,实体机?虚拟机?容器?云上?)以达到高可用的目的。这也要付出点代价,商品详情服务需要知道:库存服务和评价服务在哪里?

    由此,我们将继续学习 Consul 这款不错的服务发现工具,通前面的学习,我们已经对 Consul 的原理、使用、搭建有了认知。本次将学习:Consul 如何在 gRPC 构建的微服务网络环境中做一名合格的“指路人”。

    编写一个 Go gRPC 服务

    gRPC 是由 Google 开发并开源的RPC框架,详见官网。我们将通过官网的指导来编写一个简单的 go gRPC 服务

    获取样例代码

    1. 克隆grpc-go仓库
    $ git clone -b v1.29.1.0 https://github.com/grpc/grpc-go
    
    1. 切换到样例代码目录
    $ cd cd grpc-go/examples/helloworld
    

    目录结构如下:

    ├── greeter_client
    │   └── main.go
    ├── helloword
    │   └── helloword.proto
    └── greeter_server
    |   └── main.go
    

    运行样例代码

    1. 编译执行 server 代码:
    $ go run greeter_server/main.go
    
    1. 在新开一个终端,编译执行 client 代码,可以看到输出:
    $ go run greeter_client/main.go
    2021/09/11 16:28:29 Greeting: Hello world
    

    gRPC 的 Banlancer

    greeter_client/main.go中,是通过指定 server 地址的方式来实现访问到目标服务的

    ...
    const (
        address = "localhost:50051"
    )
    
    func main () {
        conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
        ...
    }
    

    但这种方式在生产环境是不可行的,因为我们并不知道目标服务的地址(目标服务的地址也有可能不只一个)。实际上,gRPC 已经为我们提供来解决方案:Balancer。

    首先,看一下 gRPC 客户端负载均衡实现的官方架构图:

    从图中,可以看到 Balancer 均衡器位于架构的最右方,内置一个 Picker 模块,Balancer 主要完成下面几个功能:

    • 与 Rersovler 通信(维持通信机制),接收 Resovler 通知的服务端列表更新,维护 Connection Pool 及每个连接的状态
    • 对上一步获取的服务端列表,调用newSubConn异步建立长连接(每个 Backend 一个长连接),同时,监控连接的状态,及时更新 Connection Pool
    • 创建 Picker,Picker 执行的算法就是真正的 LB 逻辑,当客户端使用conn初始化 PRC 方法时,通过 Picker 选择一个存活的连接,返回给客户端,然后调用 UpdatePicker 更新 LB 算法的内置状态,为下一次调用做准备
    • Balaner 是 gRPC 负载均衡最核心的模块

    据此,我们可用通过自定义的 Balancer,在 Balaner 基础上通过实现自定义的naming.Resolver来达到使用 Consul 看发现服务的功能。

    大概流程是:

    1. grpc 在 Dial 的时候通过 WithBalancer 传入 Balancer
    2. Balaner 会通过 naming.Resolver 去解析 (Resovle)Dial 传入的 target 得到一个nameing.Watcher
    3. naming.Watcher持续监视 target 解析到地址列表的变更并通过 Next 返回给 Balancer

    实现 Consul Resolver

    grpc-go/naming/naming.go中可以看到Resolver接口的声明

    type Resolver interface {
    	// Resolve creates a Watcher for target.
    	Resolve(target string) (Watcher, error)
    }
    

    需要实现一个Consul Resolver,在里面返回可用的服务端地址列表,在examples目录下新建grpcresolver文件夹,在该文件夹下新建consul.go文件:

    package grpcresolver
    
    import (
    	"fmt"
    	"net"
    	"strconv"
    	"sync"
    	"sync/atomic"
    
    	"github.com/hashicorp/consul/api"
    	"google.golang.org/grpc/naming"
    )
    
    type watchEntry struct {
    	addr string
    	modi uint64
    	last uint64
    }
    
    type consulWatcher struct {
    	down      int32
    	c         *api.Client
    	service   string
    	mu        sync.Mutex
    	watched   map[string]*watchEntry
    	lastIndex uint64
    }
    
    func (w *consulWatcher) Close() {
    	atomic.StoreInt32(&w.down, 1)
    }
    
    func (w *consulWatcher) Next() ([]*naming.Update, error) {
    	w.mu.Lock()
    	defer w.mu.Unlock()
    	watched := w.watched
    	lastIndex := w.lastIndex
    retry:
            // 访问 Consul, 获取可用的服务列表
    	services, meta, err := w.c.Catalog().Service(w.service, "", &api.QueryOptions{
    		WaitIndex: lastIndex,
    	})
    	if err != nil {
    		return nil, err
    	}
    	if lastIndex == meta.LastIndex {
    		if atomic.LoadInt32(&w.down) != 0 {
    			return nil, nil
    		}
    		goto retry
    	}
    	lastIndex = meta.LastIndex
    	var updating []*naming.Update
    	for _, s := range services {
    		ws := watched[s.ServiceID]
    		fmt.Println(s.ServiceAddress, s.ServicePort)
    		if ws == nil {
                            // 如果是新注册的服务
    			ws = &watchEntry{
    				addr: net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort)),
    				modi: s.ModifyIndex,
    			}
    			watched[s.ServiceID] = ws
                              
    			updating = append(updating, &naming.Update{
    				Op:   naming.Add,
    				Addr: ws.addr,
    			})
    		} else if ws.modi != s.ModifyIndex {
                            // 如果是原来的服务
    			updating = append(updating, &naming.Update{
    				Op:   naming.Delete,
    				Addr: ws.addr,
    			})
    			ws.addr = net.JoinHostPort(s.ServiceAddress, strconv.Itoa(s.ServicePort))
    			ws.modi = s.ModifyIndex
    			updating = append(updating, &naming.Update{
    				Op:   naming.Add,
    				Addr: ws.addr,
    			})
    		}
    		ws.last = lastIndex
    	}
    	for id, ws := range watched {
    		if ws.last != lastIndex {
    			delete(watched, id)
    			updating = append(updating, &naming.Update{
    				Op:   naming.Delete,
    				Addr: ws.addr,
    			})
    		}
    	}
    	w.watched = watched
    	w.lastIndex = lastIndex
    	return updating, nil
    }
    
    type consulResolver api.Client
    
    func (r *consulResolver) Resolve(target string) (naming.Watcher, error) {
    	return &consulWatcher{
    		c:       (*api.Client)(r),
    		service: target,
    		watched: make(map[string]*watchEntry),
    	}, nil
    }
    
    func ForConsul(reg *api.Client) naming.Resolver {
    	return (*consulResolver)(reg)
    }
    

    server 端通过 Consul 注册服务

    修改examples/helloword/greeter_server/main.go,在启动服务前,将服务的信息注册到 Consul

    package main
    
    import (
    	"context"
    	"encoding/hex"
    	"flag"
    	"fmt"
    	"log"
    	"math/rand"
    	"net"
    	"strconv"
    	"time"
    
    	"github.com/hashicorp/consul/api"
    	"google.golang.org/grpc"
    	pb "google.golang.org/grpc/examples/helloworld/helloworld"
    )
    
    const (
    	// host = "192.168.10.102"
    	// port = 50051
    	ttl = 30 * time.Second
    )
    
    // server is used to implement helloworld.GreeterServer.
    type server struct {
    	pb.UnimplementedGreeterServer
    	port int
    }
    
    // SayHello implements helloworld.GreeterServer
    func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    	log.Printf("Received: %v", in.GetName())
    	return &pb.HelloReply{Message: fmt.Sprintf("Hello %s, from %d", in.GetName(), s.port)}, nil
    }
    
    func main() {
    
    	host := flag.String("h", "127.0.0.1", "host")
    	port := flag.Int("p", 50051, "port")
    	flag.Parse()
    
    	lis, err := net.Listen("tcp", net.JoinHostPort(*host, strconv.Itoa(*port)))
    	if err != nil {
    		log.Fatalf("failed to listen: %v", err)
    	}
    
    	// Consul Client
    	registry, err := api.NewClient(api.DefaultConfig())
    	if err != nil {
    		log.Fatalln(err)
    	}
    
    	var h [16]byte
    	rand.Read(h[:])
    	// 生成一个全局ID
    	id := fmt.Sprintf("helloserver-%s-%d", hex.EncodeToString(h[:]), *port)
    	fmt.Println(id)
    	// 注册到 Consul,包含地址、端口信息,以及健康检查
    	err = registry.Agent().ServiceRegister(&api.AgentServiceRegistration{
    		ID:      id,
    		Name:    "helloserver",
    		Port:    *port,
    		Address: *host,
    		Check: &api.AgentServiceCheck{
    			TTL:     (ttl + time.Second).String(),
    			Timeout: time.Minute.String(),
    		},
    	})
    	if err != nil {
    		log.Fatalln(err)
    	}
    	go func() {
    		checkid := "service:" + id
    		for range time.Tick(ttl) {
    			err := registry.Agent().PassTTL(checkid, "")
    			if err != nil {
    				log.Fatalln(err)
    			}
    		}
    	}()
    
    	s := grpc.NewServer()
    	pb.RegisterGreeterServer(s, &server{port: *port})
    	if err := s.Serve(lis); err != nil {
    		log.Fatalf("failed to serve: %v", err)
    	}
    }
    

    client 端通过 Consul 发现服务

    package main
    
    import (
    	"context"
    	"log"
    	"time"
    
    	"github.com/hashicorp/consul/api"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/examples/grpcresolver"
    	pb "google.golang.org/grpc/examples/helloworld/helloworld"
    )
    
    const (
    	address     = "localhost:50051"
    	defaultName = "world"
    )
    
    func main() {
    	// consul
    	registry, err := api.NewClient(api.DefaultConfig())
    	if err != nil {
    		log.Fatalln(err)
    	}
    
    	// 自定义 LB,并使用刚才写的 Consul Resolver
    	lbrr := grpc.RoundRobin(grpcresolver.ForConsul(registry))
    
    	// Set up a connection to the server.
    	conn, err := grpc.Dial("helloserver", grpc.WithInsecure(), grpc.WithBalancer(lbrr))
    	if err != nil {
    		log.Fatalf("did not connect: %v", err)
    	}
    	defer conn.Close()
    	c := pb.NewGreeterClient(conn)
    
    	// 调用 server 端 RPC,通过响应观察负载均衡
    	for range time.Tick(time.Second) {
    		name := defaultName
    		r, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: name})
    		if err != nil {
    			log.Fatalf("could not greet: %v", err)
    			continue
    		}
    		log.Printf("server reply: %s", r.GetMessage())
    	}
    }
    

    启动 & 查看

    1. 启动两个 Server,设置不同的启动端口
    # 启动 server1
    $ go run grpc-goexampleshelloworldgreeter_servermain.go -p 50015
    helloserver-52fdfc072182654f163f5f0f9a621d72-50015
    
    # 启动 server2
    $ go run grpc-goexampleshelloworldgreeter_servermain.go -p 50014
    helloserver-52fdfc072182654f163f5f0f9a621d72-50014
    

    通过 Consul Web UI 查看,两个 instance 均是健康的

    1. 启动 Client
    $ go run grpc-goexampleshelloworldgreeter_clientmain.go
    2021/09/12 16:42:39 server reply: Hello world, from 50014
    2021/09/12 16:42:40 server reply: Hello world, from 50015
    2021/09/12 16:42:41 server reply: Hello world, from 50014
    2021/09/12 16:42:42 server reply: Hello world, from 50015
    2021/09/12 16:42:43 server reply: Hello world, from 50014
    2021/09/12 16:42:44 server reply: Hello world, from 50015
    2021/09/12 16:42:45 server reply: Hello world, from 50014
    

    可以看到是均匀对两个 server 发起调用,当我们将其中一个 instance server2 关掉(模拟不可用的情况),流量全面全部转移到另一台上了

    说明失败转移也是正常的。

  • 相关阅读:
    Quartz任务调度(3)存储与持久化操作配置详细解
    Quartz任务调度(2)CronTrigger定制个性化调度方案
    Quartz任务调度(1)概念例析快速
    Mybatis Generator最完整配置详解
    SpringMVC之@ControllerAdvice
    文件上传api——MultipartFile
    Springboot使用MatrixVariable 注解
    p命名空间和c命名空间
    SpringBoot配置Cors跨域请求
    SpringBoot五步配置Mybatis
  • 原文地址:https://www.cnblogs.com/Zioyi/p/15255570.html
Copyright © 2011-2022 走看看