一致性 Hash 算法 Java 实现

一致性 Hash 算法 Java 实现(虚拟节点 + 实体节点)及性能对比

缓存集群

假设现在我们需要使用 Redis 存储图片资源,存储的格式为键值对,key 为图片名称,value 为该图片所在文件服务器的路径,我们需要根据文件名查询该文件所在文件服务器上的路径,假设数据有 100W,部署 4 台缓存服务器,每台服务器大约存储 25W 数据,如果按照随机分配的规则,某一条数据可能会存储在任何一台缓存服务器上,这样,假设查找 beijing.jpg 图片,则最坏需要 4 次查询才能找到,效率很低。

余数 Hash

如果在取数据时能够提前知道某条数据存储在哪个服务器上,则仅需要一次查询就能找到,效率提高很多。这种方案可以通过余数 Hash 来实现,在存储数据时,首先得到数据 key 值的 HashCode,然后对缓存服务器数量取余,得到的余数就是要存储的服务器编号。由于 HashCode 是具有随机性的,因此余数 Hash 算法可以保证缓存数据在整个服务器集群中比较均匀分布。假设图片 beijing.jpg 的 hashcode 值是 10,则存放时放在 10 % 4 = 2 编号为 2 的缓存服务器上,下次取 beijing.jpg 时,可以直接根据 hashcode 值,去编号为 2 的缓存服务器取,只需要一次查询。

Hash 算法问题

这种方法虽然提升了性能,但是还是存在一些缺陷,主要体现在服务器数量发生变动时,缓存的位置就需要发生改变。假设现在 4 台服务器已经满足不了我们的需求,需要添加到 5 台服务器,则还是以 beijing.jpg 为例,之前存储的时候存储在编号为 2 的缓存服务器,现在在取图片时,10 % 5 = 0 ,根据 hashcode 值计算出的缓存服务器编号为 0,无法命中缓存。也就说说缓存服务器的数量发生变动时,大部分缓存一定时间内是失效的,在缓存重建的这段时间内,所有的请求都会从数据库获取数据,容易导致缓存雪崩问题。

一致性 Hash 算法原理

一致性 Hash 算法通过构建环状的 Hash 空间替线性 Hash 空间的方法解决了这个问题,整个 Hash 空间被构建成一个首位相接的环。

其具体的构造过程为:

  1. 先构造一个长度为 2^32 的一致性 Hash 环
  2. 计算每个缓存服务器的 Hash 值,并记录,这就是它们在 Hash 环上的位置
  3. 对于每个图片,先根据 key 的 hashcode 得到它在 Hash 环上的位置,然后在 Hash 环上顺时针查找距离这个 Key 的 Hash 值最近的缓存服务器节点,这就是该图片所要存储的缓存服务器

当缓存服务器需要扩容的时候,只需要将新加入缓存服务器的 Hash 值放入一致性 Hash 环中,由于 Key 是顺时针查找距离其最近的节点,因此新加入的几点只影响整个环中的一小段。加入新节点 NODE3 后,原来的 Key 大部分还能继续计算到原来的节点,只有 Key3、Key0 从原来的 NODE1 重新计算到 NODE3,这样就能保证大部分被缓存数据还可以命中。当节点被删除时,其他节点在环上的映射不会发生改变,只是原来打在对应节点的 key 现在会转移到顺时针方向的下一个节点上。

Hash 环数据倾斜

新加入的节点 NODE3 只影响了原来的节点 NODE1,也就是说一部分原来需要访问 NODE1 的缓存数据现在需要访问 NODE3(概率上是 50%)但是原来的节点 NODE0 和 NODE2 不受影响,这就意味着 NODE0 和 NODE2 缓存数据量和负载压力是 NODE1 与 NODE3 的两倍。

虚拟节点

为了解决这个问题,最好的办法就是扩展整个环上的节点数量,可以将每台物理缓存服务器虚拟为一组虚拟缓存服务器,使得 Hash 环在空间上的分割更加均匀。

这样只是将虚拟节点的 Hash 值放置在 Hash 环上,在查找时,首先根据 Key 值找到环上的虚拟节点,然后再根据虚拟节点找到真实额缓存服务器。虚拟节点的数目足够多,就会使得节点在 Hash 环上的分布更加随机化,也就是增加或者删除一台缓存服务器时,都会较为均匀的影响原来集群中已经存在的缓存服务器。

一致性 Hash 在 Dubbo 中的应用

在 Dubbo 中如果同一环境下服务提供者的数量大于 1,就会出现服务消费者要选择哪个 Provider 进行调用的问题。为了避免集群中部分服务器压力过大,另一些服务器比较空闲的情况,就需要通过负载均衡,让负载“均摊”到不同的机器上。Dubbo 中就提供了基于一致性 Hash 的负载均衡算法,只是将缓存服务器替换成 Provider,根据 Provider 和 Consumer 的 IP 地址获取 hashcode。

一致性 Hash 算法的缺陷

虽然一致性 Hash 算法已经十分完善,但是还是有很多不足的地方

  1. Hash 环上的节点非常多或者更新频繁时,查询效率比较低下
  2. 整个 Hash 环需要一个服务路由来做负载均衡,存在单点问题

针对这两个问题,Redis 在实现自己的分布式集群方案时,采用了基于 P2P 结构的哈希槽算法

  1. 使用哈希槽
    1. Redis Cluster 通过分片的方式将整个缓存划分为 16384 个槽,每个缓存节点就相当于 Hash 换上的一个节点,接入集群时,所有实例都将均匀占有这些哈希槽,当需要查询一个 Key 是,首先根据 Key 的 hashcode 对 16384 取余来得到 Key 属于哪个槽,并映射到缓存实例上。
    2. 为了方便描述,将 16384 抽象为 20个哈希槽单位,在增加缓存实例时,假设原来有 4 个节点,分配的哈希槽为 0-4,5-9,10-14,15-19 现在增加一个节点,Redis Cluster 的做法是将之前每台服务器上的一部分哈希槽移动到第四个节点上,更新后的哈希槽分配为 1-4,6-9,11-14,16-19 第四个节点为 0,5,10,15。删除也是同理
  2. 去中心化
    1. 每个节点都保存有完整的哈希槽-节点的映射表,也就是说,每个节点都知道自己拥有哪些哈希槽,以及某个确定的哈希槽究竟对应着哪个节点。
    2. 无论向哪个节点发出寻找 Key 的请求,该节点都会通过求余计算该 Key 究竟存在于哪个哈希槽,并将请求转发至该哈希槽所在的节点。

一致性 Hash 算法实现

数据结构选取

一致性 Hash 算法首先需要考虑的问题就是:构造一个长度为 2^32 的整数环,根据节点名称的 Hash 值将服务器节点放置在这个 Hash 环上。

排序 + List

算出所有待加入的数据结构的节点名称的 Hash 值放入一个数组中,然后使用排序算法将其从小到大进行排序,最后将排序后的数据放入 List 中。之后,待路由的节点,只需要在 List 中找到第一个 Hash 值比它大的服务器节点就可以了。比如服务器节点的 Hash 值是 [0,2,4,6,8,10],待路由的节点是 7,则只需要找到第一个比 7 大的整数,也就是 8,就是我们要找的最终需要路由过去的服务器节点。

直接使用 Arrays#sort() 时间复杂度为 O(NLogN),则总的时间复杂度为:

  1. 最好情况,一次找到 O(1) + O(NLogN)
  2. 最坏情况,最后一次找到 O(N) + O(NLogN)

则总的时间复杂度为 O(NLogN)

遍历 + List

由于排序比较消耗性能,那么可以选择不排序,直接遍历的方式:

  1. 服务器节点不排序,其 Hash 值全部放到一个 List 中
  2. 等待路由的节点,算出其 Hash 值,由于指明了顺时针,因此遍历 List,比待路由节点 Hash 值大的算出差值并记录,比待路由节点 Hash 值小的忽略
  3. 算出所有的差值之后,最小的那个,就是最终需要路由过去的节点

现在算下时间复杂度:

  1. 最好的情况是只有一个服务器节点的 Hash 值大于带路由节点的 Hash 值,O(N) + O(1)
  2. 最坏的情况是所有服务器节点的 Hash 值都大于待路由节点的 Hash 值,时间复杂度是 O(N) + (N)

则总的时间复杂度为 O(N)

红黑树

选用红黑树的原因

  1. 红黑树主要的作用是用于存储有序的数据,因此相当于省略了排序这步骤,效率很高
  2. JDK 里面提供了红黑树的代码实现 TreeMap 和 TreeSet
  3. 红黑树提供了一个方法 ceilingEntry(Integer key) 可以直接获取 key 右边的第一个节点,如果节点为空,表示已经到尾,则直接取树的第一个节点

时间复杂度分析:O(logN)

Hash 值重新计算

服务器节点一般用字符串表示,比如 192.168.1.1,根据字符串得到其 Hash 值,所以一个重要的问题就是 Hash 值重新计算,首先看下直接使用 String#hashCode() 的结果

public static void main(String[] args) {
    System.out.println("192.168.0.0:111 的哈希值:" + 			Math.abs("192.168.0.0:111".hashCode()));
    System.out.println("192.168.0.1:111 的哈希值:" + Math.abs("192.168.0.1:111".hashCode()));
    System.out.println("192.168.0.2:111 的哈希值:" + Math.abs("192.168.0.2:111".hashCode()));
    System.out.println("192.168.0.3:111 的哈希值:" + Math.abs("192.168.0.3:111".hashCode()));
    System.out.println("192.168.0.4:111 的哈希值:" + Math.abs("192.168.0.4:111".hashCode()));
}
/** output **/
192.168.0.0:111 的哈希值771739798
192.168.0.1:111 的哈希值770816277
192.168.0.2:111 的哈希值769892756
192.168.0.3:111 的哈希值768969235
192.168.0.4:111 的哈希值768045714

明显可以看到,在 [0,2^23 - 1] 这个大区间中,5 和 HashCode 值仅仅分布在一个小区间,导致某个服务器的负载会特别大。因此需要一种新的计算 Hash 值的方法, 这种重新计算Hash值的算法有很多,比如 CRC32_HASH、FNV1_32_HASH、KETAMA_HASH 等,其中 KETAMA_HASH 是默认的 Reids 推荐的一致性 Hash 算法,用别的 Hash 算法也可以,比如 FNV1_32_HASH 算法的计算效率就会高一些,这里选用 FNV1_32_HASH 算法

虚拟节点

没有虚拟节点,就会出现前文提到的 Hash 环数据倾斜问题,某种程度上来说,这样就失去了负载均衡的意义,因为负载均衡的目的就是为了使得目标服务器均分所有的请求

可以通过引入虚拟节点减小负载均衡问题,查阅资料了解到,物理服务器很少,则需要更多的虚拟节点;反之,物理服务器比较多,虚拟节点就少一点。

构造虚拟节点需要考虑两个问题:

  1. 一个真实的节点如何对应多个虚拟节点
  2. 虚拟节点找到后如何还原成真实的节点

参考其他博客有个简单的方法,给每个真实节点机上后缀再取 Hash 值,比如:”192.168.0.0:111” 就可以变成 “192.168.0.0:111&&VN0”,”192.168.0.0:111&&VN1”,”192.168.0.0:111&&VN2”,”192.168.0.0:111&&VN3”,还原的时候只需要从头截取字符串到 ”&&“ 位置就可以了

代码实现

通用部分

通用接口

public interface LoadBalancer {

    // 添加服务器节点
    public void addServerNode(String serverNodeName);

    // 删除服务器节点
    public HashMap<String, String> delServerNode(String serverNodeName);

    // 选择服务器节点
    public String selectServerNode(String requestURL);
}

使用 FNVI_32_HASH 算法计算 Hash 值

public class GetHashCode {

    private static final long FNV_32_INIT = 2166136261L;
    private static final int FNV_32_PRIME = 16777619;

    public int getHashCode(String origin) {

        final int p = FNV_32_PRIME;
        int hash = (int)FNV_32_INIT;
        for (int i = 0; i < origin.length(); i++) {
            hash = (hash ^ origin.charAt(i)) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        hash = Math.abs(hash);

        return hash;
    }
}

请求 IP 构造

public class IPAddressGenerate {

    public String[] getIPAddress(int num) {
        String[] res = new String[num];
        Random random = new Random();
        for (int i = 0; i < num; i++) {
            res[i] = String.valueOf(random.nextInt(256)) + "." + String.valueOf(random.nextInt(256)) + "."
                    + String.valueOf(random.nextInt(256)) + "." + String.valueOf(random.nextInt(256)) + ":"
                    + String.valueOf(random.nextInt(9999));
        }
        return res;
    }
}

请求在每台服务器上分布的数量方差计算

public class Analysis {

    // <节点,服务器>
    public double analysis(HashMap<String, String> map) {
        Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
        // 计数 map <Server, Count>
        ConcurrentHashMap<String, Integer> countMap = new ConcurrentHashMap<>();
        while (iterator.hasNext()) {
            Map.Entry<String, String> entry = iterator.next();
            String server = entry.getValue();
            if (countMap.containsKey(server)) {
                countMap.put(server, countMap.get(server) + 1);
            } else {
                countMap.put(server, 1);
            }
        }
        Collection<Integer> values = countMap.values();
        Iterator<Integer> val = values.iterator();
        int count = 0;
        int[] res = new int[values.size()];
        while (val.hasNext()) {
            res[count] = val.next();
        }
        return variance(res);
    }

    // 求方差
    private static double variance(int[] arr) {
        int m = arr.length;
        double sum = 0;
        for (int i = 0; i < m; i++) {
            sum += arr[i];
        }
        double dAve = sum / m;
        double dVar = 0;
        for (int i = 0; i < m; i++) {
            dVar += (arr[i] - dAve) * (arr[i] - dAve);
        }
        return dVar / m;
    }
}

不带虚拟节点实现

public class SortedMapWithoutVirtualNode implements LoadBalancer {

    private static Logger logger = LoggerFactory.getLogger(SortedMapWithoutVirtualNode.class);

    private TreeMap<Integer, String> treeMapHash;

    @Override
    public void addServerNode(String serverNodeName) {
        int hash = new GetHashCode().getHashCode(serverNodeName);
        treeMapHash.put(hash, serverNodeName);
    }

    @Override
    public void delServerNode(String serverNodeName) {
        int hash = new GetHashCode().getHashCode(serverNodeName);
        treeMapHash.remove(hash);
        logger.info("服务器节点:{} 下线", serverNodeName);
    }

    @Override
    public String selectServerNode(String requestURL) {
        int hash = new GetHashCode().getHashCode(requestURL);
        // 向右寻找第一个 key
        Map.Entry<Integer, String> subEntry= treeMapHash.ceilingEntry(hash);
        // 设置成一个环,如果超过尾部,则取第一个点
        subEntry = subEntry == null ? treeMapHash.firstEntry() : subEntry;
        return subEntry.getValue();
    }

    // 构建 Hash 环
    public SortedMap<Integer, String> buildHash(TreeMap<Integer, String> treeMap) {
        this.treeMapHash = treeMap;
        return treeMapHash;
    }
}

测试代码

public class SortedMapWithoutVirtualNodeTest {

    private static Logger logger = LoggerFactory.getLogger(SortedMapWithoutVirtualNodeTest.class);

    private static SortedMapWithoutVirtualNode sortedMapWithoutVirtualNode = new SortedMapWithoutVirtualNode();

    // Hash 环
    private static SortedMap<Integer, String> treeMapHash;
    // 服务器总数
    private static final int SERVERS_NUM = 100;

    // 待加入 Hash 环的服务器列表
    private static ArrayList<String> servers = new ArrayList<>();

    private static void init() {
        // 构造服务器数据
        for (int i = 0; i < SERVERS_NUM; i++) {
            StringBuilder stringBuilder = new StringBuilder();
            servers.add(stringBuilder.append("192.168.0.").append(String.valueOf(i)).toString());
        }
        // 构建 Hash 环
        treeMapHash = sortedMapWithoutVirtualNode.buildHash(new TreeMap<Integer, String>());
        // 将服务器添加到 Hash 环中
        for (int i = 0; i < SERVERS_NUM; i++) {
            sortedMapWithoutVirtualNode.addServerNode(servers.get(i));
        }
    }

    public static void main(String[] args) {

        init();

        // 请求节点
        String[] nodes = new IPAddressGenerate().getIPAddress(10000);
        // <节点,服务器>
        HashMap<String, String> map = new HashMap<>();
        for (int i = 0; i < nodes.length; i++) {
            // 选择服务器
            String serverIP = sortedMapWithoutVirtualNode.selectServerNode(nodes[i]);
            // 记录服务器信息
            map.put(nodes[i], serverIP);
        }

        logger.info("初始方差: " + new Analysis().analysis(map));
    }
}

带虚拟节点实现

public class SortedMapWithVirtualNode implements LoadBalancer {

    private static Logger logger = LoggerFactory.getLogger(SortedMapWithVirtualNode.class);

    private TreeMap<Integer, String> treeMapHash;

    @Override
    public void addServerNode(String serverNodeName) {
        int hash = new GetHashCode().getHashCode(serverNodeName);
        treeMapHash.put(hash, serverNodeName);
        // logger.info("服务器节点:{} 上线", serverNodeName);
    }

    @Override
    public void delServerNode(String serverNodeName) {
        int hash = new GetHashCode().getHashCode(serverNodeName);
        treeMapHash.remove(hash);
        logger.info("服务器节点:{} 下线", serverNodeName);
    }

    @Override
    public String selectServerNode(String requestURL) {
        int hash = new GetHashCode().getHashCode(requestURL);
        // 向右寻找第一个 key
        Map.Entry<Integer, String> subEntry = treeMapHash.ceilingEntry(hash);
        // 设置成一个环,如果超过尾部,则取第一个点
        subEntry = subEntry == null ? treeMapHash.firstEntry() : subEntry;
        String VNNode = subEntry.getValue();
        return VNNode.substring(0, VNNode.indexOf("&&"));
    }

    // 构建 Hash 环
    public SortedMap<Integer, String> buildHash(TreeMap<Integer, String> treeMap) {
        this.treeMapHash = treeMap;
        return treeMapHash;
    }
}

测试代码

public class SortedMapWithVirtualNodeTest {

    private static Logger logger = LoggerFactory.getLogger(SortedMapWithVirtualNodeTest.class);

    private static SortedMapWithVirtualNode sortedMapWithVirtualNode = new SortedMapWithVirtualNode();

    // Hash 环
    private static SortedMap<Integer, String> treeMapHash;
    // 服务器总数
    private static final int SERVERS_NUM = 100;
    // 每台服务器需要设置的虚拟节点数
    private static final int VIRTUAL_NODES = 10;

    // 待加入 Hash 环的服务器列表
    private static ArrayList<String> serverList = new ArrayList<>();

    private static void init() {
        // 构造服务器数据
        for (int i = 0; i < SERVERS_NUM; i++) {
            String s = new StringBuilder().append("192.168.0.").append(String.valueOf(i)).toString();
            serverList.add(s);
        }
        // 构建 Hash 环
        treeMapHash = sortedMapWithVirtualNode.buildHash(new TreeMap<Integer, String>());
        // 将服务器的虚拟节点添加到 Hash 环中
        for (String s : serverList) {
            for (int i = 0; i < VIRTUAL_NODES; i++) {
                String VNNode = s + "&&VN" + String.valueOf(i);
                sortedMapWithVirtualNode.addServerNode(VNNode);
            }
        }
    }

    public static void main(String[] args) {
        init();

        // <节点,服务器>
        HashMap<String, String> map = new HashMap<>();

        // 请求节点
        String[] nodes = new IPAddressGenerate().getIPAddress(10000);
        // <节点,服务器>
        for (int i = 0; i < nodes.length; i++) {
            // 选择服务器
            String serverIP = sortedMapWithVirtualNode.selectServerNode(nodes[i]);
            // 记录服务器信息
            map.put(nodes[i], serverIP);
        }

        logger.info("虚拟节点,初始方差: " + new Analysis().analysis(map));
    }
}

性能分析

模拟 1000 个请求,100 台服务器,每台服务器 100 个虚拟节点,比较方差为:

  • 不带虚拟节点:143.98
  • 带虚拟节点为:51.32

参考其他博客,使用 FNVI_32_HASH 算法计算 Hash 值,在服务器增加后,缓存的命中率为 78% 左右

参考

Search

    Table of Contents