Zookeeepr实现分布式集群监控
Zookeeper中节点有两种:临时节点和永久节点
从类型上看节点又可以分为四种节点类型:PERSIST,PERSIST_SEQUENTIAL,EPHEMERAL,EPHEMERAL_SEQUENTIAL
临时节点有一个特点:当创建临时节点的程序停掉之后,这个临时节点就会消失。
监视器的特点:可以给zk中的节点注册监视器,见识这个节点的变化情况。
监视器注册一次,只能使用一次,多次使用就要多次注册。
我们利用这个Zookeeper的临时节点特性+监视器(Watch)来实现分布式集群监控
我们在/monitor(永久节点)下创建临时节点。
实际上,zookeeper的sdk不是特别好用,很多边界情况需要用户自己处理。curator是对Zookeeper的sdk进行了封装,所以说使用curator操作Zookeeper更加方便
在maven官网找到curator的依赖
<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework,支持zookeeper3.4.6--> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.10.0</version> </dependency>
我们通过curator来使用zookeeper,那么我们就必须知道如何使用curator来连上zookeeper,下面代码是官方文档所给出
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start();
开始我们的代码
TestCurator.java ,实现功能:创建Zookeeper临时节点
1 package zkdemo; 2 3 import java.net.InetAddress; 4 5 import org.apache.curator.RetryPolicy; 6 import org.apache.curator.framework.CuratorFramework; 7 import org.apache.curator.framework.CuratorFrameworkFactory; 8 import org.apache.curator.retry.ExponentialBackoffRetry; 9 10 import org.apache.zookeeper.CreateMode; 11 import org.apache.zookeeper.ZooDefs.Ids; 12 import org.apache.zookeeper.ZooKeeper; 13 import org.junit.Test; 14 15 16 public class TestCurator { 17 18 @Test 19 public void test1() throws Exception{ 20 //1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数 21 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 22 String connectString = "djt1:2181,djt2:2181,djt3:2181,djt4:2181,djt5:2181"; 23 int sessionTimeoutMs = 5000;//这个值只能在4000-40000ms之间表示链接断开后多长时间临时节点会小时 24 int connectionTimeoutMs = 3000;//获取链接的超时时间 25 //创建一个zk连接 26 CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs 27 ,connectionTimeoutMs,retryPolicy); 28 29 client.start(); 30 31 InetAddress localHost = InetAddress.getLocalHost(); 32 String ip = localHost.getHostAddress(); 33 34 client.create().creatingParentsIfNeeded() 35 .withMode(CreateMode.EPHEMERAL)//指定节点类型 36 .withACL(Ids.OPEN_ACL_UNSAFE)//指定设置节点权限信息 37 .forPath("/monitor/"+ip);//指定节点名称 38 39 while(true) 40 { 41 ; 42 } 43 } 44 }
ZkNodeWatcher.java 实现功能:注册watcher,监视节点的变化
1 package zk; 2 3 import java.util.List; 4 import java.util.ArrayList; 5 6 import org.apache.curator.RetryPolicy; 7 import org.apache.curator.framework.CuratorFramework; 8 import org.apache.curator.framework.CuratorFrameworkFactory; 9 import org.apache.curator.retry.ExponentialBackoffRetry; 10 import org.apache.zookeeper.WatchedEvent; 11 import org.apache.zookeeper.Watcher; 12 13 14 15 public class ZkNodeWatcher implements Watcher{ 16 CuratorFramework client; 17 List<String> childrenList = new ArrayList<String>(); 18 List<String> newChildrenList = new ArrayList<String>(); 19 20 public ZkNodeWatcher(){ 21 //1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数 22 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); 23 String connectString = "djt1:2181,djt2:2181,djt3:2181,djt4:2181,djt5:2181"; 24 int sessionTimeoutMs = 5000;//这个值只能在4000-40000ms之间表示链接断开后多长时间临时节点会消失 25 int connectionTimeoutMs = 3000;//获取链接的超时时间 26 //创建一个zk连接 27 client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs 28 ,connectionTimeoutMs,retryPolicy); 29 client.start(); 30 31 //监视monitor节点,获取下面的所有子节点的变化情况 32 try { 33 childrenList = client.getChildren().usingWatcher(this).forPath("/monitor"); 34 } catch (Exception e) { 35 // TODO Auto-generated catch block 36 e.printStackTrace(); 37 } 38 } 39 40 41 42 /** 43 * 实现一个zk监视器,监视某个节点的变化情况 44 * 45 * 这个监视程序需要一直运行 46 * @CPH 47 */ 48 49 public void process(WatchedEvent event) { 50 System.out.println("我被调用了"); 51 try { 52 newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor"); 53 for(String ip : childrenList) 54 { 55 if(!newChildrenList.contains(ip)){ 56 System.out.println("节点消失了"+ip); 57 //TODO 给管理员发送短信什么的 58 59 } 60 } 61 62 for(String ip : newChildrenList){ 63 if(!childrenList.contains(ip)){ 64 System.out.println("节点新增"+ip); 65 } 66 } 67 //重要 68 childrenList = newChildrenList; 69 } catch (Exception e) { 70 // TODO Auto-generated catch block 71 e.printStackTrace(); 72 } 73 74 75 76 } 77 78 public void start(){ 79 while (true){;} 80 } 81 82 public static void main(String[] args) { 83 ZkNodeWatcher watcher = new ZkNodeWatcher(); 84 watcher.start(); 85 } 86 }
我们先开启Zookeeper集群,启动/bin/zkCli.sh,然后启动这2个集群,我们可以看到由对应的ip目录,这个ip不是虚拟机的ip,而是本地的ip,同时我们console下看到
然后暂停TestCurator.java,不一会儿,就会看到
这样,通过Zookeeper实现分布式集群监控的功能就完成了!