Linux通过改进的epoll实现对不同超时时间的数据包重传
2014-01-10 22:26
711 查看
应用背景:
epoll模型是当前Linux网络编程的主流模型,可以高效解决多个事件并发的问题。在进行网络编程时,往往要对每一个发出的数据包进行ACK确认,若在指定的时间内没有收到ACK,则需要重传或者丢弃该数据包。那么如果在epoll模型中实现该功能呢?
先来看看传统的做法:程序维护一个“已发出但是没收到ACK”的数据包记录R,记录包括数据包内容、数据包发送的时间戳t以及超时时间T。当需要进行数据包发送时,在发出数据包的同时把该数据包加入记录R,接下来程序继续执行。在这个方法中,程序需要通过一种手段定时检测当前时间now是否大于记录R中的每一个数据包的t+T,若大于,则说明数据包接收ACK超时,可以通过多线程技术或者在程序主循环中每循环一次检测一次。
现在的问题是:一般的epoll模型,程序都会阻塞在epoll_wait调用上,程序没有办法循环检测记录R,有人会说,epoll_wait不是有超时时间么?的确,但是这个超时只能针对一个数据包的情况,试想一下,若程序当前依次发出了10个数据包,各自的超时时间分别为3秒、5秒、2秒等等,那么epoll_wait的超时时间应该设置多长呢?答案是没有办法。因为前面的数据包万一设置的epoll_wait超时时间比较长,后面来了一个超时时间短的,由于epoll_Wait的阻塞,就会错失后面超时时间短的数据包的重发机会。
因此,本文实现了一个基于升序时间链表的tepoll,可以对不同超时间隔的数据包进行超时重传。主要思想是:
1、在发送数据包的同时,把数据包以及其超时值封装成一个“已发送但未应答”的对象,并根据超时值插入升序时间链表;
2、对epoll进行封装,使得其在每一次调用前,先查询升序时间链表中最近的超时时间T,并把该超时时间T作为epoll_wait的超时值;
3、若在超时前epoll_wait返回,说明收到ACK,根据ACK与数据包的对应关系删除时间链表中的记录;
4、若epoll_wait超时,则遍历时间链表,查找now>t+T的数据包(此时最少有一个数据包超时),调用send将其进行重发,同时执行步骤1;
下面贴上主要代码进行讲解:
以下是对epoll的封装。注意:封装后的epoll_wait前3个参数与原来一致,只是没有了第4个超时参数,将根据时间链表设置超时参数。
总结一下:
经过对epoll_wait的封装后,tepoll_wait的基本用法没多大改变。本文方法本质上是通过epoll_wait的超时机制设置,由近及远设置定时器,先设置较近触发的定时值,若超时则调用超时回调,若没超时则把定时链表中的对应节点删除。每次数据发送后,需要把相应的超时时间加入超时链表中。
优点:1、不用设置额外的定时器;2、可以保持epoll的编程模型而不需要作太大的修改。
epoll模型是当前Linux网络编程的主流模型,可以高效解决多个事件并发的问题。在进行网络编程时,往往要对每一个发出的数据包进行ACK确认,若在指定的时间内没有收到ACK,则需要重传或者丢弃该数据包。那么如果在epoll模型中实现该功能呢?
先来看看传统的做法:程序维护一个“已发出但是没收到ACK”的数据包记录R,记录包括数据包内容、数据包发送的时间戳t以及超时时间T。当需要进行数据包发送时,在发出数据包的同时把该数据包加入记录R,接下来程序继续执行。在这个方法中,程序需要通过一种手段定时检测当前时间now是否大于记录R中的每一个数据包的t+T,若大于,则说明数据包接收ACK超时,可以通过多线程技术或者在程序主循环中每循环一次检测一次。
现在的问题是:一般的epoll模型,程序都会阻塞在epoll_wait调用上,程序没有办法循环检测记录R,有人会说,epoll_wait不是有超时时间么?的确,但是这个超时只能针对一个数据包的情况,试想一下,若程序当前依次发出了10个数据包,各自的超时时间分别为3秒、5秒、2秒等等,那么epoll_wait的超时时间应该设置多长呢?答案是没有办法。因为前面的数据包万一设置的epoll_wait超时时间比较长,后面来了一个超时时间短的,由于epoll_Wait的阻塞,就会错失后面超时时间短的数据包的重发机会。
因此,本文实现了一个基于升序时间链表的tepoll,可以对不同超时间隔的数据包进行超时重传。主要思想是:
1、在发送数据包的同时,把数据包以及其超时值封装成一个“已发送但未应答”的对象,并根据超时值插入升序时间链表;
2、对epoll进行封装,使得其在每一次调用前,先查询升序时间链表中最近的超时时间T,并把该超时时间T作为epoll_wait的超时值;
3、若在超时前epoll_wait返回,说明收到ACK,根据ACK与数据包的对应关系删除时间链表中的记录;
4、若epoll_wait超时,则遍历时间链表,查找now>t+T的数据包(此时最少有一个数据包超时),调用send将其进行重发,同时执行步骤1;
下面贴上主要代码进行讲解:
struct tevent_t { tevent_t *next;//指向下一个节点 struct timeval tv;//超时时间 void ( *func )( void * );//超时回调函数 void *arg;//回调函数参数 unsigned int id;//超时定时器ID };//升序时间链表节点 static tevent_t *active = NULL; /* active timers */ static tevent_t *free_list = NULL; /* inactive timers */ /* end declarations */ //从free_list中分配一个空闲的节点 static tevent_t *allocate_timer( void ) { tevent_t *tp; if ( free_list == NULL ) /* need new block of timers? */ { free_list = (tevent_t *)malloc( NTIMERS * sizeof( tevent_t ) ); if ( free_list == NULL ) error( 1, 0, "couldn't allocate timers\n" ); for ( tp = free_list; tp < free_list + NTIMERS - 1; tp++ ) tp->next = tp + 1; tp->next = NULL; } tp = free_list; /* allocate first free */ free_list = tp->next; /* and pop it off list */ return tp; }
//在发送数据后调用,往时间链表中加入一个节点 unsigned int timeout( void ( *func )( void * ), void *arg, int ms ) { tevent_t *tp; tevent_t *tcur; tevent_t **tprev; static unsigned int id = 1; /* timer ID */ tp = allocate_timer(); tp->func = func; tp->arg = arg; if ( gettimeofday( &tp->tv, NULL ) < 0 ) error( 1, errno, "timeout: gettimeofday failure" ); tp->tv.tv_usec += ms * 1000; if ( tp->tv.tv_usec > 1000000 ) { tp->tv.tv_sec += tp->tv.tv_usec / 1000000; tp->tv.tv_usec %= 1000000; } for ( tprev = &active, tcur = active; tcur && !timercmp( &tp->tv, &tcur->tv, < ); /* XXX */ tprev = &tcur->next, tcur = tcur->next ) { ; } *tprev = tp; tp->next = tcur; tp->id = id++; /* set ID for this timer */ return tp->id; } /* end timeout */ /* untimeout - cancel a timer */
//收到ACK后调用,从时间链表中删除一个节点 void untimeout( unsigned int id ) { tevent_t **tprev; tevent_t *tcur; for ( tprev = &active, tcur = active; tcur && id != tcur->id; tprev = &tcur->next, tcur = tcur->next ) { ; } if ( tcur == NULL ) { error( 0, 0, "untimeout called for non-existent timer (%d)\n", id ); return; } *tprev = tcur->next; tcur->next = free_list; free_list = tcur; }
以下是对epoll的封装。注意:封装后的epoll_wait前3个参数与原来一致,只是没有了第4个超时参数,将根据时间链表设置超时参数。
int tepoll_wait( int epollfd, epoll_event *events, int max_event_number ) { struct timeval now; struct timeval tv; struct timeval *tvp; tevent_t *tp; int n; for ( ;; ) { if ( gettimeofday( &now, NULL ) < 0 ) error( 1, errno, "tselect: gettimeofday failure" ); //若时间链表中有超时事件,则取出第一个超时时间 if ( active ) { tv.tv_sec = active->tv.tv_sec - now.tv_sec;; tv.tv_usec = active->tv.tv_usec - now.tv_usec; if ( tv.tv_usec < 0 ) { tv.tv_usec += 1000000; tv.tv_sec--; } tvp = &tv; } else tvp = NULL; if(tvp == NULL) n = epoll_wait( epollfd, events, max_event_number, -1 );//若没有超时事件,则一直等待 else n = epoll_wait( epollfd, events, max_event_number, tvp->tv_sec ); if ( n < 0 ) return -1; //在超时前有事件到达,函数返回事件总数,与epoll语义相同 if ( n > 0 ) return n; //超时了,说明超时链表上最起码有一个节点需要触发超时处理回调函数 while ( active && !timercmp( &now, &active->tv, < ) ) { active->func( active->arg );//调用回调 tp = active; active = active->next; tp->next = free_list;//从超时链表中删除,返还给空闲链表 free_list = tp; } } }main函数中进行如下调用:与普通的epoll_wait用法差不多,只是当tepoll_wait返回0时不再表示超时,而是出错。(epoll_wait永远不会超时返回,因为tepoll_wait中若超时了,会一直死循环)。
for ( ;; ) { rc = tepoll_wait( epollfd, events, MAX_EVENT_NUMBER); if ( rc < 0 ) error( 1, errno, "tepoll failure" ); if ( rc == 0 ) error( 1, 0, "tepoll returned with no events\n" ); for ( i= 0; i < rc; i++ ) { int sockfd = events[i].data.fd;//有新的数据到达 if ( sockfd == s ) { //printf( "event trigger once\n" ); cnt=0; while( 1 ) { int ret = recv( s, ack+cnt, ACKSZ-cnt, 0 );//接收数据,注意epoll中ET模式的用法 if( ret < 0 ) { if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )//接收数据,注意epoll中ET模式的用法 { //printf( "read later\n" ); break; } printf("ret < 0\n"); close( sockfd ); break; } else if( ret == 0 ) { printf("ret == 0\n"); close( s ); } else { cnt+=ret; } } memcpy( &mid, ack + 1, sizeof( u_int32_t ) ); mp = findmsgrec( mid );//根据ACK中的句柄ID,找出时间链表上的对应节点,删除之 if ( mp != NULL ) { untimeout( mp->id ); /* cancel timer */ freemsgrec( mp ); /* delete saved msg */ } } else if ( events[i].events & EPOLLIN && sockfd == 0)//此处为了模拟,把键盘输入数据作为发出的数据包,附带有一个发送句柄 { int ret=0; mp = getfreerec(); ret = read( 0, mp->pkt.buf, sizeof( mp->pkt.buf ) ); if ( ret < 0 ) error( 1, errno, "read failure" ); mp->pkt.buf[ ret ] = '\0'; mp->pkt.cookie = msgid++;//加入发送句柄 mp->pkt.len = htonl( sizeof( u_int32_t ) + ret ); if ( send( s, &mp->pkt, 2 * sizeof( u_int32_t ) + ret, 0 ) < 0 ) error( 1, errno, "send failure" ); mp->id = timeout( ( tofunc_t )lost_ACK, mp, T1 );//数据发送后设置ACK超时 } else { printf( "something else happened \n" ); } } }以上只提供了主要代码,若有需要可联系我获取全部代码,QQ:120150200
总结一下:
经过对epoll_wait的封装后,tepoll_wait的基本用法没多大改变。本文方法本质上是通过epoll_wait的超时机制设置,由近及远设置定时器,先设置较近触发的定时值,若超时则调用超时回调,若没超时则把定时链表中的对应节点删除。每次数据发送后,需要把相应的超时时间加入超时链表中。
优点:1、不用设置额外的定时器;2、可以保持epoll的编程模型而不需要作太大的修改。
相关文章推荐
- (笔记)linux下用select函数的超时实现timer(时间定时器s)
- Linux通过该控制台的字符流 实现打印出的字符不同颜色
- Linux下超时重传时间(RTO)的实现探究
- Linux之通过NTP协议实现服务器时间同步的源码
- [linux]通过ssh远程设定各服务器时间,从而实现集群时间同步
- Linux下超时重传时间(RTO)的实现探究
- Linux网络编程--使用epoll,共享内存技术实现高性能的聊天室程序
- 通过 一个多层交换机 实现 不同网段主机 互通
- Linux 下通过脚本实现远程自动备份
- Linux-视频监控系统(3)-Epoll框架的实现
- 将不同的html页面组合成一个——通过框架标签frameset和frame实现
- 通过多线程为基于 .NET 的应用程序实现响应迅速的用户[改进]
- linux下通过脚本实现自动重启程序
- Oracle 11g 通过创建物化视图实现不同数据库间的表数据同步
- Linux内核--网络栈实现分析(六)--应用层获取数据包(上)
- Linux时间子系统之六:高精度定时器(HRTIMER)的原理和实现
- C语言实现linux网卡检测-改进版
- 第六课 自己实现路由改进,针对不同请求的路径进行响应
- Linux下通过bonding技术实现网络负载均衡及冗余
- 嵌入式Linux上通过boa服务器实现cgi/html的web上网(转)