zoukankan      html  css  js  c++  java
  • Zookeeper深入理解(三)kazoo接口

    ZooKeeper 的开发接口以前主要以 Java 和 C 为主。随着越来越多的 Python 项目使用 ZooKeeper 来实现分布式集群,Python 的 ZooKeeper 接口也出现了很多,其中目前主流的纯 Python 实现是 kazoo。因此,掌握如何使用 kazoo 开发基于 Python 的分布式程序是很有必要的。

    1.安装kazoo

    yum install python-pip

    pip install kazoo

    安装过程中如果提示缺少某些 Python 依赖包,按提示用 pip 安装相应的包即可。

    2.运行kazoo基础例子kazoo_basic.py

    import time

    from kazoo.client import KazooClient

    from kazoo.client import KazooState

    def main():
        """Walk through the basic kazoo operations against a local ZooKeeper.

        Connects to ``127.0.0.1:2182``, then creates, watches, reads, updates
        and deletes znodes under ``/my/favorite`` and commits one transaction.
        The ``/my`` subtree is removed and the client is stopped even when one
        of the steps raises, so a failed run does not leave znodes behind that
        would make the next run fail on ``create``.
        """
        zk = KazooClient(hosts='127.0.0.1:2182')
        zk.start()

        def my_listener(state):
            # Report every connection state transition of the session.
            if state == KazooState.LOST:
                print("LOST")
            elif state == KazooState.SUSPENDED:
                print("SUSPENDED")
            else:
                print("Connected")

        # Registered with a plain call instead of a decorator: add_listener
        # has no return value, so decorating would rebind the name to None.
        zk.add_listener(my_listener)

        try:
            # Creating nodes
            # Ensure a path, create if necessary
            zk.ensure_path("/my/favorite")
            # Create nodes with data
            zk.create("/my/favorite/node", b"")
            zk.create("/my/favorite/node/a", b"A")

            # Reading data
            # Determine if a node exists
            if zk.exists("/my/favorite"):
                print("/my/favorite is existed")

            # Called immediately, and again whenever the children change.
            @zk.ChildrenWatch("/my/favorite/node")
            def watch_children(children):
                print("Children are now: %s" % children)

            # Called immediately, and again whenever the node changes.
            @zk.DataWatch("/my/favorite/node")
            def watch_node(data, stat):
                # kazoo invokes the callback with None values once the node
                # no longer exists; dereferencing them would raise inside the
                # watcher when the tree is deleted below.
                if stat is None:
                    print("/my/favorite/node does not exist")
                    return
                print("Version: %s, data: %s"
                      % (stat.version, (data or b"").decode("utf-8")))

            # Print the version of a node and its data
            data, stat = zk.get("/my/favorite/node")
            print("Version: %s, data: %s"
                  % (stat.version, (data or b"").decode("utf-8")))

            # List the children
            children = zk.get_children("/my/favorite/node")
            print("There are %s children with names %s"
                  % (len(children), children))

            # Updating data
            zk.set("/my/favorite", b"some data")

            # Deleting nodes
            zk.delete("/my/favorite/node/a")

            # Transactions: the check and the create are committed atomically.
            transaction = zk.transaction()
            transaction.check('/my/favorite/node', version=-1)
            transaction.create('/my/favorite/node/b', b"B")
            results = transaction.commit()
            print("Transaction results is %s" % results)

            zk.delete("/my/favorite/node/b")
        finally:
            try:
                # Remove everything this demo created, also after a failure.
                zk.delete("/my", recursive=True)
                # Give the watch callbacks a moment to fire before closing.
                time.sleep(2)
            finally:
                zk.stop()

    if __name__ == "__main__":
        try:
            main()
        except Exception as ex:
            # "except ... as" and the print() call parse on both Python 2.6+
            # and Python 3, unlike the old "except Exception, ex" form.
            print("Ocurred Exception: %s" % str(ex))
            # Exit with a non-zero status so callers can detect the failure.
            raise SystemExit(1)

    运行结果:

    Children are now: [u'a']

    Version: 0, data: 

    Version: 0, data: 

    There are 1 children with names [u'a']

    Children are now: []

    Transaction results is [True, u'/my/favorite/node/b']

    Children are now: [u'b']

    Children are now: []

    No handlers could be found for logger "kazoo.recipe.watchers"

    LOST

    以上程序运行了基本kazoo接口命令,包括创建删除加watcher等操作,通过调试并对比zookeeper服务节点znode目录结构的变化,就可以理解具体的操作结果。

    3.运行通过kazoo实现的分布式锁程序kazoo_lock.py

    import logging, os, time

    from kazoo.client import KazooClient

    from kazoo.client import KazooState

    from kazoo.recipe.lock import Lock

    class ZooKeeperLock(object):
        """Thin wrapper around the kazoo ``Lock`` recipe.

        Creating an instance connects to ZooKeeper and prepares a lock under
        ``/locks/<lock_name>``. Failures during setup are logged and recorded
        instead of raised: ``init_ret`` is True only when both the client and
        the lock handle were created, and ``err_str`` holds the last error
        message (empty when nothing failed).

        Args:
            hosts: ZooKeeper connection string, e.g. ``"127.0.0.1:2182"``.
            id_str: Identifier of this contender; handed to the lock recipe.
            lock_name: Name of the lock node below ``/locks``.
            logger: Optional logger passed to ``KazooClient``.
            timeout: Value used both as the ``KazooClient`` ``timeout``
                argument and as the ``start()`` timeout.
        """

        def __init__(self, hosts, id_str, lock_name, logger=None, timeout=1):
            self.hosts = hosts
            self.id_str = id_str
            self.zk_client = None
            self.timeout = timeout
            self.logger = logger
            self.name = lock_name
            self.lock_handle = None
            # Always defined, so callers can inspect them after construction
            # without risking an AttributeError on the success path.
            self.init_ret = False
            self.err_str = ""
            self.create_lock()

        def create_lock(self):
            """Connect to ZooKeeper and build the lock handle.

            Sets ``init_ret`` to True on success. On failure the error is
            logged, stored in ``err_str``, and ``init_ret`` stays False.
            """
            self.init_ret = False
            try:
                self.zk_client = KazooClient(hosts=self.hosts,
                                             logger=self.logger,
                                             timeout=self.timeout)
                self.zk_client.start(timeout=self.timeout)
            except Exception as ex:
                self.err_str = "Create KazooClient failed! Exception: %s" % str(ex)
                logging.error(self.err_str)
                return

            try:
                # znode paths always use "/", so build the path by hand rather
                # than with os.path.join, whose separator depends on the OS.
                lock_path = "/locks/%s" % str(self.name).lstrip("/")
                # Pass id_str through so this contender is identifiable.
                self.lock_handle = Lock(self.zk_client, lock_path,
                                        identifier=self.id_str)
            except Exception as ex:
                self.err_str = "Create lock failed! Exception: %s" % str(ex)
                logging.error(self.err_str)
                return

            self.init_ret = True

        def destroy_lock(self):
            """Stop the ZooKeeper client and drop the lock handle.

            Safe to call more than once, and on a partially built instance.
            """
            # The handle is bound to the client and is useless once it stops.
            self.lock_handle = None
            client = getattr(self, "zk_client", None)
            if client is not None:
                client.stop()
                self.zk_client = None

        def acquire(self, blocking=True, timeout=None):
            """Try to take the lock.

            Returns:
                Whatever the underlying ``Lock.acquire`` returns, or None when
                no lock handle exists or the acquire attempt raised.
            """
            if self.lock_handle is None:
                return None
            try:
                return self.lock_handle.acquire(blocking=blocking, timeout=timeout)
            except Exception as ex:
                self.err_str = "Acquire lock failed! Exception: %s" % str(ex)
                logging.error(self.err_str)
                return None

        def release(self):
            """Release the lock; returns None when no lock handle exists."""
            if self.lock_handle is None:
                return None
            return self.lock_handle.release()

        def __del__(self):
            # Best-effort cleanup; callers should prefer destroy_lock().
            self.destroy_lock()

    def main():
        """Acquire the distributed lock ``test``, hold it for ten seconds, release it.

        Configures the root logger with a stream handler, then contends for
        the lock through ``ZooKeeperLock``. The lock is released and the
        ZooKeeper client stopped even if the work in between raises.
        """
        logger = logging.getLogger()
        logger.setLevel(logging.INFO)
        sh = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s')
        sh.setFormatter(formatter)
        logger.addHandler(sh)

        zookeeper_hosts = "127.0.0.1:2182"
        lock_name = "test"

        lock = ZooKeeperLock(zookeeper_hosts, "myid is 1", lock_name, logger=logger)
        try:
            ret = lock.acquire()
            if not ret:
                logging.info("Can't get lock! Ret: %s", ret)
                return

            try:
                logging.info("Get lock! Do something! Sleep 10 secs!")
                for i in range(1, 11):
                    time.sleep(1)
                    print(str(i))
            finally:
                # Release the lock even when the protected work fails.
                lock.release()
        finally:
            # Stop the client explicitly instead of relying on __del__.
            lock.destroy_lock()

    if __name__ == "__main__":
        try:
            main()
        except Exception as ex:
            # "except ... as" and the print() call parse on both Python 2.6+
            # and Python 3, unlike the old "except Exception, ex" form.
            print("Ocurred Exception: %s" % str(ex))
            # Exit with a non-zero status so callers can detect the failure.
            raise SystemExit(1)

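    将该测试文件复制到多台服务器上同时运行,就可以看到分布式锁的效果了。注意:脚本中的 zookeeper_hosts 写的是 127.0.0.1:2182,在多台服务器上运行时需要把它改成同一个 ZooKeeper 集群的地址,否则各个进程连接的不是同一个 ZooKeeper,彼此之间不会竞争同一把锁。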

    参考链接:

    http://kazoo.readthedocs.org/en/latest/basic_usage.html

    http://yunjianfei.iteye.com/blog/2164888

  • 相关阅读:
    Android--多线程之Handler
    Android--Service之基础
    Android--UI之Fragment
    Android--多线程之图文混排
    python常用模块
    python应用之socket编程
    网络编程socket理论一
    pycharm Launching unittests with arguments
    python字符串格式化
    python数据类型之三
  • 原文地址:https://www.cnblogs.com/run4life/p/5331040.html
Copyright © 2011-2022 走看看