您的位置:首页 > 理论基础 > 计算机网络

epoll实现TCP通信

2016-05-27 14:58 826 查看
常见指令:telent 127.0.0.1 8080 连接
service iptables stop 关闭防火墙
在TCP连接中,主动关闭连接的一方会进入2MSL,如果是服务器端,当TIME_WAIT时,sock不能被复用(四次挥手),使用setsockopt解决。int opt=1;setsockopt(listenfd,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));epoll是linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
注:epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
注:epoll文件描述符用完后,直接用close关闭即可,非常方便。事实上,任何被侦听的文件符只要其被关闭,那么它也会自动从被侦听的文件描述符集合中删除,很是智能。
epoll相关的系统调用有:epoll_create, epoll_ctl和epoll_wait。
1.epoll_create用来创建一个epoll文件描述符。



返回值:>0:非空文件描述符;-1:函数调用失败,同时会自动设置全局变量errno;2.epoll_ctl用来添加/修改/删除需要侦听的文件描述符及其事件.
epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。




epfd:epoll_create的返回值
op:表示动作,为以下三个宏任意一个(根据需要):添加,修改,删除



fd:为关心的描述符
event为:关心描述符事件





返回值:成功,返回0;失败,返回-1,置错误码。
3.epoll_wait/epoll_pwait接收发生在被侦听的描述符上的,用户感兴趣的IO事件。





收集在epoll监控的事件中已经就绪的事件。
events:是分配好的epoll_event结构体数组,epoll将会把就绪的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个events数组中)。
maxevents:告之内核这个events有多大,这个 maxevents的值不能小于创建epoll_create()时的size
timeout:是超时时间(毫秒)
1.0表示轮询非阻塞,立即返回;
2.-1将不确定,也有说法说是永久阻塞。
3.大于0,以timeput事件轮询返回

返回值:
1.成功,返回对应I/O上已准备好的文件描述符数
2.0表示已超时。
3.-1,发生错误。

//初级版
#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#define _SIZE_ 64
#define _BACKLOG_ 5
typedef struct fdBuf
{
void * _buf;
int _fd;
}fdBuf;
static void usage(const char* proc)
{
printf("%s [ip][port]",proc);
}
static int startup(char* ip,int port)
{
int listen_sock=socket(AF_INET,SOCK_STREAM,0);
if(listen_sock<0)
{
perror("socket");
exit(1);
}
struct sockaddr_in local;
local.sin_family=AF_INET;
local.sin_port=htons(port);
local.sin_addr.s_addr=inet_addr(ip);
if(bind(listen_sock,(struct sockaddr*)&local,sizeof(local))<0)
{
perror("bind");
exit(2);
}
if(listen(listen_sock,_BACKLOG_)<0)
{
perror("listen");
exit(3);
}
return listen_sock;
}
int sock_epoll(int listen_sock)
{
//1.create fds instance
int  ins=epoll_create(_SIZE_);
if(ins<0)
{
perror("poll_create");
return 1;
}
struct epoll_event ev;
ev.events=EPOLLIN;
ev.data.fd=listen_sock;
int i=0;//index
fdBuf bufs[_SIZE_];
for(i=0;i<_SIZE_;++i)
{
bufs[i]._fd=-1;
bufs[i]._buf=NULL;
}
struct epoll_event fds[_SIZE_];//with bufs save buf
for(i=0;i<_SIZE_;++i)
{
fds[i].events=0;
fds[i].data.fd=-1;
}
epoll_ctl(ins,EPOLL_CTL_ADD,listen_sock,&ev);
int ret=-1;
int timeout=5000;
struct sockaddr_in remote;
socklen_t len=sizeof(remote);
ssize_t _s;//charnum
while(1)
{
switch((ret=epoll_wait(ins,fds,64,timeout)))
{
case -1://error
perror("epoll_wait");
break;
case 0://time out
printf("time is out\n");
break;
default:
{
for(i=0;i<ret;++i)
{
//printf("%d",ret);
if(fds[i].data.fd==listen_sock)
{
if(new_sock<0)
{
perror("accept");
continue;
}
ev.events=EPOLLIN;
ev.data.fd=new_sock;
epoll_ctl(ins,EPOLL_CTL_ADD,new_sock,&ev);
}
else if(fds[i].data.fd>0&&fds[i].events&EPOLLIN)
{
if(bufs[i]._fd==-1)
{
char *buf=(char*)malloc(sizeof(char)*1024);
bufs[i]._fd=fds[i].data.fd;
bufs[i]._buf=buf;
}
//save buf and fd
memset(bufs[i]._buf,'\0',1024);
//sleep(1);
fflush(stdout);
_s=read(fds[i].data.fd,bufs[i]._buf,sizeof(bufs[i]._buf)-1);
if(_s>0)
{
((char*)bufs[i]._buf)[_s]='\0';
printf("client:%s",(char*)bufs[i]._buf);//输出
ev.events=EPOLLOUT;
ev.data.fd=fds[i].data.fd;
epoll_ctl(ins,EPOLL_CTL_MOD,fds[i].data.fd,&ev);
}
else if(_s==0)
{
printf("client is close...\n");
free(bufs[i]._buf);
bufs[i]._fd=-1;
bufs[i]._buf=NULL;
//remove
epoll_ctl(ins,EPOLL_CTL_DEL,fds[i].data.fd,NULL);
}
else
{}
}
else if(fds[i].data.fd>0&&fds[i].events&EPOLLOUT)
{
write(fds[i].data.fd,bufs[i]._buf,strlen(bufs[i]._buf));
ev.events=EPOLLIN;
ev.data.fd=fds[i].data.fd;
epoll_ctl(ins,EPOLL_CTL_MOD,fds[i].data.fd,&ev);
}
else
{}
}
break;
}//default end
}//switch end
}//while end
}
int main(int argc,char* argv[])
{
if(argc!=3)
{
usage(argv[0]);
return 1;
}
int _port=atoi(argv[2]);
char* _ip=argv[1];
int listen_sock=startup(_ip,_port);
sock_epoll(listen_sock);
close(listen_sock);
return 0;
}
//client
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
static void usage(const char* proc)
{
printf("%s [i][port]",proc);
}
int main(int argc,char* argv[])
{
if(argc!=3)
{
usage(argv[0]);
return 1;
}
int sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
perror("socket");
return 2;
}
struct sockaddr_in local;
local.sin_family=AF_INET;
local.sin_port=htons(atoi(argv[2]));
local.sin_addr.s_addr=inet_addr(argv[1]);
if(connect(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
perror("connect");
return 3;
}
char buf[1024];
ssize_t _s;
while(1)
{
printf("please input\n");
fflush(stdout);
_s=read(0,buf,sizeof(buf)-1);
if(_s>0)
{
buf[_s]='\0';
if(strncmp(buf,"quit",4)==0)
{
close(sock);
return 0;
}
write(sock,buf,strlen(buf));
}
else if(_s==0)
{
close(sock);
return 1;
}
_s=read(sock,buf,sizeof(buf)-1);
if(_s>0)
{
buf[_s]='\0';
printf("echo:%s\n",buf);
}
}
return 0;
}
运行截图:

client:



server:



//server_epoll
#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<assert.h>
#include<string.h>
#include<errno.h>
#define _MAX_FD_NUM_  64
typedef struct _fd_buf
{
int fd;
char buf[1024];
}fdBuf_t,*fdBuf_p;
static void usage(const char* const proc)
{
assert(proc);
printf("usage:%s[ip][port]",proc);
}
static int start(char* ip,int port)
{
assert(ip);
int sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
perror("socket");
exit(1);
}
struct sockaddr_in local;
local.sin_family=AF_INET;
local.sin_port=htons(port);
local.sin_addr.s_addr=inet_addr(ip);
//reuse socket
int opt=1;
setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
perror("bind");
exit(2);
}
if(listen(sock,5)<0)
{
perror("listen");
exit(3);
}
return sock;
}
static int epoll_server(int sock)
{
int epoll_fd=epoll_create(256);//-1 or fd
if(epoll_fd<0)
{
perror("epoll_create");
return -1;
}
struct epoll_event ev;
ev.events=EPOLLIN;
ev.data.fd=sock;
//0 success or -1 fail
if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,sock,&ev)<0)
{
perror("epoll_ctl");
return -1;
}
struct epoll_event fds[_MAX_FD_NUM_];
int ret=-1;
int timeout=5000;
int i=0;
struct sockaddr_in client;
socklen_t len=sizeof(client);
ssize_t _s=-1;
while(1)
{
switch((ret=epoll_wait(epoll_fd,fds,_MAX_FD_NUM_,timeout)))
{
case -1://error
perror("epoll_wait");
break;
case 0:
printf("time out...\n");
break;
default:
{
for(i=0;i<ret;++i)
{
//listen ready
if(fds[i].data.fd==sock&&fds[i].events&EPOLLIN)
{
int new_sock=accept(sock,(struct sockaddr*)&client,&len);
if(new_sock<0)
{
perror("accept");
continue;
}
ev.events=EPOLLIN;
ev.data.fd=new_sock;
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,new_sock,&ev);
}
else//normal socket
{
if(fds[i].events&EPOLLIN)
{
fdBuf_p mem=(fdBuf_p)malloc(sizeof(fdBuf_t));
_s=read(fds[i].data.fd,mem->buf,sizeof(mem->buf)-1);
if(_s>0)
{
mem->fd=fds[i].data.fd;
(mem->buf)[_s]='\0';
fds[i].data.ptr=mem;
printf("client:%s",mem->buf);
ev.events=EPOLLOUT;
ev.data.ptr=mem;
epoll_ctl(epoll_fd,EPOLL_CTL_MOD,mem->fd,&ev);
}
else if(_s==0)
{
free(mem);
close(fds[i].data.fd);
continue;
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fds[i].data.fd,NULL);
}
else
{}

}
else if(fds[i].events&EPOLLOUT)
{
fdBuf_p cur=(fdBuf_p)fds[i].data.ptr;
write(cur->fd,cur->buf,strlen(cur->buf));
close(cur->fd);
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,cur->fd,NULL);
free(cur);
}
else{}
}
}
}
break;
}
}
}
int main(int argc,char* argv[])
{
if(argc!=3)
{
usage(argv[0]);
return -1;
}
char* ip=argv[1];
int port=atoi(argv[2]);
int listen_sock=start(ip,port);
epoll_server(listen_sock);
close(listen_sock);
return 0;
}
运行截图:浏览器访问,
请求行---响应行
client用的什么方法,什么浏览器(火狐),协议版本http/1.0 + 状态 200成功+错误码(eg:400页面不存在)






改为显示hello的:
修改代码:









LT(level triggered)是epoll缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.







下来修改epoll为ET触发。
先修改文件描述符为非阻塞











#include<stdio.h>
#include<stdlib.h>
#include<errno.h>
#include<string.h>
#include<unistd.h>
#include<fcntl.h>
static void setnoblock(int fd)
{
//get flag
int fl=fcntl(fd,F_GETFL);//arg is ignore
if(fl<0)
{
perror("fcntl");
exit(1);
}
//set flag
if(fcntl(fd,F_SETFL,fl|O_NONBLOCK))//isn't 0 is error
{
perror("fcntl");
exit(1);
}
}
int main()
{
setnoblock(0);
char buf[20];
ssize_t _s=-1;
while(1)
{
memset(buf,'\0',20);
_s=read(0,buf,sizeof(buf)-1);
if(_s>0)
{
buf[_s]='\0';
printf("echo:%s",buf);
}
else if(_s==0)
{
//do nothing
}
else
{
if(errno==EAGAIN)//EAGAIN==11
{
printf("no data\n");
}
}
sleep(1);
}
return 0;
}
修改epoll为ET
//read
static int readData(int sock,char* buf,int size)
{
assert(buf);
memset(buf,'\0',size);
int i=0;
int ret=-1;
while((ret=read(sock,buf+i,size-i))<size)
{
if(errno==EAGAIN)
break;
i+=ret;
}
return i;
}
//write
static int writeData(int sock,char* buf,int size)
{
assert(buf);
int i=0;
int ret=-1;
while((ret=write(sock,buf+i,size-i))<size)
{
if(errno==EAGAIN)
break;
i+=ret;
}
return i;
}
//修改e所有关心操作符的vents
struct epoll_event ev;
ev.events=EPOLLIN|EPOLLET;
ev.data.fd=sock;
epoll:ET,非阻塞代码
#include<stdio.h>
#include<stdlib.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<sys/epoll.h>
#include<assert.h>
#include<fcntl.h>
#include<unistd.h>
#include<string.h>
#include<errno.h>
#define _MAX_FD_NUM_  64
typedef struct _fd_buf
{
int fd;
char buf[1024];
}fdBuf_t,*fdBuf_p;
static void setnoblock(int fd)
{
int fl=fcntl(fd,F_GETFL);
if(fl<0)
{
perror("fcntl");
exit(1);
}
if(fcntl(fd,F_SETFL,fl|O_NONBLOCK))
{
perror("fcntl");
exit(1);
}
}
static void usage(const char* const proc)
{
assert(proc);
printf("usage:%s[ip][port]",proc);
}
static int start(char* ip,int port)
{
assert(ip);
int sock=socket(AF_INET,SOCK_STREAM,0);
if(sock<0)
{
perror("socket");
exit(1);
}
struct sockaddr_in local;
local.sin_family=AF_INET;
local.sin_port=htons(port);
local.sin_addr.s_addr=inet_addr(ip);
//reuse socket
int opt=1;
setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
if(bind(sock,(struct sockaddr*)&local,sizeof(local))<0)
{
perror("bind");
exit(2);
}
if(listen(sock,5)<0)
{
perror("listen");
exit(3);
}
return sock;
}
//read
static int readData(int sock,char* buf,int size)
{
assert(buf);
memset(buf,'\0',size);
int i=0;
int ret=-1;
while((ret=read(sock,buf+i,size-i))<size)
{
if(errno==EAGAIN)
break;
i+=ret;
}
return i;
}
//write
static int writeData(int sock,char* buf,int size)
{
assert(buf);
int i=0;
int ret=-1;
while((ret=write(sock,buf+i,size-i))<size)
{
if(errno==EAGAIN)
break;
i+=ret;
}
return i;
}
static int epoll_server(int sock)
{
int epoll_fd=epoll_create(256);//-1 or fd
if(epoll_fd<0)
{
perror("epoll_create");
return -1;
}
struct epoll_event ev;
setnoblock(sock);
ev.events=EPOLLIN|EPOLLET;
ev.data.fd=sock;
//0 success or -1 fail
if(epoll_ctl(epoll_fd,EPOLL_CTL_ADD,sock,&ev)<0)
{
perror("epoll_ctl");
return -1;
}
struct epoll_event fds[_MAX_FD_NUM_];
int ret=-1;
int timeout=5000;
int i=0;
struct sockaddr_in client;
socklen_t len=sizeof(client);
ssize_t _s=-1;
while(1)
{
switch((ret=epoll_wait(epoll_fd,fds,_MAX_FD_NUM_,timeout)))
{
case -1://error
perror("epoll_wait");
break;
case 0:
printf("time out...\n");
break;
default:
{
for(i=0;i<ret;++i)
{
//listen ready
if(fds[i].data.fd==sock&&fds[i].events&EPOLLIN)
{
int new_sock=accept(sock,(struct sockaddr*)&client,&len);
if(new_sock<0)
{
perror("accept");
continue;
}
setnoblock(new_sock);
printf("get a connect\n");
ev.events=EPOLLIN|EPOLLET;
ev.data.fd=new_sock;
epoll_ctl(epoll_fd,EPOLL_CTL_ADD,new_sock,&ev);
}
else//normal socket
{
if(fds[i].events&EPOLLIN)
{
fdBuf_p mem=(fdBuf_p)malloc(sizeof(fdBuf_t));
//_s=read(fds[i].data.fd,mem->buf,sizeof(mem->buf)-1);
_s=readData(fds[i].data.fd,mem->buf,sizeof(mem->buf));
if(_s>0)
{
mem->fd=fds[i].data.fd;
(mem->buf)[_s]='\0';
fds[i].data.ptr=mem;
printf("client:%s",mem->buf);
ev.events=EPOLLOUT|EPOLLET;
ev.data.ptr=mem;
epoll_ctl(epoll_fd,EPOLL_CTL_MOD,mem->fd,&ev);
}
else if(_s==0)
{
free(mem);
close(fds[i].data.fd);
continue;
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fds[i].data.fd,NULL);
}
else
{}

}
else if(fds[i].events&EPOLLOUT)
{
//char* buf="http/1.0 200 ok\r\n\r\nhello:)\r\n";
fdBuf_p cur=(fdBuf_p)fds[i].data.ptr;
//write(cur->fd,cur->buf,strlen(cur->buf));
//write(cur->fd,buf,strlen(buf));
writeData(cur->fd,cur->buf,strlen(cur->buf));
close(cur->fd);
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,cur->fd,NULL);
free(cur);
}
else{}
}
}
}
break;
}
}
}
int main(int argc,char* argv[])
{
if(argc!=3)
{
usage(argv[0]);
return -1;
}
char* ip=argv[1];
int port=atoi(argv[2]);
int listen_sock=start(ip,port);
epoll_server(listen_sock);
close(listen_sock);
return 0;
}
运行结果:


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  epoll