生产者与消费者模型Linux下C语言的实现
2011-09-01 23:07
627 查看
文章出自:http://page.renren.com/600235506/note/492983524
学习了信号量以及共享内存后,我们就可以实现进程的同步与互斥了。说到这里,最经典的例子莫过于生产者和消费者模型。下面就和大家一起分析,如何一步步实现这个经典模型。
下面程序,实现的是多个生产者和多个消费者对N个缓冲区(N个货架)进行访问的例子。现在先想想我们以前的伪代码是怎么写的?是不是这样:
//生产者:
//消费者:
可能上面的代码你有些眼熟,又有些困惑,因为它和课本上的代码不完全一样,其实上面的代码就是伪代码的linuxC语言具体实现。我们从上面的代码中慢慢寻找伪代码的踪迹:p(semid,0)和v(semid,0)的作用是让进程互斥访问临界区。临界区中包含的数据indexaddr[0],indexaddr[1],以及shmaddr数组分别对应伪代码中的in,out,buffer。p(semid,1)和v(semid,2)以及p(semid,2)和v(semid,1)实现的是同步作用。
并且,在生产者中,生产者生产了一个货物(goods=rand()%10;),然后将这个货物放上货架(shmaddr[indexaddr[0]]=goods;)。在消费者中,消费和从货架上取下货物(goods=shmaddr[indexaddr[1]];)。
好了,现在再看一边上面的代码,我想你的思路就清晰了。
了解了核心代码,并不能算就完成了生产者和消费者模型,因为生产者和消费者核心代码前还得做一些些准备工作,具体要准备些什么,我们具体来分析。
首先申请一块共享内存,这块共享内存用于存放生产者所生产的货物。同时我们可以看到这块共享内存大小为10字节。这里需要注意,每个生产着或消费者运行后,都要去试着分配这样的一块共享内存。如果在当前进程运行前已经有某个进程已经创建了这块共享内存,那么这个进程就不再创建(此时createshm会返回-1并且错误代码为EEXIST),只是打开这块共享内存。创建后,再将这块共享内存添加到当前进程的地址空间。
接下来还要再申请一块共享内存,用于存放两个整形变量in和out(其实就是申请一个含有2个整形变量的数组而已)。他们记录的是生产和消费货物时“货架”的索引。与上面情况相同,如果已经有其他进程创建了此块共享内存,那么当前进程只是打开它而已。
注意这里对两个整形变量的初始化时的值均为0。
接下来就是创建一个信号量集,这个信号量集中包含三个信号量。第一个信号量实现的互斥作用,即进程对临界区的互斥访问。剩下两个均实现的是同步作用,协调生产者和消费者的合理运行,即货架上没有空位时候生产者不再生产,货架上无商品时消费者不再消费。
注意下面对每个信号量的赋值情况。互斥信号量当然初值为1。而同步信号量两者之和不能大于num的值。
基本上这样,就算完成了生产者和消费者的前期工作。我们可以看到,在核心代码中,我们只需要“装模作样”的将代码“各就各位”即可,当然这需要你理解生产者消费者这个基本模型。而在下面的准备代码中,则需要我们理解关于信号量和共享内存的一些基本函数。
最后再说说使用,建议先运行一个生产者和一个消费者,观察两者是如何协调工作的。然后再只运行一个生产者或一个消费者,看其是否会阻塞。了解了以上情况后,你就可以同时运行多个生产者和消费者了。
下面是源代码:
shm.h
producer.c
consumer.c
学习了信号量以及共享内存后,我们就可以实现进程的同步与互斥了。说到这里,最经典的例子莫过于生产者和消费者模型。下面就和大家一起分析,如何一步步实现这个经典模型。
下面程序,实现的是多个生产者和多个消费者对N个缓冲区(N个货架)进行访问的例子。现在先想想我们以前的伪代码是怎么写的?是不是这样:
//生产者:
while(1) { p(semid,1); sleep(3); p(semid,0); //producer is producing a product goods=rand()%10; shmaddr[indexaddr[0]]=goods; printf("producer:%d produces a product[%d]:%d\n",getpid(),indexaddr[0],goods); indexaddr[0]=(indexaddr[0]+1)%10; v(semid,0); sleep(3); v(semid,2); }
//消费者:
while(1) { p(semid,2); sleep(1); p(semid,0); //consumer is consuming a product goods=shmaddr[indexaddr[1]]; printf("consumer:%d consumes a product[%d]:%d\n",getpid(),indexaddr[1],goods); indexaddr[1]=(indexaddr[1]+1)%num; v(semid,0); sleep(1); v(semid,1); }
可能上面的代码你有些眼熟,又有些困惑,因为它和课本上的代码不完全一样,其实上面的代码就是伪代码的linuxC语言具体实现。我们从上面的代码中慢慢寻找伪代码的踪迹:p(semid,0)和v(semid,0)的作用是让进程互斥访问临界区。临界区中包含的数据indexaddr[0],indexaddr[1],以及shmaddr数组分别对应伪代码中的in,out,buffer。p(semid,1)和v(semid,2)以及p(semid,2)和v(semid,1)实现的是同步作用。
并且,在生产者中,生产者生产了一个货物(goods=rand()%10;),然后将这个货物放上货架(shmaddr[indexaddr[0]]=goods;)。在消费者中,消费和从货架上取下货物(goods=shmaddr[indexaddr[1]];)。
好了,现在再看一边上面的代码,我想你的思路就清晰了。
了解了核心代码,并不能算就完成了生产者和消费者模型,因为生产者和消费者核心代码前还得做一些些准备工作,具体要准备些什么,我们具体来分析。
首先申请一块共享内存,这块共享内存用于存放生产者所生产的货物。同时我们可以看到这块共享内存大小为10字节。这里需要注意,每个生产着或消费者运行后,都要去试着分配这样的一块共享内存。如果在当前进程运行前已经有某个进程已经创建了这块共享内存,那么这个进程就不再创建(此时createshm会返回-1并且错误代码为EEXIST),只是打开这块共享内存。创建后,再将这块共享内存添加到当前进程的地址空间。
num=10; //create a shared memory as goods buffer if((shmid_goods=createshm(".",'s',num))==-1) { if(errno==EEXIST) { if((shmid_goods=openshm(".",'s'))==-1) { exit(1); } } else { perror("create shared memory failed\n"); exit(1); } } //attach the shared memory to the current process if((shmaddr=shmat(shmid_goods,(char*)0,0))==(char*)-1) { perror("attach shared memory error\n"); exit(1); }
接下来还要再申请一块共享内存,用于存放两个整形变量in和out(其实就是申请一个含有2个整形变量的数组而已)。他们记录的是生产和消费货物时“货架”的索引。与上面情况相同,如果已经有其他进程创建了此块共享内存,那么当前进程只是打开它而已。
注意这里对两个整形变量的初始化时的值均为0。
//create a shared memory as index if((shmid_index=createshm(".",'z',2))==-1) { if(errno==EEXIST) { if((shmid_index=openshm(".",'z'))==-1) { exit(1); } } else { perror("create shared memory failed\n"); exit(1); } } else { is_noexist=1; } //attach the shared memory to the current process if((indexaddr=shmat(shmid_index,(int*)0,0))==(int*)-1) { perror("attach shared memory error\n"); exit(1); } if(is_noexist) { indexaddr[0]=0; indexaddr[1]=0; }
接下来就是创建一个信号量集,这个信号量集中包含三个信号量。第一个信号量实现的互斥作用,即进程对临界区的互斥访问。剩下两个均实现的是同步作用,协调生产者和消费者的合理运行,即货架上没有空位时候生产者不再生产,货架上无商品时消费者不再消费。
注意下面对每个信号量的赋值情况。互斥信号量当然初值为1。而同步信号量两者之和不能大于num的值。
//create a semaphore set including 3 semaphores if((semid=createsem(".",'t',3,0))==-1) { if(errno==EEXIST) { if((semid=opensem(".",'t'))==-1) { exit(1); } } else { perror("semget error:"); exit(1); } } else { union semun arg; //seting value for mutex semaphore arg.val=1; if(semctl(semid,0,SETVAL,arg)==-1) { perror("setting semaphore value failed\n"); return -1; } //set value for synchronous semaphore arg.val=num; //the num means that the producer can continue to produce num products if(semctl(semid,1,SETVAL,arg)==-1) { perror("setting semaphore value failed\n"); return -1; } //the last semaphore's value is default //the default value '0' means that the consumer is not use any product now }
基本上这样,就算完成了生产者和消费者的前期工作。我们可以看到,在核心代码中,我们只需要“装模作样”的将代码“各就各位”即可,当然这需要你理解生产者消费者这个基本模型。而在下面的准备代码中,则需要我们理解关于信号量和共享内存的一些基本函数。
最后再说说使用,建议先运行一个生产者和一个消费者,观察两者是如何协调工作的。然后再只运行一个生产者或一个消费者,看其是否会阻塞。了解了以上情况后,你就可以同时运行多个生产者和消费者了。
下面是源代码:
shm.h
#include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include <sys/types.h> #include <sys/ipc.h> #include <sys/sem.h> #include <sys/shm.h> #include <errno.h> #define SHM_SIZE 1024 union semun { int val; struct semid_ds *buf; unsigned short *array; }; //create a semaphore set int createsem(const char *pathname,int proj_id,int num,int init_val) { key_t key; int i,semid; union semun arg; if((key=ftok(pathname,proj_id))==-1) { perror("ftok error:"); return -1; } if((semid=semget(key,num,IPC_CREAT|IPC_EXCL|0666))==-1) { return -1; } //initialize the value of semaphore arg.val=init_val; for(i=0;i<num;i++) { if(semctl(semid,i,SETVAL,arg)==-1) { perror("semctl error:"); return -1; } } return (semid); } //open the semaphore set int opensem(const char*pathname,int proj_id) { key_t key; int semid; if((key=ftok(pathname,proj_id))==-1) { perror("ftok error:"); return -1; } //just get the id of semaphore set if((semid=semget(key,0,IPC_CREAT|0666))==-1) { perror("semget error:"); return -1; } return (semid); } //P operation int p(int semid,int index) { struct sembuf buf={0,-1,0}; if(index<0) { printf("error:the index is invalid\n"); return -1; } buf.sem_num=index; if(semop(semid,&buf,1)==-1) { perror("semop error:"); return -1; } return 1; } //V opeation int v(int semid,int index) { struct sembuf buf={0,+1,0}; if(index<0) { printf("error:the index is invalid\n"); return -1; } buf.sem_num=index; if(semop(semid,&buf,1)==-1) { perror("semop error:"); return -1; } return 1; } //delete the semaphore set int deletesem(int semid) { return (semctl(semid,0,IPC_RMID,0)==-1); } //waiting for the semaphore is equal to 1 int waitsem(int semid,int index) { while(semctl(semid,index,GETVAL,0)==0) { sleep(1); printf("I am waiting for semval equals 1..\n"); } return 1; } //create share memory int createshm(char *pathname,int proj_id,size_t size) { key_t key; int shmid; if((key=ftok(pathname,proj_id))==-1) { perror("ftok error:"); return -1; } if((shmid=shmget(key,size,IPC_CREAT|IPC_EXCL|0666))==-1) { return -1; } return (shmid); } //open share memory int openshm(char *pathname,int proj_id) { key_t key; int shmid; if((key=ftok(pathname,proj_id))==-1) { perror("ftok error:"); return -1; } if((shmid=shmget(key,0,IPC_CREAT|0666))==-1) { perror("shmget error:"); return -1; } return (shmid); }
producer.c
#include "shm.h" int main() { int num; int shmid_goods,shmid_index,semid; char* shmaddr=NULL; int *indexaddr=NULL; int is_noexist=0; num=10; //create a shared memory as goods buffer if((shmid_goods=createshm(".",'s',num))==-1) { if(errno==EEXIST) { if((shmid_goods=openshm(".",'s'))==-1) { exit(1); } } else { perror("create shared memory failed\n"); exit(1); } } //attach the shared memory to the current process if((shmaddr=shmat(shmid_goods,(char*)0,0))==(char*)-1) { perror("attach shared memory error\n"); exit(1); } //create a shared memory as index if((shmid_index=createshm(".",'z',2))==-1) { if(errno==EEXIST) { if((shmid_index=openshm(".",'z'))==-1) { exit(1); } } else { perror("create shared memory failed\n"); exit(1); } } else { is_noexist=1; } //attach the shared memory to the current process if((indexaddr=shmat(shmid_index,(int*)0,0))==(int*)-1) { perror("attach shared memory error\n"); exit(1); } if(is_noexist) { indexaddr[0]=0; indexaddr[1]=0; } //create a semaphore set including 3 semaphores if((semid=createsem(".",'t',3,0))==-1) { if(errno==EEXIST) { if((semid=opensem(".",'t'))==-1) { exit(1); } } else { perror("semget error:"); exit(1); } } else { union semun arg; //seting value for mutex semaphore arg.val=1; if(semctl(semid,0,SETVAL,arg)==-1) { perror("setting semaphore value failed\n"); return -1; } //set value for synchronous semaphore arg.val=num; //the num means that the producer can continue to produce num products if(semctl(semid,1,SETVAL,arg)==-1) { perror("setting semaphore value failed\n"); return -1; } //the last semaphore's value is default //the default value '0' means that the consumer is not use any product now } int goods=0; while(1) { p(semid,1); sleep(3); p(semid,0); //producer is producing a product goods=rand()%10; shmaddr[indexaddr[0]]=goods; printf("producer:%d produces a product[%d]:%d\n",getpid(),indexaddr[0],goods); indexaddr[0]=(indexaddr[0]+1)%10; v(semid,0); sleep(3); v(semid,2); } }
consumer.c
#include "shm.h" int main(int argc,char **argv) { int num; int shmid_goods,shmid_index,semid; char* shmaddr=NULL; int* indexaddr=NULL; int is_noexist=0; num=10; //create a shared memory as goods buffer if((shmid_goods=createshm(".",'s',num))==-1) { if(errno==EEXIST) { if((shmid_goods=openshm(".",'s'))==-1) { exit(1); } } else { perror("create shared memory failed\n"); exit(1); } } //attach the shared memory to the current process if((shmaddr=shmat(shmid_goods,(char*)0,0))==(char*)-1) { perror("attach shared memory error\n"); exit(1); } //create a shared memory as index if((shmid_index=createshm(".",'z',2))==-1) { if(errno==EEXIST) { if((shmid_index=openshm(".",'z'))==-1) { exit(1); } } else { perror("create shared memory failed\n"); exit(1); } } else { is_noexist=1; } //attach the shared memory to the current process if((indexaddr=shmat(shmid_index,(int*)0,0))==(int*)-1) { perror("attach shared memory error\n"); exit(1); } if(is_noexist) { indexaddr[0]=0; indexaddr[1]=0; } //create a semaphore set including 3 semaphores if((semid=createsem(".",'t',3,0))==-1) { if(errno==EEXIST) { if((semid=opensem(".",'t'))==-1) { exit(1); } } else { perror("semget error:"); exit(1); } } else { union semun arg; //seting value for mutex semaphore arg.val=1; if(semctl(semid,0,SETVAL,arg)==-1) { perror("setting semaphore value failed\n"); return -1; } //set value for synchronous semaphore arg.val=num; //the num means that the producer can continue to produce num products if(semctl(semid,1,SETVAL,arg)==-1) { perror("setting semaphore value failed\n"); return -1; } //the last semaphore's value is default //the default value '0' means that the consumer is not use any product now } int goods=0; while(1) { p(semid,2); sleep(1); p(semid,0); //consumer is consuming a product goods=shmaddr[indexaddr[1]]; printf("consumer:%d consumes a product[%d]:%d\n",getpid(),indexaddr[1],goods); indexaddr[1]=(indexaddr[1]+1)%num; v(semid,0); sleep(1); v(semid,1); } }
相关文章推荐
- Linux实现生产者消费者模型
- Linux互斥与同步应用(三):posix线程实现单个生产者和单个消费者模型
- linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)【转】
- Linux--Condition Variable(条件变量)实现生产者-消费者模型 、读写锁
- 生产者-消费者问题实现 (linux下C语言)----笛风读书笔记系列
- 生产者-消费者问题实现 (linux下C语言)
- 生产者-消费者问题实现 (linux下C语言)
- Linux C:利用两个线程实现生产者消费者模型
- Linux下利用信号量函数和共享内存函数和C语言实现生产者消费者问题
- 生产者-消费者问题实现 (linux下C语言)
- linux下C语言实现多线程通信—环形缓冲区,可用于生产者(producer)/消费者(consumer)
- 生产者-消费者问题实现 (linux下C语言)
- 浅谈生产者消费者模型(Linux系统下的两种实现方法)
- linux c pv 实现生产者消费者模型
- linux系统生产者-消费者,读者-写者,哲学家就餐 C语言实现
- 生产者消费者模型(Linux系统下的两种实现方法)
- 【Linux】线程总结:线程同步 -互斥锁,条件变量,信号量实现多生产者多消费者模型
- Linux——线程锁实现的生产者、消费者模型
- Linux相互排斥与同步应用(三):posix线程实现单个生产者和单个消费者模型
- 生产者消费者模型(Linux系统下的两种实现方法)