您的位置:首页 > 理论基础 > 计算机网络

Memcached源码分析之网络连接建立

2014-03-20 19:11 441 查看
接着上一篇博客继续分析,上一篇博客请参考
Memcached源码阅读之网络监听的建立,这篇博客主要分析TCP的连接建立(从前面的代码分析可以看出,这个过程是由主线程驱动的),UDP没有连接建立的过程,所以之间进行连接分发,我们后续分析,现在直接上代码进行讲解。

conn *conn_new(const int sfd, enum conn_states init_state, const int event_flags,
const int read_buffer_size, enum network_transport transport,
struct event_base *base)
{
conn *c = conn_from_freelist();//获取一个空闲连接,conn是Memcached内部对网络连接的一个封装

if (NULL == c)//如果没有空闲的连接
{
if (!(c = (conn *) calloc(1, sizeof(conn))))//申请空间
{
fprintf(stderr, "calloc()\n");
return NULL;
}MEMCACHED_CONN_CREATE(c);
//进行一些初始化
c->rbuf = c->wbuf = 0;
c->ilist = 0;
c->suffixlist = 0;
c->iov = 0;
c->msglist = 0;
c->hdrbuf = 0;

c->rsize = read_buffer_size;
c->wsize = DATA_BUFFER_SIZE;
c->isize = ITEM_LIST_INITIAL;
c->suffixsize = SUFFIX_LIST_INITIAL;
c->iovsize = IOV_LIST_INITIAL;
c->msgsize = MSG_LIST_INITIAL;
c->hdrsize = 0;
//每个conn都自带读入和输出缓冲区,在进行网络收发数据时,特别方便
c->rbuf = (char *) malloc((size_t) c->rsize);
c->wbuf = (char *) malloc((size_t) c->wsize);
c->ilist = (item **) malloc(sizeof(item *) * c->isize);
c->suffixlist = (char **) malloc(sizeof(char *) * c->suffixsize);
c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize);
c->msglist = (struct msghdr *) malloc(
sizeof(struct msghdr) * c->msgsize);

if (c->rbuf == 0 || c->wbuf == 0 || c->ilist == 0 || c->iov == 0
|| c->msglist == 0 || c->suffixlist == 0)
{
conn_free(c);
fprintf(stderr, "malloc()\n");
return NULL;
}

STATS_LOCK();
stats.conn_structs++;//统计变量更新
STATS_UNLOCK();
}

c->transport = transport;
c->protocol = settings.binding_protocol;

if (!settings.socketpath)
{
c->request_addr_size = sizeof(c->request_addr);
}
else
{
c->request_addr_size = 0;
}
//输出一些日志信息
if (settings.verbose > 1)
{
if (init_state == conn_listening)
{
fprintf(stderr, "<%d server listening (%s)\n", sfd,
prot_text(c->protocol));
}
else if (IS_UDP(transport))
{
fprintf(stderr, "<%d server listening (udp)\n", sfd);
}
else if (c->protocol == negotiating_prot)
{
fprintf(stderr, "<%d new auto-negotiating client connection\n",
sfd);
}
else if (c->protocol == ascii_prot)
{
fprintf(stderr, "<%d new ascii client connection.\n", sfd);
}
else if (c->protocol == binary_prot)
{
fprintf(stderr, "<%d new binary client connection.\n", sfd);
}
else
{
fprintf(stderr, "<%d new unknown (%d) client connection\n", sfd,
c->protocol);
assert(false);
}
}

c->sfd = sfd;
c->state = init_state;
c->rlbytes = 0;
c->cmd = -1;
c->rbytes = c->wbytes = 0;
c->wcurr = c->wbuf;
c->rcurr = c->rbuf;
c->ritem = 0;
c->icurr = c->ilist;
c->suffixcurr = c->suffixlist;
c->ileft = 0;
c->suffixleft = 0;
c->iovused = 0;
c->msgcurr = 0;
c->msgused = 0;

c->write_and_go = init_state;
c->write_and_free = 0;
c->item = 0;

c->noreply = false;
        //建立sfd描述符上面的event事件,事件回调函数为event_handler
event_set(&c->event, sfd, event_flags, event_handler, (void *) c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;
if (event_add(&c->event, 0) == -1)
{      
//如果建立libevent事件失败,将创建的conn添加到空闲列表中
if (conn_add_to_freelist(c))
{
conn_free(c);
}
perror("event_add");
return NULL;
}

STATS_LOCK();
stats.curr_conns++;//统计信息更新
stats.total_conns++;
STATS_UNLOCK();

MEMCACHED_CONN_ALLOCATE(c->sfd);

return c;
}
//获得conn
conn *conn_from_freelist()
{
    conn *c;

    pthread_mutex_lock(&conn_lock);//操作链表,加锁,保持同步
    if (freecurr > 0)//freecurr为静态全局变量
    {  
        //freeconns是在Memcached启动时初始化的
        c = freeconns[--freecurr];
    }
    else//没有conn
    {
        c = NULL;
    }
    pthread_mutex_unlock(&conn_lock);

    return c;
}
//添加conn到空闲链表中
bool conn_add_to_freelist(conn *c)
{
    bool ret = true;
    pthread_mutex_lock(&conn_lock);
    if (freecurr < freetotal)//freeconns还有空间
    {
        freeconns[freecurr++] = c;//直接添加
        ret = false;
    }
    else
    {  
        //没有多余空间,进行扩容,按目前容量的2倍进行扩容
        size_t newsize = freetotal * 2;
        conn **new_freeconns = realloc(freeconns, sizeof(conn *) * newsize);
        if (new_freeconns)
        {
            freetotal = newsize;
            freeconns = new_freeconns;
            freeconns[freecurr++] = c;
            ret = false;
        }
    }
    pthread_mutex_unlock(&conn_lock);
    return ret;
}
//libevent事件回调函数的处理,回调函数被调用时,表明Memcached监听的端口号有网络事件到了
void event_handler(const int fd, const short which, void *arg)
{
conn *c;

c = (conn *) arg;
assert(c != NULL);

c->which = which;

//这种情况应该很少出现
if (fd != c->sfd)
{
    if (settings.verbose > 0)
        fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");
    conn_close(c);
    return;
}
//进入业务处理状态机
drive_machine(c);

return;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息