zoukankan      html  css  js  c++  java
  • 基于Zookeeper实现客户端动态监听服务器上下线

    一、在具体实现之前,先来了解一下Zookeeper的监听器的原理:

       

      图中Main()线程作为客户端,当在主线程中创建Zookeeper客户端时,会默认创建两个子线程:Listenerconnectconnect线程负责将某一操作对应的的监听事件发送给Zookeeper服务集群。Zookeeper收到监听事件后会在该操作对应的监听器列表中注册该事件。

      比如图中的获取节点/”的子节点getChildren这一事件,并设置了true,表示监听此事件,那么Zookeeper就会在监听器列表中注册该事件。一旦“/”节点的子节点发生变化,getChildren的结果就随之发生变化,Zookeeper就会通知客户端的Listener线程,Listener就会去调用process方法对“/”的变化做出应对处理。“/”的变化可能是客户端不能控制的,但是为了适应这种变化,客户端在收到服务器的通知后可根据自身情况做出应对。

    二、这样说可能比较抽象,我们用一个案例来说明:

    public class ZkDemo {
        private String connect = "hadoop101:2181";
        private int timeout = 2000;
        private ZooKeeper zooKeeper = null;
    
        //1、获取Zookeeper客户端,用于连接Zookeeper集群,其功能类似于Linux中启动./zkCli.sh
        @Before
        public void getClient() throws Exception{
            zooKeeper = new ZooKeeper(connect, timeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println(event.getPath() + "已被修改,是否确定?");
                    try {
                        zooKeeper.getChildren("/lsj",true);
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
        /**
         * 查看子节点
         */
        @Test
        public void testList() throws Exception {
            List<String> children = zooKeeper.getChildren("/lsj", true);
            for (String str: children) {
                System.out.println(str);
            }
            Thread.sleep(Long.MAX_VALUE);
        }
    }

    附:Thread.sleep()的设置是为了让testList方法从某种意义上更像是客户端那样持续保持连接,所以必须让testList方法处在执行的过程中。

      默认的监听次数是一次(List<String> children = zooKeeper.getChildren("/lsj", true);执行后,Zookeeper会在/lsj节点的监听列表中注册一个监听事件,如果该节点发生变化,就通知给申请监听的客户端的listener,并将该监听事件从节点的监听列表中删除),服务端并不会持续为某客户端监听某一节点的变化。如果被监听的节点发生变化,会调用监听器的process方法,可以在process方法中再次调用getChildre方法并申请对目标节点的监听,通过这个小技巧使得监听的次数变得无限多了。只需记住:这段代码启动后对Zookeeper来说也是一个客户端,设为客户端1。zooKeeper.getChildren("/lsj", true)设置了监听此事件,当/lsj节点下的子节点发生变化时getChildren方法结果会发生变化,会触发监听器Watcher(就是那个内部类)执行process方法,在process方法中执行了应对变化的处理后再次调用getChildren方法,这才使得监听好像变的无限多了。

      下图所示:在Linux中启动的客户端(设为客户端2每一次对/lsj节点增加节点都会视为getChildren结果的变化,故而Zookeeper会通知客户端1这种变化,进而触发监听器执行process方法。

       

       

    三、下面进入Zookeeper的正题:如何利用Zookeeper监听服务器集群的变化,并在服务器集群变化时通知客户端呢?

      原理和上面一样一样的,只需将客户端1想象成下文的客户端群体,把客户端2想想成服务器群体。如果没有Zookeeper这个角色,让客户端直接和服务器接触,当客户端请求的一台服务器正好宕机时,客户端将无法获取资源,但又不知道这是服务器宕机所造成的问题,也无法改变请求到另一台正常运行的服务器,那么这个问题如何解决呢?

      思路是这样的,每一台服务器上线时都会在Zookeeper上的/ServerCluster的节点下创建一个标识本机的子节点(临时的 -e )。当,某一台服务器宕机时,那么其在/ServerCluster下创建的节点就会随之消失。我们让客户端获取服务器信息时,监听/ServerCluster的子节点变化,那么当某一台服务器宕机时(临时节点随之消失)Zookeeper会通知客户端/ServerCluster的变化,客户端也就知道了具体的哪一台服务器当机,哪一台服务器正常运行可以访问了。

      在这个过程中,最重要的一点:服务器和客户端对于Zookeeper集群来说,都是客户端。

      

    具体实现:

    public class Server {
    
        private static String connect = "hadoop101:2181";
        private static int timeout = 2000;
        private static ZooKeeper zooKeeper = null;
        private static String parentPath = "/ServerCluster";
    
        public static void main(String[] args) throws Exception {
    
            //1、获取一个Zookeeper的客户端对象,用于服务器向Zookeeper集群注册自己。
            getClient();
    
            //2、把本服务器的主机名注册到Zookeeper中的特定节点中
            registServer(args[0]);
    
            //3、服务器本身的业务逻辑
            getBusiness(args[0]);
        }
    
        private static void getBusiness(String hostname) throws InterruptedException {
            System.out.println(hostname + " is working...");
            Thread.sleep(Long.MAX_VALUE);
        }
    
        private static void registServer(String hostname) throws KeeperException, InterruptedException {
    
            //创建临时节点
            String path = zooKeeper.create(parentPath + "/server",
                    hostname.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(hostname + " is online...");
        }
    
        private static void getClient() throws Exception {
            zooKeeper = new ZooKeeper(connect, timeout, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println(event.getType() + "---" + event.getPath());
                }
            });
        }
    }
    public class Client {
        private static String connect = "hadoop101:2181";
        private static int timeout = 2000;
        private static ZooKeeper zooKeeper = null;
        private static String parentPath = "/ServerCluster";
    public static void main(String[] args) throws Exception { //1、获取一个Zookeeper的客户端对象,用于服务器向Zookeeper集群注册自己。 getClient(); //2、获取服务器列表(主机名),并监听 getServers(); //3、客户端的业务逻辑 getBusiness(); } private static void getBusiness() throws InterruptedException { System.out.println("Client is working..."); Thread.sleep(Long.MAX_VALUE); } private static void getServers() throws KeeperException, InterruptedException { //向Zookeeper给getChildren方法注册监听,一旦parentPath节点发生变化,就会通知监听器触发process方法 List<String> children = zooKeeper.getChildren(parentPath, true); //此集合用于保存服务器主机名 ArrayList<String> hosts = new ArrayList(); for (String child: children) { byte[] data = zooKeeper.getData(parentPath + "/" + child, false, null); hosts.add(new String(data)); } System.out.println(hosts); } private static void getClient() throws Exception { zooKeeper = new ZooKeeper(connect, timeout, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.getType() + "---" + event.getPath()); //执行到本方法就说明parentPath已经被修改了,即服务器列表发生变化,需要重新获取。 try { getServers(); } catch (Exception e) { e.printStackTrace(); } } }); } }

      

    通过配置参数开启三个服务器hadoop101,开启一个Client

       

    关闭hadoop101

      

    Client观察到这种变化,打印新的服务器列表:

       

       

    当然,如果在Linux中加入一个“服务器”,Client也可以监听到:

       

  • 相关阅读:
    轻量模型之Mobilenet
    GAN的Loss
    Ubuntu16.04安装后配置一条龙
    Hardnet论文阅读
    orb-slam2编译时遇到的问题
    编译opencv+opencv_contrib
    Sophus库使用踩坑
    CloudCompare Viewer使用心得
    交通场景语义分割
    ROS编译中遇到的问题
  • 原文地址:https://www.cnblogs.com/superlsj/p/11975298.html
Copyright © 2011-2022 走看看