您的位置:首页 > 运维架构 > Linux

linux系统中的进程通信(消息队列、共享内存、信号量)

2016-09-12 15:22 706 查看
消息队列:类似与管道只能单项通信,消息队列由消息组成,消息可以有不同的数据结构,也可以由有优先级,能够更加灵活的通 信。消息队列使用int msgget(key_t key,int flags)获得或创建消息队列,使用void msgsnd(int msqid,const void *msgp,size_t msgsz,int msgflg);发送消息,使用size_t msgrcv(int msqid,void *msgp,size_t msgz,long
int msgtype,int msgflg);接收消息,通过int msgctl(int msqid,int cmd,struct msgid_ds *buf); 查询、获取、删除消息队列。

#include <sys/msg.h>
int msgget(key_t key,int flags);
key:消息对列的关键字。

flags:访问权限,创建标志(IPC_CREAT、IPC_EXCL)

#include <sys/msg.h>
void msgsnd(int msqid,const struct void *msgp,size_t msgsz,int msgflg);
msqid:IPC描述字。

msgp:消息缓存区指针。

msgsz:消息缓存去正文的大小。

msgflg:控制变量(IPC_NOWAIT,MSG_NOERROR)

#include <sys/msg.h>
size_t msgrcv(int msqid,void *msgp,size_t msgz,long int msgtype,int msgflg);




msqid:IPC描述字。

msgp:消息缓存区指针。

msgsz:消息缓存去正文的大小。

msgype:消息的接收类型。

msgype = 0 按顺序接收。

msgype < 0 接收特定的消息

msgype > 0 按优先级接收

msgflg:控制变量(IPC_NOWAIT,MSG_NOERROR)

#include <sys.msg.h>
int msgctl(int msqid,int cmd,struct msgid_ds *buf);
msqid: IPC描述字

cmd: 执行的命令

msgsz: 指向msqid_ds

接收:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/msg.h>
#include <errno.h>
#define BUFSIZE 1024

struct my_msg /*消息数据类型*/
{
long int mtype;
char mtext[BUFSIZE];
};

int main()
{
int msgid;
struct my_msg my;

if((msgid = msgget(565,0666|IPC_CREAT)) == -1) /*创建或获取消息队列*/
 	{
fprintf(stderr,"msg faile!\n");
exit(1);
}
while(1)
{
if(msgrcv(msgid,&my,sizeof(struct my_msg),-57,IPC_NOWAIT) != -1) /*接收优先级小于57的消息*/
fprintf(stdout,"%s\n",my.mtext);
else if(errno == ENOMSG && msgrcv(msgid,&my,sizeof(struct my_msg),0,0) != -1) /*按顺序接收消息*/
fprintf(stdout,"%s\n",my.mtext);
else
{
fprintf(stderr,"fiale!\n");
exit(1);
}

if(!strncmp(my.mtext,"end",3))
break;
}
if(msgctl(msgid,IPC_RMID,NULL) == -1) /*删除消息队列*/
{
fprintf(stderr,"rm faile\n");
exit(1);
}
exit(0);
}
发送:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/msg.h>
#include <errno.h>
#define BUFSIZE 1024

struct my_msg /*消息的数据类型*/
{
long int mtype;
char mtext[BUFSIZE];
};

int main()
{
int msgid;
struct my_msg my;

if((msgid = msgget(565,0666|IPC_CREAT)) == -1) /*创建或获取消息队列*/
{
fprintf(stderr,"msg faile!\n");
exit(1);
}
while(1)
{
fprintf(stdout,"Plase Input\n");
fgets(my.mtext,BUFSIZE,stdin);
my.mtype = my.mtext[0];

if(msgsnd(msgid,&my,sizeof(struct my_msg),IPC_NOWAIT) == -1) /*发送消息*/
{
fprintf(stderr,"msgsd faild!\n");
exit(1);
}
if(!strncmp(my.mtext,"end",3))
break;
}
exit(0);
}


共享内存:多个进程能够连接到同一个内存区域,都能够对其进行读取操作。通过int shmget(key_t key,size_t size,int shmflg);创建或者获取共享内存,通过void *shmat(int shmid,const void *shmaddr,int shmflg);链接到共享内存,通过int shmdt(int shmid,void *shmaddr);取消对共享内存的链接,通过int shmctl(int shmid,int
cmd,struct shmid_ds *buf);查询、设置、删除,共享内存。

#include <sys/shm.h>
int shmget(key_t key,size_t size,int shmflg);
key: IPC关键字

size: 共享内存大小

shmflg: 访问控制标志

#include <sys/shm.h>
void *shmat(int shmid,const void *shmaddr,int shmflg);
shmid: IPC描述字

shmadrr:共享内存地址

shmflg:控制变量

#include <sys/shm.h>
int shmdt(int shmid,void *shmaddr);


shmid: IPC描述字

shmadrr:共享内存地址

#include <sys/shm.h>
int shmctl(int shmid,int cmd,struct shmid_ds *buf);
shmid: IPC描述字

cmd: 命令

buf: 指向shmid_ds

创建共享内存并写入数据:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/shm.h>
#define Size 1024

int main()
{
int shmId;
char *s,*shm;

if((shmId = shmget(565,Size,0666|IPC_CREAT)) == -1) /*创建或获取共享内存*/
{
fprintf(stderr,"shm fail\n");
exit(0);
}

if((shm = shmat(shmId,NULL,0)) == -1) /*链接到共享内存*/
{
fprintf(stderr,"make faile!\n");
exit(1);
}
s = shm;

char i;
for(i = 'A'; i <= 'Z'; i++) /*向共享内存写入数据*/
*s++ = i;
*s = 0;
exit(0);
}


读取共享内存:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/shm.h>
#define Size 1024

int main()
{
int shmId;
char *s,*shm;

if((shmId = shmget(565,Size,0666)) == -1) /*获取或创建共享内存*/
{
fprintf(stderr,"shm fail\n");
exit(0);
}
if((shm = shmat(shmId,NULL,0)) == (char*)-1) /*链接到共享内存*/
{
fprintf(stderr,"shmId faild\n");
exit(1);
}

s = shm;
fprintf(stdout,"%s\n",s);

exit(0);
}


信号量:信号量可以解决消息队列、共享内存的竞争情况。信号量主要用于互斥情形和生产消费情形。可以通过int semget(key_t key,int nsems,int semflg);获取或创建信号量,通过int semop(int semid,struct sembuf *sops,size_t nsops);操作信号量,通过int semctl(int semid,int semnum,int cmd,[union semun arg]);查询、删除、设置信号量。

生产消费问题,创建一个生产者、消费者都能访问的共享内存。建立两个信号量保证两个进程的互斥访问,sem_pu_ok生产者进程没有访问共享内存,sem_con_ok消费者内存没有访问共享内存。具体代码如下:

生产者:

#include <sys/stat.h>
#include <sys/types.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#define key_pu 1024
#define key_con 1025
#define key_shm 2048
#define Max 1024
#define ERR(msg,f) { \
if(f < 0) { \
fprintf(stderr,"%s",msg); \
exit(1); \
} \
}

union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};

int get_sem(int key,int sems) /*设置信号量*/
{
if(sems <= 0)
ERR("create sem failed\n",sems - 1);

int sem_id;
sem_id = semget(key,sems, 0666 | IPC_CREAT);
ERR("create sem failed\n",sem_id);

return sem_id;
}

int get_shm(int shm_id,ssize_t size) /*设置共享内存*/
{
if(size <= 0)
ERR("shm create faile\n",size - 1);
int shm;
shm = shmget(shm_id,size,0666 | IPC_CREAT);
ERR("shm create faile\n",shm);

return shm;
}

void init_sem(int sem_id,int semnum,int init_num) /*初始化信号量*/
{
union semun sem_set;
sem_set.val = init_num;
ERR("sem init failed\n",semctl(sem_id, semnum, SETVAL,sem_set));

return;
}

char *get_shm_addr(int shm_id) /*把共享内存映射到进程空间*/
{
char *addr = NULL;
addr = shmat(shm_id, NULL, 0);
if(addr)
return addr;
ERR("at shm faile\n",-1);
}

void sem_p(int sem_id) /*P操作*/
{
struct sembuf buf;
buf.sem_op = -1;
buf.sem_flg = 0;
buf.sem_flg = SEM_UNDO;

ERR("sem p faile\n",semop(sem_id, &buf, 1));

return;
}

void sem_v(int sem_id) /*V操作*/
{
struct sembuf buf;
buf.sem_op = 1;
buf.sem_flg = 0;
buf.sem_flg = SEM_UNDO;

ERR("sem p faile\n",semop(sem_id, &buf, 1));

return;
}

void rm_sem(int sem_id) /*删除信号量*/
{
ERR("delete sem failed\n",semctl(sem_id, 0, IPC_RMID,0));
return;
}

struct info{ /*共享内存数据类型*/
char buf[Max];
int g;
};

int main(int argc,char *argv[])
{
int shm_id,sem_pu_ok,sem_con_ok;
struct info *shm_addr = NULL;

sem_pu_ok = get_sem(key_pu,1); /*获得信号量*/
sem_con_ok = get_sem(key_con,1); /*获得信号量*/

init_sem(sem_con_ok,0,1); /*初始化信号量*/
init_sem(sem_pu_ok,0,0); /*初始化信号量*/

shm_id = get_shm(key_shm,sizeof(struct info)); /*建立共享存储*/
shm_addr = (struct info*)get_shm_addr(shm_id); /*把共享内存映射到进程地址空间*/

int i;
for(i = 0;;i++){
sem_p(sem_con_ok); /*P操作*/

printf("palse input string:\n");
fgets(shm_addr->buf, Max, stdin);
shm_addr->g = i;

sem_v(sem_pu_ok); /*V操作*/
if(!strncmp(shm_addr->buf,"end",3)) /*若是end结束*/
break;
}
sem_p(sem_con_ok); /*等待消费进程完成*/

rm_sem(sem_con_ok); /*删除信号量*/
rm_sem(sem_pu_ok); /*删除信号量*/
shmdt(shm_addr); /*分离共享内存*/
shmctl(shm_id,IPC_RMID,0); /*删除共享内存*/

exit(0);
}


消费者进程:
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/unistd.h>
#include <sys/fcntl.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#define key_pu 1024
#define key_con 1025
#define key_shm 2048
#define Max 1024
#define ERR(msg,f) { \
if(f < 0) { \
fprintf(stderr,"%s",msg); \
exit(1); \
} \
}

union semun {
int val;
struct semid_ds *buf;
unsigned short *array;
};

int get_sem(int key,int sems)
{
if(sems <= 0)
ERR("create sem failed\n",sems - 1);

int sem_id;
sem_id = semget(key,sems, 0666 | IPC_CREAT);
ERR("create sem failed\n",sem_id);

return sem_id;
}

int get_shm(int shm_id,ssize_t size)
{
if(size <= 0)
ERR("shm create faile\n",size - 1);
int shm;
shm = shmget(shm_id,size,0666 | IPC_CREAT);
ERR("shm create faile\n",shm);

return shm;
}

void init_sem(int sem_id,int semnum,int init_num)
{
//union semun sem_set;
union semun sem_set;
sem_set.val = init_num;
ERR("sem init failed\n",semctl(sem_id, semnum, SETVAL,sem_set));

return;
}

char *get_shm_addr(int shm_id)
{
char *addr = NULL;
addr = shmat(shm_id, NULL, 0);
if(addr)
return addr;
ERR("at shm faile\n",-1);
}

void sem_p(int sem_id)
{
struct sembuf buf;
buf.sem_op = -1;
buf.sem_num = 0;
buf.sem_flg = SEM_UNDO;

ERR("sem p faile\n",semop(sem_id, &buf, 1));

return;
}

void sem_v(int sem_id)
{
struct sembuf buf;
buf.sem_op = 1;
buf.sem_num = 0;
buf.sem_flg = SEM_UNDO;

ERR("sem p faile\n",semop(sem_id, &buf, 1));

return;
}

void rm_sem(int sem_id)
{
ERR("delete sem failed\n",semctl(sem_id, 0, IPC_RMID,0));
return;
}

struct info{
char buf[Max];
int g;
};

int main(int argc,char *argv[])
{
int shm_id,sem_pu_ok,sem_con_ok;
struct info *shm_addr = NULL;

sem_pu_ok = get_sem(key_pu,1);
sem_con_ok = get_sem(key_con,1);

shm_id = get_shm(key_shm,sizeof(struct info));
shm_addr = (struct info*)get_shm_addr(shm_id);

int i;
for(i = 0;;i++){
sem_p(sem_pu_ok);

printf("pid is: %d NO:%d rcv data is:%s\n",getpid(),shm_addr->g,shm_addr->buf);

sem_v(sem_con_ok);
if(!strncmp(shm_addr->buf,"end",3))
break;
}

shmdt(shm_addr);
sem_v(sem_con_ok); /*唤醒生产者进程*/

exit(0);
}

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息