C# Redis客户端对一致性hash算法的实现
2013-06-04 10:01
453 查看
这里介绍的是C#的redis客户端 ServiceStack 对一致性hash算法的实现.主要涉及到类如下:
关于一致性hash算法的用处和介绍,请看我以前的blog
http://blog.csdn.net/fenglvming/article/details/8892228
一致性hash的思路很简单:也就是,它将所有的可读写的redis服务器的ip,hash到了一个”环形列表”.在C#中是用SortedMap(查找的时候采用二分查找来获取值)来保存这些ip的hash值.Java的jedis客户端使用到了TreeMap(Java对红黑树的实现). 两者的查找的时间复杂度差不多,都可以看成是lgn.当要保存一个对象或者获取一个对象的时候,通过这个对象的key得到的hash值顺时针找到的第一个redis即为要存放或获取数据的目的或来源.我们先看看ShardedRedisClientManager的构造函数:
就是一个二分查找.但是和普通的二分查找意义不一样的是,普通的二分查找是找到一个与给定的值相等的值,返回其下标.这个二分查找是到一个下标,是传入的值顺时针遇到的第一个值.具体可以看代码:
以上,redis的C#客户端的一致性hash算法的实现就大致如此了.
但是如果某个pool一旦不可达,它可不知道是否可达,所以它依然还是根据key找到了一个pool并返回一个client.
我做的修改是启动一个线程,定时去查看这些pool是否可达,如果不可达,就将其从环中删去.
并记录.同时,还会去判断那些不可达的pool是否可达了,如果变成可达了,那么就重新加入到环中.以下是ShardedRedisClientManager中新增的方法:
同时在ConsistendHash中增加一个方法:
不过我感觉这种方式不是正确处理redis服务器宕机的正确路子.
也希望得到大家的看法.如果让redis服务器宕机的影响到最小.
下面说说我对PooledRedisClientManager的一些修改.这个里面有两个属性,一个写客户端数组和一个读客户端数组.如果在默认情况下,这两个数组大小为10.
ShardedConnectionPool ShardedRedisClientManager 而ShardedConnectionPool又是继承自PooledRedisClientManager ConsistentHash
关于一致性hash算法的用处和介绍,请看我以前的blog
http://blog.csdn.net/fenglvming/article/details/8892228
一致性hash的思路很简单:也就是,它将所有的可读写的redis服务器的ip,hash到了一个”环形列表”.在C#中是用SortedMap(查找的时候采用二分查找来获取值)来保存这些ip的hash值.Java的jedis客户端使用到了TreeMap(Java对红黑树的实现). 两者的查找的时间复杂度差不多,都可以看成是lgn.当要保存一个对象或者获取一个对象的时候,通过这个对象的key得到的hash值顺时针找到的第一个redis即为要存放或获取数据的目的或来源.我们先看看ShardedRedisClientManager的构造函数:
public ShardedRedisClientManager(params ShardedConnectionPool[] connectionPools) { if (connectionPools == null) throw new ArgumentNullException("connection pools can not be null."); List<KeyValuePair> pools = new List<KeyValuePair>(); foreach (var connectionPool in connectionPools) {pools.Add(new KeyValuePair(connectionPool, connectionPool.weight)); } _consistentHash = new ConsistentHash(pools); }看看它怎么对ShardedConnectionPool进行一致性hash的.
public ConsistentHash(IEnumerable<KeyValuePair> nodes, Func hashFunction) { if (hashFunction != null) _hashFunction = hashFunction;//内置的MD5算法 foreach (var node in nodes) AddTarget(node.Key, node.Value); }他将每一个redis节点都放置到通过AddTarget方法,加入到了SortedMap中.其中weight代表的是该redisPool的权重,replicas就好像是一致性hash中的虚拟节点的概念一样,也就是说1个redis实例,可能对应着weight*replicas个hash值.这样有助于数据的分散和均匀.
public void AddTarget(T node, int weight) { int repeat = weight > 0 ? weight * Replicas : Replicas; for (int i = 0; i < repeat; i++) { string identifier = node.GetHashCode().ToString() + "-" + i; ulong hashCode = _hashFunction(identifier); _circle.Add(hashCode, node); } }好了,这样我们就把我们的redis(如果权重都为1的话)和hash值做了一个一一对应.,每个redispool有replicas个hash值.下面我们来看看简单的获取流程.我们要通过redis获取值的话,首先我们要获得一个IRedisClient的对象来操作redis.如果我们用的是ShardedRedisClientManager的话,我们先获取到redispool,然后通过pool获得client.获取一个POOL的方法如下,还是在ConsistentHash类中:
public T GetTarget(string key) { ulong hash = _hashFunction(key); ulong firstNode = ModifiedBinarySearch(_circle.Keys.ToArray(), hash); return _circle[firstNode]; }ModifiedBinarySearch
就是一个二分查找.但是和普通的二分查找意义不一样的是,普通的二分查找是找到一个与给定的值相等的值,返回其下标.这个二分查找是到一个下标,是传入的值顺时针遇到的第一个值.具体可以看代码:
public static ulong ModifiedBinarySearch(ulong[] sortedArray, ulong val) { int min = 0; int max = sortedArray.Length - 1; if (val < sortedArray[min] || val > sortedArray[max]) return sortedArray[0]; while (max - min > 1) { int mid = (max + min) / 2; if (sortedArray[mid] >= val) {max = mid; }else{min = mid; } } return sortedArray[max]; }
以上,redis的C#客户端的一致性hash算法的实现就大致如此了.
但是如果某个pool一旦不可达,它可不知道是否可达,所以它依然还是根据key找到了一个pool并返回一个client.
我做的修改是启动一个线程,定时去查看这些pool是否可达,如果不可达,就将其从环中删去.
并记录.同时,还会去判断那些不可达的pool是否可达了,如果变成可达了,那么就重新加入到环中.以下是ShardedRedisClientManager中新增的方法:
private List<KeyValuePair<ShardedConnectionPool, int>> removed = new List<KeyValuePair<ShardedConnectionPool, int>>(); public void ValidShardingPool(List<KeyValuePair<ShardedConnectionPool, int>> pools) { var newpools = pools; Thread t = new Thread(Func => { while(true){ checkValid(newpools); Thread.Sleep(60000); checkInvalid(); Thread.Sleep(60000); } }); t.Start(); } private void checkValid(List<KeyValuePair<ShardedConnectionPool, int>> pools) { foreach (var pool in pools) { var writeclient = pool.Key.GetClient(); var readclient = pool.Key.GetReadOnlyClient(); var nwclient = writeclient as RedisNativeClient; var nrclient = readclient as RedisNativeClient; bool needRemove = false; try { if (!nwclient.Ping()) { if (!nrclient.Ping()) { needRemove = true; } } } catch (Exception e) { needRemove = true; } if (needRemove) { removed.Add(pool); lock (_consistentHash) { _consistentHash.RemoveTarget(pool.Key, pool.Value); } } } } public void checkInvalid() { List<KeyValuePair<ShardedConnectionPool, int>> addBack = new List<KeyValuePair<ShardedConnectionPool, int>>(); foreach (var pool in removed) { var writeclient = pool.Key.GetClient(); var readclient = pool.Key.GetReadOnlyClient(); var nwclient = writeclient as RedisNativeClient; var nrclient = readclient as RedisNativeClient; if (nwclient.Ping()) { if (nrclient.Ping()) { lock (_consistentHash) { addBack.Add(pool); _consistentHash.AddTarget(pool.Key, pool.Value); } } } } foreach (var pool in addBack) { removed.Remove(pool); } }
同时在ConsistendHash中增加一个方法:
public int GetRepeat(int weight) { return weight > 0 ? weight * Replicas : Replicas; } public string GetIdentifier(T node,int index) { return node.GetHashCode().ToString() + "-" + index; } public ulong GetHashCode(string identifier) { return _hashFunction(identifier); } public void RemoveTarget(T node ,int weight) { // increase the replicas of the node by weight int repeat = GetRepeat(weight); for (int i = 0; i < repeat; i++) { string identifier = GetIdentifier(node, i); ulong hashCode = GetHashCode(identifier); if(_circle.ContainsKey(hashCode)){ _circle.Remove(hashCode); } } }
不过我感觉这种方式不是正确处理redis服务器宕机的正确路子.
也希望得到大家的看法.如果让redis服务器宕机的影响到最小.
下面说说我对PooledRedisClientManager的一些修改.这个里面有两个属性,一个写客户端数组和一个读客户端数组.如果在默认情况下,这两个数组大小为10.
private RedisClient[] writeClients = new RedisClient[0]; private RedisClient[] readClients = new RedisClient[0];我们有时候会遇到如下情况,如果某个redis新手的话,他会在一个for循环中不断的获取client.这样很容易就耗尽了这两个数组,并且这两个数组中的client都还在使用状态.那么会导致在循环到一定程序,获取到的client为null.我做了如下修改,引入了两个新的锁 变量来替换原有锁住上述两个数组对象的地方.
private object writeClientsLock = new object(); private object readClientsLock = new object(); protected int RedisMaxConnection = 320;然后改进了GetInActiveWriteClient和GetInActiveReadClient.在后面加入了:
//do something to extends the capacity of the writes client; //extends two times of the original size int originalLength = writeClients.Length; if (originalLength * 2 < RedisMaxConnection) { int length = writeClients.Length << 1; var newwriteClients = new RedisClient[length]; Array.ConstrainedCopy(writeClients, 0, newwriteClients, 0, writeClients.Length); writeClients = newwriteClients; var ret = GetInActiveWriteClient(); return ret; }一旦发现writeClients数组长度没有达到320,那么就扩展其长度.
相关文章推荐
- c#实现redis客户端(一)
- Redis系列(一):c#实现redis客户端
- c#实现redis客户端(一)
- c#实现redis客户端(一)
- 自己动手写Redis客户端(C#实现)3 - GET请求和批量回复
- 自己动手写Redis客户端(C#实现)2 - SET请求和状态回复(set)
- 自己动手写Redis客户端(C#实现)4 - 整数回复
- 分布式对象缓存系统在Linux下的安装与客户端C#实现
- c#中异步基于消息通信的完成端口的TCP/IP协议的组件实现(源代码) 客户端
- .NET客户端实现Redis中的管道(PipeLine)与事物(Transactions)
- C#下如何实现服务器+客户端的聊天程序
- C#实现组播源及客户端
- C#应用视频教程1.2 Socket通信客户端实现
- 客户端实现蓝牙接收(C#)知识总结
- redis---一致性hash特性及java实现
- 基于C#的socket编程的TCP异步实现 ,包含服务器端与客户端源代码
- Unity使用C#实现简单Scoket连接及服务端与客户端通讯
- 对一致性Hash算法,java代码实现的深入研究
- Redis Java客户端jedis工具类以及Redis实现的跨jvm的锁
- C#编写Windows服务程序 (服务端),客户端使用 消息队列 实现淘宝 订单全链路效果