您的位置:首页 > 其它

xmemecached中的一致性hash算法

2013-07-08 16:34 183 查看
网上有很多文章都是说一致性hash算法的。但是没有啥代码实现,所以一直不是特别清楚。
周末看了一下xmemcached 这个memcached客户端的代码。
其中有一种选取策略就是一致性hash算法的。
别的不说,贴代码才是硬道理。langyu的blog上写了一部分解释,但是有一部分还是没有看特别明白。
和同事讨论一下,得到了一下的结论。自己mark一下,免得以后忘记。ps:xmemcached版本号是1.4.1

public class KetamaMemcachedSessionLocator extends
AbstractMemcachedSessionLocator {

static final int NUM_REPS = 160;
private transient volatile TreeMap<Long, List<Session>> ketamaSessions = new TreeMap<Long, List<Session>>();
private final HashAlgorithm hashAlg;
private volatile int maxTries;
private final Random random = new Random();

/**
* compatible with nginx-upstream-consistent,patched by wolfg1969
*/
static final int DEFAULT_PORT = 11211;
private final boolean cwNginxUpstreamConsistent;

public KetamaMemcachedSessionLocator() {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = false;
}

public KetamaMemcachedSessionLocator(boolean cwNginxUpstreamConsistent) {
this.hashAlg = HashAlgorithm.KETAMA_HASH;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
}

public KetamaMemcachedSessionLocator(HashAlgorithm alg) {
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = false;
}

public KetamaMemcachedSessionLocator(HashAlgorithm alg,
boolean cwNginxUpstreamConsistent) {
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
}

public KetamaMemcachedSessionLocator(List<Session> list, HashAlgorithm alg) {
super();
this.hashAlg = alg;
this.cwNginxUpstreamConsistent = false;
this.buildMap(list, alg);
}

/**
* 根据,节点生成treeMap hash环
**/
private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
TreeMap<Long, List<Session>> sessionMap = new TreeMap<Long, List<Session>>();

String sockStr;//memcached服务器ip+端口
for (Session session : list) {
if (this.cwNginxUpstreamConsistent) {
//不知道干啥的,也没看懂,似乎和nginx负载有关,选择性放弃
InetSocketAddress serverAddress = session
.getRemoteSocketAddress();
sockStr = serverAddress.getAddress().getHostAddress();
if (serverAddress.getPort() != DEFAULT_PORT) {
sockStr = sockStr + ":" + serverAddress.getPort();
}
} else {
sockStr = String.valueOf(session.getRemoteSocketAddress());
}
/**
* Duplicate 160 X weight references
*/
int numReps = NUM_REPS;//默认是160
if (session instanceof MemcachedTCPSession) {
//这个numReps,虚拟节点的数量。如果节点很少,则会发生不均衡的问题。即所有数据都倾斜到了第一个默认节点。
//后面getSessionByHash方法的时候会看到
numReps *= ((MemcachedSession) session).getWeight();
}
if (alg == HashAlgorithm.KETAMA_HASH) {
for (int i = 0; i < numReps / 4; i++) {
//已经写的比较清楚了,这个/4,每4个一组
//其实这里这个算法就是KETAMA_HASH算法的实现,不知道为什么他没有直接调用现成的方法
byte[] digest = HashAlgorithm.computeMd5(sockStr + "-" + i);
//获取虚拟节点的hash值(如果没有i,那么就无所谓虚拟节点,所有server的hash值都相同)
for (int h = 0; h < 4; h++) {
long k = (long) (digest[3 + h * 4] & 0xFF) << 24
| (long) (digest[2 + h * 4] & 0xFF) << 16
| (long) (digest[1 + h * 4] & 0xFF) << 8
| digest[h * 4] & 0xFF;
//Md5编码后,每个虚拟结点对应Md5码16个字节中的4个,组成一个long型数值,做为这个虚拟结点在环中的惟一key。
//langyu 在 http://langyu.iteye.com/blog/684087 中提到,
//之所以用long类型是因为long实现了comparable 接口
//但是实际上int也同样实现了comparable接口,不知道是不是jdk历史遗留问题。
//将一个节点16位转换为4个数值,分布到treemap中。
this.getSessionList(sessionMap, k).add(session);
}
}
} else {
for (int i = 0; i < numReps; i++) {
long key = alg.hash(sockStr + "-" + i);
this.getSessionList(sessionMap, key).add(session);
}
}
}
this.ketamaSessions = sessionMap;
this.maxTries = list.size();
}

//每个map的key上挂载这所有key值相同的虚拟节点。
//理论上说应该能够避免这种情况,但是实际上不可避免的会有这种情况发生。
//因为被分配到了2^32个int值中。而不是md5原有的128位
private List<Session> getSessionList(
TreeMap<Long, List<Session>> sessionMap, long k) {
List<Session> sessionList = sessionMap.get(k);
if (sessionList == null) {
sessionList = new ArrayList<Session>();
sessionMap.put(k, sessionList);
}
return sessionList;
}

// 程序的入口
public final Session getSessionByKey(final String key) {
if (this.ketamaSessions == null || this.ketamaSessions.size() == 0) {
return null;
}

long hash = this.hashAlg.hash(key);
//这个地方是一致性hash的入口
Session rv = this.getSessionByHash(hash);
int tries = 0;
while (!this.failureMode && (rv == null || rv.isClosed())
&& tries++ < this.maxTries) {
hash = this.nextHash(hash, key, tries);
rv = this.getSessionByHash(hash);
}
return rv;
}

//入口参数是缓存对象key的hash值
public final Session getSessionByHash(final long hash) {
TreeMap<Long, List<Session>> sessionMap = this.ketamaSessions;
if (sessionMap.size() == 0) {
return null;
}
Long resultHash = hash;
if (!sessionMap.containsKey(hash)) {

//如果key的hash值没有和整个服务器虚拟节点中的hash值产生重合
SortedMap<Long, List<Session>> tailMap = sessionMap.tailMap(hash);
//获取这个虚拟节点之后的所有节点
if (tailMap.isEmpty()) {
resultHash = sessionMap.firstKey();//如果是最后一个节点,则返回第一个节点。
//这就是为啥如果一致性hash中服务器节点个数非常少,会产生数据分布不均衡的原因
} else {
//如果有则返回第一个节点
resultHash = tailMap.firstKey();
}
}
List<Session> sessionList = sessionMap.get(resultHash);
if (sessionList == null || sessionList.size() == 0) {
return null;
}
int size = sessionList.size();
//随机分配一个值
//这里和同事在考虑,这样set和get被分配到了不同的节点,那么取的时候怎么办。会产生不命中的问题。
//我们最后这样认为:首先这个list的存在就是因为hash值不够均匀产生了碰撞。
//这个概率应该是有,但是不应该很高,也就是说这个list的长度不会很大。
//另外即使发生了冲突的问题,每次随机选取一个节点,这样最坏情况是同一份数据在多个节点中都存在。
//这个对于memcached这种缓存场景来说完全是允许的。
return sessionList.get(this.random.nextInt(size));
}
}


参考的blog http://langyu.iteye.com/blog/684087 http://flychao88.iteye.com/blog/1540246 http://blog.chenlb.com/2009/11/memcached-consistent-hashing.html http://www.iteye.com/topic/611976
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: