zoukankan      html  css  js  c++  java
  • Go语言操作etcd

    Go语言操作etcd

    这里使用官方的etcd/clientv3包来连接etcd并进行相关操作。

    安装

    go get go.etcd.io/etcd/clientv3
    

    报错处理:

    // 在go.mod中添加这个两句话
    replace github.com/coreos/bbolt v1.3.4 => go.etcd.io/bbolt v1.3.4
    replace google.golang.org/grpc => google.golang.org/grpc v1.26.0
    

    image-20210912105821052

    put、del、get操作

    put命令用来设置键值对数据,get命令用来根据key获取值。

    package main
    import (
    	"context"
    	"fmt"
    	"go.etcd.io/etcd/clientv3"
    	"time"
    )
    /*
    @author RandySun
    @create 2021-09-11-22:18
    */
    func main() {
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints: []string{"127.0.0.1:2379"},
    		DialTimeout: 5 * time.Second,
    	})
    	if err != nil{
    		fmt.Printf("connect to etcd failed err: %#v", err)
    	}
    	fmt.Println("connect to etcd success")
    	defer cli.Close()
    	// 添加数据
    	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    	_, err = cli.Put(ctx, "name", "randySun")
    	if err != nil{
    		fmt.Printf("put to etcd failed, err: %#v", err)
    	}
    	cancel()
    
    	
    	// get 取数据
    	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    	resp, err := cli.Get(ctx, "name")
    	cancel()
    	if err != nil {
    		fmt.Printf("get from etcd failed, err:%v\n", err)
    		return
    	}
    	for _, ev := range resp.Kvs {
    		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
    	}
    	// del 取数据
    	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    	delResponse, err := cli.Delete(ctx, "name")
    	cancel()
    	if err != nil {
    		fmt.Printf("del from etcd failed, err:%v\n", err)
    		return
    	}
    	fmt.Println("delete count: ",delResponse.Deleted)
    }
    

    image-20210912104456112

    watch操作

    watch用来获取未来更改的通知。

    package main
    
    import (
    	"context"
    	"fmt"
    	"go.etcd.io/etcd/clientv3"
    	"time"
    )
    
    /*
    @author RandySun
    @create 2021-09-12-10:21
    */
    
    // watch: 监控etcd中的Key的变化(创建,更改,修改)
    func main() {
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints: []string{"127.0.0.1:2379"},
    		DialTimeout: 5 * time.Second,
    	})
    
    	if err  != nil{
    		fmt.Printf("connect fo failed err: %#v", err)
    	}
    	defer cli.Close()
    	// watch
    	watchCh := cli.Watch(context.Background(), "name") // <-chan WatchResponse
    
    	// 获取修改的指监控
    	for wresp := range watchCh{
    		for _, env := range wresp.Events{
    			// 获取被修改的key
    			fmt.Printf("type:%s key:%s value: %s\n", env.Type, env.Kv.Key, env.Kv.Value)
    
    		}
    	}
    
    }
    

    将上面的代码保存编译执行,此时程序就会等待etcd中name这个key的变化。

    例如:我们打开终端执行以下命令修改、删除、设置name这个key。

    $ ./etcdctl.exe --endpoints=http://127.0.0.1:2379 put name randysun
    
    $ ./etcdctl.exe --endpoints=http://127.0.0.1:2379 del name randysun
    
    $ ./etcdctl.exe --endpoints=http://127.0.0.1:2379 put name randysun
    

    上面的程序都能收到如下通知。

    type:PUT key:name value: randysun
    type:DELETE key:name value: 
    type:PUT key:name value: randysun
    

    image-20210912105505718

    lease租约

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // etcd lease
    
    import (
    	"context"
    	"log"
    
    	"go.etcd.io/etcd/clientv3"
    )
    
    func main() {
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints:   []string{"127.0.0.1:2379"},
    		DialTimeout: time.Second * 5,
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	fmt.Println("connect to etcd success.")
    	defer cli.Close()
    
    	// 创建一个5秒的租约
    	resp, err := cli.Grant(context.TODO(), 5)
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	// 5秒钟之后, /nazha/ 这个key就会被移除
    	_, err = cli.Put(context.TODO(), "/nazha/", "dsb", clientv3.WithLease(resp.ID))
    	if err != nil {
    		log.Fatal(err)
    	}
    }
    

    keepAlive

    package main
    
    import (
    	"context"
    	"fmt"
    	"log"
    	"time"
    
    	"go.etcd.io/etcd/clientv3"
    )
    
    // etcd keepAlive
    
    func main() {
    	cli, err := clientv3.New(clientv3.Config{
    		Endpoints:   []string{"127.0.0.1:2379"},
    		DialTimeout: time.Second * 5,
    	})
    	if err != nil {
    		log.Fatal(err)
    	}
    	fmt.Println("connect to etcd success.")
    	defer cli.Close()
    
    	resp, err := cli.Grant(context.TODO(), 5)
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	_, err = cli.Put(context.TODO(), "/nazha/", "dsb", clientv3.WithLease(resp.ID))
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	// the key 'foo' will be kept forever
    	ch, kaerr := cli.KeepAlive(context.TODO(), resp.ID)
    	if kaerr != nil {
    		log.Fatal(kaerr)
    	}
    	for {
    		ka := <-ch
    		fmt.Println("ttl:", ka.TTL)
    	}
    }
    

    基于etcd实现分布式锁

    go.etcd.io/etcd/clientv3/concurrency在etcd之上实现并发操作,如分布式锁、屏障和选举。

    导入该包:

    import "go.etcd.io/etcd/clientv3/concurrency"
    

    基于etcd实现的分布式锁示例:

    cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()
    
    // 创建两个单独的会话用来演示锁竞争
    s1, err := concurrency.NewSession(cli)
    if err != nil {
        log.Fatal(err)
    }
    defer s1.Close()
    m1 := concurrency.NewMutex(s1, "/my-lock/")
    
    s2, err := concurrency.NewSession(cli)
    if err != nil {
        log.Fatal(err)
    }
    defer s2.Close()
    m2 := concurrency.NewMutex(s2, "/my-lock/")
    
    // 会话s1获取锁
    if err := m1.Lock(context.TODO()); err != nil {
        log.Fatal(err)
    }
    fmt.Println("acquired lock for s1")
    
    m2Locked := make(chan struct{})
    go func() {
        defer close(m2Locked)
        // 等待直到会话s1释放了/my-lock/的锁
        if err := m2.Lock(context.TODO()); err != nil {
            log.Fatal(err)
        }
    }()
    
    if err := m1.Unlock(context.TODO()); err != nil {
        log.Fatal(err)
    }
    fmt.Println("released lock for s1")
    
    <-m2Locked
    fmt.Println("acquired lock for s2")
    

    输出:

    acquired lock for s1
    released lock for s1
    acquired lock for s2
    

    查看文档了解更多

    其他操作

    其他操作请查看etcd/clientv3官方文档

    参考链接:

    在当下的阶段,必将由程序员来主导,甚至比以往更甚。
  • 相关阅读:
    array_merge
    漏斗模型
    3 破解密码,xshell连接
    2-安装linux7
    1-Linux运维人员要求
    17-[模块]-time&datetime
    16-[模块]-导入方式
    Nginx服务器
    15-作业:员工信息表查询
    14-本章总结
  • 原文地址:https://www.cnblogs.com/randysun/p/15676125.html
Copyright © 2011-2022 走看看