zoukankan      html  css  js  c++  java
  • [go]etcd使用

    
    // 连接etcd
    import (
    	"github.com/coreos/etcd/clientv3"
    	"github.com/coreos/etcd/mvcc/mvccpb"
    )
    config = clientv3.Config{
    	Endpoints: []string{"127.0.0.1:2379"},
    	DialTimeout: 5 * time.Second,
    }
    
    client, err = clientv3.New(config)
    kv = clientv3.NewKV(client)
    
    // put
    putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "bye", clientv3.WithPrevKV())
    putResp.Header.Revision
    putResp.PrevKv.Value
    
    // get
    getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1", /*clientv3.WithCountOnly()*/)
    getResp.Kvs
    getResp.Count
    
    // get prefix
    getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix())
    getResp.Kvs
    
    
    // delete
    delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey(), clientv3.WithLimit(2))
    for _, kvpair = range delResp.PrevKvs {
    	fmt.Println("删除了:", string(kvpair.Key), string(kvpair.Value))
    }
    
    // lease: 关注key是否存在
    
    lease = clientv3.NewLease(client)
    leaseGrantResp, err = lease.Grant(context.TODO(), 10)
    keepRespChan, err = lease.KeepAlive(context.TODO(), leaseGrantResp.id) //自动续约
    
    putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseGrantResp.id))
    putResp.Header.Revision
    
    // watch:  关注key的变化
    
    getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7") //先get到当前值,监听后续变化
    watchStartRevision = getResp.Header.Revision + 1
    watcher = clientv3.NewWatcher(client)
    
    ctx, cancelFunc := context.WithCancel(context.TODO())
    time.AfterFunc(5 * time.Second, func() {
    	cancelFunc()
    })
    
    watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))
    
    for watchResp = range watchRespChan {
    	for _, event = range watchResp.Events {
    		switch event.Type {
    		case mvccpb.PUT:
    			fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
    		case mvccpb.DELETE:
    			fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
    		}
    	}
    }
    
    // op操作(类似条件一样的)
    putOp = clientv3.OpPut("/cron/jobs/job8", "123123123")
    opResp, err = kv.Do(context.TODO(), putOp)
    
    // kv.Put
    // kv.Get
    // kv.Delete
    
    opResp.Put().Header.Revision
    
    getOp = clientv3.OpGet("/cron/jobs/job8")
    opResp, err = kv.Do(context.TODO(), getOp)
    opResp.Get().Kvs[0].ModRevision
    opResp.Get().Kvs[0].Value
    
    // 事务机制: 去锁资源, 判断lock目录key是否存在, 如果不存在则put进去, 标记抢锁成功, 如果存在则象征性get下, 标记强锁失败
    lease = clientv3.NewLease(client) //创建租约, 自动续租, 拿着租约去抢占一个key
    leaseGrantResp, err = lease.Grant(context.TODO(), 5)
    defer lease.Revoke(context.TODO(), leaseGrantResp.id)
    defer lease.Revoke(context.TODO(), leaseGrantResp.id)
    keepRespChan, err = lease.KeepAlive(ctx, leaseId) //5秒后会取消自动续租
    
    
    go func() {
    	for {
    		select {
    		case keepResp = <- keepRespChan:
    			if keepRespChan == nil {
    				fmt.Println("租约已经失效了")
    				goto END
    			} else {	// 每秒会续租一次, 所以就会受到一次应答
    				fmt.Println("收到自动续租应答:", keepResp.ID)
    			}
    		}
    	}
    END:
    }()
    
    kv = clientv3.NewKV(client)  //  if 不存在key, then 设置它, else 抢锁失败
    txn = kv.Txn(context.TODO())  // 创建事务
    
    //if条件成立,走then,否则走else
    txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
    	Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
    	Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
    txn.Commit()
    txnResp.Succeeded //抢锁成功
    
    

    理解事务: 一个curl的例子

    // python2
    // 已在etcd中创建了key1:0,A:"success",B:"failure"三个键值对
    import json
    import base64
    import requests
    
    URL = "http://127.0.0.1:2379/v3beta/kv/%s"
    
    url = URL % "txn"
    
    payload = {
        "compare":[
            {
                "target": "VALUE",
                "key":base64.b64encode("key1"),
                'result': "EQUAL",
                "value": base64.b64encode("0"),
            },
        ],
        "success":[
            {
                "requestRange":{
                    "key":base64.b64encode("A"),
                }
            }
        ],
        "failure":[
            {
                "requestRange":{
                    "key":base64.b64encode("B"),
                }
            }
        ]
    }
    resp = requests.post(url,json=payload)
    print json.dumps(resp.json(), indent=2)
    

    所谓的支持事务锁: 里面的操作是原子的

          // 发起转账视图
          txn := etcd.Txn(ctx.TODO()).If(
              v3.Compare(v3.ModRevision(from), “=”, fromKV.ModRevision),  // 事务提交时,from账户余额没有没有变动
              v3.Compare(v3.ModRevision(to), “=”, toKV.ModRevision))      // 事务提交时,to账户余额没有变动
          txn = txn.Then(
              OpPut(from, fromUint64(fromV - amount)),  // 更新from账户余额
              OpPut(to, fromUint64(toV - amount))       // 更新to账户余额
          putresp, err := txn.Commit()   // 提交事务
    

    [etcd]分布式锁

    etcd election特性使用场景——Master选举分析与实现

  • 相关阅读:
    LeetCode——Generate Parentheses
    LeetCode——Best Time to Buy and Sell Stock IV
    LeetCode——Best Time to Buy and Sell Stock III
    LeetCode——Best Time to Buy and Sell Stock
    LeetCode——Find Minimum in Rotated Sorted Array
    Mahout实现基于用户的协同过滤算法
    使用Java对文件进行解压缩
    LeetCode——Convert Sorted Array to Binary Search Tree
    LeetCode——Missing Number
    LeetCode——Integer to Roman
  • 原文地址:https://www.cnblogs.com/iiiiiher/p/11961054.html
Copyright © 2011-2022 走看看