您的位置:首页 > 编程语言 > Java开发

分布式缓存的一致性Hash的Java实现

2016-12-24 22:27 507 查看

分布式缓存的一致性Hash的Java实现

关于分布式缓存一致性Hash算法的原理,有很多书籍、博客都有详细的介绍。本文主要是想对一致性Hash算法进行一个小小的实现,方便自己更好的理解。

算法的具体原理如下:

先构造一个长度为2^32的整数环(这个环被称为一致性Hash环),根据节点名称的Hash值(其分布为[0, 2^32-1])将服务器节点放置在这个Hash环上,然后根据数据的Key值计算得到其Hash值(其分布也为[0, 2^32-1]),接着在Hash环上顺时针查找距离这个Key值的Hash值最近的服务器节点,完成Key到服务器的映射查找。

一致性Hash的原理图如下:



第一个版本:不使用虚拟节点的一致性Hash算法的实现

首先了,我们写一个ServerNode类,代表服务器节点。

这个类比较简单,包含两个属性:名字和哈希值

package hash;

public class ServerNode {
private String serverNodeName;
private long  serverNodeHash;

public ServerNode(String serverNodeName, long serverNodeHash) {
super();
this.serverNodeName = serverNodeName;
this.serverNodeHash = serverNodeHash;
}
public String getServerNodeName() {
return serverNodeName;
}
public void setServerNodeName(String serverNodeName) {
this.serverNodeName = serverNodeName;
}
public long getServerNodeHash() {
return serverNodeHash;
}
public void setServerNodeHash(long serverNodeHash) {
this.serverNodeHash = serverNodeHash;
}
}


然后,我们就根据最上面所描述的算法进行实现。

可能首先要考虑的问题就是:用什么样的数据结构来存储服务器节点。本文采用的是“排序+List”。在http://www.cnblogs.com/xrq730/p/5186728.html文章中介绍了三种存储结构,第一种为:排序+List;第二种为:遍历+List;第三种为:二叉树(推荐使用)。

上面所了存储结构的事,在ConsistentHashWithoutVirtualNode类中,包含以下几种方法:

1、addServer;添加服务器节点的方法,这个是必须的

2、deleteServer;如果有服务器节点宕机,则在集群中需要删除这个节点

3、hash(),hash函数,这个是必须的。

4、getServerAccordKey;根据指定的key得到其路由到哪个服务器节点上。

具体实现代码如下:

package hash;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.zip.CRC32;

public class ConsistentHashWithoutVirtualNode {
//用来存储服务器节点对象
List<ServerNode> serverNodes= new ArrayList<ServerNode>();

//添加服务器节点
public void addServerNode(String serverNodeName){
if(serverNodeName==null){
return;
}
//利用Hash算法,求出服务器节点的Hash值
long serverNodeHash = getHash(serverNodeName);
ServerNode serverNode = new ServerNode(serverNodeName,serverNodeHash);
serverNodes.add(serverNode);

//将serverNodes进行排序
Collections.sort(serverNodes,new Comparator<ServerNode>() {

@Override
public int compare(ServerNode node1, ServerNode node2) {
if(node1.getServerNodeHash()<node2.getServerNodeHash()){
return -1;
}
return 1;
}

});
}

public long getHash(String serverNodeName) {
CRC32 crc32 = new CRC32();
crc32.update(serverNodeName.getBytes());
return crc32.getValue();
}
//删除服务器节点
public void deleteServerNode(String serverName){
//这里假设所有服务器名字不一样,则直接遍历名字是否相同即可
int serverNum=serverNodes.size();
for(int i=0;i<serverNum;i++){
ServerNode node = serverNodes.get(i);
if(node.getServerNodeName().equals(serverName)){
serverNodes.remove(node);
return;
}
}
}
//得到应当路由到的服务器结点
public ServerNode getServerNode(String key){
//得到key的hash值
long hash = getHash(key);
//在serverNodes中找到大于hash且离其最近的的那个ServerNode
//由于serverNodes是升序排列的,因此,找到的第一个大于hash的就是目标节点
for(ServerNode node:serverNodes){
if(node.getServerNodeHash()>hash){
return node;
}
}
//如果没有找到,则说明此key的hash值比所有服务器节点的hash值都大,因此返回最小hash值的那个Server节点
return serverNodes.get(0);

}

public void printServerNodes(){
System.out.println("所有的服务器节点信息如下:");
for(ServerNode node:serverNodes){
System.out.println(node.getServerNodeName()+":"+node.getServerNodeHash());
}
}

}


测试函数如下:

public static void main(String[] args){
ConsistentHashWithoutVirtualNode ch = new ConsistentHashWithoutVirtualNode();
//添加一系列的服务器节点
String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
"192.168.0.3:111", "192.168.0.4:111"};
for(String server:servers){
ch.addServerNode(server);
}
//打印输出一下服务器节点
ch.printServerNodes();

//看看下面的客户端节点会被路由到哪个服务器节点
String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"};
System.out.println("此时,各个客户端的路由情况如下:");
for(String node:nodes){
ServerNode serverNode = ch.getServerNode(node);
System.out.println(node+","+ ch.getHash(node)+"------->"+
serverNode.getServerNodeName()+","+serverNode.getServerNodeHash());
}

//如果由一个服务器节点宕机,即需要将这个节点从服务器集群中移除
ch.deleteServerNode("192.168.0.0:111");

System.out.println("删除节点后,再看看同样的客户端的路由情况,如下:");
for(String node:nodes){
ServerNode serverNode = ch.getServerNode(node);
System.out.println(node+","+ ch.getHash(node)+"------->"+
serverNode.getServerNodeName()+","+serverNode.getServerNodeHash());
}

}


运行结果如下:



从结果中可以看出,以上的代码就简单的模拟了下服务器端集群中有新的服务器添加到集群中,有服务器宕机的情况。

第二个版本:使用虚拟节点来实现一致性Hash

上面的例子中,虽然我们的测试客户端只有三个,但也可以看到,有两个客户端映射到了同一个服务器端,有一个客户端映射到了另一个服务器端,而剩余的服务器端都没有测试客户端。

因此,从某种程度上来说,这失去了负载均衡的意义,因为负载均衡的目的本身就是为了使得目标服务器均分所有的请求。

因此,解决方法就是:引入“虚拟节点”。其工作原理是:将一个物理节点拆分为多个虚拟节点,并且同一个物理节点的虚拟节点尽量均匀分布在Hash环上。采取这样的方式,就可以有效地解决增加或减少节点时候的负载不均衡的问题。

引入一个虚拟节点类

package hash2;

public class VirtualServerNode {
private String serverNodeName;//这个名字指的是其对应的真实的物理服务器节点的名字
private long virtualServerNodeHash;

public VirtualServerNode(String serverNodeName, long virtualServerNodeHash) {
super();
this.serverNodeName = serverNodeName;
this.virtualServerNodeHash = virtualServerNodeHash;
}
public String getServerNodeName() {
return serverNodeName;
}
public void setServerNodeName(String serverNodeName) {
this.serverNodeName = serverNodeName;
}
public long getVirtualServerNodeHash() {
return virtualServerNodeHash;
}
public void setVirtualServerNodeHash(long virtualServerNodeHash) {
this.virtualServerNodeHash = virtualServerNodeHash;
}

}


然后,按照第一个版本的思路,改吧改吧就形成了如下引入虚拟节点的代码。

package hash2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.zip.CRC32;

public class ConsistentHashWithVirtualNode {
//保存虚拟服务器节点节点
List<VirtualServerNode> virtualServerNodes = new ArrayList<VirtualServerNode>();
//每个物理节点对应的虚拟节点的个数
private final static int VIRTUAL_NUM = 5;
//添加服务器节点
public void addServerNode(String serverName){
if(serverName==null){
return;
}
for(int i=0;i<VIRTUAL_NUM;i++){
//这里假设,虚拟节点的名字为类似这样的形式:serverName+"&&VN"+i,这样方便从虚拟节点得到物理节点
String virtualServerNodeName = serverName+"&&VN"+i;
long hash = getHash(virtualServerNodeName);
VirtualServerNode vsNode = new VirtualServerNode(serverName, hash);
virtualServerNodes.add(vsNode);
}
//将virtualServerNodes进行排序
Collections.sort(virtualServerNodes,new Comparator<VirtualServerNode>() {

@Override
public int compare(VirtualServerNode node1, VirtualServerNode node2) {
if(node1.getVirtualServerNodeHash()<node2.getVirtualServerNodeHash()){
return -1;
}
return 1;
}

});

}
public long getHash(String serverNodeName) {
CRC32 crc32 = new CRC32();
crc32.update(serverNodeName.getBytes());
return crc32.getValue();
}
//删除服务器节点,即要删除其物理服务器节点对应的所有虚拟节点
public void deleteServerNode(String serverName){

for(int i=0;i<virtualServerNodes.size();i++){
VirtualServerNode node = virtualServerNodes.get(i);

if(node.getServerNodeName().contains(serverName)){//这里用了contain查找,这里就把该物理服务器节点对应的虚拟节点都删除了
virtualServerNodes.remove(node);
/*
* 删除元素后,需要把下标减一。这是因为在每次删除元素后,ArrayList会将后面部分的元素依次往上挪一个位置(就是copy),
* 所以,下一个需要访问的下标还是当前下标,所以必须得减一才能把所有元素都遍历完。
* */
i--;
}
}
}
//得到应当路由到的结点
public VirtualServerNode getServerNode(String key){
//得到key的hash值
long hash = getHash(key);
//在VirtualServerNode中找到大于hash且离其最近的的那个VirtualServerNode
//由于serverNodes是升序排列的,因此,找到的第一个大于hash的就是目标节点
for(VirtualServerNode node:virtualServerNodes){
if(node.getVirtualServerNodeHash()>hash){
return node;
}
}
//如果没有找到,则说明此key的hash值比所有服务器节点的hash值都大,因此返回最小hash值的那个Server节点
return virtualServerNodes.get(0);

}

public void printServerNodes(){
System.out.println("所有的服务器节点信息如下:");
for(VirtualServerNode node:virtualServerNodes){
System.out.println(node.getServerNodeName()+":"+node.getVirtualServerNodeHash());
}
}

public static void main(String[] args){
ConsistentHashWithVirtualNode ch = new ConsistentHashWithVirtualNode();
//添加一系列的服务器节点
String[] servers = {"192.168.0.0:111", "192.168.0.1:111", "192.168.0.2:111",
"192.168.0.3:111", "192.168.0.4:111"};
for(String server:servers){
ch.addServerNode(server);
}
//打印输出一下服务器节点
ch.printServerNodes();

//看看下面的客户端节点会被路由到哪个服务器节点
String[] nodes = {"127.0.0.1:1111", "221.226.0.1:2222", "10.211.0.1:3333"};
System.out.println("此时,各个客户端的路由情况如下:");
for(String node:nodes){
VirtualServerNode virtualServerNode = ch.getServerNode(node);
System.out.println(node+","+ ch.getHash(node)+"------->"+
virtualServerNode.getServerNodeName()+","+virtualServerNode.getVirtualServerNodeHash());
}

//如果由一个服务器节点宕机,即需要将这个节点从服务器集群中移除
String deleteNodeName="192.168.0.2:111";
ch.deleteServerNode(deleteNodeName);

System.out.println("删除节点"+deleteNodeName+"后,再看看同样的客户端的路由情况,如下:");
for(String node:nodes){
VirtualServerNode virtualServerNode = ch.getServerNode(node);
System.out.println(node+","+ ch.getHash(node)+"------->"+
virtualServerNode.getServerNodeName()+","+virtualServerNode.getVirtualServerNodeHash());
}

}
}


小结

参考下面两篇博文,有了上面的代码,发现还是挺有意思的,对一致性hash有了更深的一个理解。

参考资料

1、http://www.cnblogs.com/xrq730/p/5186728.html

2、http://www.cnblogs.com/java-zhao/p/5223926.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: