zoukankan      html  css  js  c++  java
  • syncer.go

    package mirror

    import (
        "github.com/coreos/etcd/clientv3"
        "golang.org/x/net/context"
    )

    const (
        batchLimit = 1000
    )

    // Syncer syncs with the key-value state of an etcd cluster.
    type Syncer interface {
        // SyncBase syncs the base state of the key-value state.
        // The key-value state are sent through the returned chan.
        SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error)
        // SyncUpdates syncs the updates of the key-value state.
        // The update events are sent through the returned chan.
        SyncUpdates(ctx context.Context) clientv3.WatchChan
    }

    // NewSyncer creates a Syncer.
    func NewSyncer(c *clientv3.Client, prefix string, rev int64) Syncer {
        return &syncer{c: c, prefix: prefix, rev: rev}
    }

    type syncer struct {
        c      *clientv3.Client
        rev    int64
        prefix string
    }

    func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, chan error) {
        respchan := make(chan clientv3.GetResponse, 1024)
        errchan := make(chan error, 1)

        // if rev is not specified, we will choose the most recent revision.
        if s.rev == 0 {
            resp, err := s.c.Get(ctx, "foo")
            if err != nil {
                errchan <- err
                close(respchan)
                close(errchan)
                return respchan, errchan
            }
            s.rev = resp.Header.Revision
        }

        go func() {
            defer close(respchan)
            defer close(errchan)

            var key string

            opts := []clientv3.OpOption{clientv3.WithLimit(batchLimit), clientv3.WithRev(s.rev)}

            if len(s.prefix) == 0 {
                // If len(s.prefix) == 0, we will sync the entire key-value space.
                // We then range from the smallest key (0x00) to the end.
                opts = append(opts, clientv3.WithFromKey())
                key = "x00"
            } else {
                // If len(s.prefix) != 0, we will sync key-value space with given prefix.
                // We then range from the prefix to the next prefix if exists. Or we will
                // range from the prefix to the end if the next prefix does not exists.
                opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
                key = s.prefix
            }

            for {
                resp, err := s.c.Get(ctx, key, opts...)
                if err != nil {
                    errchan <- err
                    return
                }

                respchan <- (clientv3.GetResponse)(*resp)

                if !resp.More {
                    return
                }
                // move to next key
                key = string(append(resp.Kvs[len(resp.Kvs)-1].Key, 0))
            }
        }()

        return respchan, errchan
    }

    func (s *syncer) SyncUpdates(ctx context.Context) clientv3.WatchChan {
        if s.rev == 0 {
            panic("unexpected revision = 0. Calling SyncUpdates before SyncBase finishes?")
        }
        return s.c.Watch(ctx, s.prefix, clientv3.WithPrefix(), clientv3.WithRev(s.rev+1))
    }

  • 相关阅读:
    Objective-C中 Self和 Super详解
    OC类方法和实例方法中的self区别
    Objective-C----MRC内存管理 、 自动释放池 、 面向对象三大特性及封装 、 继承 、 组合与聚合
    Objective-C对象初始化 、 实例方法和参数 、 类方法 、 工厂方法 、 单例模式
    Objective-C语言介绍 、 Objc与C语言 、 面向对象编程 、 类和对象 、 属性和方法 、 属性和实例变量
    联合与枚举 、 高级指针 、 C语言标准库(一)
    C语言--- 字符串数组 、 预处理器和预处理指令 、 多文件编程 、 结构体
    C语言----变量及作用域 、 指针 、 指针和数组 、 进程空间 、 字符串
    iOS开发环境C语言基础 数组 函数
    ios开发环境 分支语句 、 循环结构(for) 、 循环结构
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7452731.html
Copyright © 2011-2022 走看看