您的位置:首页 > 其它

第十一章 自己实现一致性hash算法

2016-12-29 00:00 176 查看
关于一致性hash算法的意义以及其相对于简单求余法(除数求余法)的好处,查看第六章 memcached剖析

注意:真实的hash环的数据结构是二叉树,这里为了简便使用了列表List

1、一致性hash算法的使用地方

memcached服务器

Jedis分片机制

2、真实服务器节点没有虚拟化的一致性hash算法实现

ServerNode:真实服务器节点





1 package hash;  2
3 /**
4  * server节点  5  */
6 public class ServerNode {  7     private String serverName;  8     private long serverHash;  9
10     public String getServerName() { 11         return serverName; 12  } 13
14     public void setServerName(String serverName) { 15         this.serverName = serverName; 16  } 17
18     public long getServerHash() { 19         return serverHash; 20  } 21
22     public void setServerHash(long serverHash) { 23         this.serverHash = serverHash; 24  } 25
26     /**
27  * 下边重写hashcode()和equals()是为了在删除节点的时候只根据传入的serverName删除即可 28      */
29  @Override 30     public int hashCode() { 31         final int prime = 31; 32         int result = 1; 33         result = prime * result 34                 + ((serverName == null) ? 0 : serverName.hashCode()); 35         return result; 36  } 37
38  @Override 39     public boolean equals(Object obj) { 40         if (this == obj) 41             return true; 42         if (obj == null) 43             return false; 44         if (getClass() != obj.getClass()) 45             return false; 46         ServerNode other = (ServerNode) obj; 47         if (serverName == null) { 48             if (other.serverName != null) 49                 return false; 50         } else if (!serverName.equals(other.serverName)) 51             return false; 52         return true; 53  } 54
55
56 }


View Code

注意:

serverName可以自己取名,这里取名为"ip:port"

对于hashCode()和equals()方法的重写仅仅是为了删除服务器节点的时候,只根据serverName就可以删除,而不需要再计算服务器节点的hash值

ServerComparator:真实服务器比较器





1 package hash;  2
3 import java.util.Comparator;  4
5 /**
6  * 服务器排序比较器  7  */
8 public class ServerComparator implements Comparator<ServerNode> {  9
10     public int compare(ServerNode node1, ServerNode node2) { 11         if(node1.getServerHash() <= node2.getServerHash()) { 12             return -1;//node1<node2
13  } 14         return 1;//node1>node2
15  } 16
17 }


View Code

注意:

关于java的比较器,有两种:(后者用的多一些)

javabean实现comparable接口,实现compareTo()方法

另外建一个类实现comparator接口,实现其中的compare()方法

ConsistentHash:一致性hash实现类





1 package hash;  2
3 import java.util.ArrayList;  4 import java.util.Collections;  5 import java.util.List;  6 import java.util.zip.CRC32;  7
8 /**
9  * 一致性hash实现(数据结构:list)(服务器没有虚拟化)  10  * 一致性hash的真正数据结构是二叉树  11  */
12 public class ConsistentHash {  13     private List<ServerNode> servers = new ArrayList<ServerNode>();//存放服务器
14
15     /** 计算服务器和存储的键的hash值 */
16     public long hash(String str){  17         CRC32 crc32 = new CRC32();  18  crc32.update(str.getBytes());  19         return crc32.getValue();  20  }  21
22     /**
23  * 添加server到环上  24  * @param serverName ip:port  25      */
26     public void addServer(String serverName){  27
28         ServerNode node = new ServerNode();  29  node.setServerName(serverName);  30  node.setServerHash(hash(serverName));  31
32  servers.add(node);  33         Collections.sort(servers, new ServerComparator());  34  }  35
36     /**
37  * 从环上删除server节点  38      */
39     public void deleteServer(String serverName){  40
41         ServerNode node = new ServerNode();  42  node.setServerName(serverName);  43
44  servers.remove(node);  45  }  46
47     /**
48  * 获取一个缓存key应该存放的位置  49  * @param cachekey 缓存的key  50  * @return 缓存的服务器节点  51      */
52     public ServerNode getServer(String cachekey){  53         long keyHash = hash(cachekey);  54
55         for(ServerNode node : servers){  56             if(keyHash<=node.getServerHash()){  57                 return node;  58  }  59  }  60
61         return servers.get(0);//如果node没有合适放置位置,放在第一台服务器上去
62  }  63
64     /****************测试*******************/
65     public void printServers(){  66         for(ServerNode server : servers){  67             System.out.println(server.getServerName()+"-->"+server.getServerHash());  68  }  69  }  70
71     public static void main(String[] args) {  72         ConsistentHash ch = new ConsistentHash();  73         ch.addServer("127.0.0.1:11211");  74         ch.addServer("127.0.0.1:11212");  75         ch.addServer("127.0.0.2:11211");  76         ch.addServer("127.0.0.2:11212");  77
78  ch.printServers();  79
80         ServerNode node = ch.getServer("hello");  81         System.out.println(ch.hash("hello")+"-->"+node.getServerName()+"-->"+node.getServerHash());  82
83         ServerNode node2 = ch.getServer("name");  84         System.out.println(ch.hash("name")+"-->"+node2.getServerName()+"-->"+node2.getServerHash());  85
86         ServerNode node3 = ch.getServer("a");  87         System.out.println(ch.hash("a")+"-->"+node3.getServerName()+"-->"+node3.getServerHash());  88
89         /********************删除节点*********************/
90         ch.deleteServer("127.0.0.1:11212");  91  ch.printServers();  92
93         ServerNode node0 = ch.getServer("hello");  94         System.out.println(ch.hash("hello")+"-->"+node0.getServerName()+"-->"+node0.getServerHash());  95
96         ServerNode node02 = ch.getServer("name");  97         System.out.println(ch.hash("name")+"-->"+node02.getServerName()+"-->"+node02.getServerHash());  98
99         ServerNode node03 = ch.getServer("a"); 100         System.out.println(ch.hash("a")+"-->"+node03.getServerName()+"-->"+node03.getServerHash()); 101
102  } 103 }


View Code

注意:

在计算服务器节点和存储的key的hash值的时候,不仅仅可以使用crc32算法,还可以使用MD5算法等等,只要是最后得出的结果是一个>=0&&<=232的数就好

在这个实现中,并没有将真实服务器节点进行虚拟化

3、真实服务器节点虚拟化后的一致性hash算法实现

为什么要虚拟化,查看第六章 memcached剖析 ,这里只列出几条原因:

在memcached服务器较少的情况下,很难平均的分布到hash环上,这样就会造成负载不均衡--引入虚拟化节点,可以解决这个问题

当一台memcached宕机时,其原先所承受的压力全部给了其下一个节点,为了将其原先所承受的压力尽可能的分布给所有剩余的memcached节点,引入虚拟化节点可以达到这个目的

当新添加了一台memcached服务器server1时,server1只会缓解其中的一台服务器(即server1插入环后,server1的下一个节点)的压力,为了可以让server1尽可能的缓解所有的memcached服务器的压力,引入虚拟节点可以达到这个目的

VirtualServerNode:虚拟节点





1 package hash2;  2
3 /**
4  * 虚拟节点  5  */
6 public class VirtualServerNode {  7     private String serverName;//真实节点名称
8     private long virtualServerHash;//虚拟节点hash
9
10     public String getServerName() { 11         return serverName; 12  } 13
14     public void setServerName(String serverName) { 15         this.serverName = serverName; 16  } 17
18     public long getVirtualServerHash() { 19         return virtualServerHash; 20  } 21
22     public void setVirtualServerHash(long virtualServerHash) { 23         this.virtualServerHash = virtualServerHash; 24  } 25
26 }


View Code

注意:

该类中的serverName是该虚拟节点对应的真实节点的名称,这里就是"ip:port"

真正的虚拟节点的名称是serverName-i(其中,i是0~virtualCount的整数值),这一块儿请查看ConsistentHashWithVirtualNode的addServer(String serverName)

VirtualServerComparator:虚拟节点比较器





1 package hash2;  2
3 import java.util.Comparator;  4
5 /**
6  * 虚拟节点比较器  7  */
8 public class VirtualServerComparator implements Comparator<VirtualServerNode> {  9
10     public int compare(VirtualServerNode node1, VirtualServerNode node2) { 11         if(node1.getVirtualServerHash() <= node2.getVirtualServerHash()) { 12             return -1; 13  } 14         return 1; 15  } 16
17 }


View Code

ConsistentHashWithVirtualNode:真实节点虚拟化后的一致性hash算法





1 package hash2;  2
3 import java.util.ArrayList;  4 import java.util.Collections;  5 import java.util.List;  6 import java.util.zip.CRC32;  7
8 /**
9  * 具有虚拟节点的一致性hash实现(数据结构:list)  10  * 一致性hash的真正数据结构是二叉树  11  */
12 public class ConsistentHashWithVirtualNode {  13     private List<VirtualServerNode> virtualServers = new ArrayList<VirtualServerNode>();//存放虚拟节点
14     private static final int virtualCount = 8;//每个真实节点虚拟成8个虚拟节点
15
16     /** 计算服务器和存储的键的hash值 */
17     public long hash(String str){  18         CRC32 crc32 = new CRC32();  19  crc32.update(str.getBytes());  20         return crc32.getValue();  21  }  22
23     /**
24  * 添加server的虚拟节点到环上  25  * @param serverName ip:port  26      */
27     public void addServer(String serverName){  28
29         for(int count=0;count<virtualCount;count++){  30             VirtualServerNode node = new VirtualServerNode();  31  node.setServerName(serverName);  32             node.setVirtualServerHash(hash(serverName+"-"+count));//虚拟节点的名字:serverName+"-"+count
33  virtualServers.add(node);  34  }  35
36         Collections.sort(virtualServers, new VirtualServerComparator());  37  }  38
39     /**
40  * 从环上删除server节点(需要删除所有的该server节点对应的虚拟节点)  41      */
42     public void deleteServer(String serverName){  43
44         /*
45  * 在这种删除的时候,会出现java.util.ConcurrentModificationException  46  * 这是因为此处的遍历方式为使用ArrayList内部类Itr进行遍历,  47  * 在遍历的过程中发生了remove、add等操作,导致modCount发生了变化,  48  * 产生并发修改异常,  49  * 可以使用下边的那一种方式来进行遍历(遍历方式不是Itr),  50  * 再这样的遍历过程中,add和remove都是没有问题的  51          */
52         /*for(VirtualServerNode node : virtualServers){  53  if(node.getServerName().equals(serverName)){  54  virtualServers.remove(node);  55  }  56  }*/
57         for(int i=0;i<virtualServers.size();i++) {  58             VirtualServerNode node = virtualServers.get(i);  59             if(node.getServerName().equals(serverName)) {  60  virtualServers.remove(node);  61  }  62  }  63
64  }  65
66     /**
67  * 获取一个缓存key应该存放的位置  68  * @param cachekey 缓存的key  69  * @return 缓存的服务器节点  70      */
71     public VirtualServerNode getServer(String cachekey){  72         long keyHash = hash(cachekey);  73
74         for(VirtualServerNode node : virtualServers){  75             if(keyHash<=node.getVirtualServerHash()){  76                 return node;  77  }  78  }  79
80         return virtualServers.get(0);//如果node没有合适放置位置,放在第一台服务器上去
81  }  82
83     /****************测试*******************/
84     public void printServers(){  85         for(VirtualServerNode server : virtualServers){  86             System.out.println(server.getServerName()+"-->"+server.getVirtualServerHash());  87  }  88  }  89
90     public static void main(String[] args) {  91         ConsistentHashWithVirtualNode ch = new ConsistentHashWithVirtualNode();  92         ch.addServer("127.0.0.1:11211");  93         ch.addServer("127.0.0.1:11212");  94         ch.addServer("127.0.0.2:11211");  95         ch.addServer("127.0.0.2:11212");  96
97  ch.printServers();  98
99         VirtualServerNode node = ch.getServer("hello"); 100         System.out.println(ch.hash("hello")+"-->"+node.getServerName()+"-->"+node.getVirtualServerHash()); 101
102         VirtualServerNode node2 = ch.getServer("name"); 103         System.out.println(ch.hash("name")+"-->"+node2.getServerName()+"-->"+node2.getVirtualServerHash()); 104
105         VirtualServerNode node3 = ch.getServer("a"); 106         System.out.println(ch.hash("a")+"-->"+node3.getServerName()+"-->"+node3.getVirtualServerHash()); 107
108         /*********************删除节点之后**********************/
109         ch.deleteServer("127.0.0.1:11212"); 110  ch.printServers(); 111
112         VirtualServerNode node0 = ch.getServer("hello"); 113         System.out.println(ch.hash("hello")+"-->"+node0.getServerName()+"-->"+node0.getVirtualServerHash()); 114
115         VirtualServerNode node02 = ch.getServer("name"); 116         System.out.println(ch.hash("name")+"-->"+node02.getServerName()+"-->"+node02.getVirtualServerHash()); 117
118         VirtualServerNode node03 = ch.getServer("a"); 119         System.out.println(ch.hash("a")+"-->"+node03.getServerName()+"-->"+node03.getVirtualServerHash()); 120
121  } 122 }


View Code

注意:

在实际操作中,一台memcached服务器虚拟成150台比较合适(100~200)

从环上删除节点的算法写的较差,但是考虑到删除节点的操作在实际使用中用的比较少(宕机比较少,人为的删除节点也较少),也无所谓

删除节点的时候,注意使用foreach语法糖去遍历的时候,在遍历的过程中不可以做删除、增加操作,否则会抛出并发修改异常,具体的原因见注释和第二章 ArrayList源码解析;想要实现在遍历的过程中进行删除、增加操作,使用简单for循环,见如上代码
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: