您的位置:首页 > 职场人生

epoll读到一半又有新事件来了怎么办?

2017-05-08 14:00 246 查看
有哥们在腾讯面试被问到了。我也很好奇就做了下实验。

有些朋友急性子想看过程只想知道结果,我就先给出结果吧。

1.阻塞读数据(不用epoll),你说读到一半有新消息又来了怎么办?

2.非阻塞读数据(不用epoll),你说读到一半有新消息又来了怎么办?

3.epoll的ET模式时,如果数据只读了一半,也就是缓冲区的数据只读了一点,然后又来新事件了怎么办?

1:来了就来了呗,读就是了啊。可能我们一次读到两次发过来的消息。

2:来了就来了呗,读就是了啊。可能我们一次读到两次发过来的消息。

3:单线程/进程不会有任何问题,多进程/多线程我们只需要设置EPOLLONESHOT这个参数就好了

关于问题3的用户代码应该怎么写后面会介绍。

下面就是我自己的测试代码,和自己一点epoll的源码分析,没兴趣的可以不看

客户端代码:(下面四个示例都是同一个客户端)

int main()
{
int sock;
sock= socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
if(sock<0){
return 0;
}

struct sockaddr_in servaddr;
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(8888);
servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");

if(connect(sock,(struct sockaddr*)&servaddr,sizeof(servaddr))<0){
return 0;
}
char *buf1 = "hello ";
write(sock,buf1,strlen(buf1) + 1);
printf("buf = %s\n",buf1);

sleep(1);

char *buf2 = "world ";
write(sock,buf2,strlen(buf2) + 1);
printf("buf = %s\n",buf2);

sleep(2);

char *buf3 = "陈明东";
write(sock,buf3,strlen(buf3) + 1);
printf("buf = %s\n",buf3);

sleep(10);
close(sock);
}




服务端阻塞读:

while(1)
{
printf("sleep\n");
sleep(2);
int len = read(conn,buf,1024);
if(0 == len)
{
printf("客户端退出\n");
close(conn);
break;
}
/*把读到的数据打印出来*/
for(int i = 0;i<len;++i)
printf("%c",buf[i]);
printf("\n");
}




服务端非阻塞读:

while(1)
{
printf("sleep\n");
sleep(2);
index = 0,len = 1024;
while(1)
{
int bytes_read = read(conn,buf + index,len - index);
if ( bytes_read == -1 )
{
if( errno == EAGAIN || errno == EWOULDBLOCK )
{
break;
}
return 0;
}
else if ( bytes_read == 0 )
{
printf("客户端退出\n");
close(conn);
return 0;
}
index += bytes_read;
printf("这次读到了 %d 字节\n",bytes_read);
}
/*把读到的数据打印出来*/
for(int i = 0;i<index;++i)
printf("%c",buf[i]);
printf("\n");
}




服务端epollET模式非阻塞读:

while(1)
{
printf("epoll_wait()\n");
num = epoll_wait(epoll_fd,events,10,-1);
if(num < 0) return 0;
for(int i = 0;i<num;++i)
{
sockfd = events[i].data.fd;
if(sockfd == listenfd)
{
if((connfd = accept(listenfd,(struct sockaddr*)&peeraddr,&peerlen))<0)
return 0;
addfd(epoll_fd,connfd);
}
else if(events[i].events & EPOLLIN)
{
printf("有读的数据到了\n");
char buf[1024];
/*非阻塞读*/
int index = 0,len = 1024;
while(1)
{
int bytes_read = read(sockfd,buf + index,len - index);
if ( bytes_read == -1 )
{
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
return 0;
}
else if ( bytes_read == 0 )
{
printf("客户端退出\n");
close(sockfd);
return 0;
}
index += bytes_read;
printf("这次读到了 %d 字节\n",bytes_read);
printf("我们故意读慢一点sleep 2s\n");
sleep(2);
}
/*把读到的数据打印出来*/
for(int i = 0;i<index;++i)
printf("%c",buf[i]);
printf("\n");
}
}
}




所以说呢,压根就没啥问题,你自己读你自己的嘛,每次事件来了epoll都会通知你,但是对于我这个代码占时看不出来是不是每次事件来了都会通知你,下面这个代码就能看出来。

服务端epoll多进程ET模式非阻塞读

while(1)
{
printf("epoll_wait() PID=%d\n",getpid());
num = epoll_wait(epoll_fd,events,10,-1);
if(num < 0) return 0;
printf("epoll_wait() over PID=%d\n",getpid());
for(int i = 0;i<num;++i)
{
sockfd = events[i].data.fd;
if(sockfd == listenfd)
{
if((connfd = accept(listenfd,(struct sockaddr*)&peeraddr,&peerlen))<0)
return 0;
addfd(epoll_fd,connfd);
}
else if(events[i].events & EPOLLIN)
{
char buf[1024];
/*非阻塞读*/
int index = 0,len = 1024;
while(1)
{
int bytes_read = read(sockfd,buf + index,len - index);
if ( bytes_read == -1 )
{
if( errno == EAGAIN || errno == EWOULDBLOCK )
break;
printf("PID=%d 读错误退出\n",getpid());
break;
}
else if ( bytes_read == 0 )
{
printf("客户端退出\n");
close(sockfd);
return 0;
}
index += bytes_read;
printf("PID=%d  读到了 %d 字节\n",getpid(),bytes_read);
printf("故意读慢一点 sleep 2s\n");
sleep(2);
}
printf("PID=%d 读到的数据:",getpid());
/*把读到的数据打印出来*/
for(int i = 0;i<index;++i)
printf("%c",buf[i]);
printf("\n");
}
}
}




可以看出来每次只要接收缓冲区内有数据,你就可以一直读到完。

但是每次事件过来都会通知你一次,比如上面代码,另一个进程进来了,但是他读不到数据。因为TCP没有和自己建立连接(顺便说一下我以前做高并发服务器的时候,那时候思想不成熟,想用半同步半异步模式,然后利用多进程来做,就是碰到了这个问题,另一个进程读不到数据,永远都是同一个进程在处理事件)

利用了EPOLLONESHOT之后的情况:



如果是多线程就就没有上面那个BUG了



总结:

如果是单进程是不会有任何问题的。因为在read的时候是不可能去epoll_wait(),这样epoll通知不到你,而且你也不需要它通知,因为你自己正在处理嘛。

如果是用多线程,我们不能多进程去读写同一个socket,只需要加一个EPOLLONESHOT事件,这样就不会存在同一个socket被两个线程读取

多进程稍微麻烦一点,有可能2号进程被唤醒来处理这个1号进程的socket,2号进程是读不到数据的。这样这个数据就一直在缓冲区中。所以我们要利用回话保持技术或者一致性Hash算法,每次都把同一个socket让同一个进程去处理,这样就没问题了

源码分析:

//这个就是传说中的回调函数,屌屌的。
ep_poll_callback()
{
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);/*将该fd加入到epoll监听的就绪链表中*/
if (waitqueue_active(&ep->wq))//里面就看出来,有等待的进程或线程
wake_up_locked(&ep->wq);//就直接唤醒
//没有就啥也不做,并且就绪链表还会被清空,LT模式还会把没有处理完的事件继续加入到就绪链表中,ET模式不会做任何事
}


虽然eventpoll里面有个wq(等待队列),但是从刚才源码分析的情况来看,我觉得最好就是一个进程或者线程去wait,多了反而会出问题。

再看一个epoll_wait源码吧

epoll_wait()
{
//里面主要就是ep_poll这个函数
error = ep_poll(ep, events, maxevents, timeout);
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,int maxevents, long timeout)
{
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
/* 将当前进程加入到eventpoll的等待队列中,等待文件状态就绪或直到超时,或被信号中断。 */
__add_wait_queue(&ep->wq, &wait);
for (;;) {
/* 执行ep_poll_callback()唤醒时应当需要将当前进程唤醒,所以当前进程状态应该为“可唤醒”TASK_INTERRUPTIBLE  */
set_current_state(TASK_INTERRUPTIBLE);
/* 如果就绪队列不为空,也就是说已经有文件的状态就绪或者超时,则退出循环。*/
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
/*
从这里开始当前进程会进入睡眠状态,直到某些文件的状态就绪或者超时。
当文件状态就绪时,eventpoll的回调函数ep_poll_callback()会唤醒在ep->wq指向的等待队列中的进程
*/
jtimeout = schedule_timeout(jtimeout);
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);//在这里把等待队列清空
set_current_state(TASK_RUNNING);
}


到此结束
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  epoll 腾讯 面试 源码