zoukankan      html  css  js  c++  java
  • 一致性Hash算法 java应用

    一致性Hash算法背景

      一致性哈希算法在1997年由麻省理工学院的Karger等人在解决分布式Cache中提出的,设计目标是为了解决因特网中的热点(Hot spot)问题,初衷和CARP十分类似。一致性哈希修正了CARP使用的简单哈希算法带来的问题,使得DHT可以在P2P环境中真正得到应用。

      但现在一致性hash算法在分布式系统中也得到了广泛应用,研究过memcached缓存数据库的人都知道,memcached服务器端本身不提供分布式cache的一致性,而是由客户端来提供,具体在计算一致性hash时采用如下步骤:

    1. 首先求出memcached服务器(节点)的哈希值,并将其配置到0~232的圆(continuum)上。
    2. 然后采用同样的方法求出存储数据的键的哈希值,并映射到相同的圆上。
    3. 然后从数据映射到的位置开始顺时针查找,将数据保存到找到的第一个服务器上。如果超过232仍然找不到服务器,就会保存到第一台memcached服务器上。

      从上图的状态中添加一台memcached服务器。余数分布式算法由于保存键的服务器会发生巨大变化而影响缓存的命中率,但Consistent Hashing中,只有在园(continuum)上增加服务器的地点逆时针方向的第一台服务器上的键会受到影响,如下图所示:

     

    一致性Hash性质

      考虑到分布式系统每个节点都有可能失效,并且新的节点很可能动态的增加进来,如何保证当系统的节点数目发生变化时仍然能够对外提供良好的服务,这是值得考虑的,尤其实在设计分布式缓存系统时,如果某台服务器失效,对于整个系统来说如果不采用合适的算法来保证一致性,那么缓存于系统中的所有数据都可能会失效(即由于系统节点数目变少,客户端在请求某一对象时需要重新计算其hash值(通常与系统中的节点数目有关),由于hash值已经改变,所以很可能找不到保存该对象的服务器节点),因此一致性hash就显得至关重要,良好的分布式cahce系统中的一致性hash算法应该满足以下几个方面:

    • 平衡性(Balance)

    平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。很多哈希算法都能够满足这一条件。

    • 单调性(Monotonicity)

    单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲区加入到系统中,那么哈希的结果应能够保证原有已分配的内容可以被映射到新的缓冲区中去,而不会被映射到旧的缓冲集合中的其他缓冲区。简单的哈希算法往往不能满足单调性的要求,如最简单的线性哈希:x = (ax + b) mod (P),在上式中,P表示全部缓冲的大小。不难看出,当缓冲大小发生变化时(从P1到P2),原来所有的哈希结果均会发生变化,从而不满足单调性的要求。哈希结果的变化意味着当缓冲空间发生变化时,所有的映射关系需要在系统内全部更新。而在P2P系统内,缓冲的变化等价于Peer加入或退出系统,这一情况在P2P系统中会频繁发生,因此会带来极大计算和传输负荷。单调性就是要求哈希算法能够应对这种情况。

    • 分散性(Spread)

    在分布式环境中,终端有可能看不到所有的缓冲,而是只能看到其中的一部分。当终端希望通过哈希过程将内容映射到缓冲上时,由于不同终端所见的缓冲范围有可能不同,从而导致哈希的结果不一致,最终的结果是相同的内容被不同的终端映射到不同的缓冲区中。这种情况显然是应该避免的,因为它导致相同内容被存储到不同缓冲中去,降低了系统存储的效率。分散性的定义就是上述情况发生的严重程度。好的哈希算法应能够尽量避免不一致的情况发生,也就是尽量降低分散性。

    • 负载(Load)

    负载问题实际上是从另一个角度看待分散性问题。既然不同的终端可能将相同的内容映射到不同的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不同的用户映射为不同的内容。与分散性一样,这种情况也是应当避免的,因此好的哈希算法应能够尽量降低缓冲的负荷。

    • 平滑性(Smoothness)

    平滑性是指缓存服务器的数目平滑改变和缓存对象的平滑改变是一致的。

    原理

    基本概念

      一致性哈希算法(Consistent Hashing)最早在论文《Consistent Hashing and Random Trees: Distributed Caching Protocols for Relieving Hot Spots on the World Wide Web》中被提出。简单来说,一致性哈希将整个哈希值空间组织成一个虚拟的圆环,如假设某哈希函数H的值空间为0-2^32-1(即哈希值是一个32位无符号整形),整个哈希空间环如下:

      整个空间按顺时针方向组织。0和232-1在零点中方向重合。

      下一步将各个服务器使用Hash进行一个哈希,具体可以选择服务器的ip或主机名作为关键字进行哈希,这样每台机器就能确定其在哈希环上的位置,这里假设将上文中四台服务器使用ip地址哈希后在环空间的位置如下:

      

    接下来使用如下算法定位数据访问到相应服务器:将数据key使用相同的函数Hash计算出哈希值,并确定此数据在环上的位置,从此位置沿环顺时针“行走”,第一台遇到的服务器就是其应该定位到的服务器。

      例如我们有Object A、Object B、Object C、Object D四个数据对象,经过哈希计算后,在环空间上的位置如下:

    根据一致性哈希算法,数据A会被定为到Node A上,B被定为到Node B上,C被定为到Node C上,D被定为到Node D上。

    下面分析一致性哈希算法的容错性和可扩展性。现假设Node C不幸宕机,可以看到此时对象A、B、D不会受到影响,只有C对象被重定位到Node D。一般的,在一致性哈希算法中,如果一台服务器不可用,则受影响的数据仅仅是此服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它不会受到影响。

    下面考虑另外一种情况,如果在系统中增加一台服务器Node X,如下图所示:

    此时对象Object A、B、D不受影响,只有对象C需要重定位到新的Node X 。一般的,在一致性哈希算法中,如果增加一台服务器,则受影响的数据仅仅是新服务器到其环空间中前一台服务器(即沿着逆时针方向行走遇到的第一台服务器)之间数据,其它数据也不会受到影响。

    综上所述,一致性哈希算法对于节点的增减都只需重定位环空间中的一小部分数据,具有较好的容错性和可扩展性。

    另外,一致性哈希算法在服务节点太少时,容易因为节点分部不均匀而造成数据倾斜问题。例如系统中只有两台服务器,其环分布如下,

    此时必然造成大量数据集中到Node A上,而只有极少量会定位到Node B上。为了解决这种数据倾斜问题,一致性哈希算法引入了虚拟节点机制,即对每一个服务节点计算多个哈希,每个计算结果位置都放置一个此服务节点,称为虚拟节点。具体做法可以在服务器ip或主机名的后面增加编号来实现。例如上面的情况,可以为每台服务器计算三个虚拟节点,于是可以分别计算 “Node A#1”、“Node A#2”、“Node A#3”、“Node B#1”、“Node B#2”、“Node B#3”的哈希值,于是形成六个虚拟节点:

    同时数据定位算法不变,只是多了一步虚拟节点到实际节点的映射,例如定位到“Node A#1”、“Node A#2”、“Node A#3”三个虚拟节点的数据均定位到Node A上。这样就解决了服务节点少时数据倾斜的问题。在实际应用中,通常将虚拟节点数设置为32甚至更大,因此即使很少的服务节点也能做到相对均匀的数据分布。

    原理来源:https://www.cnblogs.com/williamjie/p/9477852.html

    java实现

    实体

    package com.mybatis.plus.utils.hash.pojo;
    
    import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode;
    import lombok.Data;
    
    import java.io.Serializable;
    import java.util.TreeMap;
    
    /**
     * 一致性哈希环
     *
     * @author gch
     * @date 2020-07-09 10:18
     */
    @Data
    public class ConsistentHashLoop implements Serializable {
    
        private Integer pointCount;
        private TreeMap<Integer, ConsistentHashNode> nodes;
    
        public ConsistentHashLoop() {
        }
    
        public ConsistentHashLoop(Integer pointCount) {
            this.pointCount = pointCount;
        }
    
    }
    package com.mybatis.plus.utils.hash.pojo;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    import java.io.Serializable;
    
    /**
     * 一致性哈希节点
     *
     * @author gch
     * @date 2020-07-09 10:23
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class ConsistentHashNode implements Serializable {
    
        private Integer point;
        private Object target;
    
    }
    package com.mybatis.plus.utils.hash.pojo;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * 一致性哈希点范围
     *
     * @author gch
     * @date 2020-07-09 22:58
     */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class PointScope {
    
        /**
         * 起始点(包含)
         */
        private Integer start;
    
        /**
         * 截止点(包含)
         * 当截止点小于起始点时,需包含截止点以下所有点
         */
        private Integer end;
    
        /**
         * 截止点的下一个节点
         */
        private ConsistentHashNode nextNode;
    
    }
    package com.mybatis.plus.utils.hash;
    
    import com.mybatis.plus.utils.hash.pojo.ConsistentHashLoop;
    import com.mybatis.plus.utils.hash.pojo.ConsistentHashNode;
    import com.mybatis.plus.utils.hash.pojo.PointScope;
    import org.omg.CORBA.SystemException;
    import org.springframework.util.DigestUtils;
    
    import java.util.*;
    
    /**
     * 一致性哈希算法
     *
     * @author gch
     * @date 2020-07-09 12:14
     */
    public class ConsistentHash {
    
        private ConsistentHashLoop consistentHashLoop;
    
        private ConsistentHash(ConsistentHashLoop consistentHashLoop) {
            this.consistentHashLoop = consistentHashLoop;
        }
    
        public ConsistentHashLoop getConsistentHashLoop() {
            return consistentHashLoop;
        }
    
        public static ConsistentHash fro(ConsistentHashLoop consistentHashLoop) {
            return new ConsistentHash(consistentHashLoop).init();
        }
    
        private ConsistentHash init() {
            if (null == this.consistentHashLoop) {
                throw new RuntimeException("loop can't be null");
            }
    //        Assert.nonNull(this.consistentHashLoop).orThrows(() -> new SystemException("loop can't be null"));
            TreeMap<Integer, ConsistentHashNode> nodes = this.consistentHashLoop.getNodes();
            if (nodes == null) {
                this.consistentHashLoop.setNodes(new TreeMap<>());
            }
            return this;
        }
    
        /**
         * 获取精准节点
         *
         * @param str 字符串
         * @return 节点信息
         */
        public ConsistentHashNode getAccurateNode(String str) {
            int point = this.getPoint(str);
            ConsistentHashNode node = this.getNode(point);
            return node != null && node.getPoint() == point ? node : null;
        }
    
        /**
         * 查找节点,顺时针方向
         *
         * @param str 字符串
         * @return 节点信息
         */
        public ConsistentHashNode getNode(String str) {
            int point = this.getPoint(str);
            return this.getNode(point);
        }
    
        /**
         * 查找节点,顺时针方向
         *
         * @param point 点
         * @return 节点信息
         */
        public ConsistentHashNode getNode(Integer point) {
            TreeMap<Integer, ConsistentHashNode> nodes = this.consistentHashLoop.getNodes();
            if (nodes.size() == 0) {
                return null;
            }
            Map.Entry<Integer, ConsistentHashNode> entry = nodes.ceilingEntry(point);
            return entry != null ? entry.getValue() : nodes.firstEntry().getValue();
        }
    
        /**
         * 获取点
         *
         * @param str 字符串
         * @return 点
         */
        public int getPoint(String str) {
            int hashCode = DigestUtils.md5DigestAsHex(str.getBytes()).hashCode();
            return Math.abs(hashCode % this.consistentHashLoop.getPointCount());
        }
    
        /**
         * 新增节点
         *
         * @param node 节点信息
         * @return 影响点范围
         */
        public PointScope putNode(ConsistentHashNode node) {
            PointScope[] pointScopes = putNode(node, 0);
            return pointScopes != null && pointScopes.length > 0 ? pointScopes[0] : null;
        }
    
        /**
         * 新增节点
         *
         * @param node         节点信息
         * @param virtualCount 虚拟节点数量
         * @return 影响点范围
         */
        public PointScope[] putNode(ConsistentHashNode node, Integer virtualCount) {
            TreeMap<Integer, ConsistentHashNode> nodes = this.consistentHashLoop.getNodes();
            if (node.getPoint() == null) {
                throw new RuntimeException("node point can't be null");
            }
    
            List<ConsistentHashNode> list = new ArrayList<>();
            nodes.put(node.getPoint(), node);
            list.add(node);
    
            for (int i = 0; i < virtualCount; i++) {
                int point = getPoint(node.getPoint() + "#V_" + i);
                ConsistentHashNode virtualNode = new ConsistentHashNode(point, node.getTarget());
                nodes.put(point, virtualNode);
                list.add(virtualNode);
            }
    
            // 如果新增是否为第一批节点,则不计算影响范围
            if (nodes.size() == list.size()) {
                return new PointScope[0];
            }
    
            // 按照从小到大排序
    
            list.sort(Comparator.comparingInt(ConsistentHashNode::getPoint));
    
            // 计算出所有新增节点影响到的范围
    
            Map<ConsistentHashNode, PointScope> pointScopeMap = new HashMap<>(list.size());
    
            for (ConsistentHashNode endNode : list) {
                PointScope pointScope = getPointScope(endNode);
                if (pointScope != null) {
                    pointScopeMap.put(endNode, pointScope);
                }
            }
    
            return pointScopeMap.values().toArray(new PointScope[0]);
        }
    
        /**
         * 删除节点
         *
         * @param point 节点所在点
         * @return 影响点范围
         */
        public PointScope[] removeNode(int point) {
            TreeMap<Integer, ConsistentHashNode> nodes = this.consistentHashLoop.getNodes();
            ConsistentHashNode node = nodes.get(point);
            if (node == null) {
                return null;
            }
    
            // 查出所有相同目标的节点并删除
    
            List<ConsistentHashNode> list = new ArrayList<>();
            this.consistentHashLoop.getNodes().forEach((key, value) -> {
                if (Objects.equals(value.getTarget(), node.getTarget())) {
                    list.add(value);
                }
            });
            list.stream().map(ConsistentHashNode::getPoint).forEach(nodes::remove);
    
            // 按照从小到大排序
    
            list.sort(Comparator.comparingInt(ConsistentHashNode::getPoint));
    
            // 计算出所有删除节点影响到的范围
    
            Map<ConsistentHashNode, PointScope> pointScopeMap = new HashMap<>(list.size());
            Map<ConsistentHashNode, List<ConsistentHashNode>> nextNodeMap = new HashMap<>(list.size());
            for (ConsistentHashNode endNode : list) {
                PointScope pointScope = getPointScope(endNode);
                if (pointScope != null) {
                    pointScopeMap.put(endNode, pointScope);
                    if (pointScope.getNextNode() != null) {
                        nextNodeMap.computeIfAbsent(pointScope.getNextNode(), (obj) -> new ArrayList<>()).add(endNode);
                    }
                }
            }
    
            // 如果删除的节点只有一个,则直接返回
    
            if (pointScopeMap.size() <= 1) {
                return pointScopeMap.values().toArray(new PointScope[0]);
            }
    
            // 去掉重叠的影响范围
    
            List<PointScope> result = new ArrayList<>(nextNodeMap.size());
    
            for (Map.Entry<ConsistentHashNode, List<ConsistentHashNode>> entry : nextNodeMap.entrySet()) {
                ConsistentHashNode nextNode = entry.getKey();
                List<ConsistentHashNode> nodeList = entry.getValue();
                if (nodeList.size() == 1) {
                    result.add(pointScopeMap.get(nodeList.get(0)));
                    continue;
                }
                ConsistentHashNode shortestNode = geLongestDistance(nodeList, nextNode.getPoint());
                PointScope pointScope = pointScopeMap.get(shortestNode);
                pointScope.setEnd(nextNode.getPoint() - 1);
                result.add(pointScope);
            }
    
            return result.toArray(new PointScope[0]);
        }
    
        /**
         * 获取影响点范围
         *
         * @param endNode 截止节点
         * @return 影响点范围
         */
        public PointScope getPointScope(ConsistentHashNode endNode) {
            TreeMap<Integer, ConsistentHashNode> nodes = this.consistentHashLoop.getNodes();
            if (nodes.size() == 0) {
                return null;
            }
            Map.Entry<Integer, ConsistentHashNode> entry = nodes.lowerEntry(endNode.getPoint());
            ConsistentHashNode startNode = entry != null ? entry.getValue() : nodes.lastEntry().getValue();
            return getPointScope(startNode, endNode);
        }
    
        /**
         * 获取影响点范围
         *
         * @param startNode 起始节点
         * @param endNode   截止节点
         * @return 影响点范围
         */
        public PointScope getPointScope(ConsistentHashNode startNode, ConsistentHashNode endNode) {
            if (startNode == endNode || startNode.getPoint().equals(endNode.getPoint())) {
                return null;
            }
            // 查询影响范围的下一个节点
            ConsistentHashNode nextNode = null;
            TreeMap<Integer, ConsistentHashNode> nodes = this.consistentHashLoop.getNodes();
            if (nodes.size() > 0) {
                Map.Entry<Integer, ConsistentHashNode> entry = nodes.ceilingEntry(endNode.getPoint());
                if (entry == null) {
                    entry = nodes.firstEntry();
                }
                nextNode = entry.getValue();
            }
            // 防止点数超过最大点总数
            Integer pointCount = this.getConsistentHashLoop().getPointCount();
            int startPoint = startNode.getPoint().equals(pointCount) ? pointCount : startNode.getPoint() + 1;
            return new PointScope(startPoint, endNode.getPoint(), nextNode);
        }
    
        /**
         * 获取点到目标点的距离
         * 顺时针方向
         *
         * @param point  点
         * @param target 目标点
         * @return 最近距离点
         */
        private int getTargetDistance(int point, int target) {
            if (point <= target) {
                return target - point;
            }
            return this.consistentHashLoop.getPointCount() - target + point;
        }
    
        /**
         * 获取最距离目标点最长的节点
         * 顺时针方向
         *
         * @param points 点数组
         * @param target 目标点
         * @return 节点
         */
        private ConsistentHashNode geLongestDistance(List<ConsistentHashNode> points, int target) {
            ConsistentHashNode result = null;
            int longest = -1;
            for (ConsistentHashNode point : points) {
                int distance = getTargetDistance(point.getPoint(), target);
                if (result == null || distance > longest) {
                    result = point;
                    longest = distance;
                }
            }
            return result;
        }
    
    }

    测试main方法

        public static void main(String[] args) {
            // 创建哈希环
            ConsistentHashLoop hashLoop = new ConsistentHashLoop(10000);
            ConsistentHash consistentHash = ConsistentHash.fro(hashLoop);
    
            Console.log("=============================================初始2个节点 5个对象 分配结果:");
            // 新增1个物理节点和3个对应的虚拟节点:服务器A
            String server1 = "服务器节点A";
            consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(server1),server1),150);
    
            // 新增1个物理节点和3个对应的虚拟节点:服务器B
            String server2 = "服务器节点B";
            consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(server2),server2),150);
    
            // 模拟请求分配节点
            approveNode(consistentHash);
            Console.log("=============================================新增一个节点C 分配结果:");
            // 删除节点
    //        PointScope[] pointScopes = consistentHash.removeNode(consistentHash.getPoint(server1));
    //        System.out.println("删除节点影响范围:" + Arrays.toString(pointScopes));
            String server3 = "服务器节点C";
            consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(server3),server3),150);
            // 模拟请求分配节点
            approveNode(consistentHash);
            Console.log("=============================================新增一个节点D 分配结果:");
            String server4 = "服务器节点D";
            consistentHash.putNode(new ConsistentHashNode(consistentHash.getPoint(server4),server4),150);
            // 模拟请求分配节点
            approveNode(consistentHash);
            Console.log("=============================================删除一个节点B 分配结果:");
            consistentHash.removeNode(consistentHash.getPoint(server2));
            approveNode(consistentHash);
            int i = 1;
        }
    
        private static void approveNode(ConsistentHash consistentHash) {
            String request1 = "127.0.0.1";
            System.out.println("请求:" + request1 + ",分配节点:" + consistentHash.getNode(request1).getTarget()+ ", 节点node: "+consistentHash.getNode(request1).getPoint());
            String request2 = "192.168.0.1";
            System.out.println("请求:" + request2 + ",分配节点:" + consistentHash.getNode(request2).getTarget()+ ", 节点node: "+consistentHash.getNode(request2).getPoint());
    
            String request3 = "192.168.0.5";
            System.out.println("请求:" + request3 + ",分配节点:" + consistentHash.getNode(request3).getTarget()+ ", 节点node: "+consistentHash.getNode(request3).getPoint());
    
            String request4 = "192.168.0.55";
            System.out.println("请求:" + request4 + ",分配节点:" + consistentHash.getNode(request4).getTarget()+ ", 节点node: "+consistentHash.getNode(request4).getPoint());
    
            String request5 = "192.168.0.88";
            System.out.println("请求:" + request5 + ",分配节点:" + consistentHash.getNode(request5).getTarget()+ ", 节点node: "+consistentHash.getNode(request5).getPoint());
        }

    结果:

  • 相关阅读:
    复杂JSON字符串转换为Java嵌套对象的方法
    好代码是如何炼成的
    让数据流转换代码更加健壮流畅:List的Stream包装
    由一个重构示例引发的对可扩展性的思考
    如何高效搜索信息
    个人安全防护简明指南
    YAML配置解析
    事件处理业务的简易组件编排框架
    lambda表达式滥用之殇:解耦三层嵌套lambda表达式
    碎碎念四六
  • 原文地址:https://www.cnblogs.com/guanxiaohe/p/15638452.html
Copyright © 2011-2022 走看看