您的位置:首页 > 数据库 > Memcache

memcached 源代码阅读笔记(4)- SET 操作分析

2011-12-07 16:33 661 查看
memcached 源代码阅读笔记- SET 操作分析
输入
set yzn 32 0 5
hell1

进入static void process_command(conn *c, char *command) 函数
进入如下分支
if (ntokens == 6 &&
((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) ||
(strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)) )) {

process_update_command(c, tokens, ntokens, comm, false);

}
可以看到调用 process_update_command函数

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {
char *key;
size_t nkey;
int flags;
time_t exptime;
int vlen;
uint64_t req_cas_id;
item *it;

assert(c != NULL);

if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}

key = tokens[KEY_TOKEN].value;
nkey = tokens[KEY_TOKEN].length;

flags = strtoul(tokens[2].value, NULL, 10);
exptime = strtol(tokens[3].value, NULL, 10);
vlen = strtol(tokens[4].value, NULL, 10);

// does cas value exist?
if(handle_cas)
{
req_cas_id = strtoull(tokens[5].value, NULL, 10);
}

if(errno == ERANGE || ((flags == 0 || exptime == 0) && errno == EINVAL)) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}

if (settings.detail_enabled) {
stats_prefix_record_set(key);
}

if (settings.managed) {
int bucket = c->bucket;
if (bucket == -1) {
out_string(c, "CLIENT_ERROR no BG data in managed mode");
return;
}
c->bucket = -1;
if (buckets[bucket] != c->gen) {
out_string(c, "ERROR_NOT_OWNER");
return;
}
}

it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);

if (it == 0) {
if (! item_size_ok(nkey, flags, vlen + 2))
out_string(c, "SERVER_ERROR object too large for cache");
else
out_string(c, "SERVER_ERROR out of memory");
/* swallow the data line */
c->write_and_go = conn_swallow;
c->sbytes = vlen + 2;
return;
}
if(handle_cas)
it->cas_id = req_cas_id;

c->item = it;
c->ritem = ITEM_data(it);
c->rlbytes = it->nbytes;
c->item_comm = comm;
conn_set_state(c, conn_nread);
}
可以看到命令格式就是
cmdname key flags exptime vlen
key = tokens[KEY_TOKEN].value;
nkey = tokens[KEY_TOKEN].length;

flags = strtoul(tokens[2].value, NULL, 10);
exptime = strtol(tokens[3].value, NULL, 10);
vlen = strtol(tokens[4].value, NULL, 10);

it = item_alloc(key, nkey, flags, realtime(exptime), vlen+2);
为item *do_item_alloc(char *key, const size_t nkey, const int flags, const rel_time_t exptime, const int nbytes) {
uint8_t nsuffix;
item *it;
char suffix[40];
size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);

unsigned int id = slabs_clsid(ntotal);
if (id == 0)
return 0;

it = slabs_alloc(ntotal);
if (it == 0) {
int tries = 50;
item *search;

/* If requested to not push old items out of cache when memory runs out,
* we're out of luck at this point...
*/

if (settings.evict_to_free == 0) return NULL;

/*
* try to get one off the right LRU
* don't necessariuly unlink the tail because it may be locked: refcount>0
* search up from tail an item with refcount==0 and unlink it; give up after 50
* tries
*/

if (id > LARGEST_ID) return NULL;
if (tails[id] == 0) return NULL;

for (search = tails[id]; tries > 0 && search != NULL; tries--, search=search->prev) {
if (search->refcount == 0) {
if (search->exptime == 0 || search->exptime > current_time) {
STATS_LOCK();
stats.evictions++;
STATS_UNLOCK();
}
do_item_unlink(search);
break;
}
}
it = slabs_alloc(ntotal);
if (it == 0) return NULL;
}

assert(it->slabs_clsid == 0);

it->slabs_clsid = id;

assert(it != heads[it->slabs_clsid]);

it->next = it->prev = it->h_next = 0;
it->refcount = 1; /* the caller will have a reference */
DEBUG_REFCNT(it, '*');
it->it_flags = 0;
it->nkey = nkey;
it->nbytes = nbytes;
strcpy(ITEM_key(it), key);
it->exptime = exptime;
memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
it->nsuffix = nsuffix;
return it;
}
static size_t item_make_header(const uint8_t nkey, const int flags, const int nbytes,
char *suffix, uint8_t *nsuffix) {
/* suffix is defined at 40 chars elsewhere.. */
*nsuffix = (uint8_t) snprintf(suffix, 40, " %d %d\r\n", flags, nbytes - 2);
return sizeof(item) + nkey + *nsuffix + nbytes;
}

选择一个合适的slabs
unsigned int slabs_clsid(const size_t size) {
int res = POWER_SMALLEST;

if (size == 0)
return 0;
while (size > slabclass[res].size)
if (res++ == power_largest) /* won't fit in the biggest slab */
return 0;
return res;
}

it = slabs_alloc(ntotal);
void *do_slabs_alloc(const size_t size) {
slabclass_t *p;

unsigned int id = slabs_clsid(size);
if (id < POWER_SMALLEST || id > power_largest)
return NULL;

p = &slabclass[id];
assert(p->sl_curr == 0 || ((item *)p->slots[p->sl_curr - 1])->slabs_clsid == 0);

#ifdef USE_SYSTEM_MALLOC
if (mem_limit && mem_malloced + size > mem_limit)
return 0;
mem_malloced += size;
return malloc(size);
#endif

/* fail unless we have space at the end of a recently allocated page,
we have something on our freelist, or we could allocate a new page */
if (! (p->end_page_ptr != 0 || p->sl_curr != 0 || do_slabs_newslab(id) != 0))
return 0;

/* return off our freelist, if we have one */
if (p->sl_curr != 0)
return p->slots[--p->sl_curr];

/* if we recently allocated a whole page, return from that */
if (p->end_page_ptr) {
void *ptr = p->end_page_ptr;
if (--p->end_page_free != 0) {
(char*)(p->end_page_ptr) += p->size;
} else {
p->end_page_ptr = 0;
}
return ptr;
}

return NULL; /* shouldn't ever get here */
}

static slabclass_t slabclass[POWER_LARGEST + 1];
#define POWER_SMALLEST 1
#define POWER_LARGEST 200
#define POWER_BLOCK 1048576
#define CHUNK_ALIGN_BYTES (sizeof(void *))

typedef struct {
unsigned int size; /* sizes of items */
unsigned int perslab; /* how many items per slab *//

void **slots; /* list of item ptrs */
unsigned int sl_total; /* size of previous array */
unsigned int sl_curr; /* first free slot */

void *end_page_ptr; /* pointer to next free item at end of page, or 0 */
unsigned int end_page_free; /* number of items remaining at end of last alloced page */

unsigned int slabs; /* how many slabs were allocated for this class */

void **slab_list; /* array of slab pointers */
unsigned int list_size; /* size of prev array */

unsigned int killing; /* index+1 of dying slab, or zero if none */
} slabclass_t;

跟踪代码可知p此时
- p 0x0046ebac {size=88 perslab=11915 slots=0x02231e70 ...} slabclass_t *
size 88 unsigned int
perslab 11915 unsigned int
slots 0x02231e70 void * *
sl_total 16 unsigned int
sl_curr 1 unsigned int
end_page_ptr 0x022f00d0 void *
end_page_free 11913 unsigned int
slabs 1 unsigned int
slab_list 0x02231e18 void * *
list_size 16 unsigned int
killing 0 unsigned int

static int do_slabs_newslab(const unsigned int id) {
slabclass_t *p = &slabclass[id];
#ifdef ALLOW_SLABS_REASSIGN
int len = POWER_BLOCK;
#else
int len = p->size * p->perslab;
#endif
char *ptr;

if (mem_limit && mem_malloced + len > mem_limit && p->slabs > 0)
return 0;

if (grow_slab_list(id) == 0) return 0;

ptr = malloc((size_t)len);
if (ptr == 0) return 0;

memset(ptr, 0, (size_t)len);
p->end_page_ptr = ptr;
p->end_page_free = p->perslab;

p->slab_list[p->slabs++] = ptr;
mem_malloced += len;
return 1;
}

static int grow_slab_list (const unsigned int id) {
slabclass_t *p = &slabclass[id];
if (p->slabs == p->list_size) {
size_t new_size = (p->list_size != 0) ? p->list_size * 2 : 16;
void *new_list = realloc(p->slab_list, new_size * sizeof(void *));
if (new_list == 0) return 0;
p->list_size = new_size;
p->slab_list = new_list;
}
return 1;
}

自此完成一个item的内存分配
item的结构如下:
typedef struct _stritem {
struct _stritem *next;
struct _stritem *prev;
struct _stritem *h_next; /* hash chain next */
rel_time_t time; /* least recent access */
rel_time_t exptime; /* expire time */
int nbytes; /* size of data */
unsigned short refcount;
uint8_t nsuffix; /* length of flags-and-length string */
uint8_t it_flags; /* ITEM_* above */
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
uint64_t cas_id; /* the CAS identifier */
void * end[];
/* then null-terminated key */
/* then " flags length\r\n" (no terminating null) */
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;

strcpy(ITEM_key(it), key);
#define ITEM_key(item) ((char*)&((item)->end[0]))

memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
#define ITEM_suffix(item) ((char*) &((item)->end[0]) + (item)->nkey + 1)

c->item = it;
c->ritem = ITEM_data(it);
#define ITEM_data(item) ((char*) &((item)->end[0]) + (item)->nkey + 1 + (item)->nsuffix)
c->rlbytes = it->nbytes;

static void complete_nread(conn *c) {
item *it;
int comm;
int ret;
assert(c != NULL);

comm = c->item_comm;
it = c->item;

STATS_LOCK();
stats.set_cmds++;
STATS_UNLOCK();

if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
ret = store_item(it, comm);
if (ret == 1)
out_string(c, "STORED");
else if(ret == 2)
out_string(c, "EXISTS");
else if(ret == 3)
out_string(c, "NOT_FOUND");
else
out_string(c, "NOT_STORED");
}

item_remove(c->item); /* release the c->item reference */
c->item = 0;
}

ret = store_item(it, comm);
int do_store_item(item *it, int comm) {
char *key = ITEM_key(it);
bool delete_locked = false;
item *old_it = do_item_get_notedeleted(key, it->nkey, &delete_locked);
int stored = 0;

item *new_it = NULL;
int flags;

if (old_it != NULL && comm == NREAD_ADD) {
/* add only adds a nonexistent item, but promote to head of LRU */
do_item_update(old_it);
} else if (!old_it && (comm == NREAD_REPLACE
|| comm == NREAD_APPEND || comm == NREAD_PREPEND))
{
/* replace only replaces an existing value; don't store */
} else if (delete_locked && (comm == NREAD_REPLACE || comm == NREAD_ADD
|| comm == NREAD_APPEND || comm == NREAD_PREPEND))
{
/* replace and add can't override delete locks; don't store */
} else if (comm == NREAD_CAS) {
/* validate cas operation */
if (delete_locked)
old_it = do_item_get_nocheck(key, it->nkey);

if(old_it == NULL) {
// LRU expired
stored = 3;
}
else if(it->cas_id == old_it->cas_id) {
// cas validates
do_item_replace(old_it, it);
stored = 1;
}
else
{
stored = 2;
}
} else {
/*
* Append - combine new and old record into single one. Here it's
* atomic and thread-safe.
*/

if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {

/* we have it and old_it here - alloc memory to hold both */
/* flags was already lost - so recover them from ITEM_suffix(it) */

flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime, it->nbytes + old_it->nbytes - 2 /* CRLF */);

if (new_it == NULL) {
/* SERVER_ERROR out of memory */
return 0;
}

/* copy data from it and old_it to new_it */

if (comm == NREAD_APPEND) {
memcpy(ITEM_data(new_it), ITEM_data(old_it), old_it->nbytes);
memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */, ITEM_data(it), it->nbytes);
} else {
/* NREAD_PREPEND */
memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */, ITEM_data(old_it), old_it->nbytes);
}

it = new_it;
}

/* "set" commands can override the delete lock
window... in which case we have to find the old hidden item
that's in the namespace/LRU but wasn't returned by
item_get.... because we need to replace it */
if (delete_locked)
old_it = do_item_get_nocheck(key, it->nkey);

if (old_it != NULL)
do_item_replace(old_it, it);
else
do_item_link(it);

stored = 1;
}

if (old_it != NULL)
do_item_remove(old_it); /* release our reference */
if (new_it != NULL)
do_item_remove(new_it);

return stored;
}

if (old_it != NULL)
do_item_replace(old_it, it);

int do_item_replace(item *it, item *new_it) {
assert((it->it_flags & ITEM_SLABBED) == 0);

do_item_unlink(it);
return do_item_link(new_it);
}

int do_item_link(item *it) {
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
assert(it->nbytes < (1024 * 1024)); /* 1MB max size */
it->it_flags |= ITEM_LINKED;
it->time = current_time;
assoc_insert(it);

STATS_LOCK();
stats.curr_bytes += ITEM_ntotal(it);
stats.curr_items += 1;
stats.total_items += 1;
STATS_UNLOCK();

/* Allocate a new CAS ID on link. */
it->cas_id = get_cas_id();

item_link_q(it);

return 1;
}

/* Note: this isn't an assoc_update. The key must not already exist to call this */

int assoc_insert(item *it) {
uint32_t hv;
unsigned int oldbucket;

assert(assoc_find(ITEM_key(it), it->nkey) == 0); /* shouldn't have duplicately named things defined */

hv = hash(ITEM_key(it), it->nkey, 0);
if (expanding &&
(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)
{
it->h_next = old_hashtable[oldbucket];
old_hashtable[oldbucket] = it;
} else {
it->h_next = primary_hashtable[hv & hashmask(hashpower)];
primary_hashtable[hv & hashmask(hashpower)] = it;
}

hash_items++;
if (! expanding && hash_items > (hashsize(hashpower) * 3) / 2) {
assoc_expand();
}

return 1;
}

看到这个函数可以看出内部的hash表采用开链存储法。

当hash_items > 桶的个数的1.5倍的时候,就扩张hash表。

/* grows the hashtable to the next power of 2. */
static void assoc_expand(void) {
old_hashtable = primary_hashtable;

primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *));
if (primary_hashtable) {
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion starting\n");
hashpower++;
expanding = true;
expand_bucket = 0;
do_assoc_move_next_bucket();
} else {
primary_hashtable = old_hashtable;
/* Bad news, but we can keep running. */
}
}

/* migrates the next bucket to the primary hashtable if we're expanding. */
void do_assoc_move_next_bucket(void) {
item *it, *next;
int bucket;

if (expanding) {
for (it = old_hashtable[expand_bucket]; NULL != it; it = next) {
next = it->h_next;

bucket = hash(ITEM_key(it), it->nkey, 0) & hashmask(hashpower);
it->h_next = primary_hashtable[bucket];
primary_hashtable[bucket] = it;
}

old_hashtable[expand_bucket] = NULL;

expand_bucket++;
if (expand_bucket == hashsize(hashpower - 1)) {
expanding = false;
free(old_hashtable);
if (settings.verbose > 1)
fprintf(stderr, "Hash table expansion done\n");
}
}
}

static void item_link_q(item *it) { /* item is the new head */
item **head, **tail;
/* always true, warns: assert(it->slabs_clsid <= LARGEST_ID); */
assert((it->it_flags & ITEM_SLABBED) == 0);

head = &heads[it->slabs_clsid];
tail = &tails[it->slabs_clsid];
assert(it != *head);
assert((*head && *tail) || (*head == 0 && *tail == 0));
it->prev = 0;
it->next = *head;
if (it->next) it->next->prev = it;
*head = it;
if (*tail == 0) *tail = it;
sizes[it->slabs_clsid]++;
return;
}

放到如下的LRU列表的头位置
static item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];
static unsigned int sizes[LARGEST_ID];
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: