memcached线程模型
2013-08-01 10:24
246 查看
1.
main函数中调用thread_init(),初始化setting.num_threads个worker线程以及一个主线程dispatcher_thread。
每个worker线程用pipe创建一个管道,并注册libevent事件,当管道的读端可以读时,就调用thread_libevent_process()函数。
thread_libevent_process()做的事情等下再说。
[cpp] view
plaincopy
void thread_init(int nthreads, struct event_base *main_base) {
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
//初始化dispatcher线程
dispatcher_thread.base = main_base;
dispatcher_thread.thread_id = pthread_self();
//初始化nthreads个worker线程
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}
//每个线程打开一个管道
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
//注册libevent事件。当该线程的管道的读端可以读时,调用thread_libevent_process()
setup_thread(&threads[i]);
}
/* Create threads after we've done all the libevent setup. */
/* 创建worker线程,并让每个线程进入event_base_loop()循环。
前面的threads数组只是创建了nthreads个LIBEVENT_THREAD对象,
并初始化了libevent以及其他信息。并没有真正开始一个新的进程。
在create_worker里才真正调用了pthread_create,
并且每个进程都进入了event_base_loop()
*/
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
}
static void setup_thread(LIBEVENT_THREAD *me) {
me->base = event_init();
/* Listen for notifications from other threads */
//当读端可以读时,调用thread_libevent_process()函数
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
/*初始化每个线程的new_conn_queue成员。
new_conn_queue成员是一个conn_queue指针,相当于一个队列,
记录分配到该线程的,等待new一个conn对象的那些item的信息。
每次调用thread_libevent_process()时,
就从该队列中取出一个item,然后建立一个conn*/
me->new_conn_queue = malloc(sizeof(struct conn_queue));
cq_init(me->new_conn_queue);
}
static void create_worker(void *(*func)(void *), void *arg) {
pthread_t thread;
pthread_attr_t attr;
int ret;
pthread_attr_init(&attr);
if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
fprintf(stderr, "Can't create thread: %s\n",
strerror(ret));
exit(1);
}
}
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
event_base_loop(me->base, 0);
return NULL;
}
2.
main函数中,然后调用server_sockets,在指定的端口和ip地址上分别建立tcp和udp的监听。
然后
1) 如果是TCP,就调用conn_new(sfd, conn_listening, EV_READ|EV+PERSIST, 1, tcp_transport, main_base),该函数在main_base上建立libevent事件,当sfd可读时,调用event_handler(). 而event_handler()又调用drive_machine().
2) 如果是UDP,就调用dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, udp_transport),该函数用round-robin的方法找到一个worker线程,然后new一个CQ_ITEM对象,把该item对象放到worker线程的new_conn_queue队列中。然后,通知该worker线程。通知的方法是,往该线程的写端写一个字节,这样,该线程的读端就可读了。由于之前已经讲过,每个worker线程注册了libevent事件,当读端可读时,就调用thread_libevent_process()。
server_sockets()调用server_socket(), server_socket()先在指定端口和ip上建立socket,得到文件描述符sfd,然后bind,然后
[cpp] view
plaincopy
static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file) {
if ((sfd = new_socket(next)) == -1) {
...
}
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
...
}
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
...
}
if (IS_UDP(transport)) {
int c;
for (c = 0; c < settings.num_threads_per_udp; c++) {
/* this is guaranteed to hit all threads because we round-robin */
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
UDP_READ_BUFFER_SIZE, transport);
}
} else {
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
...
}
}
}
3. 继续2.1
如果是TCP,把conn的状态设为conn_listening, 设置libevent事件。当一个新的tcp connection到达时,进入drive_machine。
[cpp] view
plaincopy
static void drive_machine(conn *c) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen));
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
...
}
调用dispatch_conn_new,把新的tcp connection的sfd指派到某个worker线程上。
4. 继续2.2
注意server_socket中,当协议是UDP时,调用dispatch_new_conn,但是是在一个for循环中。就是说,对于一个udp的描述符,指派了settings.num_threads_per_udp个线程来监控该udp描述符。
这叫做“惊群”。当一个udp连接到达时,所有的libevent都能检测到该事件,但是只有一个线程调用recvfrom能返回数据。其他线程都返回失败。
udp用惊群而tcp不用的原因可能是:对于tcp,主线程(即dispatch_thread)会一直监控是否有新的tcp connection到达。如果到达,就会指派一个worker thread处理它。而对于udp,如果不用惊群,那么只有一个worker 线程监控udp 请求。因此,当该线程在处理一个udp请求时,其他的udp请求就不能得到及时处理。
另一个原因是:
“TCP是定然不能这样设计的,所以只能accept之后分发到特定线程,因为是字节流.
而UDP是包,随便哪个线程处理包1,再换个线程处理包2也是没有问题的.”
main函数中调用thread_init(),初始化setting.num_threads个worker线程以及一个主线程dispatcher_thread。
每个worker线程用pipe创建一个管道,并注册libevent事件,当管道的读端可以读时,就调用thread_libevent_process()函数。
thread_libevent_process()做的事情等下再说。
[cpp] view
plaincopy
void thread_init(int nthreads, struct event_base *main_base) {
threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
//初始化dispatcher线程
dispatcher_thread.base = main_base;
dispatcher_thread.thread_id = pthread_self();
//初始化nthreads个worker线程
for (i = 0; i < nthreads; i++) {
int fds[2];
if (pipe(fds)) {
perror("Can't create notify pipe");
exit(1);
}
//每个线程打开一个管道
threads[i].notify_receive_fd = fds[0];
threads[i].notify_send_fd = fds[1];
//注册libevent事件。当该线程的管道的读端可以读时,调用thread_libevent_process()
setup_thread(&threads[i]);
}
/* Create threads after we've done all the libevent setup. */
/* 创建worker线程,并让每个线程进入event_base_loop()循环。
前面的threads数组只是创建了nthreads个LIBEVENT_THREAD对象,
并初始化了libevent以及其他信息。并没有真正开始一个新的进程。
在create_worker里才真正调用了pthread_create,
并且每个进程都进入了event_base_loop()
*/
for (i = 0; i < nthreads; i++) {
create_worker(worker_libevent, &threads[i]);
}
}
static void setup_thread(LIBEVENT_THREAD *me) {
me->base = event_init();
/* Listen for notifications from other threads */
//当读端可以读时,调用thread_libevent_process()函数
event_set(&me->notify_event, me->notify_receive_fd,
EV_READ | EV_PERSIST, thread_libevent_process, me);
event_base_set(me->base, &me->notify_event);
if (event_add(&me->notify_event, 0) == -1) {
fprintf(stderr, "Can't monitor libevent notify pipe\n");
exit(1);
}
/*初始化每个线程的new_conn_queue成员。
new_conn_queue成员是一个conn_queue指针,相当于一个队列,
记录分配到该线程的,等待new一个conn对象的那些item的信息。
每次调用thread_libevent_process()时,
就从该队列中取出一个item,然后建立一个conn*/
me->new_conn_queue = malloc(sizeof(struct conn_queue));
cq_init(me->new_conn_queue);
}
static void create_worker(void *(*func)(void *), void *arg) {
pthread_t thread;
pthread_attr_t attr;
int ret;
pthread_attr_init(&attr);
if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
fprintf(stderr, "Can't create thread: %s\n",
strerror(ret));
exit(1);
}
}
static void *worker_libevent(void *arg) {
LIBEVENT_THREAD *me = arg;
event_base_loop(me->base, 0);
return NULL;
}
2.
main函数中,然后调用server_sockets,在指定的端口和ip地址上分别建立tcp和udp的监听。
然后
1) 如果是TCP,就调用conn_new(sfd, conn_listening, EV_READ|EV+PERSIST, 1, tcp_transport, main_base),该函数在main_base上建立libevent事件,当sfd可读时,调用event_handler(). 而event_handler()又调用drive_machine().
2) 如果是UDP,就调用dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST, UDP_READ_BUFFER_SIZE, udp_transport),该函数用round-robin的方法找到一个worker线程,然后new一个CQ_ITEM对象,把该item对象放到worker线程的new_conn_queue队列中。然后,通知该worker线程。通知的方法是,往该线程的写端写一个字节,这样,该线程的读端就可读了。由于之前已经讲过,每个worker线程注册了libevent事件,当读端可读时,就调用thread_libevent_process()。
server_sockets()调用server_socket(), server_socket()先在指定端口和ip上建立socket,得到文件描述符sfd,然后bind,然后
[cpp] view
plaincopy
static int server_socket(const char *interface,
int port,
enum network_transport transport,
FILE *portnumber_file) {
if ((sfd = new_socket(next)) == -1) {
...
}
if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
...
}
if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
...
}
if (IS_UDP(transport)) {
int c;
for (c = 0; c < settings.num_threads_per_udp; c++) {
/* this is guaranteed to hit all threads because we round-robin */
dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
UDP_READ_BUFFER_SIZE, transport);
}
} else {
if (!(listen_conn_add = conn_new(sfd, conn_listening,
EV_READ | EV_PERSIST, 1,
transport, main_base))) {
...
}
}
}
3. 继续2.1
如果是TCP,把conn的状态设为conn_listening, 设置libevent事件。当一个新的tcp connection到达时,进入drive_machine。
[cpp] view
plaincopy
static void drive_machine(conn *c) {
switch(c->state) {
case conn_listening:
addrlen = sizeof(addr);
sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen));
dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
DATA_BUFFER_SIZE, tcp_transport);
...
}
调用dispatch_conn_new,把新的tcp connection的sfd指派到某个worker线程上。
4. 继续2.2
注意server_socket中,当协议是UDP时,调用dispatch_new_conn,但是是在一个for循环中。就是说,对于一个udp的描述符,指派了settings.num_threads_per_udp个线程来监控该udp描述符。
这叫做“惊群”。当一个udp连接到达时,所有的libevent都能检测到该事件,但是只有一个线程调用recvfrom能返回数据。其他线程都返回失败。
udp用惊群而tcp不用的原因可能是:对于tcp,主线程(即dispatch_thread)会一直监控是否有新的tcp connection到达。如果到达,就会指派一个worker thread处理它。而对于udp,如果不用惊群,那么只有一个worker 线程监控udp 请求。因此,当该线程在处理一个udp请求时,其他的udp请求就不能得到及时处理。
另一个原因是:
“TCP是定然不能这样设计的,所以只能accept之后分发到特定线程,因为是字节流.
而UDP是包,随便哪个线程处理包1,再换个线程处理包2也是没有问题的.”
相关文章推荐
- Memcached源码分析--线程模型(二)
- Memcached源码分析--线程模型(三)
- Memcached源码分析(线程模型)
- 深入分析Memcached的线程接入模型---上
- Memcached源码分析之线程模型
- MEMCACHED(2) 网络线程模型
- (转载)Memcached源码分析(线程模型)
- memcached的线程模型
- memcached源码分析之线程模型
- 从源码角度看MySQL memcached plugin——2. 线程模型和连接的状态机
- 深入分析Memcached的线程接入模型---中
- Memcached源码分析(线程模型)
- memcached线程模型---main thread线程
- 深入分析Memcached的线程接入模型---下
- memcached的线程模型
- Memcached源码分析(线程模型)
- memcached线程模型
- memcached 线程处理模型
- Memcached源码分析之线程模型
- Memcached线程模型(1)