您的位置:首页 > 运维架构 > Linux

进程间通信之消息队列

2018-04-07 23:52 429 查看
前面我们介绍了进程间的通信方式之管道,接下来我们将介绍另外一种进程间的通信方式:消息队列

消息队列

一、什么是消息队列?

消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法

每个数据块都被认为有一个类型,接收者进程接受的数据块可以有不同的类型值

消息队列也有和管道一样的不足,每个消息的最大长度(MSGMAX)、每个消息队列总的字节数(MSGMNB)、系统消息队列的总数(MSGMNI)都有上限。

IPC对象数据结构

内核为每个IPC对象维护一个数据结构



该结构体中的key,是内核分配给每个消息队列的唯一标识符。

消息队列结构



消息队列在内核中的表示



另外,消息队列的生命周期随内核。

消息队列函数

msgget函数

功能:用来创建和访问一个消息队列
原型
int msgget(key_t key, int msgflag);
参数
key:某个消息队列的名字
msgflag:由九个权限标志位构成,它们的用法和创建文件时使用的消息队列,
单独使用IPC_CREAT,获得一个存下的消息队列。
返回值
成功返回一个非负整数,即该消息队列的标识码。失败返回-1。


msgctl函数

功能:消息队列的控制函数
原型
int msgctl(int msqid, int cmd, struct msqid_ds *buf)
参数
msqid:由msgget函数返回的消息队列标识码
cmd:是将要采取的动作。
返回值:成功返回0,失败返回-1


cmd:将要采取的动作:



msgsnd

功能:把一条消息添加到消息队列中
原型
int msgsnd(int msqid, const void *msgp, size_t msgsz,\
int msgflg)
参数
msqid:由msgget函数返回的消息队列标识码
msgp:是一个指针,指针指向准备发送的消息
msgsz:是msgp指向的消息长度,这个长度不包含保存消息类型的那个
long int长整型
msgflag:控制着当前消息队列满或达到系统上限时将要发生的事情
msgflg = IPC_NOWAIT表示队列满不等待,返回EAGAIN错误
返回值:成功返回0,失败返回-1


说明

1.消息结构在两个方面受到制约:
首先,它必须小于系统的上限值。
其次,它必须以一个long int长整数开始,接受者函数将利用这个整数

2.消息结构的参考形式如下:
struct msgbuf{
long mtype;
char mtext[1];
}


msgrcv函数

功能:从一个消息队列接收消息
原型
ssize_t msgrcv(int msqid, void *msgp, size_t msgsz,
long msgtyp, int msgflg);
参数
msqid:由msgget函数返回的消息队列标识码。
msgp:是一个指针,指向准备接受的消息。
msgsz:是msgp指向消息的长度,这个长度不包含保存消息类型的那个
long int长整型。
msgtyp:它可以实现接受优先级的简单形式
msgflag:控制着队列中没有相应类型的消息可供接收时要发生的事
返回值
成功返回实际放到接收缓冲区的字符个数,失败返回-1。


说明

msgtyp = 0 返回队列的第一条信息。
msgtyp > 0 返回队列第一条类型是msgtyp类型的消息。
msgtyp < 0 返回队列第一条类型小于等于msgtyp绝对值类型的消息,并且是
满足条件类型最小的消息。
msgflg = MSG_ERROR,消息大小超过msgsz时被截断。
msgflg = IPC_NOWAIT, 队列没有可读消息时不等待,返回ENOMSG错误。
msgtyp > 0且msgflg = MSG_EXCEPT,接受类型不等于msgtyp的第一条消息。


介绍完消息队列以及它的一些基本接口后,我们来写代码测试一下吧!

首先编写comm.h

#pragma once
#include<stdio.h>
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<string.h>
#include<unistd.h>

#define PATHNAME "."
#define PROJ_ID 0x6666

#define SERVER_TYPE 1
#define CLIENT_TYPE 2

struct msgbuf{
long mtype;
char mtext[1024];
};
//创建一个消息队列
int createMsgQueue();
//访问一个消息队列
int getMsgQueue();
//销毁消息队列
int destroyMsgQueue(int msgid);
//向消息队列发送消息
int sendMsg(int msgid, int who, char* msg);
//从消息队列读消息
int recvMsg(int msgid, int recvType, char out[]);


接着编写comm.c

#include"comm.h"

//创建或访问一个消息队列
static int commMsgQueue(int flags){
key_t _key = ftok(PATHNAME, PROJ_ID);//创建一个_key,
//保证接下来的测试函数访问的是一个消息队列,所以在创建或者访
//问消息队列的时候必须使用同一个_key

if(_key < 0){
perror("ftok");
return -1;
}

int msgid = msgget(_key, flags);//创建或者访问一个消息队列一个消息队列
if(msgid < 0){
perror("msgget");
}
return
c2e8
msgid;
}

//IP_CREAT和IPC——EXCL同时使用表示创建一个消息队列,IPC_CREAT单独使用表示访问一个消息队列
int createMsgQueue(){
return commMsgQueue(IPC_CREAT|IPC_EXCL|0666);//创建一个消息队列,权限为0666
}

int getMsgQueue(){
return commMsgQueue(IPC_CREAT);
}

int destroyMsgQueue(int msgid){
if(msgctl(msgid, IPC_RMID,NULL) < 0){ //IPC_RMID表示删除队列
perror("msgctl");
return -1;
}
return 0;
}

int sendMsg(int msgid, int who, char* msg){  //who表示一个长整数,接受者根据这个长整数确定消息类型
struct msgbuf buf;
buf.mtype = who;
strcpy(buf.mtext,msg);

if(msgsnd(msgid, (void*)&buf, sizeof(buf.mtext), 0) < 0){
perror("msgsnd");
return -1;
}

return 0;
}

int recvMsg(int msgid, int recvType, char out[]){
struct msgbuf buf;
if(msgrcv(msgid, (void*)&buf, sizeof(buf.mtext),recvType,0) < 0){
perror("msgrcv");
return -1;
}
strcpy(out,buf.mtext);
return 0;
}


接着编写两个函数模拟两个进程间通信

server.c

#include"comm.h"

int main(){
int msgid = createMsgQueue();//创建消息队列

char buf[1024];
while(1){
buf[0] = 0;
recvMsg(msgid, CLIENT_TYPE, buf);
printf("client# %s\n", buf);

printf("Please Enter# ");
fflush(stdout);   //刷新标准输出缓冲区

ssize_t s = read(0, buf, sizeof(buf));//从标准输入读数据到buf
if(s > 0){
buf[s - 1] = 0; //将最后一个元素置0,作为字符结束标志
sendMsg(msgid, SERVER_TYPE, buf);
printf("send done,wait recv...\n");
}
}
destroyMsgQueue(msgid);
return 0;
}


client.c

#include"comm.h"

int main(){
int msgid = getMsgQueue();//访问消息队列

char buf[1024];
while(1){
buf[0] = 0;
printf("Please Enter# ");
fflush(stdout);
ssize_t s = read(0, buf, sizeof(buf));
if(s > 0){
buf[s-1] = 0;
sendMsg(msgid, CLIENT_TYPE, buf);
printf("send done,wait recv...\n");
}
recvMsg(msgid, SERVER_TYPE, buf);
printf("server# %s\n", buf);
}
return 0;
}


Makefile

.PHONY:all
all: client server
client:client.c comm.c
gcc -o $@ $^
server:server.c comm.c
gcc -o $@ $^

.PHONY:clean
clean:
rm -f server client


测试效果





是不是很有意思?但是,在Ctrl c(异常终止)掉进程之后,我们再执行server会报错:File exists ,如下图:



这是因为在创建消息队列的时候,发现消息队列的名字_key已经存在,所以无法创建新的消息队列。

可是我们明明写了destroyMsgQueue函数,为什么消息队列没有被删除呢?这是因为,为了程序的测试,我们把server写成了死循环呀,是不会运行到destroyMsgQueue的。

这时,我们通过 ipcs -q命令查看所有消息队列,会发现这时有一个msgid为163840的消息队列。



我们使用 ipcrm -q 163840命令删除消息队列就ok了。



关于ipcs&ipcrm 的具体使用参见博客:ipcs&ipcrm的使用方法

以上的博客可能会有不妥或者BUG,欢迎大家发邮件到我的邮箱(Cyrus_wen@163.com)批评指正!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息