zoukankan      html  css  js  c++  java
  • kratos Consul注册发现使用示例

    项目目录

    kratos-register  -- 为注册项目示例

    kratos-discovery -- 为发现项目示例

    kratos-discoveryinternalconsul  -- (核心)为本项目封装的基于Consul服务注册发现的demo

    kratos-registercmdmain.go -- 为服务注册代码示例位置

    kratos-discoveryapi-register  -- 为发现服务调用注册服务的协议文件  

    kratos-discoveryapiclient.go  -- 为服务发现示例代码位置

    kratos-discoveryinternaldaodao.go  -- 提供service 调用的入口

    kratos-discoveryinternalserverserviceservice.go -- 最终grpc 调用的实际方法

    准备前提

    服务注册

    kratos-registercmdmain.go

    func main(){
        
        // 实例化Discovery
        dis, err := consul.New(&consul.Config{Zone:"zone01", Env:"dev", Region:"region01"})
        if err != nil {
            panic(err)
        }
        // 注册为 resolver
        resolver.Register(dis)
    
        ip := "192.168.3.87"//你本地服务的ip地址 到时候可以替换
        port := "9002"//你本地grpc 服务的端口
        hn, _ := os.Hostname()
        //dis := discovery.New(nil)
        ins := &naming.Instance{
            Zone:     env.Zone, //时区 从环境变量中读取
            Env:      env.DeployEnv,
            AppID:    "register.service", //服务发现需要用到
            Hostname: hn,
            Addrs: []string{
                "grpc://" + ip + ":" + port,
            },
        }
        cancel, err := dis.Register(context.Background(), ins)
        if err != nil {
            panic(err)
        }
        
        signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
        for {
            s := <-c
            log.Info("get a signal %s", s.String())
            switch s {
            case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
                closeFunc()
                //程序退出时取消consul 注册 windows signal 机制无法命中此方法所以可能在windows环境可能无法模拟取消注册
                cancel()
                log.Info("kratos-register exit")
                time.Sleep(time.Second)
                return
            case syscall.SIGHUP:
                cancel()
            default:
                return
            }
    }

    服务发现

    kratos-discoveryapiclient.go  

    提供NewRegisterClient 方法给dao层使用。

    // AppID .这里的AppId 是你需要发现的服务的AppId
    const AppID = "register.service"
    
    func init(){
        // NOTE: 注意这段代码,表示要使用服务发现
        // NOTE: 还需注意的是,resolver.Register是全局生效的,所以建议该代码放在进程初始化的时候执行
        // NOTE: !!!切记不要在一个进程内进行多个不同中间件的Register!!!
        // NOTE: 在启动应用时,可以通过flag(-discovery.nodes) 或者 环境配置(DISCOVERY_NODES)指定discovery节点
        resolver.Register(consul.Builder())
    }
    
    // NewRClient new register grpc client 
    func NewRClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (register.RegisterClient, error) {
        client := warden.NewClient(cfg, opts...)
        //使用consul的 scheme
        cc, err := client.Dial(context.Background(), fmt.Sprintf("consul://default/%s", AppID))
        if err != nil {
            return nil, err
        }
        return register.NewRegisterClient(cc), nil
    }

    kratos-discoveryinternaldaodao.go  

    提供注册服务的实例给grpc使用

    // Dao dao interface
    type Dao interface {
        Close()
        Ping(ctx context.Context) (err error)
        // bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1
        Article(c context.Context, id int64) (*model.Article, error)
        //注册的服务接口约束
        RegisterLogin(ctx context.Context, req *discoverApi.LoginReq) (reply *discoverApi.LoginResp, err error)
    }
    
    // dao dao.
    type dao struct {
        db          *sql.DB
        redis       *redis.Redis
        mc          *memcache.Memcache
        cache *fanout.Fanout
        demoExpire int32
        //暴露给service 接口使用的register.Client
        client register.RegisterClient
    }
    
    func newDao(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d *dao, cf func(), err error) {
        var cfg struct{
            DemoExpire xtime.Duration
        }
        if err = paladin.Get("application.toml").UnmarshalTOML(&cfg); err != nil {
            return
        }
    
        //配置grpc 超时相关配置
        grpccfg := &warden.ClientConfig{
            Dial:              xtime.Duration(time.Second * 10),
            Timeout:           xtime.Duration(time.Second * 1),
            Subset:            50,
            KeepAliveInterval: xtime.Duration(time.Second * 60),
            KeepAliveTimeout:  xtime.Duration(time.Second * 20),
        }
        var rClient register.RegisterClient
        //通过服务发现创建registerClient 实例
        if rClient,err = discoverApi.NewRClient(grpccfg); err != nil {
            panic(err)
        }
        d = &dao{
            db: db,
            redis: r,
            mc: mc,
            cache: fanout.New("cache"),
            demoExpire: int32(time.Duration(cfg.DemoExpire) / time.Second),
            //
            client:rClient,
        }
        cf = d.Close
        return
    }
    
    // RegisterLogin 调用服务注册的Login方法
    func (d *dao) RegisterLogin(ctx context.Context, req *discoverApi.LoginReq) (reply *discoverApi.LoginResp, err error) {
        reply2, err := d.client.Login(ctx, (*register.LoginReq)(req))
        reply = (*discoverApi.LoginResp)(reply2)
        return
    }

    kratos-discoveryinternalserverserviceservice.go

    实现协议文件中RegisterLogin 方法

    //LoginUrl 登录服务接口逻辑底层调用的是register服务的登录方法
    func (s *Service) LoginUrl(ctx context.Context, req *discoverApi.LoginReq) (reply *discoverApi.LoginResp, err error) {
        fmt.Printf("discovery login username: %s, passwd: %s", req.Username, req.Passwd)
        //调用dao层的注册方法
         reply,err=    s.dao.RegisterLogin(ctx,req)
        return
    }

    kratos-discoveryinternalconsul  -- (核心)为本项目封装的基于Consul服务注册发现的demo

    package discovery
    
    import (
        "context"
        "encoding/json"
        "errors"
        "fmt"
        "github.com/go-kratos/kratos/pkg/log"
        "github.com/go-kratos/kratos/pkg/naming"
        "github.com/hashicorp/consul/api"
        "github.com/hashicorp/consul/api/watch"
        "github.com/hashicorp/go-hclog"
        "net/url"
        "strconv"
        "strings"
        "sync"
        "sync/atomic"
        "time"
    )
    
    var (
        ERR_INS_ADDRS_EMPTY = errors.New("len of ins.Addrs should not be 0")
    )
    
    type logWrapper struct {
    }
    
    func (wrapper logWrapper) Write(p []byte) (n int, err error) {
        log.Info(string(p))
        return len(p), nil
    }
    
    // Config discovery configures.
    type Config struct {
        Nodes  []string
        Region string
        Zone   string
        Env    string
        Host   string
    }
    
    // Resolver resolve naming service
    type Resolver struct {
        appID   string
        c       chan struct{}
        client  *api.Client
        agent   *api.Agent
        plan    *watch.Plan
        builder *Discovery
        ins     atomic.Value
    }
    
    func (resolver *Resolver) watch() error {
        var params map[string]interface{}
        watchKey := fmt.Sprintf(`{"type":"service", "service":"%s"}`, resolver.appID)
        if err := json.Unmarshal([]byte(watchKey), &params); err != nil {
            return err
        }
        plan, err := watch.Parse(params)
        if err != nil {
            return err
        }
        plan.Handler = func(idx uint64, raw interface{}) {
            if raw == nil {
                return // ignore
            }
            v, ok := raw.([]*api.ServiceEntry)
            if !ok {
                return // ignore
            }
            log.Info("consul watch service %s notify, len %d", resolver.appID, len(v))
            ins := resolver.coverServiceEntry2Ins(v)
            resolver.ins.Store(ins)
            resolver.c <- struct{}{}
        }
    
        logger := hclog.NewInterceptLogger(&hclog.LoggerOptions{}) // replace logger
        go func() {
            err := plan.RunWithClientAndHclog(resolver.client,logger)
            if err != nil {
                log.Error("watch service %s error %s", resolver.appID, err.Error())
            }
        }()
        resolver.plan = plan
        return nil
    }
    
    func (resolver *Resolver) Watch() <-chan struct{} {
        return resolver.c
    }
    
    func (resolver *Resolver) coverServiceEntry2Ins(serviceArr []*api.ServiceEntry) *naming.InstancesInfo {
        instancesInfo := &naming.InstancesInfo{}
        //instancesInfo.Scheduler = make([]naming.Zone, 0, 10)
        instancesInfo.Instances = make(map[string][]*naming.Instance)
        for _, service := range serviceArr {
            if service.Checks.AggregatedStatus() == api.HealthPassing {
                log.Info("appid %s ip %s port %d pass", resolver.appID, service.Service.Address, service.Service.Port)
                ins := resolver.coverService2Instance(service.Service)
                if _, ok := instancesInfo.Instances[ins.Zone]; !ok {
                    instancesInfo.Instances[ins.Zone] = make([]*naming.Instance, 0, 10)
                }
                instancesInfo.Instances[ins.Zone] = append(instancesInfo.Instances[ins.Zone], ins)
            }
        }
        instancesInfo.LastTs = time.Now().Unix()
        return instancesInfo
    }
    
    // unused
    func (resolver *Resolver) fetch(c context.Context) (*naming.InstancesInfo, bool) {
        _, infoArr, err := resolver.agent.AgentHealthServiceByName(resolver.appID)
        if err != nil {
            log.Error("get AgentHealthServiceByName %s err %s", resolver.appID, err.Error())
            return nil, false
        }
        instancesInfo := &naming.InstancesInfo{}
        //instancesInfo.Scheduler = make([]naming.Zone, 0, 10)
        instancesInfo.Instances = make(map[string][]*naming.Instance)
        log.Info("get AgentHealthServiceByName %s info len %d", resolver.appID, len(infoArr))
        for _, info := range infoArr {
            log.Info("get AgentHealthServiceByName %s info addr %s:%d status: %s", resolver.appID, info.Service.Address, info.Service.Port, info.AggregatedStatus)
            if info.AggregatedStatus != "passing" {
                continue
            }
            ins := resolver.coverService2Instance(info.Service)
            if _, ok := instancesInfo.Instances[ins.Zone]; !ok {
                instancesInfo.Instances[ins.Zone] = make([]*naming.Instance, 0, 10)
            }
            instancesInfo.Instances[ins.Zone] = append(instancesInfo.Instances[ins.Zone], ins)
        }
        instancesInfo.LastTs = time.Now().Unix()
        return instancesInfo, true
    }
    
    func (resolver *Resolver) Fetch(c context.Context) (ins *naming.InstancesInfo, ok bool) {
        v := resolver.ins.Load()
        ins, ok = v.(*naming.InstancesInfo)
        return
    }
    
    func (resolver Resolver) Close() error {
        if resolver.plan != nil && !resolver.plan.IsStopped() {
            resolver.plan.Stop()
        }
        return nil
    }
    
    func (resolver *Resolver) coverService2Instance(service *api.AgentService) *naming.Instance {
        meta := service.Meta
        addr := []string{
            service.Address + ":" + strconv.Itoa(service.Port),
        }
        ins := &naming.Instance{
            Region:   meta["region"],
            Zone:     meta["zone"],
            Env:      meta["env"],
            Hostname: meta["hostname"],
            Version:  meta["version"],
            AppID:    service.Service,
            Addrs:    addr,
        }
        ins.Metadata = make(map[string]string)
        for key, value := range meta {
            if key == "region" || key == "env" || key == "zone" || key == "version" || key == "hostname" {
                continue
            }
            ins.Metadata[key] = value
        }
        ins.LastTs = time.Now().Unix()
        return ins
    }
    
    func (builder Discovery) coverIns2AgentService(ins *naming.Instance) ([]*api.AgentServiceRegistration, error) {
        if len(ins.Addrs) == 0 {
            return nil, ERR_INS_ADDRS_EMPTY
        }
        registrationArr := make([]*api.AgentServiceRegistration, len(ins.Addrs))
        meta := make(map[string]string)
        meta["region"] = ins.Region
        meta["zone"] = ins.Zone
        meta["env"] = ins.Env
        meta["hostname"] = ins.Hostname
        meta["version"] = ins.Version
        meta["last_ts"] = strconv.FormatInt(ins.LastTs, 10)
    
        for key, value := range ins.Metadata {
            meta[key] = value
        }
        for i, addr := range ins.Addrs {
            urlVal, err := url.Parse(addr)
            if err != nil {
                return nil, err
            }
            port, _ := strconv.Atoi(urlVal.Port())
            service := &api.AgentServiceRegistration{
                ID:      ins.AppID + "-" + urlVal.Hostname() + "-" + urlVal.Port(),
                Name:    ins.AppID,
                Kind:    api.ServiceKindTypical,
                Port:    port,
                Address: urlVal.Scheme + "://" + urlVal.Hostname(),
                Meta:    meta,
            }
            registrationArr[i] = service
        }
        return registrationArr, nil
    }
    
    func (builder Discovery) Register(ctx context.Context, ins *naming.Instance) (cancel context.CancelFunc, err error) {
        serviceArr, err := builder.coverIns2AgentService(ins)
        if err != nil {
            return
        }
    
        ctx, cancel = context.WithCancel(ctx)
        defer func() {
            if err != nil { // avoid register partition
                cancel()
            }
        }()
        for _, service := range serviceArr { //@todo 批量注册
            service.Check = &api.AgentServiceCheck{
                TTL:    "15s",
                Status: api.HealthPassing,
            }
            var status string
            var info *api.AgentServiceChecksInfo
            status, info, err = builder.agent.AgentHealthServiceByID(service.ID)
            if err != nil {
                return
            }
            if info == nil && status == api.HealthCritical {
                err = builder.agent.ServiceRegister(service) // @todo check had registered
                if err != nil {
                    return
                }
            } else {
                err = builder.agent.PassTTL(fmt.Sprintf("service:%s", service.ID), "I am good :)")
                if err != nil {
                    return
                }
            }
    
            go func(service *api.AgentServiceRegistration) {
                for {
                    select {
                    case <-ctx.Done():
                        log.Info("ServiceDeregister %s", service.ID)
                        err := builder.agent.ServiceDeregister(service.ID)
                        if err != nil {
                            log.Error("consul: ServiceDeregister %s err: %s", service.ID, err.Error())
                        }
                        return
                    case <-time.After(time.Second * 5):
                        err := builder.agent.PassTTL(fmt.Sprintf("service:%s", service.ID), "I am good :)")
                        if err == nil {
                            continue
                        }
                        log.Error("consul: PassTTL %s err: %s", service.ID, err.Error())
                        if strings.Index(err.Error(), "does not have associated TTL") > 0 { // 注册已经失效
                            err = builder.agent.ServiceRegister(service) // consul 下线会导致 有这个 error
                            if err != nil {
                                log.Error("consul: PassTTL %s reRegister err: %s", service.ID, err.Error())
                            }
                        }
                    }
                }
            }(service)
        }
        return
    }
    
    
    
    type Discovery struct {
        client *api.Client
        agent  *api.Agent
        r      map[string]*Resolver
        locker sync.RWMutex
        c      *Config
        ctx        context.Context
        cancelFunc context.CancelFunc
    }
    
    func (builder *Discovery) Build(id string, opts ...naming.BuildOpt) naming.Resolver {
        builder.locker.RLock()
        if r, ok := builder.r[id]; ok {
            builder.locker.RUnlock()
            return r
        }
        builder.locker.RUnlock()
        builder.locker.Lock()
        r := &Resolver{
            appID:   id,
            client:  builder.client,
            agent:   builder.agent,
            builder: builder,
        }
        r.c = make(chan struct{}, 10)
        builder.r[id] = r
        builder.locker.Unlock()
        err := r.watch()
        if err != nil {
            log.Error("watch error %s", err.Error())
        }
        return r
    }
    
    func (builder *Discovery) Scheme() string {
        return "consul"
    }
    
    var (
        _once    sync.Once
        _builder naming.Builder
    )
    
    func Builder() naming.Builder {
        _once.Do(func() {
            _builder,_ = New(nil)
        })
        return _builder
    }
    
    
    func New(c *Config) (builder *Discovery, err error) {
        if c == nil {
            c = new(Config)
        }
        client, err := api.NewClient(api.DefaultConfig())
        ctx, cancel := context.WithCancel(context.Background())
        if err != nil {
            return
        }
        builder= &Discovery{
            client:        client,
            agent: client.Agent(),
            ctx:        ctx,
            cancelFunc: cancel,
            c:      c,
            r:  make(map[string]*Resolver),
        }
    
        return
    }

    设置环境变量(以goland示例)

    consul 环境变量参考

    // HTTPAddrEnvName defines an environment variable name which sets
    // the HTTP address if there is no -http-addr specified.
    HTTPAddrEnvName = "CONSUL_HTTP_ADDR"
    
    // HTTPTokenEnvName defines an environment variable name which sets
    // the HTTP token.
    HTTPTokenEnvName = "CONSUL_HTTP_TOKEN"
    
    // HTTPTokenFileEnvName defines an environment variable name which sets
    // the HTTP token file.
    HTTPTokenFileEnvName = "CONSUL_HTTP_TOKEN_FILE"
    
    // HTTPAuthEnvName defines an environment variable name which sets
    // the HTTP authentication header.
    HTTPAuthEnvName = "CONSUL_HTTP_AUTH"
    
    // HTTPSSLEnvName defines an environment variable name which sets
    // whether or not to use HTTPS.
    HTTPSSLEnvName = "CONSUL_HTTP_SSL"
    
    // HTTPCAFile defines an environment variable name which sets the
    // CA file to use for talking to Consul over TLS.
    HTTPCAFile = "CONSUL_CACERT"
    
    // HTTPCAPath defines an environment variable name which sets the
    // path to a directory of CA certs to use for talking to Consul over TLS.
    HTTPCAPath = "CONSUL_CAPATH"
    
    // HTTPClientCert defines an environment variable name which sets the
    // client cert file to use for talking to Consul over TLS.
    HTTPClientCert = "CONSUL_CLIENT_CERT"
    
    // HTTPClientKey defines an environment variable name which sets the
    // client key file to use for talking to Consul over TLS.
    HTTPClientKey = "CONSUL_CLIENT_KEY"
    
    // HTTPTLSServerName defines an environment variable name which sets the
    // server name to use as the SNI host when connecting via TLS
    HTTPTLSServerName = "CONSUL_TLS_SERVER_NAME"
    
    // HTTPSSLVerifyEnvName defines an environment variable name which sets
    // whether or not to disable certificate checking.
    HTTPSSLVerifyEnvName = "CONSUL_HTTP_SSL_VERIFY"
    
    // GRPCAddrEnvName defines an environment variable name which sets the gRPC
    // address for consul connect envoy. Note this isn't actually used by the api
    // client in this package but is defined here for consistency with all the
    // other ENV names we use.
    GRPCAddrEnvName = "CONSUL_GRPC_ADDR"
  • 相关阅读:
    base64和Blob的相互转换
    限制文件上传的大小和尺寸
    git将本地项目提交到github
    vue-cli3创建项目时报错
    运行项目是node-sass报错的解决方法
    classList的使用
    将数组扁平化并去除其中重复数据,最终得到一个升序且不重复的数组
    移动端的图片放大
    js获取url中的参数
    HTML5-canvas
  • 原文地址:https://www.cnblogs.com/chongyao/p/14272043.html
Copyright © 2011-2022 走看看