您的位置:首页 > 运维架构 > Linux

Linux非阻塞IO(六)使用poll实现非阻塞的服务器端

2014-10-27 20:15 661 查看
关于poll模型监听的事件以及返回事件,我们定义宏如下:

#define kReadEvent (POLLIN | POLLPRI)
#define kWriteEvent (POLLOUT | POLLWRBAND)
#define kReadREvent (POLLIN | POLLPRI | POLLRDHUP)
#define kWriteREvent (POLLOUT)


前面我们说明了,为什么非阻塞IO必须具备缓冲区。事实上,对于server而言,每条TCP连接应该具有两个缓冲区,一个用于输入,一个用于输出。


sockfd –>  InputBuffer –> 用户空间 –> 处理数据 –> 得到结果 –> OutputBuffer –> sockfd



但是本例仅仅是简单的回射服务,收到数据立刻发出,所以我们只使用一个缓冲区。

对于每个TCP连接,我们对应这样的一个数据结构:

typedef struct{
buffer_t buffer_;
} tcp_connection_t; //表示一条TCP连接


那么我们需要建立一个从fd到对应的tcp_connection_t的对应关系。我使用以下的数组:

tcp_connection_t *connsets[EVENTS_SIZE]; //提供从fd到TCP连接的映射


这个数组初始化为NULL,然后我使用fd作为下标,也就是说,当我需要处理某fd的时候,以该fd为下标就可以找到它相应的tcp_connection_t指针。这本质上是一种哈希表的思想

使用tcp_connection_t的逻辑是:


每当accept一个新的连接,就动态创建一个新的tcp_connection_t,并且将其指针保存至以peerfd为下标的位置中




每当连接关闭,需要释放内存,同时重置指针为NULL。



我将所有的代码全部写入main函数中,因为前面封装了buffer,代码的可读性已经提高,再加上我们的业务逻辑简单,进一步封装,反而降低可读性。

这里跟client有几处相同点:


仅当缓冲区有空闲空间时才监听read事件




仅当缓冲区有数据时,才监听write事件



所以我们每次进行poll调用前都需要将fd的监听事件重新装填。

server的代码整体框架如下:

//初始化poll

while(1)
{
//重新装填fd数组

//poll系统调用

//依次处理每个fd的读写事件
}


所以完整的代码如下:

#define _GNU_SOURCE             /* See feature_test_macros(7) */
#include <sys/socket.h>
#include "sysutil.h"
#include "buffer.h"
#include <assert.h>

#define EVENTS_SIZE 1024

typedef struct{ buffer_t buffer_; } tcp_connection_t; //表示一条TCP连接

tcp_connection_t *connsets[EVENTS_SIZE]; //提供从fd到TCP连接的映射

int main(int argc, char const *argv[])
{
//获取监听fd
int listenfd = tcp_server("localhost", 9981);
//将监听fd设置为非阻塞
activate_nonblock(listenfd);

//初始化connsets
int i;
for(i = 0; i < EVENTS_SIZE; ++i)
{
connsets[i] = NULL;
}

//初始化poll
struct pollfd events[EVENTS_SIZE];
for(i = 0; i < EVENTS_SIZE; ++i)
events[i].fd = -1;
events[0].fd = listenfd;
events[0].events = kReadEvent;
int maxi = 0;

while(1)
{
//重新装填events数组
int i;
for(i = 1; i < EVENTS_SIZE; ++i)
{
int fd = events[i].fd;
events[i].events = 0; //重置events
if(fd == -1)
continue;
assert(connsets[fd] != NULL);

//当Buffer中有数据可读时,才监听write事件
if(buffer_is_readable(&connsets[fd]->buffer_))
{
events[i].events |= kWriteEvent;
}

//当Buffer中有空闲空间时,才监听read事件
if(buffer_is_writeable(&connsets[fd]->buffer_))
{
events[i].events |= kReadEvent;
}

}

//poll调用
int nready = poll(events, maxi + 1, 5000);
if(nready == -1)
ERR_EXIT("poll");
else if(nready == 0)
{
printf("poll timeout.\n");
continue;
}

//处理listenfd
if(events[0].revents & kReadEvent)
{
//接受一个新的客户fd
int peerfd = accept4(listenfd, NULL, NULL, SOCK_NONBLOCK | SOCK_CLOEXEC);
if(peerfd == -1)
ERR_EXIT("accept4");
//新建tcp连接
tcp_connection_t *pt = (tcp_connection_t*)malloc(sizeof(tcp_connection_t));
buffer_init(&pt->buffer_);
//将该tcp连接放入connsets
connsets[peerfd] = pt;
//放入events数组
int i;
for(i = 0; i < EVENTS_SIZE; ++i)
{
if(events[i].fd == -1)
{
events[i].fd = peerfd; //这里不必监听fd
if(i > maxi)
maxi = i; //更新maxi
break;
}
}
if(i == EVENTS_SIZE)
{
fprintf(stderr, "too many clients\n");
exit(EXIT_FAILURE);
}
}

//处理客户fd
//int i;
for(i = 1; i <= maxi; ++i)
{
int sockfd = events[i].fd;
if(sockfd == -1)
continue;
//取出指针
tcp_connection_t *pt = connsets[sockfd];
assert(pt != NULL);
if(events[i].revents & kReadREvent) //读取数据
{
if(buffer_read(&pt->buffer_, sockfd) == 0)
{
//close
events[i].fd = -1;
close(sockfd);
free(pt);
connsets[sockfd] = NULL;
continue; //继续下一次循环
}
}

if(events[i].revents & kWriteREvent) //可以发送数据
{
buffer_write(&pt->buffer_, sockfd);
}
}
}

close(listenfd);

return 0;
}


需要注意的是,关于accept,我使用的是accpet4,这是Linux新增的系统调用:

 

int accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags);


与accept的区别就是accept4可以指定非阻塞标志位。

 

下文使用epoll。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: