zoukankan      html  css  js  c++  java
  • [go]etcd使用

    
    // 连接etcd
    import (
    	"github.com/coreos/etcd/clientv3"
    	"github.com/coreos/etcd/mvcc/mvccpb"
    )
    config = clientv3.Config{
    	Endpoints: []string{"127.0.0.1:2379"},
    	DialTimeout: 5 * time.Second,
    }
    
    client, err = clientv3.New(config)
    kv = clientv3.NewKV(client)
    
    // put
    putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "bye", clientv3.WithPrevKV())
    putResp.Header.Revision
    putResp.PrevKv.Value
    
    // get
    getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1", /*clientv3.WithCountOnly()*/)
    getResp.Kvs
    getResp.Count
    
    // get prefix
    getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix())
    getResp.Kvs
    
    
    // delete
    delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey(), clientv3.WithLimit(2))
    for _, kvpair = range delResp.PrevKvs {
    	fmt.Println("删除了:", string(kvpair.Key), string(kvpair.Value))
    }
    
    // lease: 关注key是否存在
    
    lease = clientv3.NewLease(client)
    leaseGrantResp, err = lease.Grant(context.TODO(), 10)
    keepRespChan, err = lease.KeepAlive(context.TODO(), leaseGrantResp.id) //自动续约
    
    putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseGrantResp.id))
    putResp.Header.Revision
    
    // watch:  关注key的变化
    
    getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7") //先get到当前值,监听后续变化
    watchStartRevision = getResp.Header.Revision + 1
    watcher = clientv3.NewWatcher(client)
    
    ctx, cancelFunc := context.WithCancel(context.TODO())
    time.AfterFunc(5 * time.Second, func() {
    	cancelFunc()
    })
    
    watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))
    
    for watchResp = range watchRespChan {
    	for _, event = range watchResp.Events {
    		switch event.Type {
    		case mvccpb.PUT:
    			fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
    		case mvccpb.DELETE:
    			fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
    		}
    	}
    }
    
    // op操作(类似条件一样的)
    putOp = clientv3.OpPut("/cron/jobs/job8", "123123123")
    opResp, err = kv.Do(context.TODO(), putOp)
    
    // kv.Put
    // kv.Get
    // kv.Delete
    
    opResp.Put().Header.Revision
    
    getOp = clientv3.OpGet("/cron/jobs/job8")
    opResp, err = kv.Do(context.TODO(), getOp)
    opResp.Get().Kvs[0].ModRevision
    opResp.Get().Kvs[0].Value
    
    // 事务机制: 去锁资源, 判断lock目录key是否存在, 如果不存在则put进去, 标记抢锁成功, 如果存在则象征性get下, 标记强锁失败
    lease = clientv3.NewLease(client) //创建租约, 自动续租, 拿着租约去抢占一个key
    leaseGrantResp, err = lease.Grant(context.TODO(), 5)
    defer lease.Revoke(context.TODO(), leaseGrantResp.id)
    defer lease.Revoke(context.TODO(), leaseGrantResp.id)
    keepRespChan, err = lease.KeepAlive(ctx, leaseId) //5秒后会取消自动续租
    
    
    go func() {
    	for {
    		select {
    		case keepResp = <- keepRespChan:
    			if keepRespChan == nil {
    				fmt.Println("租约已经失效了")
    				goto END
    			} else {	// 每秒会续租一次, 所以就会受到一次应答
    				fmt.Println("收到自动续租应答:", keepResp.ID)
    			}
    		}
    	}
    END:
    }()
    
    kv = clientv3.NewKV(client)  //  if 不存在key, then 设置它, else 抢锁失败
    txn = kv.Txn(context.TODO())  // 创建事务
    
    //if条件成立,走then,否则走else
    txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
    	Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
    	Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
    txn.Commit()
    txnResp.Succeeded //抢锁成功
    
    

    理解事务: 一个curl的例子

    // python2
    // 已在etcd中创建了key1:0,A:"success",B:"failure"三个键值对
    import json
    import base64
    import requests
    
    URL = "http://127.0.0.1:2379/v3beta/kv/%s"
    
    url = URL % "txn"
    
    payload = {
        "compare":[
            {
                "target": "VALUE",
                "key":base64.b64encode("key1"),
                'result': "EQUAL",
                "value": base64.b64encode("0"),
            },
        ],
        "success":[
            {
                "requestRange":{
                    "key":base64.b64encode("A"),
                }
            }
        ],
        "failure":[
            {
                "requestRange":{
                    "key":base64.b64encode("B"),
                }
            }
        ]
    }
    resp = requests.post(url,json=payload)
    print json.dumps(resp.json(), indent=2)
    

    所谓的支持事务锁: 里面的操作是原子的

          // 发起转账视图
          txn := etcd.Txn(ctx.TODO()).If(
              v3.Compare(v3.ModRevision(from), “=”, fromKV.ModRevision),  // 事务提交时,from账户余额没有没有变动
              v3.Compare(v3.ModRevision(to), “=”, toKV.ModRevision))      // 事务提交时,to账户余额没有变动
          txn = txn.Then(
              OpPut(from, fromUint64(fromV - amount)),  // 更新from账户余额
              OpPut(to, fromUint64(toV - amount))       // 更新to账户余额
          putresp, err := txn.Commit()   // 提交事务
    

    [etcd]分布式锁

    etcd election特性使用场景——Master选举分析与实现

  • 相关阅读:
    mysql教程(九) 索引详解
    mysql教程(八) 事务详解
    mysql教程(七) 约束详解
    mysql教程(七)创建表并添加约束
    mysql教程(六) 对字段的操作--添加、删除、修改
    mysql教程(五)limit的用法
    mysql教程(四)连接查询
    mysql教程(三)分组查询group by
    mysql教程(一)count函数与聚合函数
    mysql教程(二)数据库常用函数汇总
  • 原文地址:https://www.cnblogs.com/iiiiiher/p/11961054.html
Copyright © 2011-2022 走看看