linux AIO libaio和epoll实现非阻塞模型
2014-07-15 10:49
447 查看
epoll是Linux内核为处理大批句柄而作改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著的减少程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。因为它会复用文件描述符集合来传递结果而不是迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一个原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select\poll那种IO事件的电平触发(Level
Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提供应用程序的效率。
libaio和epoll的结合
在异步编程中,任何一个环节的阻塞都会导致整个程序的阻塞,所以一定要避免在io_getevents调用时阻塞式的等待。还记得io_iocb_common中的flags和resfd吗?看看libaio是如何提供io_getevents和事件循环的结合:
void io_set_eventfd(struct iocb *iocb, int eventfd)
{
iocb->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
iocb->u.c.resfd = eventfd;
}
这里的resfd是通过系统调用eventfd生成的。
int eventfd(unsigned int initval, int flags);
eventfd是linux 2.6.22内核之后加进来的syscall,作用是内核用来通知应用程序发生的事件的数量,从而使应用程序不用频繁地去轮询内核是否有时间发生,而是有内核将发生事件的数量写入到该fd,应用程序发现fd可读后,从fd读取该数值,并马上去内核读取。
有了eventfd,就可以很好地将libaio和epoll事件循环结合起来:
C代码
1. 创建一个eventfd
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2. 将eventfd设置到iocb中
io_set_eventfd(iocb, efd);
3. 交接AIO请求
io_submit(ctx, NUM_EVENTS, iocb);
4. 创建一个epollfd,并将eventfd加到epoll中
epfd = epoll_create(1);
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
epoll_wait(epfd, &epevent, 1, -1);
5. 当eventfd可读时,从eventfd读出完成IO请求的数量,并调用io_getevents获取这些IO
read(efd, &finished_aio, sizeof(finished_aio);
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
epoll 编程实例,测试通过:
Java代码
#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS
#include <stdio.h>
#include <errno.h>
#include <libaio.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <inttypes.h>
#define TEST_FILE "aio_test_file"
#define TEST_FILE_SIZE (127 * 1024)
#define NUM_EVENTS 128
#define ALIGN_SIZE 512
#define RD_WR_SIZE 1024
struct custom_iocb
{
struct iocb iocb;
int nth_request;
};
void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n",
iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
}
int main(int argc, char *argv[])
{
int efd, fd, epfd;
io_context_t ctx;
struct timespec tms;
struct io_event events[NUM_EVENTS];
struct custom_iocb iocbs[NUM_EVENTS];
struct iocb *iocbps[NUM_EVENTS];
struct custom_iocb *iocbp;
int i, j, r;
void *buf;
struct epoll_event epevent;
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (efd == -1) {
perror("eventfd");
return 2;
}
fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
if (fd == -1) {
perror("open");
return 3;
}
ftruncate(fd, TEST_FILE_SIZE);
ctx = 0;
if (io_setup(8192, &ctx)) {
perror("io_setup");
return 4;
}
if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
perror("posix_memalign");
return 5;
}
printf("buf: %p\n", buf);
for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
iocbps[i] = &iocbp->iocb;
io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
io_set_eventfd(&iocbp->iocb, efd);
io_set_callback(&iocbp->iocb, aio_callback);
iocbp->nth_request = i + 1;
}
if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
perror("io_submit");
return 6;
}
epfd = epoll_create(1);
if (epfd == -1) {
perror("epoll_create");
return 7;
}
epevent.events = EPOLLIN | EPOLLET;
epevent.data.ptr = NULL;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
perror("epoll_ctl");
return 8;
}
i = 0;
while (i < NUM_EVENTS) {
uint64_t finished_aio;
if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
perror("epoll_wait");
return 9;
}
if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
perror("read");
return 10;
}
printf("finished io number: %"PRIu64"\n", finished_aio);
while (finished_aio > 0) {
tms.tv_sec = 0;
tms.tv_nsec = 0;
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
if (r > 0) {
for (j = 0; j < r; ++j) {
((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
}
i += r;
finished_aio -= r;
}
}
}
close(epfd);
free(buf);
io_destroy(ctx);
close(fd);
close(efd);
remove(TEST_FILE);
return 0;
}
说明:
1. 在centos 6.2 (libaio-devel 0.3.107-10) 上运行通过
2. struct io_event中的res字段表示读到的字节数或者一个负数错误码。在后一种情况下,-res表示对应的
errno。res2字段为0表示成功,否则失败
3. iocb在aio请求执行过程中必须是valid的
4. 在上面的程序中,通过扩展iocb结构来保存额外的信息(nth_request),并使用iocb.data
来保存回调函数的地址。如果回调函数是固定的,那么也可以使用iocb.data来保存额外信息。
原文 :http://blog.sina.com.cn/s/blog_6b19f21d0100znza.html
Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提供应用程序的效率。
libaio和epoll的结合
在异步编程中,任何一个环节的阻塞都会导致整个程序的阻塞,所以一定要避免在io_getevents调用时阻塞式的等待。还记得io_iocb_common中的flags和resfd吗?看看libaio是如何提供io_getevents和事件循环的结合:
void io_set_eventfd(struct iocb *iocb, int eventfd)
{
iocb->u.c.flags |= (1 << 0) /* IOCB_FLAG_RESFD */;
iocb->u.c.resfd = eventfd;
}
这里的resfd是通过系统调用eventfd生成的。
int eventfd(unsigned int initval, int flags);
eventfd是linux 2.6.22内核之后加进来的syscall,作用是内核用来通知应用程序发生的事件的数量,从而使应用程序不用频繁地去轮询内核是否有时间发生,而是有内核将发生事件的数量写入到该fd,应用程序发现fd可读后,从fd读取该数值,并马上去内核读取。
有了eventfd,就可以很好地将libaio和epoll事件循环结合起来:
C代码
1. 创建一个eventfd
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
2. 将eventfd设置到iocb中
io_set_eventfd(iocb, efd);
3. 交接AIO请求
io_submit(ctx, NUM_EVENTS, iocb);
4. 创建一个epollfd,并将eventfd加到epoll中
epfd = epoll_create(1);
epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent);
epoll_wait(epfd, &epevent, 1, -1);
5. 当eventfd可读时,从eventfd读出完成IO请求的数量,并调用io_getevents获取这些IO
read(efd, &finished_aio, sizeof(finished_aio);
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
epoll 编程实例,测试通过:
Java代码
#define _GNU_SOURCE
#define __STDC_FORMAT_MACROS
#include <stdio.h>
#include <errno.h>
#include <libaio.h>
#include <sys/eventfd.h>
#include <sys/epoll.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>
#include <stdint.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <inttypes.h>
#define TEST_FILE "aio_test_file"
#define TEST_FILE_SIZE (127 * 1024)
#define NUM_EVENTS 128
#define ALIGN_SIZE 512
#define RD_WR_SIZE 1024
struct custom_iocb
{
struct iocb iocb;
int nth_request;
};
void aio_callback(io_context_t ctx, struct iocb *iocb, long res, long res2)
{
struct custom_iocb *iocbp = (struct custom_iocb *)iocb;
printf("nth_request: %d, request_type: %s, offset: %lld, length: %lu, res: %ld, res2: %ld\n",
iocbp->nth_request, (iocb->aio_lio_opcode == IO_CMD_PREAD) ? "READ" : "WRITE",
iocb->u.c.offset, iocb->u.c.nbytes, res, res2);
}
int main(int argc, char *argv[])
{
int efd, fd, epfd;
io_context_t ctx;
struct timespec tms;
struct io_event events[NUM_EVENTS];
struct custom_iocb iocbs[NUM_EVENTS];
struct iocb *iocbps[NUM_EVENTS];
struct custom_iocb *iocbp;
int i, j, r;
void *buf;
struct epoll_event epevent;
efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (efd == -1) {
perror("eventfd");
return 2;
}
fd = open(TEST_FILE, O_RDWR | O_CREAT | O_DIRECT, 0644);
if (fd == -1) {
perror("open");
return 3;
}
ftruncate(fd, TEST_FILE_SIZE);
ctx = 0;
if (io_setup(8192, &ctx)) {
perror("io_setup");
return 4;
}
if (posix_memalign(&buf, ALIGN_SIZE, RD_WR_SIZE)) {
perror("posix_memalign");
return 5;
}
printf("buf: %p\n", buf);
for (i = 0, iocbp = iocbs; i < NUM_EVENTS; ++i, ++iocbp) {
iocbps[i] = &iocbp->iocb;
io_prep_pread(&iocbp->iocb, fd, buf, RD_WR_SIZE, i * RD_WR_SIZE);
io_set_eventfd(&iocbp->iocb, efd);
io_set_callback(&iocbp->iocb, aio_callback);
iocbp->nth_request = i + 1;
}
if (io_submit(ctx, NUM_EVENTS, iocbps) != NUM_EVENTS) {
perror("io_submit");
return 6;
}
epfd = epoll_create(1);
if (epfd == -1) {
perror("epoll_create");
return 7;
}
epevent.events = EPOLLIN | EPOLLET;
epevent.data.ptr = NULL;
if (epoll_ctl(epfd, EPOLL_CTL_ADD, efd, &epevent)) {
perror("epoll_ctl");
return 8;
}
i = 0;
while (i < NUM_EVENTS) {
uint64_t finished_aio;
if (epoll_wait(epfd, &epevent, 1, -1) != 1) {
perror("epoll_wait");
return 9;
}
if (read(efd, &finished_aio, sizeof(finished_aio)) != sizeof(finished_aio)) {
perror("read");
return 10;
}
printf("finished io number: %"PRIu64"\n", finished_aio);
while (finished_aio > 0) {
tms.tv_sec = 0;
tms.tv_nsec = 0;
r = io_getevents(ctx, 1, NUM_EVENTS, events, &tms);
if (r > 0) {
for (j = 0; j < r; ++j) {
((io_callback_t)(events[j].data))(ctx, events[j].obj, events[j].res, events[j].res2);
}
i += r;
finished_aio -= r;
}
}
}
close(epfd);
free(buf);
io_destroy(ctx);
close(fd);
close(efd);
remove(TEST_FILE);
return 0;
}
说明:
1. 在centos 6.2 (libaio-devel 0.3.107-10) 上运行通过
2. struct io_event中的res字段表示读到的字节数或者一个负数错误码。在后一种情况下,-res表示对应的
errno。res2字段为0表示成功,否则失败
3. iocb在aio请求执行过程中必须是valid的
4. 在上面的程序中,通过扩展iocb结构来保存额外的信息(nth_request),并使用iocb.data
来保存回调函数的地址。如果回调函数是固定的,那么也可以使用iocb.data来保存额外信息。
原文 :http://blog.sina.com.cn/s/blog_6b19f21d0100znza.html
相关文章推荐
- linux基础编程:IO模型:阻塞/非阻塞/IO复用 同步/异步 Select/Epoll/AIO
- linux基础编程:IO模型:阻塞/非阻塞/IO复用 同步/异步 Select/Epoll/AIO
- linux基础编程:IO模型:阻塞/非阻塞/IO复用 同步/异步 Select/Epoll/AIO
- linux基础编程:IO模型:阻塞/非阻塞/IO复用 同步/异步 Select/Epoll/AIO
- Linux非阻塞IO(八)使用epoll重新实现非阻塞的回射服务器
- Linux下基于EPOLL 模型,实现用户登录,客户端采用QT
- Linux非阻塞IO(八)使用epoll重新实现非阻塞的回射服务器
- linux网络模型-阻塞i/o代码实现
- Linux Socket 事件触发模型 epoll 示例 这里会写一个用C语言的TCP服务器的完全实现的简单程序
- Linux非阻塞IO(七)使用epoll重新实现客户端
- Linux网络服务器epoll模型的socket通讯的实现(一)
- Linux非阻塞IO(七)使用epoll重新实现客户端
- linux网络模型非阻塞i/o模型代码实现
- linux 下阻塞睡眠等待poll函数简单实现
- 父进程非阻塞回收子进程(适用LINUX下C语言的client-server模型)
- Linux中断实现方法(三):中断处理模型
- Linux下select, poll和epoll IO模型的详解
- 通过LD_ASSUME_KERNEL设置Linux的线程实现模型
- linux epoll 模型介绍及程序实例
- linux下epoll如何实现高效处理百万句柄的