您的位置:首页 > 数据库 > Memcache

memcached线程模型

2017-06-01 16:05 148 查看
直接上图:



memcached使用多线程模型,一个master线程,多个worker线程,master和worker通过管道实现通信。

每个worker线程有一个队列,队列元素为CQ_ITEM。

typedef struct {
pthread_t thread_id;        /* unique ID of this thread */
struct event_base *base;    /* libevent handle this thread uses */
struct event notify_event;  /* listen event for notify pipe */
int notify_receive_fd;      /* receiving end of notify pipe */
int notify_send_fd;         /* sending end of notify pipe */
struct thread_stats stats;  /* Stats generated by this thread */
struct conn_queue *new_conn_queue; /* queue of new connections to handle */
cache_t *suffix_cache;      /* suffix cache */
logger *l;                  /* logger buffer */
void *lru_bump_buf;         /* async LRU bump buffer */
} LIBEVENT_THREAD;

/* An item in the connection queue. */
typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
int               sfd;
enum conn_states  init_state;
int               event_flags;
int               read_buffer_size;
enum network_transport     transport;
conn *c;
CQ_ITEM          *next;
};

/* A connection queue. */
typedef struct conn_queue CQ;
struct conn_queue {
CQ_ITEM *head;
CQ_ITEM *tail;
pthread_mutex_t lock;
};


memcached使用libevent实现事件监听,master和worker各有一个event_base。

起初,master负责监听连接的到来,worker线程负责监听管道的读事件。

当有一个连接到来,master线程accept该连接,并将conn_fd封装成一个CQ_ITEM对象放入一个worker线程的队列中,同时向管道写入数据触发管道读事件。

对应worker线程执行管道读事件的回调函数thread_libevent_process:

/*
* Processes an incoming "handle a new connection" item. This is called when
* input arrives on the libevent wakeup pipe.
*/
static void thread_libevent_process(int fd, short which, void *arg) {
LIBEVENT_THREAD *me = arg;
CQ_ITEM *item;
char buf[1];
unsigned int timeout_fd;

if (read(fd, buf, 1) != 1) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read from libevent pipe\n");
return;
}

switch (buf[0]) {
case 'c':
item = cq_pop(me->new_conn_queue);

if (NULL != item) {
conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
item->read_buffer_size, item->transport,
me->base);
if (c == NULL) {
if (IS_UDP(item->transport)) {
fprintf(stderr, "Can't listen for events on UDP socket\n");
exit(1);
} else {
if (settings.verbose > 0) {
fprintf(stderr, "Can't listen for events on fd %d\n",
item->sfd);
}
close(item->sfd);
}
} else {
c->thread = me;
}
cqi_free(item);
}
break;
case 'r':
item = cq_pop(me->new_conn_queue);

if (NULL != item) {
conn_worker_readd(item->c);
cqi_free(item);
}
break;
/* we were told to pause and report in */
case 'p':
register_thread_initialized();
break;
/* a client socket timed out */
case 't':
if (read(fd, &timeout_fd, sizeof(timeout_fd)) != sizeof(timeout_fd)) {
if (settings.verbose > 0)
fprintf(stderr, "Can't read timeout fd from libevent pipe\n");
return;
}
conn_close_idle(conns[timeout_fd]);
break;
}
}


在conn_new中,将conn_fd的读事件添加进自己的event_base中。

至此,worker线程开始监听连接上的I/O事件。

参考资料:

Memcached源码分析之线程模型

p.p1 { margin: 0.0px 0.0px 0.0px 0.0px; font: 14.0px Menlo; color: #ffffff }
span.s1 { }
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: