
libuv Study Notes (12)

2016-06-16 12:49


uv_tcp_t: Data Structures and Related Functions (1)

Data structures

typedef struct uv_tcp_s uv_tcp_t;
struct uv_tcp_s {
  UV_HANDLE_FIELDS    // members of uv_handle_t
  UV_STREAM_FIELDS    // members of uv_stream_t
  // UV_TCP_PRIVATE_FIELDS expands to:
  SOCKET socket;
  int delayed_error;
  union {
    struct {
      uv_tcp_accept_t* accept_reqs;      // array of accept requests
      unsigned int processed_accepts;
      uv_tcp_accept_t* pending_accepts;  // accept requests waiting to be processed
      LPFN_ACCEPTEX func_acceptex;       // AcceptEx function pointer
    } serv;
    struct {
      uv_buf_t read_buffer;              // buffer for incoming data
      LPFN_CONNECTEX func_connectex;     // ConnectEx function pointer
    } conn;
  } tcp;
};


The related request type

typedef struct uv_tcp_accept_s {
  UV_REQ_FIELDS    // members of uv_req_t
  SOCKET accept_socket;
  char accept_buffer[sizeof(struct sockaddr_storage) * 2 + 32];
  HANDLE event_handle;    // used when emulating IOCP
  HANDLE wait_handle;     // used when emulating IOCP
  struct uv_tcp_accept_s* next_pending;
} uv_tcp_accept_t;


Related functions

Initialization. Exported function, declared in uv.h and defined in tcp.c.

int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* handle) {
  return uv_tcp_init_ex(loop, handle, AF_UNSPEC);  // delegate to the extended initializer
}
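
A minimal usage sketch (variable names are mine, not from the libuv sources): uv_tcp_init only registers the handle with the loop; no OS socket exists until the handle is bound, connected, or opened.

#include <uv.h>

int main(void) {
  uv_loop_t* loop = uv_default_loop();
  uv_tcp_t client;
  int r = uv_tcp_init(loop, &client);  // handle joins the loop's handle list
  // the socket field is still INVALID_SOCKET here; it is created lazily
  // by bind/connect/open
  return r ? 1 : uv_run(loop, UV_RUN_DEFAULT);
}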


Extended initializer. Exported function, declared in uv.h and defined in tcp.c.

int uv_tcp_init_ex(uv_loop_t* loop, uv_tcp_t* handle, unsigned int flags) {
  int domain;
  // only the low 8 bits of flags are used
  domain = flags & 0xFF;
  // must be one of the following three; AF_UNSPEC creates no socket yet
  if (domain != AF_INET && domain != AF_INET6 && domain != AF_UNSPEC)
    return UV_EINVAL;
  if (flags & ~0xFF)  // any bits above the low 8 set: error
    return UV_EINVAL;
  // initialize the stream part; the handle is added to the loop's handle list
  uv_stream_init(loop, (uv_stream_t*) handle, UV_TCP);
  handle->tcp.serv.accept_reqs = NULL;
  handle->tcp.serv.pending_accepts = NULL;
  handle->socket = INVALID_SOCKET;
  handle->reqs_pending = 0;
  handle->tcp.serv.func_acceptex = NULL;
  handle->tcp.conn.func_connectex = NULL;
  handle->tcp.serv.processed_accepts = 0;
  handle->delayed_error = 0;

  // if anything below fails, the handle must be removed from the loop again
  if (domain != AF_UNSPEC) {
    SOCKET sock;
    DWORD err;
    sock = socket(domain, SOCK_STREAM, 0);  // create a new socket
    if (sock == INVALID_SOCKET) {  // failure
      err = WSAGetLastError();
      QUEUE_REMOVE(&handle->handle_queue);
      return uv_translate_sys_error(err);
    }
    // associate the new socket with the loop's IOCP port
    err = uv_tcp_set_socket(handle->loop, handle, sock, domain, 0);
    if (err) {  // failure
      closesocket(sock);
      QUEUE_REMOVE(&handle->handle_queue);
      return uv_translate_sys_error(err);
    }
  }
  return 0;
}
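
Passing a concrete address family makes uv_tcp_init_ex (available in newer libuv 1.x releases) create the socket and tie it to the IOCP immediately, which is useful when socket options must take effect before connecting. A sketch, with names of my choosing:

uv_tcp_t h;
// AF_INET: the OS socket is created and associated with loop->iocp right
// away, instead of lazily at bind/connect time as with plain uv_tcp_init
int r = uv_tcp_init_ex(uv_default_loop(), &h, AF_INET);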


Binding the TCP socket to the IOCP port

static int uv_tcp_set_socket(uv_loop_t* loop,
                             uv_tcp_t* handle,
                             SOCKET socket,
                             int family,
                             int imported) {
  DWORD yes = 1;
  int non_ifs_lsp;
  int err;
  if (handle->socket != INVALID_SOCKET)
    return UV_EBUSY;
  // put the socket in non-blocking mode
  if (ioctlsocket(socket, FIONBIO, &yes) == SOCKET_ERROR) {
    return WSAGetLastError();
  }
  // make the socket handle non-inheritable
  if (!SetHandleInformation((HANDLE) socket, HANDLE_FLAG_INHERIT, 0))
    return GetLastError();
  // associate with the IOCP port, using the socket itself as the completion key
  if (CreateIoCompletionPort((HANDLE) socket,
                             loop->iocp,
                             (ULONG_PTR) socket,
                             0) == NULL) {
    // imported is 1 when an existing socket is wrapped via uv_tcp_open
    if (imported) {
      handle->flags |= UV_HANDLE_EMULATE_IOCP;
    } else {
      return GetLastError();
    }
  }

  if (family == AF_INET6) {
    // uv_tcp_non_ifs_lsp_ipv6 == 1 means the protocol is backed by a real
    // OS handle, with no LSP (layered service provider) wrapping
    non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv6;
  } else {
    // likewise for IPv4
    non_ifs_lsp = uv_tcp_non_ifs_lsp_ipv4;
  }

  if (pSetFileCompletionNotificationModes &&
      !(handle->flags & UV_HANDLE_EMULATE_IOCP) && !non_ifs_lsp) {
    if (pSetFileCompletionNotificationModes((HANDLE) socket,
        FILE_SKIP_SET_EVENT_ON_HANDLE |
        FILE_SKIP_COMPLETION_PORT_ON_SUCCESS))  // if an operation completes immediately, no notification is posted to the IOCP
    {
      handle->flags |= UV_HANDLE_SYNC_BYPASS_IOCP;
    } else if (GetLastError() != ERROR_INVALID_FUNCTION) {
      return GetLastError();
    }
  }
  // disable the Nagle algorithm
  if (handle->flags & UV_HANDLE_TCP_NODELAY) {
    err = uv__tcp_nodelay(handle, socket, 1);
    if (err)
      return err;
  }
  // enable TCP keep-alive
  if (handle->flags & UV_HANDLE_TCP_KEEPALIVE) {
    err = uv__tcp_keepalive(handle, socket, 1, 60);
    if (err)
      return err;
  }
  handle->socket = socket;

  if (family == AF_INET6) {
    handle->flags |= UV_HANDLE_IPV6;
  } else {
    assert(!(handle->flags & UV_HANDLE_IPV6));
  }

  return 0;
}
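
The UV_HANDLE_TCP_NODELAY and UV_HANDLE_TCP_KEEPALIVE flags checked above are set by the public API. Setting them before a socket exists simply records the intent on the handle, and uv_tcp_set_socket applies them once a real socket is created. A sketch:

uv_tcp_t h;
uv_tcp_init(uv_default_loop(), &h);
uv_tcp_nodelay(&h, 1);        // disable Nagle now, or when the socket is created
uv_tcp_keepalive(&h, 1, 60);  // keep-alive with a 60-second initial delay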


Adopting an existing socket as a uv_tcp_t. Exported function, declared in uv.h and defined in tcp.c.

int uv_tcp_open(uv_tcp_t* handle, uv_os_sock_t sock) {
  WSAPROTOCOL_INFOW protocol_info;
  int opt_len;
  int err;

  opt_len = (int) sizeof protocol_info;
  // retrieve the socket's protocol information (including its address family)
  if (getsockopt(sock,
                 SOL_SOCKET,
                 SO_PROTOCOL_INFOW,
                 (char*) &protocol_info,
                 &opt_len) == SOCKET_ERROR) {
    return uv_translate_sys_error(GetLastError());
  }
  // associate the socket with the loop's IOCP port and configure it
  // according to the handle's flags
  err = uv_tcp_set_socket(handle->loop,
                          handle,
                          sock,
                          protocol_info.iAddressFamily,
                          1);
  if (err) {
    return uv_translate_sys_error(err);
  }

  return 0;
}
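
A sketch of the typical use: hand a natively created Winsock socket over to libuv (error handling omitted; libuv has already performed WSAStartup by the time a loop exists).

uv_os_sock_t s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);  // native socket
uv_tcp_t h;
uv_tcp_init(uv_default_loop(), &h);
uv_tcp_open(&h, s);  // s is now associated with the loop's IOCP port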


Getting the local address. Exported function, declared in uv.h and defined in tcp.c.

int uv_tcp_getsockname(const uv_tcp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {
  int result;
  if (handle->socket == INVALID_SOCKET) {
    return UV_EINVAL;
  }
  if (handle->delayed_error) {  // an earlier operation, e.g. bind, failed with a deferred error
    return uv_translate_sys_error(handle->delayed_error);
  }
  result = getsockname(handle->socket, name, namelen);  // call the Winsock API
  if (result != 0) {
    return uv_translate_sys_error(WSAGetLastError());
  }
  return 0;
}


Getting the peer's address. Exported function, declared in uv.h and defined in tcp.c.

int uv_tcp_getpeername(const uv_tcp_t* handle,
                       struct sockaddr* name,
                       int* namelen) {
  int result;

  if (handle->socket == INVALID_SOCKET) {
    return UV_EINVAL;
  }

  if (handle->delayed_error) {
    return uv_translate_sys_error(handle->delayed_error);
  }

  result = getpeername(handle->socket, name, namelen);  // call the Winsock API
  if (result != 0) {
    return uv_translate_sys_error(WSAGetLastError());
  }
  return 0;
}


Establishing a connection. Exported function, declared in uv.h and defined in tcp.c.

int uv_tcp_connect(uv_connect_t* req,
                   uv_tcp_t* handle,
                   const struct sockaddr* addr,
                   uv_connect_cb cb) {
  unsigned int addrlen;
  if (handle->type != UV_TCP)
    return UV_EINVAL;
  if (addr->sa_family == AF_INET)
    addrlen = sizeof(struct sockaddr_in);
  else if (addr->sa_family == AF_INET6)
    addrlen = sizeof(struct sockaddr_in6);
  else
    return UV_EINVAL;
  return uv__tcp_connect(req, handle, addr, addrlen, cb);  // delegate to the internal implementation
}
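
A typical call looks like the following sketch (callback and variable names are mine; client is the initialized uv_tcp_t from the earlier sketch):

void on_connect(uv_connect_t* req, int status) {
  if (status < 0) {
    fprintf(stderr, "connect failed: %s\n", uv_strerror(status));
    return;
  }
  // req->handle is the connected stream; reads and writes can start here
}

struct sockaddr_in dest;
uv_ip4_addr("127.0.0.1", 7000, &dest);  // build the destination address
uv_connect_t connect_req;
uv_tcp_connect(&connect_req, &client, (const struct sockaddr*) &dest, on_connect);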


Internal handling of connect

int uv__tcp_connect(uv_connect_t* req,
                    uv_tcp_t* handle,
                    const struct sockaddr* addr,
                    unsigned int addrlen,
                    uv_connect_cb cb) {
  int err;
  err = uv_tcp_try_connect(req, handle, addr, addrlen, cb);
  if (err)
    return uv_translate_sys_error(err);
  return 0;
}


static int uv_tcp_try_connect(uv_connect_t* req,
                              uv_tcp_t* handle,
                              const struct sockaddr* addr,
                              unsigned int addrlen,
                              uv_connect_cb cb) {
  uv_loop_t* loop = handle->loop;
  const struct sockaddr* bind_addr;
  BOOL success;
  DWORD bytes;
  int err;
  if (handle->delayed_error) {
    return handle->delayed_error;
  }
  // not yet bound to a local address: bind to the default "any" address
  if (!(handle->flags & UV_HANDLE_BOUND)) {
    if (addrlen == sizeof(uv_addr_ip4_any_)) {
      bind_addr = (const struct sockaddr*) &uv_addr_ip4_any_;
    } else if (addrlen == sizeof(uv_addr_ip6_any_)) {
      bind_addr = (const struct sockaddr*) &uv_addr_ip6_any_;
    } else {
      abort();
    }
    err = uv_tcp_try_bind(handle, bind_addr, addrlen, 0);
    if (err)
      return err;
    if (handle->delayed_error)
      return handle->delayed_error;
  }

  if (!handle->tcp.conn.func_connectex) {
    // look up the ConnectEx function pointer for this particular socket
    if (!uv_get_connectex_function(handle->socket, &handle->tcp.conn.func_connectex)) {
      return WSAEAFNOSUPPORT;
    }
  }
  // initialize the connect request
  uv_req_init(loop, (uv_req_t*) req);
  req->type = UV_CONNECT;
  req->handle = (uv_stream_t*) handle;
  req->cb = cb;
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));
  // call ConnectEx to connect asynchronously; completion (success or
  // failure) is reported to the IOCP port
  success = handle->tcp.conn.func_connectex(handle->socket,
                                            addr,
                                            addrlen,
                                            NULL,
                                            0,
                                            &bytes,
                                            &req->u.io.overlapped);

  if (UV_SUCCEEDED_WITHOUT_IOCP(success)) {
    // completed synchronously and no IOCP notification will follow:
    // insert the request directly into the loop's pending request queue
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
    uv_insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(success)) {
    // will complete via the IOCP; the req is queued on the loop when the
    // IOCP notification arrives
    handle->reqs_pending++;
    REGISTER_HANDLE_REQ(loop, handle, req);
  } else {
    return WSAGetLastError();
  }
  return 0;
}


For a connect request that goes through the IOCP, the polling in uv_run picks up the completion notification, recovers the corresponding req from the OVERLAPPED structure, and appends it to the loop's pending request queue. On the next loop iteration, uv_process_reqs dispatches it:

case UV_CONNECT:
  DELEGATE_STREAM_REQ(loop, (uv_connect_t*) req, connect, handle);
  break;


which ultimately calls uv_process_tcp_connect_req to handle the TCP connect request:

void uv_process_tcp_connect_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_connect_t* req) {
  int err;
  assert(handle->type == UV_TCP);
  // Decrement the handle's active count; if it reaches zero, stop the handle
  // (the loop's active-handle count drops and the handle becomes inactive).
  // Also remove req from the loop's active_reqs list. The connect request is
  // fully processed at this point, so the uv_tcp_t returns to the stopped
  // state unless other requests keep it active.
  UNREGISTER_HANDLE_REQ(loop, handle, req);
  err = 0;
  if (REQ_SUCCESS(req)) {  // connection succeeded: update the socket's state
    if (setsockopt(handle->socket,
                   SOL_SOCKET,
                   SO_UPDATE_CONNECT_CONTEXT,
                   NULL,
                   0) == 0) {
      uv_connection_init((uv_stream_t*) handle);
      handle->flags |= UV_HANDLE_READABLE | UV_HANDLE_WRITABLE;
      loop->active_tcp_streams++;  // one more active TCP stream
    } else {
      err = WSAGetLastError();
    }
  } else {
    err = GET_REQ_SOCK_ERROR(req);
  }
  req->cb(req, uv_translate_sys_error(err));  // invoke the user callback
  // DECREASE_PENDING_REQ_COUNT(handle); expands to:
  do {
    assert(handle->reqs_pending > 0);
    handle->reqs_pending--;  // one fewer pending request
    if (handle->flags & UV__HANDLE_CLOSING &&
        handle->reqs_pending == 0) {  // closing and nothing left pending
      uv_want_endgame(loop, (uv_handle_t*) handle);
    }
  } while (0);
}


Reading data. The exported function (uv_read_start) is declared in uv.h and defined in stream.c.

// for a stream of type uv_tcp_t, uv_read_start dispatches as follows:
switch (handle->type) {
  case UV_TCP:
    err = uv_tcp_read_start((uv_tcp_t*) handle, alloc_cb, read_cb);
    break;


uv_tcp_read_start: start reading on a TCP handle

int uv_tcp_read_start(uv_tcp_t* handle, uv_alloc_cb alloc_cb,
    uv_read_cb read_cb) {
  uv_loop_t* loop = handle->loop;
  handle->flags |= UV_HANDLE_READING;  // mark the handle as reading
  handle->read_cb = read_cb;
  handle->alloc_cb = alloc_cb;
  // activecnt++; if it was 0 before, start the handle
  INCREASE_ACTIVE_COUNT(loop, handle);
  // queue a read request unless one is already pending
  // (e.g. reading was stopped and is now being restarted)
  if (!(handle->flags & UV_HANDLE_READ_PENDING)) {
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&
        !handle->read_req.event_handle) {
      // when emulating IOCP, create an event object for the read request
      handle->read_req.event_handle = CreateEvent(NULL, 0, 0, NULL);
      if (!handle->read_req.event_handle) {
        uv_fatal_error(GetLastError(), "CreateEvent");
      }
    }
    // queue the read request
    uv_tcp_queue_read(loop, handle);
  }
  return 0;
}
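
From the application's side this path is entered through uv_read_start. A usage sketch (callback names are mine; a matching read_cb sketch follows uv_process_tcp_read_req below):

void alloc_cb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
  buf->base = malloc(suggested_size);         // hand libuv a fresh buffer
  buf->len = buf->base ? suggested_size : 0;  // len 0 makes read_cb receive UV_ENOBUFS
}

uv_read_start((uv_stream_t*) &client, alloc_cb, read_cb);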


Queuing the read request

static void uv_tcp_queue_read(uv_loop_t* loop, uv_tcp_t* handle) {
  uv_read_t* req;
  uv_buf_t buf;
  int result;
  DWORD bytes, flags;
  assert(handle->flags & UV_HANDLE_READING);
  assert(!(handle->flags & UV_HANDLE_READ_PENDING));  // a new read must not be issued while one is still pending
  req = &handle->read_req;
  memset(&req->u.io.overlapped, 0, sizeof(req->u.io.overlapped));

  // pre-allocate the read buffer; uv_active_tcp_streams_threshold is
  // currently 0, so this branch is effectively disabled
  if (loop->active_tcp_streams < uv_active_tcp_streams_threshold) {
    handle->flags &= ~UV_HANDLE_ZERO_READ;
    handle->alloc_cb((uv_handle_t*) handle, 65536, &handle->tcp.conn.read_buffer);
    if (handle->tcp.conn.read_buffer.len == 0) {
      handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &handle->tcp.conn.read_buffer);
      return;
    }
    assert(handle->tcp.conn.read_buffer.base != NULL);
    buf = handle->tcp.conn.read_buffer;
  } else {
    // no pre-allocation: issue a zero-length read instead
    handle->flags |= UV_HANDLE_ZERO_READ;
    buf.base = (char*) &uv_zero_;
    buf.len = 0;
  }
  // prepare the OVERLAPPED structure
  memset(&(req->u.io.overlapped), 0, sizeof(req->u.io.overlapped));
  if (handle->flags & UV_HANDLE_EMULATE_IOCP) {
    // the socket could not be associated with the IOCP port (e.g. a non-IFS
    // LSP socket), so completion is signaled through an event instead;
    // setting the low bit of hEvent prevents a completion from being
    // posted to the port
    assert(req->event_handle);
    req->u.io.overlapped.hEvent = (HANDLE) ((ULONG_PTR) req->event_handle | 1);
  }
  flags = 0;
  // receive data asynchronously
  result = WSARecv(handle->socket,
                   (WSABUF*) &buf,
                   1,
                   &bytes,
                   &flags,
                   &req->u.io.overlapped,
                   NULL);

  if (UV_SUCCEEDED_WITHOUT_IOCP(result == 0)) {
    // completed synchronously with no IOCP notification to come:
    // insert the request directly into the loop's pending request queue
    handle->flags |= UV_HANDLE_READ_PENDING;
    req->u.io.overlapped.InternalHigh = bytes;
    handle->reqs_pending++;
    uv_insert_pending_req(loop, (uv_req_t*) req);
  } else if (UV_SUCCEEDED_WITH_IOCP(result == 0)) {
    // will complete via the IOCP
    handle->flags |= UV_HANDLE_READ_PENDING;  // mark the read as pending
    handle->reqs_pending++;                   // one more pending request
    if (handle->flags & UV_HANDLE_EMULATE_IOCP &&  // IOCP emulation path
        req->wait_handle == INVALID_HANDLE_VALUE &&
        !RegisterWaitForSingleObject(&req->wait_handle,  // register a wait on the event
            req->event_handle, post_completion, (void*) req,
            INFINITE, WT_EXECUTEINWAITTHREAD)) {
      SET_REQ_ERROR(req, GetLastError());
      uv_insert_pending_req(loop, (uv_req_t*) req);  // queue the read request on the loop
    }
  } else {
    // record the error and queue the request so the error gets reported
    SET_REQ_ERROR(req, WSAGetLastError());
    uv_insert_pending_req(loop, (uv_req_t*) req);
    handle->reqs_pending++;
  }
}


The loop's processing of a completed read request ultimately reaches the following function:

void uv_process_tcp_read_req(uv_loop_t* loop, uv_tcp_t* handle,
    uv_req_t* req) {
  DWORD bytes, flags, err;
  uv_buf_t buf;
  assert(handle->type == UV_TCP);
  handle->flags &= ~UV_HANDLE_READ_PENDING;  // clear the read-pending flag
  if (!REQ_SUCCESS(req)) {
    // the read failed
    if ((handle->flags & UV_HANDLE_READING) ||
        !(handle->flags & UV_HANDLE_ZERO_READ)) {
      handle->flags &= ~UV_HANDLE_READING;
      DECREASE_ACTIVE_COUNT(loop, handle);  // decrement the active count
      buf = (handle->flags & UV_HANDLE_ZERO_READ) ?
          uv_buf_init(NULL, 0) : handle->tcp.conn.read_buffer;
      err = GET_REQ_SOCK_ERROR(req);
      if (err == WSAECONNABORTED) {
        err = WSAECONNRESET;
      }
      // invoke the read callback with the error
      handle->read_cb((uv_stream_t*) handle,
                      uv_translate_sys_error(err),
                      &buf);
    }
  } else {  // success
    if (!(handle->flags & UV_HANDLE_ZERO_READ)) {
      if (req->u.io.overlapped.InternalHigh > 0) {
        // data was read successfully
        handle->read_cb((uv_stream_t*) handle,
                        req->u.io.overlapped.InternalHigh,
                        &handle->tcp.conn.read_buffer);
        // if the read did not fill the buffer, no more data is waiting;
        // only a completely filled buffer falls through to keep reading
        if (req->u.io.overlapped.InternalHigh < handle->tcp.conn.read_buffer.len) {
          goto done;
        }
      } else {
        // zero bytes: the peer closed the connection (EOF)
        if (handle->flags & UV_HANDLE_READING) {
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);
        }
        handle->flags &= ~UV_HANDLE_READABLE;
        buf.base = 0;
        buf.len = 0;
        handle->read_cb((uv_stream_t*) handle, UV_EOF, &handle->tcp.conn.read_buffer);
        goto done;
      }
    }
    // keep reading synchronously while data is available
    while (handle->flags & UV_HANDLE_READING) {
      handle->alloc_cb((uv_handle_t*) handle, 65536, &buf);  // invoke the allocation callback
      if (buf.len == 0) {
        handle->read_cb((uv_stream_t*) handle, UV_ENOBUFS, &buf);
        break;
      }
      assert(buf.base != NULL);
      flags = 0;
      if (WSARecv(handle->socket,
                  (WSABUF*) &buf,
                  1,
                  &bytes,
                  &flags,
                  NULL,
                  NULL) != SOCKET_ERROR) {
        if (bytes > 0) {
          // data read: invoke the read callback
          handle->read_cb((uv_stream_t*) handle, bytes, &buf);
          // if the buffer wasn't completely filled, no more data is waiting
          if (bytes < buf.len) {
            break;
          }
        } else {
          // zero bytes: the connection was closed (EOF)
          handle->flags &= ~(UV_HANDLE_READING | UV_HANDLE_READABLE);
          DECREASE_ACTIVE_COUNT(loop, handle);
          handle->read_cb((uv_stream_t*) handle, UV_EOF, &buf);
          break;
        }
      } else {  // the read failed
        err = WSAGetLastError();
        if (err == WSAEWOULDBLOCK) {
          // nothing to read right now: report zero bytes
          handle->read_cb((uv_stream_t*) handle, 0, &buf);
        } else {
          // a real error occurred
          handle->flags &= ~UV_HANDLE_READING;
          DECREASE_ACTIVE_COUNT(loop, handle);
          if (err == WSAECONNABORTED) {
            err = WSAECONNRESET;
          }
          handle->read_cb((uv_stream_t*) handle,
                          uv_translate_sys_error(err),
                          &buf);
        }
        break;
      }
    }
done:
    // after one read request finishes, queue another one unless reading
    // has been stopped, so reading continues automatically
    if ((handle->flags & UV_HANDLE_READING) &&
        !(handle->flags & UV_HANDLE_READ_PENDING)) {
      uv_tcp_queue_read(loop, handle);
    }
  }
  DECREASE_PENDING_REQ_COUNT(handle);  // one fewer pending request on the handle
}
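
A read callback matching the dispatch above might look like this sketch (names are mine; it pairs with the alloc_cb sketch shown earlier, so buf->base came from malloc):

void read_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
  if (nread > 0) {
    // consume nread bytes starting at buf->base
  } else if (nread == UV_EOF) {
    uv_close((uv_handle_t*) stream, NULL);  // peer closed the connection
  } else if (nread < 0) {
    fprintf(stderr, "read error: %s\n", uv_strerror((int) nread));
    uv_close((uv_handle_t*) stream, NULL);
  }
  // nread == 0 means "no data right now" (the WSAEWOULDBLOCK case above)
  free(buf->base);  // allocated in alloc_cb; free(NULL) is harmless
}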


Stopping reads. Exported function, declared in uv.h and defined in stream.c.

Reading does not stop immediately. After this call, the handling of the most recent read request is affected and no new read request is issued, which brings reading to a halt.

int uv_read_stop(uv_stream_t* handle) {
  int err;
  if (!(handle->flags & UV_HANDLE_READING))  // not reading: nothing to do
    return 0;
  err = 0;
  if (handle->type == UV_TTY) {
    err = uv_tty_read_stop((uv_tty_t*) handle);
  } else {
    if (handle->type == UV_NAMED_PIPE) {
      uv__pipe_stop_read((uv_pipe_t*) handle);
    } else {
      handle->flags &= ~UV_HANDLE_READING;  // the TCP case
    }
    DECREASE_ACTIVE_COUNT(handle->loop, handle);  // decrement the handle's active count
  }
  return uv_translate_sys_error(err);
}
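
It is common to call uv_read_stop from inside the read callback itself; a sketch (names are mine):

void read_once_cb(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) {
  if (nread > 0)
    uv_read_stop(stream);  // clears UV_HANDLE_READING; no further read is queued
  free(buf->base);
}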


From uv_tcp_t's connect and read paths, we can observe the following.

1. A uv_tcp_t is kept active mainly by requests: once a connect request finishes, if no other request (such as a read) is issued, the uv_tcp_t returns to the stopped state and the loop no longer counts it among its active handles (see the sketch after this list).

2. Some stream-related requests, such as connect and write, are recorded in the loop's request list. My guess is that requests issued by the user are added to the loop's request queue, while requests libuv issues internally are not (uv_read_start, for instance, takes no req parameter; it uses the stream's private uv_read_t).

3. Some stream-related requests re-issue themselves after each completion, read for example (and presumably accept as well): the user does not need to re-issue them, but does need to stop them explicitly.
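
A sketch illustrating observation 1 (hypothetical callback; I believe this matches UNREGISTER_HANDLE_REQ in uv_process_tcp_connect_req, which stops the handle before the callback runs):

void on_connect_idle(uv_connect_t* req, int status) {
  // no uv_read_start and no further requests here: once the connect request
  // is unregistered the handle's active count is 0, uv_is_active() reports 0,
  // and a uv_run(loop, UV_RUN_DEFAULT) with no other handles can return
  (void) status;
}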



Closing a uv_tcp_t via uv_close eventually calls uv_tcp_close:

void uv_tcp_close(uv_loop_t* loop, uv_tcp_t* tcp) {
  int close_socket = 1;
  if (tcp->flags & UV_HANDLE_READ_PENDING) {
    // a read request is pending: an asynchronous WSARecv has been issued
    // but no data has arrived yet
    if (!(tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET)) {
      // not a shared socket: just shut down the send side
      // (UV_HANDLE_SHARED_TCP_SOCKET marks a socket that is shared with
      // another process, e.g. one imported over IPC)
      shutdown(tcp->socket, SD_SEND);
    } else if (uv_tcp_try_cancel_io(tcp) == 0) {
      // shared socket: try to cancel the outstanding I/O (CancelIo API).
      // On success the socket is not closed right away; it is closed when
      // the read request comes back
      close_socket = 0;
    } else {
      // shared socket and cancellation failed:
      // closing the socket is the only option left
    }
  } else if ((tcp->flags & UV_HANDLE_SHARED_TCP_SOCKET) &&
             tcp->tcp.serv.accept_reqs != NULL) {
    /* Under normal circumstances closesocket() will ensure that all pending
     * accept reqs are canceled. However, when the socket is shared the
     * presence of another reference to the socket in another process will
     * keep the accept reqs going, so we have to ensure that these are
     * canceled. */
    if (uv_tcp_try_cancel_io(tcp) != 0) {
      /* When cancellation is not possible, there is another option: we can
       * close the incoming sockets, which will also cancel the accept
       * operations. However this is not cool because we might inadvertently
       * close a socket that just accepted a new connection, which will
       * cause the connection to be aborted. */
      unsigned int i;
      for (i = 0; i < uv_simultaneous_server_accepts; i++) {
        uv_tcp_accept_t* req = &tcp->tcp.serv.accept_reqs[i];
        if (req->accept_socket != INVALID_SOCKET &&
            !HasOverlappedIoCompleted(&req->u.io.overlapped)) {
          closesocket(req->accept_socket);
          req->accept_socket = INVALID_SOCKET;
        }
      }
    }
  }

  if (tcp->flags & UV_HANDLE_READING) {
    tcp->flags &= ~UV_HANDLE_READING;  // stop reading
    DECREASE_ACTIVE_COUNT(loop, tcp);
  }

  if (tcp->flags & UV_HANDLE_LISTENING) {
    tcp->flags &= ~UV_HANDLE_LISTENING;
    DECREASE_ACTIVE_COUNT(loop, tcp);
  }

  if (close_socket) {
    closesocket(tcp->socket);
    tcp->socket = INVALID_SOCKET;
    tcp->flags |= UV_HANDLE_TCP_SOCKET_CLOSED;
  }

  tcp->flags &= ~(UV_HANDLE_READABLE | UV_HANDLE_WRITABLE);
  uv__handle_closing(tcp);  // mark the handle as closing
  // if no requests are pending, finish closing the handle now; otherwise
  // this happens when the last pending request is processed
  if (tcp->reqs_pending == 0) {
    uv_want_endgame(tcp->loop, (uv_handle_t*) tcp);
  }
}
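
From the application's side all of this is triggered by uv_close; a sketch (names are mine):

void on_close(uv_handle_t* h) {
  // runs only after reqs_pending has dropped to 0 and the endgame executed;
  // it is safe to free h here if it was heap-allocated
}

uv_close((uv_handle_t*) &client, on_close);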


uv_tcp_t maintains two counters:

1. reqs_pending: the number of requests that have been queued on the loop, or issued as I/O calls on an IOCP-associated socket and therefore retrievable from the IOCP port. It is decremented when a request finishes processing; when it reaches 0 while the handle is in the UV_HANDLE_CLOSING state, the handle is added to the close (endgame) list.

2. activecnt: incremented when a request is issued and decremented in the corresponding processing function; when it reaches 0, the handle is stopped.

Issuing and processing requests drive the changes in both counters.

Some requests, such as connect and write, are added to the loop's active_reqs list and removed from it once processing completes. This list is one of the conditions used to decide whether the loop is still alive.

TCP also involves the accept and write operations, which will be analyzed in the next post.
Tags: libuv, uv-tcp-t