zoukankan      html  css  js  c++  java
  • Zookeeper使用入门记录

    下面记录一下Zookeeper使用相关的入门知识,包括基本命令、四种数据节点、watcher监听、访问控制列表ACL和相关API操作。

    介绍

    Zookeeper是基于Google关于Chubby Lock的论文来开源实际的,主要用于分布式架构进行协调和管理,在大数据框架中作为一个“润滑剂”而存在,协调分布式框架的使用变得更加顺畅。

    主要作用如下。

    • 节点主备切换、上下线感知:高可用hadoop集群中,active状态的namenode宕机后,standby状态的namenode需要在zkfc进程下,进行主备切换,实现故障转移。
    • 配置管理:zookeeper也是一个文件系统,可以放配置文件,如hadoop中的一些配置信息,如节点名可以是配置文件全路径名,节点信息就是配置的具体信息。

    基本命令

    Zookeeper集群启动、停止和查看状态的基本命令。

    # 启动ZooKeeper集群;在ZooKeeper集群中的每个节点执行以下命令
    ${ZK_HOME}/bin/zkServer.sh start
    # 停止ZooKeeper集群
    ${ZK_HOME}/bin/zkServer.sh stop
    # 查看集群状态#
    ${ZK_HOME}/bin/zkServer.sh status
    

    客户端连接zookeeper server。

    [hadoop@node03 /root]$ zkCli.sh -server node01:2181,node02:2181,node03:2181
    

    连接后,可以查看客户端命令,整体命令不多,注意有[watch]选项的,都是可以设置watch监听的命令。

    # help查看命令提示
    [zk: node01:2181,node02:2181,node03:2181(CLOSED) 55] help
    ZooKeeper -server host:port cmd args
    	stat path [watch] # 有[watch]选项的,都是可以设置watch监听的命令
    	set path data [version]
    	ls path [watch]
    	delquota [-n|-b] path
    	ls2 path [watch]
    	setAcl path acl
    	setquota -n|-b val path
    	history
    	redo cmdno
    	printwatches on|off
    	delete path [version]
    	sync path
    	listquota path
    	rmr path
    	get path [watch]
    	create [-s] [-e] path data acl
    	addauth scheme auth
    	quit
    	getAcl path
    	close
    	connect host:port
    

    常用命令说明。

    命令 说明
    stat path [watch] 查看path节点的状态
    set path data 修改path的数据
    ls path [watch] 查看节点下有哪些子节点
    ls2 path [watch] 相当于ls+stat
    history 查看当前session输入过的命令
    delete path 删除节点
    rmr path 删除节点
    get path [watch] 查看节点数据
    create [-s] [-e] path data acl 创建节点,需指定数据
    quit 退出命令行
    close 关闭session
    connect host:port 连接某个服务器,可以设置多个连接地址

    简单的使用命令,实现节点创建、修改、查看和删除。

    # 查看节点信息
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 57] ls /
    [clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, hbase, zk_test]
    # 创建一个节点/node
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 58] create /node node-data
    Created /node
    # 查看是否创建新节点
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 61] ls /
    [clyang, yangchaolin0000000003, news, node, ycl, zookeeper, spark, yang, hbase, zk_test]
    

    查看节点数据。

    # 查看节点数据
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 59] get /node
    node-data
    cZxid = 0x1600000002
    ctime = Sat May 16 13:22:35 CST 2020
    mZxid = 0x1600000002
    mtime = Sat May 16 13:22:35 CST 2020
    pZxid = 0x1600000002
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 9
    numChildren = 0
    

    修改节点数据。

    # 修改节点数据
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 62] set /node node-newdata
    cZxid = 0x1600000002
    ctime = Sat May 16 13:22:35 CST 2020
    mZxid = 0x1600000003
    mtime = Sat May 16 13:23:21 CST 2020
    pZxid = 0x1600000002
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 12
    numChildren = 0
    # 修改成功
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 63] get /node
    node-newdata
    cZxid = 0x1600000002
    ctime = Sat May 16 13:22:35 CST 2020
    mZxid = 0x1600000003
    mtime = Sat May 16 13:23:21 CST 2020
    pZxid = 0x1600000002
    cversion = 0
    dataVersion = 1
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 12
    numChildren = 0
    

    删除节点。

    # 删除节点
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 64] delete /node
    # 已删除
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 65] ls /
    [clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, hbase, zk_test]
    

    znode

    (1)基本信息

    zookeeper中的节点树,类似linux文件系统的文件目录树,只是zookeeper中节点和子节点都是和数据绑定在一起的,看上去一个节点有文件(存储数据)又有目录(包含子节点)的属性。节点可以存储少量的数据,byte~kb级别,主要存储状态信息、配置信息、位置信息等,为了区分和其他文件系统中的区别,用znode来表示。

    znode中的数据是固定结构的数据,包含数据版本、ACL和时间戳等信息,每次znode中数据发生变化,对应的版本号会更新,如czxid、mzxid和pzxid。从上面get命令获取数据可以看出,会将对应的数据版本信息也一并获取。

    znode中数据的读写是具有原子性的,读写都是整个数据的读和更新替换,当然每个节点都有ACL(访问控制列表),这个列表限制了哪些用户只能做哪些操作,主要是create、read、write、delete和admin的操作。

    下面说明一下使用get命令得到的信息代表什么。

    [zk: node01:2181(CONNECTED) 6] get /ycl
    yangchaolin
    cZxid = 0x100000000a
    ctime = Thu May 14 20:49:01 CST 2020
    mZxid = 0x1000000017
    mtime = Thu May 14 21:11:34 CST 2020
    pZxid = 0x1000000016
    cversion = 2
    dataVersion = 4
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 11
    numChildren = 0
    

    说明如下。

    属性 说明
    cZxid 0x100000000a 创建节点的事务id
    ctime Thu May 14 20:49:01 CST 2020 创建节点的时间
    mZxid 0x1000000017 修改节点数据的事务id
    mtime Thu May 14 21:11:34 CST 2020 修改节点数据的时间
    pZxid 0x1000000016 子节点个数变化的事务id
    cversion 2 子节点个数变化次数
    dataVersion 4 数据版本,修改了4次
    aclVersion 0 节点权限的变化次数
    ephemeralOwner 0x0 0代表持久,如果带上session id就是临时
    dataLength 11 数据的字节数,11位
    numChildren 0 子节点的个数

    事务通常是一个64位的数字。由epoch+counter组成,各32位。以cZxid为例,0x100000000a是十六进制,后8位"0000000a"是counter,代表是第10次创建节点。剩下的"10"那就代表epoch,换成10进制是16,这个跟version-2目录下的currentEpoch一致,类似改朝换代的纪元,说明改朝换代到16代君主了。

    [hadoop@node01 ~]$ cat currentEpoch
    16
    

    (2)持久节点、临时节点、有序节点

    Zookeeper中默认创建节点是持久节点,也有临时节点,只要创建临时节点的session是active的状态,这个临时节点就不会自动删除。还有一个是有序节点,zookeeper会在创建有序节点时自动在节点后面追加一个整形数字。

    • 持久节点

    create命令默认就是创建有序节点,需要显示的删除。

    # 创建持久节点p-node
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 68] create /p-node ''
    Created /p-node
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 69] ls /
    [clyang, yangchaolin0000000003, news, ycl, p-node, zookeeper, spark, yang, hbase, zk_test]
    # 显示删除持久节点
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 70] delete /p-node
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 71] ls /
    [clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, hbase, zk_test]
    
    • 临时节点

    临时节点的生命周期跟客户端session绑定,一旦session失效,临时节点被删除。

    客户端1创建一个临时节点。

    # 创建一个临时节点,使用-e参数
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 72] create -e /e-node ''
    Created /e-node
    

    客户端2查看这个临时节点。

    # 能查看到
    [zk: node01:2181(CONNECTED) 1] ls /
    [clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, e-node, hbase, zk_test]
    

    客户端1断开和集群的连接。

    # 断开
    [zk: node01:2181,node02:2181,node03:2181(CONNECTED) 73] close
    2020-05-16 14:51:04,451 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x1721c11c5610000 closed
    [zk: node01:2181,node02:2181,node03:2181(CLOSED) 74] 2020-05-16 14:51:04,451 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512] - EventThread shut down
    

    客户端2查看这个临时节点已经自动删除。

    # 断开前
    [zk: node01:2181(CONNECTED) 1] ls /
    [clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, e-node, hbase, zk_test]
    # 断开后
    [zk: node01:2181(CONNECTED) 2] ls /
    [clyang, yangchaolin0000000003, news, ycl, zookeeper, spark, yang, hbase, zk_test]
    
    • 有序节点

    有序节点,可以在创建时不必考虑是否已经存在,系统会自动在后面增添整形数字,保证znode名字的唯一,配合临时节点,可以用作分布式锁。

    # 创建有序节点
    [zk: node01:2181(CONNECTED) 1] create -s /s-node ''
    Created /s-node0000000024
    # 再次创建,序号自增1
    [zk: node01:2181(CONNECTED) 3] create -s /s-node ''
    Created /s-node0000000025
    

    Watcher

    watcher是客户端在服务器端注册的事件监听器,如果被监听的节点有更新(创建、修改、删除节点)操作,watcher会触发,通知客户端更新的细节,触发后watcher会移除。

    注意,使用命令的方式创建的watcher是一个单次触发的操作,但在3.6.0版本,即使watcher触发后,也不会移除。如果是3.6.0之前的版本,要有多次触发的效果,需要使用后面的API来实现。

    设置watcher的命令,上面命令可以看出,使用stat、ls、ls2和get可以设置。

    • 节点更新的监听

    client1监听某一个节点。

    [zk: node01:2181(CONNECTED) 1] ls /ycl watch
    

    client2在这个节点下创建一个子节点。

    [zk: node02:2181(CONNECTED) 13] create /ycl/child-node ''
    Created /ycl/child-node
    

    client1上查看结果。

    [zk: node01:2181(CONNECTED) 2]
    WATCHER::
    
    WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/ycl
    
    • 节点上下线的监听

    client1下创建一个临时节点。

    [zk: node01:2181(CONNECTED) 2] create -e /e-node ''
    Created /e-node
    

    client2在这个节点上创建一个监听。

    [zk: node02:2181(CONNECTED) 15] ls /e-node watch
    

    client1模拟节点下线。

    [zk: node01:2181(CONNECTED) 3] close
    2020-05-16 22:58:58,650 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x2721c9db94c0000 closed
    [zk: node01:2181(CLOSED) 4] 2020-05-16 22:58:58,651 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512] - EventThread shut down
    

    client2查看结果。

    [zk: node02:2181(CONNECTED) 16]
    WATCHER::
    
    WatchedEvent state:SyncConnected type:NodeDeleted path:/e-node
    

    ACL

    默认情况下,节点的读写权限是没有限制的,但是如果想某些关键的节点数据受到保护,这就需要用到ACL了。

    ACL(Access Control List)可以设置客户端对zookeeper服务器上节点的权限范围,有如下5种权限类型,所有权限都拥有简称为crwda。

    类型 说明
    CREATE(c) 创建子节点的权限
    READ(r) 获取节点及子节点列表的权限
    WRITE(w) 更新节点数据的权限
    DELETE(d) 删除子节点的权限
    ADMIN(a) 设置节点ACL权限

    如何使用呢?zookeeper提供了对应的操作方法。

    命令 说明
    setAcl path acl 设置访问控制列表
    getAcl path 查看path的ACL设置
    addauth scheme auth 添加认证用户

    里面有scheme,这个是权限模式,是鉴权的策略,有如下几种可以选择,常用digest。

    权限模式方案 说明
    world 默认方式,相当于全世界都能访问,只有一个用户:anynone
    auth 使用已添加认证的用户认证
    digest 使用“用户名:密码”这种方式认证
    ip 使用ip地址认证

    下面实操一下,为测试节点添加权限并测试,以下操作都是基于文末博文。

    • world权限模式

    设置方式

    setAcl <path> world:anyone:<acl>
    

    客户端实操

    # 创建节点
    [zk: node01:2181(CONNECTED) 19] create /world-node ''
    Created /world-node
    # 默认是world策略,并且拥有5种权限
    [zk: node01:2181(CONNECTED) 20] getAcl /world-node
    'world,'anyone
    : cdrwa
    # 可以通过命令设置world策略
    [zk: node01:2181(CONNECTED) 22] setAcl /world-node world:anyone:cdrwa
    cZxid = 0x180000000d
    ctime = Sun May 17 11:00:58 CST 2020
    mZxid = 0x180000000d
    mtime = Sun May 17 11:00:58 CST 2020
    pZxid = 0x180000000d
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    
    • auth权限模式

    设置方式

    # 添加认证用户
    addauth digest <user>:<password> 
    # 设置方式
    setAcl <path> auth:<user>:<acl>
    

    客户端实操

    # 创建节点
    [zk: node01:2181(CONNECTED) 23] create /auth-node ''
    Created /auth-node
    # 默认是world策略
    [zk: node01:2181(CONNECTED) 24] getAcl /auth-node
    'world,'anyone
    : cdrwa
    # 添加认证用户
    [zk: node01:2181(CONNECTED) 25] addauth digest clyang:123
    # 设置策略auth,权限为rw
    [zk: node01:2181(CONNECTED) 26] setAcl /auth-node auth:clyang:rw
    cZxid = 0x1800000010
    ctime = Sun May 17 11:07:38 CST 2020
    mZxid = 0x1800000010
    mtime = Sun May 17 11:07:38 CST 2020
    pZxid = 0x1800000010
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    # 查看策略
    [zk: node01:2181(CONNECTED) 27] getAcl /auth-node
    'digest,'clyang:9SAlEXaEiGIBs1CVSgXIYgCAyO0=
    : rw
    # 断开连接
    [zk: node01:2181(CONNECTED) 28] close
    2020-05-17 11:09:54,886 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x1721ef0e8600000 closed
    [zk: node01:2181(CLOSED) 29] 2020-05-17 11:09:54,886 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512] - EventThread shut down
    # 重新连接
    [zk: node01:2181(CLOSED) 29] connect
    2020-05-17 11:09:59,573 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=node01:2181 sessionTimeout=30000 
    # 查看节点,发现无权限
    [zk: node01:2181(CONNECTED) 30] get /auth-node
    Authentication is not valid : /auth-node
    # 需要重新添加认证用户
    [zk: node01:2181(CONNECTED) 31] addauth digest clyang:123
    # 再次查看节点,发现可以查看
    [zk: node01:2181(CONNECTED) 32] get /auth-node
    ''
    cZxid = 0x1800000010
    ctime = Sun May 17 11:07:38 CST 2020
    mZxid = 0x1800000010
    mtime = Sun May 17 11:07:38 CST 2020
    pZxid = 0x1800000010
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    # 创建子节点,失败,因为只有rw权限,没有c的权限
    [zk: node01:2181(CONNECTED) 33] create /auth-node/childnode ''
    Authentication is not valid : /auth-node/childnode
    
    • digest权限模式

    设置方式

    setAcl <path> digest:<user>:<password>:<acl>
    

    这里的密码,需要通过SHA1和BASE64处理,需要先通过如下命令先查看明文密码对应的密文密码。

    echo -n <user>:<password> | openssl dgst -binary -sha1 | openssl base64
    

    得到密文

    [hadoop@node01 /kkb/install/zookeeper-3.4.5-cdh5.14.2/zkdatas/version-2]$ echo -n clyang:123 |openssl dgst -binary -sha1 | openssl base64
    # 生成的密文
    9SAlEXaEiGIBs1CVSgXIYgCAyO0=
    

    客户端实操

    # 创建节点
    [zk: node01:2181(CONNECTED) 34] create /digest-node ''
    Created /digest-node
    # 默认是world策略
    [zk: node01:2181(CONNECTED) 35] getAcl /digest-node
    'world,'anyone
    : cdrwa
    # 设置策略digest,权限全有,使用上面生成的密文
    [zk: node01:2181(CONNECTED) 36] setAcl /digest-node digest:clyang:9SAlEXaEiGIBs1CVSgXIYgCAyO0=:cdrwa
    cZxid = 0x1800000015
    ctime = Sun May 17 12:02:00 CST 2020
    mZxid = 0x1800000015
    mtime = Sun May 17 12:02:00 CST 2020
    pZxid = 0x1800000015
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    # 查看策略
    [zk: node01:2181(CONNECTED) 37] getAcl /digest-node
    'digest,'clyang:9SAlEXaEiGIBs1CVSgXIYgCAyO0=
    : cdrwa
    # 查看数据,ok,因为刚开始添加的认证用户还有效
    [zk: node01:2181(CONNECTED) 38] get /digest-node
    ''
    cZxid = 0x1800000015
    ctime = Sun May 17 12:02:00 CST 2020
    mZxid = 0x1800000015
    mtime = Sun May 17 12:02:00 CST 2020
    pZxid = 0x1800000015
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    # 断开连接
    [zk: node01:2181(CONNECTED) 39] close
    2020-05-17 12:04:38,033 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x1721ef0e8600001 closed
    [zk: node01:2181(CLOSED) 40] 2020-05-17 12:04:38,033 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@512] - EventThread shut down
    # 重新连接
    [zk: node01:2181(CLOSED) 40] connect
    2020-05-17 12:04:43,545 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=node01:2181 sessionTimeout=30000 
    # 查看无权限
    [zk: node01:2181(CONNECTED) 41] get /digest-node
    Authentication is not valid : /digest-node
    # 添加认证用户
    [zk: node01:2181(CONNECTED) 42] addauth digest clyang:123
    # 可以查看节点
    [zk: node01:2181(CONNECTED) 43] get /digest-node
    ''
    cZxid = 0x1800000015
    ctime = Sun May 17 12:02:00 CST 2020
    mZxid = 0x1800000015
    mtime = Sun May 17 12:02:00 CST 2020
    pZxid = 0x1800000015
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    
    • ip权限模式

    设置方式

    setAcl <path> ip:<ip>:<acl>
    

    客户端实操

    # 创建节点
    [zk: node01:2181(CONNECTED) 44] create /ip-node ''
    Created /ip-node
    # 默认是world策略
    [zk: node01:2181(CONNECTED) 45] getAcl /ip-node
    'world,'anyone
    : cdrwa
    # 设置策略为ip
    [zk: node01:2181(CONNECTED) 46] setAcl /ip-node ip:192.168.200.100:cdrwa
    cZxid = 0x1800000019
    ctime = Sun May 17 12:10:49 CST 2020
    mZxid = 0x1800000019
    mtime = Sun May 17 12:10:49 CST 2020
    pZxid = 0x1800000019
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    # 设置成功
    [zk: node01:2181(CONNECTED) 47] getAcl /ip-node
    'ip,'192.168.200.100
    : cdrwa
    # 本机ip是192.168.200.110,提示访问不到
    [zk: node01:2181(CONNECTED) 48] get /ip-node
    Authentication is not valid : /ip-node
    

    换成ip为192.168.200.100的client来访问,没有问题。

    [zk: node01:2181(CONNECTED) 0] get /ip-node
    ''
    cZxid = 0x1800000019
    ctime = Sun May 17 12:10:49 CST 2020
    mZxid = 0x1800000019
    mtime = Sun May 17 12:10:49 CST 2020
    pZxid = 0x1800000019
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x0
    dataLength = 2
    numChildren = 0
    

    API

    包含两种风格的编程方式,一种是zookeeper原生的,一种是curator封装后的。

    (1)原生 API

    使用原生api,实现类似命令操作的基本功能。

    里面用到CountDownLatch线程递减锁,在线程计数归零之前,线程会被阻塞。

    package com.boe.zk;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperDemo {
    
        private ZooKeeper zk=null;
    
        //连接zookeeper
        @Before
        public void connect() throws IOException, InterruptedException {
            /**
             * connectString 连接的地址+端口
             * sessionTimeout 4s~40s之间
             * Watcher 监控者
             * zk连接过程使用netty,底层基于NIO,是一个非阻塞连接
             */
            //在监控到之前,test线程不能继续往下走
            final CountDownLatch cdl=new CountDownLatch(1);
            zk=new ZooKeeper("192.168.200.100:2181", 5000, new Watcher() {
    
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState()== Event.KeeperState.SyncConnected){
                        System.out.println("连接zk成功....");
                        cdl.countDown();
                    }
                }
            });
    
            //cdl计数减到0后,cdl的await才会解除阻塞,执行下面的方法
            //如果不用CountDownLatch,会出现先打印"finish~~~",然后打印"连接zk成功...."
            cdl.await();
            System.out.println("finish~~~");
        }
    
        //创建节点 create
        @Test
        public void createNode() throws KeeperException, InterruptedException {
            /**
             * path 路径
             * data 数据
             * acl 权限策略
             * createMode 节点类型
             */
            String s = zk.create("/yang", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println(s);//返回值是节点的路径,主要用于顺序节点
            System.out.println("创建节点success");
        }
    
        //删除节点 delete
        @Test
        public void deleteNode() throws KeeperException, InterruptedException {
            /**
             * path 路径
             * version dataversion版本号
             */
            zk.delete("/yang",-1);
            //强制删除 版本号可以给-1,-1代表忽略版本校验
            System.out.println("删除节点success");
        }
        //修改数据 set
        @Test
        public void setNode() throws KeeperException, InterruptedException {
            Stat stat = zk.setData("/yang", "hello zookeeper".getBytes(), 0);
            System.out.println(stat.toString());
            System.out.println("修改数据成功");
        }
    
        //获取节点 get
        @Test
        public void getNode() throws KeeperException, InterruptedException {
            Stat s=new Stat();
            byte[] data = zk.getData("/yang", null, s);
            System.out.println(new String(data));
        }
    
        //获取子节点 ls
        @Test
        public void getChild() throws KeeperException, InterruptedException {
            List<String> children = zk.getChildren("/video", null);
            for(String s: children){
                //返回子节点的节点名
                System.out.println("子节点为:"+children);
            }
        }
    
        //创建某个节点是否存在
        @Test
        public void exist() throws KeeperException, InterruptedException {
            //返回值是节点信息
            Stat stat = zk.exists("/video", null);
            System.out.println(stat);
            System.out.println(stat==null? "节点不存在":"节点存在");
        }
    
    
    }
    

    以创建一个节点为例,从控制台可以看到正常的创建了节点"/yang"。另外发现连接zookeeper集群成功后,才打印"finish~~~",这是使用了线程递减锁的原因。

    也可以实现监听。

    package com.boe.zk;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.Date;
    import java.util.concurrent.CountDownLatch;
    
    public class ZookeeperDemo2 {
        //连接zk
        private ZooKeeper zk=null;
    
        @Before
        public void connect() throws IOException, InterruptedException {
            final CountDownLatch cdl=new CountDownLatch(1);
            zk=new ZooKeeper("192.168.200.100:2181", 5000, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getState()==Event.KeeperState.SyncConnected){
                        System.out.println("连接zk成功....");
                        cdl.countDown();
                    }
                }
            });
    
            cdl.await();
            System.out.println("finish");
        }
    
        //监控节点是否被改变
        @Test
        public void dataChange() throws KeeperException, InterruptedException {
            //连接和监控是非阻塞的,无论是否有变化,都会向下执行
            final CountDownLatch cdl=new CountDownLatch(1);
            zk.getData("/yang", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getType() == Event.EventType.NodeDataChanged){
                        System.out.println("数据节点数据在"+new Date()+"被改变");
                        cdl.countDown();
                    }
                }
            },
            null);
            //监控到了再结束
            cdl.await();
            System.out.println("监控结束");
        }
    
        //监控子节点个数变化
        @Test
        public void childrenChanged() throws KeeperException, InterruptedException {
            final CountDownLatch cdl=new CountDownLatch(1);
            zk.getChildren("/video", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getType()==Event.EventType.NodeChildrenChanged){
                        System.out.println("子节点个数发生"+new Date()+"变化");
                        cdl.countDown();
                    }
                }
            },null);
            //监控到了再结束
            cdl.await();
            System.out.println("监控结束");
    
        }
    
    
        //监控节点的增删变化
        @Test
        public void nodeChanged() throws KeeperException, InterruptedException {
            final CountDownLatch cdl=new CountDownLatch(1);
            zk.getChildren("/video", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    if(event.getType()==Event.EventType.NodeCreated){
                        System.out.println("节点在"+new Date()+"创建");
                        cdl.countDown();
                    }else if(event.getType()==Event.EventType.NodeDeleted){
                        System.out.println("节点在"+new Date()+"删除");
                        cdl.countDown();
                    }
                }
            },null);
            //监控到了再结束
            cdl.await();
            System.out.println("监控结束");
        }
    
    }
    

    以监控节点是否被改变为例,启动后,当客户端对"/yang"数据进行更新后,控制台打印出监听信息。

    (2)curator API

    curator对zookeeper的api做了封装,提供简单易用的api,编程风格是链式编程。

    也可以实现基本的命令操作功能,此外监听部分可以重复监听,如果是命令只能生效一次,这个是有区别的地方。

    package com.boe.curator;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.ChildData;
    import org.apache.curator.framework.recipes.cache.TreeCache;
    import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
    import org.apache.curator.framework.recipes.cache.TreeCacheListener;
    import org.apache.curator.retry.RetryNTimes;
    import org.apache.zookeeper.CreateMode;
    
    /**
     * 测试使用java api来完成zookeeper常用操作
     */
    public class CuratorClientTest {
        //zookeeper集群信息
        private static final String ZK_SERVER_ADDRESS="node01:2181,node02:2181,node03:2181";
        //zookeeper节点名称
        private static final String ZK_NODE="/yang";
    
        //连接zookeeper的客户端对象
        private static CuratorFramework client=null;
    
        //初始化
        public static void init(){
            //测试连接重试策略
            RetryNTimes retryPolicy =new RetryNTimes(10,3000);//失败重试10次,每次间隔3秒
            //新建连接对象,需要两个参数,连接集群信息和连接策略
            client= CuratorFrameworkFactory.newClient(ZK_SERVER_ADDRESS,retryPolicy);
            //启动客户端,连接到zookeeper集群
            client.start();
            //打印
            System.out.println("zookeeper client connecting successful");
        }
    
        //关闭和集群的连接,体验链式编程
        public static void quit(){
            System.out.println("close session with zookeeper");
            client.close();
        }
    
        //创建永久节点
        public static void createPersistentZNode() throws Exception {
            //节点数据
            String znodedata="hello zookeeper,I am persistent node";
            //创建
            client.create().
                    creatingParentsIfNeeded().
                     withMode(CreateMode.PERSISTENT).//节点类型指定
                      forPath("/p_node",znodedata.getBytes());
        }
    
        //创建临时节点
        public static void createEphemeralZNode() throws Exception {
            //节点数据
            String znodedata="hello zookeeper,I am Ephemeral node";
            //创建
            client.create().
                    creatingParentsIfNeeded().
                     withMode(CreateMode.EPHEMERAL).
                      forPath("/e_node",znodedata.getBytes());
        }
    
        //修改节点
        public static void modifyZnodeData() throws Exception {
            //修改后节点数据
            String newdata="hello zookeeper,I am new data";
            //修改数据
            printCmd("set",ZK_NODE,newdata);
            client.setData().forPath(ZK_NODE,newdata.getBytes());
            printCmd("get",ZK_NODE);
            print(client.getData().forPath(ZK_NODE));
        }
    
        //删除节点
        public static void deleteZNode() throws Exception {
            printCmd("delete",ZK_NODE);
            client.delete().forPath(ZK_NODE);
            //再查询
            printCmd("ls","/");
            print(client.getChildren().forPath("/"));
        }
    
        //监听节点,这个监听可以反复监听,如果是命令中添加一个watch,只能监听一次
        public static void watchZNode() throws Exception {
            //使用TreeCache,需要传入client和path,为啥使用TreeCache,参考如下如下官方文档说明
            /**
             * A utility that attempts to keep all data from all children of a ZK path locally cached.
             * This class will watch the ZK path, respond to update/create/delete events, pull down the data, etc.
             * You can register a listener that will get notified when changes occur
             */
            TreeCache treeCache=new TreeCache(client,ZK_NODE);//就是监听ZK_NODE的节点
            //设置监听器和处理过程
            treeCache.getListenable().addListener(new TreeCacheListener() {//匿名内部类
                @Override
                public void childEvent(CuratorFramework cliet, TreeCacheEvent event) throws Exception {
                    ChildData data = event.getData();
                    if(data!=null){
                        switch (event.getType()){
                            case NODE_ADDED:
                                System.out.println("NODE_ADDED: "+data.getPath()+"数据为:"+new String(data.getData()));
                                break;
                            case NODE_UPDATED:
                                System.out.println("NODE_UPDATED: "+data.getPath()+"数据为:"+new String(data.getData()));
                                break;
                            case NODE_REMOVED:
                                System.out.println("NODE_REMOVED: "+data.getPath()+"数据为:"+new String(data.getData()));
                                break;
                            default:
                                break;
                        }
                    } else{
                        System.out.println("data is null:"+event.getType());
                    }
                }
            });
            //开始监听
            treeCache.start();
            //持续监听一段时间
            Thread.sleep(60000);
            //关闭监听
            treeCache.close();
        }
    
        //查询节点
        public static void queryZNode() throws Exception {
            printCmd("ls","/");
            //进入路径
            print(client.getChildren().forPath("/"));
            //查询
            printCmd("get",ZK_NODE);
            print(client.getData().forPath(ZK_NODE));
    
        }
    
        //输出命令
        public static void printCmd(String... cmds){
            StringBuilder stringBuilder=new StringBuilder("$ ");
            for (String cmd : cmds) {
                stringBuilder.append(cmd).append(" ");//每个命令之间用空格隔开
            }
            System.out.println(stringBuilder.toString());
        }
    
        //如果是数组,输出数组内容,不是就直接输出对象
        public static void print(Object result){
            System.out.println(result instanceof byte[]? new String((byte[]) result):result);
        }
    
        //main方法
        public static void main(String[] args) throws Exception {
            init();
            //createPersistentZNode();
            //createEphemeralZNode();
            //Thread.sleep(10000);//10s后临时节点在会话结束后删除
            //queryZNode();
            //modifyZnodeData();
            //deleteZNode();
            watchZNode();
            quit();
        }
    }
    

    以监听为例,当在client端多次修改"/yang",控制台可以监听到做了什么修改,并且可以监听多次。

    以上,理解不一定正确,学习就是一个不断认识和纠错的过程。

    参考博文:

    (1)https://zookeeper.apache.org/doc/current/zookeeperOver.html 官网介绍

    (2)https://www.cnblogs.com/youngchaolin/p/12113065.html 安装

    (3)https://blog.csdn.net/liuxiao723846/article/details/79391650 ACL

    (4)https://www.cnblogs.com/youngchaolin/p/12904014.html 事务id

  • 相关阅读:
    Filter
    Servlet
    Maven(Mac)
    SpringMVC 完美解决PUT请求参数绑定问题(普通表单和文件表单)
    Android Bitmap
    Android ContentProvider
    浅谈数据库事务隔离
    开启Spring Initializr个性化之旅
    java浮点型精度丢失浅析
    爬取糗事百科段子
  • 原文地址:https://www.cnblogs.com/youngchaolin/p/12905023.html
Copyright © 2011-2022 走看看