您的位置:首页 > 运维架构 > Linux

linux进程并发模型生产者和消费者模式编程

2014-07-29 12:55 706 查看
linux进程并发模型生产者和消费者模式编程

1. 使用到的知识点。
1.1  linux信号api.(不含多线程):
发送信号 kill, alarm, arise;
注册信号处理函数: signal, sigaction;
等待信号 sigsuspend, sigwait,pause;
阻塞信号 sigfillset, sigemptyset, sigaddset, sigdelset, sigismember, sigprocmask;

发送信号, kill, alarm, arise
这是唯一一组由信号发送方调用的API,
给自己发送信号的函数:
alarm, arise都是给自己进程产生信号,
arise()我们可以使用:kill(getpid(), signo)来实现。
指定给某个进程发送信号的函数:
kill是可以从一个进程发送一个信号到另一个进程,
因为信号的接收方一般都不做权限检查,所以发送方要有权限给的接收方发送信号信号才能被发送,
否则接收方无法接收到这个信号。

直接使用kill给进程1发送一个SIGKILL信号的话,你的系统基本也就OVER了。
这个权限检查的指导原则按我的理解是,
发送方的有效用户是否有权限给接收方的实际用户发送信号来决定。
因为,当你运行一个seteuid程序,这个程序运行时的euid可能不再是ruid了,
但是你仍然可以kill它。

注册信号处理函数, signal, sigaction
这组API是决定收到一个信号号的处理方法。

1.不显示的注册一个信号的处理函数,系统都有对应的默认处理函数,大多数都是直接终止程序本身.

在richard stevens的APUE中说signal的信号处理需要重新安装信号处理方法,但我在FC5下测试,
不重新安装也是可以的,并且正在进行信号处理时,发送一个信号过去,该信号也不会丢失,只是暂时阻塞,
等前面的信号处理完后再处理。当然当正在信号处理时发送多个信号过去,信号处理完后,可能传递一个信号过去,
其它的信号会丢失。至于直接使用signal安装信号,等信号处理完后是否会重启被该信号中断的慢速系统调用,我没有测试,不得而知。
当然你可以使用sigaction来安装信号,并显示的指明restart标志,这样被该信号中断的系统调用就会自动重启。
sigaction这个API更复杂,功能也更多灵活,可以随意定制,通过siginfo_t的结构,可以得到很多信号相关的信息。我猜现在的linux的signal都是用sigaction封装来实现的。跟sigaction相比,signal简单多了,没理由不用它,当然,你得保证signal的语义不会对你的程序造成冲突。

等待信号 pause, sigsuspend, sigwait
这组API都是等待信号的发生,可以根据程序的实际需求来选取。pause只要接收到信号就会返回,sigsuspend跟sigwait都是只等待某些特定的信号发生,但sigsuspend是把参数里的信号集给阻塞,而sigwait一般是先调用sigprocmask把所有的信号都阻塞,然后再等待sigwait参数里指定的信号。通常sigwait用在多线程程序里,主线程把所有信号都mask掉,然后不同的线程用sigwait来等待自己感兴趣的信号。

阻塞信号 sigfillset, sigemptyset, sigaddset, sigdelset, sigismember, sigprocmask
这一组API最多也最简单,前面五个API都是对信号集的操作,
用它用得到一个你想要的信号集后就可以调用sigprocmaks的设置程序的信号掩码了。

另外, siglongjmp, sigsetjmp是安全的在信号处理程序里跑转的两个接口。

最后一些跟信号处理程序里的一些注意事项
1、因为信号会中断慢速的系统调用,所以当系统调用失败时,
一定要检查失败原因,当被信号中断时做出重启系统调用的动作;
2、因为信号处理有异步性,所以任何在信号处理程序里调用了不具可重入性的函数都要小心了。
这里的不具可重入性的函数据括了系统调用跟自己定义的函数。
3、在信号处理程序里任何可能导致对全局变理的修改也要小心了,
一个经常被人忽视的就是在信号处理程序里调用了一个系统调用后可能导致errno的修改。

需求:
我们要对一个1010001到1020000中的,取出其中的质数并打印出来。

想法是一个进程做数据分发工作,其他四个进程进行计算。那么就是一个进程控制四个进程工作。

我们需要使用到进程间通信。
1.使用二个信号, sigusr1, 和sigusr2信号。
sigusr1信号:
当父进程写完一个大于零的数值到到文件中,使用sigusr1信号发送给其他进程。通知子进程做数据计算功能。

sigusr2信号:
当子进程读到给父进程写出的数之后。判断是否大于0 ,如果大于零。将文件中的数据置为零,发送信号给父亲进程。之后进行文件解锁操作。

注意这里应该先通知,在进行文件解锁。如果先解锁的话,给文件在没发出信号之前就有可以被其他的进程给锁住。

注册信号处理函数:
static struct sigaction sact,osact;
static sigset_t set, oset;

1.
static void sig_ctl(int n){}
2.
int sig_cond_init(void){
sact.sa_handler = sig_ctl;
sigemptyset(&sact,sa_mask);
sact.sa_flags = 0 ;
if(sigaction(SIGUSR1,&sact, &osact)<0){
perror("sigaction()");
exit(1);
}
if(sigaction(SIGUSR2,&sact, NULL)<0){
perror("sigaction()");
exit(1);
}
sigemptyset(&set);
sigaddset(&set, SIGUSR1);						//将信号SIGUSR1和SIGUSR2添加到添加的信号集中。
sigaddset(&set, SIGUSR2);
sigprocmask(SIG_BLOCK,&set,&oset); 					//将这些set中的两个信号block住。
return 1;
}

int sig_cond_destroy(void)		  				        //将信号还原默认行为。
{
sigprocmask(SIG_SETMASK, &oset, NULL);
if (sigaction(SIGUSR1, &osact, NULL) < 0) {
perror("sigaction()");
exit(1);
}
return 1;
}

int tell_parent(){						                 //给父进程发送信号。
kill(getppid(),SIGUSR2);
return 1;
}

int tell_child(){
kill(-(getpid()),SIGUSR1);			                        //给所有子进程发送信号。
return 1;
}

int wait_parent(void){					                       //子进程block信号SIGUSR2,并立即进入等待状态。
sigset_t st;
sigemptyset(&st);
sigaddset(&st,SIGUSR2);
/*
sigprocmask(SIG_BLOCK,&st,NULL);
pause();
*/
sigsuspend(&st);
return 1;
}
int wait_chlid(){                                                             //父进程block信号SIGUSR1,立即进入等待状态。
sigset_t st;
sigemptyset(&st);
sigaddset(&st,SIGUSR1);
sigsuspend(&st);
return 1;
}

int prime(int i)
{
int j;
int flag = 0;
for (j=2;j<i/2;j++) {
if (i%j == 0) {
flag = 0;
break;
} else {
flag = 1;
continue;
}
}
return flag;
}

int get_num(int fd)
{
int ret;
int size;
char buf[NUM];
ret = lseek(fd, 0, SEEK_SET);
if (ret < 0) {
perror("lseek(get_num)");
exit(1);
}
size = read(fd, buf, NUM);
if (size < 0) {
perror("read(get_num)");
exit(1);
}
buf[size] = '\0';
return atoi(buf);
}

int set_num(int fd, int num)
{
int ret;
int size;
char buf[NUM];
ret = lseek(fd, 0, SEEK_SET);
if (ret < 0) {
perror("lseek(get_num)");
exit(1);
}
sprintf(buf, "%d", num);
ret = write(fd, buf, strlen(buf));
if (ret < 0) {
perror("write(set_num)");
exit(1);
}
if (ftruncate(fd, ret) < 0) {
perror("ftruncate()");
exit(1);
}
return 1;
}

void child(void)
{
int fd, ret, num;
fd = open(PATH, O_RDWR);
if (fd < 0) {
perror("open(child)");
exit(1);
}

while (1) {
ret = flock(fd, LOCK_EX);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
num = get_num(fd);
if (num == 0) {
ret = flock(fd, LOCK_UN);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
wait_parent();
continue;
}
if (num == -1) {
ret = flock(fd, LOCK_UN);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
break;
}
set_num(fd, 0);
tell_parent();
ret = flock(fd, LOCK_UN);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
if (prime(num)) {
printf("%d from: %d\n", num, getpid());
}
}

close(fd);
exit(0);
}

void parent(void)
{
int fd, i, ret, num;
fd = open(PATH, O_RDWR);
if (fd < 0) {
perror("open(child)");
exit(1);
}

for (i=START;i<END;i+=2) {
while (1) {
ret = flock(fd, LOCK_EX);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
num = get_num(fd);
if (num > 0) {
ret = flock(fd, LOCK_UN);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
wait_child();
continue;
}
if (num == 0) {
set_num(fd, i);
tell_child();
ret = flock(fd, LOCK_UN);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
break;
}
}
}

while (1) {
ret = flock(fd, LOCK_EX);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
num = get_num(fd);
if (num > 0) {
ret = flock(fd, LOCK_UN);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
wait_child();
continue;
}
if (num == 0) {
set_num(fd, -1);
tell_child();
ret = flock(fd, LOCK_UN);
if (ret < 0) {
perror("flock(child)");
exit(1);
}
break;
}
}
close(fd);
}

#include "prime.h"
int main()
{
int i, j, flag = 0;
pid_t pid;

sig_cond_init();

for (i=0;i<PROC;i++) {
pid = fork();
if (pid < 0) {
perror("fork()");
exit(1);
}

if (pid == 0) {
child();
}
}
parent();
for (i=0;i<PROC;i++) {
wait(NULL);
}
sig_cond_destroy();
exit(0);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: