zoukankan      html  css  js  c++  java
  • Golang etcd服务注册与发现

    //service.go

    package discovery
    
    import (
        "context"
        "errors"
        "sync"
        "time"
    
        "github.com/coreos/etcd/clientv3"
        l4g "github.com/alecthomas/log4go"
    )
    
    // Service registers a single key/value pair in etcd under a lease and keeps
    // that lease alive until it is stopped.
    type Service struct {
        closeChan chan struct{}    // closed by Stop to tell Start to exit
        client    *clientv3.Client // etcd v3 client
        leaseID   clientv3.LeaseID // ID of the etcd lease granted in Start
        key       string           // etcd key written by Start
        val       string           // value stored under key
        wg        sync.WaitGroup   // lets Stop wait for Start to return
    }
    
    // NewService creates a Service that will publish key/val to the etcd cluster
    // reachable through etcdEndpoints.
    //
    // The etcd client is created with a 2-second dial timeout. Any error returned
    // by clientv3.New is passed back unchanged together with a nil *Service.
    func NewService(etcdEndpoints []string, key string, val string) (*Service, error) {
        cfg := clientv3.Config{
            Endpoints:   etcdEndpoints,
            DialTimeout: 2 * time.Second,
        }

        client, err := clientv3.New(cfg)
        if err != nil {
            return nil, err
        }

        return &Service{
            closeChan: make(chan struct{}),
            client:    client,
            key:       key,
            val:       val,
        }, nil
    }
    
    // Start registers the service in etcd and then blocks, keeping the
    // registration alive.
    //
    // It grants a lease of ttlSecond seconds, writes key/val under that lease and
    // consumes keep-alive responses until one of the following happens:
    //   - Stop is called: the lease is revoked and the revoke result is returned;
    //   - the etcd client's context is done: a "server closed" error is returned;
    //   - the keep-alive channel is closed: the lease is revoked and the revoke
    //     result is returned.
    //
    // If Grant, Put or KeepAlive fails, the error is returned to the caller
    // instead of panicking.
    //
    // @param - ttlSecond lease duration in seconds
    func (s *Service) Start(ttlSecond int64) error {
        // Join the WaitGroup before any blocking call so that Stop waits for the
        // whole of Start and does not close the client while setup is in flight.
        s.wg.Add(1)
        defer s.wg.Done()

        // NOTE: per the original author, the minimum lease TTL is 5 seconds.
        resp, err := s.client.Grant(context.TODO(), ttlSecond)
        if err != nil {
            return err
        }
        s.leaseID = resp.ID

        if _, err = s.client.Put(context.TODO(), s.key, s.val, clientv3.WithLease(s.leaseID)); err != nil {
            return err
        }

        ch, err := s.client.KeepAlive(context.TODO(), s.leaseID)
        if err != nil {
            return err
        }

        l4g.Info("[discovery] Service Start leaseID:[%d] key:[%s], value:[%s]", s.leaseID, s.key, s.val)

        for {
            select {
            case <-s.closeChan:
                return s.revoke()
            case <-s.client.Ctx().Done():
                return errors.New("server closed")
            case ka, ok := <-ch:
                if !ok {
                    l4g.Warn("[discovery] Service Start keep alive channel closed")
                    return s.revoke()
                }
                l4g.Fine("[discovery] Service Start recv reply from Service: %s, ttl:%d", s.key, ka.TTL)
            }
        }
    }
    
    // Stop signals Start to exit, waits for it to return and then closes the
    // etcd client.
    //
    // Calling Stop again after a previous call is a no-op rather than a
    // "close of closed channel" panic. Stop is not safe to call from several
    // goroutines at the same time.
    func (s *Service) Stop() {
        select {
        case <-s.closeChan:
            // Already stopped; the channel must not be closed twice.
            return
        default:
        }

        close(s.closeChan)
        s.wg.Wait()

        // Surface the close error instead of silently dropping it.
        if err := s.client.Close(); err != nil {
            l4g.Error("[discovery] Service Stop close client error:[%s]", err.Error())
        }
    }
    
    // revoke revokes the service's etcd lease and logs the outcome.
    // It returns the error from the Revoke call, or nil on success.
    func (s *Service) revoke() error {
        if _, err := s.client.Revoke(context.TODO(), s.leaseID); err != nil {
            l4g.Error("[discovery] Service revoke key:[%s] error:[%s]", s.key, err.Error())
            return err
        }

        l4g.Info("[discovery] Service revoke successfully key:[%s]", s.key)
        return nil
    }

    //watch.go

    package discovery
    
    import (
        "context"
        "os"
        "sync"
        "time"

        l4g "github.com/alecthomas/log4go"
        "github.com/coreos/etcd/clientv3"
        "github.com/coreos/etcd/mvcc/mvccpb"
        "google.golang.org/grpc/grpclog"
    )
    
    // GroupManager bundles a sync.WaitGroup with a cancellable context so that a
    // set of goroutines can be told to stop (Close) and then waited for (Wait).
    type GroupManager struct {
        wg     sync.WaitGroup
        ctx    context.Context
        cancel context.CancelFunc
        once   sync.Once
    }

    // NewGroupManager returns a GroupManager whose context is derived from
    // context.Background and has not yet been cancelled.
    func NewGroupManager() *GroupManager {
        ctx, cancel := context.WithCancel(context.Background())
        return &GroupManager{ctx: ctx, cancel: cancel}
    }

    // Close cancels the manager's context. The cancel function is invoked at most
    // once, no matter how many times Close is called.
    func (g *GroupManager) Close() {
        g.once.Do(func() { g.cancel() })
    }

    // Wait blocks until the internal WaitGroup counter drops to zero.
    func (g *GroupManager) Wait() {
        g.wg.Wait()
    }

    // Add adds delta to the internal WaitGroup counter.
    func (g *GroupManager) Add(delta int) {
        g.wg.Add(delta)
    }

    // Done decrements the internal WaitGroup counter by one.
    func (g *GroupManager) Done() {
        g.wg.Done()
    }

    // Chan returns the Done channel of the manager's context; it is closed once
    // Close has been called.
    func (g *GroupManager) Chan() <-chan struct{} {
        return g.ctx.Done()
    }
    
    // Target receives the key/value state observed by Watch.
    type Target interface {
        // Set is called with key and value for every entry returned by the
        // initial Get, before watching starts.
        Set(string, string)
        // Create is called with key and value for watch events that report a
        // newly created key.
        Create(string, string)
        // Modify is called with key and value for watch events that report a
        // change to an existing key.
        Modify(string, string)
        // Delete is called with the key for watch events of type DELETE.
        Delete(string)
    }
    
    // Config describes what Watch should connect to and observe.
    type Config struct {
        Servers        []string // etcd endpoints passed to the client
        DailTimeout    int64    // dial timeout in seconds (name is a misspelling of "DialTimeout", kept for compatibility)
        RequestTimeout int64    // timeout in seconds for the initial Get request
        Prefix         bool     // when true, Target is treated as a key prefix
        Target         string   // key (or key prefix) to read and watch
    }
    
    // Watch mirrors the etcd key (or key prefix) described by cfg into target.
    //
    // It connects to cfg.Servers, reads the current entries for cfg.Target and
    // reports each one through target.Set. It then watches from the revision
    // right after that read and forwards every event to target.Create,
    // target.Modify or target.Delete. It blocks until gm is closed, the watch
    // reports an error, or the watch channel is closed; in the last two cases it
    // also closes gm.
    //
    // Watch calls gm.Done when it returns, so the caller is expected to have
    // called gm.Add(1) beforehand.
    //
    // Watch panics if the etcd client cannot be created or the initial Get
    // fails.
    func Watch(gm *GroupManager, cfg *Config, target Target) {
        defer gm.Done()

        cli, err := clientv3.New(clientv3.Config{
            Endpoints:   cfg.Servers,
            DialTimeout: time.Duration(cfg.DailTimeout) * time.Second,
        })
        if err != nil {
            panic(err.Error())
        }
        defer cli.Close() // make sure to close the client

        l4g.Info("[discovery] start watch %s", cfg.Target)

        // Options shared by the initial Get and the subsequent Watch.
        var opts []clientv3.OpOption
        if cfg.Prefix {
            opts = append(opts, clientv3.WithPrefix())
        }

        ctx, cancel := context.WithTimeout(context.Background(), time.Duration(cfg.RequestTimeout)*time.Second)
        resp, err := cli.Get(ctx, cfg.Target, opts...)
        cancel()
        if err != nil {
            panic(err.Error())
        }
        for _, kv := range resp.Kvs {
            target.Set(string(kv.Key), string(kv.Value))
        }

        // Start watching right after the revision of the snapshot so that no
        // update between the Get and the Watch is missed or reported twice.
        watchOpts := append(opts, clientv3.WithRev(resp.Header.Revision+1))
        rch := cli.Watch(context.Background(), cfg.Target, watchOpts...)

        for {
            select {
            case <-gm.Chan():
                l4g.Info("[discovery] watch %s close", cfg.Target)
                return
            case wresp, ok := <-rch:
                if !ok {
                    // A closed channel would otherwise yield zero-value responses
                    // forever and turn this loop into a busy spin.
                    l4g.Info("[discovery] watch %s channel closed", cfg.Target)
                    gm.Close()
                    return
                }
                if err := wresp.Err(); err != nil {
                    l4g.Info("[discovery] watch %s response error: %s ", cfg.Target, err.Error())
                    gm.Close()
                    return
                }
                l4g.Debug("[discovery] watch %s response %+v", cfg.Target, wresp)
                for _, ev := range wresp.Events {
                    switch {
                    case ev.IsCreate():
                        target.Create(string(ev.Kv.Key), string(ev.Kv.Value))
                    case ev.IsModify():
                        target.Modify(string(ev.Kv.Key), string(ev.Kv.Value))
                    case ev.Type == mvccpb.DELETE:
                        target.Delete(string(ev.Kv.Key))
                    default:
                        l4g.Error("[discovery] no found watch type: %s %q", ev.Type, ev.Kv.Key)
                    }
                }
            }
        }
    }
  • 相关阅读:
    ctags and vim
    [转]bash快捷键
    util-linux编译unknown architecture 'BSD_LABELSECTOR' undeclared错误
    HTML5安全攻防详析之八:Web Socket攻击
    HTML5安全攻防详析之七:新标签攻击
    HTML5安全风险详析之六:API攻击
    HTML5安全风险详析之五:劫持攻击
    HTML5安全风险详析之四:Web Worker攻击
    HTML5安全风险详析之二:Web Storage攻击
    HTML5安全风险详析之一:CORS攻击
  • 原文地址:https://www.cnblogs.com/mrblue/p/9722682.html
Copyright © 2011-2022 走看看