您的位置:首页 > 数据库 > Redis

redis cluster集群的源码分析(2)

2016-12-22 17:39 656 查看
        本文的分析主要介绍集群中的槽和集群中命令的执行。

一、集群中的槽

1、槽的基本结构信息

        redis集群通过分片的方式来存储数据库中的键值对:集群的整个数据库被分为16384个槽,

数据库中的每个键属于这16384个槽的一个,每个节点可以处理0~16384个槽。当数据库中只有

所有的槽都有节点处理是,集群才是上线状态;反之,只要有一个槽没有节点处理,那集群就处

于下线状态。   

struct clusterNode{
……
unsigned char slots[CLUSTER_SLOTS/8];
int numslots;
……
};


       clusterNode的slots是一个bit array,共包含16384个二进制位,以0为起始索引,16383为终止索引

对slots编码,并根据索引i上的二进制位判断是否处理槽i。



typedef struct clusterState {
……
clusterNode *slots[CLUSTER_SLOTS];
……
} clusterState;
       clusterState的slots记录集群中所有槽的指派信息。
1)slots[i]为NULL,则该槽未进行指派

2)slots[i]为clusterNode指针,则该槽指派给clusterNode指针对应的节点

2、槽的指派

     槽的相关处理主要有槽指派和重新分片。命令如下所示:

CLUSTER ADDSLOTS <slot> [slot] ... 

void clusterCommand(client *c) {
……
if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
!strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
{
int j, slot;
unsigned char *slots = zmalloc(CLUSTER_SLOTS);
int del = !strcasecmp(c->argv[1]->ptr,"delslots");
memset(slots,0,CLUSTER_SLOTS);
……//判断槽是否可处理,不可处理直接返回
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (slots[j]) {
int retval;
retval = del ? clusterDelSlot(j) :clusterAddSlot(myself,j);
}
}
zfree(slots);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
}
……
}//槽的添加
int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);//更新clusterNode的slots
server.cluster->slots[slot] = n;//更新clusterState的slots
return C_OK;
}


二、集群中命令的执行

        在集群中执行客户端发送的命令时,接收的命令的节点先需要判断集群状态,然后在去计算命令中要处理的key处于

哪个槽,并获取槽相应的节点。如果槽对应的节点是当前节点,就直接执行命令,反之,将指引客户端重定向到正确的

节点。



     集群中的命令处理

int processCommand(client *c) {
……
if (server.cluster_enabled &&!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
if (server.cluster->state != CLUSTER_OK) {//集群状态为下线状态
clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
return C_OK;
} else {
int error_code;//获取key所在的节点
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
}
……
}
    计算命令中要处理的key处于哪个槽,并获取槽相应的节点

clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
……
//检查这些命令的key是否在同一slot,并且此节点关联了这个slot
for (i = 0; i < ms->count; i++) {
……
mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j]];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
if (firstkey == NULL) {
//事务的第一个命令的key,获取key的slot和node
firstkey = thiskey;
slot = thisslot;
n = server.cluster->slots[slot];
if (n == NULL) {//key对应的槽没有节点指派
getKeysFreeResult(keyindex);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}//判断是否在重新分片——key所在的槽要迁移到别的节点或者从别的节点迁移过来
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {//用来确认所有的key是否都在同一个slot
if (!equalStringObjects(firstkey,thiskey)) {
if (slot != thisslot) {
getKeysFreeResult(keyindex);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
} else {
multiple_keys = 1;
}
}
}//key是否已经迁移到别的节点或者还没从别的节点迁移过来
if ((migrating_slot || importing_slot) &&
lookupKeyRead(&server.db[0],thiskey) == NULL)
{
missing_keys++;
}
}
getKeysFreeResult(keyindex);
}
if (n == NULL) return myself;
if (hashslot) *hashslot = slot;
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
return myself;
//如果slot已经迁移到别的节点,返回CLUSTER_REDIR_ASK
if (migrating_slot && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_ASK;
return server.cluster->migrating_slots_to[slot];
}
if (importing_slot &&
(c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
{
if (multiple_keys && missing_keys) {
//如果slot还没从别的节点迁移过来,返回CLUSTER_REDIR_DOWN_UNBOUND
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}
//我是slave,slot是在我的master,并且cmd是只读操作
if (c->flags & CLIENT_READONLY &&
cmd->flags & CMD_READONLY &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
return myself;
}
//slot不在此节点,返回CLUSTER_REDIR_MOVED
if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
return n;
}
   根据计算key对应的槽和节点时返回的错误,返回给客户端进行重定向

void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == CLUSTER_REDIR_UNSTABLE) {
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
} else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
} else if (error_code == CLUSTER_REDIR_MOVED ||
error_code == CLUSTER_REDIR_ASK)
{
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息