您的位置:首页 > 理论基础 > 计算机网络

UNIX网络编程——System V 消息队列

2015-11-24 10:20 435 查看
消息队列可以认为是一个消息链表,System V 消息队列使用消息队列标识符标识。具有足够特权的任何进程都可以往一个队列放置一个消息,具有足够特权的任何进程都可以从一个给定队列读出一个消息。在某个进程往一个队列写入消息之前,并不需要另外某个进程在该队列上等待消息的到达。System V
消息队列是随内核持续的,只有在内核重起或者显示删除一个消息队列时,该消息队列才会真正被删除。对于系统中没个消息队列,内核维护一个msqid_ds的信息结构:

struct msqid_ds
  {
    struct msqid_ds {
    struct ipc_perm msg_perm;
    struct msg *msg_first;      /* first message on queue,unused  */
    struct msg *msg_last;       /* last message in queue,unused */
    __kernel_time_t msg_stime;  /* last msgsnd time */
    __kernel_time_t msg_rtime;  /* last msgrcv time */
    __kernel_time_t msg_ctime;  /* last change time */
    unsigned long  msg_lcbytes; /* Reuse junk fields for 32 bit */
    unsigned long  msg_lqbytes; /* ditto */
    unsigned short msg_cbytes;  /* current number of bytes on queue */
    unsigned short msg_qnum;    /* number of messages in queue */
    unsigned short msg_qbytes;  /* max number of bytes on queue */
    __kernel_ipc_pid_t msg_lspid;   /* pid of last msgsnd */
    __kernel_ipc_pid_t msg_lrpid;   /* last receive pid */
};

下面这个程序是创建一个消息队列,然后网消息队列中放置一个含有1个字节的消息,发出msgctl的IPC_STAT命令,使用system函数执行ipcs命令,最后使用IPC_RMID命令删除该队列。(书本例子)

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

#define MSG_R 0400 /* read permission */
#define MSG_W 0200 /* write permission */

#define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6)

typedef unsigned long ulong_t;

int main()
{
int msqid;
struct msqid_ds info;
struct msgbuf buf;
if((msqid = msgget(IPC_PRIVATE,SVMSG_MODE|IPC_CREAT)) == -1)
{
perror("msgget() error");
exit(-1);
}
buf.mtype = 1;
buf.mtext[0] = 1;
if(msgsnd(msqid,&buf,1,0) == -1)
{
perror("msgsnd() error");
exit(-1);
}
if(msgctl(msqid,IPC_STAT,&info) == -1)
{
perror("msgctl() error");
exit(-1);
}
printf("read-write: %03o,cbytes = %lu,qnum =%lu,qbytes = %lu\n",
info.msg_perm.mode & 0777,(ulong_t) info.msg_cbytes,
(ulong_t) info.msg_qnum,(ulong_t) info.msg_qbytes);
system("ipcs -q");
if(msgctl(msqid,IPC_RMID,NULL) == -1)
{
perror("msgctl() error");
exit(-1);
}
exit(0);
return 0;
}

这个是对System V的一些函数的测试,下面看一个客户服务器的例子:

先服务器创建两个System V的消息队列,然后发送消息给服务器,服务器接受到消息后开始给客户反馈消息。

客服端:#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MQ_KEY1 1234L //定义两个KEY
#define MQ_KEY2 2345L
#define MAXMESGDATA 1024
#define MAXMSG sizeof(long)*2+1024
void client(int ,int);

int main(int argc,char *argv[])
{
int readid,writeid;
//打开消息队列
//MO_KEY2用于客户从服务器读取
if((readid = msgget(MQ_KEY2,0)) == -1)
{
perror("msgget() error");
exit(-1);
}
//MO_KEY用于客户向服务器写
if((writeid = msgget(MQ_KEY1,0))==-1)
{
perror("msgget() error");
exit(-1);
}
client(readid,writeid);
msgctl(readid,IPC_RMID,NULL); //删除消息队列
msgctl(writeid,IPC_RMID,NULL);
exit(0);
}

void client(int readid ,int writeid)
{
ssize_t n,len;
struct msgbuf *buff,*ptr;
char content[MAXMESGDATA];
long type;
printf("Enter the msg_type: ");
scanf("%ld",&type);
printf("Enter the msg_data: ");
scanf("%s",content);
len = strlen(content);
ptr =(msgbuf *) calloc(sizeof(long)*2+len,sizeof(char));
ptr->mtype = type;
strcpy(ptr->mtext,content);
printf("Sent to server.\n");
if(msgsnd(writeid,ptr,len+sizeof(long),0) == -1)
{
perror("msgsnd() error");
exit(-1);
}
printf("Sent to server successfully.\n");
free(ptr);
printf("*********************************\n");
buff = (msgbuf*)malloc(MAXMSG );
printf("waiting for server......\n");
//以阻塞形式读取
if((n=msgrcv(readid,buff,MAXMSG,0,0)) == -1)
{
perror("msgrcv() error");
exit(-1);
}
printf("Received message from server is:\nmsg_type = %ld\nmsg_data = %s\n",
buff->mtype,buff->mtext);
free(buff);
}
服务器端:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#define MQ_KEY1 1234L //定义两个KEY
#define MQ_KEY2 2345L
#define MAXMESGDATA 1024
#define MAXMSG sizeof(long)*2+1024
#define MSG_R 0400 /* read permission */
#define MSG_W 0200 /* write permission */
#define SVMSG_MODE (MSG_R | MSG_W | MSG_R >>3 | MSG_R >>6)

void server(int ,int);

int main(int argc,char *argv[])
{
int readid,writeid;
if((readid = msgget(MQ_KEY1,SVMSG_MODE | IPC_CREAT)) == -1)
{
perror("msgget() error");
exit(-1);
}
if((writeid = msgget(MQ_KEY2,SVMSG_MODE | IPC_CREAT)) == -1)
{
perror("msgget() error");
exit(-1);
}
server(readid,writeid);
exit(0);
}

void server(int readid,int writeid)
{
ssize_t n,len;
struct msgbuf *buff,*ptr;
char content[MAXMESGDATA];
long type;
buff =(msgbuf*) malloc(MAXMSG );
printf("waiting for client......\n");
if((n=msgrcv(readid,buff,MAXMSG,0,0)) == -1)
{
perror("msgrcv() error");
exit(-1);
}
printf("Received message from client is:\nmsg_type = %ld\nmsg_data = %s\n",
buff->mtype,buff->mtext);
free(buff);
printf("*********************************\n");
printf("Enter the msg_type: ");
scanf("%ld",&type);
printf("Enter the msg_data: ");
scanf("%s",content);
len = strlen(content);
ptr =(msgbuf*) calloc(sizeof(long)*2+len,sizeof(char));
ptr->mtype = type;
strcpy(ptr->mtext,content);
printf("Sent to client.\n");
if(msgsnd(writeid,ptr,len,0) == -1)
{
perror("msgsnd() error");
exit(-1);
}
printf("Sent to client successfully.\n");
free(buff);
}
消息复用:消息队列中的消息结构可以由我们自由定义,具备较强的灵活性。通过消息结构可以共享一个队列,进行消息复用。通常定义一个类似如下的消息结构:
#define MSGMAXDAT     1024
struct mymsg
{
long msg_len;   //消息长度
long msg_type; //消息类型
long msg_data[MSGMAXDATA]; //消息内容
};


消息结构相关联的类型字段(msg_type)提供了两个特性:

(1)标识消息,使得多个进程在单个队列上复用消息。

(2)用作优先级字段,允许接收者以不同于先进先出的某个顺序读出各个消息。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: