您的位置:首页 > 其它

进程间通信——消息队列

2016-07-23 12:19 253 查看
每个进程各自具有不同的用户地址空间,任何一个进程的全局变量在另外一个进程中看不到;所以进程之间要交换数据必须通过内核,在内核中开辟一块缓冲区,进程1把数据从用户空间拷到内核缓冲区,进程2再从内核缓冲区中读取。内核提供的这种机制被称为进程间通信(InterProcess Communication)。
XSI IPC包括消息队列、信号量、共享内存;他们都是依托标识符和键来实现的。下面介绍消息队列的实现和简单的一个测试程序。
在介绍之前先来介绍一下常用的ipcs的命令:
ipcs -a 默认的输出信息,打印出当前系统中所有的进程通信方式的信息;
ipcs -q 打印出消息队列的信息
ipcs -s 打印出信号量的信息
ipcs -m 打印出共享内存的信息
ipcrm 移除一个消息对象、共享内存、信号集,同事会将与ipc对象相关联的数据也一并移除(权限限制:root、ipc对象的创建者)。
ipcrm -q msqid 移除用msqid标识的消息队列
ipcrm -Q mgqkey 移除用msqkey创建的消息队列
另外两个类似于消息队列,不在赘述。
消息队列就是一个消息的链表。可以把消息看做一个记录,具有特定格式以及特定优先级。对消息队列有写权限的进程可以向中按照一定的规则添加新消息;对消息队列有读权限的进程则可以从消息队列中读走消息。消息队列是随内核的,记录消息队列的数据结构体位于内核中,只有在内核重启或者显示的删除一个消息队列时,该消息队列才真正的被删除。
在消息队列中,每一条消息队列msg_queue,定义在msg.h中:


/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first;          /* first message on queue,unused  */
struct msg *msg_last;           /* last message in queue,unused */
__kernel_time_t msg_stime;      /* last msgsnd time */
__kernel_time_t msg_rtime;      /* last msgrcv time */
__kernel_time_t msg_ctime;      /* last change time */
unsigned long  msg_lcbytes;     /* Reuse junk fields for 32 bit */
unsigned long  msg_lqbytes;     /* ditto */
unsigned short msg_cbytes;      /* current number of bytes on queue */
unsigned short msg_qnum;        /* number of messages in queue */
unsigned short msg_qbytes;      /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid;   /* last receive pid */
};


消息队列的操作主要包括:MSGSND:发送消息队列 MSGRCV:从队列中接受消息 MSGGET:打开消息队列 MSGCTL:控制消息队列
这几种操作在ipc.h中有定义宏:


#define MSGSND          11
#define MSGRCV          12
#define MSGGET          13
#define MSGCTL          14


int msgget(key_t key,int msgflg);
打开或创建消息队列:System V IPC中通过一个key来唯一标识一个IPC对象,。在消息队列中,一个key唯一标识一个队列。msgflg地段的9个位为权限标志。如果需要创建一个新的消息队列,需要设置IPC_CREAT标志。如果创建的ID已经存在,此函数不会出现错误,而只是忽略创建标志。但如果msgflg是联合使用IPC_CREAT|IPC+EXCL,那么如果创建的ID已经存在,此时返回错误,可以确保创建的是一个新的IPC对象。如果创建成功将会返回一个队列标识符msgid,失败则返回-1。


static int CommCreatMsgQ(int flg){
key_t _key = ftok(_FILE_PATH_,_PROJ_ID_);
if(_key < 0){
perror("ftok");
return -1;
}
else {
printf("_key = %d\n",_key);
int msg_id = msgget(_key,flg);
if(msg_id < 0){
perror("msgget");
return -1;
}
else
return msg_id;
}
return -1;
}

int CreatMsgQ(){
int flg = IPC_CREAT|IPC_EXCL|0644;
return CommCreatMsgQ(flg);
}

int GetMsgQ(){
int flg = IPC_CREAT;
return CommCreatMsgQ(flg);
}


上面的代码是创建一个新的消息队列和获取消息队列的函数。在CommCreatMsgQ()函数中使用到了ftok()函数。该函数的主要功能是产生一个新的key值。


ftok函数详情

int msgsnd(int msgid,const char *msgp,size_t msgsz,int msgflg)
发送消息到消息队列函数:msgid为msgget函数反回的标识符,已经不是key了。msgp为一个具体的消息内容,他只想一个struct msgbuf结构体:


struct msgbuf{
long mtype;
char mtext[_MSG_SIZE_];
}


msgflg主要用来控制当前消息队列无法容纳发送过来的数据或者消息内容的个数达到系统上限,操作的阻塞或者直接返回。如果被设置了IPC_NOWAIT,函数立即返回,不会发送消息,并且返回-1;如果清除了该标志,函数将会挂起等待队列,腾出可用空间,知道可以容纳完整的消息或者消息队列被删除,或者信号被中断。
注意:msgsnd()函数发送的数据保证数据的完整性,要么发送成功,要么发送失败;不会只发送一部分数据。


int snd(int msg_id,int type,const char *msg){
struct MsgBuf _msg;
_msg.mtype =  type;

strncpy(_msg.mtext,msg,strlen(msg)+1);

if(msgsnd(msg_id,&_msg,sizeof(_msg.mtext),0) < 0){
perror("msgsnd");
return -1;
}
return 0;
}


ssize_t msgrcv(int msgid,void *msgp,size_t msgsz,long msgtyp,int msgflg)
从消息队列中读取消息:前三个参数和msgsnd()一样。msgtyp是一个龙型的整数,用来标识要接受的消息类别。如果msgtyp=0,那么将获取队列第一个可用消息。如果msgtyp>0,那么将接受第一个相同类型的第一个消息。如果msgtyp<0,将获取类型等于或小于msgtyp绝对值的第一个消息。
msgflg用来控制当前队列没有相应类型的消息可以接受时,采取的操作。如果被设置为IPC_NOWAIT,函数将会立即返回,返回值为-1。如果该标志被清除,进程将会挂起等待,直到相应的消息到达,或者消息队列被删除,或被信号中断;


int rcv(int msg_id,int type,char *out){
struct MsgBuf _msg;
_msg.mtype = type;
memset(_msg.mtext,'\0',sizeof(_msg.mtext));
if(msgrcv(msg_id,&_msg,sizeof(_msg.mtext),type,0) < 0){
perror("msgrcv");
return -1;
}
else{
strcpy(out,_msg.mtext);
}
return 0;
}


int msgctl(int msgid,int cmd,msqid *buf);

消息队列控制函数:根据cmd的不同,该函数功能不一样,下面主要讨论三种:
1.IPC_STAT:检索当期当前消息队列的属性,返回的值储存在一个struct msqid_ds结构体中,该结构见下面。
2.IPC_SET:如果进程有足够权限,可以利用buf来设置队列属性。
3.IPC_RMID:用于删除队列。
struct msqid_ds是一个定义在/include/linux/msg.h中的结构体,相对来说,还是比较负责,如下


/* Obsolete, used only for backwards compatibility and libc5 compiles */
struct msqid_ds {
struct ipc_perm msg_perm;
struct msg *msg_first;          /* first message on queue,unused  */
struct msg *msg_last;           /* last message in queue,unused */
__kernel_time_t msg_stime;      /* last msgsnd time */
__kernel_time_t msg_rtime;      /* last msgrcv time */
__kernel_time_t msg_ctime;      /* last change time */
unsigned long  msg_lcbytes;     /* Reuse junk fields for 32 bit */
unsigned long  msg_lqbytes;     /* ditto */
unsigned short msg_cbytes;      /* current number of bytes on queue */
unsigned short msg_qnum;        /* number of messages in queue */
unsigned short msg_qbytes;      /* max number of bytes on queue */
__kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */
__kernel_ipc_pid_t msg_lrpid;   /* last receive pid */
};


由于该结构体比较复杂,在实际开发过程中,特别在进行IPC_SET操作时候,正确的办法是先调用IPC_STAT得到这样一个结构体,然后再修改该结构体,最后再调用IPC_SET操作。


int DestoryMsgQ(int msg_id){
if(msgctl(msg_id,IPC_RMID,NULL) == -1){
perror("msgctl");
return -1;
}
return 0;
}


下面是一段测试代码,用于测试消息队列的创建、通信、销毁功能:


服务器端:

server.c
#include "comm.h"

int main(){
int msg_id = CreatMsgQ();
char buf[_MSG_SIZE_];
if(msg_id < 0){
printf("CreatMsgQ faild!\n");
return -1;
}
else{
printf("msg_id = %d\n",msg_id);
while(1){
memset(buf,'\0',sizeof(buf));

//sleep(10);
if(rcv(msg_id,_CLIENT_TYPE_,buf) < 0){
printf("rcv failed!\n");
return -1;
}
else{  //success

printf("client -> server:%s\n ",buf);

printf("please enter:");
fflush(stdout);

read(0,buf,sizeof(buf)-1);
snd(msg_id,_SERVER_TYPE_,buf);
}
}
}
if(DestoryMsgQ(msg_id) <  0){
printf("DestoryMsgQ failed!\n");
return -1;
}
else{
printf("DestoryMsgQ success!\n");
return 0;
}
}


客户端:

client.c
#include "comm.h"

int main(){
int msg_id = GetMsgQ();
char buf[_MSG_SIZE_];
if(msg_id < 0){
printf("GettMsgQ faild!\n");
return -1;
}
else{ //Get success
// printf("GetMsgQ msg_id = %d\n",msg_id);
while(1){
memset(buf,'\0',sizeof(buf));

read(0,buf,sizeof(buf)-1);
// sleep(5);
if(snd(msg_id,_CLIENT_TYPE_,buf) < 0){
printf("snd message failed!\n");
return -1;
}
else{  //snd success
rcv(msg_id,_SERVER_TYPE_,buf);
printf("server -> client:%s\n ",buf);

printf("please enter:");
fflush(stdout);
}
}
}

return 0;
}


服务器端主要功能是创建消息队列,然后接受客户端的消息,接收成功之后再发送消息给客户端;最后销毁消息队列。
客户端主要功能是得到消息队列标识符,然后发送消息给服务器端,发送成功后没接收服务器端发送的消息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息