您的位置:首页 > 其它

操作系统课程设计 消息缓冲队列通信

2016-01-09 20:18 453 查看
消息缓冲队列通信机制其基本思想是根据“生产者——消费者”原理,利用内存中公用消息缓冲区实现进程间的信息交换。

在这种通信机制中,首先需要在内存中开辟若干空闲消息缓冲区,用以存放要通信的消息。每当一个进程需要向另一个进程发送消息时,便向系统申请一个空闲消息缓冲区,并把已准备好的消息复制到该缓冲区,然后把该消息缓冲区插入到接收进程的消息队列中,最后通知接收进程。接收进程接收到发送进程发来的通知后,从本进程的消息队列中摘下一消息缓冲区,取出其中的信息,然后把消息缓冲区作为空闲消息缓冲区归还给系统。系统负责管理公用消息缓冲区以及消息的传递。

// 消息缓冲队列
// 2016.1.7

#include <stdlib.h>
#include <dos.h>
#include <stdio.h>

#define GET_INDOS 0x34         /* 34H 系统功能调用    */
#define GET_CRIT_ERR 0x5d06    /* 5D06H号系统功能调用 */

#define BLANK -1
#define FINISHED 0        /*     终止    */
#define RUNNING 1         /*   执行  */
#define READY 2           /*   就绪  */
#define BLOCKED 3         /*   阻塞  */
#define NTCB 3            /* 系统线程的最大数 */

#define TL 10             /* 时间片大小  */
#define NBUF 2            /* 消息缓冲区数目 */
#define NTEXT 50          /* 文本输出大小  */

char far* intdos_ptr=0;
char far* crit_err_ptr=0;
int timecount=0;
int current=-1;

typedef unsigned int UINT16;

typedef struct/* 信号量 */
{
int value;
struct TCB* wq;
}semaphore;

semaphore mutexfb={1,NULL};        // freebuf 互斥变量     初值 1
semaphore sfb={2,NULL};            // 计数信号量
semaphore bufferSem1, bufferSem2;

// 消息缓冲区
// 空闲缓冲队列 freebuf(临界资源)
struct buffer
{
int sender;             /*消息发送者的标识数 */
int size;               /* 消息长度<=NTEXT 个字节   */
char text[NTEXT];       /* 消息正文   */
struct buffer* next;    /* 指向下一个消息缓冲区的指针 */
} *freebuf;

/* 线程控制块  */
struct TCB
{
unsigned char* stack;          /* 堆栈的起始地址  */
unsigned ss;
unsigned sp;            /* 堆栈段址和堆栈指针 */
char state;             /* 进程状态   */
char name[10];          /* 线程的外部标识符  */
int value;              /*优先级*/
struct TCB* next;       /* 指向控制快指针  */
struct buffer* mq;      /* 消息缓冲队列首指针  */
semaphore mutex;        /* 互斥信号量   */
semaphore sm;           /* 消息缓冲队列计数信号量*/
}tcb[NTCB];

/* 堆栈现场保护和恢复结构体 */
struct int_regs
{
unsigned BP,DI,SI,DS,ES,DX,CX,BX,AX,IP,CS,Flags,off,seg;
};

typedef int(far* codeptr)(void);
void interrupt(*old_int8)(void);
int DosBusy(void);
void InitIndos(void);
void InitTcb();
void interrupt new_int8(void);
void interrupt swtch();
void send(char *receiver,char *a,int size);
int receive(char *sender,char *a);
void p(semaphore *sem);
void v(semaphore *sem);
int Create(char* name,codeptr code,int stacklen,int prio);  /* 创建线程 */
void Destroy(int i);

// 1#线程
void f1()
{

while(1)
{
p(&bufferSem1);

send("f2","f1 send message to f2",NTEXT);

printf("f1 sending!\n");

v(&bufferSem2);
}
}

// 2#线程
void f2()
{
char a[NTEXT];

while(1)
{
p(&bufferSem2);

receive("f1",a);

printf("f2 receiving!\n");

v(&bufferSem1);
}
}

void InitInDos()      /* 取得INDOS标志和严重错误标志地址 */
{
union REGS regs;
struct SREGS segregs;

regs.h.ah=GET_INDOS;      /* 使用34H号系统功能调用 */
intdosx(®s,®s,&segregs);

intdos_ptr=MK_FP(segregs.es,regs.x.bx);
if(_osmajor<3)
crit_err_ptr=intdos_ptr+1;      /* 严重错误在INDOS后一字节处 */
else if(_osmajor==3&&_osminor==0)
crit_err_ptr=intdos_ptr-1;      /* 严重错误在INDOS前一字节处 */
else
{
regs.x.ax=GET_CRIT_ERR;
intdosx(®s,®s,&segregs);
crit_err_ptr=MK_FP(segregs.ds,regs.x.si);
}
}

int DosBusy(void)            /* 判断DOS是否忙 */
{
if(intdos_ptr&&crit_err_ptr)
return(*intdos_ptr||*crit_err_ptr);  /* DOS忙,返回严重错误标志 */
else
return(-1);         /* DOS不忙 */
}

void InitTcb()           /* 初始化线程 */
{
int i;

for(i=0;i<NTCB;i++)
{
tcb[i].state=BLANK;       /* 初始状态标志   */
tcb[i].mq=NULL;
tcb[i].mutex.value=1;
tcb[i].mutex.wq=NULL;
tcb[i].sm.value=0;
tcb[i].sm.wq=NULL;
}
}

void Destroy(int i)
{

if(tcb[i].state==RUNNING)
{
disable();
tcb[i].state=FINISHED;
strcpy(tcb[i].name,NULL);
free(tcb[i].stack);
tcb[i].ss=0;
tcb[i].sp=0;
enable();
}

}

void over()
{
Destroy(current);
swtch();
}

int Create(char *name,codeptr code,int stacklen,int value)
{
int i;
char *p;
struct int_regs *pt;
unsigned int *pp;

for(i=1;i<NTCB;i++)
{
if(tcb[i].state==BLANK||tcb[i].state==FINISHED)
break;
}
if(i==NTCB)
return-1;

tcb[i].value=value;
strcpy(tcb[i].name,name);
tcb[i].stack=(p=(unsigned char*)malloc(stacklen));
memset(tcb[i].stack, 0xff, stacklen);
p=p+stacklen;

#if 0
pt=(struct int_regs*)p;
pt--;
pt->Flags=0x200;
pt->CS=FP_SEG(code);
pt->IP=FP_OFF(code);

pt->off=FP_OFF(over);
pt->seg=FP_SEG(over);
pt->DS=_DS;
pt->ES=_ES;
tcb[i].sp=FP_OFF(pt);
tcb[i].ss=FP_SEG(pt);
#else if
/*
pp=(UINT16 *)(p-2);
*(pp)=FP_SEG(over);
*(pp-1)=FP_OFF(over);
*(pp-2)=0x200;
*(pp-3)=FP_SEG(code);
*(pp-4)=FP_OFF(code);

*(pp-9)=_ES;
*(pp-10)=_DS;
tcb[i].sp=FP_OFF(pp-13);
tcb[i].ss=FP_SEG(pp-13);
*/

*(p-1)=(FP_SEG(over)&0xff00)>>8;
*(p-2)=FP_SEG(over)&0x00ff;

*(p-3)=(FP_OFF(over)&0xff00)>>8;
*(p-4)=FP_OFF(over)&0x00ff;

*(p-5)=0x02;
*(p-6)=0x00;

*(p-7)=(FP_SEG(code)&0xff00)>>8;
*(p-8)=FP_SEG(code)&0x00ff;

*(p-9)=(FP_OFF(code)&0xff00)>>8;
*(p-10)=FP_OFF(code)&0x00ff;

*(p-19)=(_ES&0xff00)>>8;
*(p-20)=_ES&0x00ff;

*(p-21)=(_DS&0xff00)>>8;
*(p-22)=_DS&0x00ff;

tcb[i].sp=FP_OFF((UINT16 *)(p-28));
tcb[i].ss=FP_SEG((UINT16 *)(p-28));

#endif

tcb[i].state=READY;

return i;
}

void tcb_state()        /* 线程状态信息 */
{
int i;

for(i=0;i<NTCB;i++)
if(tcb[i].state!=BLANK)
{
switch(tcb[i].state)
{
case FINISHED:
printf("\ntcb[%d] is FINISHED\n",i);
break;

case RUNNING:
printf("tcb[%d] is RUNNING\n",i);
break;
case READY:
printf("tcb[%d] is READY\n",i);
break;
case BLOCKED:
printf("tcb[%d] is BLOCKED\n",i);

break;
}
}
}

int all_finished()
{
int i;

for(i=1;i<NTCB;i++)
if(tcb[i].state==RUNNING||tcb[i].state==BLOCKED||tcb[i].state==READY)
return 0;

return 1;
}

int Find()
{
int i,j;
i=current;

while(tcb[i=((i+1)%NTCB)].state!=READY||i==current);

return i;
}

void interrupt new_int8(void)       /* CPU 调度*/
{
int i;

(*old_int8)();      /* 指向原来时钟中断处理过程入口的中断处理函数指针 */
timecount++;

if(timecount==TL)        /* 时间片是否到? */
{
if(!DosBusy())     /* DOS是否忙? */
{
disable();

tcb[current].ss=_SS;     /* 保存现场 */
tcb[current].sp=_SP;

if(tcb[current].state==RUNNING)
tcb[current].state=READY;

i=Find();
if(i<0)
return;

_SS=tcb[i].ss;
_SP=tcb[i].sp;
tcb[i].state=RUNNING;
current=i;
timecount=0;      /* 重新计时 */

enable();
}
else
return;
}
else
return;
}

void interrupt swtch()            /* 其他原因CPU调度  */
{
int i;

if(tcb[current].state!=FINISHED
&¤t!=0&&tcb[current].state!=BLOCKED) /* 当前线程还没结束 */
return;

i=Find();
if(i<0)
return;

disable();
tcb[current].ss=_SS;
tcb[current].sp=_SP;

if(tcb[current].state==RUNNING)
tcb[current].state=READY;      /* 放入就绪队列中 */

_SS=tcb[i].ss;
_SP=tcb[i].sp;        /* 保存现场 */

tcb[i].state=RUNNING;
current=i;
enable();
}

void block(struct TCB **p)         /* 阻塞原语 */
{
struct TCB *pp;

tcb[current].state=BLOCKED;

if((*p)==NULL)
*p=&tcb[current];    /* 阻塞队列空,直接放入 */
else
{
pp=*p;
while(pp->next)
pp=pp->next;         /* 找到阻塞队列最后一个节点 */

pp->next=&tcb[current];      /* 放入阻塞队列 */
}
tcb[current].next=NULL;
swtch();       /* 重新进行CPU调度 */
}

void wakeup_first(struct TCB **p)    /* 唤醒队首线程 */
{
struct TCB *pl;

if((*p)==NULL)
return;

pl=(*p);
(*p)=(*p)->next;     /* 得到阻塞队列队首线程 */
pl->state=READY;        /* 修为就绪状态 */
pl->next=NULL;
}

void p(semaphore *sem)
{
struct TCB **qp;

disable();
sem->value=sem->value-1;

if(sem->value<0)
{
qp=&(sem->wq);
block(qp);
}
enable();
}

void v(semaphore*sem)
{
struct TCB **qp;

disable();
qp=&(sem->wq);
sem->value=sem->value+1;

if(sem->value>=0)
wakeup_first(qp);

enable();
}

///////////////////////////////////////////////////////////////////////////////
// buffer
struct buffer*Initbuf(void)
{
struct buffer *p,*pt,*pt2;
int i;

pt2=pt=(struct buffer*)malloc(sizeof(struct buffer));
pt->sender=-1;
pt->size=0;
strcmp(pt->text,"");
pt->next=NULL;

for(i=0;i<NBUF-1;i++)
{
p=(struct buffer*)malloc(sizeof(struct buffer));
p->sender=-1;
p->size=0;
p->text[NTEXT]='\0';
p->next=NULL;
pt2->next=p;
pt2=p;
}

return pt;
}

// 从空闲消息缓冲队列队头上取下一缓空闲消息冲区
struct buffer* getbuf(void)
{
struct buffer *buf;

buf=freebuf;        /* 取得缓冲队列的缓冲区*/
freebuf=freebuf->next;

return(buf);        /* 返回指向该缓冲区的指针 */
}

// 将buff所指的缓冲区插到*mq所指的缓冲队列末尾
void insert(struct buffer **mq, struct buffer *buff)
{
struct buffer *temp;

if(buff==NULL)
return;       /* buff为空 */

buff->next=NULL;
if(*mq==NULL)     /* *mq为空 则直接插入*/
*mq=buff;
else
{
temp=*mq;
while(temp->next)      /* 找到队尾 */
temp=temp->next;

temp->next=buff;
}
}

// 将地址a开始的size个字节发送给外部标识符为receiver的线程
void send(char *receiver,char *a, int size)
{
struct buffer *buff;
int i,id=-1;

disable();
for(i=0;i<NTCB;i++)
{
if(strcmp(receiver,tcb[i].name)==0)
{
id=i;
break;
}
}

if(id==-1)
{
printf("Error:Receiver not exist!\n");
enable();
return;
}

p(&sfb);

p(&mutexfb);
buff=getbuf();
v(&mutexfb);

buff->sender=current;
buff->size=size;
buff->next=NULL;

for(i=0;i<buff->size;i++,a++)
buff->text[i]=*a;

// 将要发送的消息放到接收者TCB的buffer中
p(&tcb[id].mutex);
insert(&(tcb[id].mq),buff);
v(&tcb[id].mutex);

// 用于同步
v(&tcb[id].sm);
enable();
}

//////////////////////////////////////////////////////////////////////////////////////////////
// 获取消息缓冲区函数
struct buffer *remov(struct buffer **mq, int sender)
{
struct buffer *buff, *p, *q;
q = NULL;
p = *mq;

// 在消息缓冲区队列中找到其他进程发送给自己的消息
while((p->next != NULL) && (p->sender != sender))
{
q = p;
p = p->next;
}

// 获取消息后从队列中删除,防止重复接收
if(p->sender == sender)
{
buff = p;
if(q == NULL)
*mq = buff->next;
else
q->next = buff->next;

buff->next = NULL;
return buff;
}
else
return NULL;
}

// 接收原语
int receive(char *sender, char *b)
{
int i, id = -1;
struct buffer *buff;

disable();

// 寻找 sender
for(i = 0; i < NBUF; i++)
{
if(strcmp(sender, tcb[i].name) == 0)
{
id = i;
break;
}
}

if(id == -1)
{
enable();
return -1;
}

p(&tcb[current].sm);

p(&tcb[current].mutex);
buff = remov(&(tcb[current].mq), id);
v(&tcb[current].mutex);

if(buff == NULL)
{
v(&tcb[current].sm);
enable();
return -1;
}
// 将消息正文复制到接收区
strcpy(b, buff->text);

// 释放前先把标识去掉,防止重复接收
buff->sender = -1;
// 释放相应的消息缓冲区
p(&mutexfb);
insert(&freebuf, buff);
v(&mutexfb);

v(&sfb);

enable();

return buff->size;
}

void main()
{
long i, j, k;

bufferSem1.value = 1;
bufferSem1.wq = NULL;

bufferSem2.value = 0;
bufferSem2.wq = NULL;

InitInDos();
InitTcb();

freebuf=Initbuf();
old_int8=getvect(8);

strcpy(tcb[0].name,"main");
tcb[0].state=RUNNING;
tcb[0].value=0;
current=0;

Create("f1",(codeptr)f1,1024,5);
Create("f2",(codeptr)f2,1024,6);

tcb_state();
setvect(8,new_int8);

while(!all_finished());
{

printf("running!\n");

}

tcb[0].name[0]='\0';
tcb[0].state=FINISHED;
setvect(8,old_int8);

tcb_state();

printf("\n Muli_task system teminated \n");
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: