zoukankan      html  css  js  c++  java
  • 使用Golang利用ectd实现一个分布式锁

    http://blog.codeg.cn/post/blog/2016-02-24-distrubute-lock-over-etcd/

     
    本文 http://blog.codeg.cn/post/blog/2016-02-24-distrubute-lock-over-etcd/ 是作者zieckey在研究和学习相关内容时所做的笔记,欢迎广大朋友指正和交流! 版权所有,欢迎转载和分享,但请保留此段声明。

    etcd是随着CoreOS项目一起成长起来的,随着Golang和CoreOS等项目在开源社区日益火热, etcd作为一个高可用、强一致性的分布式Key-Value存储系统被越来越多的开发人员关注和使用。

    这篇文章全方位介绍了etcd的应用场景,这里简单摘要如下:

    • 服务发现(Service Discovery)
    • 消息发布与订阅
    • 负载均衡
    • 分布式通知与协调
    • 分布式锁
    • 分布式队列
    • 集群监控与Leader竞选
    • 为什么用etcd而不用ZooKeeper

    本文重点介绍如何利用ectd实现一个分布式锁。 锁的概念大家都熟悉,当我们希望某一事件在同一时间点只有一个线程(goroutine)在做,或者某一个资源在同一时间点只有一个服务能访问,这个时候我们就需要用到锁。 例如我们要实现一个分布式的id生成器,多台服务器之间的协调就非常麻烦。分布式锁就正好派上用场。

    其基本实现原理为:

    1. 在ectd系统里创建一个key
    2. 如果创建失败,key存在,则监听该key的变化事件,直到该key被删除,回到1
    3. 如果创建成功,则认为我获得了锁

    具体代码如下:

    package etcdsync
    
    import (
    	"fmt"
    	"io"
    	"os"
    	"sync"
    	"time"
    
    	"github.com/coreos/etcd/client"
    	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
    )
    
    const (
    	defaultTTL = 60
    	defaultTry = 3
    	deleteAction = "delete"
    	expireAction = "expire"
    )
    
    // A Mutex is a mutual exclusion lock which is distributed across a cluster.
    type Mutex struct {
    	key    string
    	id     string // The identity of the caller
    	client client.Client
    	kapi   client.KeysAPI
    	ctx    context.Context
    	ttl    time.Duration
    	mutex  *sync.Mutex
    	logger io.Writer
    }
    
    // New creates a Mutex with the given key which must be the same
    // across the cluster nodes.
    // machines are the ectd cluster addresses
    func New(key string, ttl int, machines []string) *Mutex {
    	cfg := client.Config{
    		Endpoints:               machines,
    		Transport:               client.DefaultTransport,
    		HeaderTimeoutPerRequest: time.Second,
    	}
    
    	c, err := client.New(cfg)
    	if err != nil {
    		return nil
    	}
    
    	hostname, err := os.Hostname()
    	if err != nil {
    		return nil
    	}
    
    	if len(key) == 0 || len(machines) == 0 {
    		return nil
    	}
    
    	if key[0] != '/' {
    		key = "/" + key
    	}
    
    	if ttl < 1 {
    		ttl = defaultTTL
    	}
    
    	return &Mutex{
    		key:    key,
    		id:     fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), time.Now().Format("20060102-15:04:05.999999999")),
    		client: c,
    		kapi:   client.NewKeysAPI(c),
    		ctx: context.TODO(),
    		ttl: time.Second * time.Duration(ttl),
    		mutex:  new(sync.Mutex),
    	}
    }
    
    // Lock locks m.
    // If the lock is already in use, the calling goroutine
    // blocks until the mutex is available.
    func (m *Mutex) Lock() (err error) {
    	m.mutex.Lock()
    	for try := 1; try <= defaultTry; try++ {
    		if m.lock() == nil {
    			return nil
    		}
    		
    		m.debug("Lock node %v ERROR %v", m.key, err)
    		if try < defaultTry {
    			m.debug("Try to lock node %v again", m.key, err)
    		}
    	}
    	return err
    }
    
    func (m *Mutex) lock() (err error) {
    	m.debug("Trying to create a node : key=%v", m.key)
    	setOptions := &client.SetOptions{
    		PrevExist:client.PrevNoExist,
    		TTL:      m.ttl,
    	}
    	resp, err := m.kapi.Set(m.ctx, m.key, m.id, setOptions)
    	if err == nil {
    		m.debug("Create node %v OK [%q]", m.key, resp)
    		return nil
    	}
    	m.debug("Create node %v failed [%v]", m.key, err)
    	e, ok := err.(client.Error)
    	if !ok {
    		return err
    	}
    
    	if e.Code != client.ErrorCodeNodeExist {
    		return err
    	}
    
    	// Get the already node's value.
    	resp, err = m.kapi.Get(m.ctx, m.key, nil)
    	if err != nil {
    		return err
    	}
    	m.debug("Get node %v OK", m.key)
    	watcherOptions := &client.WatcherOptions{
    		AfterIndex : resp.Index,
    		Recursive:false,
    	}
    	watcher := m.kapi.Watcher(m.key, watcherOptions)
    	for {
    		m.debug("Watching %v ...", m.key)
    		resp, err = watcher.Next(m.ctx)
    		if err != nil {
    			return err
    		}
    
    		m.debug("Received an event : %q", resp)
    		if resp.Action == deleteAction || resp.Action == expireAction {
    			return nil
    		}
    	}
    
    }
    
    // Unlock unlocks m.
    // It is a run-time error if m is not locked on entry to Unlock.
    //
    // A locked Mutex is not associated with a particular goroutine.
    // It is allowed for one goroutine to lock a Mutex and then
    // arrange for another goroutine to unlock it.
    func (m *Mutex) Unlock() (err error) {
    	defer m.mutex.Unlock()
    	for i := 1; i <= defaultTry; i++ {
    		var resp *client.Response
    		resp, err = m.kapi.Delete(m.ctx, m.key, nil)
    		if err == nil {
    			m.debug("Delete %v OK", m.key)
    			return nil
    		}
    		m.debug("Delete %v falied: %q", m.key, resp)
    		e, ok := err.(client.Error)
    		if ok && e.Code == client.ErrorCodeKeyNotFound {
    			return nil
    		}
    	}
    	return err
    }
    
    func (m *Mutex) debug(format string, v ...interface{}) {
    	if m.logger != nil {
    		m.logger.Write([]byte(m.id))
    		m.logger.Write([]byte(" "))
    		m.logger.Write([]byte(fmt.Sprintf(format, v...)))
    		m.logger.Write([]byte("
    "))
    	}
    }
    
    func (m *Mutex) SetDebugLogger(w io.Writer) {
    	m.logger = w
    }
    

    其实类似的实现有很多,但目前都已经过时,使用的都是被官方标记为deprecated的项目。且大部分接口都不如上述代码简单。 使用上,跟Golang官方sync包的Mutex接口非常类似,先New(),然后调用Lock(),使用完后调用Unlock(),就三个接口,就是这么简单。示例代码如下:

    package main
    
    import (
    	"github.com/zieckey/etcdsync"
    	"log"
    )
    
    func main() {
    	//etcdsync.SetDebug(true)
    	log.SetFlags(log.Ldate|log.Ltime|log.Lshortfile)
    	m := etcdsync.New("/etcdsync", "123", []string{"http://127.0.0.1:2379"})
    	if m == nil {
    		log.Printf("etcdsync.NewMutex failed")
    	}
    	err := m.Lock()
    	if err != nil {
    		log.Printf("etcdsync.Lock failed")
    	} else {
    		log.Printf("etcdsync.Lock OK")
    	}
    
    	log.Printf("Get the lock. Do something here.")
    
    	err = m.Unlock()
    	if err != nil {
    		log.Printf("etcdsync.Unlock failed")
    	} else {
    		log.Printf("etcdsync.Unlock OK")
    	}
    }
    
    

    参考

    1. etcdsync项目地址
    2. ectd项目官方地址
  • 相关阅读:
    Problem about objc_exception_throw
    Change mime Types for mac's default apache
    重启Mac上的Apache服务
    通过无线网安装自己打包的ipa文件
    How to cancel selection for TreePanel(GXT)
    使用Jsoup获取网页内容超时设置
    HTML Meta, http-equiv, Refresh
    线上redis服务内存异常分析。
    C++/CLI 本地字符串和托管字符串之间的转换
    DWF Toolkit on Microsoft Windows
  • 原文地址:https://www.cnblogs.com/diegodu/p/5603678.html
Copyright © 2011-2022 走看看