redis cluster集群的源码分析(2)
2016-12-22 17:39
656 查看
本文的分析主要介绍集群中的槽和集群中命令的执行。
一、集群中的槽
1、槽的基本结构信息
redis集群通过分片的方式来存储数据库中的键值对:集群的整个数据库被分为16384个槽,
数据库中的每个键属于这16384个槽的一个,每个节点可以处理0~16384个槽。当数据库中只有
所有的槽都有节点处理是,集群才是上线状态;反之,只要有一个槽没有节点处理,那集群就处
于下线状态。
clusterNode的slots是一个bit array,共包含16384个二进制位,以0为起始索引,16383为终止索引
对slots编码,并根据索引i上的二进制位判断是否处理槽i。
1)slots[i]为NULL,则该槽未进行指派
2)slots[i]为clusterNode指针,则该槽指派给clusterNode指针对应的节点
2、槽的指派
槽的相关处理主要有槽指派和重新分片。命令如下所示:
CLUSTER ADDSLOTS <slot> [slot] ...
二、集群中命令的执行
在集群中执行客户端发送的命令时,接收的命令的节点先需要判断集群状态,然后在去计算命令中要处理的key处于
哪个槽,并获取槽相应的节点。如果槽对应的节点是当前节点,就直接执行命令,反之,将指引客户端重定向到正确的
节点。
集群中的命令处理
一、集群中的槽
1、槽的基本结构信息
redis集群通过分片的方式来存储数据库中的键值对:集群的整个数据库被分为16384个槽,
数据库中的每个键属于这16384个槽的一个,每个节点可以处理0~16384个槽。当数据库中只有
所有的槽都有节点处理是,集群才是上线状态;反之,只要有一个槽没有节点处理,那集群就处
于下线状态。
struct clusterNode{ …… unsigned char slots[CLUSTER_SLOTS/8]; int numslots; …… };
clusterNode的slots是一个bit array,共包含16384个二进制位,以0为起始索引,16383为终止索引
对slots编码,并根据索引i上的二进制位判断是否处理槽i。
typedef struct clusterState { …… clusterNode *slots[CLUSTER_SLOTS]; …… } clusterState;clusterState的slots记录集群中所有槽的指派信息。
1)slots[i]为NULL,则该槽未进行指派
2)slots[i]为clusterNode指针,则该槽指派给clusterNode指针对应的节点
2、槽的指派
槽的相关处理主要有槽指派和重新分片。命令如下所示:
CLUSTER ADDSLOTS <slot> [slot] ...
void clusterCommand(client *c) { …… if ((!strcasecmp(c->argv[1]->ptr,"addslots") || !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) { int j, slot; unsigned char *slots = zmalloc(CLUSTER_SLOTS); int del = !strcasecmp(c->argv[1]->ptr,"delslots"); memset(slots,0,CLUSTER_SLOTS); ……//判断槽是否可处理,不可处理直接返回 for (j = 0; j < CLUSTER_SLOTS; j++) { if (slots[j]) { int retval; retval = del ? clusterDelSlot(j) :clusterAddSlot(myself,j); } } zfree(slots); clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } …… }//槽的添加 int clusterAddSlot(clusterNode *n, int slot) { if (server.cluster->slots[slot]) return C_ERR; clusterNodeSetSlotBit(n,slot);//更新clusterNode的slots server.cluster->slots[slot] = n;//更新clusterState的slots return C_OK; }
二、集群中命令的执行
在集群中执行客户端发送的命令时,接收的命令的节点先需要判断集群状态,然后在去计算命令中要处理的key处于
哪个槽,并获取槽相应的节点。如果槽对应的节点是当前节点,就直接执行命令,反之,将指引客户端重定向到正确的
节点。
集群中的命令处理
int processCommand(client *c) { …… if (server.cluster_enabled &&!(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_LUA &&server.lua_caller->flags & CLIENT_MASTER) && !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0)) { int hashslot; if (server.cluster->state != CLUSTER_OK) {//集群状态为下线状态 clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE); return C_OK; } else { int error_code;//获取key所在的节点 clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code); if (n == NULL || n != server.cluster->myself) { clusterRedirectClient(c,n,hashslot,error_code); return C_OK; } } } …… }计算命令中要处理的key处于哪个槽,并获取槽相应的节点
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) { …… //检查这些命令的key是否在同一slot,并且此节点关联了这个slot for (i = 0; i < ms->count; i++) { …… mcmd = ms->commands[i].cmd; margc = ms->commands[i].argc; margv = ms->commands[i].argv; keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys); for (j = 0; j < numkeys; j++) { robj *thiskey = margv[keyindex[j]]; int thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr)); if (firstkey == NULL) { //事务的第一个命令的key,获取key的slot和node firstkey = thiskey; slot = thisslot; n = server.cluster->slots[slot]; if (n == NULL) {//key对应的槽没有节点指派 getKeysFreeResult(keyindex); if (error_code) *error_code = CLUSTER_REDIR_DOWN_UNBOUND; return NULL; }//判断是否在重新分片——key所在的槽要迁移到别的节点或者从别的节点迁移过来 if (n == myself && server.cluster->migrating_slots_to[slot] != NULL) { migrating_slot = 1; } else if (server.cluster->importing_slots_from[slot] != NULL) { importing_slot = 1; } } else {//用来确认所有的key是否都在同一个slot if (!equalStringObjects(firstkey,thiskey)) { if (slot != thisslot) { getKeysFreeResult(keyindex); if (error_code) *error_code = CLUSTER_REDIR_CROSS_SLOT; return NULL; } else { multiple_keys = 1; } } }//key是否已经迁移到别的节点或者还没从别的节点迁移过来 if ((migrating_slot || importing_slot) && lookupKeyRead(&server.db[0],thiskey) == NULL) { missing_keys++; } } getKeysFreeResult(keyindex); } if (n == NULL) return myself; if (hashslot) *hashslot = slot; if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand) return myself; //如果slot已经迁移到别的节点,返回CLUSTER_REDIR_ASK if (migrating_slot && missing_keys) { if (error_code) *error_code = CLUSTER_REDIR_ASK; return server.cluster->migrating_slots_to[slot]; } if (importing_slot && (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING)) { if (multiple_keys && missing_keys) { //如果slot还没从别的节点迁移过来,返回CLUSTER_REDIR_DOWN_UNBOUND if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE; return NULL; } else { return myself; } } //我是slave,slot是在我的master,并且cmd是只读操作 if (c->flags & CLIENT_READONLY && cmd->flags & CMD_READONLY && nodeIsSlave(myself) && myself->slaveof == n) { return myself; } //slot不在此节点,返回CLUSTER_REDIR_MOVED if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED; return n; }根据计算key对应的槽和节点时返回的错误,返回给客户端进行重定向
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) { if (error_code == CLUSTER_REDIR_CROSS_SLOT) { addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n")); } else if (error_code == CLUSTER_REDIR_UNSTABLE) { addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n")); } else if (error_code == CLUSTER_REDIR_DOWN_STATE) { addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n")); } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) { addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n")); } else if (error_code == CLUSTER_REDIR_MOVED || error_code == CLUSTER_REDIR_ASK) { addReplySds(c,sdscatprintf(sdsempty(), "-%s %d %s:%d\r\n",(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", hashslot,n->ip,n->port)); } }
相关文章推荐
- redis cluster集群的源码分析(1)
- quartz集群调度机制调研及源码分析---转载
- RMI方式Ehcache集群的源码分析 【转】
- elasticsearch源码分析之集群管理(一)
- Spark集群启动之Master、Worker启动流程源码分析
- solrcloud集群启动管理过程基于源码的分析
- 深入分析redis cluster 集群安装配置详解
- Spark源码分析-spark集群启动及任务执行
- spark 1.6.0 core源码分析1 集群启动及任务提交过程
- Hadoop跨集群数据拷贝工具DISTCP内部源码实现分析
- spark core源码分析1 集群启动及任务提交过程
- MapReduce源码分析之新API作业提交(二):连接集群
- quartz集群调度机制调研及源码分析
- [转]RMI方式Ehcache集群的源码分析
- quartz集群调度机制调研及源码分析
- Spark的Master和Worker集群启动的源码分析
- 深入分析redis cluster 集群安装配置详解
- [转]RMI方式Ehcache集群的源码分析
- 深入分析redis cluster 集群安装配置详解
- quartz - 集群调度机制调研及源码分析