您的位置:首页 > 其它

进程间通信之消息队列

2016-04-11 22:17 309 查看
一.概述:
消息队列是消息的链表,存放在内核中并由消息队列标识符标志,提供了一种从一个进程向另一个进程发送数据块(数据块有类型)的方法。我们可以通过发送消息来避免命名管道的同步和阻塞问题。



注:(1).最重要的是前三条和最后两条。
(2).消息队列与命名管道有一样的不足足,就是每个消息的最大大长度是有上限的(MSGMAX),每个消息队列的总的字节数是有上限的(MSGMNB),系统上消息队列的总数也有一一个上限(MSGMNI)。

二.相关函数:
int msgget(ket_t ket, int msgflg) :创建新消息队列或打开已经存在的消息队列。

key:可以认为是一个端口号,也可以用ftok函数(下面讲)创建。
msgflg:
IPC_CREAT:如果IPC不存在,则创建一个IPC资源,否则直接打开进行操作。
IPC_EXCL:只有在共享内存不存在的时候,共享内存才创建,如果共享内存本来就存在,则会产生错误。
返回值:如果成功,则返回消息队列标识符,否则,返回-1.
注:(1).如果将IPC_CREAT 和IPC_EXCL 一起使用:如果IPC资源不存在,则返回一个新建的IPC资源,如果IPC本来就存在,则返回-1。
(2).IPC_EXCL单独存在的意义不大,它一般用于和IPC_CREAT一起使用,用来保证返回的是一个新的IPC资源,而不是一个已存在的。
2. ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp, int msgflg) :从消息队列中取消息;
int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg) :将数据发到消息队列中。
msqid:消息队列的标识符码。
msgp:指向消息缓存区的指针,用来暂时存储接受和发送的消息,是一个用户可自定义的数据结构。形态如下:
struct msgp
{
long mytype;//大于0
char mytest[用户指定大小];

}
msgsz:消息的大小。
msgtyp:从消息队列内读取的消息形态。如果值为0,则标识消息队列中所有的消息都会被读取。
msgflg :用来指明核心程序在队列没有数据的情况下所应采取的行行动。如果msgflg和IPC_NOWAIT合用,则在msgsnd()执行行时若是消息队列已满,则msgsnd()将不会阻塞,而会立立即返回-1,如果执行的是msgrcv(),则在消息队列呈空时,不做等待马上返回-1,并设定错误码为ENOMSG。当msgflg为0时,msgsnd()及msgrcv()在队列呈满或呈空的情形时,采取阻塞等待的处理模式。
返回值: On failure both functions return -1 with errno indicating the error,otherwise msgsnd() returns 0 and msgrcv() returns the number of bytes actually copied into the mtext array.
3. int msgctl ( int msgqid, int cmd, struct msqid_ds *buf ) :设置信息队列属性。
msgctl 系统调用用对 msgqid 标识的消息队列执行行 cmd 操作,系统定义了 3 种 cmd 操作:
IPC_STAT : 该命令用用来获取消息队列对应的 msqid_ds 数据结构,并将其保存到 buf 指
定的地址空间。
IPC_SET : 该命令用用来设置消息队列的属性,要设置的属性存储在buf中。
IPC_RMID : 从内核中删除 msqid 标识的消息队列。
4. key_t ftok(const char* path, int proj_id) :把一个已经存在的路径名和一个整数转换为一个key_t值,称为IPC键。成功返回一个key_t值,否则返回-1;

三.用消息队列实现双向通信的代码:

//com.h:
#pragma once
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<sys/types.h>
#include<unistd.h>
#include<sys/ipc.h>
#include<sys/msg.h>
#define PATH "."
#define BUF_SIZE 1024
#define PROJ_ID 0x777 // ...........?
#define SERVER_TYPE 1
#define CLIENT_TYPE 2

struct msgbuf
{
long mytype;
char mytest[BUF_SIZE];
};
static int msg_queue(int flag);//得到一个消息队列 用于本文件
int set_msg_queue();//设置消息队列的接口
int msg_queue_send(int msg_id, const char* msg, long type);//发送消息的接口
int msg_queue_recv(int msg_id, char* buf, long type);//接受消息的接口

//com.c
#include"com.h"

static int msg_queue(int flag)
{
key_t key = ftok(PATH,PROJ_ID);
if(key < 0)
{
printf("ftok is error\n");
return -1;
}

int msg_id = msgget(key,flag);
if(msg_id < 0)
{
printf("msgget is error\n");
return -1;
}

return msg_id;
}

int get_msg_queue()
{
return msg_queue(IPC_CREAT);
}

int set_msg_queue() //设置消息队列的读写执行权限
{
umask(0);
return msg_queue(IPC_CREAT | IPC_EXCL | 0666);
}

int msg_queue_send(int msg_id, const char* msg, long type)
{
struct msgbuf buf;
buf.mytype = type;
strcpy(buf.mytest, msg);
if(msgsnd(msg_id, &buf, strlen(buf.mytest), 0) < 0)
{
perror("msgsnd");
return -1;
}
return 0;
}

int msg_queue_recv(int msg_id, char *msg, long type)
{
int ret = 0;
struct msgbuf buf;
buf.mytype = type;
if(msgrcv(msg_id, &buf, sizeof(buf.mytest), 0, 0) < 0)
{
perror("msgrcv");
return -1;
}

strcpy(msg, buf.mytest);//把消息的内容拷到msg中
return strlen(msg);
}

server.c:
#include"com.h"

int main()
{
int msg_id = get_msg_queue();
if(msg_id < 0)
{
printf("message queue create failed\n");
return -1;
}

char buf[BUF_SIZE];
while(1) //服务器端先接受消息 再发送消息
{

memset(buf, '\0', sizeof(buf) - 1);
int ret = msg_queue_recv(msg_id, buf, CLIENT_TYPE);
if(ret < 0)
{
printf("message recv failed\n");
return -1;
}
else
{
if(strcmp(buf,"quit") == 0)
{
printf("client closed\n");
return 0;
}

buf[ret] = '\0';
printf("client say:%s\n",buf);
}

printf("please write yous words : ");
fflush(stdout);

memset(buf, '\0', sizeof(buf) - 1);
gets(buf);
if(msg_queue_send(msg_id, buf, SERVER_TYPE) < 0)
{
printf("message send failed\n");
return -1;
}
}
destory(msg_id);
return 0;
}

client.c
#include"com.h"

int main()
{
int msg_id = get_msg_queue();
if(msg_id < 0)
{
printf("message queue creat failed\n");
return -1;
}

char buf[BUF_SIZE];
while(1)
{
printf("please write your words:");
fflush(stdout);

memset(buf, '\0', sizeof(buf) - 1);
gets(buf);
if(msg_queue_send(msg_id, buf, CLIENT_TYPE) < 0)
{
printf("message send failed\n");
return -1;
}

memset(buf, '\0', sizeof(buf) - 1);
int ret = msg_queue_recv(msg_id, buf, SERVER_TYPE);
if(ret < 0)
{
printf("message recv failed\n");
return -1;
}
else
{
if(strcmp(buf, "quit") == 0)
{
printf("server closed!\n");
return 0;
}

buf[ret] = '\0';
printf("server say:%s\n",buf);
}
}
destory(msg_id);
return 0;
}

Makefile:
.PHONY:all
all:server client
server:server.c com.c
gcc -o $@ $^
client:client.c com.c
gcc -o $@ $^
.PHONY:clean
clean:
rm -f server client

执行结果:




问题:当输入quit时,缓存buf会出现以前的内容,不知道为什么?(我send之前memset了,recv也memset了)。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  进程 通信 消息队列