zoukankan      html  css  js  c++  java
  • Python 基于python操纵zookeeper介绍

    基于python操纵zookeeper介绍

     

    by:授客  QQ:1033553122

    测试环境

    Win7 64位

     

    Python 3.3.4

     

    kazoo-2.6.1-py2.py3-none-any.whl(windows)

    kazoo-2.6.1.tar.gz (linux)

    https://pypi.org/project/kazoo/#files

     

    zookeeper-3.4.13.tar.gz

    下载地址:

    http://zookeeper.apache.org/releases.html#download

    https://www.apache.org/dyn/closer.cgi/zookeeper/

    https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

     

     

     

     

    代码实践

    kazooStudy.py

    #!/usr/bin/env python 3.4.0

    #-*- encoding:utf-8 -*-

     

    __author__ = 'shouke'

     

     

    import threading

    import time

     

    from kazoo.client import  KazooClient

    from kazoo.client import KazooState

    from kazoo.retry import KazooRetry

     

     

    def restart_zk_client():

        '''重启zookeeper会话'''

        global zk_client

        global zk_conn_stat

        try:

            zk_client.restart()

        except Exception as e:

            print('重启zookeeper客户端异常:%s' % e)

     

    zk_conn_stat = 0 # zookeeper连接状态 1-LOST   2-SUSPENDED 3-CONNECTED/RECONNECTED

    def zk_conn_listener(state):

        '''zookeeper连接状态监听器'''

     

        global  zk_conn_stat

        if state == KazooState.LOST:

            print('zookeeper connection lost')

            zk_conn_stat = 1

            # Register somewhere that the session was lost

     

            thread = threading.Thread(target=restart_zk_client)

            thread.start()

     

        elif state == KazooState.SUSPENDED:

            print('zookeeper connection dicconnected')

            zk_conn_stat = 2

            # Handle being disconnected from Zookeeper

        else:

            zk_conn_stat = 3

            print('zookeeper connection cconnected/reconnected')

            # Handle being connected/reconnected to Zookeeper

     

    # 监视器

    # 当节点有变化、节点被删除时,将以多线程的方式调用以参数形式传递给get()、exists()的监视函数,监视函数将会接收到一个WatchedEvent实例

    def event_listener(event):

        print(event)

     

     

     

    if __name__ == '__main__':

        try:

            # 建立连接

            zk_client = KazooClient(hosts='127.0.0.1:2181')

            zk_client.add_listener(zk_conn_listener) # 添加监听器,监听连接状态

            zk_client.start() # 初始化到zk的连接,可以设置超时时间 zk_client.start(timeout=15) 默认15秒

     

            print('zk_client state:', zk_client.state) # 查看链接状态

     

            # 创建节点

            # ensure_path() 递归创建path中不存在的节点,但是不能为节点设置数据,仅ACL.

            zk_client.ensure_path('/node1')

     

            # 创建永久节点

            # create创建节点的同时,可为节点设置数据,要求path路径必须存在

            if not zk_client.exists('/node1/subNode1'):

                zk_client.create('/node1/subNode1', b'sub node1')

     

            # 创建临时节点

            # 注意:会话丢失、重启会话会导致zookeeper删除重启会话前创建的临时节点

            if not zk_client.exists('/node1/subNode2'):

                zk_client.create('/node1/subNode2', b'sub node2', ephemeral=True)

     

            # 创建有序临时节点

            zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)

            # 读取数据

            # 判断节点是否存在

            if zk_client.exists('/node1'): # 如果返回值为None则表示不存在给定节点

                print('存在节点node1,节点路径/node1')

     

                # 获取节点相关数据

                data, stat = zk_client.get('/node1')

                if stat:

                    print("Version: %s, data: %s" % (stat.version, data.decode("utf-8")))

     

                # 获取给定节点的子节点

                children = zk_client.get_children('/node1')

                print('node1子节点 有 %s 子节点,节点名称为: %s' % (len(children), children))

     

                print('/ 子节点', zk_client.get_children('/'))

     

            # 更新节点

            # 更新节点数据

            zk_client.set("/node1/subNode2", b"some new data")

     

            # 删除节点 recursive参数可选,递归删除节点数据

            zk_client.delete("/node1", recursive=True)

     

     

            # 重试命令

            try:

                result = zk_client.retry(zk_client.get, "/node1/subNode3")

                print(result)

     

                # 自定义重试

                # max_tries 出错最大重试次数, ignore_expire False-重试的时候忽略会话过期,否则不忽略

                kr = KazooRetry(max_tries=3, ignore_expire=False)

                result = kr(zk_client.get, "/node1/subNode3")

            except Exception as e:

                print('/node1/subNode3 不存在,所以会运行出错')

     

     

            # 释放客户端占用资源,移除连接

            zk_client.stop()

     

            #  zk_client.stop() 会导致zk_client连接状态变成 LOST,进而触发线程调用函数 restart_zk_client,

            # 该函数未执行完成的情况下,如果马上执行类似get,create等函数,会导致运行出错

            #

     

            while zk_conn_stat != 3:

                continue

            else:

                i = 0

                while i < 3000:

                    if i % 200 == 0:

                        time.sleep(2)

                        print('创建新节点')

                        zk_client.ensure_path('/node1')

                        zk_client.ensure_path('/node1/subNode2')

                        zk_client.create('/node1/subNode', b'sub nodexxxx', ephemeral=True, sequence=True)

                        zk_client.set('/node1/subNode2', b'new data')

                    i += 1

     

     

     

            # 关闭客户端前必须先调用stop,否则会报错

            zk_client.stop()

     

            # 关闭客户端

            zk_client.close()

        except Exception as e:

            print('运行出错:%s' % e)

     

     

    monitor.py

    #!/usr/bin/env python

    #-*- encoding:utf-8 -*-

     

    __author__ = 'shouke'

     

    import time

     

    from kazoo.client import  KazooClient

    from kazoo.client import KazooState

     

    zk = KazooClient(hosts='10.118.52.26:2181')

    zk.start()

     

    @zk.add_listener

    def my_listener(state):

        if state == KazooState.LOST:

            print('LOST')

            # Register somewhere that the session was lost

        elif state == KazooState.SUSPENDED:

            print('SUSPENDED')

            # Handle being disconnected from Zookeeper

        else:

            pass

            print('CONNECTED')

            # Handle being connected/reconnected to Zookeeper

     

    # 监视器

    # 当节点有变化、节点被删除时,将以多线程的方式调用以参数形式传递给get()、exists()的监视函数,监视函数将会接收到一个WatchedEvent实例

    def event_listener(event):

        print(event)

     

    children = zk.get_children('/node1',watch=event_listener)

    print('node1 has %s children with names %s' % (len(children), children))

     

    # 更高级监视api

    # 监视子节点的编号

    @zk.ChildrenWatch('/node1')

    def watch_children(children):

        print("Children are now: %s" % children)

     

     

    # 监视节点数据变更

    @zk.DataWatch("/node1/subNode2") #

    def watch_node(data, state):

        """监视节点数据是否变化"""

        if state:

            print("Version:", state.version, "data:", data)

     

    # 空转

    i = 0

    while i< 100:

        # children = zk.get_children('/node1',watch=event_listener)

        # print('node1 has %s children with names %s' % (len(children), children))

        time.sleep(1)

     

    zk.stop()

    zk.close()

     

     

     

    关于kazooClient连接状态说明

    LOST

    CONNECTED

    SUSPENDED

     

    KazooClient客户端实例刚创建时,处于LOST状态,同zookeeper建立连接后,转为CONNECTED 。如果连接出问题、切换到不同的zookeeper集群几点,转为SUSPENDED状态,当你知道暂时不能执行命令,如果zookeeper节点不再是集群的一部分,连接将丢失,也会导致 SUSPENDED状态

     

    客户端再次同zookeeper建立连接,如果会话不存在,客户端连接状态将转为LOST,如果会话没有过期,可用则转为CONNECTED

     

     

    运行效果

    参考链接:

    https://kazoo.readthedocs.io/en/latest/basic_usage.html

  • 相关阅读:
    BUAA2020个人博客作业小结
    BUAA2020软工热身作业小结
    个人博客作业----总结
    个人阅读作业7
    超链接按钮点击变色,原来的链接恢复原色
    setInterval和setTimeout的区别以及setInterval越来越快问题的解决方法
    自定义网站404页面
    jQuery实现的上下滚动公告栏详细讲解
    K先生的博客
    Bootstrap4响应式布局之栅格系统
  • 原文地址:https://www.cnblogs.com/shouke/p/10547245.html
Copyright © 2011-2022 走看看