zoukankan      html  css  js  c++  java
  • 分布式之zk的应用场景

    分布式应用系统中,经常会用到zk,比如dubbo注册中心,kafka分布式集群等都用到zk这一工具。除了这些用来做分布式集群外,zk还有那西应用场景事我们可以使用到该工具的呢?所以接下来就是我们要了解的重点了。

    首先在使用zk的各种应用之前,我们需要了解zk 的相关功能模块,这样才能让我们更清晰的了解为什么可以这么去使用:

    zookeeper以目录树的形式管理数据,提供znode监听、数据设置等接口,基于这些接口,我们可以实现Leader选举、配置管理、命名服务等功能,ZK提供了以下API,供client操作znode和znode中存储的数据:

    • create(path, data, flags):创建路径为path的znode,在其中存储data[]数据,flags可设置为Regular或Ephemeral,并可选打上sequential标志。
    • delete(path, version):删除相应path/version的znode
    • exists(path,watch):如果存在path对应znode,则返回true;否则返回false,watch标志可设置监听事件
    • getData(path, watch):返回对应znode的数据和元信息(如version等)
    • setData(path, data, version):将data[]数据写入对应path/version的znode
    • getChildren(path, watch):返回指定znode的子节点集合

    1.统一服务器名称

    命名服务器事一个比较常用的应用场景,客户端通过制定名字来获取服务器资源获或提供者信息等,被命名的可以服务器地址,远程对象。通过zk提供的创建节点的api,很容易创建一个全局唯一的path,这个path就可以做一个名称,

    dubbo使用zk就是用来做服务器名称。维护全局的服务地址列表。

    服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。

    服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。

    注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。 另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息

    2.统一配置管理

    zk客户端api提供了操作znode数据的功能。再分布式环境中我们可以配置文件存放在znode上,不同的服务需要使用到哪些配置的时候可以直接从znode上去获取。而且通过zk 的心跳极值,我们的配置文件是可以做到动态配置的。一般的配置中心的做法是在系统启动之后加载我们的内存当中,一但配置文件需要做响应的调整的时候,需要重启服务进行load配置操作,但是很多的场景事我们只需要更改一点点的内容就去重启服务,代价不可谓不大。但zk就可以避免这问题的发生,当配置文件发生改变的时候,watch为通知到我们的服务对其修改操作。

    3.分布式通知/协调

    ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统update了znode,那么另一个系统能够收到通知,并作出相应处理

    1. 另一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过zk上某个节点关联,大大减少系统耦合。

    2. 另一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。

    3. 另一种工作汇报模式:一些类似于任务分发系统,子任务启动后,到zk来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。

    总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合

    4.共享锁

    分布式锁,这个主要得益于ZooKeeper为我们保证了数据的强一致性。锁服务可以分为两类,一个是 保持独占,另一个是 控制时序。

    1. 所谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把zk上的一个znode看作是一把锁,通过create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。

    2. 控制时序,就是所有视图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里 /distribute_lock 已经预先存在,客户端在它下面创建临时有序节点(这个可以通过节点的属性控制:CreateMode.EPHEMERAL_SEQUENTIAL来指定)。Zk的父节点(/distribute_lock)维持一份sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。

    5.队列管理

    队列方面,简单地讲有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。对于第一种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里不再赘述。 第二种队列其实是在FIFO队列的基础上作了一个增强。通常可以在 /queue 这个znode下预先建立一个/queue/num 节点,并且赋值为n(或者直接给/queue赋值n),表示队列大小,之后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否可以开始执行了。这种用法的典型场景是,分布式环境中,一个大任务Task A,需要在很多子任务完成(或条件就绪)情况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下建立自己的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现自己下面的子节点满足指定个数,就可以进行下一步按序进行处理了。

    6.master选举

    在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。

    利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。

    另外,这种场景演化一下,就是动态Master选举。这就要用到EPHEMERAL_SEQUENTIAL类型节点的特性了。

    上文中提到,所有客户端创建请求,最终只有一个能够创建成功。在这里稍微变化下,就是允许所有请求都能够创建成功,但是得有个创建顺序,于是所有的请求最终在ZK上创建结果的一种可能情况是这样: /currentMaster/{sessionId}-1 ,/currentMaster/{sessionId}-2,/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器作为Master,如果这个机器挂了,由于他创建的节点会马上小时,那么之后最小的那个机器就是Master了。

    1. 在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此之间索引数据一致。因此让集群中的Master来进行全量索引的生成,然后同步到集群中其它机器。另外,Master选举的容灾措施是,可以随时进行手动指定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向一个地方获取master。

    2. 在Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和HMaster的地址,HRegionServer也会把自己以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会重新选举出一个HMaster来运行,从而避免了HMaster的单点问题

    附上zk负载均衡响应的实现代码

    1.权重轮询模式

    package com.samp.zk.balance;
    
    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import com.alibaba.fastjson.JSONObject;
    
    public class RoundRobin {
    
        private int currentIndex = -1;// 当前位置
        private int currentWeight = 0 ;//当前权重
        private int maxGcd = 0 ;    //最大权重数
        private int maxWeight = 0;// 最大公约数
        private int servetCount = 0 ;// 总服务器数量
        
        private List<Server> serverLst; // 服务器列表 
        
        
        private int gcd (int a , int b){
            BigInteger b1 = new BigInteger(String.valueOf(a));
            BigInteger b2 = new BigInteger(String.valueOf(b));
            BigInteger result = b1.gcd(b2);
            return result.intValue();
        }
        
        private int getMaxCurrentGcd (List<Server> serverList){
            int result = 0 ;
            for(int i = 0,len = serverLst.size();i< len -1 ;i++){
                if(result == 0){
                    result = gcd(serverLst.get(i).weight, serverLst.get(i+1).weight);
                }else{
                    result = gcd (result,serverLst.get(i+1).weight);
                }
            }
            return result;
        }
        
        private int getMaxCurrentWeight(List<Server> serverList){
            int result = 0 ;
            for(int i = 0,len = serverLst.size();i< len -1 ;i++){
                if( result ==0 ){
                    result = Math.max(serverLst.get(i).weight, serverLst.get(i+1).weight);
                }else{
                    result= Math.max(result, serverLst.get(i).weight);
                }
            }
            return result ;
        }
        
        public static void main(String[] args){
            RoundRobin obj =new RoundRobin();
            obj.init();
            Map<String,Integer> map = new HashMap<String,Integer>();
            for(int i=0;i<100;i++){
                
                Server ser = obj.getServer();
                String ip = ser.getIp();
                if(map.containsKey(ip)){
                    map.put(ip, map.get(ip)+1);
                }else{
                    map.put(ip, 1);
                }
            }
            
            for (Entry<String, Integer> m: map.entrySet()) {
                System.out.println("服务器 " + m.getKey() + " 请求次数: " + m.getValue());
            }
            
        }
        
        /**
         * 
         * @Title: getServer 
         * @Description: 服务获取方式:
         *     1.初始化开始位置为-1,权重是0,
         *     2.第1次轮询获取服务器未当前服务器权重最高的
         *  3.第2次轮询权重递减1,获取有大于等于该权限的服务器
         *  4.重复第3步,直到权重为最小值0时,从第1步开始从新轮询
         * @return 参数说明
         * @return Server    返回类型
         */
        public Server getServer(){
            while (true){
                currentIndex = (currentIndex + 1) % servetCount;
                if(currentIndex ==0 ){
                    currentWeight = currentWeight - maxGcd; 
                    if(currentWeight <= 0 ){
                        currentWeight = maxWeight ;
                        if(currentWeight == 0 )
                            return null;
                    }
                }
                if(serverLst.get(currentIndex).weight >= currentWeight ){
                    return serverLst.get(currentIndex);
                }
            }
        
        }
        
        public Server getServerBy(){
            
            return null;
        }
        
        public Server getServerFromDubbo(){
            AtomicInteger sequence = new AtomicInteger(1);
            int maxWeight = 0; // 最大权重
            int minWeight = Integer.MAX_VALUE; // 最小权重
            int weightSum = 0;
            Map<String,Integer> map = new HashMap<String,Integer>();
            for (int i = 0; i < serverLst.size(); i++) {
                int weight = serverLst.get(i).weight;
                String ip = serverLst.get(i).getIp();
                maxWeight = Math.max(maxWeight, weight); // 累计最大权重
                minWeight = Math.min(minWeight, weight); // 累计最小权重
                if (weight > 0) {
                    map.put(ip, weight);
                    weightSum += weight;
                }
            }
            System.out.println("============"+JSONObject.toJSON(map)+"======"+weightSum);
            int currentSequence = sequence.getAndIncrement();
            if (maxWeight > 0 && minWeight < maxWeight) { // 权重不一样
                int mod = currentSequence % weightSum;
                for (int i = 0; i < maxWeight; i++) {
                    for (Map.Entry<String,Integer> each : map.entrySet()) {
                        final String ip = each.getKey();
                        final Integer v = each.getValue();
                        if (mod == 0 && v > 0) {
                            return new Server(ip, v);
                        }
                        if (v > 0) {
                            mod--;
                        }
                    }
                }
            }
            return null;
        }
        
        public void getServer2(){
            List<String> lst = new ArrayList<String>();
            for (int i = 0; i < maxWeight; i++) {
                for (Server ser :serverLst) {
                    final String ip = ser.getIp();
                    int num = ser.getWeight();
                    if((num -1 )>0){
                        continue;
                    }
                    lst.add(ip);
                }
            }
        }
        
        private List<Server> getNewList(List<Server> sers){
            List<Server> l = new ArrayList<Server>();
            for(Server ser:sers){
                String ip = ser.getIp();
                int weight = ser.getWeight()-1;
                Server s = new Server(ip, weight);
                l.add(s);
            }
            return l;
        }
        
        public void init2(){
            Server s11 = new Server("127.0.0.1", 1);
            Server s12 = new Server("127.0.0.1", 1);
            Server s13 = new Server("127.0.0.1", 1);
            Server s21 = new Server("127.0.0.2", 1);
            Server s31 = new Server("127.0.0.3", 1);
            Server s32 = new Server("127.0.0.3", 1);
            Server s41 = new Server("127.0.0.4", 1);
            Server s42 = new Server("127.0.0.4", 1);
            Server s51 = new Server("127.0.0.5", 1);
            Server s52 = new Server("127.0.0.5", 1);
            Server s53 = new Server("127.0.0.5", 1);
            Server s54 = new Server("127.0.0.5", 1);
            
            serverLst = new ArrayList<Server>();
            serverLst.add(s11);
            serverLst.add(s12);
            serverLst.add(s13);
            serverLst.add(s21);
            serverLst.add(s31);
            serverLst.add(s32);
            
            serverLst.add(s41);
            serverLst.add(s42);
            serverLst.add(s53);
            serverLst.add(s54);
            serverLst.add(s51);
            serverLst.add(s52);
            
            maxWeight = getMaxCurrentWeight(serverLst);
            
        }
        
        public void init(){
            Server s1 = new Server("127.0.0.1", 3);
            Server s2 = new Server("127.0.0.2", 1);
            Server s3 = new Server("127.0.0.3", 2);
            Server s4 = new Server("127.0.0.4", 2);
            Server s5 = new Server("127.0.0.5", 4);
            
            serverLst = new ArrayList<Server>();
            serverLst.add(s1);
            serverLst.add(s2);
            serverLst.add(s3);
            serverLst.add(s4);
            serverLst.add(s5);
            
            maxGcd = getMaxCurrentGcd(serverLst);
            maxWeight = getMaxCurrentWeight(serverLst);
            currentIndex = -1 ;
            currentWeight = 0 ;
            servetCount = serverLst.size();
        }
        
        public int getCurrentIndex() {
            return currentIndex;
        }
        public void setCurrentIndex(int currentIndex) {
            this.currentIndex = currentIndex;
        }
        public int getCurrentWeight() {
            return currentWeight;
        }
        public void setCurrentWeight(int currentWeight) {
            this.currentWeight = currentWeight;
        }
        public int getMaxGcd() {
            return maxGcd;
        }
        public void setMaxGcd(int maxGcd) {
            this.maxGcd = maxGcd;
        }
        public int getMaxWeight() {
            return maxWeight;
        }
        public void setMaxWeight(int maxWeight) {
            this.maxWeight = maxWeight;
        }
        public int getServetCount() {
            return servetCount;
        }
        public void setServetCount(int servetCount) {
            this.servetCount = servetCount;
        }
    
    
        class Server {
            private String ip;
            private int weight;
            
            public Server(String ip, int weight) {
                super();
                this.ip = ip;
                this.weight = weight;
            }
            public String getIp() {
                return ip;
            }
            public void setIp(String ip) {
                this.ip = ip;
            }
            public int getWeight() {
                return weight;
            }
            public void setWeight(int weight) {
                this.weight = weight;
            }
            
        }
    
    }
    View Code

    2.随机分配

    package com.samp.zk.balance;
    
    import java.util.List;
    import java.util.Random;
    
    import org.I0Itec.zkclient.ZkClient;
    
    /** 
     * @ClassName RandomLoadBalance  
     * @Description 随机方式实现负载均衡 
     * @author hezc 
     * @date 2017年2月14日  
     *   
     */
    public class RandomLoadBalance implements LoadBalance {  
      
        @Override  
        public String select(String zkServer) {  
            ZkClient zkClient = new ZkClient(zkServer);  
            List<String> serverList = zkClient.getChildren(Constant.root);  
            zkClient.close();  
            Random r=new Random();  
            if(serverList.size()>=1){  
                String server=serverList.get(r.nextInt(serverList.size()));  
                return server;  
            }else{  
                return null;  
            }  
        }  
    
    }

    3.一致hash

    package com.samp.zk.balance;
    
    import java.io.UnsupportedEncodingException;
    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.List;
    
    import org.I0Itec.zkclient.ZkClient;
    
    /** 
     * @ClassName ConsistentHashLoadBalance  
     * @Description 一致hash实现zk负载均衡 
     * @author hezc 
     * @date 2017年2月13日  
     *   
     */
    public class ConsistentHashLoadBalance implements LoadBalance {  
        private String client;  
          
        public void SetClient(String client){  
            this.client=client;  
        }  
          
        @Override  
        public String select(String zkServer) {  
            ZkClient zkClient = new ZkClient(zkServer);  
            List<String> serverList = zkClient.getChildren(Constant.root);  
            ConsistentHashSelector selector=new ConsistentHashSelector(client,serverList);  
            return selector.select();  
              
        }  
          
         private static final class ConsistentHashSelector {  
                public ConsistentHashSelector(String client,List<String> appServer){  
                    this.client=client;  
                    this.appServer=appServer;  
                }  
               
                private String client;  
                private List<String> appServer;  
                  
                public String select() {  
                    String key =client ;  
                    byte[] digest = md5(key);  
                    String server =appServer.get((int) hash(digest, 0));  
                    return server;  
                }  
      
                private long hash(byte[] digest, int number) {  
                    return (((long) (digest[3 + number * 4] & 0xFF) << 24)  
                            | ((long) (digest[2 + number * 4] & 0xFF) << 16)  
                            | ((long) (digest[1 + number * 4] & 0xFF) << 8)   
                            | (digest[0 + number * 4] & 0xFF))   
                            & 0xFFFFFFFFL;  
                }  
      
                private byte[] md5(String value) {  
                    MessageDigest md5;  
                    try {  
                        md5 = MessageDigest.getInstance("MD5");  
                    } catch (NoSuchAlgorithmException e) {  
                        throw new IllegalStateException(e.getMessage(), e);  
                    }  
                    md5.reset();  
                    byte[] bytes = null;  
                    try {  
                        bytes = value.getBytes("UTF-8");  
                    } catch (UnsupportedEncodingException e) {  
                        throw new IllegalStateException(e.getMessage(), e);  
                    }  
                    md5.update(bytes);  
                    return md5.digest();  
                }  
      
            }  
    
        
    }
    View Code

    4.最小活动优先

    package com.samp.zk.balance;
    
    import java.util.List;
    
    import org.I0Itec.zkclient.ZkClient;
    
    /** 
     * @ClassName LeastActiveLoadBalance  
     * @Description TODO 
     * @author hezc 
     * @date 2017年2月14日  
     *   
     */
    public class LeastActiveLoadBalance implements LoadBalance {  
      
        @Override  
        public String select(String zkServer) {  
            ZkClient zkClient = new ZkClient(zkServer);  
            List<String> serverList = zkClient.getChildren(Constant.root);  
      
            String tempServer = null;  
            int tempConn = -1;  
            for (int i = 0; i < serverList.size(); i++) {  
                String server = serverList.get(i);  
                if (zkClient.readData(Constant.root + "/" + server) != null) {  
                    int connNum = zkClient.readData(Constant.root + "/" + server);  
                    if (tempConn == -1) {  
                        tempServer = server;  
                        tempConn = connNum;  
                    }  
                    if (connNum < tempConn) {  
                        tempServer = server;  
                        tempConn = connNum;  
                    }  
                }else{  
                    zkClient.close();  
                    return server;  
                }  
            }  
            zkClient.close();  
            if (tempServer != null && !tempServer.equals("")) {  
                return tempServer;  
            }  
      
            return null;  
        }  
    
    }
    View Code
  • 相关阅读:
    Java实现 蓝桥杯VIP 算法训练 数的统计
    Java实现 蓝桥杯VIP 算法训练 和为T
    Java实现 蓝桥杯VIP 算法训练 友好数
    Java实现 蓝桥杯VIP 算法训练 连续正整数的和
    Java实现 蓝桥杯VIP 算法训练 寂寞的数
    Java实现 蓝桥杯VIP 算法训练 学做菜
    Java实现 蓝桥杯VIP 算法训练 暗恋
    Java实现 蓝桥杯VIP 算法训练 暗恋
    测试鼠标是否在窗口内,以及测试鼠标是否在窗口停留
    RichEdit 各个版本介绍
  • 原文地址:https://www.cnblogs.com/huane/p/6399502.html
Copyright © 2011-2022 走看看