众所周知,ZooKeeper中的ZNode是树形结构。现在我需要给/app1结点设置watcher,监听/app1下增加、删除和修改的结点,并将相应的事件使用log4j记录到日志文件中。ZNode的变化可以直接通过event.getType()来获取。使用zk.exists(PATH, wc);来为PATH结点设置watcher,所有结点都可以使用wc做watcher。
代码如下:
package com.iflytek.cpcloud.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.PropertyConfigurator;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooKeeper;

/**
 * Watches the {@link #PATH} znode and its direct children and logs every
 * creation, deletion and data change.
 *
 * <p>ZooKeeper watches are one-shot, so they are re-armed both right after an
 * event has been handled and periodically from {@link #run()}. The snapshot of
 * the children of {@link #PATH} is touched from the ZooKeeper event thread as
 * well as from the polling thread; all access goes through the synchronized
 * {@code refreshWatches} helper.
 */
public class WatchClient implements Runnable {

    private static final Log LOG = LogFactory.getLog(WatchClient.class);

    public static final int CLIENT_PORT = 2181;

    /** The znode being monitored. */
    public static final String PATH = "/app1";

    /** Pause between two polling rounds, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 3000L;

    private static ZooKeeper zk;

    /**
     * Last known children of {@link #PATH}; {@code null} until the first
     * snapshot has been taken, so pre-existing children are not reported as
     * newly created. Only accessed inside {@code refreshWatches}.
     */
    private static List<String> nodeList;

    public static void main(String[] args) throws Exception {
        // Backslashes must be escaped: "\t" would be a TAB and "\c" / "\l"
        // are illegal escape sequences that do not compile.
        PropertyConfigurator.configure("F:\\test\\conf\\log4j.properties");
        WatchClient client = new WatchClient();
        Thread th = new Thread(client);
        th.start();
    }

    /**
     * Opens the ZooKeeper session used by this client.
     *
     * @throws IOException if the ZooKeeper client cannot be created
     */
    public WatchClient() throws IOException {
        zk = new ZooKeeper("192.168.255.133:" + CLIENT_PORT, 21810, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // Default watcher: only connection state changes are expected here.
                LOG.debug("ZooKeeper session state: " + event.getState());
            }
        });
    }

    /**
     * Registers the watches and keeps re-arming them until the thread is
     * interrupted.
     */
    @Override
    public void run() {
        final Watcher wc = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                String path = event.getPath();
                EventType type = event.getType();

                // Connection state changes are delivered to every registered
                // watcher with type None and a null path; nothing to report.
                if (type == EventType.None || path == null) {
                    return;
                }

                if (type == EventType.NodeDataChanged) {
                    LOG.info("Node data changed:" + path);
                } else if (type == EventType.NodeDeleted) {
                    LOG.info("Node deleted:" + path);
                } else if (type == EventType.NodeCreated) {
                    LOG.info("Node created:" + path);
                }
                // NodeChildrenChanged carries no detail; the new children are
                // found by diffing the snapshot in refreshWatches.

                try {
                    // The watch that fired is consumed; re-arm right away to
                    // keep the window for missed events small.
                    refreshWatches(this);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        };

        while (!Thread.currentThread().isInterrupted()) {
            try {
                refreshWatches(wc);
                Thread.sleep(POLL_INTERVAL_MS); // avoid busy-spinning
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the signal and stop
            }
        }
        LOG.info("Watch thread interrupted, stop monitoring " + PATH);
    }

    /**
     * Re-arms the watches on {@link #PATH} and on each of its children, and
     * logs every child that has appeared since the previous snapshot.
     *
     * @param watcher the watcher to register on all monitored znodes
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for a ZooKeeper response
     */
    private static synchronized void refreshWatches(Watcher watcher)
            throws InterruptedException {
        try {
            // exists() also works for a missing znode, so the creation of
            // PATH itself is reported later.
            zk.exists(PATH, watcher);
        } catch (KeeperException e) {
            LOG.warn("Failed to set watch on " + PATH, e);
        }

        List<String> current;
        try {
            current = zk.getChildren(PATH, watcher);
        } catch (KeeperException.NoNodeException e) {
            // PATH does not exist (yet, or any more): it has no children.
            current = Collections.emptyList();
        } catch (KeeperException e) {
            LOG.warn("Failed to list children of " + PATH, e);
            return; // keep the previous snapshot and retry on the next round
        }

        List<String> previous = nodeList;
        if (previous != null) {
            // Compare element by element instead of comparing sizes, so that
            // a child added while another one was removed is still reported.
            for (String child : current) {
                if (!previous.contains(child)) {
                    LOG.info("Node created:" + PATH + "/" + child);
                }
            }
        }
        nodeList = new ArrayList<String>(current);

        // Watch every child for data changes and deletion.
        for (String child : current) {
            String childPath = PATH + "/" + child;
            try {
                zk.exists(childPath, watcher);
            } catch (KeeperException e) {
                LOG.warn("Failed to set watch on " + childPath, e);
            }
        }
    }
}
该项目使用maven构建。pom.xml文件如下:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the ZooKeeper watcher example. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.iflytek.cpcloud</groupId>
  <artifactId>zookeeper-test</artifactId>
  <version>0.1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
    <dependency>
      <groupId>commons-logging</groupId>
      <artifactId>commons-logging</artifactId>
      <version>1.1.1</version>
    </dependency>
    <dependency>
      <groupId>commons-cli</groupId>
      <artifactId>commons-cli</artifactId>
      <version>1.2</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
    </dependency>
    <dependency>
      <groupId>org.apache.zookeeper</groupId>
      <artifactId>zookeeper</artifactId>
      <version>3.4.5</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <!-- Builds a single runnable jar that bundles all dependencies. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.2-beta-5</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>2.3.2</version>
        <configuration>
          <!-- Java 7 language level: sources may use Java 7 syntax such as
               multi-catch, which is rejected with -source 1.6. -->
          <source>1.7</source>
          <target>1.7</target>
          <encoding>UTF-8</encoding>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>