zoukankan      html  css  js  c++  java
  • zookeeper适用场景:分布式锁实现

    问题导读:
    1.zookeeper如何实现分布式锁?
    2.什么是羊群效应?
    3.zookeeper如何释放锁?






    zookeeper应用场景有关于分布式集群配置文件同步问题的描述,设想一下如果有100台机器同时对同一台机器上某个文件进行修改,如何才能保证文本不会被写乱,这就是最简单的分布式锁,本文介绍利用zk实现分布式锁。下面是写锁的实现步骤

    分布式写锁
    create一个PERSISTENT类型的znode,/Locks/write_lock

    • 客户端创建SEQUENCE|EPHEMERAL类型的znode,名字是lockid开头,创建的znode是/Locks/write_lock/lockid0000000001
    • 调用getChildren()不要设置Watcher获取/Locks/write_lock下的znode列表
    • 判断自己步骤2创建znode是不是znode列表中最小的一个,如果是就代表获得了锁,如果不是往下走
    • 调用exists()判断步骤2自己创建的节点编号小1的znode节点(也就是获取的znode节点列表中最小的znode),并且设置Watcher,如果exists()返回false,执行步骤3
    • 如果exists()返回true,那么等待zk通知,从而在回掉函数里返回执行步骤3


    释放锁就是删除znode节点或者断开连接就行

    *注意:上面步骤2中getChildren()不设置Watcher的原因是,防止羊群效应,如果getChildren()设置了Watcher,那么集群一抖动都会收到通知。在整个分布式锁的竞争过程中,大量重复运行,并且绝大多数的运行结果都是判断出自己并非是序号最小的节点,从而继续等待下一次通知—,这个显然看起来不怎么科学。客户端无端的接受到过多的和自己不相关的事件通知,这如果在集群规模大的时候,会对Server造成很大的性能影响,并且如果一旦同一时间有多个节点的客户端断开连接,这个时候,服务器就会像其余客户端发送大量的事件通知——这就是所谓的羊群效应。
    下面是代码实现

    import sys
    
    class GJZookeeper(object):
    
        ZK_HOST = "localhost:2181"
        ROOT = "/Locks"
        WORKERS_PATH = join(ROOT,"write_lock")
        MASTERS_NUM = 1
        TIMEOUT = 10000
    
        def __init__(self, verbose = True):
            self.VERBOSE = verbose
            self.masters = []
            self.is_master = False
            self.path = None
    
            self.zk = ZKClient(self.ZK_HOST, timeout = self.TIMEOUT)
            self.say("login ok!")
            # init
            self.__init_zk()
            # register
            self.register()
    
        def __init_zk(self):
            """
            create the zookeeper node if not exist
            |--Locks
                    |--write_lock
            """
            nodes = (self.ROOT, self.WORKERS_PATH)
            for node in nodes: 
                if not self.zk.exists(node):
                    try:
                        self.zk.create(node, "")
                    except:
                        pass
    
        def register(self):
            """
            register a node for this worker
            |--Locks
                    |--write_lock
                                |--lockid000000000x ==> hostname
            """
    
            import socket
            hostname = socket.gethostname()
            self.path = self.zk.create(self.WORKERS_PATH + "/lockid", hostname, flags=zookeeper.EPHEMERAL | zookeeper.SEQUENCE)
            self.lockid = basename(self.path)
            self.say("register ok! I'm %s" % self.path)
            # check who is the master
            self.get_lock()
    
        def get_lock(self):
            """
            get children znode try to get lock
            """
            @watchmethod
            def watcher(event):
                self.say("child changed, try to get lock again.")
                self.get_lock()
    
            children = self.zk.get_children(self.WORKERS_PATH)
            children.sort()
            min_lock_id = children[0]
            self.say("%s's children: %s" % (self.WORKERS_PATH, children)) 
            if cmp(self.lockid,min_lock_id) == 0:
                self.get_lock_success()
                return True
            elif cmp(self.lockid,min_lock_id) > 0:
                index = children.index(self.lockid)
                new_lockid_watch = join(self.WORKERS_PATH,children[index-1])
                self.say("Add watch on %s"%new_lockid_watch)
                res = self.zk.exists(new_lockid_watch,watcher)
                if not res :
                    """代表没有存在之前小的锁,但是这并不意味着能拿到锁了,因为还可能有比当前还小的锁,还没轮到,要重新执行一遍"""
    #               self.get_lock_success()
                    return False
                else :
                    """现在的锁有人在使用,等他释放了再抢"""
                    self.say("I can not get the lock this time,wait for the next time")
                    return False
    
        def get_lock_success(self):
            self.say("I get the lock !!!")
            self.write_file()
            self.zk.delete(join(self.WORKERS_PATH,self.lockid))
            self.say("I release the lock !!!")
            sys.exit(1)
    
        def write_file(self):
            fd = open("lock.log",'a')
            fd.write("%s
    "%self.lockid)
            fd.close()
    
        def say(self, msg):
            """
            print messages to screen
            """
            if self.VERBOSE:
                if self.path:
                    log.info("[ %s(%s) ] %s" % (self.path, "master" if self.is_master else "slave", msg))
                else:
                    log.info(msg)
    
    def start_get_lock():
        gj_zookeeper = GJZookeeper()
    
    def main():
        th1 = threading.Thread(target = start_get_lock, name = "thread_1", args = ())
        th1.start()
        th1.join()
        
    if __name__ == "__main__":
        main()
        time.sleep(1000)
    

      文章转自:http://www.aboutyun.com/forum.php?mod=viewthread&tid=9267&ctid=16

  • 相关阅读:
    2018 蓝桥杯省赛 B 组模拟赛(一)-年龄
    在win10系统下安装和卸载Ubuntu系统(为了搞双系统)的各种办法
    2018 CCPC 中国大学生程序设计竞赛-网络选拔赛 1004(D题 )Find Integer(三角函数+费马大定理)
    HDU(杭州电子科技大学) 2614 Beat (BFS写法)
    SQL server用到的SQL语句备份下
    【SQL Server】SQL触发器经验详解
    SQL SERVER 语句大全
    sqlserver 触发器实例代码
    触发器deleted 表和 inserted 表详解
    SQL server触发器中 update insert delete 分别给写个例子被。
  • 原文地址:https://www.cnblogs.com/likehua/p/4060316.html
Copyright © 2011-2022 走看看