
Consistent Hashing in the C# Redis Client

2013-06-04 10:01
This post looks at how ServiceStack, the C# Redis client, implements consistent hashing. The main classes involved are:

ShardedConnectionPool (which itself inherits from PooledRedisClientManager)
ShardedRedisClientManager
ConsistentHash


For what consistent hashing is used for and how it works in general, see my earlier blog post:
http://blog.csdn.net/fenglvming/article/details/8892228
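The idea behind consistent hashing is simple: every readable/writable Redis server is hashed onto a "ring". The C# client stores these hash values in a sorted dictionary (SortedDictionary) and finds a value with a binary search over its keys; the Java client Jedis uses a TreeMap (Java's red-black tree implementation). Searching either structure takes roughly O(log n) time. To store or fetch an object, the client hashes the object's key and moves clockwise around the ring from that hash; the first Redis server it reaches is where the data is written to or read from. Let's start with the constructor of ShardedRedisClientManager: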

```csharp
public ShardedRedisClientManager(params ShardedConnectionPool[] connectionPools)
{
    if (connectionPools == null)
        throw new ArgumentNullException("connectionPools", "connection pools can not be null.");

    // Pair every pool with its weight; the weight decides how many points it gets on the ring.
    var pools = new List<KeyValuePair<ShardedConnectionPool, int>>();
    foreach (var connectionPool in connectionPools)
    {
        pools.Add(new KeyValuePair<ShardedConnectionPool, int>(connectionPool, connectionPool.weight));
    }
    _consistentHash = new ConsistentHash<ShardedConnectionPool>(pools);
}
```
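Before going into ConsistentHash, the clockwise lookup described above can be illustrated with a minimal ring that has one point per server. This is a sketch, not ServiceStack code: the class MinimalRing, the server names, and the choice of the first 8 bytes of an MD5 digest as the 64-bit hash are assumptions for illustration, and the lookup walks the sorted keys linearly for clarity where ServiceStack uses a binary search.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Minimal consistent-hash ring with one point per server (illustrative sketch, not ServiceStack code).
public class MinimalRing
{
    // Points on the ring, kept in ascending hash order.
    private readonly SortedDictionary<ulong, string> _circle = new SortedDictionary<ulong, string>();

    // Assumed 64-bit hash: the first 8 bytes of the MD5 digest of the UTF-8 string.
    public static ulong Hash64(string s)
    {
        using (var md5 = MD5.Create())
        {
            byte[] digest = md5.ComputeHash(Encoding.UTF8.GetBytes(s));
            return BitConverter.ToUInt64(digest, 0);
        }
    }

    public void AddServer(string server)
    {
        _circle[Hash64(server)] = server;
    }

    // Walk clockwise from the key's hash to the first server point.
    public string GetServer(string key)
    {
        ulong h = Hash64(key);
        foreach (var point in _circle)      // SortedDictionary enumerates in ascending key order
        {
            if (point.Key >= h)
                return point.Value;
        }
        return _circle.First().Value;       // beyond the last point: wrap around to the first
    }
}
```

The property that makes this scheme attractive: adding a server only takes over the arc just counter-clockwise of its point, so every key either keeps its old server or moves to the new one, and no key is shuffled between existing servers.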
Now let's see how ConsistentHash puts the ShardedConnectionPool instances on the ring:
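```csharp
public ConsistentHash(IEnumerable<KeyValuePair<T, int>> nodes, Func<string, ulong> hashFunction)
{
    // Use the caller's hash function if one is given; otherwise keep the built-in MD5-based one.
    if (hashFunction != null)
        _hashFunction = hashFunction;
    // Key = node (here a ShardedConnectionPool), Value = its weight.
    foreach (var node in nodes)
        AddTarget(node.Key, node.Value);
}
```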
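It adds every Redis node to the sorted dictionary through the AddTarget method. weight is the weight of that Redis pool, and Replicas plays the role of virtual nodes in consistent hashing: one Redis instance can own weight * Replicas hash values on the ring, which spreads the data more evenly.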
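```csharp
public void AddTarget(T node, int weight)
{
    // weight * Replicas virtual nodes per node (just Replicas if weight <= 0)
    int repeat = weight > 0 ? weight * Replicas : Replicas;
    for (int i = 0; i < repeat; i++)
    {
        string identifier = node.GetHashCode().ToString() + "-" + i;
        ulong hashCode = _hashFunction(identifier);
        _circle.Add(hashCode, node);   // Add throws if this hash is already on the ring
    }
}
```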
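The following sketch mimics AddTarget's weighting rule to show how weight and Replicas translate into ring points. WeightedRing, the value Replicas = 100 and the MD5-based hash are assumptions for illustration, not ServiceStack's actual constants. Unlike AddTarget, it builds identifiers from the node name instead of node.GetHashCode(): GetHashCode() is not guaranteed to be stable across processes (on .NET Core, string hash codes are randomized per process), and every client has to build the same ring.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Weighted virtual nodes following AddTarget's rule (illustrative sketch with assumed constants).
public class WeightedRing
{
    public const int Replicas = 100;   // assumed value for this sketch
    private readonly SortedDictionary<ulong, string> _circle = new SortedDictionary<ulong, string>();

    public int Count { get { return _circle.Count; } }

    private static ulong Hash64(string s)
    {
        using (var md5 = MD5.Create())
            return BitConverter.ToUInt64(md5.ComputeHash(Encoding.UTF8.GetBytes(s)), 0);
    }

    public void AddTarget(string node, int weight)
    {
        // Same rule as AddTarget: weight * Replicas points, or Replicas if weight <= 0.
        int repeat = weight > 0 ? weight * Replicas : Replicas;
        for (int i = 0; i < repeat; i++)
            _circle[Hash64(node + "-" + i)] = node;   // stable identifier: node name + index
    }

    // Number of ring points owned by each node.
    public Dictionary<string, int> PointsPerNode()
    {
        return _circle.Values.GroupBy(n => n).ToDictionary(g => g.Key, g => g.Count());
    }

    // How many of the sample keys land on each node (clockwise lookup, linear walk for brevity).
    public Dictionary<string, int> KeysPerNode(IEnumerable<string> keys)
    {
        var counts = new Dictionary<string, int>();
        foreach (var key in keys)
        {
            ulong h = Hash64(key);
            string owner = _circle.FirstOrDefault(p => p.Key >= h).Value ?? _circle.First().Value;
            counts[owner] = counts.TryGetValue(owner, out int c) ? c + 1 : 1;
        }
        return counts;
    }
}
```

With weights 2, 1 and 1, KeysPerNode over a few thousand sample keys should give the weight-2 node roughly twice as many keys as each of the others; the exact counts depend on the hash.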
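At this point every Redis pool has been placed on the ring: with all weights equal to 1, each pool owns Replicas hash values. Now let's look at the basic lookup path. To read a value from Redis we first need an IRedisClient object to operate on. With ShardedRedisClientManager, we first pick the Redis pool for the key and then take a client from that pool. The pool is chosen by the following method, also in the ConsistentHash class: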
```csharp
public T GetTarget(string key)
{
    ulong hash = _hashFunction(key);
    // Copying the keys is O(n) on every call; only the search itself is O(log n).
    ulong firstNode = ModifiedBinarySearch(_circle.Keys.ToArray(), hash);
    return _circle[firstNode];
}
```
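ModifiedBinarySearch is a binary search, but with a different goal. An ordinary binary search looks for an element equal to the given value and returns its index; this one returns the first value on the ring at or clockwise from the given value, that is, the smallest element >= val, wrapping around to the first element when val is larger than all of them. Here is the code: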
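```csharp
public static ulong ModifiedBinarySearch(ulong[] sortedArray, ulong val)
{
    int min = 0;
    int max = sortedArray.Length - 1;
    // At or before the first point, or past the last one: the answer is the first point
    // (the ring wraps around). "<=" makes an exact hit on the first point return that point.
    if (val <= sortedArray[min] || val > sortedArray[max])
        return sortedArray[0];
    // Invariant: sortedArray[min] < val <= sortedArray[max]
    while (max - min > 1)
    {
        int mid = (max + min) / 2;
        if (sortedArray[mid] >= val)
            max = mid;
        else
            min = mid;
    }
    return sortedArray[max];
}
```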
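For comparison, the same "first point clockwise" lookup can be written with the framework's Array.BinarySearch, which returns the bitwise complement of the next larger element's index when the value is not present. This is a sketch of an alternative, not ServiceStack code; the RingSearch class and FirstClockwise method are hypothetical names. It is also handy for cross-checking ModifiedBinarySearch on a few values.

```csharp
using System;

public static class RingSearch
{
    // Returns the smallest element >= val, wrapping around to sortedArray[0]
    // when val is larger than every element.
    public static ulong FirstClockwise(ulong[] sortedArray, ulong val)
    {
        if (sortedArray.Length == 0)
            throw new InvalidOperationException("the ring is empty");
        int index = Array.BinarySearch(sortedArray, val);
        if (index < 0)
            index = ~index;             // insertion point = first element greater than val
        if (index == sortedArray.Length)
            index = 0;                  // past the last point: wrap around the ring
        return sortedArray[index];
    }
}
```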

That is roughly how the C# Redis client implements consistent hashing.

However, once a pool becomes unreachable, the client has no way of knowing it, so it still maps keys to that pool and hands back a client for it.

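My change is to start a background thread that periodically checks whether each pool is reachable. An unreachable pool is removed from the ring and recorded. The same thread also re-checks the removed pools, and any pool that has become reachable again is added back to the ring. These are the methods I added to ShardedRedisClientManager: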

```csharp
// Pools that failed the health check and were taken off the ring.
private readonly List<KeyValuePair<ShardedConnectionPool, int>> removed =
    new List<KeyValuePair<ShardedConnectionPool, int>>();

public void ValidShardingPool(List<KeyValuePair<ShardedConnectionPool, int>> pools)
{
    var t = new Thread(() =>
    {
        while (true)
        {
            checkValid(pools);      // take unreachable pools off the ring
            Thread.Sleep(60000);
            checkInvalid();         // put recovered pools back on the ring
            Thread.Sleep(60000);
        }
    });
    t.IsBackground = true;          // don't keep the process alive just for the health check
    t.Start();
}

private void checkValid(List<KeyValuePair<ShardedConnectionPool, int>> pools)
{
    foreach (var pool in pools)
    {
        if (removed.Contains(pool))
            continue;               // already off the ring
        bool needRemove;
        try
        {
            // Dispose the clients so that they are returned to the pool.
            using (var writeclient = pool.Key.GetClient())
            using (var readclient = pool.Key.GetReadOnlyClient())
            {
                var nwclient = writeclient as RedisNativeClient;
                var nrclient = readclient as RedisNativeClient;
                // Remove only if neither the write nor the read client answers PING.
                needRemove = !nwclient.Ping() && !nrclient.Ping();
            }
        }
        catch (Exception)
        {
            needRemove = true;
        }

        if (needRemove)
        {
            removed.Add(pool);
            // Note: GetTarget does not take this lock, so concurrent lookups are not fully protected.
            lock (_consistentHash)
            {
                _consistentHash.RemoveTarget(pool.Key, pool.Value);
            }
        }
    }
}

public void checkInvalid()
{
    var addBack = new List<KeyValuePair<ShardedConnectionPool, int>>();
    foreach (var pool in removed)
    {
        bool reachable;
        try
        {
            using (var writeclient = pool.Key.GetClient())
            using (var readclient = pool.Key.GetReadOnlyClient())
            {
                var nwclient = writeclient as RedisNativeClient;
                var nrclient = readclient as RedisNativeClient;
                // Add back only when both the write and the read client answer PING.
                reachable = nwclient.Ping() && nrclient.Ping();
            }
        }
        catch (Exception)
        {
            reachable = false;      // still down: PING throws on a dead server
        }

        if (reachable)
        {
            addBack.Add(pool);
            lock (_consistentHash)
            {
                _consistentHash.AddTarget(pool.Key, pool.Value);
            }
        }
    }

    foreach (var pool in addBack)
    {
        removed.Remove(pool);
    }
}
```


I also added the following methods to ConsistentHash:

```csharp
// Number of virtual nodes for a node, using the same rule as AddTarget.
public int GetRepeat(int weight)
{
    return weight > 0 ? weight * Replicas : Replicas;
}

// Identifier of the index-th virtual node, built exactly as in AddTarget.
public string GetIdentifier(T node, int index)
{
    return node.GetHashCode().ToString() + "-" + index;
}

// Hash of a virtual-node identifier (an overload, not an override, of object.GetHashCode()).
public ulong GetHashCode(string identifier)
{
    return _hashFunction(identifier);
}

// Remove every virtual node that AddTarget created for this node.
public void RemoveTarget(T node, int weight)
{
    int repeat = GetRepeat(weight);
    for (int i = 0; i < repeat; i++)
    {
        string identifier = GetIdentifier(node, i);
        ulong hashCode = GetHashCode(identifier);
        _circle.Remove(hashCode);   // no-op if the point is not on the ring
    }
}
```
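The remove/re-add cycle can be modeled without a Redis server. In this sketch, HealthCheckedRing, CheckOnce and the probe delegate are hypothetical names, and the probe stands in for the PING calls. A node that fails the probe loses all of its virtual nodes; a removed node that passes again gets exactly the same points back, so its keys return to it. Unlike the code above, GetTarget here takes the same lock as the membership changes, because a SortedDictionary must not be read while another thread modifies it. The lookup is a linear walk to keep the sketch short, and a real deployment would call CheckOnce from a timer or background thread, as ValidShardingPool does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
using System.Text;

// Redis-free model of the health check (illustrative sketch, not ServiceStack API).
public class HealthCheckedRing
{
    private const int Replicas = 50;                     // assumed number of virtual nodes per node
    private readonly object _lock = new object();
    private readonly SortedDictionary<ulong, string> _circle = new SortedDictionary<ulong, string>();
    private readonly HashSet<string> _removed = new HashSet<string>();
    private readonly List<string> _all;
    private readonly Func<string, bool> _probe;          // true = node answered (stands in for PING)

    public HealthCheckedRing(IEnumerable<string> nodes, Func<string, bool> probe)
    {
        _all = nodes.ToList();
        _probe = probe;
        foreach (var node in _all)
            SetPoints(node, add: true);
    }

    private static ulong Hash64(string s)
    {
        using (var md5 = MD5.Create())
            return BitConverter.ToUInt64(md5.ComputeHash(Encoding.UTF8.GetBytes(s)), 0);
    }

    // Add or remove exactly the same points, so a re-added node gets its old keys back.
    private void SetPoints(string node, bool add)
    {
        for (int i = 0; i < Replicas; i++)
        {
            ulong h = Hash64(node + "-" + i);
            if (add) _circle[h] = node; else _circle.Remove(h);
        }
    }

    // Lookups take the same lock as membership changes.
    public string GetTarget(string key)
    {
        ulong h = Hash64(key);
        lock (_lock)
        {
            if (_circle.Count == 0) return null;         // every node is down
            foreach (var point in _circle)
                if (point.Key >= h) return point.Value;
            return _circle.First().Value;                // wrap around
        }
    }

    // One pass of the background loop: drop failing nodes, restore recovered ones.
    public void CheckOnce()
    {
        foreach (var node in _all)
        {
            bool up = _probe(node);                      // probe outside the lock
            lock (_lock)
            {
                bool isRemoved = _removed.Contains(node);
                if (!up && !isRemoved) { _removed.Add(node); SetPoints(node, add: false); }
                else if (up && isRemoved) { _removed.Remove(node); SetPoints(node, add: true); }
            }
        }
    }
}
```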


Still, I don't think this is the right way to handle a Redis server going down. I'd like to hear how others would minimize the impact of a Redis server failure.

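Next, some changes I made to PooledRedisClientManager. It holds two arrays: one of write clients and one of read clients. By default each array ends up with 10 entries (the fields are declared empty, as below, and sized when the pool is set up).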

```csharp
private RedisClient[] writeClients = new RedisClient[0];
private RedisClient[] readClients = new RedisClient[0];
```
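A situation we sometimes run into: a Redis newcomer keeps acquiring clients inside a for loop without releasing them. This quickly exhausts both arrays while every client in them is still marked as in use, so after enough iterations the client returned is null. My fix lets the arrays grow. A grown array is a new object, so the code can no longer lock on the arrays themselves; I therefore introduced two new lock objects to replace every place that used to lock on the two arrays, plus an upper limit on their size: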
```csharp
private readonly object writeClientsLock = new object();
private readonly object readClientsLock = new object();
// upper limit on the size of each client array
protected int RedisMaxConnection = 320;
```
I then changed GetInActiveWriteClient and GetInActiveReadClient, adding the following at the end of each (write side shown):
```csharp
// No inactive client was found: grow the array instead of giving up.
// Double its length as long as the result stays within RedisMaxConnection
// ("<=" so that the array can actually reach RedisMaxConnection).
int originalLength = writeClients.Length;
int length = Math.Max(1, originalLength << 1);   // guard against an empty array
if (length <= RedisMaxConnection)
{
    var newwriteClients = new RedisClient[length];
    Array.ConstrainedCopy(writeClients, 0, newwriteClients, 0, originalLength);
    writeClients = newwriteClients;
    // The new slots are empty, so the retry should be able to hand out a fresh client.
    return GetInActiveWriteClient();
}
```
In other words, as long as the writeClients array has not yet reached 320 entries, its length is doubled.
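The growth policy can be modeled as a small stand-alone pool. GrowingPool is a hypothetical stand-in in which each slot is just an "in use" flag instead of a RedisClient. It shows the two ideas behind the change: a dedicated lock object (the slot array is replaced when it grows, so it cannot serve as the lock), and doubling capped at a maximum size, returning -1 where the real code would hand back null.

```csharp
using System;

// Hypothetical model of a capped, doubling client pool (not ServiceStack API).
public class GrowingPool
{
    private readonly object _slotsLock = new object();   // dedicated lock: _slots is replaced on growth
    private bool[] _slots;                               // true = slot in use
    private readonly int _maxSize;

    public GrowingPool(int initialSize, int maxSize)
    {
        _slots = new bool[initialSize];
        _maxSize = maxSize;
    }

    public int Capacity { get { lock (_slotsLock) return _slots.Length; } }

    // Returns the index of a free slot, or -1 when the pool is full and cannot grow further.
    public int Acquire()
    {
        lock (_slotsLock)
        {
            while (true)
            {
                int free = Array.IndexOf(_slots, false);
                if (free >= 0) { _slots[free] = true; return free; }

                int length = Math.Max(1, _slots.Length << 1);   // double, guarding against 0
                if (length > _maxSize) return -1;               // cap reached
                var bigger = new bool[length];
                Array.Copy(_slots, bigger, _slots.Length);
                _slots = bigger;
            }
        }
    }

    public void Release(int index)
    {
        lock (_slotsLock) _slots[index] = false;
    }
}
```

With an initial size of 10 and a cap of 320, the capacity goes 10, 20, 40, 80, 160, 320. Because the size only doubles, a cap that is not the initial size times a power of two is never reached exactly: with a cap of 100 this pool stops at 80.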