您的位置:首页 > 理论基础 > 计算机网络

进程间通信——Posix消息队列

2013-05-07 11:11 561 查看
消息队列具有随内核的持续性,队列中的每个消息具有如下属性:一个unsigned int 优先级或一个long 类型;消息的数据部分长度;数据本身

#include <mqueue.h>
mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */ ); // mq描述符/-1

int mq_close(mqd_t mqdes); // 0/-1
int mq_unlink(const char *name); // 0/-1

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
int mq_setattr(mqd_t mqdes, struct mq_attr *attr); // 0/-1

int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio); // 0/-1
int mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop); // bytes/-1

int mq_notify(mqd_t mqdes, const struct sigevent *notification); // 0/-1

mq_open函数的oflag参数,是O_RDONLY, O_WRONLY, O_RDWR之一,可能按位或上O_CREAT, O_EXCL, O_NONBLOCK
mq_unlink函数类似于unlink函数删除一个文件的机制,当一个消息队列的引用计数仍大于0时,其name就能删除,但是该队列的析构要到最后一个mq_close发生时才进行

struct mq_attr{
long mq_flags; // 0, O_NONBLOCK
long mq_maxmsg;
long mq_msgsize;
long mq_curmsgs;
};
创建一个新的队列时,给它指定mq_maxmsg, mq_msgsize属性,mq_open忽略另外另个成员;mq_setattr只使用mq_flags成员,已设置或清除非阻塞标志,另外3成员被忽略

每个消息有一个优先级,是一个小于MQ_PRIO_MAX的无符号整数,可以通过 getconf MQ_PRIO_MAX获得,mq_receive总是返回所指定队列中最高优先级的最早消息,其len参数的值不能小于mq_msgsize,否则返回EMSGSIZE。mq_send的prio参数是待发送消息的优先级,其值必须小于MQ_PRIO_MAX;若应用不必使用优先级不同的消息,指定值为0的优先级。

Posix消息队列允许异步事件通知,以告知何时有一个消息放置到了某个空消息队列中。通知方式有两种:产生一个信号;创建一个线程来执行一个指定的函数。

union sigval {
int sival_int;
void *sival_ptr;
};
struct sigevent {
int sigev_notify; // SIGEV_{NONE,SIGNAL,THREAD}
int sigev_signo;
union sigval sigev_value;
void (*sigev_notify_function)(union sigval);
pthread_attr_t *sigev_notify_attributes;
};
异步事件通知的若干规则:

notification参数非空,该进程注册为接受该队列的通知;否则已存在的注册被撤销。任意时刻只有一个进程可以被注册为接受某个给定队列的通知;若当前队列为空且有一个进程被注册通知,只有在没有任何线程阻塞在该队列的recv调用的前提下通知才被发出,recv调用优先;通知发出后,注册被注销,如果想要的话需再次重新注册。

//非阻塞mq_receive的信号通知
#include	"unpipc.h"

volatile sig_atomic_t	mqflag;		/* set nonzero by signal handler */
static void	sig_usr1(int);

int
main(int argc, char **argv)
{
mqd_t	mqd;
void	*buff;
ssize_t	n;
sigset_t	zeromask, newmask, oldmask;
struct mq_attr	attr;
struct sigevent	sigev;

if (argc != 2)
err_quit("usage: mqnotifysig3 <name>");

/* 4open queue, get attributes, allocate read buffer */
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);

Sigemptyset(&zeromask);		/* no signals blocked */
Sigemptyset(&newmask);
Sigemptyset(&oldmask);
Sigaddset(&newmask, SIGUSR1);
/* 4establish signal handler, enable notification */
Signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);

for ( ; ; ) {
Sigprocmask(SIG_BLOCK, &newmask, &oldmask);	/* block SIGUSR1 */
while (mqflag == 0)
sigsuspend(&zeromask);
mqflag = 0;		/* reset flag */

Mq_notify(mqd, &sigev);			/* reregister first */
while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
Sigprocmask(SIG_UNBLOCK, &newmask, NULL);	/* unblock SIGUSR1 */
}
exit(0);
}

static void
sig_usr1(int signo)
{
mqflag = 1;
return;
}


//sigwait版本的信号通知
#include	"unpipc.h"

int
main(int argc, char **argv)
{
int		signo;
mqd_t	mqd;
void	*buff;
ssize_t	n;
sigset_t	newmask;
struct mq_attr	attr;
struct sigevent	sigev;

if (argc != 2)
err_quit("usage: mqnotifysig4 <name>");

/* 4open queue, get attributes, allocate read buffer */
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);

Sigemptyset(&newmask);
Sigaddset(&newmask, SIGUSR1);
Sigprocmask(SIG_BLOCK, &newmask, NULL);		/* block SIGUSR1 */

/* 4establish signal handler, enable notification */
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);

for ( ; ; ) {
Sigwait(&newmask, &signo);
if (signo == SIGUSR1) {
Mq_notify(mqd, &sigev);			/* reregister first */
while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
}
}
exit(0);
}
//select版本的信号通知
#include	"unpipc.h"

int		pipefd[2];
static void	sig_usr1(int);
/* $$.bp$$ */
int
main(int argc, char **argv)
{
int		nfds;
char	c;
fd_set	rset;
mqd_t	mqd;
void	*buff;
ssize_t	n;
struct mq_attr	attr;
struct sigevent	sigev;

if (argc != 2)
err_quit("usage: mqnotifysig5 <name>");

/* 4open queue, get attributes, allocate read buffer */
mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);
buff = Malloc(attr.mq_msgsize);

Pipe(pipefd);

/* 4establish signal handler, enable notification */
Signal(SIGUSR1, sig_usr1);
sigev.sigev_notify = SIGEV_SIGNAL;
sigev.sigev_signo = SIGUSR1;
Mq_notify(mqd, &sigev);

FD_ZERO(&rset);
for ( ; ; ) {
FD_SET(pipefd[0], &rset);
nfds = Select(pipefd[0] + 1, &rset, NULL, NULL, NULL);

if (FD_ISSET(pipefd[0], &rset)) {
Read(pipefd[0], &c, 1);
Mq_notify(mqd, &sigev);			/* reregister first */
while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");
}
}
exit(0);
}

static void
sig_usr1(int signo)
{
Write(pipefd[1], "", 1);	/* one byte of 0 */
return;
}
//线程版本的信号通知
#include	"unpipc.h"

mqd_t	mqd;
struct mq_attr	attr;
struct sigevent	sigev;

static void	notify_thread(union sigval);		/* our thread function */

int
main(int argc, char **argv)
{
if (argc != 2)
err_quit("usage: mqnotifythread1 <name>");

mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
Mq_getattr(mqd, &attr);

sigev.sigev_notify = SIGEV_THREAD;
sigev.sigev_value.sival_ptr = NULL;
sigev.sigev_notify_function = notify_thread;
sigev.sigev_notify_attributes = NULL;
Mq_notify(mqd, &sigev);

for ( ; ; )
pause();		/* each new thread does everything */

exit(0);
}

static void
notify_thread(union sigval arg)
{
ssize_t	n;
void	*buff;

printf("notify_thread started\n");
buff = Malloc(attr.mq_msgsize);
Mq_notify(mqd, &sigev);			/* reregister */

while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
printf("read %ld bytes\n", (long) n);
}
if (errno != EAGAIN)
err_sys("mq_receive error");

free(buff);
pthread_exit(NULL);
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐