您的位置:首页 > 移动开发 > 微信开发

IPC之信号量·即时通讯小程序(三)

2017-12-25 14:21 281 查看
上次说到解决并发的问题,需要用到信号量。下面,简单复习一下。

信号量

信号量是一种变量,它只能取正整数值,对这些正整数只能进行两种操作:等待和信号。(在我的理解,信号量就是用来访问一些临界资源而设计的)
用两种记号来表示信号量的这两种操作:
P(semaphore variable) 代表等待(请求资源)
V(semaphore variable) 代表信号(释放资源)


信号量的分类

最简单的信号量是一个只能取“0”和“1”值的变量,也就是人们常说的“二进制信号量”

可以取多个正整数值的信号量叫做“通用信号量”

P、V操作

假设我们有一个信号量变量sv,则pv操作的定义如下
P(sv):如果sv的值大于零,就给它减去1;如果sv的值等于零,就挂起该进程的执行
V(sv): 如果有其他进程因等待sv变量而被挂起,就让它恢复执行;如果没有进程因等待sv变量而被挂起,就给它加1

信号量函数

需要用到的函数如下:

#include < sys/types.h>
#include < sys/ipc.h>
#include < sys/sem.h>

int semget(key_t key, int nsems, int semflg);

作用:创建一个新的信号量或者取得一个现有的信号量的关键字
key: 是一个整数值,不相关的进程将通过这个值去访问同一个 信号量
num_sems:需要使用的信号量个数,它几乎总是取值为1
sem_flags:是一组标志,其作用与open函数的各种标志很相似,它低端的九个位是该信号量的权限,其作用相当于文件的访问权限,可以与键值IPC_CREATE做按位的OR操作以创建一个新的信号量
成功时将返回一个正数值,它就是其他信号量函数要用到的那个标识码,如果失败,将返回-1

int semop(int semid, struct sembuf *sops, unsigned nsops);

作用:改变信号量的键值(就是用来执行PV操作的)
semid:是该信号量的标识码,也就是semget函数的返回值
sops:是个指向一个结构数值的指针
nsop:进行操作信号量的个数,即sops结构变量的个数,需大于或等于1。最常见设置此值等于1,只完成对一个信号量的操作
Semop调用的一切动作都是一次性完成的,这是为了避免出现因使用了多个信号量而可能发生的竞争现象
其中,sembuf的结构体如下:



sem_num是信号量的编号,如果你的工作不需要使用一组信号量,这个值一般就取为0。
sem_op是信号量一次PV操作时加减的数值,一般只会用到两个值,一个是“-1”,也就是P操作,等待信号量变得可用;另一个是“+1”,也就是我们的V操作,发出信号量已经变得可用
sem_flag通常被设置为SEM_UNDO.她将使操作系统跟踪当前进程对该信号量的修改情况

int semctl(int semid, int semnum, int cmd, ...);

sem_id: 是由semget函数返回的一个信号量标识码
sem_num: 信号量的编号,如果在工作中需要使用到成组的信号量,就要用到这个编号;它一般取值为0,表示这是第一个也是唯一的信号量
comman:将要采取的操作动作
如果还有第四个参数,那它将是一个“union semun”复合结构.
先说一下cmd,有以下几个:







其中,比较常用的是以下两个:

SETVAL:用来把信号量初始化为一个已知的值,这个值在semun结构里是以val成员的面目传递的。

IPC_RMID:删除一个已经没有人继续使用的信号量标识码
再说一下第四个参数,这个函数比较特别,当有四个参数时,是一个“union semun”复合结构.(头文件里可能没有,最好自己重新定义一下

union semun {
int              val;    /* Value for SETVAL */
struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
unsigned short  *array;  /* Array for GETALL, SETALL */
struct seminfo  *__buf;  /* Buffer for IPC_INFO
(Linux-specific) */
};

案例分析

做个小练习,租赁汽车例子。
共5个厂,每个厂分别有汽车数:3 7 5 0 6。考虑租车问题(访问临界资源)
factory.c:

#include < stdio.h>
#include < sys/types.h>
#include < sys/ipc.h>
#include < sys/sem.h>

/*
租赁汽车例子
5个厂,每个厂分别有汽车数:3 7 5 0 6
=>信号量个数 = 5 , 信号量的值分别为:3 7 5 0 6 ,信号量编号从0开始
*/
union semun {
int              val;    /* Value for SETVAL */
struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
unsigned short  *array;  /* Array for GETALL, SETALL */
struct seminfo  *__buf;  /* Buffer for IPC_INFO
(Linux-specific) */
};

int main()
{
int i;
int sem_id;
int values[5] = {3,7,5,0,6};
union semun value = {0};
//创建信号量
sem_id = semget(1000,5,0);
if(sem_id != -1)
{
for(i = 0 ;i < 5;i++)
{
semctl(sem_id , i , IPC_RMID);
}

}
sem_id = semget(1000,5,IPC_CREAT);
printf("%d\n",sem_id);
if(sem_id == -1)
{
perror("producer semget");
return -1;
}
//设置信号量
for(i = 0 ; i < 5 ; i++)
{
printf("%d ",semctl(sem_id,i,GETVAL));
}
printf("\n");
for(i = 0 ;i < 5; i++)
{
value.val = values[i];
//打印信号量
semctl(sem_id,i,SETVAL,value.val);
}
for(i = 0 ; i < 5 ; i++)
{
printf("%d ",semctl(sem_id,i,GETVAL));
}

return 0;
}

client.c:

#include < stdio.h>
#include < sys/types.h>
#include < sys/ipc.h>
#include < sys/sem.h>

/*
struct sembuf
{
unsigned short sem_num;        // semaphore number
short          sem_op;             // semaphore operation
short          sem_flg;            // operation flags

Flags recognized  in  sem_flg  are
IPC_NOWAIT  and   SEM_UNDO.
*/

int main()
{
int sem_id;
struct sembuf buf = {0};
//打开信号量
sem_id = semget(1000,5,0);
if(sem_id == -1)
{
perror("consumer semget");
return -1;
}
printf("waiting...\n");
//借车
buf.sem_num = 0;    //信号量编号
buf.sem_op = -1;    //请求资源
buf.sem_flg = SEM_UNDO; //自动释放
semop(sem_id,&buf,1); //第三个参数必须是1
printf("get a car.\n");

//正在借
sleep(20);

//还车
buf.sem_op = +1;    //释放资源
semop(sem_id,&buf,1);
printf("back a car.\n");

return 0;
}

运行结果:
当执行第三个客户端的时候,会一直等待,知道第一辆车还了之后。

生产者、消费者问题

学习信号量最经典的几个问题,都可以拿来练练。生产者消费者问题、读者写者问题、哲学家进餐问题等都是一样的。
要求:产品数量初始为0,生产者负责生产,即+1,消费者负责消费,即-1。仓库最多只能存放10个产品
下面给出生产者消费者问题的代码:
(我使用的是两个信号量,通常课本上会使用一个信号量,可以自己试试)
public.c:

#ifndef  _PUBLIC_H_
#define  _PUBLIC_H_

#include < stdio.h>
#include < sys/types.h>
#include < sys/ipc.h>
#include < sys/sem.h>

unsigned short values[2] = {10,0};

union semun {
int              val;    /* Value for SETVAL */
struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
unsigned short  *array;  /* Array for GETALL, SETALL */
struct seminfo  *__buf;  /* Buffer for IPC_INFO
(Linux-specific) */
};

#endif

生产者

#include " public.h"

//信号量:控制进程间同步问题,共同的资源,资源的数量(信号量的值)>=0。
//P等待/请求 -1,V信号/释放,+1
//semget(创建、打开)\semctl(删除、初始化)、semop(PV)
int main()
{

int semid;
//两个信号量,一个生产,初始化10,一个消费,初始化为0
//生产了一件商品后,生产者(第零个信号量-1),消费者(第一个信号量+1)
//unsigned short values[2] = {10,0};

union semun sem = {0};
struct sembuf buf1 = {0};
struct sembuf buf2 = {0};

//1.创建信号量
semid = semget(1000,0,0);
if(semid == -1)
{
semid = semget(1000,2,IPC_CREAT);
if(semid == -1)
{
perror("semget open.");
return 1;
}
}

//2.信号量初始化
sem.array = values;
semctl(semid,0,SETALL,sem);

//3.pv操作
while(1)
{
//生产,信号量[0]减1 buf1
buf1.sem_num = 0;
buf1.sem_op = -1;
buf1.sem_flg = SEM_UNDO;
//      printf("can produce?\n");
semop(semid,&buf1,1);//最后一个参数,只要>0就行
//      printf("yes.\n");

sleep(1);
//生产完一件产品,信号量[1]加1 buf2
buf2.sem_num = 1;
buf2.sem_op = +1;
buf2.sem_flg = SEM_UNDO;
semop(semid,&buf2,1);
printf("finish produce.now: %d\n",values[1]);
//      printf("all product: %d\n",sem.array[1]);
}
return 0;
}

消费者

#include " public.h"

int main()
{
int semid;
struct sembuf buf1 = {0};
struct sembuf buf2 = {0};

semid = semget(1000,0,0);
if(semid == -1)
{
perror("semget open");
return 1;
}

//负责消费
while(1)
{
//能否消费 信号量[1] buf2
buf2.sem_num = 1;
buf2.sem_op = -1;
buf2.sem_flg = SEM_UNDO;
semop(semid,&buf2,1);

//消费
sleep(3);

//信号量[0] buf1
buf1.sem_num = 0;
buf1.sem_op = +1;
buf1.sem_flg = SEM_UNDO;
semop(semid,&buf1,1);
printf("consume one product.\n");
}
}


运行结果:
先运行./producer。 当生产到10个产品之后,就会阻塞。此时运行./consumer,每3s取走一个产品,1s后生产者会生产一个产品。

即时通讯小程序

现在,可以继续我们的小程序了。有了以上两个练习之后,就更容易理解了。跟生产者消费者问题很类似。
我们为读写设置两个信号量,一个控制读,一个控制写。写信号量初始化为1,写完后-1,变成0。读信号量初始化为0,写操作后,变成在线用户数-1。如果所有用户都读完了,读信号量为0,写信号量为1。
public.h:

#ifndef _PUBLIC_H_
#define _PUBLIC_H_

#include < stdio.h>
#include < string.h>
#include < sys/types.h>
#include < sys/ipc.h>
#include < sys/msg.h>
#include < sys/shm.h>
#include < signal.h>
#include < sys/sem.h>
#include < string>
#include < map>
#include < iostream>
using namespace std;

//用户信息结构体
typedef struct user_t
{
pid_t pid;
char uname[10]; //后面加上用户名不重名、密码验证
}USER_T;

//登录消息队列结构体
typedef struct login_msg_t
{
long type;
USER_T user;
}LMSG_T;

//聊天消息结构体
typedef struct msg_t
{
USER_T user;
char acMsg[100];
}MSG_T;

//消息队列:用户登录
#define LOGIN_TYPE          1
#define EXIT_TYPE           2
#define MSG_KEY             1000
#define MSG_SIZE            sizeof(LMSG_T)-sizeof(long)

//共享内存:用户列表(空闲块:0-空闲,1-占用)
#define SHM_USER_KEY        1001
#define MAX_USER            100
#define SHM_USER_SIZE       MAX_USER + MAX_USER * sizeof(USER_T)

//共享内存:聊天内容
#define SHM_MSG_KEY         1002
#define SHM_MSG_SIZE        sizeof(MSG_T)

//信号:更新用户列表,读消息
#define SIGNAL_USERS        34
#define SIGNAL_CHAT         35

//读写信号量
#define SEM_KEY             1003

union semun {
int              val;    /* Value for SETVAL */
struct semid_ds *buf;    /* Buffer for IPC_STAT, IPC_SET */
unsigned short  *array;  /* Array for GETALL, SETALL */
struct seminfo  *__buf;  /* Buffer for IPC_INFO
(Linux-specific) */
};
//两个信号量,一个控制读,一个控制写
//写信号量初始化为1,写完后-1,变成0
//读信号量初始化为0,写操作后,变成在线用户数-1
//如果所有用户都读完了,读信号量为0,写信号量为1
union semun sem = {0};
struct sembuf buf1 = {0};   //写
struct sembuf buf2 = {0};   //读
#endif

server.cpp:

#include " public.h"

int main()
{
int msg_id;
int shm_id;
LMSG_T loginMsg = {0};
char *userAddr;
USER_T *pUser;  //用户真正写入的地址
map<int,string> userMap;    //用户列表
map<int,string>::iterator it;
int i;

/*1、创建消息队列:用户登录*/
msg_id = msgget(MSG_KEY,0);
if(msg_id == -1)
{
msg_id = msgget(MSG_KEY,IPC_CREAT);
if (msg_id == -1)
{
perror("server msgget");
return -1;
}
}

/*2、创建共享内存:用户列表*/
shm_id = shmget(SHM_USER_KEY,0,0);
if (shm_id != -1)
{//已经存在,删除
shmctl(shm_id,IPC_RMID,NULL);
}
shm_id = shmget(SHM_USER_KEY,SHM_USER_SIZE,IPC_CREAT);
userAddr = (char *)shmat(shm_id,NULL,0);//映射
pUser = (USER_T *)(userAddr + MAX_USER);
memset(userAddr,0,SHM_USER_SIZE);//初始化

/*3、创建共享内存:聊天信息*/
int shm_msg_id = shmget(SHM_MSG_KEY,0,0);
if (shm_msg_id != -1)
{
shmctl(shm_msg_id,IPC_RMID,NULL);
}
shm_msg_id = shmget(SHM_MSG_KEY,SHM_MSG_SIZE,IPC_CREAT);
char *msgAddr = (char *)shmat(shm_msg_id,NULL,0);
memset(msgAddr,0,SHM_MSG_SIZE);

/*4、创建信号量*/
int sem_id;
sem_id = semget(SEM_KEY,0,0);
if (sem_id != -1)
{
semctl(sem_id,0,IPC_RMID);
semctl(sem_id,1,IPC_RMID);
}
sem_id = semget(SEM_KEY,2,IPC_CREAT);
//初始化信号量的值
sem.val = 1;
semctl(sem_id,0,SETVAL,sem);    //写
sem.val = 0;
semctl(sem_id,1,SETVAL,sem);    //读

//一直监听,是否有用户上线
while (1)
{
memset(&loginMsg,0,sizeof(LMSG_T));
msgrcv(msg_id,&loginMsg,MSG_SIZE,0,0);  //任何消息都接收
switch(loginMsg.type)
{
case LOGIN_TYPE:
//登录
cout<<"client "<<loginMsg.user.uname<<":"<<loginMsg.user.pid<<" is logining..."<<endl;
//2.1 将登录信息写入共享内存(先找到空闲块)
for (i = 0 ; i < MAX_USER ; i++)
{
if (*(userAddr + i) == 0)
{
//空闲
break;
}
}
if (i < MAX_USER)
{
*(userAddr + i) = 1;
*(pUser + i) = loginMsg.user;
userMap.insert( pair<int,string>(loginMsg.user.pid,loginMsg.user.uname) );
}
else
{
cout<<"online users are full.\n"<<endl;
return 1;
}
//2.2 发消息通知所有在线用户
for (it = userMap.begin();it != userMap.end();it++)
{
kill((*it).first,SIGNAL_USERS);
}

break;
case EXIT_TYPE:
//退出
cout<<"client "<<loginMsg.user.uname<<":"<<loginMsg.user.pid<<" is exiting..."<<endl;
for (i = 0 ; i < MAX_USER ; i++)
{
if ((pUser+i)->pid == loginMsg.user.pid)
{
*(userAddr+i) = 0;
break;
}
}
for (it = userMap.begin();it != userMap.end();it++)
{
if ((*it).first == loginMsg.user.pid)
{
continue;   //自己退出,不用再通知自己
}
kill((*it).first,SIGNAL_USERS);
}
break;
}

}

return 0;
}

client.cpp:

#include " public.h"

char *userAddr;
USER_T *pUser;

char *msgAddr;
MSG_T *pMsg;

map<int,string> userMap;    //用户列表
map<int,string>::iterator it;

int sem_id;

void PrtUserList(int sig_no)
{
//读取共享内存里的用户列表数据
userMap.clear();
cout<<"==== online users ===="<<endl;
for (int i = 0 ;i < MAX_USER ; i++)
{
if(*(userAddr + i) == 1)
{
cout<<(pUser + i)->uname<<endl;
userMap.insert(pair<int,string>( (pUser+i)->pid, (pUser+i)->uname ));
}
}
cout<<"========= "<<userMap.size()<<" ========="<<endl;
}

void GetChatMsg(int sig_no)
{
//读取共享内存里的聊天内容
sleep(10);
MSG_T msg = {0};
msg = *pMsg;
cout<<"receive msg from "<<msg.user.uname<<" : "<<msg.acMsg<<endl;

// 4.3 读完之后:读信号量-1,读到最后一个用户,读信号量为0,写信号量再设为1
buf2.sem_num = 1;
buf2.sem_op = -1;
buf2.sem_flg = SEM_UNDO;
semop(sem_id,&buf2,1);

if (semctl(sem_id,1,GETVAL) == 0) //读信号量为0
{
//      printf("done.\n");
sem.val = 1;
semctl(sem_id,0,SETVAL,sem);
}
}

int main()
{
char acOrder[20] = "";
int msg_id;
LMSG_T loginMsg = {0};
char uname[10] = "";
int shm_id;
char toWho[10] = "";    //聊天对象
MSG_T msg = {0};    //聊天消息结构体
char acMsg[100] = "";   //聊天内容

cout<<"------------onlineChat-------------"<<endl;
cout<<"username:";
cin>>uname;

//2.3 注册消息(放在最前面)
signal(SIGNAL_USERS,PrtUserList);
signal(SIGNAL_CHAT,GetChatMsg);

/*2、打开用户列表共享内存(要比写消息队列早)*/
shm_id = shmget(SHM_USER_KEY,0,0);
if (shm_id == -1)
{
perror("client userlist shmget");
return -1;
}
userAddr = (char*)shmat(shm_id,NULL,0);
pUser = (USER_T*)(userAddr + MAX_USER);

/*3、打开聊天信息共享内存*/
int shm_msg_id = shmget(SHM_MSG_KEY,0,0);
if (shm_msg_id == -1)
{
perror("client chatmsg shmget");
return -1;
}
msgAddr = (char *)shmat(shm_msg_id,NULL,0);
pMsg = (MSG_T *)msgAddr;

/*4、打开信号量*/
sem_id = semget(SEM_KEY,0,0);
if (sem_id == -1)
{
perror("client semget");
return -1;
}

/*1、打开消息队列*/
msg_id = msgget(MSG_KEY,0);
if(msg_id == -1)
{
perror("client msgget");
return -1;
}
//登录,写消息队列
loginMsg.type = LOGIN_TYPE;     //设置登录的消息类型为1
loginMsg.user.pid = getpid();
memcpy(loginMsg.user.uname,uname,strlen(uname));
cout<<loginMsg.user.uname<<" is logining..."<<endl;
msgsnd(msg_id,&loginMsg,MSG_SIZE,0);

//等待写
while(1)
{
putchar('#');
fflush(stdout);
scanf("%s",acOrder);
if (strcmp(acOrder,"exit") == 0)    //退出
{
cout<<loginMsg.user.uname<<" is exiting..."<<endl;
loginMsg.type = EXIT_TYPE;      //设置退出的消息类型为2
msgsnd(msg_id,&loginMsg,MSG_SIZE,0);
break;
}
else if (strcmp(acOrder,"users") == 0)  //查看在线用户列表
{
kill(getpid(),SIGNAL_USERS);
}
else if (strcmp(acOrder,"chat") == 0)   //进入聊天模式
{
cout<<"to who: ";
cin>>toWho;
cout<<"say: ";
memset(acMsg,0,100);
cin>>acMsg;

// 4.1 写之前:P(等待/请求)操作,写信号量-1
buf1.sem_num = 0;
buf1.sem_op = -1;
buf1.sem_flg = SEM_UNDO;
semop(sem_id,&buf1,1);

// 3.1 把聊天内容写进共享内存
memcpy(msg.acMsg,acMsg,strlen(acMsg));
msg.user = loginMsg.user;
memcpy(msgAddr,&msg,SHM_MSG_SIZE);

if (strcmp(toWho,"all") == 0)   //群聊
{

// 4.2 写之后:设置读信号量为在线用户数-1
sem.val = userMap.size() - 1;
semctl(sem_id,1,SETVAL,sem);

//通知所有人去读
for (it = userMap.begin();it != userMap.end();it++)
{
if ((*it).first != getpid())
{
kill((*it).first,SIGNAL_CHAT);
}
}
}
else    //私聊
{
for (it = userMap.begin();it != userMap.end();it++)
{
if (strcmp((*it).second.c_str() , toWho) == 0)
{
kill((*it).first,SIGNAL_CHAT);
break;
}
}
}
}
memset(acOrder,0,sizeof(acOrder));
}

//解除映射
shmdt(&userAddr);
shmdt(&msgAddr);

return 0;
}

运行结果:
运行结果就不演示了,此时,再sleep(),模拟并发的情况,就不会出现上次的问题了,毕竟此时的聊天内容的共享内存已经变成临界资源了,用新号良控制之后,就不会有同时读写的问题了。
未完待续....
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息