您的位置:首页 > 其它

POSIX多线程程序设计 工作流例程2

2016-03-25 16:13 218 查看
之前的例程提到,当输入数据的数量超出工作单元的数量时,整个进程会进入死锁状态。这里做了一点改进:当工作流已满时,让用户选择是先取出一个结果再加入新数据,还是放弃加入新数据。

还有一点,我认为pipe_t中的mutex锁是多余的:其他线程只会修改各自工作单元内部的数据,并不会影响工作流pipe链表的结构;而更改pipe_t数据的只有pipe_result和pipe_start这两个函数,它们只会在主线程中被调用,所以它们的运行是串行的,不会发生线程间的冲突。(注意这个结论的前提是只有一个线程调用这两个函数;如果有多个线程同时调用它们,仍然需要这把锁来保护active计数。)

我试着将所有锁定、解锁pipe的mutex的代码都注释掉,测试了很多遍,目前还没有发现问题。

#include<errno.h>
#include<pthread.h>
#include "errors.h"

/*
 * One slot of the pipeline.
 *
 * A stage holds at most one value at a time.  Every field below is
 * accessed with `mutex` held by pipe_send, pipe_stage and pipe_result.
 */
typedef struct stage_tag {
    pthread_mutex_t   mutex;      /* guards the fields of this stage */
    pthread_cond_t    avail;      /* signalled when a value has been stored in `data` */
    pthread_cond_t    ready;      /* signalled when `data` has been consumed */
    int               data_ready; /* non-zero while `data` holds an unconsumed value */
    long              data;       /* the value currently held by this stage */
    pthread_t         thread;     /* thread started by pipe_create for this stage */
    struct stage_tag *next;       /* following stage; NULL for the last one */
} stage_t;

/*
 * The pipeline as a whole: a singly linked list of stages plus counters.
 *
 * There is intentionally no pipe-level mutex here.  These fields are only
 * touched by pipe_create, pipe_start and pipe_result, so they are safe only
 * as long as those functions are all called from a single thread.
 */
typedef struct pipe_tag {
    stage_t *head;   /* first stage; pipe_start sends new values here */
    stage_t *tail;   /* last stage; pipe_result collects values here */
    int      stages; /* stage count requested from pipe_create */
    int      active; /* values started via pipe_start and not yet collected */
} pipe_t;

/*
 * Hand one value to a stage.
 *
 * Blocks until the stage's slot is free (data_ready == 0), stores `data`,
 * marks the slot full and signals the stage's `avail` condition.
 *
 * Returns 0 on success, otherwise the error code of the first pthread call
 * that failed.  The stage mutex is released again on every path that
 * managed to acquire it.
 */
int pipe_send(stage_t *stage, long data)
{
    int rc = pthread_mutex_lock(&stage->mutex);
    if (rc != 0) {
        return rc;
    }

    /* Wait for the previous value to be consumed before overwriting it. */
    while (stage->data_ready) {
        rc = pthread_cond_wait(&stage->ready, &stage->mutex);
        if (rc != 0) {
            goto fail_unlock;
        }
    }

    stage->data = data;
    stage->data_ready = 1;

    rc = pthread_cond_signal(&stage->avail);
    if (rc != 0) {
        goto fail_unlock;
    }

    return pthread_mutex_unlock(&stage->mutex);

fail_unlock:
    /* Report the original failure, not the result of this unlock. */
    pthread_mutex_unlock(&stage->mutex);
    return rc;
}

/*
 * Thread start routine for one pipeline stage.
 *
 * arg: pointer to the stage_t this thread serves.  stage->next must be
 *      non-NULL; pipe_create only starts threads for stages that have a
 *      successor.
 *
 * The thread loops forever: it waits for a value to arrive in its stage,
 * forwards value + 1 to the next stage with pipe_send, then marks its own
 * slot free and signals `ready` so the sender may deliver the next value.
 *
 * The stage mutex is locked once and never explicitly unlocked; it is only
 * released while the thread is blocked inside pthread_cond_wait.  The
 * function never returns.  Any failing call is reported via err_abort.
 */
void *pipe_stage(void *arg)
{
    stage_t *stage = (stage_t *)arg;
    stage_t *next_stage = stage->next;
    int status;

    status = pthread_mutex_lock(&stage->mutex);
    if (status != 0)
        err_abort(status, "Lock pipe stage");

    for (;;) {
        /* Sleep until a sender has put a value into this stage. */
        while (stage->data_ready != 1) {
            status = pthread_cond_wait(&stage->avail, &stage->mutex);
            if (status != 0)
                err_abort(status, "Wait for previous stage");
        }

        /*
         * Forward the processed value.  This may block while the next stage
         * is still full.  A failure here would silently drop the value, so
         * it is treated like every other pthread error in this routine.
         */
        status = pipe_send(next_stage, stage->data + 1);
        if (status != 0)
            err_abort(status, "Send to next stage");

        /* Free our slot and let the waiting sender continue. */
        stage->data_ready = 0;
        status = pthread_cond_signal(&stage->ready);
        if (status != 0)
            err_abort(status, "Wake next stage");
    }
}

/*
 * Build a pipeline and start its worker threads.
 *
 * pipe:   caller-provided structure to initialise.
 * stages: number of worker stages; must be >= 0.
 *
 * stages + 1 stage_t nodes are allocated and linked.  A thread running
 * pipe_stage is started for every node except the last one, which only
 * serves as the slot that pipe_result reads from.
 *
 * Returns 0 on success, or EINVAL if `stages` is negative (in which case
 * `pipe` is left untouched).  Allocation, initialisation and thread-creation
 * failures are reported through errno_abort / err_abort from errors.h.
 */
int pipe_create(pipe_t *pipe, int stages)
{
    stage_t **link = &pipe->head;
    stage_t *new_stage = NULL;
    stage_t *stage;
    int pipe_index;
    int status;

    /*
     * With a negative count the allocation loop would not run at all,
     * leaving the tail pointer uninitialised and the head NULL, which the
     * thread-creation loop below would then dereference.
     */
    if (stages < 0)
        return EINVAL;

    pipe->stages = stages;
    pipe->active = 0;

    /* "<=" on purpose: one extra node becomes the tail/result slot. */
    for (pipe_index = 0; pipe_index <= stages; pipe_index++) {
        new_stage = (stage_t *)malloc(sizeof *new_stage);
        if (new_stage == NULL)
            errno_abort("Allocate stage");

        status = pthread_mutex_init(&new_stage->mutex, NULL);
        if (status != 0)
            err_abort(status, "Init stage mutex");
        status = pthread_cond_init(&new_stage->avail, NULL);
        if (status != 0)
            err_abort(status, "Init avail condition");
        status = pthread_cond_init(&new_stage->ready, NULL);
        if (status != 0)
            err_abort(status, "Init ready condition");

        new_stage->data_ready = 0;
        new_stage->data = 0;

        /* Append to the list and remember where the next node must go. */
        *link = new_stage;
        link = &new_stage->next;
    }

    *link = (stage_t *)NULL;    /* terminate the list */
    pipe->tail = new_stage;     /* the node allocated last is the tail */

    /* Start a worker for every stage that has a successor. */
    for (stage = pipe->head; stage->next != NULL; stage = stage->next) {
        status = pthread_create(&stage->thread, NULL, pipe_stage, (void *)stage);
        if (status != 0)
            err_abort(status, "Create pipe stage");
    }
    return 0;
}

/*
 * Collect one finished value from the end of the pipeline.
 *
 * Returns 0 without touching *result when no values are in flight
 * (pipe->active <= 0).  Otherwise decrements the in-flight count, blocks
 * until the tail stage holds a value, copies it to *result, frees the tail
 * slot and returns 1.
 *
 * pipe->active is read and updated without any lock, so this function and
 * pipe_start must all be called from the same thread.
 */
int pipe_result(pipe_t *pipe, long *result)
{
    stage_t *last = pipe->tail;

    /* Nothing was started, so waiting on the tail would block forever. */
    if (pipe->active <= 0) {
        return 0;
    }
    pipe->active--;

    pthread_mutex_lock(&last->mutex);
    while (last->data_ready == 0) {
        pthread_cond_wait(&last->avail, &last->mutex);
    }
    *result = last->data;
    last->data_ready = 0;
    /* Let the stage that is waiting to fill the tail slot proceed. */
    pthread_cond_signal(&last->ready);
    pthread_mutex_unlock(&last->mutex);

    return 1;
}

/*
 * Feed a new value into the head of the pipeline.
 *
 * If the pipeline already holds stages + 1 values (one per stage node,
 * including the tail slot), sending another one would block the caller for
 * good, so the user is asked on stdin/stdout whether to collect one result
 * first ("y") or to drop the new value ("n").  The answer is read as a whole
 * line and only its first character is examined.  End-of-file or a read
 * error on stdin is treated as "n".
 *
 * Returns 0 when the value was dropped, otherwise the status returned by
 * pipe_send (0 on success).  If pipe_send fails the in-flight count is not
 * rolled back.
 *
 * pipe->active is updated without any lock, so this function and
 * pipe_result must all be called from the same thread.
 */
int pipe_start(pipe_t *pipe, long value)
{
    pipe->active++;

    /* More in flight than the stages + 1 slots can hold: pipeline is full. */
    if (pipe->active > pipe->stages + 1) {
        char answer[128];

        for (;;) {
            printf("**The pipe is full**\n");
            printf("Get result and add new data?(y/n):");

            /*
             * Read a full line so the trailing newline is consumed and a
             * multi-character answer is not interpreted one byte at a time.
             * Without the NULL check, EOF would make this loop spin forever.
             */
            if (fgets(answer, sizeof answer, stdin) == NULL) {
                pipe->active--;
                return 0;
            }

            if (answer[0] == 'y') {
                long res;

                /* Frees one slot; pipe_result also decrements active. */
                if (pipe_result(pipe, &res))
                    printf("Result is %ld\n", res);
                break;
            }
            if (answer[0] == 'n') {
                pipe->active--;     /* undo the increment above */
                return 0;
            }
            /* Anything else: ask again. */
        }
    }

    return pipe_send(pipe->head, value);
}

/*
 * Interactive driver.
 *
 * Creates a pipeline with 10 stages, then reads lines from stdin until
 * end-of-file:
 *   - an empty line is ignored,
 *   - "=" fetches and prints the next result (or reports an empty pipe),
 *   - anything else is parsed as an integer and fed into the pipeline.
 * Every line read is echoed back to stdout.
 */
int main(int argc, char *argv[])
{
    pipe_t my_pipe;
    char line[128];

    pipe_create(&my_pipe, 10);
    printf("Enter integer values, or \"=\" for next result\n");

    for (;;) {
        long value, result;
        size_t len;

        printf("Data>");
        if (fgets(line, sizeof(line), stdin) == NULL) {
            exit(0);
        }
        printf("%s", line);

        len = strlen(line);
        if (len <= 1) {
            /* Just a newline (or nothing): prompt again. */
            continue;
        }

        if (len <= 2 && line[0] == '=') {
            if (pipe_result(&my_pipe, &result)) {
                printf("Result is %ld\n", result);
            } else {
                printf("Pipe is empty\n");
            }
            continue;
        }

        if (sscanf(line, "%ld", &value) < 1) {
            fprintf(stderr, "Enter an integer value\n");
        } else {
            pipe_start(&my_pipe, value);
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: