您的位置:首页 > 其它

进程间通讯——消息队列、信号量以及共享内存

2017-11-01 19:07 603 查看
本篇博客见《unix环境高级编程》的进程通信那一节,看到那一节就把这个写到博客里面啦,加深记忆。方便以后复习用……

消息队列,信号量,共享内存这三种我们称为XSI IPC,它们之间有很多相似之处。

在了解这三个之前,我们先来了解标识符和键。

每个内核中的IPC结构(这里指消息队列,信号量和共享存储段)都用一个非负整数的标识符加以引用。例如,为了对一个消息队列发送或读消息时,只需要知道

其队列的标识符。

标识符是IPC对象的内部名。为使多个合作进程能够在同一IPC对象上会和,需要提供一个外部名,因此使用了键(key),每个IPC对象都与一个键相关联,于是键

就用作为该对象的外部名。

无论何时创建IPC结构(调用 msgget、semget 或shmget),都应指定一个键(key),键的数据类型由系统规定为 key_t,通常在头文件<sys/types.h>中被规定为长整型。

键由内核变换成标识符。

三个get函数(msgget、semget和shmget)都有两个类似的参数:一个key和一个整型flag。如若满足下列两个条件之一,则创建一个新的IPC结构(服务器进程创建)

(1)key是IPC_PRIVATE

(2)key当前未与特定类型的IPC结构相结合,并且flag中指定了IPC_CREAT位。

为访问现存的队列,key必须等于创建该队列时所指定的键,并且不应该指定IPC_CREAT。

注意,访问一个现存的队列,决不能指定IPC_PRIVATE作为键。因为这是一个特殊的键值,它总是用于创建一个新队列。为了访问一个用IPC_PRIVATE键创建的现存队列,

一定要知道与该队列相结合的标识符,然后在其他IPC调用中(如msgsnd和msgrcv)使用该标识符。

权限:: XSI IPC权限



接下来我们来看消息队列:

消息队列是消息的链接表,存放在内核中并由消息队列标识符的标识。

每个队列都有一个msqid_ds结构与其相关联:
struct msqid_ds{
struct ipc_perm msg_perm; //ipc结构
msgqnum_t msg_qnum; //队列中消息的数量
msglen_t msg_qbytes; //队列中最大的字节数
pid_t msg_lspid; //最后调用msgsnd的pid
pid_t msg_lrpid; //最后调用msgrcv的pid
time_t msg_stime; //最后调用msgsnd函数的时间
time_t msgrtime; //最后调用msgrcv函数的时间
time_t msg_ctime; //最后change的时间
......
};

消息队列的函数:

#include <sys/msg.h>

(1)msgget用于创建新队列或打开一个现存的队列。

int msgget(key_t key, int flag);

返回值:成功返回消息队列的ID,出错返回-1;此后该值就可被用于其他三个消息队列的函数。

参数:key:键,可以看上面的介绍,这个我们一般给一个非负整数,

flag:权限位

(2)msgctl函数对队列执行多种
4000
操作。

int msgctl(int msqid,int cmd,struct msqid_ds *buf);

返回值:成功返回0,出错-1

参数:msqid:msgget的返回值

cmd


(3)调用msgsnd将数据放到消息队列中。

int msgsnd(int msqid,const void *ptr,size_t nbyte,int flag);成功返回0,出错-1

每个消息都有三部分组成,正长整型类型字段,非负长度(nbyte),以及实际数据字节(对应于长度)。消息总是放在队列尾端。

ptr参数指向一个长整型数,它包含了正的整型消息类型,在其后紧跟消息数据,若发送的最长消息是128字节,则可定义下列结构:

struct mymesg

{

long mtype; //正的整型消息类型

char mtext[128]; //消息数据

};

ptr就是指向mymesg结构的指针。接受者可以使用消息类型以非先进先出的次序去消息。

参数flag的值可以指定为IPC_NOWAIT.这类似于文件I/O的非阻塞I/O标志。一般给0的话,即忽略。

当msgsnd成功返回,与消息队列相关的msqid_ds结构得到更新,以标明发出该调用的进程ID(msg_lspid),

进行该调用的时间(msg_stime),并指示队列中加了一条消息(msg_qnum)

(4)msgrcv从队列中取消息;

ssize_t msgrcv(int msqid,void *ptr,size_t nbytes, long type , int flag);

成功返回消息的数据部分的长度,出错返回-1;

ptr和msgsnd的prt一样;

nbyte:数据缓冲区的长度

type:我们可以指定想要哪一种消息:type == 0 返回队列中的第一个消息。

type > 0 返回队列中消息类型为type的第一个消息

type < 0 返回队列中消息类型值小于或等于type绝对值的消息,如果这种消息有若干个,则取类型值最小的消息。

当msgrcv成功返回,内核更新与消息队列相关的msqid_ds结构,以指示调用者进程ID(msg_lspid),

和调用的时间(msg_stime),并将队列中的消息数减一(msg_qnum)

下面简单代码实现消息队列:

创建消息队列,写入消息:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <sys/msg.h>

struct mymesg  //ptr指针指向的结构
{
long mtype;
char mtext[512];
};

void main()
{
int msgid = msgget((key_t)1234, 0664|IPC_CREAT); //打开消息队列,没有创建的话则创建
assert(msgid != -1);

struct mymesg buf;
buf.mtype = 1000;   //消息类型1000
strcpy(buf.mtext, "hello"); //消息数据hello
msgsnd(msgid, &buf, strlen(buf.mtext), 0); //将数据放到消息队列中

buf.mtype = 2000;  //消息类型2000
strcpy(buf.mtext, "word");//消息数据word
msgsnd(msgid, &buf, strlen(buf.mtext), 0);//将数据放到消息队列中
}


从消息队列中取消息:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <sys/msg.h>

struct node
{
long type;
char buff[128];
};

void main()
{
int msgid = msgget((key_t)1234, 0664|IPC_CREAT);
assert(msgid != -1);

struct node buf;
memset(&buf, 0, sizeof(buf));
msgrcv(msgid, &buf, sizeof(buf) - 1, 2000, 0); //从消息队列中取类型为2000的消息
printf("buf.type: %d\n", buf.type);
printf("buf.buff: %s\n", buf.buff);
}


执行结果:



接下来我们看看信号量::



信号量与其他IPC不同的是,它是一个计数器,用于多进程对共享数据对象的访问。

为了获得共享资源,进程需要执行的操作:

(1)测试控制该资源的信号量。

(2)信号量的值为正时,则进程可以使用该资源。使用了,就将信号量减一(p操作)。

(3)信号量的值为0,则进程进入休眠状态,直至信号量的大于0。进程被唤醒后,返回至第(1)步

当进程不再使用该资源时,信号量的值加一(v操作)。如果有进程正在休眠等待此信号量,则唤醒进程。

信号量的测试及加一减一操作都是原子操作,信号量通常是在内核中实现。

常用的信号量形式被称为二元信号量或双态信号量,控制单个资源,初始值为1,。但是一般而言,信号量的初值可以是任意正数值,这说明

有多少个共享资源单位可供共享应用。

内核为每个信号量集合设置一个semid_ds结构:
struct semid_ds{
struct ipc_perm sem_perm;
unsigned short sem_nsems; //信号量在信号量集中的编号
time_t sem_otime; 最后调用semop()的时间。
time_t sem_ctime; 最后进行change的时间。
....
}
每个信号量由一个无名结构表示,它至少包含下列成员。
struct{
unsigned short semval; //信号量值,>=0
pid_t sempid; //最后使用信号量的pid
unsigned short semcnt; //等待semval变为大于其当前值的线程或进程数
unsigned short semzcnt; //等待semval变成0的线程或进程数
}

XSI的信号量要复杂一些:

(1)信号量并非是单个非负值,而必须将信号量定义为含有一个或多个信号量值的集合。当创建一个信号量时,要指定集合中信号量值的数量。

(2)创建信号量与对其赋初值是分开的。

(3)即使没有进程正在使用各种形式的XSI IPC,它们仍然是存在的。有些程序在终止时并没有释放已经分配给它的信号量。

关于信号量的函数::

(1)要获得一个信号量的ID,要调用的函数semget

#include <sys/sem.h>

int semget(key_t key, int nsems, int flag);

返回值:成功返回信号量的ID,若出错则返回-1

nsems是该集合中的信号量数。如果引用一个现存的集合,则将nsems指定为0。如果是创建新集合,则必须指定nsems

(2)semctl函数包含了多种信号量的操作

int semctl(int semid, int semnum ,int cmd , /* union semun arg */);

第四个参数是可选的,如果使用该参数,则其类型是semun,它是多个特定命令参数的联合(union);

union semun
{
int val; //for SETVAL
struct semid_ds *buf; //for IPC_STAT and IPC_SET
unsigned short *array; //for GETALL and SETALL
};

cmd参数指定下列10种命令的一种, 在semid指定的信号量集合上执行此命令,其中有5条命令是针对一个特定的信号量值

的,他们用semnum指定该信号量集合中的一个成员。semnum值在0和nsems-1之间。(包含0和nsems-1)

IPC_STAT:对此集合取semid_ds结构,并存放在由arg.buff指向的结构中。
IPC_SET:按由arg.buf指向结构中的值设置与此集合相关结构中的下面三个字段值:sem_perm.uid、sem_perm.gid和
sem_perm.mode。此命令只能由下列两种进程执行:一种是其有效用户ID等于sem_perm.cuid或sem_perm.uid的进程,
另一种是具有超级用户特权的进程。
IPC_RMID:从系统中删除该信号量集合。这种删除是立即发生的。仍在使用此信号集合的其他进程在他们下次试图对此信号量
集合进行操作时,将出错返回EIDRM。此命令只能由下列两种进程执行:一种是其有效用户ID等于sem_perm.cuid或
sem_perm.uid的进程;另一种是具有超级用户特权的进程。
GETVAL:返回成员semnum的semval值。
SETVAL:设置成员semnum的semval值。该值由arg.val指定。
GETPID:返回成员semnum的sempid值。
GETNCNT:返回成员semnum的semncnt值。
GETZCNT:返回成员semnum的semzcnt值。
GETALL:取该集合中所有信号量的值,并将他们存放在arg.array指向的数组中。
SETALL:按arg.array指向的数组中的值,设置该集合中所有信号量的值。
除GETALL以外的所有GET命令semctl函数都返回相应的值。其他命令返回值为0.

(3)函数semop自动执行信号量集合上的操作数组,这是个原子操作。

int semop(int semid,struct sembuf semoparray[ ] ,size_t nops);

返回值:成功返回0,出错返回-1

参数semoparray是一个指针,它指向一个信号量操作数组,信号量操作由sembuf结构表示:

struct sembuf
{
unsigned short sem_num; //信号集中的成员号(0,1....,nsems-1)
short sem_op; //加减操作
short sem_flg; //IPC_NOWAIT, SEM_UNDO
};
参数nops规定该数组中的操作的数量(元素数)。

简单代码实现::我把信号量封装了一下。

信号量的声明:sem.h文件
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <sys/sem.h>

union semun
{
int val;
};

void seminit(); //初始化
void semp(); //减一操作
void semv(); //加一操作
void semdel(); //删除操作


sem.c文件
#include "sem.h"

int semid = 0;

void seminit() //初始化操作
{
semid = semget((key_t)1234, 1, 0666);
if(semid == -1) //打开现有的失败,创建信号量
{
semid = semget((key_t)1234, 1, 0666 | IPC_CREAT);
if(semid == -1)
{
perror("");
exit(0);
}

union semun v;
v.val = 0; //设定初始值为0

semctl(semid, 0, SETVAL, v);
}
}
void semp() //减一操作
{
struct sembuf  buf;
buf.sem_num = 0;
buf.sem_op = -1;
buf.sem_flg = SEM_UNDO;

semop(semid, &buf, 1);
}
void semv() //加一操作
{
struct sembuf  buf;
buf.sem_num = 0;
buf.sem_op = 1;
buf.sem_flg = SEM_UNDO;

semop(semid, &buf, 1);
}
void semdel() //删除信号量
{
semctl(semid, 0, IPC_RMID, NULL);
}


一个进程给一个文件里面写东西,当写的时候,另一个进程不能读,利用信号量
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <fcntl.h>
#include "sem.h"

void main()
{
int fd = open("a.txt", O_CREAT | O_WRONLY | O_TRUNC, 0664);
assert(fd != -1);

seminit(); //初始化信号量

while(1)
{
char buff[128] = {0};
printf("please input data: ");
fflush(stdout);
fgets(buff, 128, stdin);
buff[strlen(buff) - 1] = 0;
write(fd, buff, strlen(buff));
semv(); //写完给信号量值加一
if(strncmp(buff, "end", 3) == 0)
{
break;
}
}

close(fd);
}


写完了之后,信号量值加一,这时候读的那个进程就可执行了,读之前要对信号量值减一
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <fcntl.h>
#include "sem.h"

void main()
{
int fd = open("a.txt", O_CREAT | O_RDONLY , 0664);
assert(fd != -1);

seminit(); //初始化信号量

while(1)
{
char buff[128] = {0};
int n = 1, flag = 0;
semp(); //减一操作
while(1)
{
n = read(fd, buff, 10);
if(n == 0)
{
break;
}
if(strncmp(buff, "end", 3) == 0)
{
flag = 1;
break;
}
printf("n = %d, buff = %s\n", n, buff);
memset(buff, 0, 128);
}
if(flag)
{
break;
}
}

close(fd);
}


共享存储::

共享存储允许两个或更多进程共享一给定的存储区。因为数据不需要在客户进程和服务器进程之间复制,所以共享存储是进程间通讯最快的一种。

使用共享存储必须是多进程之间同步访问共享存储区

即就是当我有一A进程正在将数据放入共享存储区,这个时候另一B进程是不能从共享存储区中去取数据,直到进程A写这一操作执行完

这里我用了信号量来实现进程的同步,信号量的操作利用上面信号量的代码::

简单代码实现如下:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <sys/shm.h>
#include "sem.h"

void main()
{
int shmid = shmget((key_t)1234, 128, 0664 | IPC_CREAT); //获得一个共享存储标识符
assert(shmid != -1);

char *ptr = (char *)shmat(shmid, NULL, 0); //将共享存储连接到内核分配的一块空间上
assert(ptr != (char *)-1);

seminit
89e3
(); //初始化信号量

while(1)
{
printf("please input: ");
fflush(stdout);
fgets(ptr, 128, stdin);
ptr[strlen(ptr) - 1] = 0;
semv();   //信号量值加一
if(strncmp(ptr, "end", 3) == 0)
{
break;
}
}

shmdt(ptr); //对共享存储的操作已结束
}
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <assert.h>
#include <string.h>
#include <sys/shm.h>
#include "sem.h"

void main()
{
int shmid = shmget((key_t)1234, 128, 0664 | IPC_CREAT);
assert(shmid != -1);

char *ptr = (char *)shmat(shmid, NULL, 0);
assert(ptr != (char *)-1);

seminit();
while(1)
{
semp();
if(strncmp(ptr, "end", 3) == 0)
{
break;
}
printf("s = %s\n", ptr);
}
shmdt(ptr);
shmctl(shmid, IPC_RMID, NULL); //从系统中删除共享存储段
semdel(); //删除信号量
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐