您的位置:首页 > 数据库 > Memcache

Memcached 源码分析--命令流程分析

2016-09-07 18:03 579 查看
一、执行命令

首先是启动memcached 自带参数如下:

<span style="font-size:18px;">-p <num> 设置TCP端口号(默认设置为: 11211)
-U <num> UDP监听端口(默认: 11211, 0 时关闭)
-l <ip_addr> 绑定地址(默认:所有都允许,无论内外网或者本机更换IP,有安全隐患,若设置为127.0.0.1就只能本机访问)
-c <num> max simultaneous connections (default: 1024)
-d 以daemon方式运行
-u <username> 绑定使用指定用于运行进程<username>
-m <num> 允许最大内存用量,单位M (默认: 64 MB)
-P <file> 将PID写入文件<file>,这样可以使得后边进行快速进程终止, 需要与-d 一起使用</span>

#$: ./usr/local/bin/memcached -d -u root -l 192.168.10.156 -m 2048 -p 12121

客户端通过网络方式连接:

telnet 192.168.10.156 12121

然后就可以操作命令、常见命令如下:

<span style="font-size:18px;">set
add
replace
get
delete</span>

格式如下:

<span style="font-size:18px;">command <key> <flags> <expiration time> <bytes>
<value>

参数说明如下:
command set/add/replace
key key 用于查找缓存值
flags 可以包括键值对的整型参数,客户机使用它存储关于键值对的额外信息
expiration time 在缓存中保存键值对的时间长度(以秒为单位,0 表示永远)
bytes 在缓存中存储的字节点
value 存储的值(始终位于第二行)</span>

二、命令执行流程代码分析

首先看一下工作线程中的命令数据结构:

/**

 * The structure representing a connection into memcached.

 */

typedef struct conn conn;

非常重要的几个参数:

char * rbuf:用于存储客户端数据报文中的命令。

int rsize:rbuf的大小。

char * rcurr:未解析的命令的字符指针。

int rbytes:为解析的命令的长度。

结构如下:

<span style="font-size:18px;">struct conn {
int sfd;
char *rbuf; /** buffer to read commands into */
char *rcurr; /** but if we parsed some already, this is where we stopped */
int rsize; /** total allocated size of rbuf */
int rbytes; /** how much data, starting from rcur, do we have unparsed */

/* data for the mwrite state */
struct iovec *iov;
int iovsize; /* number of elements allocated in iov[] */
int iovused; /* number of elements used in iov[] */

struct msghdr *msglist;
int msgsize; /* number of elements allocated in msglist[] */
int msgused; /* number of elements used in msglist[] */
int msgcurr; /* element in msglist[] being transmitted now */
int msgbytes; /* number of bytes in current msg */
LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
...
};</span>



状态机迁移: drive_machine(conn *c)



以上图相当有水平,引用作者 http://calixwu.com/ 上的、自已就不再画了。

以文字说明一下整体状态机流程:

1. 当客户端和Memcached建立TCP连接后,Memcached会基于Libevent的event事件来监听客户端新的连接及是否有可读的数据。

2. 当客户端有命令数据报文上报的时候,就会触发drive_machine方法中的conn_read这个case状态。

3. memcached通过try_read_network方法读取客户端的报文。如果读取失败,则返回conn_closing,去关闭客户端的连接;如果没有读取到任何数据,则会返回conn_waiting,继续等待客户端的事件到来,并且退出drive_machine的循环;如果数据读取成功,则会将状态转交给conn_parse_cmd处理,读取到的数据会存储在c->rbuf容器中。

4. conn_parse_cmd主要的工作就是用来解析命令。主要通过try_read_command这个方法来读取c->rbuf中的命令数据,通过\n来分隔数据报文的命令。如果c->buf内存块中的数据匹配不到\n,则返回继续等待客户端的命令数据报文到来conn_waiting;否则就会转交给process_command方法,来处理具体的命令(命令解析会通过\0符号来分隔)。

5. process_command主要用来处理具体的命令。其中tokenize_command这个方法非常重要,将命令拆解成多个元素(KEY的最大长度250)。例如我们以get命令为例,最终会跳转到process_get_command这个命令process_*_command这一系列就是处理具体的命令逻辑的。

6. 我们进入process_get_command,当获取数据处理完毕之后,会转交到conn_mwrite这个状态。如果获取数据失败,则关闭连接。

7. 进入conn_mwrite后,主要是通过transmit方法来向客户端提交数据。如果写数据失败,则关闭连接或退出drive_machine循环;如果写入成功,则又转交到conn_new_cmd这个状态。

8. conn_new_cmd这个状态主要是处理c->rbuf中剩余的命令。主要看一下reset_cmd_handler这个方法,这个方法回去判断c->rbytes中是否还有剩余的报文没处理,如果未处理,则转交到conn_parse_cmd(第四步)继续解析剩余命令;如果已经处理了,则转交到conn_waiting,等待新的事件到来。在转交之前,每次都会执行一次conn_shrink方法。

9. conn_shrink方法主要用来处理命令报文容器c->rbuf和输出内容的容器是否数据满了?是否需要扩大buffer的大小,是否需要移动内存块。接受命令报文的初始化内存块大小2048,最大8192。

三、下面以代码简要分析一下

1、读写事件回调函数:event_handler,这个方法中最终调用的是drive_machine

void event_handler(const int fd, const short which, void *arg) {
conn* c = (conn *) arg;
drive_machine(c);
}

drive_machine:

drive_machine这个方法中,都是通过c->state来判断需要处理的逻辑。

conn_listening:监听状态

conn_waiting:等待状态

conn_read:读取状态

conn_parse_cmd:命令行解析

conn_mwrite:向客户端写数据

conn_new_cmd:解析新的命令

//
static void drive_machine(conn *c) {
bool stop = false;
while(!stop) {
switch (c->state) {
case conn_waiting:
// 通过update_event函数确认是否为读状态,如果是则切到conn_read
if (!update_event(c, EV_READ | EV_PERSIST)) {
conn_set_state(c, conn_closing);
}
conn_set_state(c, conn_read);
stop = true;
break;

case conn_read:
// 读取数据并根据read的情况切到不同状态、正常情况切到conn_parse_cmd
res = try_read_network(c);
switch (res) {
case READ_NO_DATA_RECEIVED:
conn_set_state(c, conn_waiting);
break;
case READ_DATA_RECEIVED:
conn_set_state(c, conn_parse_cmd);
break;
case READ_ERROR:
conn_set_state(c, conn_closing);
break;
case READ_MEMORY_ERROR: /* Failed to allocate more memory */
/* State already set by try_read_network */
break;
}
break;

case conn_parse_cmd:
// 读取命令并解析命令,如果数据不够则切到conn_waiting
if (try_read_command(c) == 0) {
/* we need more data! */
conn_set_state(c, conn_waiting);
}
break;

case conn_mwrite:
res = transmit(c);
switch(res){
case TRANSMIT_COMPLETE:
if (c->state == conn_mwrite) {
/* XXX: I don't know why this wasn't the general case */
if(c->protocol == binary_prot) {
conn_set_state(c, c->write_and_go);
} else {
// 命令回复完成后、又切换到conn_new_cmd处理剩余的命令参数
conn_set_state(c, conn_new_cmd);
}
}
}
break;

...
}
}
}

上面的逻辑主要反映了状态机的转换流程,下面重点看下数据处理这一块:

命令格式:set username zhuli\r\n get username \n

通过\n这个换行符来分隔数据报文中的命令。因为数据报文会有粘包和拆包的特性,所以只有等到命令行完整

才能进行解析。所有只有匹配到了\n符号,才能匹配一个完整的命令。
static int try_read_command(conn *c) {
if (c->protocol == binary_prot) { // 二进制模式
dispatch_bin_command(c);
}else{
//查找命令中是否有\n,memcache的命令通过\n来分割
el = memchr(c->rcurr, '\n', c->rbytes);

//如果找到了\n,说明c->rcurr中有完整的命令了
cont = el + 1; //下一个命令开始的指针节点
//这边判断是否是\r\n,如果是\r\n,则el往前移一位
if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {
el--;
}
//然后将命令的最后一个字符用 \0(字符串结束符号)来分隔
*el = '\0';

//处理命令,c->rcurr就是命令
process_command(c, c->rcurr);

//移动到下一个命令的指针节点
c->rbytes -= (cont - c->rcurr);
c->rcurr = cont;
}
}

// 处理具体的命令。将命令分解后,分发到不同的具体操作中去
static void process_command(conn *c, char *command) {
token_t tokens[MAX_TOKENS];
// 拆分命令:将拆分出来的命令元素放进tokens的数组中
ntokens = tokenize_command(command, tokens, MAX_TOKENS);

// 分解出来的命令的第一个参数为操作方法
1、process_get_command(c, tokens, ntokens, false); // "get"/"bget"

2、process_update_command(c, tokens, ntokens, comm, false); // "add"/"set"/...

3、process_get_command(c, tokens, ntokens, true); // "gets"

...>> 4-n
}

这里以 get 命令走读下:
static inline void process_get_command(conn *c, token_t *tokens...){
it = item_get(key, nkey, c); // 内存存储快块取数据
if (it) { // 获取到了数据
/*
* Construct the response. Each hit adds three elements to the
* outgoing data list:
* "VALUE "
* key
* " " + flags + " " + data length + "\r\n" + data (with \r\n)
*/
// 构建初始化返回出去的数据结构
add_iov(c, "VALUE ", 6);
add_iov(c, ITEM_key(it), it->nkey);
add_iov(c, ITEM_suffix(it), it->nsuffix - 2);
add_iov(c, suffix, suffix_len);
add_iov(c, "END\r\n", 5);

// 最后切到 conn_mwrite 即调用 transmit 函数
conn_set_state(c, conn_mwrite);
}
}
/*
* Returns an item if it hasn't been marked as expired,
* lazy-expiring as needed.
*/
item *item_get(const char *key, const size_t nkey, conn *c) {
item *it;
uint32_t hv;
hv = hash(key, nkey);
item_lock(hv);
it = do_item_get(key, nkey, hv, c);
item_unlock(hv);
return it;
}

// 向客户端写数据。写完数据后,如果写失败,则关闭连接;如果写成功,则会将状态修改成conn_new_cmd,
// 继续解析c->rbuf中剩余的命令
static enum transmit_result transmit(conn *c) {
//msghdr 发送数据的结构
struct msghdr *m = &c->msglist[c->msgcurr];

//sendmsg 发送数据方法
res = sendmsg(c->sfd, m, 0);

...
}

对于剩余命令的处理:
//重新设置命令handler
static void reset_cmd_handler(conn *c) {
c->cmd = -1;
c->substate = bin_no_state;
if (c->item != NULL) {
item_remove(c->item);
c->item = NULL;
}
conn_shrink(c); //这个方法是检查c->rbuf容器的大小
//如果剩余未解析的命令 > 0的话,继续跳转到conn_parse_cmd解析命令
if (c->rbytes > 0) {
conn_set_state(c, conn_parse_cmd);
} else {
//如果命令都解析完成了,则继续等待新的数据到来
conn_set_state(c, conn_waiting);
}
}

/*
* Shrinks a connection's buffers if they're too big. This prevents
* periodic large "get" requests from permanently chewing lots of server
* memory.
*
* This should only be called in between requests since it can wipe output
* buffers!
*/
static void conn_shrink(conn *c) { // 检查rbuf的大小
if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
char *newbuf;

if (c->rcurr != c->rbuf)
memmove(c->rbuf, c->rcurr, (size_t)c->rbytes);

newbuf = (char *)realloc((void *)c->rbuf, DATA_BUFFER_SIZE);
if (newbuf) {
c->rbuf = newbuf;
c->rsize = DATA_BUFFER_SIZE;
}
c->rcurr = c->rbuf;
}
...
}
对于异步套接字编译就是 回调+状态机、一定要记下所有的状态。有几点要特别注意:

1、注册的事件处理函数不能堵塞或主动sleep、否则整个工作线程处于挂起状态。

2、单线程、但其内在的复杂性——将线性思维分解成一堆回调的负担(breaking up linear thought into a bucketload of callbacks)——仍然存在

3、对于每个事件的处理都需要维护一个状态、上下文是紧密相关的、代码编写时需要时刻注意小心。

4、注意epoll的工作模式:LT还是ET模式、一般是回调时尽量处理更多的数据包。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: