from kazoo.client import KazooClient
from kazoo.client import KazooState
import time
print time.ctime()
zk = KazooClient(hosts='20.5.101.15:12181',timeout=500,read_only=True)
print zk
print zk.state
zk.start()
if zk.exists('/sds'):
print '111111111'
else:
print '222222222'
print zk.get_children('/consumers')
# print zk.get('/aaayyy')
# zk.set("/aaayyy", b"some data")
# print zk.get('/aaayyy')
def my_func(event):
print "abcdefg"
while True:
time.sleep(1)
children=zk.get_children("/consumers",watch=my_func)
print children
C:Python27python.exe C:/Users/tlcb/PycharmProjects/untitled/rizhiyi/t3.py
Thu Jun 21 15:21:44 2018
<kazoo.client.KazooClient object at 0x025A80B0>
LOST
222222222
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
[u'console-consumer-48493', u'console-consumer-63948']
abcdefg
[u'console-consumer-48493']
[u'console-consumer-48493']
[u'console-consumer-48493']
[u'console-consumer-48493']
Process finished with exit code 1
测试环境;
[root@elkapp05 bin]# ./kafka-console-consumer.sh --zookeeper 20.5.101.15:12181 --topic shuaige --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
[root@elkapp05 bin]# ./kafka-console-consumer.sh --zookeeper 20.5.101.15:12181 --topic shuaige --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
111
222222222
fdfdfd
dsadsadasd
888888888
565
fdfd
4324
3242
342
4
2432
5454
54
5
5
^CProcessed a total of 16 messages
当一个消费者断开后触发事件
Kazoo 可以设置watch 函数在几个节点,可以被触发当节点改变或者当子节点改变
这个对节点或者子节点改变可以是节点或者子节点被删除
Watchers 可以设置两种不同的方式,第一种是Zookeeper 默认支持一次性watch events.
这些watch 函数可以通过kazoo调用,不接收session events
不像本地的Zookeeper watches. 使用这个需要watch 函数被传递到下面方法里:
get()
get_children()
exists()
一个watch 函数传递到get()或者exists 会被调用当数据节点在节点或者子节点本身被删除
它会被传递到一个 WatchedEvent instance.
def my_func(event):
# check to see what the children are now
# Call my_func when the children change
children = zk.get_children("/my/favorite/node", watch=my_func)
Kazoo 包含一个更高级别的API用于watch 数据和子节点修改
是更容易使用因为它不需要重新设置watch 每次 当事件触发时。
它也传递数据和ZnodeStat 当watch一个节点或者子节点列表
watch 函数会被调用当每次有改变时
或直到函数返回False
如果 allow_session_lost是设置为True 那么函数不会被调用 当session 是丢失
下面的方法提供这个功能: