您的位置:首页 > 运维架构 > Linux

Linux内核开发之异步通知与异步I/O《来自linux设备开发详解》

2012-10-15 21:34 323 查看
阻塞I/O意味着一直等待设备可访问再访问,非阻塞I/O意味着使用poll()来查询是否可访问,而异步通知则意味着设备通知应用程序自身可访问。(异步通知:很简单,一旦设备准备好,就主动通知应用程序,这种情况下应用程序就不需要查询设备状态,这是不是特像硬件上常提的“中断的概念”。上边比较准确的说法其实应该叫做“信号驱动的异步I/O”,信号是在软件层次上对中断机制的一种模拟。)

说到信号,在应用程序中,为了捕获信号(还捕获呢,不就是一个处理吗)可以使用signal()函数来设置对应的信号的处理函数。函数原型是

 void(*signal(intsigno,void(*func)(int)))(int)  这个看起来费劲吧,不光你,我看着也费劲,没关系,给你来个例子:

voidsigterm_handler(intsigno)
{
chardata[MAX_LEN];
intlen;
len=read(STDIN_FILENO,&data,MAX_LEN);
data[len]=0;
printf("Inputavailable:%s\n",data);
exit(0);
}
intmain(void)
{
intoflags;
//启动信号驱动机制
signal(SIGIO,sigterm_handler);
fcntl(STDIN_FILENO,F_SETOWN,getpid());
oflags=fcntl(STDIN_FILENO,F_GETFL);
fcntl(STDIN_FILENO,F_SETFL,oflags|FASYNC);
//建立一个死循环,防止程序结束
while(1);
return0;
}


看了这段代码明白啥意思了吧,我也不多少了,咱们继续往下走..为了一个用户在用户空间中能处理一个设备释放的信号,它必须完成一下3份工作:

1)通过F_SETOWN控制指令设置设备文件的拥有者为本进程,这样从设备驱动中发出的信号才能被本进程收到。

2)通过F_SETFLIO控制命令设置设备文件支持FASYNC,即异步通知模式。

3)通过signal()链接信号和信号处理函数。

当然,如果你了解linux/Unix信号机制的话,你可能会问为啥没说sigaction函数,其实没关系,作用差不多,想知道的话,自己看书Apue的P261.

有了信号的发送,那么就一定得有信号的释放了:

在设备驱动和应用程序的异步通知交互中,仅仅在应用程序端捕获信号是不够的,因为信号没有的源头是在驱动端,因此要在适当的时机让设备驱动释放信号。

为了使设备支持异步通知机制,驱动程序中涉及三个操作:

1)支持F_SETOWN命令,能在这个控制命令处理中设置filp->f_owner为对应的进程ID。不过此项工作已由内核完成,设备驱动无须处理。

2)支持F_SETFL命令的处理,每当FASYNC标志改变时,驱动程序中fasync()函数将得以进行。因此,驱动程序必须实现fasync()函数。

3)在设备资源可获得时,调用kill_fasync()函数激发相应的信号。

驱动程序中上面的三步是和应用程序是一一对应的。如下图:





设备驱动中异步通知编程还是比较简单的,主要就是一些数据结构,和两个函数:

数据结构:fasync_struct结构体

函数:1)处理FASYNC标志变更的函数intfasync_helper(intfd,structfile*filp,intmode,structfasync_struct**fa);

       2)释放信号用的函数voidkill_fasync(structfasync_struct**fa,intsig,intband);

和其他设备驱动一样,一般将fasync_struct放到设备结构体中。下边是典型模版:

structxxx_dev
{
structcdevcdev;
...
structfasync_struct*async_queue;//异步结构体
}


而在驱动的fasync()函数中,只需要简单的将该参数的3个参数以及fasync_struct结构体指针的指针作为第四个参数传给fasync_helper函数即可.下边是典型模版:

staticintxxx_fasync(intfd,structfile*filp,intmode)
{
structxxx_dev*dev=filp->private_data;
returnfasync_helper(fd,filp,mode,&dev->async_queue);
}
一旦设备资源可以获得时,应该调用kill_fasync()释放SIGIO信号,可读时第三个参数设置为POLL_IN,可写时第三个参数设置为POLL_OUT,下边是释放信号的典型模版:


staticssize_txxx_write(structfile*filp,constchar__user*buf,size_tcount,loff_t*f_ops)
{
structxxx_dev*dev=filp->private_data;
....
//产生异步信号
if(dev->async_queue)
{
kill_fasync(&dev->async_queue,SIGIO,POLL_IN);
}
..
}


最后,在文件关闭时,即在设备驱动的release函数中,应调用设备驱动的fasync()函数将文件从异步通知的列表中删除,下边是设备驱动的释放函数的典型模版:

staticintxxx_release(structinode*inode,structfile*filp)
{
structxxx_dev*dev=filp->private_data;
//将文件从异步通知列表中删除
xxx_fasync(-1,filp,0);
...
return0;
}


structglobalfifo_dev
{
structcdevcdev;/*cdev结构体*/
unsignedintcurrent_len;/*fifo有效数据长度*/
unsignedcharmem[GLOBALFIFO_SIZE];/*全局内存*/
structsemaphoresem;/*并发控制用的信号量*/
wait_queue_head_tr_wait;/*阻塞读用的等待队列头*/
wait_queue_head_tw_wait;/*阻塞写用的等待队列头*/
structfasync_struct*async_queue;/*异步结构体指针,用于读*/
};
/*文件释放函数*/
intglobalfifo_release(structinode*inode,structfile*filp)
{
/*将文件从异步通知列表中删除*/
globalmem_fasync(-1,filp,0);
return0;
}

staticintglobalfifo_fasync(intfd,structfile*filp,intmode)
{
structglobalfifo_dev*dev=filp->private_data;
returnfasync_helper(fd,filp,mode,&dev->async_queue);
}

/*globalfifo写操作*/
staticssize_tglobalfifo_write(structfile*filp,constchar__user*buf,
size_tcount,loff_t*ppos)
{
structglobalfifo_dev*dev=filp->private_data;//获得设备结构体指针
intret;
DECLARE_WAITQUEUE(wait,current);//定义等待队列

down(&dev->sem);//获取信号量
add_wait_queue(&dev->w_wait,&wait);//进入写等待队列头

/*等待FIFO非满*/
if(dev->current_len==GLOBALFIFO_SIZE)
{
if(filp->f_flags&O_NONBLOCK)
//如果是非阻塞访问
{
ret=-EAGAIN;
gotoout;
}
__set_current_state(TASK_INTERRUPTIBLE);//改变进程状态为睡眠
up(&dev->sem);

schedule();//调度其他进程执行
if(signal_pending(current))
//如果是因为信号唤醒
{
ret=-ERESTARTSYS;
gotoout2;
}

down(&dev->sem);//获得信号量
}

/*从用户空间拷贝到内核空间*/
if(count>GLOBALFIFO_SIZE-dev->current_len)
count=GLOBALFIFO_SIZE-dev->current_len;

if(copy_from_user(dev->mem+dev->current_len,buf,count))
{
ret=-EFAULT;
gotoout;
}
else
{
dev->current_len+=count;
printk(KERN_INFO"written%dbytes(s),current_len:%d\n",count,dev
->current_len);

wake_up_interruptible(&dev->r_wait);//唤醒读等待队列
/*产生异步读信号*/
if(dev->async_queue)
kill_fasync(&dev->async_queue,SIGIO,POLL_IN);

ret=count;
}

out:up(&dev->sem);//释放信号量
out2:remove_wait_queue(&dev->w_wait,&wait);//从附属的等待队列头移除
set_current_state(TASK_RUNNING);
returnret;
}

下面再给出测试程序:


#include...
//接收到异步读信号的动作
voidinput_handler(intsignum)
{
printf("Receiveasignalfromglobalfifo,signalnum:%d\n",signum);
}

intmain()
{
intfd,oflags;
fd=open("/dev/globalfifo",O_RDWR,S_IRUSR|S_IWUSR);
if(fd!=-1)
{
//启动信号驱动机制
signal(SIGIO,input_handler);//让input_handler()处理SIGIO信号
fcntl(fd,F_SETOWN,getpid());
oflags=fcntl(fd,F_GETFL);
fcntl(fd,F_SETFL,oflags|FASYNC);
while(1)
{
sleep(100);
}
}
else
{
printf("deviceopenfailure\n");
}
}


当我们加载完驱动并创建完设备节点后,运行上述程序,每当通过echo向/dev/globalfilfo写入新的数据后,input_handler将会被调用。如下所示:
echo0>/dev/globalfifo
receiveasignalfromglobalfifo,signalnum:29

 
echo0>/dev/globalfifo
receiveasignalfromglobalfifo,signalnum:29

 

echo0>/dev/globalfifo
receiveasignalfromglobalfifo,signalnum:29
 

所谓AIO就是AsynchronousInput/Output异步输入/输出,基本思想是允许进程发起很多的I/O操作,而不用阻塞或等待任何操作的完成,稍后或在接收到I/O操作完成的通知时,进程就可以检索I/O操作的结果。

在异步非阻塞IO中,我们是可以同时发起多个传输操作。这需要每个操作都有一个唯一的上下文,这样才能在它们完成时区分到底是哪个传输操作完成了。在AIO中,

通过aiocb(AIOIOcontrolBlock)结构体进行区分,这个结构体如下:

structaiocb{
int aio_fildes; /*Filedescriptor*/
off_t aio_offset; /*Fileoffset*/
volatilevoid* aio_buf; /*Locationofbuffer*/
size_t aio_nbytes; /*Lengthoftransfer*/
int aio_reqprio; /*Requestpriorityoffset*/
structsigevent aio_sigevent; /*Signalnumberandvalue*/
int aio_lio_opcode; /*Operationtobeperformed*/
};


从上边我们可以看到,这个结构体包含了有关传输的所有信息,包括数据准备的用户缓冲区。在产生IO通知时,aiocb结构就被用来唯一标识所完成的IO操作。

AIO系列API中主要有下边几个函数:

1.intaio_read(structaiocb*aiocbp)

  该函数请求对一个有效的文件描述符进行异步读操作。在请求进行排队之后会立即返回,如果执行成功,返回值就为0,错误则返回-1并设置errno的值。

2.intaio_write(structaiocb*aiocbp)
  该函数请求一个异步写操作,它会立即返回说明请求已经进行排队,成功返回0,失败返回为-1,并设置相应的error值。

3.intaio_error(structaiocb*aiocbp)

  该函数用来确定请求的状态,可以返回EINPROGRESS(说明请求尚未完成),ECANCELLED(请求被应用程序取消了),-1(说明发生了错误,具体错误原因由error记录)。

4.ssize_taio_return(structaiocb*aiocbp)

  由于并没有阻塞在read调用上,所以我们不能立即返回这个函数的返回状态,这是就要使用这个函数了,需要注意的是只有在aio_error调用确定请求已经完成(可能

已经完成,也可能发生了错误)之后,才能调用这个函数,这个函数的返回值就相当于同步情况下read或write系统调用的返回值(所传输的字节数,如果发生错误,则返回-1)。

5.intaio_suspend(conststructaiocb*constcblist[],intn,conststructtimespec*timeout)

  用户可以通过这个函数来来挂起(或阻塞)调用进程,直到异步请求完成为止,此时会产生一个信号,或者发生其他超时操作。调用者提供了一个aiocb引用列表,其中任何一个完成都会导致给函数返回。

6.intaio_cancel(intfd,structaiocb*aiocbp)

  该函数允许用户取消对某个文件描述符执行的一个或所有的IO请求。

  如果要取消一个请求,用户需提供文件描述符和aiocb引用,如果这个请求被成功取消了,则返回AIO_CANCELED,如果该请求完成了,返回AIO_NOTCANCELED.

  如果要取消对某个给定文件描述符的所有请求,用户需要提供这个文件的描述符以及一个aiocbp的NULL引用,如果所有请求被成功取消了,则返回AIO_CANCELED

,只要至少有一个没被取消,这个函数就返回AIO_NOT_CANCELED.如果没有一个请求可以被取消,该函数就会返回AIO_ALLDONE.

  然后,可以使用aio_error来验证每个AIO请求,如果某个请求已经被返回了,那么aio_error就返回-1,并且error会被设置为ECANCELED.

7.intlio_listio(intmode,structaiocb*list[],intnent,structsigevent*sig)

 这个操作使得用户可以在一个系统调用(一次内核上下文切换中启动大量的I/O操作)。其中,mode参数可以是LIO_WAIT或LIO_NOWAIT,前者会阻塞这个调用,直到所有的IO都完成为止,在操作进行排队之后,LIO_NOWAIT就会返回,list是一个aiocb引用的列表,最大元素的个数有nent定义的。如果list的元素为NULL,lio_lis

tio()将被忽略。

光说理论也不行,是不?现在来点实际点的:

a)用户空间读例程:

#include<aio.h>
..
intfd,set;
structaiocbmy_aiocb;

fd=open("file.txt",O_RDONLY);
if(fd<0)
{
perror("open");
}
//清零aiocb结构体
bzero((char*)&my_aiocb,sizeof(structaiocb));
//为aiocb请求分配数据缓冲区
my_aiocb.aio_buf=malloc(BUFSIZE+1);
if(!my_aiocb.aio_buf)
perror("malloc");
//初始化aiocb的成员
my_aiocb.aio_fildes=fd;
my_aiocb.aio_nbytes=BUFSIZE;
my_aiocb.aio_offset=0;

ret=aio_read(&my_aiocb);
if(ret<0)
perror("aio_read");
while(aio_error(&my_aiocb)==EINPROGRESS)
;
if((ret=aio_return(&my_iocb)))
{
//获得异步读的返回值
}
else
{
读失败,分析errror
}


b)用户空间异步IOaio_suspend()函数使用例程

structaioct*cblist(MAX_LIST)
//清零aioct结构链表
bzero((char*)cblist,sizeof(cblist));
//将一个或更多的aiocb放入aioct结构体链表
cblist[0]=&my_aiocb;
ret=aio_read(&my_aiocb);
ret=aio_suspend(cblist,MAX_LIST,NULL);


c)用户空间异步IOlio_list()函数使用例程

structaiocbaiocb1,aiocb2;
structaiocb*list[MAX_LIST];
...
//准备第一个aiocb
aiocb1.aio_fildes=fd;
aiocb1.aio_buf=malloc(BUFSIZE+1);
aiocb1.aio_nbytes=BUFSIZE;
aiocb1.aio_offset=next_offset;
aiocb1.aio_lio_opcode=LIO_READ;//异步读操作
...//准备多个aiocb
bzero((char*)list,sizeof(list));
//将aiocb填入链表
list[0]=&aiocb1;
list[1]=&aiocb2;
...
ret=lio_listio(LIO_WAIT,list,MAX_LIST,NULL);//发起大量IO操作


 

先上代码:使用信号作为AIO异步IO通知机制

voidsetup_io(..)
{
intfd;
structsigactionsig_act;
structaiocbmy_aiocb;
...
//设置信号处理函数
sigemptyset(&sig_act.sa_mask);
sig_act.sa_flags=SA_SIGINFO;
sig_act.sa_sigaction=aio_completion_handler;
//设置AIO请求
bzero((char*)&my_aiocb,sizeof(structaiocb));
my_aiocb.aio_flags=fd;
my_aiocb.aio_buf=malloc(BUF_SIZE+1);
my_aiocb.aio_nbytes=BUF_SIZE;
my_aiocb.offset=next_offset;
//连接AIO请求和信号处理函数
my_aiocb.aio_sigevent.sigev_notify=SIGEV_SIGNVAL;
my_aiocb.aio_sigevent.sigev_signo=SIGIO;
my_aiocb.aic_sigevent.sigev_value.sival_ptr=&my_aiocb;
//将信号和处理函数绑定
ret=sigaction(SIGION,&sig_act,NULL);
...
ret=aio_read(&my_aiocb);

//信号处理函数
voidaio_completion_handler(intsigno,siginfo_t*info,void*context)
{
structaiocb*req;
//确定是我们需要的信号
if(info->si_signo==SIGIO)
{
req=(structaiocb*)info->si_value.sival_ptr;//获得aiocb;
//请求的操作是否完成
if(aio_error(req)==0)
{
ret=aio_return(req);
}
}
return;
}


从上边可以看到,使用AIO的应用程序同样需要定义信号处理函数,在指定的信号被产生时会触发调用这个处理程序。

“那么是不是就只能使用信号这种方式呢,我记得以前没一个知识点你都给我讲了好多方法,这个歌也不例外吧”小王说。

“嗯,真聪明,就喜欢聪明的女生”听到小王也懂得开动脑子了,我也要表示表示不是。

再上代码:使用回调函数最为AIO的通知

voidsetup_io(..)
{
...//同上
//连接AIO请求和线程回调函数
my_aiocb.aio_sigevent.sigev_notify=SIGEV_THREAD;
my_aiocb.aio_sigevent.notify_function=aio_completion_handler;

//设置回调函数
my_aiocb.aio_sigevent.notify_attributes=NULL;
my_aiocb.aio_sigevent.sigev_value.sival_ptr=&my_aiocb;
...
ret=aio_read(&my_aiocb);

//信号处理函数
voidaio_completion_handler(intsigno,siginfo_t*info,void*context)
{
structaiocb*req;
req=(structaiocb*)sigval.sival_ptr;//获得aiocb;
//请求的操作是否完成
if(aio_error(req)==0)
{
ret=aio_return(req);
}
return;
}


上述程序在创建aiocb请求之后,使用SIGEV_THREAD请求了一个线程回调函数作为通知方法。在回调函数中。通过(structaiocb*)info->si_value.sival_ptr可以获得对应的aiocb指针,使用AIO函数可验证请求是否已经完成。

在Linux内核中,每个IO请求都对应一个kiocb结构体,其ki_filp成员指向对应的file指针,通过is_sync_kiocb可以判断某Kiocb时候为同步IO请求,如果非真,表示是异步IO请求。

 块设备和网络设备本身就是异步的。只有字符设备驱动必须明确指出应支持AIO.需要说明的是AIO对于大多数字符设备而言都不是必须的。只有少数才需要。

 在字符设备驱动程序中,file_operations包含了3个和AIO相关的函数。如下:

ssize_t(*aio_read)(structkiocb*iocb,char*buffer,size_tcount,loff_toffset);
ssize_t(*aio_write)(structkiocb*iocb,constchar*buffer,size_tcount,loff_toffset);
int(*aio_fsync)(structkiocb*iocb,intdatasync);

aio_read()和aio_write()与file_operation中的read()和write()中的offset参数不同,它直接传递值,而后者传递的是指针。这两个函数本身也不一定完成读写操作,它只是发起,初始化读写操作。

下面来看看实际的代码部分:


//异步读
staticssize_txxx_aio_read(structkiocb*iocb,char*buffer,size_tcount,loff_toffset)
{
returnxxx_defer_op(0,iocb,buf,count,pos);
}

//异步写
staticssize_txxx_aio_write(structkiocb*iocb,constchar*buffer,size_tcount,loff_toffset)
{
returnxxx_defer_op(1,iocb,(char*)buf,count,pos);
}

//初始化异步IO
staticintxxx_defer_op(intwrite,structkiocb*iocb,char*buf,size_tcount,loff_tpos)
{
structasync_work*async_wk;
intresult;
//当可以访问buffer时进行复制
if(write)
{
result=xxx_write(iocb->ki_filp,buf,count,&pos);
}
else
{
result=xxx_read(iocb->ki_filp,buf,count,&pos);
}
//如果是同步IOCB,立即返回状态
if(is_sync_kiocb(iocb))
returnresutl;
//否则,推后几us执行
async_wk=kmalloc(sizeof(*async_wk),GFP_KERNEL));
if(async_wk==NULL)
returnresult;
async_wk->aiocb=iocb;
async_wk->result=result;
INIT_WORK(&async_wk->work,xxx_do_deferred_op,async_wk);
schedule_delayed_work(&async_wk->work,HZ/100);
return-EIOCBOUEUED;//控制权限返回给用户空间
}
//延迟后执行
staticvoidxxx_do_deferred_op(void*p)
{
structasync_work*async_wk=(structasync_work*)p;
aio_complete(async_wk_iocb,async_wk->result,0);
kfree(async_wk);
}


在上述代码中有一个async_work的结构体定义如下:

structasync_work
{
structkiocb*iocb;//kiocb结构体指针
intresult;//执行结果
structwork_structwork;//工作结构体
};

在上边代码中最核心的是使用aync_work结构体将操作延迟,通过schedule_delayed_work可以调度其运行,而aio_complete的调用用于通知内核驱动程序已经完成了操作。


 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息