zoukankan      html  css  js  c++  java
  • syncer.go

    package mirror

    import (
        "github.com/coreos/etcd/clientv3"
        "golang.org/x/net/context"
    )

    const (
        batchLimit = 1000
    )

    // Syncer syncs with the key-value state of an etcd cluster.
    type Syncer interface {
        // SyncBase syncs the base state of the key-value state.
        // The key-value state are sent through the returned chan.
        SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error)
        // SyncUpdates syncs the updates of the key-value state.
        // The update events are sent through the returned chan.
        SyncUpdates(ctx context.Context) clientv3.WatchChan
    }

    // NewSyncer creates a Syncer.
    func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer {
        return &syncer{c: c, prefix: prefix, rev: rev}
    }

    type syncer struct {
        c      *clientv3.Client
        rev    int64
        prefix string
    }

    func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) {
        respchan := make(chan clientv3.GetResponse, 1024)
        errchan := make(chan error, 1)

        // if rev is not specified, we will choose the most recent revision.
        if s.rev == 0 {
            resp, err := s.c.Get(ctx, "foo")
            if err != nil {
                errchan <- err
                close(respchan)
                close(errchan)
                return respchan, errchan
            }
            s.rev = resp.Header.Revision
        }

        go func() {
            defer close(respchan)
            defer close(errchan)

            var key string

            opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)}

            if len(s.prefix) == 0 {
                // If len(s.prefix) == 0, we will sync the entire key-value space.
                // We then range from the smallest key (0x00) to the end.
                opts = append(opts, clientv3.WithFromKey())
                key = "x00"
            } else {
                // If len(s.prefix) != 0, we will sync key-value space with given prefix.
                // We then range from the prefix to the next prefix if exists. Or we will
                // range from the prefix to the end if the next prefix does not exists.
                opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
                key = s.prefix
            }

            for {
                resp, err := s.c.Get(ctx, key, opts...)
                if err != nil {
                    errchan <- err
                    return
                }

                respchan <- (clientv3.GetResponse)(*resp)

                if !resp.More {
                    return
                }
                // move to next key
                key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
            }
        }()

        return respchan, errchan
    }

    func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
        if s.rev == 0 {
            panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
        }
        return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev+1))
    }

  • 相关阅读:
    [Spark]Spark-streaming通过Receiver方式实时消费Kafka流程(Yarn-cluster)
    [git]将代码上传到github
    [Scala]Scala安装以及在IDEA中配置Scala
    [tesseract-ocr]OCR图像识别Ubuntu下环境包安装
    [Spark]Spark-sql与hive连接配置
    [py2neo]Ubuntu14 安装py2neo失败问题解决
    [wcp部署]Linux(Ubuntu)安装部署WCP
    Office 365入门教程(一):开始使用Office 365
    微软Power BI入门教程(一):认识Power BI
    电脑病毒猛于虎,但这些坏习惯猛于病毒
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7452731.html
Copyright © 2011-2022 走看看