zoukankan      html  css  js  c++  java
  • Zookeeper实现分布式集群监控

    Zookeeepr实现分布式集群监控

    Zookeeper中节点有两种:临时节点和永久节点

    从类型上看节点又可以分为四种节点类型:PERSISTPERSIST_SEQUENTIAL,EPHEMERAL,EPHEMERAL_SEQUENTIAL

    临时节点有一个特点:当创建临时节点的程序停掉之后,这个临时节点就会消失。

    监视器的特点:可以给zk中的节点注册监视器,见识这个节点的变化情况。

    监视器注册一次,只能使用一次,多次使用就要多次注册。

    我们利用这个Zookeeper的临时节点特性+监视器(Watch)来实现分布式集群监控

    我们在/monitor(永久节点)下创建临时节点。

    实际上,zookeepersdk不是特别好用,很多边界情况需要用户自己处理。curator是对Zookeepersdk进行了封装,所以说使用curator操作Zookeeper更加方便

    maven官网找到curator的依赖

    <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework,支持zookeeper3.4.6-->
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>2.10.0</version>
    </dependency>

    我们通过curator来使用zookeeper,那么我们就必须知道如何使用curator来连上zookeeper,下面代码是官方文档所给出

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
    client.start();

    开始我们的代码

    TestCurator.java ,实现功能:创建Zookeeper临时节点

     1 package zkdemo;
     2 
     3 import java.net.InetAddress;
     4 
     5 import org.apache.curator.RetryPolicy;
     6 import org.apache.curator.framework.CuratorFramework;
     7 import org.apache.curator.framework.CuratorFrameworkFactory;
     8 import org.apache.curator.retry.ExponentialBackoffRetry;  
     9 
    10 import org.apache.zookeeper.CreateMode;
    11 import org.apache.zookeeper.ZooDefs.Ids;
    12 import org.apache.zookeeper.ZooKeeper;
    13 import org.junit.Test;
    14 
    15 
    16 public class TestCurator {
    17     
    18     @Test
    19     public void test1() throws Exception{
    20         //1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
    21         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    22         String connectString = "djt1:2181,djt2:2181,djt3:2181,djt4:2181,djt5:2181";
    23         int sessionTimeoutMs = 5000;//这个值只能在4000-40000ms之间表示链接断开后多长时间临时节点会小时
    24         int connectionTimeoutMs = 3000;//获取链接的超时时间
    25         //创建一个zk连接
    26         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs
    27                 ,connectionTimeoutMs,retryPolicy);
    28         
    29         client.start();
    30         
    31         InetAddress localHost = InetAddress.getLocalHost();
    32         String ip = localHost.getHostAddress();
    33         
    34         client.create().creatingParentsIfNeeded()
    35         .withMode(CreateMode.EPHEMERAL)//指定节点类型
    36         .withACL(Ids.OPEN_ACL_UNSAFE)//指定设置节点权限信息
    37         .forPath("/monitor/"+ip);//指定节点名称
    38         
    39         while(true)
    40         {
    41             ;
    42         }
    43     }
    44 }

    ZkNodeWatcher.java 实现功能:注册watcher,监视节点的变化

     1 package zk;
     2 
     3 import java.util.List;
     4 import java.util.ArrayList;
     5 
     6 import org.apache.curator.RetryPolicy;
     7 import org.apache.curator.framework.CuratorFramework;
     8 import org.apache.curator.framework.CuratorFrameworkFactory;
     9 import org.apache.curator.retry.ExponentialBackoffRetry;
    10 import org.apache.zookeeper.WatchedEvent;
    11 import org.apache.zookeeper.Watcher;
    12 
    13 
    14 
    15 public class ZkNodeWatcher implements Watcher{
    16     CuratorFramework client;
    17     List<String> childrenList = new ArrayList<String>();
    18     List<String> newChildrenList = new ArrayList<String>();
    19 
    20     public  ZkNodeWatcher(){
    21         //1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数
    22         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
    23         String connectString = "djt1:2181,djt2:2181,djt3:2181,djt4:2181,djt5:2181";
    24         int sessionTimeoutMs = 5000;//这个值只能在4000-40000ms之间表示链接断开后多长时间临时节点会消失
    25         int connectionTimeoutMs = 3000;//获取链接的超时时间
    26         //创建一个zk连接
    27         client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs
    28                 ,connectionTimeoutMs,retryPolicy);
    29         client.start();
    30         
    31         //监视monitor节点,获取下面的所有子节点的变化情况
    32         try {
    33             childrenList = client.getChildren().usingWatcher(this).forPath("/monitor");
    34         } catch (Exception e) {
    35             // TODO Auto-generated catch block
    36             e.printStackTrace();
    37         }
    38     }
    39         
    40     
    41 
    42     /**
    43      * 实现一个zk监视器,监视某个节点的变化情况
    44      * 
    45      * 这个监视程序需要一直运行
    46      * @CPH
    47      */
    48     
    49     public void process(WatchedEvent event) {
    50         System.out.println("我被调用了");
    51         try {
    52             newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor");
    53             for(String ip : childrenList)
    54             {
    55                 if(!newChildrenList.contains(ip)){
    56                     System.out.println("节点消失了"+ip);
    57                     //TODO 给管理员发送短信什么的
    58                     
    59                 }
    60             }
    61             
    62         for(String ip : newChildrenList){
    63             if(!childrenList.contains(ip)){
    64                 System.out.println("节点新增"+ip);
    65             }
    66         }
    67         //重要
    68         childrenList = newChildrenList;
    69         } catch (Exception e) {
    70             // TODO Auto-generated catch block
    71             e.printStackTrace();
    72         }
    73         
    74         
    75         
    76     }
    77     
    78     public void start(){
    79         while (true){;}
    80         }
    81 
    82     public static void main(String[] args) {
    83         ZkNodeWatcher watcher = new ZkNodeWatcher();
    84         watcher.start();
    85     }
    86 }

    我们先开启Zookeeper集群,启动/bin/zkCli.sh,然后启动这2个集群,我们可以看到由对应的ip目录,这个ip不是虚拟机的ip,而是本地的ip,同时我们console下看到

    然后暂停TestCurator.java,不一会儿,就会看到

    这样,通过Zookeeper实现分布式集群监控的功能就完成了!

  • 相关阅读:
    docker十一:docker-DockerFile案例-CMD、ENTRYPOINT、ONBUILD
    查看JVM使用的什么垃圾收集器
    Druid 加载 Kafka 流数据的性能配置参数 TuningConfig
    NPM 和 NVM
    Windows 中 Node.js 中 nvm 的安装配置和使用
    Nvm 安装新的 nodejs 版本
    Druid 加载 Kafka 流数据配置可以读取和处理的流中数据格式
    Druid 加载 Kafka 流数据 KafkaSupervisorIOConfig 配置信息表
    Java 面试都只是背答案不
    有什么理由将代码保存为 GBK 编码
  • 原文地址:https://www.cnblogs.com/WardSea/p/7435767.html
Copyright © 2011-2022 走看看