zoukankan      html  css  js  c++  java
  • [Golang] ETCD键值监听器

    0x0 需求

      我们所有的服务启动后都会以 lease(租约)的形式注册到 ETCD 中,现在需要对这些服务进行监控。

    0x1 ETCD key监听器实现

      可动态增删要监听的键值对

      https://github.com/bailu1901/pkg/blob/master/etcd_watcher/watcher.go

    // Package etcd_watcher ETCD键值监听器
    package etcd_watcher
    
    import (
        "context"
        "sync"
        "time"
    
        "github.com/coreos/etcd/clientv3"
        "github.com/coreos/etcd/mvcc/mvccpb"
    )
    
    // timeOut bounds both the etcd dial and the initial Get issued by watch.
    var timeOut = 3 * time.Second
    
    // Listener receives notifications about the watched keys.
    type Listener interface {
        // Set is called once for every key/value pair that already exists
        // when the watch starts.
        Set(key, value []byte)
        // Create is called when a watched key is created.
        Create(key, value []byte)
        // Modify is called when the value of an existing watched key changes.
        Modify(key, value []byte)
        // Delete is called when a watched key is removed.
        Delete(key []byte)
    }
    
    // EtcdWatcher watches etcd keys and forwards changes to Listeners.
    // Watches can be added and removed while the watcher is running.
    type EtcdWatcher struct {
        // cli is the etcd client shared by every watch goroutine.
        cli *clientv3.Client

        // wg counts the running watch goroutines so Close can wait for them.
        wg sync.WaitGroup

        // mu guards closeHandler.
        mu sync.Mutex
        // closeHandler maps a watched key to the cancel function of its watch.
        closeHandler map[string]func()

        // listener is not referenced by any method in this file; each watch
        // receives its own Listener through AddWatch instead.
        listener Listener
    }
    
    // NewEtcdWatcher connects to the given etcd endpoints and returns a watcher
    // with no active watches. The dial is bounded by timeOut. The error from
    // clientv3.New is returned unchanged when the client cannot be created.
    func NewEtcdWatcher(servers []string) (*EtcdWatcher, error) {
        cfg := clientv3.Config{Endpoints: servers, DialTimeout: timeOut}

        client, err := clientv3.New(cfg)
        if err != nil {
            return nil, err
        }

        return &EtcdWatcher{
            cli:          client,
            closeHandler: make(map[string]func()),
        }, nil
    }
    
    // AddWatch starts watching key in a background goroutine and reports
    // whether a new watch was started.
    //
    // When prefix is true every key that starts with key is watched. The
    // listener first receives Set for each existing pair and afterwards
    // Create/Modify/Delete for subsequent changes.
    //
    // It returns false, without starting anything, when listener is nil, when
    // the watcher has already been closed, or when key is already registered.
    func (mgr *EtcdWatcher) AddWatch(key string, prefix bool, listener Listener) bool {
        // A nil listener would panic inside the watch goroutine on the first
        // callback and take the whole process down, so reject it up front.
        if listener == nil {
            return false
        }

        mgr.mu.Lock()
        defer mgr.mu.Unlock()

        // Close resets cli to nil; a goroutine started after that would
        // dereference the nil client.
        if mgr.cli == nil {
            return false
        }
        if _, ok := mgr.closeHandler[key]; ok {
            return false
        }

        ctx, cancel := context.WithCancel(context.Background())
        mgr.closeHandler[key] = cancel

        // Add must happen before the goroutine starts so that Close(true)
        // cannot miss it.
        mgr.wg.Add(1)
        go mgr.watch(ctx, key, prefix, listener)

        return true
    }
    
    // RemoveWatch cancels the watch registered for key and forgets it.
    // It returns false when no watch is registered under key.
    func (mgr *EtcdWatcher) RemoveWatch(key string) bool {
        mgr.mu.Lock()
        defer mgr.mu.Unlock()

        if stop, found := mgr.closeHandler[key]; found {
            delete(mgr.closeHandler, key)
            stop()
            return true
        }
        return false
    }
    
    // ClearWatch cancels every registered watch and resets the registry to an
    // empty map.
    func (mgr *EtcdWatcher) ClearWatch() {
        mgr.mu.Lock()
        defer mgr.mu.Unlock()

        registered := mgr.closeHandler
        mgr.closeHandler = make(map[string]func())
        for _, stop := range registered {
            stop()
        }
    }
    
    // Close cancels all watches and closes the etcd client.
    //
    // When wait is true, Close blocks until every watch goroutine has
    // returned before closing the client. Calling Close more than once is
    // safe: later calls find the client already released and do nothing
    // beyond clearing the (empty) watch registry.
    func (mgr *EtcdWatcher) Close(wait bool) {
        mgr.ClearWatch()

        if wait {
            mgr.wg.Wait()
        }

        mgr.mu.Lock()
        defer mgr.mu.Unlock()

        // Already closed: the original code dereferenced the nil client here
        // and panicked on a second Close.
        if mgr.cli == nil {
            return
        }

        // Close has no return value, so the client's close error cannot be
        // reported to the caller; it is discarded deliberately.
        _ = mgr.cli.Close()
        mgr.cli = nil
    }
    
    // watch reports the current state of key to listener and then streams
    // later changes until ctx is cancelled or the watch fails.
    //
    // It runs in its own goroutine (started by AddWatch) and marks mgr.wg done
    // on return. When prefix is true, every key starting with key is covered.
    //
    // It returns nil when ctx is cancelled, when the watch channel is closed,
    // or when the client has already been released; otherwise it returns the
    // error from the initial Get or from a watch response. It does not touch
    // closeHandler, so after a failure the key stays registered until
    // RemoveWatch or ClearWatch is called.
    func (mgr *EtcdWatcher) watch(ctx context.Context, key string, prefix bool, listener Listener) error {
        defer mgr.wg.Done()

        // Read the client once: Close sets mgr.cli to nil, and with
        // Close(false) this goroutine may still be running at that point.
        cli := mgr.cli
        if cli == nil {
            return nil
        }

        var opts []clientv3.OpOption
        if prefix {
            opts = append(opts, clientv3.WithPrefix())
        }

        // Derive the Get deadline from ctx so that removing the watch also
        // aborts a snapshot that is still in flight.
        getCtx, cancelGet := context.WithTimeout(ctx, timeOut)
        getResp, err := cli.Get(getCtx, key, opts...)
        cancelGet()
        if err != nil {
            return err
        }

        for _, kv := range getResp.Kvs {
            listener.Set(kv.Key, kv.Value)
        }

        // Start right after the snapshot revision so no change between the
        // Get and the Watch is lost or delivered twice.
        opts = append(opts, clientv3.WithRev(getResp.Header.Revision+1))

        // The watch is bound to ctx: cancelling it releases the watcher's
        // resources in the client and closes watchChan. The original used
        // context.Background() here and leaked the watcher on every removal.
        watchChan := cli.Watch(ctx, key, opts...)
        for {
            select {
            case <-ctx.Done():
                return nil
            case resp, ok := <-watchChan:
                if !ok {
                    // Channel closed (ctx cancelled or client closed). Without
                    // this check the loop would spin on zero-value responses.
                    return nil
                }
                if err := resp.Err(); err != nil {
                    return err
                }
                for _, ev := range resp.Events {
                    switch {
                    case ev.IsCreate():
                        listener.Create(ev.Kv.Key, ev.Kv.Value)
                    case ev.IsModify():
                        listener.Modify(ev.Kv.Key, ev.Kv.Value)
                    case ev.Type == mvccpb.DELETE:
                        listener.Delete(ev.Kv.Key)
                    }
                }
            }
        }
    }

    0x2 配合发送邮件做报警

      https://github.com/bailu1901/pkg/tree/master/mailbox

  • 相关阅读:
    js字符串空格和换行
    python resources
    -eous
    英语资源网站
    -iatry 没病走两步
    book corpus
    epub converters
    brainstorm detain
    craftsman
    parachute
  • 原文地址:https://www.cnblogs.com/mrblue/p/11752523.html
Copyright © 2011-2022 走看看