zoukankan      html  css  js  c++  java
  • golang中使用etcd

    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
    )
    
    func main(){
        var (
            config clientv3.Config
            err error
            client *clientv3.Client
        )
        //配置
        config = clientv3.Config{
            Endpoints:[]string{"192.168.1.188:2379"},
            DialTimeout:time.Second*5,
        }
        //连接
        if client,err = clientv3.New(config);err != nil{
            fmt.Println(err)
            return
        }
        client=client
    }
    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
    )
    
    func main(){
        var (
            config clientv3.Config
            err error
            client *clientv3.Client
            kv clientv3.KV
            putResp *clientv3.PutResponse
    
        )
        //配置
        config = clientv3.Config{
            Endpoints:[]string{"192.168.1.188:2379"},
            DialTimeout:time.Second*5,
        }
        //连接 床见一个客户端
        if client,err = clientv3.New(config);err != nil{
            fmt.Println(err)
            return
        }
        //用于读写etcd的键值对
        kv = clientv3.NewKV(client)
        putResp, err = kv.Put(context.TODO(),"/cron/jobs/job1","bye",clientv3.WithPrevKV())
        if err != nil{
            fmt.Println(err)
        }else{
            //获取版本信息
            fmt.Println("Revision:",putResp.Header.Revision)
            if putResp.PrevKv != nil{
                fmt.Println("key:",string(putResp.PrevKv.Key))
                fmt.Println("Value:",string(putResp.PrevKv.Value))
                fmt.Println("Version:",string(putResp.PrevKv.Version))
            }
        }
    }
    Revision: 10
    key: /cron/jobs/job1
    Value: hello
    Version: 

    get

    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
    )
    
    func main(){
        var (
            config clientv3.Config
            err error
            client *clientv3.Client
            kv clientv3.KV
            getResp *clientv3.GetResponse
    
        )
        //配置
        config = clientv3.Config{
            Endpoints:[]string{"192.168.1.188:2379"},
            DialTimeout:time.Second*5,
        }
        //连接 床见一个客户端
        if client,err = clientv3.New(config);err != nil{
            fmt.Println(err)
            return
        }
    
    
        //用于读写etcd的键值对
        kv = clientv3.NewKV(client)
    
        getResp,err = kv.Get(context.TODO(),"/cron/jobs/job1")
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(getResp.Kvs)
    }
    [key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye" ]

    with用法

    //用于读写etcd的键值对
        kv = clientv3.NewKV(client)
    
        getResp,err = kv.Get(context.TODO(),"/cron/jobs/job1",clientv3.WithCountOnly())
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(getResp.Kvs,getResp.Count)
    [] 1

    读取前缀

    //用于读写etcd的键值对
        kv = clientv3.NewKV(client)
    
        //读取前缀
        getResp,err = kv.Get(context.TODO(),"/cron/jobs/",clientv3.WithPrefix())
        if err != nil {
            fmt.Println(err)
            return
        }
        fmt.Println(getResp.Kvs)
    [key:"/cron/jobs/job1" create_revision:4 mod_revision:11 version:5 value:"bye"  
    key:"/cron/jobs/job2" create_revision:12 mod_revision:12 version:1 value:"byehhhhhh" ]

    Delete

        //用于读写etcd的键值对
        kv = clientv3.NewKV(client)
    
        delResp,err = kv.Delete(context.TODO(),"/cron/jobs/job2",clientv3.WithPrevKV())
        if err != nil{
            fmt.Println(err)
            return
        }else{
            if len(delResp.PrevKvs) > 0 {
                for idx,kvpair = range delResp.PrevKvs{
                    idx = idx
                    fmt.Println("删除了",string(kvpair.Key),string(kvpair.Value))
                }
            }
        }
    byehhhhhh

    删除多个key

    delResp,err = kv.Delete(context.TODO(),"/cron/jobs",clientv3.WithPrefix())

    续租:

    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
    )
    
    func main(){
        var (
            config clientv3.Config
            err error
            client *clientv3.Client
            kv clientv3.KV
            lease clientv3.Lease
            leaseid clientv3.LeaseID
            leaseGrantResp *clientv3.LeaseGrantResponse
            putResp *clientv3.PutResponse
            getResp *clientv3.GetResponse
            //keepresp *clientv3.LeaseKeepAliveResponse
            //keepRestChan <-chan *clientv3.LeaseKeepAliveResponse
    
        )
        //配置
        config = clientv3.Config{
            Endpoints:[]string{"192.168.1.188:2379"},
            DialTimeout:time.Second*5,
        }
        //连接 床见一个客户端
        if client,err = clientv3.New(config);err != nil{
            fmt.Println(err)
            return
        }
    
    
    
    
        //申请一个lease 租约
        lease = clientv3.NewLease(client)
    
        //申请一个10秒的租约
        if leaseGrantResp, err = lease.Grant(context.TODO(),10);err != nil{
            fmt.Println(err)
            return
        }
    
    
    
        //拿到租约id
        leaseid = leaseGrantResp.ID
    
        //获得kv api子集
        kv = clientv3.NewKV(client)
    
    
        //put一个kv 让它与租约关联起来 从而实现10秒自动过期
        if putResp,err = kv.Put(context.TODO(),"cron/lock/job1","v5",clientv3.WithLease(leaseid));err != nil{
            fmt.Println(err)
            return
        }
    
        fmt.Println("写入成功",putResp.Header.Revision)
    
        //定时的看一下key过期了没有
        for{
            if getResp,err = kv.Get(context.TODO(),"cron/lock/job1");err != nil{
                fmt.Println(err)
                return
            }
            if getResp.Count == 0{
                fmt.Println("kv过期了")
                break
            }
            fmt.Println("还没过期:",getResp.Kvs)
            time.Sleep(time.Second*2)
        }
    }
    写入成功 24
    还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
    还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
    还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
    还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
    还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
    还没过期: [key:"cron/lock/job1" create_revision:24 mod_revision:24 version:1 value:"v5" lease:7587840069550468387 ]
    kv过期了

    永不过期的租约

    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
    )
    
    func main(){
        var (
            config clientv3.Config
            err error
            client *clientv3.Client
            kv clientv3.KV
            lease clientv3.Lease
            leaseid clientv3.LeaseID
            leaseGrantResp *clientv3.LeaseGrantResponse
            putResp *clientv3.PutResponse
            getResp *clientv3.GetResponse
            keepresp *clientv3.LeaseKeepAliveResponse
            keepRestChan <-chan *clientv3.LeaseKeepAliveResponse
    
        )
        //配置
        config = clientv3.Config{
            Endpoints:[]string{"192.168.1.188:2379"},
            DialTimeout:time.Second*5,
        }
        //连接 床见一个客户端
        if client,err = clientv3.New(config);err != nil{
            fmt.Println(err)
            return
        }
    
    
    
    
        //申请一个lease 租约
        lease = clientv3.NewLease(client)
    
        //申请一个10秒的租约
        if leaseGrantResp, err = lease.Grant(context.TODO(),10);err != nil{
            fmt.Println(err)
            return
        }
    
        //拿到租约id
        leaseid = leaseGrantResp.ID
    
        //获得kv api子集
        kv = clientv3.NewKV(client)
    
    
        //自动续租
        if keepRestChan,err = lease.KeepAlive(context.TODO(),leaseid);err != nil{
            fmt.Println(err)
            return
        }
        //处理续租应答的协程
        go func() {
            for {
                select {
                    case keepresp = <-keepRestChan:
                        if keepRestChan == nil{
                            fmt.Println("租约已失效了")
                            goto END
                        }else{//每秒会续租一次,所以就会收到一次应答
                            fmt.Println("收到自动续租的应答")
                        }
                }
            }
            END:
        }()
    
    
    
    
    
        //put一个kv 让它与租约关联起来 从而实现10秒自动过期
        if putResp,err = kv.Put(context.TODO(),"cron/lock/job1","v5",clientv3.WithLease(leaseid));err != nil{
            fmt.Println(err)
            return
        }
    
        fmt.Println("写入成功",putResp.Header.Revision)
    
        //定时的看一下key过期了没有
        for{
            if getResp,err = kv.Get(context.TODO(),"cron/lock/job1");err != nil{
                fmt.Println(err)
                return
            }
            if getResp.Count == 0{
                fmt.Println("kv过期了")
                break
            }
            fmt.Println("还没过期:",getResp.Kvs)
            time.Sleep(time.Second*2)
        }
    }
    View Code
    写入成功 38
    收到自动续租的应答
    还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
    还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
    收到自动续租的应答
    还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
    还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
    收到自动续租的应答
    还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
    还没过期: [key:"cron/lock/job1" create_revision:38 mod_revision:38 version:1 value:"v5" lease:7587840069550468444 ]
    收到自动续租的应答

    watch 

      监听k v变化  常用作与集群中配置下发,状态同步 非常有价值

    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
        "github.com/coreos/etcd/mvcc/mvccpb"
    )
    
    func main() {
        var (
            config clientv3.Config
            client *clientv3.Client
            err error
            kv clientv3.KV
            watcher clientv3.Watcher
            getResp *clientv3.GetResponse
            watchStartRevision int64
            watchRespChan <-chan clientv3.WatchResponse
            watchResp clientv3.WatchResponse
            event *clientv3.Event
        )
    
        // 客户端配置
        config = clientv3.Config{
            Endpoints: []string{"36.111.184.221:2379"},
            DialTimeout: 5 * time.Second,
        }
    
        // 建立连接
        if client, err = clientv3.New(config); err != nil {
            fmt.Println(err)
            return
        }
    
        // KV
        kv = clientv3.NewKV(client)
    
        // 模拟etcd中KV的变化
        go func() {
            for {
                kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
    
                kv.Delete(context.TODO(), "/cron/jobs/job7")
    
                time.Sleep(1 * time.Second)
            }
        }()
    
        // 先GET到当前的值,并监听后续变化
        if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
            fmt.Println(err)
            return
        }
    
        // 现在key是存在的
        if len(getResp.Kvs) != 0 {
            fmt.Println("当前值:", string(getResp.Kvs[0].Value))
        }
    
        // 当前etcd集群事务ID, 单调递增的
        watchStartRevision = getResp.Header.Revision + 1
    
        // 创建一个watcher
        watcher = clientv3.NewWatcher(client)
    
        // 启动监听
        fmt.Println("从该版本向后监听:", watchStartRevision)
    
        ctx, cancelFunc := context.WithCancel(context.TODO())
        time.AfterFunc(5 * time.Second, func() {
            cancelFunc()
        })
    
        watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))
    
        // 处理kv变化事件
        for watchResp = range watchRespChan {
            for _, event = range watchResp.Events {
                switch event.Type {
                case mvccpb.PUT:
                    fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
                case mvccpb.DELETE:
                    fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
                }
            }
        }
    }
    当前值: i am job7
    从该版本向后监听: 72
    删除了 key:"/cron/jobs/job7" mod_revision:72 
    修改为: key:"/cron/jobs/job7" create_revision:73 mod_revision:73 version:1 value:"i am job7" 
    删除了 key:"/cron/jobs/job7" mod_revision:74 
    修改为: key:"/cron/jobs/job7" create_revision:75 mod_revision:75 version:1 value:"i am job7" 
    删除了 key:"/cron/jobs/job7" mod_revision:76 
    修改为: key:"/cron/jobs/job7" create_revision:77 mod_revision:77 version:1 value:"i am job7" 
    删除了 key:"/cron/jobs/job7" mod_revision:78 
    修改为: key:"/cron/jobs/job7" create_revision:79 mod_revision:79 version:1 value:"i am job7" 
    删除了 key:"/cron/jobs/job7" mod_revision:80 
    修改为: key:"/cron/jobs/job7" create_revision:81 mod_revision:81 version:1 value:"i am job7" 
    删除了 key:"/cron/jobs/job7" mod_revision:82 

    op取代get put delete方法

    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
    )
    
    func main() {
        var (
            config clientv3.Config
            client *clientv3.Client
            err error
            kv clientv3.KV
            putOp clientv3.Op
            getOp clientv3.Op
            opResp clientv3.OpResponse
        )
    
        // 客户端配置
        config = clientv3.Config{
            Endpoints: []string{"36.111.184.221:2379"},
            DialTimeout: 5 * time.Second,
        }
    
        // 建立连接
        if client, err = clientv3.New(config); err != nil {
            fmt.Println(err)
            return
        }
    
        kv = clientv3.NewKV(client)
    
        // 创建Op: operation
        putOp = clientv3.OpPut("/cron/jobs/job8", "123123123")
    
        // 执行OP
        if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
            fmt.Println(err)
            return
        }
    
        // kv.Do(op)
    
        // kv.Put
        // kv.Get
        // kv.Delete
    
        fmt.Println("写入Revision:", opResp.Put().Header.Revision)
    
        // 创建Op
        getOp = clientv3.OpGet("/cron/jobs/job8")
    
        // 执行OP
        if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
            fmt.Println(err)
            return
        }
    
        // 打印
        fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision)    // create rev == mod rev
        fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
    }
    View Code

    事务txn实现分布式锁

    package main
    
    import (
        "github.com/coreos/etcd/clientv3"
        "time"
        "fmt"
        "context"
    )
    
    func main() {
        var (
            config clientv3.Config
            client *clientv3.Client
            err error
            lease clientv3.Lease
            leaseGrantResp *clientv3.LeaseGrantResponse
            leaseId clientv3.LeaseID
            keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
            keepResp *clientv3.LeaseKeepAliveResponse
            ctx context.Context
            cancelFunc context.CancelFunc
            kv clientv3.KV
            txn clientv3.Txn
            txnResp *clientv3.TxnResponse
        )
    
        // 客户端配置
        config = clientv3.Config{
            Endpoints: []string{"36.111.184.221:2379"},
            DialTimeout: 5 * time.Second,
        }
    
        // 建立连接
        if client, err = clientv3.New(config); err != nil {
            fmt.Println(err)
            return
        }
    
        // lease实现锁自动过期:
        // op操作
        // txn事务: if else then
    
        // 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
        lease = clientv3.NewLease(client)
    
        // 申请一个5秒的租约
        if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
            fmt.Println(err)
            return
        }
    
        // 拿到租约的ID
        leaseId = leaseGrantResp.ID
    
        // 准备一个用于取消自动续租的context
        ctx, cancelFunc = context.WithCancel(context.TODO())
    
        // 确保函数退出后, 自动续租会停止
        defer cancelFunc()
        defer lease.Revoke(context.TODO(), leaseId)
    
        // 5秒后会取消自动续租
        if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
            fmt.Println(err)
            return
        }
    
        // 处理续约应答的协程
        go func() {
            for {
                select {
                case keepResp = <- keepRespChan:
                    if keepRespChan == nil {
                        fmt.Println("租约已经失效了")
                        goto END
                    } else {    // 每秒会续租一次, 所以就会受到一次应答
                        fmt.Println("收到自动续租应答:", keepResp.ID)
                    }
                }
            }
        END:
        }()
    
        //  if 不存在key, then 设置它, else 抢锁失败
        kv = clientv3.NewKV(client)
    
        // 创建事务
        txn = kv.Txn(context.TODO())
    
        // 定义事务
    
        // 如果key不存在
        txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
            Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
            Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
    
        // 提交事务
        if txnResp, err = txn.Commit(); err != nil {
            fmt.Println(err)
            return // 没有问题
        }
    
        // 判断是否抢到了锁
        if !txnResp.Succeeded {
            fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
            return
        }
    
        // 2, 处理业务
    
        fmt.Println("处理任务")
        time.Sleep(5 * time.Second)
    
        // 3, 释放锁(取消自动续租, 释放租约)
        // defer 会把租约释放掉, 关联的KV就被删除了
    }

     执行结果:

  • 相关阅读:
    js监听对象属性的改变
    js中的垃圾回收机制
    防抖和节流
    Ajax的浏览器缓存问题及解决方法
    Java多线程系列---“基础篇”07之 线程休眠
    Java多线程系列---“基础篇”06之 线程让步
    Java多线程系列---“基础篇”05之 线程等待与唤醒
    Java多线程系列---“基础篇”04之 synchronized关键字
    Java多线程系列---“基础篇”03之 Thread中start()和run()的区别
    Java多线程系列---“基础篇”02之 常用的实现多线程的两种方式
  • 原文地址:https://www.cnblogs.com/sunlong88/p/11295424.html
Copyright © 2011-2022 走看看