zoukankan      html  css  js  c++  java
  • Zookeeper深入理解(三)kazoo接口

    ZooKeeper 的开发接口以前主要以 Java 和 C 为主。随着越来越多的 Python 项目使用 ZooKeeper 来实现分布式集群,Python 的 ZooKeeper 客户端库也出现了很多,目前主流的纯 Python 实现是 kazoo。因此,掌握如何使用 kazoo 开发基于 Python 的分布式程序是很有必要的。

    1.安装kazoo

    yum install python-pip

    pip install kazoo

    安装过程中会出现一些python依赖包未安装的情况,安装即可。

    2.运行kazoo基础例子kazoo_basic.py

    import time

    from kazoo.client import KazooClient

    from kazoo.client import KazooState

    def main():
        """Walk through kazoo's basic API against a local ZooKeeper server.

        Connects to 127.0.0.1:2182, registers a connection-state listener,
        builds a small znode tree under /my, attaches a children watch and a
        data watch, reads/updates/deletes nodes, commits one transaction and
        finally removes the whole tree again.  The client is always stopped,
        even when one of the steps raises.
        """
        zk = KazooClient(hosts='127.0.0.1:2182')
        zk.start()

        try:
            def my_listener(state):
                # Report every connection-state transition of the client.
                if state == KazooState.LOST:
                    print("LOST")
                elif state == KazooState.SUSPENDED:
                    print("SUSPENDED")
                else:
                    print("Connected")

            # Registered after start(), so the initial connect is not reported.
            zk.add_listener(my_listener)

            # --- Creating nodes ---
            # Ensure a path, create if necessary
            zk.ensure_path("/my/favorite")
            # Create nodes with data
            zk.create("/my/favorite/node", b"")
            zk.create("/my/favorite/node/a", b"A")

            # --- Reading data ---
            # Determine if a node exists
            if zk.exists("/my/favorite"):
                print("/my/favorite is existed")

            # The watch functions below are called once immediately, and from
            # then on every time the watched node changes.
            @zk.ChildrenWatch("/my/favorite/node")
            def watch_children(children):
                print("Children are now: %s" % children)

            @zk.DataWatch("/my/favorite/node")
            def watch_node(data, stat):
                # kazoo documents that a DataWatch is invoked with None for
                # all values when the node does not exist (e.g. after the
                # recursive delete below); dereferencing stat/data would raise
                # inside kazoo's watcher thread.
                if stat is None or data is None:
                    return
                print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

            # Print the version of a node and its data
            data, stat = zk.get("/my/favorite/node")
            print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

            # List the children
            children = zk.get_children("/my/favorite/node")
            print("There are %s children with names %s" % (len(children), children))

            # --- Updating data ---
            zk.set("/my/favorite", b"some data")

            # --- Deleting nodes ---
            zk.delete("/my/favorite/node/a")

            # --- Transactions ---
            # Both operations are committed atomically; version=-1 matches
            # any version of the checked node.
            transaction = zk.transaction()
            transaction.check('/my/favorite/node', version=-1)
            transaction.create('/my/favorite/node/b', b"B")
            results = transaction.commit()
            print("Transaction results is %s" % results)

            # Clean up everything this demo created.
            zk.delete("/my/favorite/node/b")
            zk.delete("/my", recursive=True)

            # Give the watch callbacks a moment to fire before disconnecting.
            time.sleep(2)
        finally:
            zk.stop()

    if __name__ == "__main__":
        # Script entry point: run the demo and report any failure on stdout.
        try:
            main()
        except Exception as ex:  # "as" form works on Python 2.6+ and Python 3
            print("Ocurred Exception: %s" % str(ex))
            # Exit with a non-zero status so callers can detect the failure.
            raise SystemExit(1)

    运行结果:

    Children are now: [u'a']

    Version: 0, data: 

    Version: 0, data: 

    There are 1 children with names [u'a']

    Children are now: []

    Transaction results is [True, u'/my/favorite/node/b']

    Children are now: [u'b']

    Children are now: []

    No handlers could be found for logger "kazoo.recipe.watchers"

    LOST

    以上程序演示了 kazoo 的基本接口,包括创建节点、删除节点、添加 watcher 等操作。调试运行该程序,并对比 ZooKeeper 服务端 znode 目录结构的变化,就可以理解每个操作的具体结果。

    3.运行通过kazoo实现的分布式锁程序kazoo_lock.py

    import logging, os, time

    from kazoo.client import KazooClient

    from kazoo.client import KazooState

    from kazoo.recipe.lock import Lock

    class ZooKeeperLock(object):
        """Distributed lock built on kazoo's ``Lock`` recipe, owning its client.

        The constructor connects to ZooKeeper and prepares a lock at
        ``/locks/<lock_name>``.  Construction does not raise on connection or
        lock-creation errors: instead ``init_ret`` is set to False, ``err_str``
        holds the error message, and ``acquire``/``release`` return None
        because no lock handle exists.

        Args:
            hosts: ZooKeeper connection string passed to ``KazooClient``.
            id_str: Identifier of this contender; passed to the lock recipe.
            lock_name: Name of the lock node below ``/locks``.
            logger: Optional logger handed to ``KazooClient``.
            timeout: Timeout in seconds used for both the client and
                its ``start()`` call.
        """

        def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):
            self.hosts = hosts
            self.id_str = id_str
            self.zk_client = None
            self.timeout = timeout
            self.logger = logger
            self.name = lock_name
            self.lock_handle = None
            # Always defined, so callers can inspect them after a successful
            # construction as well as after a failed one.
            self.init_ret = False
            self.err_str = ""

            self.create_lock()

        def create_lock(self):
            """Connect the client and build the lock handle; record any failure."""
            try:
                self.zk_client = KazooClient(hosts=self.hosts, logger=self.logger, timeout=self.timeout)
                self.zk_client.start(timeout=self.timeout)
            except Exception as ex:
                self.init_ret = False
                self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)
                logging.error(self.err_str)
                return

            try:
                # znode paths always use "/" regardless of the host OS, so the
                # path is not built with os.path.join.
                lock_path = "/".join(("", "locks", self.name))
                self.lock_handle = Lock(self.zk_client, lock_path, identifier=self.id_str)
            except Exception as ex:
                self.init_ret = False
                self.err_str = "Create lock failed! Exception: %s" % str(ex)
                logging.error(self.err_str)
                return

            self.init_ret = True

        def destroy_lock(self):
            """Stop the ZooKeeper client, if any, and drop the reference to it."""
            # NOTE(review): the lock is not released explicitly here; presumably
            # ending the session removes the lock node - confirm against kazoo.
            if self.zk_client is not None:
                self.zk_client.stop()
                self.zk_client = None

        def acquire(self, blocking=True, timeout=None):
            """Try to take the lock.

            Returns whatever the underlying lock's ``acquire`` returns, or None
            when there is no lock handle or the attempt raised (the error is
            logged and stored in ``err_str``).
            """
            if self.lock_handle is None:
                return None

            try:
                return self.lock_handle.acquire(blocking=blocking, timeout=timeout)
            except Exception as ex:
                self.err_str = "Acquire lock failed! Exception: %s" % str(ex)
                logging.error(self.err_str)
                return None

        def release(self):
            """Release the lock; returns None when there is no lock handle."""
            if self.lock_handle is None:
                return None
            return self.lock_handle.release()

        def __del__(self):
            # Best-effort cleanup when the object is garbage collected.
            self.destroy_lock()

    def main():
        """Demo: take the distributed lock "test", hold it for 10 seconds, release it.

        Configures the root logger with a stream handler, connects to
        127.0.0.1:2182 through ``ZooKeeperLock`` and counts from 1 to 10 while
        holding the lock.  The lock is released and the client stopped even
        when the work in between raises.
        """
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
        sh = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
        sh.setFormatter(formatter)
        logger.addHandler(sh)

        zookeeper_hosts = "127.0.0.1:2182"
        lock_name = "test"

        lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)
        try:
            ret = lock.acquire()
            if not ret:
                logging.info("Can't get lock! Ret: %s", ret)
                return

            try:
                logging.info("Get lock! Do something! Sleep 10 secs!")
                for i in range(1, 11):
                    time.sleep(1)
                    print(str(i))
            finally:
                # Release even if the protected work fails.
                lock.release()
        finally:
            # Stop the client explicitly instead of relying on __del__.
            lock.destroy_lock()

    if __name__ == "__main__":
        # Script entry point: run the demo and report any failure on stdout.
        try:
            main()
        except Exception as ex:  # "as" form works on Python 2.6+ and Python 3
            print("Ocurred Exception: %s" % str(ex))
            # Exit with a non-zero status so callers can detect the failure.
            raise SystemExit(1)

    将该测试文件复制到多台服务器上并同时运行,就可以看到分布式锁的效果了。

    参考链接:

    http://kazoo.readthedocs.org/en/latest/basic_usage.html

    http://yunjianfei.iteye.com/blog/2164888

  • 相关阅读:
    84. Largest Rectangle in Histogram (Solution 2)
    84. Largest Rectangle in Histogram (Solution 1)
    73. Set Matrix Zeroes
    【JavaScript】Symbol 静态方法
    【JavaScript】Date
    【JavaScript】Math
    725. Split Linked List in Parts把链表分成长度不超过1的若干部分
    791. Custom Sort String字符串保持字母一样,位置可以变
    508. Most Frequent Subtree Sum 最频繁的子树和
    762. Prime Number of Set Bits in Binary Representation二进制中有质数个1的数量
  • 原文地址:https://www.cnblogs.com/run4life/p/5331040.html
Copyright © 2011-2022 走看看