您的位置:首页 > 运维架构 > Linux

linux消息队列

2009-08-13 17:48 393 查看
基本概念

1.队列

队列是信息的线性表,它的访问次序是先进先出(FIFO)。也就是说,置入队列中的

第一个数据项将是从队列中第一次读出的数据项,置入的第二项将是读出的第二项,依

此类推。这是队列允许的唯一存取操作,其它随机访问是不允许的。这种数据结构保证

对数据资源的请求将严格按照先后顺序进行,因而可用于对事件的调度并起到I/O缓冲

的作用。

2.报文

发送进程和接收进程进行信息的交换,一般是通过将信息划分为若干段放入数据交

换缓冲器中,进程间通过对该缓冲器的存取来实现通信。因此,数据是以不连续的形式

在进程间传送,这些不连续的部分就叫报文。

3.消息队列

将报文按队列的结构进行组织就叫消息队列。该队列用于存放正被发送或接收的

每一个报文的标题信息。每一个消息队列还对应有一个数据结构,它含有消息队列的

存取权限,和消息队列的当前状态信息等信息。消息队列可进行"发送"和"接收"操作。

消息队列的编程要点及运作过程

1.消息队列的创建

在报文能够发送和接收之前,必须创建一个能够唯一被识别出的消息队列和数据结

构,这个被创建的唯一标识符叫做消息队列描述符(msqid),用来识别或引用相关的消

息队列和数据结构。用msgget(longkey,intmsgflg)系统调用来创建消息队列,其中

key是一个长整型,可由用户设定也可通过ftok()获得。msgflg的值是八进制的消息队

列操作权和控制命令的组合。操作权定义为:

操作允许权八进制整数

用户可读0400

用户可写0200

同组可读0040

同组可写0020

其它可读0004

其它可写0002

操作权可相加而派生,如用户可"读"、"写"的权限为0400|0200=0600。控制命令

可取IPC_CREAT或IPC_EXCL。如果要创建一个key=888且属主和同组可读写的消息队列,

执行以下系统调用msgget(0x888,0660|IPC_CREAT)。创建后可用ipcs命令看到以下信

息:

IPCstatusfrom/dev/memasofSun

Jan2506:49:521970

TIDKEYMODEOWNERGROUP

MessageQueues:

.q70x00000888--rw-rw

rootsystem

...

它的消息队列描述符是7,属主是root,同组是system,存取权是属主、用户可读写

。如果执行msgget(0x888,0660|IPC_CREAT)时,与0x888对应的消息队列已存在,则返

回该消息队列的描述符msqid。
2.消息的发送

消息队列一经创建即可用msgsnd(intmsqid,void*msgp,size_tmsgsz,intmsgflg)

发送消息。msgqid是经msgget创建的消息队列描述符,msgp是指向消息段的指针,该

指针所指结构含有报文类型和要发送或接收的报文:

structmsgbuf{

longmtype;/*消息类型*/

charmtext[512];

/*消息正文,512暂定为消息段的大小*/

}

msgsz是msgp参量指向的数据结构中字符数组的长度,即报文长度,最大值由MSGMAX

确定。msgflg是当消息队列满时(队列中无空闲空间),系统要采取的行动.如果

msgflg&IPC_NOWAIT=真,调用进程立即返回,不发送该消息。如果

msgflg&IPC_NOWAIT=假,调用进程暂停执行,处于"挂起"状态,且不发送该消息。直到

下列情况之一出现:
引起暂停的条件不再存在,如队列出现空闲,即可发送
该消系队列被从系统中删去
调用进程接收到一个要捕捉的信号,如中断信号,此时不发送消息,调用进程按

signal中描述的方式执行。

如果msgsnd返回0则发送成功。返回-1则表示发送失败,错误类型可具体查看

errno。

3.消息的接收

用msgrcv(intmsqid,void*msgp,size_tmsgsz,longmsgtyp,intmsgflg)系统调用从

msqid消息队列中读取一条信息并将其放入消息段指针msgp指向的结构。msgsz给出

mtext的字节数,如果所接收的消息比msgsz大且msgflg&MSG_NOERROR为真,则按msgsz

的大小截断而不通知调用进程。msgtyp指定要求的消息类型:

msgtyp=0接收消息队列中的第一个报文
msgtyp>0接收消息队列中的类型为msgtyp的第一个报文
msgtyp<0接收消息队列中小于等于msgtyp绝对值的最低类型的第一个报文

当队列上没有所期望类型的消息或消息队列为空时msgflg指出调用进程要采取的

行动:如果msgflg&IPC_NOWAIT为真,则调用进程立即结束并返回-1。如

msgflg&IPC_NOWAIT为假,则调用进程暂停执行直至出现:

队列中放入所需类型的消息,调用进程接收该消息
msqid消息队列从系统中删除
调用进程接收到捕获的信号,此时不接收消息,调用进程按signal描述的方式执行



如果msgrev执行成功,则返回放入mtext中的字节数,失败返回-1,错误类型可查

errno。

4.消息队列的控制和撤销

用msgctl(intmsqid,intcmd,structmsqid_ds*buf)系统调用实现对消息队列的控

制。msgqid必须是用msgget创建的消息队列描述符。cmd可以是:

IPC_STAT查看消息队列的状态,结果放入buf指针指向的结构
IPC_SET为消息队列设置属主标识,同组标识,操作允许权,最大字节数
IPC_RMID删除指定的msqid以及相关的消息队列和结构

四、编程示例

下面给出一个运用消息队列,实现进程通信的实例。以下程序在IBMRS/6000小型机

(AIX操作系统)上和IBMPC(UNIX操作系统)上分别调试通过。该程序主要模拟根据

帐号查询余额的过程。包括三方面:

请求进程从标准输入读入帐号,并将该帐号通过消息队列发送给服务进程;

服务进程接收该帐号后,按照请求的先后顺序在标准输入上输入该帐户的姓名和余额,

并将结果返回给客户进程;

请求进程接收返回的信息,并将结果输出在标准输出上。
服务进程(msgcenter)先于请求进程(msgreq)启动.客户进程启动时要携带请求编

号,可同时起动多个请求进程。

程序在HP-unix有点问题!! C,S都生成ID一样的消息队列
而且要加两个头文件:#include <stdlib.h> #include <string.h>
/*请求方程序msgreq.c*/

#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
static struct msgbuf1
{
long mtype;
char mtext[100];
} sndbuf, rcvbuf, *msgp ;

extern int errno;

int main(int argc, char **argv)
{
int rtrn, msqid ;
char name[10];
double balance;

if (argc!=2)
{
fprintf(stderr,"msgreq [01-99]/n"); exit(-1);
}
if ((msqid = msgget(0x888, IPC_CREAT|0660)) == -1 )
{
fprintf(stderr, "msgget 888 failed !/n");
//exit(-1);
}
msgp=&sndbuf;

sprintf(sndbuf.mtext,"%2.2s",argv[1]);
printf("输入4位帐号:");
scanf("%s",&sndbuf.mtext[2]);

sndbuf.mtext[6]=0;
msgp->mtype=666;
rtrn=msgsnd(msqid,msgp, strlen(sndbuf.mtext), 0);
if (rtrn==-1)
{
perror("msgsnd"); exit(-1);
}
msgp=&rcvbuf;

fprintf(stderr,"等待后台数据处理进程的回答....");

rtrn=msgrcv(msqid,msgp, 100, atoi(argv[1]), 0);
if(rtrn==-1)
{
perror("msgrcv"); exit(-1);
}

sscanf(rcvbuf.mtext,"%[^|]|%lf",name,&balance);
printf("/n姓名=%s/n",name);
printf("余额=%lf/n",balance);
}

/*服务方程序msgcenter.c*/
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static struct msgbuf1
{
long mtype;
char mtext[100];
} sndbuf, rcvbuf , *msgp;

extern int errno;

int main()
{
int rtrn, msqid ;
char strbuf[100];

if ( (msqid = msgget(0x888, IPC_CREAT|0600)) == -1 )
{
fprintf(stderr, "msgget 888 failed !/n"); exit(-1);
}

while(1)
{
msgp=&rcvbuf;
fprintf(stderr,"等待前台进程的请求....");

rtrn=msgrcv(msqid, msgp, 100, 666 ,MSG_NOERROR);
if(rtrn==-1)
{
perror("msgrcv");exit(-1);
}
msgp=&sndbuf;
sprintf(strbuf,"%2.2s/0",rcvbuf.mtext);
msgp->mtype=atoi(strbuf);
printf("/n输入帐号=%4.4s的帐户姓名:",&rcvbuf.mtext[2]);

scanf("%s",sndbuf.mtext);
strcat(sndbuf.mtext,"|");
printf("输入该帐户余额:");
scanf("%s",strbuf);
strcat(sndbuf.mtext,strbuf);
rtrn=msgsnd(msqid,msgp, strlen(sndbuf.mtext), 0);
if (rtrn==-1)
{
perror("msgsnd");
exit(-1);
}
}
}

eg.2

编写了012号与013号程序,分别是关于消息队列和共享内存的,在机器上也已经调试过了,现在把代码贴在下面:

/*********************程序相关信息*********************
程序编号:012
程序编写起始日期:2008.11.1
程序编写完成日期:2008.11.1
程序修改日期: 修改备注:
程序目的:学习linux消息队列通信
所用主要函数:msgget(),msgsnd(),msgrcv(),msgctl()
程序存疑:
程序完成地点: 宿舍内
*********************程序相关信息*********************/
#include<sys/types.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#include<stdio.h>
#include<string.h>
#include<stdlib.h>
int main()
{
int pid,msqid;//后者为消息队列识别代号
struct msgbuf
{
long mtype;//消息类型
char mtext[20];//消息内容
}send_buf,receive_buf;
if((msqid=msgget(IPC_PRIVATE,0700))<0)//建立消息队列
{
printf("msgget建立消息队列失败。/n");
exit(1);
}
else
printf("msgget建立消息队列成功,该消息队列识别代号为%d。/n",msqid);
if((pid=fork())<0)
{
printf("fork()函数调用失败!/n");
exit(2);
}
else if(pid>0)//父进程,发送消息到消息队列
{
send_buf.mtype=1;
strcpy(send_buf.mtext,"My test information");
printf("发送到消息队列的信息内容为:%s/n",send_buf.mtext);
if(msgsnd(msqid,&send_buf,20,IPC_NOWAIT)<0)//发送send_buf中的信息到msqid对应的消息队列
{
printf("msgsnd消息发送失败。/n");
exit(3);
}
else
printf("msgsnd消息发送成功。/n");
sleep(2);
exit(0);
}
else//子进程,从消息队列中接收消息]
{
sleep(2);//等待父进程发送消息完成
int infolen;//读到的信息数据长度
if((infolen=msgrcv(msqid,&receive_buf,20,0,IPC_NOWAIT))<0)//自消息队列接收信息
{
printf("msgrcv读取信息错误。/n");
exit(4);
}
else
printf("msgrcv读取信息成功。/n");
printf("自消息队列读取到的内容为%s,共读取%d个字节。/n",receive_buf.mtext,infolen);
if((msgctl(msqid,IPC_RMID,NULL))<0)//删除msqid对应的消息队列
{
printf("msgctl函数调用出现错误。/n");
exit(5);
}
else
{
printf("识别代号为%d的消息队列已经被成功删除。/n",msqid);
exit(0);
}
}
}
/*********************程序运行结果*********************
[root@localhost temp]# ./msg
msgget建立消息队列成功,该消息队列识别代号为98304。
发送到消息队列的信息内容为:My test information
msgsnd消息发送成功。
msgrcv读取信息成功。
自消息队列读取到的内容为My test information,共读取20个字节。
识别代号为98304的消息队列已经被成功删除。
***********************************************************/

/*********************程序相关信息*********************
程序编号:013
程序编写起始日期:2008.11.1
程序编写完成日期:2008.11.1
程序修改日期: 修改备注:
程序目的:学习linux共享内存
所用主要函数:shmget(),shmat(),shmctl(),shmdt()
程序存疑:
程序完成地点: 宿舍内
*********************程序相关信息*********************/
#include<sys/ipc.h>
#include<sys/shm.h>
#include<stdlib.h>
#include<string.h>
#include<stdio.h>
int main()
{
int pid,shmid;//后者为共享内存识别代号
char *write_address;
char *read_address;
struct shmid_ds dsbuf;
if((shmid=shmget(IPC_PRIVATE,32,0))<0)//分配共享内存
{
printf("shmid共享内存分配出现错误。/n");
exit(1);
}
else
printf("shmid共享内存分配成功,共享内存识别代号为:%d。/n",shmid);
if((pid=fork())<0)
{
printf("fork函数调用出现错误!/n");
exit(2);
}
else if(pid>0)//父进程,向共享内存中写入数据
{
printf("父进程的ID是:%d/n",getpid());
write_address=(char *)shmat(shmid,NULL,0);//连接共享内存
if((int)write_address==-1)
{
printf("shmat连接共享内存错误。/n");
exit(3);
}
else
{
printf("shmat连接共享内存成功。/n");
strcpy(write_address,"我是写入共享内存的测试数据");//将数据写入共享内存
printf("写入共享内存的信息为“%s”。/n",write_address);
if((shmdt((void *)write_address))<0)//断开与共享内存的连接
printf("shmdt共享内存断开错误。/n");
else
printf("shmdt共享内存断开成功。/n");
sleep(2);
return;
}
}
else//子进程,从共享内存中读取数据
{
sleep(2);//等待父进程写入共享内存完毕
printf("子进程ID是:%d/n",getpid());
if((shmctl(shmid,IPC_STAT,&dsbuf))<0)
{
printf("shmctl获取共享内存数据结构出现错误。/n");
exit(4);
}
else
{
printf("shmctl获取共享内存数据结构成功。/n建立这个共享内存的进程ID是:%d/n",dsbuf.shm_cpid);
printf("该共享内存的大小为:%d/n",dsbuf.shm_segsz);
if((read_address=(char *)shmat(shmid,0,0))<0)//连接共享内存
{
printf("shmat连接共享内存出现错误。/n");
exit(5);
}
else
{
printf("自共享内存中读取的信息为:“%s”。/n",read_address);
printf("最后一个操作该共享内存的进程ID是:%d/n",dsbuf.shm_lpid);
if((shmdt((void *)read_address))<0)//断开与共享内存的连接
{
printf("shmdt共享内存断开错误。/n");
exit(6);
}
else
printf("shmdt共享内存断开成功。/n");
if(shmctl(shmid,IPC_RMID,NULL)<0)//删除共享内存及其数据结构
{
printf("shmctl删除共享内存及其数据结构出现错误。/n");
exit(7);
}
else
printf("shmctl删除共享内存及其数据结构成功。/n");
exit(0);
}
}
}
}
/*********************程序运行结果*********************
[root@localhost temp]# ./shm
shmid共享内存分配成功,共享内存识别代号为:1703947。
父进程的ID是:7647
shmat连接共享内存成功。
写入共享内存的信息为“我是写入共享内存的测试数据”。
shmdt共享内存断开成功。
子进程ID是:7648
shmctl获取共享内存数据结构成功。
建立这个共享内存的进程ID是:7647
该共享内存的大小为:32
自共享内存中读取的信息为:“我是写入共享内存的测试数据”。
最后一个操作该共享内存的进程ID是:7647
shmdt共享内存断开成功。
shmctl删除共享内存及其数据结构成功。
***********************************************************/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: