POSIX多线程程序设计 工作流例程2
2016-03-25 16:13
218 查看
之前的例程有说到超出工作单元数量的输入数据会使得整个进程进入死锁状态,然后改进了一下,让用户可以选择是获取数据再加入新的数据还是放弃加入新数据。
还有一点就是,我认为pipe_t中的mutex锁是多余的,因为其他线程只会修改工作单元内部的数据,而并不会影响工作流pipe链表的结构,而更改pipe_t的数据的只有pipe_result和pipe_start这两个函数,但是这两个函数只会在主线程中被调用,所以他们的运行是串行的,并不会发生线程上的冲突。
我试着将所有锁定,解锁pipe的mutex的代码都注释掉,测试来很多遍目前还没有发现问题。
还有一点就是,我认为pipe_t中的mutex锁是多余的,因为其他线程只会修改工作单元内部的数据,而并不会影响工作流pipe链表的结构,而更改pipe_t的数据的只有pipe_result和pipe_start这两个函数,但是这两个函数只会在主线程中被调用,所以他们的运行是串行的,并不会发生线程上的冲突。
我试着将所有锁定,解锁pipe的mutex的代码都注释掉,测试来很多遍目前还没有发现问题。
#include<pthread.h> #include "errors.h" typedef struct stage_tag{ pthread_mutex_t mutex; pthread_cond_t avail; pthread_cond_t ready; int data_ready; long data; pthread_t thread; struct stage_tag *next; }stage_t; typedef struct pipe_tag{ //pthread_mutex_t mutex; stage_t *head; stage_t *tail; int stages; int active; }pipe_t; int pipe_send(stage_t *stage,long data){ int status; status = pthread_mutex_lock(&stage->mutex); if(status != 0) return status; while(stage->data_ready){ status = pthread_cond_wait(&stage->ready,&stage->mutex); if(status != 0){ pthread_mutex_unlock(&stage->mutex); return status; } } stage->data = data; stage->data_ready = 1; status = pthread_cond_signal(&stage->avail); if(status != 0){ pthread_mutex_unlock(&stage->mutex); return status; } status = pthread_mutex_unlock(&stage->mutex); return status; } void *pipe_stage(void *arg){ stage_t *stage = (stage_t*)arg; stage_t *next_stage = stage->next; int status; status = pthread_mutex_lock(&stage->mutex); if(status != 0) err_abort(status,"Lock pipe stage"); while(true){ while(stage->data_ready != 1){ status = pthread_cond_wait(&stage->avail,&stage->mutex); if(status != 0) err_abort(status,"Wait for previous stage"); } pipe_send(next_stage, stage->data + 1); stage->data_ready = 0; status = pthread_cond_signal(&stage->ready); if(status != 0) err_abort(status,"Wake next stage"); } } int pipe_create(pipe_t * pipe,int stages){ int pipe_index; stage_t **link = &pipe->head, *new_stage, *stage; int status; // status = pthread_mutex_init(&pipe->mutex,NULL); // if(status != 0) // err_abort(status , "Init pipe mutex"); pipe->stages = stages; pipe->active = 0; for(pipe_index = 0; pipe_index <= stages; pipe_index++){ new_stage = (stage_t*)malloc(sizeof(stage_t)); if(new_stage == NULL) errno_abort("Allocate stage"); status = pthread_mutex_init(&new_stage->mutex,NULL); if(status != 0) err_abort(status,"Init stage mutex"); status = pthread_cond_init(&new_stage->avail,NULL); if(status != 0) err_abort(status,"Init avail condition"); status = pthread_cond_init(&new_stage->ready,NULL); if(status != 0) err_abort(status, "Init ready condition"); new_stage->data_ready = 0; *link = new_stage; link = &new_stage->next; } *link = (stage_t*)NULL; pipe->tail = new_stage; for(stage = pipe->head;stage->next != NULL;stage = stage->next){ status = pthread_create(&stage->thread,NULL,pipe_stage,(void*)stage); if(status != 0) err_abort(status,"Create pipe stage"); } return 0; } int pipe_result(pipe_t *pipe,long *result){ stage_t *tail = pipe->tail; long value; int empty = 0; int status; // status = pthread_mutex_lock(&pipe->mutex); // if(status != 0) // err_abort(status, "Lock pipe mutex"); if(pipe->active <= 0) empty = 1; else pipe->active--; // status = pthread_mutex_unlock(&pipe->mutex); // if(status != 0) // err_abort(status, "Unlock pipe mutex"); if(empty) return 0; pthread_mutex_lock(&tail->mutex); while(!tail->data_ready) pthread_cond_wait(&tail->avail,&tail->mutex); *result = tail->data; tail->data_ready = 0; pthread_cond_signal(&tail->ready); pthread_mutex_unlock(&tail->mutex); return 1; } int pipe_start(pipe_t *pipe, long value){ int status; // status = pthread_mutex_lock(&pipe->mutex); // if(status != 0) // err_abort(status,"Lock pipe mutex"); pipe->active++; if(pipe->active > pipe->stages + 1){//如果活跃中的工作单元比工作单元总数+1(tail所指向的工作单元)还多,则说明工作流已满 char c = '\0'; while(true){//下面是简单的处理过程 printf("**The pipe is full**\n"); printf("Get result and add new data?(y/n):"); scanf("%c",&c); if(c == 'y'){ // status = pthread_mutex_unlock(&pipe->mutex); // if(status != 0) // err_abort(status, "Unlock pipe mutex"); long res; pipe_result(pipe,&res); printf("Result is %ld\n",res); // status = pthread_mutex_lock(&pipe->mutex); // if(status != 0) // err_abort(status,"Lock pipe mutex"); break; }else if(c == 'n'){ pipe->active--; // status = pthread_mutex_unlock(&pipe->mutex); // if(status != 0) // err_abort(status, "Unlock pipe mutex"); return 0; } } }//至此处理过程结束 // status = pthread_mutex_unlock(&pipe->mutex); // if(status != 0) // err_abort(status, "Unlock pipe mutex"); pipe_send(pipe->head,value); return 0; } int main(int argc, char *argv[]){ pipe_t my_pipe; long value,result; int status; char line[128]; pipe_create(&my_pipe, 10); printf("Enter integer values, or \"=\" for next result\n"); while(true){ printf("Data>"); if(fgets(line,sizeof(line),stdin) == NULL)exit(0); printf("%s",line); if(strlen(line) <= 1) continue; if(strlen(line) <= 2 && line[0] == '='){ if(pipe_result(&my_pipe,&result)) printf("Result is %ld\n",result); else printf("Pipe is empty\n"); }else{ if(sscanf(line,"%ld",&value) < 1) fprintf(stderr, "Enter an integer value\n"); else pipe_start(&my_pipe,value); } } }
相关文章推荐
- 图片.pkm格式转换为.png格式
- capistrano3.4.0部署不能自动登录
- 当CAD图形中无法进行复制、粘贴时,请将此DWG另存一下或许就可以了
- capistrano3.4.0部署不能自动登录
- StudyJams第一课的学习
- 关于整数和真分数的四则运算的算法,整数已通过,但真分数不能化到最简的问题求大神指导啊
- 做一个合格的程序猿之浅析Spring AOP源码(十七) Spring AOP开发大作战
- python笔记---常见异常
- io端口与io内存详解(zz)
- 图像识别工程师 VS The application has requested the Runtime to terminate it in an unusual way.
- iOS开发 ☞ UITableView详解
- iOS开发Swift篇(01) 变量&常量&元组
- 3.25课·········JavaScript简介与语法
- BCM_I2C函数更改
- 巧用 构造函数
- centos关闭selinux
- 华为招聘练习--小明的筷子
- Nginx源码编译安装
- 160多个android开源代码汇总
- Android之调用系统照相机并裁剪