zoukankan      html  css  js  c++  java
  • zookeeper 实战操作

    一:监听服务端zookeeper节点数据改变

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    
    public class ConfigApp1 {
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        
        public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
            //连接zookeeper服务器
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 5000, 
                    new Watcher() {
                        public void process(WatchedEvent event) {
                            if (KeeperState.SyncConnected == event.getState()) {    //zk连接成功通知事件
                                if ( EventType.None == event.getType() && null == event.getPath() ) {
                                    connectedSemaphore.countDown();
                                    System.out.println("===========");
                                }
                            }
                            
                        }
                    });
    
            connectedSemaphore.await();
            //创建节点app1,不进行ACL权限控制,EPHEMERAL:临时节点
            zk.create("/app1", "app1Date".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            
            //注册对该节点的监听
            zk.exists("/app1", new WatcherClass(zk));
            
            //延迟2秒后 开始改变数据
            TimeUnit.SECONDS.sleep(2);
            for(int i = 0; i < 10; i++) {
                TimeUnit.SECONDS.sleep(1);
                String s = ("app" + i * 10);
                zk.setData("/app1", s.getBytes(), -1);
                System.out.println("数据改变了:"+s);
            }
            System.in.read();
        }
        
        static class WatcherClass implements Watcher {
            private ZooKeeper zk;
            
            public WatcherClass(ZooKeeper zk) {
                this.zk = zk;
            }
    
            @Override
            public void process(WatchedEvent arg0) {
                try {
                    byte[] b = zk.getData("/app1", false, null);
                    System.out.println("改变数据通知:" + new String(b));
                    
                    //获取数据后,再次对节点进行监听
                    zk.exists("/app1", new WatcherClass(zk));
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            
        }
    }

    console结果截图:

     二:集群管理

      应用集群中,我们常常需要让每一个机器知道集群中(或依赖的其他某一个集群)哪些机器是活着的,并且在集群机器因为宕机,网络断链等原因能够不在人工介入的情况下迅速通知到每一个机器

     思路:用三个类模拟成三个服务器,去连接zookeeper,这三个服务器监控zookeeper节点root,每个服务器上线都会在zookeeper的节点root下创建一个临时节点,这样,这三个服务器watcher这个zookeeper的root节点就可以动态感知服务器的上下线情况。

     

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.data.Stat;
    import org.apache.zookeeper.ZooKeeper;
    
    public class Cluster1 {
    
        private static final int zkSessionTimeOut = 5000;
        
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            //连接zookeeper服务器
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", zkSessionTimeOut, 
                    new Watcher() {
                        public void process(WatchedEvent event) {
                            if (KeeperState.SyncConnected == event.getState()) {    //zk连接成功通知事件
                                if ( EventType.None == event.getType() && null == event.getPath() ) {
                                    connectedSemaphore.countDown();
                                    System.out.println("===========");
                                }
                            }
                            
                        }
                    });
    
            connectedSemaphore.await();
            
            
            Stat stat = zk.exists("/root", true);
             if(stat == null) {
                 System.out.println("/root" + "路径不存在,请先创建该节点");
                 //创建节点root,不进行ACL权限控制,PERSISTENTAL:永久节点  只有永久节点才可以创建子节点的临时节点
                 zk.create("/root", "rootDate".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             }
            String clusterPath = zk.create("/root/cluster1", "cluster1Date".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(clusterPath);
            
            ZkWatcher zkWatcher = new ZkWatcher(zk);
            List<String> clusterList = zk.getChildren("/root", zkWatcher);
            
            System.out.println("****************");
            for(String str : clusterList) {
                System.out.println("cluster:" + str);
            }
            System.out.println("****************");
            
            while(true) {
                
            }
        }
    
    }
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.data.Stat;
    
    public class Cluster2 {
    
        private static final int zkSessionTimeOut = 5000;
        
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            //连接zookeeper服务器
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", zkSessionTimeOut, 
                    new Watcher() {
                        public void process(WatchedEvent event) {
                            if (KeeperState.SyncConnected == event.getState()) {    //zk连接成功通知事件
                                if ( EventType.None == event.getType() && null == event.getPath() ) {
                                    connectedSemaphore.countDown();
                                    System.out.println("===========");
                                }
                            }
                            
                        }
                    });
    
            connectedSemaphore.await();
            
            Stat stat = zk.exists("/root", true);
             if(stat == null) {
                 System.out.println("/root" + "路径不存在,请先创建该节点");
                 zk.create("/root", "rootDate".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             }
            String clusterPath = zk.create("/root/cluster2", "cluster2Date".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(clusterPath);
            
            ZkWatcher zkWatcher = new ZkWatcher(zk);
            List<String> clusterList = zk.getChildren("/root", zkWatcher);
            
            System.out.println("****************");
            for(String str : clusterList) {
                System.out.println("cluster:" + str);
            }
            System.out.println("****************");
            
            while(true) {
                
            }
        }
    
    }
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.Watcher.Event.KeeperState;
    import org.apache.zookeeper.ZooDefs.Ids;
    
    public class Cluster3 {
    
        private static final int zkSessionTimeOut = 5000;
        
        private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
        
        public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
            //连接zookeeper服务器
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", zkSessionTimeOut, 
                    new Watcher() {
                        public void process(WatchedEvent event) {
                            if (KeeperState.SyncConnected == event.getState()) {    //zk连接成功通知事件
                                if ( EventType.None == event.getType() && null == event.getPath() ) {
                                    connectedSemaphore.countDown();
                                    System.out.println("===========");
                                }
                            }
                            
                        }
                    });
    
            connectedSemaphore.await();
            
             Stat stat = zk.exists("/root", true);
             if(stat == null) {
                 System.out.println("/root" + "路径不存在,请先创建该节点");
                 zk.create("/root", "rootDate".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             }
            String clusterPath = zk.create("/root/cluster3", "cluster3Date".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            System.out.println(clusterPath);
            
            ZkWatcher zkWatcher = new ZkWatcher(zk);
            List<String> clusterList = zk.getChildren("/root", zkWatcher);
            
            System.out.println("****************");
            for(String str : clusterList) {
                System.out.println("cluster:" + str);
            }
            System.out.println("****************");
            
            while(true) {
                
            }
        }
    
    }
    import java.util.List;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.Watcher.Event.EventType;
    import org.apache.zookeeper.ZooKeeper;
    
    public class ZkWatcher implements Watcher{
    
        private ZooKeeper zk;
        
        public ZkWatcher(ZooKeeper zk) {
            this.zk = zk;
        }
        
        @Override
        public void process(WatchedEvent event) {
            if(EventType.NodeChildrenChanged.equals(event.getType())) {
                List<String> clusterList = null;
                try {
                    clusterList = zk.getChildren("/root", this);
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                System.out.println("****************");
                System.out.println("changed");
                for(String str : clusterList) {
                    System.out.println("cluster:" + str);
                }
                System.out.println("****************");
            }
        }
    
    }
  • 相关阅读:
    在TreeView控件节点中显示图片
    PAT 甲级 1146 Topological Order (25 分)
    PAT 甲级 1146 Topological Order (25 分)
    PAT 甲级 1145 Hashing
    PAT 甲级 1145 Hashing
    PAT 甲级 1144 The Missing Number (20 分)
    PAT 甲级 1144 The Missing Number (20 分)
    PAT 甲级 1151 LCA in a Binary Tree (30 分)
    PAT 甲级 1151 LCA in a Binary Tree (30 分)
    PAT 甲级 1149 Dangerous Goods Packaging
  • 原文地址:https://www.cnblogs.com/myseries/p/11294373.html
Copyright © 2011-2022 走看看