tickTime=2000 dataDir=D:/devtools/zookeeper-3.2.2/build clientPort=2181 |
-
tickTime:这个时间是作为 Zookeeper 服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个 tickTime 时间就会发送一个心跳。
-
dataDir:顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存在这个目录里。
- clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求。
initLimit=5 syncLimit=2 server.1=192.168.211.1:2888:3888 server.2=192.168.211.2:2888:3888 |
-
initLimit:这个配置项是用来配置 Zookeeper 接受客户端(这里所说的客户端不是用户连接 Zookeeper 服务器的客户端,而是 Zookeeper 服务器集群中连接到 Leader 的 Follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过 10 个心跳的时间(也就是 tickTime)长度后 Zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 5*2000=10 秒
-
syncLimit:这个配置项标识 Leader 与 Follower 之间发送消息,请求和应答时间长度,最长不能超过多少个 tickTime 的时间长度,总的时间长度就是 2*2000=4 秒
-
server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。
|
|
|
| String create(String path, byte[] data, List<ACL> acl,CreateMode createMode) | 创建一个给定的目录节点 path, 并给它设置数据,CreateMode 标识有四种形式的目录节点,分别是 PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目录节点会根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名;EPHEMERAL:临时目录节点,一旦创建这个节点的客户端与服务器端口也就是 session 超时,这种节点会被自动删除;EPHEMERAL_SEQUENTIAL:临时自动编号节点 |
| Stat exists(String path, boolean watch) | 判断某个 path 是否存在,并设置是否监控这个目录节点,这里的 watcher 是在创建 ZooKeeper 实例时指定的 watcher,exists方法还有一个重载方法,可以指定特定的watcher |
| Stat exists(String path,Watcher watcher) | 重载方法,这里给某个目录节点设置特定的 watcher,Watcher 在 ZooKeeper 是一个核心功能,Watcher 可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应 |
| void delete(String path, int version) | 删除 path 对应的目录节点,version 为 -1 可以匹配任何版本,也就删除了这个目录节点所有数据 |
| List<String>getChildren(String path, boolean watch) | 获取指定 path 下的所有子目录节点,同样 getChildren方法也有一个重载方法可以设置特定的 watcher 监控子节点的状态 |
| Stat setData(String path, byte[] data, int version) | 给 path 设置数据,可以指定这个数据的版本号,如果 version 为 -1 怎可以匹配任何版本 |
| byte[] getData(String path, boolean watch, Stat stat) | 获取这个 path 对应的目录节点存储的数据,数据的版本等信息可以通过 stat 来指定,同时还可以设置是否监控这个目录节点数据的状态 |
| voidaddAuthInfo(String scheme, byte[] auth) | 客户端将自己的授权信息提交给服务器,服务器将根据这个授权信息验证客户端的访问权限。 |
| Stat setACL(String path,List<ACL> acl, int version) | 给某个目录节点重新设置访问权限,需要注意的是 Zookeeper 中的目录节点权限不具有传递性,父目录节点的权限不能传递给子目录节点。目录节点 ACL 由两部分组成:perms 和 id。 Perms 有 ALL、READ、WRITE、CREATE、DELETE、ADMIN 几种 而 id 标识了访问目录节点的身份列表,默认情况下有以下两种: ANYONE_ID_UNSAFE = new Id("world", "anyone") 和 AUTH_IDS = new Id("auth", "") 分别表示任何人都可以访问和创建者拥有访问权限。 |
| List<ACL>getACL(String path,Stat stat) | 获取某个目录节点的访问权限列表 |
// 创建一个与服务器的连接
ZooKeeper zk = new ZooKeeper("localhost:" + CLIENT_PORT,
ClientBase.CONNECTION_TIMEOUT, new Watcher() {
// 监控所有被触发的事件
public void process(WatchedEvent event) {
System.out.println("已经触发了" + event.getType() + "事件!");
}
});
// 创建一个目录节点
zk.create("/testRootPath", "testRootData".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// 创建一个子目录节点
zk.create("/testRootPath/testChildPathOne", "testChildDataOne".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath",false,null)));
// 取出子目录节点列表
System.out.println(zk.getChildren("/testRootPath",true));
// 修改子目录节点数据
zk.setData("/testRootPath/testChildPathOne","modifyChildDataOne".getBytes(),-1);
System.out.println("目录节点状态:["+zk.exists("/testRootPath",true)+"]");
// 创建另外一个子目录节点
zk.create("/testRootPath/testChildPathTwo", "testChildDataTwo".getBytes(),
Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
System.out.println(new String(zk.getData("/testRootPath/testChildPathTwo",true,null)));
// 删除子目录节点
zk.delete("/testRootPath/testChildPathTwo",-1);
zk.delete("/testRootPath/testChildPathOne",-1);
// 删除父目录节点
zk.delete("/testRootPath",-1);
// 关闭连接
zk.close();
|
已经触发了 None 事件! testRootData [testChildPathOne] 目录节点状态:[5,5,1281804532336,1281804532336,0,1,0,0,12,1,6] 已经触发了 NodeChildrenChanged 事件! testChildDataTwo 已经触发了 NodeDeleted 事件! 已经触发了 NodeDeleted 事件! |
- I0Itec-zkClient
zkClient简单的使用样例如下:
下面表格列出了写操作与ZK内部产生的事件的对应关系:
event For “/path”event For “/path/child”|
|
||
| create(“/path”) | EventType.NodeCreated | NA |
| delete(“/path”) | EventType.NodeDeleted | NA |
| setData(“/path”) | EventType.NodeDataChanged | NA |
| create(“/path/child”) | EventType.NodeChildrenChanged | EventType.NodeCreated |
| delete(“/path/child”) | EventType.NodeChildrenChanged | EventType.NodeDeleted |
| setData(“/path/child”) | NA | EventType.NodeDataChanged |
而ZK内部的写事件与所触发的watcher的对应关系如下:
event For “/path”defaultWatcherexists(“/path”)getData
(“/path”)getChildren
(“/path”)
|
|
||||
| EventType.None | √ | √ | √ | √ |
| EventType.NodeCreated | √ | √ | ||
| EventType.NodeDeleted | √(不正常) | √ | ||
| EventType.NodeDataChanged | √ | √ | ||
| EventType.NodeChildrenChanged | √ |
综合上面两个表,我们可以总结出各种写操作可以触发哪些watcher,如下表所示:
“/path”“/path/child” existsgetDatagetChildrenexistsgetDatagetChildren|
|
||||||
|
|
||||||
| create(“/path”) | √ | √ | ||||
| delete(“/path”) | √ | √ | √ | |||
| setData(“/path”) | √ | √ | ||||
| create(“/path/child”) | √ | √ | √ | |||
| delete(“/path/child”) | √ | √ | √ | √ | ||
| setData(“/path/child”) | √ | √ |
如果发生session close、authFail和invalid,那么所有类型的wather都会被触发。
zkClient除了做了一些便捷包装之外,对watcher使用做了一点增强。比如subscribeChildChanges实际上是通过exists和getChildren关注了两个事件。这样当create(“/path”)时,对应path上通过getChildren注册的listener也会被调用。另外subscribeDataChanges实际上只是通过exists注册了事件。因为从上表可以看到,对于一个更新,通过exists和getData注册的watcher要么都会触发,要么都不会触发。
zkClient地址:https://github.com/sgroschupf/zkclient
Maven工程中使用zkClient需要加的依赖:
<dependency>
<groupId>zkclient</groupId>
<artifactId>zkclient</artifactId>
<version>0.1</version>
</dependency>
- menagerie
基于分布式锁,还实现了其他业务场景,比如leader选举:
public static void leaderElectionTest() {
ZkSessionManager zksm = new DefaultZkSessionManager(“ZK-host-ip:2181″, 5000);
LeaderElector elector = new ZkLeaderElector(“/leaderElectionTest”, zksm, Ids.OPEN_ACL_UNSAFE);
if (elector.nominateSelfForLeader()) {
System.out.println(“Try to become the leader success!”);
}
}
java.util.concurrent包下面的其他接口实现,也主要是基于ReentrantZkLock的,比如ZkHashMap实现了ConcurrentMap。具体请参见menagerie的API文档
menagerie地址:https://github.com/openUtility/menagerie
Maven工程中使用menagerie需要加的依赖:
<dependency>
<groupId>org.menagerie</groupId>
<artifactId>menagerie</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
void findLeader() throws InterruptedException {
byte[] leader = null;
try {
leader = zk.getData(root + "/leader", true, null);
} catch (Exception e) {
logger.error(e);
}
if (leader != null) {
following();
} else {
String newLeader = null;
try {
byte[] localhost = InetAddress.getLocalHost().getAddress();
newLeader = zk.create(root + "/leader", localhost,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
} catch (Exception e) {
logger.error(e);
}
if (newLeader != null) {
leading();
} else {
mutex.wait();
}
}
}
|
void getLock() throws KeeperException, InterruptedException{
List<String> list = zk.getChildren(root, false);
String[] nodes = list.toArray(new String[list.size()]);
Arrays.sort(nodes);
if(myZnode.equals(root+"/"+nodes[0])){
doAction();
}
else{
waitForLock(nodes[0]);
}
}
void waitForLock(String lower) throws InterruptedException, KeeperException {
Stat stat = zk.exists(root + "/" + lower,true);
if(stat != null){
mutex.wait();
}
else{
getLock();
}
}
|
void addQueue() throws KeeperException, InterruptedException{
zk.exists(root + "/start",true);
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
synchronized (mutex) {
List<String> list = zk.getChildren(root, false);
if (list.size() < size) {
mutex.wait();
} else {
zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
}
}
|
public void process(WatchedEvent event) {
if(event.getPath().equals(root + "/start") &&
event.getType() == Event.EventType.NodeCreated){
System.out.println("得到通知");
super.process(event);
doAction();
}
}
|
boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);
return true;
}
|
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null;
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
if(tempValue < min) min = tempValue;
}
byte[] b = zk.getData(root + "/element" + min,false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();
return retvalue;
}
}
}
}
|