zoukankan      html  css  js  c++  java
  • Zookeeper应用场景

    • 数据发布与订阅:发布订阅模型,就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现数据的集中管理和动态更新。
      • 配置中心:在应用中,将全局的配置信息放到ZK上集中管理,在应用启动的时候主动获取一次配置。同时,在节点上注册一个watcher,保证每次数据更新时会通知订阅者更新配置信息。
      • 元数据维护:在分布式搜索服务中,索引的元信息和服务器集群机器的节点状态保存在ZK的指定节点上,供各个客户端使用。
      • 分布式日志收集系统:这个系统的核心工作是收集不同服务器的日志。收集器通常是按照应用来分配任务单元,因此在ZK上用应用名创建一个节点,将需要收集的服务器的IP注册到这个节点的子结点上。
    • 命名服务:客户端应用能够根据指定名字来获取资源或服务的地址、提供者等信息。
    • 分布式通知/协调:ZK中特有的watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是:不同系统对ZK上的同一个节点进行注册,监听节点的变化。任意一个系统对节点进行了更新,其它系统都能接到通知,并进行相应处理。
      • 任务汇报:类似于任务分发系统。子任务启动后,在ZK上注册一个临时节点,并定时将自己的进度写入这个节点,这样任务管理者可以实时查看任务进度。
      • 服务器列表维护:能都动态监听服务器的上下线
        • 服务端代码
    package zookeeper;
    
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    
    public class Server {
    
        private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182";
        private static final int TIME_OUT = 15000;
    
        private ZooKeeper zooKeeper;
    
        public Server() throws IOException {
            this.zooKeeper = new ZooKeeper(CONNECT_STRING, TIME_OUT, watchedEvent -> {
    
            });
        }
    
        /**
         * 创建临时序列节点(服务器关闭时节点会自动删除)
         * @param serverName
         * @return
         * @throws Exception
         */
        public String regsterServer(String serverName) throws Exception {
            String result = zooKeeper.create("/clusterServer/server", serverName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(serverName+" server start....");
            return result;
        }
    
        public static void main(String[] args) throws Exception {
            Server server = new Server();
            server.regsterServer(args[0]);
            //保持程序运行,防止程序停止运行导致节点自动删除
            while (true) {
    
            }
        }
    }
    View Code
        • 客户端代码
    package zookeeper;
    
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
    * 获取并监听注册的服务器列表
    */
    public class Client {
    
        private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182";
        private static final int TIME_OUT = 15000;
    
        private ZooKeeper zooKeeper;
    
        public Client() throws IOException {
            this.zooKeeper = new ZooKeeper(CONNECT_STRING, TIME_OUT, watchedEvent -> {
                System.out.println("watcher works.");
                try {
                    getServers();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
    
        public void getServers() throws Exception {
            List<String> servers = new ArrayList<>();
            List<String> children = zooKeeper.getChildren("/clusterServer", true, null);
            for (String child : children) {
                String server = getData("/clusterServer/"+child);
                servers.add(server);
            }
            System.out.println(servers);
        }
    
        public String getData(String path) throws Exception {
            byte[] data = zooKeeper.getData(path, true, null);
            return new String(data);
        }
    
        public static void main(String[] args) throws Exception {
            Client client = new Client();
            client.getServers();
            while (true) {
    
            }
        }
    }
    View Code
      • 分布式锁:
        • 保持独占:通常的做法是把ZK的节点看作一把锁,通过create node的方式来实现。
                                  具体的实现方式是:
          • 方法一:所有的客户端都去创建/lock节点,最终创建成功的持有这把锁。(同一个节点只能创建一次,再次创建会返回失败信息)
    /**
    * 通过创建临时节点,实现服务器之间的独占锁
    */
    @Test
    public void singleLock() {
        try {
            //参数:1,节点路径; 2,要存储的数据; 3,节点的权限; 4,节点的类型
            String nodePath = zooKeeper.create("/lock", "This is Lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            //创建成功,则相当于拥有独占锁,可以进行以下逻辑
            //TODO 业务逻辑
            System.out.println(nodePath);
            //业务逻辑结束后,删除节点,即释放锁资源
            zooKeeper.delete("/lock", -1);
        } catch (Exception e) {
            //创建节点失败,重新调用,直至创建成功
            if (e instanceof KeeperException && "NODEEXISTS".equals(((KeeperException)e).code().name())) {
                System.out.println("Node exists.");
                singleLock();
            }else {
                e.printStackTrace();
            }
        }
    }
    View Code
          • 方法二:判断/lock节点是否存在,如果存在,说明其它服务器已经持有锁资源;如果不存在,则锁资源处于空闲状态,创建节点占有锁资源。
    /**
    * 通过创建临时节点,实现服务器之间的独占锁
    */
    @Test
    public void singleLock2() throws KeeperException, InterruptedException {
        Stat stat = zooKeeper.exists("/lock", false);
        //如果节点已经存在,等待其它服务器删除节点。即:等待其它服务器释放锁资源
        while(stat != null) {  }
    
    
        //参数:1,节点路径; 2,要存储的数据; 3,节点的权限; 4,节点的类型
        String nodePath = zooKeeper.create("/lock", "This is Lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        //创建成功,则相当于拥有独占锁,可以进行以下逻辑
        //TODO 业务逻辑
        System.out.println(nodePath);
        //业务逻辑结束后,删除节点,即释放锁资源
        zooKeeper.delete("/lock", -1);
    }
    View Code
      • 控制时序:/lock节点已存在,客户端在它下面创建临时有序节点/lock/{sessionId}-1 , /lock/{sessionId}-2 , /lock/{sessionId}-3 …..(通过节点属性控制 CreateMode.EPHEMERAL_SEQUENTIAL控制),保证子节点创建的有序性,从而保证的客户端的时序性。
        • 方式一:获取锁资源时,程序处于block状态,直到获取锁资源。
    /**
    * 通过创建临时时序节点,实现服务器之间的时序锁
    */
    @Test
    public void lock() throws KeeperException, InterruptedException {
        //创建临时时序节点
        String nodePath = zooKeeper.create("/lock/sublock", "This is sub lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        
        while(true) {
            List<String> children = zooKeeper.getChildren("/lock", false);
            //排序,并获取序号最小的节点。(序号越小,表明请求时间越早,优先获取锁资源)
            children.sort(String::compareTo);
            if (nodePath.equals("/lock/"+children.get(0))){
                //TODO 业务逻辑
                System.out.println("TODO  Logic.");
                break;
            }
        }
    
        //业务逻辑结束后,删除节点,即释放锁资源
        zooKeeper.delete(nodePath, -1);
    }
    View Code
        • 方式二:通过watcher监听服务器列表变化,判断当前服务器是否获取锁资源,程序不会block
    package zookeeper;
    
    
    import org.apache.zookeeper.*;
    
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.List;
    
    
    /**
    *
    * 通过Zookeeper实现服务器之间的时序锁
    *
    */
    public class SeqLock {
    
        private static final String CONNECT_STRING = "172.17.23.79:2181,172.17.23.79:2182";
    
        private static ZooKeeper zooKeeper;
        private String thispath;
    
        public SeqLock() throws IOException {
            //超时时间单位:毫秒
            zooKeeper = new ZooKeeper(CONNECT_STRING, 15000, event -> {
                //监听“/lock”的子节点变化。如果有服务器释放锁,判断自己是否获取锁
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged
                        && event.getPath().startsWith("/lock")) {
                    try {
                        List<String> children = zooKeeper.getChildren("/lock", false);
                        if (children.size() > 0) {
                            Collections.sort(children);
                            String fistNode = "/lock/"+children.get(0);
                            if (fistNode.equals(thispath)){
                                doSomethingAndDelNode();
                            }
                        }
    
    
                    } catch (KeeperException | InterruptedException e) {
                        e.printStackTrace();
                    }
    
    
                }
            });
        }
    
    
        /**
         * 通过创建临时时序节点注册时序锁,并监听服务器列表
         */
        public void lock() throws KeeperException, InterruptedException {
            thispath = zooKeeper.create("/lock/sublock", "This is sub lock.".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(thispath);
            List<String> children = zooKeeper.getChildren("/lock", false);
            //如果只有一个子节点,说明锁资源被当前服务器持有。如果子节点不止一个,说明锁资源已经被其它服务器持有
            if(children.size() == 1) {
                doSomethingAndDelNode();
            }
        }
    
        private void doSomethingAndDelNode() throws InterruptedException, KeeperException {
            //TODO 业务逻辑
            System.out.println("TODO  Logic.");
            //业务逻辑结束后,删除节点,即释放锁资源
            zooKeeper.delete(thispath, -1);
        }
    
        public static void main(String[] args)  {
            try {
                SeqLock lock = new SeqLock();
                lock.lock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    View Code
    参考文献:
    • ZooKeeper典型应用场景
    链接   :https://pan.baidu.com/s/1lohZ98K33z_Hf_8Y370Nuw
    提取码:kydw
     
    更多内容,请访问:http://www.cnblogs.com/BlueStarWei
  • 相关阅读:
    201871010132-张潇潇-作业四 软件研发团队组建
    201871010132-张潇潇 实验三 结对项目—《D{0-1}KP 实例数据集算法实验平台》项目报告
    201871010132-张潇潇 实验二 个人项目—《西北师范大学学生疫情上报系统》项目报告
    201871010132-张潇潇 实验一软件工程准备-软件工程初识
    张潇潇--学期获奖总结
    201871010132--张潇潇--《面向对象程序设计(java)》课程总结
    201871010132--张潇潇--《面向对象程序设计(java)》第十七周学习总结
    201871010132--张潇潇--《面向对象程序设计(java)》第十六周学习总结
    201871010132--张潇潇--《面向对象程序设计(java)》第十五周学习总结
    201871010132--张潇潇--《面向对象程序设计(java)》第十四周学习总结
  • 原文地址:https://www.cnblogs.com/BlueStarWei/p/15211010.html
Copyright © 2011-2022 走看看