Memcached源码分析--线程模型(三)
2012-01-05 13:56
726 查看
原文:http://www.iteye.com/topic/344172
最后看看memcached网络事件处理的最核心部分- drive_machine
需要铭记于心的是drive_machine是多线程环境执行的,主线程和workers都会执行drive_machine
首先大家不到被while循环误导(大部分做java的同学都会马上联想到是个周而复始的loop)其实while通常满足一个case后就会break了,这里用while是考虑到垂直触发方式下,必须读到EWOULDBLOCK错误才可以
言归正传,drive_machine主要是通过当前连接的state来判断该进行何种处理,因为通过libevent注册了读写时间后回调的都是这个核心函数,所以实际上我们在注册libevent相应事件时,会同时把事件状态写到该conn结构体里,libevent进行回调时会把该conn结构作为参数传递过来,就是该方法的形参
memcached里连接的状态通过一个enum声明
实际对于case conn_listening:这种情况是主线程自己处理的,workers线程永远不会执行此分支
我们看到主线程进行了accept后调用了
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,DATA_BUFFER_SIZE, false);
这个函数就是通知workers线程的地方,看看
可以清楚的看到,主线程首先创建了一个新的CQ_ITEM,然后通过round robin策略选择了一个thread并通过cq_push将这个CQ_ITEM放入了该线程的CQ队列里,那么对应的workers线程是怎么知道的呢
就是通过这个write(threads[thread].notify_send_fd, "", 1)向该线程管道写了1字节数据,则该线程的libevent立即回调了thread_libevent_process方法(上面已经描述过)
然后那个线程取出item,注册读时间,当该条连接上有数据时,最终也会回调drive_machine方法,也就是drive_machine方法的 case conn_read:等全部是workers处理的,主线程只处理conn_listening 建立连接这个
这部分代码确实比较多,没法全部贴出来,请大家参考源码,最新版本1.2.6,我省去了很多优化的地方
比如,每个CQ_ITEM被malloc时会一次malloc很多个,以减小碎片的产生等等细节。
Memcached源码分析--线程模型(一)
Memcached源码分析--线程模型(二)
最后看看memcached网络事件处理的最核心部分- drive_machine
需要铭记于心的是drive_machine是多线程环境执行的,主线程和workers都会执行drive_machine
static void drive_machine(conn *c) { bool stop = false; int sfd, flags = 1; socklen_t addrlen; struct sockaddr_storage addr; int res; assert(c != NULL); while (!stop) { switch(c->state) { case conn_listening: addrlen = sizeof(addr); if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) { //省去n多错误情况处理 break; } if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { perror("setting O_NONBLOCK"); close(sfd); break; } dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, DATA_BUFFER_SIZE, false); break; case conn_read: if (try_read_command(c) != 0) { continue; } ....//省略 } }
首先大家不到被while循环误导(大部分做java的同学都会马上联想到是个周而复始的loop)其实while通常满足一个case后就会break了,这里用while是考虑到垂直触发方式下,必须读到EWOULDBLOCK错误才可以
言归正传,drive_machine主要是通过当前连接的state来判断该进行何种处理,因为通过libevent注册了读写时间后回调的都是这个核心函数,所以实际上我们在注册libevent相应事件时,会同时把事件状态写到该conn结构体里,libevent进行回调时会把该conn结构作为参数传递过来,就是该方法的形参
memcached里连接的状态通过一个enum声明
enum conn_states { conn_listening, /** the socket which listens for connections */ conn_read, /** reading in a command line */ conn_write, /** writing out a simple response */ conn_nread, /** reading in a fixed number of bytes */ conn_swallow, /** swallowing unnecessary bytes w/o storing */ conn_closing, /** closing this connection */ conn_mwrite, /** writing out many items sequentially */ };
实际对于case conn_listening:这种情况是主线程自己处理的,workers线程永远不会执行此分支
我们看到主线程进行了accept后调用了
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,DATA_BUFFER_SIZE, false);
这个函数就是通知workers线程的地方,看看
void dispatch_conn_new(int sfd, int init_state, int event_flags, int read_buffer_size, int is_udp) { CQ_ITEM *item = cqi_new(); int thread = (last_thread + 1) % settings.num_threads; last_thread = thread; item->sfd = sfd; item->init_state = init_state; item->event_flags = event_flags; item->read_buffer_size = read_buffer_size; item->is_udp = is_udp; cq_push(&threads[thread].new_conn_queue, item); MEMCACHED_CONN_DISPATCH(sfd, threads[thread].thread_id); if (write(threads[thread].notify_send_fd, "", 1) != 1) { perror("Writing to thread notify pipe"); } }
可以清楚的看到,主线程首先创建了一个新的CQ_ITEM,然后通过round robin策略选择了一个thread并通过cq_push将这个CQ_ITEM放入了该线程的CQ队列里,那么对应的workers线程是怎么知道的呢
就是通过这个write(threads[thread].notify_send_fd, "", 1)向该线程管道写了1字节数据,则该线程的libevent立即回调了thread_libevent_process方法(上面已经描述过)
然后那个线程取出item,注册读时间,当该条连接上有数据时,最终也会回调drive_machine方法,也就是drive_machine方法的 case conn_read:等全部是workers处理的,主线程只处理conn_listening 建立连接这个
这部分代码确实比较多,没法全部贴出来,请大家参考源码,最新版本1.2.6,我省去了很多优化的地方
比如,每个CQ_ITEM被malloc时会一次malloc很多个,以减小碎片的产生等等细节。
Memcached源码分析--线程模型(一)
Memcached源码分析--线程模型(二)
相关文章推荐
- Memcached源码分析--线程模型(二)
- Memcached源码分析(线程模型)
- Memcached源码分析(线程模型)
- Memcached源码分析(线程模型)
- (转载)Memcached源码分析(线程模型)
- memcached源码分析之线程模型
- Memcached源码分析(线程模型)
- Memcached源码分析之线程模型
- Memcached源码分析(线程模型)
- memcached源码分析之线程模型 【转】
- Memcached源码分析(线程模型)
- Memcached源码分析之线程模型
- Memcached源码分析(线程模型)
- Hbase WAL线程模型源码分析
- 【Netty源码分析】Reactor线程模型
- 转:Memcached 线程部分源码分析
- 从源码角度看MySQL memcached plugin——2. 线程模型和连接的状态机
- PHP源码分析之线程安全模型
- quartz源码分析——执行引擎和线程模型
- 深入分析Memcached的线程接入模型---上