linux系统中的进程通信(消息队列、共享内存、信号量)
2016-09-12 15:22
706 查看
消息队列:类似与管道只能单项通信,消息队列由消息组成,消息可以有不同的数据结构,也可以由有优先级,能够更加灵活的通 信。消息队列使用int msgget(key_t key,int flags)获得或创建消息队列,使用void msgsnd(int msqid,const void *msgp,size_t msgsz,int msgflg);发送消息,使用size_t msgrcv(int msqid,void *msgp,size_t msgz,long
int msgtype,int msgflg);接收消息,通过int msgctl(int msqid,int cmd,struct msgid_ds *buf); 查询、获取、删除消息队列。
flags:访问权限,创建标志(IPC_CREAT、IPC_EXCL)
msgp:消息缓存区指针。
msgsz:消息缓存去正文的大小。
msgflg:控制变量(IPC_NOWAIT,MSG_NOERROR)
msqid:IPC描述字。
msgp:消息缓存区指针。
msgsz:消息缓存去正文的大小。
msgype:消息的接收类型。
msgype = 0 按顺序接收。
msgype < 0 接收特定的消息
msgype > 0 按优先级接收
msgflg:控制变量(IPC_NOWAIT,MSG_NOERROR)
cmd: 执行的命令
msgsz: 指向msqid_ds
接收:
共享内存:多个进程能够连接到同一个内存区域,都能够对其进行读取操作。通过int shmget(key_t key,size_t size,int shmflg);创建或者获取共享内存,通过void *shmat(int shmid,const void *shmaddr,int shmflg);链接到共享内存,通过int shmdt(int shmid,void *shmaddr);取消对共享内存的链接,通过int shmctl(int shmid,int
cmd,struct shmid_ds *buf);查询、设置、删除,共享内存。
size: 共享内存大小
shmflg: 访问控制标志
shmadrr:共享内存地址
shmflg:控制变量
shmid: IPC描述字
shmadrr:共享内存地址
cmd: 命令
buf: 指向shmid_ds
创建共享内存并写入数据:
读取共享内存:
信号量:信号量可以解决消息队列、共享内存的竞争情况。信号量主要用于互斥情形和生产消费情形。可以通过int semget(key_t key,int nsems,int semflg);获取或创建信号量,通过int semop(int semid,struct sembuf *sops,size_t nsops);操作信号量,通过int semctl(int semid,int semnum,int cmd,[union semun arg]);查询、删除、设置信号量。
生产消费问题,创建一个生产者、消费者都能访问的共享内存。建立两个信号量保证两个进程的互斥访问,sem_pu_ok生产者进程没有访问共享内存,sem_con_ok消费者内存没有访问共享内存。具体代码如下:
生产者:
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#define key_pu 1024
#define key_con 1025
#define key_shm 2048
#define Max 1024
#define ERR(msg,f) { \
if(f < 0) { \
fprintf(stderr,"%s",msg); \
exit(1); \
} \
}
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
int get_sem(int key,int sems) /*设置信号量*/
{
if(sems <= 0)
ERR("create sem failed\n",sems - 1);
int sem_id;
sem_id = semget(key,sems, 0666 | IPC_CREAT);
ERR("create sem failed\n",sem_id);
return sem_id;
}
int get_shm(int shm_id,ssize_t size) /*设置共享内存*/
{
if(size <= 0)
ERR("shm create faile\n",size - 1);
int shm;
shm = shmget(shm_id,size,0666 | IPC_CREAT);
ERR("shm create faile\n",shm);
return shm;
}
void init_sem(int sem_id,int semnum,int init_num) /*初始化信号量*/
{
union semun sem_set;
sem_set.val = init_num;
ERR("sem init failed\n",semctl(sem_id, semnum, SETVAL,sem_set));
return;
}
char *get_shm_addr(int shm_id) /*把共享内存映射到进程空间*/
{
char *addr = NULL;
addr = shmat(shm_id, NULL, 0);
if(addr)
return addr;
ERR("at shm faile\n",-1);
}
void sem_p(int sem_id) /*P操作*/
{
struct sembuf buf;
buf.sem_op = -1;
buf.sem_flg = 0;
buf.sem_flg = SEM_UNDO;
ERR("sem p faile\n",semop(sem_id, &buf, 1));
return;
}
void sem_v(int sem_id) /*V操作*/
{
struct sembuf buf;
buf.sem_op = 1;
buf.sem_flg = 0;
buf.sem_flg = SEM_UNDO;
ERR("sem p faile\n",semop(sem_id, &buf, 1));
return;
}
void rm_sem(int sem_id) /*删除信号量*/
{
ERR("delete sem failed\n",semctl(sem_id, 0, IPC_RMID,0));
return;
}
struct info{ /*共享内存数据类型*/
char buf[Max];
int g;
};
int main(int argc,char *argv[])
{
int shm_id,sem_pu_ok,sem_con_ok;
struct info *shm_addr = NULL;
sem_pu_ok = get_sem(key_pu,1); /*获得信号量*/
sem_con_ok = get_sem(key_con,1); /*获得信号量*/
init_sem(sem_con_ok,0,1); /*初始化信号量*/
init_sem(sem_pu_ok,0,0); /*初始化信号量*/
shm_id = get_shm(key_shm,sizeof(struct info)); /*建立共享存储*/
shm_addr = (struct info*)get_shm_addr(shm_id); /*把共享内存映射到进程地址空间*/
int i;
for(i = 0;;i++){
sem_p(sem_con_ok); /*P操作*/
printf("palse input string:\n");
fgets(shm_addr->buf, Max, stdin);
shm_addr->g = i;
sem_v(sem_pu_ok); /*V操作*/
if(!strncmp(shm_addr->buf,"end",3)) /*若是end结束*/
break;
}
sem_p(sem_con_ok); /*等待消费进程完成*/
rm_sem(sem_con_ok); /*删除信号量*/
rm_sem(sem_pu_ok); /*删除信号量*/
shmdt(shm_addr); /*分离共享内存*/
shmctl(shm_id,IPC_RMID,0); /*删除共享内存*/
exit(0);
}
消费者进程:
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#define key_pu 1024
#define key_con 1025
#define key_shm 2048
#define Max 1024
#define ERR(msg,f) { \
if(f < 0) { \
fprintf(stderr,"%s",msg); \
exit(1); \
} \
}
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
int get_sem(int key,int sems)
{
if(sems <= 0)
ERR("create sem failed\n",sems - 1);
int sem_id;
sem_id = semget(key,sems, 0666 | IPC_CREAT);
ERR("create sem failed\n",sem_id);
return sem_id;
}
int get_shm(int shm_id,ssize_t size)
{
if(size <= 0)
ERR("shm create faile\n",size - 1);
int shm;
shm = shmget(shm_id,size,0666 | IPC_CREAT);
ERR("shm create faile\n",shm);
return shm;
}
void init_sem(int sem_id,int semnum,int init_num)
{
//union semun sem_set;
union semun sem_set;
sem_set.val = init_num;
ERR("sem init failed\n",semctl(sem_id, semnum, SETVAL,sem_set));
return;
}
char *get_shm_addr(int shm_id)
{
char *addr = NULL;
addr = shmat(shm_id, NULL, 0);
if(addr)
return addr;
ERR("at shm faile\n",-1);
}
void sem_p(int sem_id)
{
struct sembuf buf;
buf.sem_op = -1;
buf.sem_num = 0;
buf.sem_flg = SEM_UNDO;
ERR("sem p faile\n",semop(sem_id, &buf, 1));
return;
}
void sem_v(int sem_id)
{
struct sembuf buf;
buf.sem_op = 1;
buf.sem_num = 0;
buf.sem_flg = SEM_UNDO;
ERR("sem p faile\n",semop(sem_id, &buf, 1));
return;
}
void rm_sem(int sem_id)
{
ERR("delete sem failed\n",semctl(sem_id, 0, IPC_RMID,0));
return;
}
struct info{
char buf[Max];
int g;
};
int main(int argc,char *argv[])
{
int shm_id,sem_pu_ok,sem_con_ok;
struct info *shm_addr = NULL;
sem_pu_ok = get_sem(key_pu,1);
sem_con_ok = get_sem(key_con,1);
shm_id = get_shm(key_shm,sizeof(struct info));
shm_addr = (struct info*)get_shm_addr(shm_id);
int i;
for(i = 0;;i++){
sem_p(sem_pu_ok);
printf("pid is: %d NO:%d rcv data is:%s\n",getpid(),shm_addr->g,shm_addr->buf);
sem_v(sem_con_ok);
if(!strncmp(shm_addr->buf,"end",3))
break;
}
shmdt(shm_addr);
sem_v(sem_con_ok); /*唤醒生产者进程*/
exit(0);
}
int msgtype,int msgflg);接收消息,通过int msgctl(int msqid,int cmd,struct msgid_ds *buf); 查询、获取、删除消息队列。
#include <sys/msg.h> int msgget(key_t key,int flags);key:消息对列的关键字。
flags:访问权限,创建标志(IPC_CREAT、IPC_EXCL)
#include <sys/msg.h> void msgsnd(int msqid,const struct void *msgp,size_t msgsz,int msgflg);msqid:IPC描述字。
msgp:消息缓存区指针。
msgsz:消息缓存去正文的大小。
msgflg:控制变量(IPC_NOWAIT,MSG_NOERROR)
#include <sys/msg.h> size_t msgrcv(int msqid,void *msgp,size_t msgz,long int msgtype,int msgflg);
msqid:IPC描述字。
msgp:消息缓存区指针。
msgsz:消息缓存去正文的大小。
msgype:消息的接收类型。
msgype = 0 按顺序接收。
msgype < 0 接收特定的消息
msgype > 0 按优先级接收
msgflg:控制变量(IPC_NOWAIT,MSG_NOERROR)
#include <sys.msg.h> int msgctl(int msqid,int cmd,struct msgid_ds *buf);msqid: IPC描述字
cmd: 执行的命令
msgsz: 指向msqid_ds
接收:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/msg.h> #include <errno.h> #define BUFSIZE 1024 struct my_msg /*消息数据类型*/ { long int mtype; char mtext[BUFSIZE]; }; int main() { int msgid; struct my_msg my; if((msgid = msgget(565,0666|IPC_CREAT)) == -1) /*创建或获取消息队列*/ { fprintf(stderr,"msg faile!\n"); exit(1); } while(1) { if(msgrcv(msgid,&my,sizeof(struct my_msg),-57,IPC_NOWAIT) != -1) /*接收优先级小于57的消息*/ fprintf(stdout,"%s\n",my.mtext); else if(errno == ENOMSG && msgrcv(msgid,&my,sizeof(struct my_msg),0,0) != -1) /*按顺序接收消息*/ fprintf(stdout,"%s\n",my.mtext); else { fprintf(stderr,"fiale!\n"); exit(1); } if(!strncmp(my.mtext,"end",3)) break; } if(msgctl(msgid,IPC_RMID,NULL) == -1) /*删除消息队列*/ { fprintf(stderr,"rm faile\n"); exit(1); } exit(0); }发送:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/msg.h> #include <errno.h> #define BUFSIZE 1024 struct my_msg /*消息的数据类型*/ { long int mtype; char mtext[BUFSIZE]; }; int main() { int msgid; struct my_msg my; if((msgid = msgget(565,0666|IPC_CREAT)) == -1) /*创建或获取消息队列*/ { fprintf(stderr,"msg faile!\n"); exit(1); } while(1) { fprintf(stdout,"Plase Input\n"); fgets(my.mtext,BUFSIZE,stdin); my.mtype = my.mtext[0]; if(msgsnd(msgid,&my,sizeof(struct my_msg),IPC_NOWAIT) == -1) /*发送消息*/ { fprintf(stderr,"msgsd faild!\n"); exit(1); } if(!strncmp(my.mtext,"end",3)) break; } exit(0); }
共享内存:多个进程能够连接到同一个内存区域,都能够对其进行读取操作。通过int shmget(key_t key,size_t size,int shmflg);创建或者获取共享内存,通过void *shmat(int shmid,const void *shmaddr,int shmflg);链接到共享内存,通过int shmdt(int shmid,void *shmaddr);取消对共享内存的链接,通过int shmctl(int shmid,int
cmd,struct shmid_ds *buf);查询、设置、删除,共享内存。
#include <sys/shm.h> int shmget(key_t key,size_t size,int shmflg);key: IPC关键字
size: 共享内存大小
shmflg: 访问控制标志
#include <sys/shm.h> void *shmat(int shmid,const void *shmaddr,int shmflg);shmid: IPC描述字
shmadrr:共享内存地址
shmflg:控制变量
#include <sys/shm.h> int shmdt(int shmid,void *shmaddr);
shmid: IPC描述字
shmadrr:共享内存地址
#include <sys/shm.h> int shmctl(int shmid,int cmd,struct shmid_ds *buf);shmid: IPC描述字
cmd: 命令
buf: 指向shmid_ds
创建共享内存并写入数据:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/shm.h> #define Size 1024 int main() { int shmId; char *s,*shm; if((shmId = shmget(565,Size,0666|IPC_CREAT)) == -1) /*创建或获取共享内存*/ { fprintf(stderr,"shm fail\n"); exit(0); } if((shm = shmat(shmId,NULL,0)) == -1) /*链接到共享内存*/ { fprintf(stderr,"make faile!\n"); exit(1); } s = shm; char i; for(i = 'A'; i <= 'Z'; i++) /*向共享内存写入数据*/ *s++ = i; *s = 0; exit(0); }
读取共享内存:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/types.h> #include <sys/stat.h> #include <sys/shm.h> #define Size 1024 int main() { int shmId; char *s,*shm; if((shmId = shmget(565,Size,0666)) == -1) /*获取或创建共享内存*/ { fprintf(stderr,"shm fail\n"); exit(0); } if((shm = shmat(shmId,NULL,0)) == (char*)-1) /*链接到共享内存*/ { fprintf(stderr,"shmId faild\n"); exit(1); } s = shm; fprintf(stdout,"%s\n",s); exit(0); }
信号量:信号量可以解决消息队列、共享内存的竞争情况。信号量主要用于互斥情形和生产消费情形。可以通过int semget(key_t key,int nsems,int semflg);获取或创建信号量,通过int semop(int semid,struct sembuf *sops,size_t nsops);操作信号量,通过int semctl(int semid,int semnum,int cmd,[union semun arg]);查询、删除、设置信号量。
生产消费问题,创建一个生产者、消费者都能访问的共享内存。建立两个信号量保证两个进程的互斥访问,sem_pu_ok生产者进程没有访问共享内存,sem_con_ok消费者内存没有访问共享内存。具体代码如下:
生产者:
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#define key_pu 1024
#define key_con 1025
#define key_shm 2048
#define Max 1024
#define ERR(msg,f) { \
if(f < 0) { \
fprintf(stderr,"%s",msg); \
exit(1); \
} \
}
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
int get_sem(int key,int sems) /*设置信号量*/
{
if(sems <= 0)
ERR("create sem failed\n",sems - 1);
int sem_id;
sem_id = semget(key,sems, 0666 | IPC_CREAT);
ERR("create sem failed\n",sem_id);
return sem_id;
}
int get_shm(int shm_id,ssize_t size) /*设置共享内存*/
{
if(size <= 0)
ERR("shm create faile\n",size - 1);
int shm;
shm = shmget(shm_id,size,0666 | IPC_CREAT);
ERR("shm create faile\n",shm);
return shm;
}
void init_sem(int sem_id,int semnum,int init_num) /*初始化信号量*/
{
union semun sem_set;
sem_set.val = init_num;
ERR("sem init failed\n",semctl(sem_id, semnum, SETVAL,sem_set));
return;
}
char *get_shm_addr(int shm_id) /*把共享内存映射到进程空间*/
{
char *addr = NULL;
addr = shmat(shm_id, NULL, 0);
if(addr)
return addr;
ERR("at shm faile\n",-1);
}
void sem_p(int sem_id) /*P操作*/
{
struct sembuf buf;
buf.sem_op = -1;
buf.sem_flg = 0;
buf.sem_flg = SEM_UNDO;
ERR("sem p faile\n",semop(sem_id, &buf, 1));
return;
}
void sem_v(int sem_id) /*V操作*/
{
struct sembuf buf;
buf.sem_op = 1;
buf.sem_flg = 0;
buf.sem_flg = SEM_UNDO;
ERR("sem p faile\n",semop(sem_id, &buf, 1));
return;
}
void rm_sem(int sem_id) /*删除信号量*/
{
ERR("delete sem failed\n",semctl(sem_id, 0, IPC_RMID,0));
return;
}
struct info{ /*共享内存数据类型*/
char buf[Max];
int g;
};
int main(int argc,char *argv[])
{
int shm_id,sem_pu_ok,sem_con_ok;
struct info *shm_addr = NULL;
sem_pu_ok = get_sem(key_pu,1); /*获得信号量*/
sem_con_ok = get_sem(key_con,1); /*获得信号量*/
init_sem(sem_con_ok,0,1); /*初始化信号量*/
init_sem(sem_pu_ok,0,0); /*初始化信号量*/
shm_id = get_shm(key_shm,sizeof(struct info)); /*建立共享存储*/
shm_addr = (struct info*)get_shm_addr(shm_id); /*把共享内存映射到进程地址空间*/
int i;
for(i = 0;;i++){
sem_p(sem_con_ok); /*P操作*/
printf("palse input string:\n");
fgets(shm_addr->buf, Max, stdin);
shm_addr->g = i;
sem_v(sem_pu_ok); /*V操作*/
if(!strncmp(shm_addr->buf,"end",3)) /*若是end结束*/
break;
}
sem_p(sem_con_ok); /*等待消费进程完成*/
rm_sem(sem_con_ok); /*删除信号量*/
rm_sem(sem_pu_ok); /*删除信号量*/
shmdt(shm_addr); /*分离共享内存*/
shmctl(shm_id,IPC_RMID,0); /*删除共享内存*/
exit(0);
}
消费者进程:
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#define key_pu 1024
#define key_con 1025
#define key_shm 2048
#define Max 1024
#define ERR(msg,f) { \
if(f < 0) { \
fprintf(stderr,"%s",msg); \
exit(1); \
} \
}
union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};
int get_sem(int key,int sems)
{
if(sems <= 0)
ERR("create sem failed\n",sems - 1);
int sem_id;
sem_id = semget(key,sems, 0666 | IPC_CREAT);
ERR("create sem failed\n",sem_id);
return sem_id;
}
int get_shm(int shm_id,ssize_t size)
{
if(size <= 0)
ERR("shm create faile\n",size - 1);
int shm;
shm = shmget(shm_id,size,0666 | IPC_CREAT);
ERR("shm create faile\n",shm);
return shm;
}
void init_sem(int sem_id,int semnum,int init_num)
{
//union semun sem_set;
union semun sem_set;
sem_set.val = init_num;
ERR("sem init failed\n",semctl(sem_id, semnum, SETVAL,sem_set));
return;
}
char *get_shm_addr(int shm_id)
{
char *addr = NULL;
addr = shmat(shm_id, NULL, 0);
if(addr)
return addr;
ERR("at shm faile\n",-1);
}
void sem_p(int sem_id)
{
struct sembuf buf;
buf.sem_op = -1;
buf.sem_num = 0;
buf.sem_flg = SEM_UNDO;
ERR("sem p faile\n",semop(sem_id, &buf, 1));
return;
}
void sem_v(int sem_id)
{
struct sembuf buf;
buf.sem_op = 1;
buf.sem_num = 0;
buf.sem_flg = SEM_UNDO;
ERR("sem p faile\n",semop(sem_id, &buf, 1));
return;
}
void rm_sem(int sem_id)
{
ERR("delete sem failed\n",semctl(sem_id, 0, IPC_RMID,0));
return;
}
struct info{
char buf[Max];
int g;
};
int main(int argc,char *argv[])
{
int shm_id,sem_pu_ok,sem_con_ok;
struct info *shm_addr = NULL;
sem_pu_ok = get_sem(key_pu,1);
sem_con_ok = get_sem(key_con,1);
shm_id = get_shm(key_shm,sizeof(struct info));
shm_addr = (struct info*)get_shm_addr(shm_id);
int i;
for(i = 0;;i++){
sem_p(sem_pu_ok);
printf("pid is: %d NO:%d rcv data is:%s\n",getpid(),shm_addr->g,shm_addr->buf);
sem_v(sem_con_ok);
if(!strncmp(shm_addr->buf,"end",3))
break;
}
shmdt(shm_addr);
sem_v(sem_con_ok); /*唤醒生产者进程*/
exit(0);
}
相关文章推荐
- mongo实现消息队列
- C#信号量用法简单示例
- win32下进程间通信(共享内存)实例分析
- 进程间通信之深入消息队列的详解
- linux多线程编程详解教程(线程通过信号量实现通信代码)
- 单台服务器的PHP进程之间实现共享内存的方法
- PHP信号量基本用法实例详解
- PHP共享内存用法实例分析
- 基于条件变量的消息队列 说明介绍
- PHP消息队列用法实例分析
- PHP使用php-resque库配合Redis实现MQ消息队列的教程
- PHP+memcache实现消息队列案例分享
- JAVA 多线程之信号量(Semaphore)实例详解
- Spring学习笔记3之消息队列(rabbitmq)发送邮件功能
- Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程
- Python中线程的MQ消息队列实现以及消息队列的优点解析
- 利用Python学习RabbitMQ消息队列
- Python的消息队列包SnakeMQ使用初探
- 分布式消息队列XXL-MQ
- IBM WebSphere MQ介绍安装以及配置服务详解