使用线程的几种方式(1) 流水线
2015-08-30 22:37
330 查看
1、流水线:每个线程反复的在数据集上执行同一种操作,并把操作结果传递给下一步骤其他线程。“数据元素”流被串行的执行。
下面例子来自《posix多线程程序设计》,流水线中的每个线程将它的输入加一,然后传递给下一个线程。当不停的输入时,程序依次从最后一个线程开始逐个往前阻塞,直到从最后一个线程读取数据,并发送ready信号,表示前一个线程可以给该线程传递数据。
当data_ready=0时,等待条件变量avail,条件avail用来通知某步要处理的数据已准备好,在准备好数据之后,data_ready=1,并发送avail信号,通知线程,可以给下一步骤线程发送数据;当data_ready=1时,等待变量ready,ready表示可以发送数据,在发送上一份数据之后,data_reay=0,并发送ready信号,通知线程,可以接收上一步骤线程发来的数据。总之,avail信号把数据发送给下一步骤,ready接收上一步骤数据。
下面例子来自《posix多线程程序设计》,流水线中的每个线程将它的输入加一,然后传递给下一个线程。当不停的输入时,程序依次从最后一个线程开始逐个往前阻塞,直到从最后一个线程读取数据,并发送ready信号,表示前一个线程可以给该线程传递数据。
当data_ready=0时,等待条件变量avail,条件avail用来通知某步要处理的数据已准备好,在准备好数据之后,data_ready=1,并发送avail信号,通知线程,可以给下一步骤线程发送数据;当data_ready=1时,等待变量ready,ready表示可以发送数据,在发送上一份数据之后,data_reay=0,并发送ready信号,通知线程,可以接收上一步骤线程发来的数据。总之,avail信号把数据发送给下一步骤,ready接收上一步骤数据。
/* * pipe.c * 线程使用方式流水线示例程序. * make:cc -g -lpthread -Wall -o pipe pipe.c */ #include <pthread.h> #include "errors.h" typedef struct stage_tag { pthread_mutex_t mutex; /*保护数据*/ pthread_cond_t avail; /*当前流水节点处于空闲可用状态, 等待接收数据,进行加工*/ pthread_cond_t ready; /*当前处于数据准备状态,写入当前节点,*/ int data_ready; /*节点数据状态, 1 表示准备好,等待发送 0 表示没有数据*/ long data; pthread_t thread; struct stage_tag *next; } stage_t; typedef struct pipe_tag { pthread_mutex_t mutex; struct stage_tag *head; struct stage_tag *tail; int stages; /*流水线上的单元stage_tag数量*/ int active; } pipe_t; int pipe_send(stage_t * stage, long data) { int status; status = pthread_mutex_lock(&stage->mutex); if(status != 0) return status; /* 一般一个条件表达式都是在一个互斥锁的保护下被检查。 * 当条件表达式未被满足时,线程将仍然阻塞在这个条件变量上。 * 当另一个线程改变了条件的值并向条件变量发出信号时, * 等待在这个条件变量上的一个线程或所有线程被唤醒, * 接着都试图再次占有相应的互斥锁。阻塞在条件变量上的线程被唤醒以后, * 直到pthread_cond_wait()函数返回之前条件的值都有可能发生变化。 * 所以函数返回以后,在锁定相应的互斥锁之前,必须重新测试条件值。 * 最好的测试方法是循环调用pthread_cond_wait函数,并把满足条件的表达式置为循环的终止条件。 * 如:pthread_mutex_lock(); while (condition_is_false) pthread_cond_wait(); pthread_mutex_unlock(); */ while(stage->data_ready){ status = pthread_cond_wait(&stage->ready, &stage->mutex); if(status != 0){ pthread_mutex_unlock(&stage->mutex); return status; } } stage->data = data; stage->data_ready = 1; status = pthread_cond_signal(&stage->avail); if(status != 0){ pthread_mutex_unlock(&stage->mutex); return status; } status = pthread_mutex_unlock(&stage->mutex); return status; } void *pipe_stage(void *arg) { stage_t *stage = (stage_t*)arg; stage_t *next_stage = stage->next; int status; status = pthread_mutex_lock(&stage->mutex); if(status != 0) err_abort(status, "pthread_mutex_lock failed\n"); while(1) { while(stage->data_ready != 1) { status = pthread_cond_wait(&stage->avail, &stage->mutex); if(status != 0) err_abort(status, "pthread_cond_wait failed\n"); } pipe_send(next_stage, stage->data + 1); stage->data_ready = 0; status = pthread_cond_signal(&stage->ready); if(status != 0) err_abort(status, "pthread_cond_signal failed\n"); } return (void*)0; } /* * 创建一个流水线,初始化所有stage_tag,创建线程 */ int pipe_create(pipe_t *pipe, int stages) { int pipe_index; int status; stage_t **link, *new_stage, *stage; link = &pipe->head; /* * 初始化pipe * pipe为一个单链表 */ status = pthread_mutex_init(&pipe->mutex, NULL); if (status != 0) err_abort(status, "pthread_mutex_int failed\n"); pipe->stages = stages; pipe->active = 0; for(pipe_index = 0; pipe_index <=stages; pipe_index++){ new_stage = (stage_t*)malloc(sizeof(stage_t)); if(new_stage == NULL) errno_abort("malloc failed\n"); status = pthread_mutex_init(&new_stage->mutex, NULL); if(status != 0) err_abort(status, "pthread_mutex_init failed\n"); status = pthread_cond_init(&new_stage->avail, NULL); if(status != 0) err_abort(status, "pthread_cond_init failed\n"); status = pthread_cond_init(&new_stage->ready, NULL); if(status != 0) err_abort(status, "pthread_cond_init failed\n"); new_stage->data_ready = 0; *link = new_stage; link = &new_stage->next; } *link = (stage_t*)NULL; pipe->tail = new_stage; /* 创建线程*/ for(stage = pipe->head; stage->next != NULL; stage = stage->next) { status = pthread_create(&stage->thread, NULL, pipe_stage, (void*)stage); if(status != 0) err_abort(status, "pthread_create failed\n"); } return 0; } int pipe_start(pipe_t *pipe, long value) { pthread_mutex_lock(&pipe->mutex); pipe->active++; pthread_mutex_unlock(&pipe->mutex); pipe_send(pipe->head, value); return 0; } int pipe_result(pipe_t *pipe, long *result) { stage_t *tail = pipe->tail; int empty = 0; pthread_mutex_lock(&pipe->mutex); pipe->active <= 0 ? empty = 1 : pipe->active--; pthread_mutex_unlock(&pipe->mutex); if(empty) return 0; pthread_mutex_lock(&tail->mutex); while(!tail->data_ready) pthread_cond_wait(&tail->avail, &tail->mutex); *result = tail->data; tail->data_ready = 0; pthread_cond_signal(&tail->ready); pthread_mutex_unlock(&tail->mutex); return 1; } int main(int argc, char *argv[]) { pipe_t mypipe; long value, result; char line[128]; pipe_create(&mypipe, 10); while(1) { printf("Data>"); if ( fgets(line, sizeof(line), stdin) == NULL) exit(0); if (strlen(line) <= 0) continue; if (strlen(line) == 2 && line[0] == 'q') { printf("results is: "); while(pipe_result(&mypipe, &result)){ printf("%ld \n", result);} printf("\n"); exit(0); } if (strlen(line) == 2 && line[0] == '=') { if(pipe_result(&mypipe, &result)) printf("result is %ld\n", result); else printf("the pipe is empty\n"); } else if(strncmp(line, "==", 2) == 0) { printf("results is: "); while(pipe_result(&mypipe, &result)){ printf("%ld \n", result);} printf("\n"); } else { if(sscanf(line, "%ld", &value) < 1) fprintf(stderr, " Enter an integer value\n"); else pipe_start(&mypipe, value); } } }
相关文章推荐
- crontab命令
- uoj #31. 【UR #2】猪猪侠再战括号序列 贪心
- Javascript
- JSON(一):基本语法
- 探索Java反射机制
- 基于天天动听API开发在线音乐查询网站
- 1031. 查验身份证(15)
- scrollview 嵌套listview刷新问题
- Java依赖注入库框架 Dagger的源码分析(一)
- trait里的默认函数会为每个类型拷贝一份
- 网络协议之访问服务端的三种方式
- hihocoder #1223 : 不等式
- 设计模式14:Command 命令模式(行为型模式)
- html5学习之路_005
- 如何在linux下判断web服务是否开启?
- 在服务器上排除问题的头五分钟
- 图的常用存储结构
- 软件测试基础_零基础学测试
- 和redis谈一场恋爱(第一天邂逅)
- [Usaco2005 Open]Disease Manangement 疾病管理|状态压缩动态规划