zoukankan      html  css  js  c++  java
  • syncer.go

    package mirror

    import (
        "github.com/coreos/etcd/clientv3"
        "golang.org/x/net/context"
    )

    const (
        batchLimit = 1000
    )

    // Syncer syncs with the key-value state of an etcd cluster.
    type Syncer interface {
        // SyncBase syncs the base state of the key-value state.
        // The key-value state are sent through the returned chan.
        SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error)
        // SyncUpdates syncs the updates of the key-value state.
        // The update events are sent through the returned chan.
        SyncUpdates(ctx context.Context) clientv3.WatchChan
    }

    // NewSyncer creates a Syncer.
    func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer {
        return &syncer{c: c, prefix: prefix, rev: rev}
    }

    type syncer struct {
        c      *clientv3.Client
        rev    int64
        prefix string
    }

    func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) {
        respchan := make(chan clientv3.GetResponse, 1024)
        errchan := make(chan error, 1)

        // if rev is not specified, we will choose the most recent revision.
        if s.rev == 0 {
            resp, err := s.c.Get(ctx, "foo")
            if err != nil {
                errchan <- err
                close(respchan)
                close(errchan)
                return respchan, errchan
            }
            s.rev = resp.Header.Revision
        }

        go func() {
            defer close(respchan)
            defer close(errchan)

            var key string

            opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)}

            if len(s.prefix) == 0 {
                // If len(s.prefix) == 0, we will sync the entire key-value space.
                // We then range from the smallest key (0x00) to the end.
                opts = append(opts, clientv3.WithFromKey())
                key = "x00"
            } else {
                // If len(s.prefix) != 0, we will sync key-value space with given prefix.
                // We then range from the prefix to the next prefix if exists. Or we will
                // range from the prefix to the end if the next prefix does not exists.
                opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
                key = s.prefix
            }

            for {
                resp, err := s.c.Get(ctx, key, opts...)
                if err != nil {
                    errchan <- err
                    return
                }

                respchan <- (clientv3.GetResponse)(*resp)

                if !resp.More {
                    return
                }
                // move to next key
                key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
            }
        }()

        return respchan, errchan
    }

    func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
        if s.rev == 0 {
            panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
        }
        return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev+1))
    }

  • 相关阅读:
    Idea中将项目支持groovy语法
    Python驱动SAP GUI完成自动化(四)
    山东一男孩疑被假警察掳走
    关系型数据库,查看表列表,及字段列表
    my.cnf配置
    MySQL 密码策略
    Docker UnicodeEncodeError: 'ascii' codec can't encode characters in position
    三星S20开启120Hz高刷新率
    三星S20关闭5G消息状态图标
    Centos 7挂载本地ISO光盘
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7452731.html
Copyright © 2011-2022 走看看