您的位置:首页 > 其它

使用线程的几种方式(1) 流水线

2015-08-30 22:37 330 查看
1、流水线:每个线程反复的在数据集上执行同一种操作,并把操作结果传递给下一步骤其他线程。“数据元素”流被串行的执行。

下面例子来自《posix多线程程序设计》,流水线中的每个线程将它的输入加一,然后传递给下一个线程。当不停的输入时,程序依次从最后一个线程开始逐个往前阻塞,直到从最后一个线程读取数据,并发送ready信号,表示前一个线程可以给该线程传递数据。

当data_ready=0时,等待条件变量avail,条件avail用来通知某步要处理的数据已准备好,在准备好数据之后,data_ready=1,并发送avail信号,通知线程,可以给下一步骤线程发送数据;当data_ready=1时,等待变量ready,ready表示可以发送数据,在发送上一份数据之后,data_reay=0,并发送ready信号,通知线程,可以接收上一步骤线程发来的数据。总之,avail信号把数据发送给下一步骤,ready接收上一步骤数据。

/*
* pipe.c
* 线程使用方式流水线示例程序.
* make:cc -g  -lpthread -Wall -o pipe pipe.c
*/
#include <pthread.h>
#include "errors.h"

typedef struct stage_tag {
pthread_mutex_t mutex; /*保护数据*/
pthread_cond_t  avail; /*当前流水节点处于空闲可用状态, 等待接收数据,进行加工*/
pthread_cond_t  ready; /*当前处于数据准备状态,写入当前节点,*/
int             data_ready;  /*节点数据状态, 1 表示准备好,等待发送  0 表示没有数据*/
long            data;
pthread_t       thread;
struct stage_tag *next;
} stage_t;

typedef struct pipe_tag {
pthread_mutex_t mutex;
struct stage_tag *head;
struct stage_tag *tail;
int             stages; /*流水线上的单元stage_tag数量*/
int             active;
} pipe_t;

int pipe_send(stage_t * stage, long data)
{
int status;
status = pthread_mutex_lock(&stage->mutex);
if(status != 0) return status;
/* 一般一个条件表达式都是在一个互斥锁的保护下被检查。
* 当条件表达式未被满足时,线程将仍然阻塞在这个条件变量上。
* 当另一个线程改变了条件的值并向条件变量发出信号时,
* 等待在这个条件变量上的一个线程或所有线程被唤醒,
* 接着都试图再次占有相应的互斥锁。阻塞在条件变量上的线程被唤醒以后,
* 直到pthread_cond_wait()函数返回之前条件的值都有可能发生变化。
* 所以函数返回以后,在锁定相应的互斥锁之前,必须重新测试条件值。
* 最好的测试方法是循环调用pthread_cond_wait函数,并把满足条件的表达式置为循环的终止条件。
* 如:pthread_mutex_lock(); while (condition_is_false) pthread_cond_wait(); pthread_mutex_unlock();
*/
while(stage->data_ready){
status = pthread_cond_wait(&stage->ready, &stage->mutex);
if(status != 0){
pthread_mutex_unlock(&stage->mutex);
return status;
}
}
stage->data = data;
stage->data_ready = 1;
status = pthread_cond_signal(&stage->avail);
if(status != 0){
pthread_mutex_unlock(&stage->mutex);
return status;
}
status = pthread_mutex_unlock(&stage->mutex);
return status;
}

void *pipe_stage(void *arg)
{
stage_t *stage = (stage_t*)arg;
stage_t *next_stage = stage->next;
int status;

status = pthread_mutex_lock(&stage->mutex);
if(status != 0) err_abort(status, "pthread_mutex_lock failed\n");

while(1) {
while(stage->data_ready != 1) {
status = pthread_cond_wait(&stage->avail, &stage->mutex);
if(status != 0) err_abort(status, "pthread_cond_wait failed\n");
}
pipe_send(next_stage, stage->data + 1);
stage->data_ready = 0;
status = pthread_cond_signal(&stage->ready);
if(status != 0) err_abort(status, "pthread_cond_signal failed\n");
}
return (void*)0;
}

/*
* 创建一个流水线,初始化所有stage_tag,创建线程
*/
int pipe_create(pipe_t *pipe, int stages)
{
int pipe_index;
int status;
stage_t **link, *new_stage, *stage;
link = &pipe->head;

/*
* 初始化pipe
* pipe为一个单链表
*/
status = pthread_mutex_init(&pipe->mutex, NULL);
if (status != 0) err_abort(status, "pthread_mutex_int failed\n");
pipe->stages = stages;
pipe->active = 0;

for(pipe_index = 0; pipe_index <=stages; pipe_index++){
new_stage = (stage_t*)malloc(sizeof(stage_t));
if(new_stage == NULL) errno_abort("malloc failed\n");
status = pthread_mutex_init(&new_stage->mutex, NULL);
if(status != 0) err_abort(status, "pthread_mutex_init failed\n");
status = pthread_cond_init(&new_stage->avail, NULL);
if(status != 0) err_abort(status, "pthread_cond_init failed\n");
status = pthread_cond_init(&new_stage->ready, NULL);
if(status != 0) err_abort(status, "pthread_cond_init failed\n");
new_stage->data_ready = 0;
*link = new_stage;
link = &new_stage->next;
}
*link = (stage_t*)NULL;
pipe->tail = new_stage;

/* 创建线程*/
for(stage = pipe->head; stage->next != NULL; stage = stage->next)
{
status = pthread_create(&stage->thread, NULL, pipe_stage, (void*)stage);
if(status != 0) err_abort(status, "pthread_create failed\n");
}
return 0;
}

int pipe_start(pipe_t *pipe, long value)
{
pthread_mutex_lock(&pipe->mutex);
pipe->active++;
pthread_mutex_unlock(&pipe->mutex);
pipe_send(pipe->head, value);
return 0;
}

int pipe_result(pipe_t *pipe, long *result)
{
stage_t *tail = pipe->tail;
int empty = 0;

pthread_mutex_lock(&pipe->mutex);
pipe->active <= 0 ? empty = 1 : pipe->active--;
pthread_mutex_unlock(&pipe->mutex);
if(empty) return 0;
pthread_mutex_lock(&tail->mutex);
while(!tail->data_ready)
pthread_cond_wait(&tail->avail, &tail->mutex);
*result = tail->data;
tail->data_ready = 0;
pthread_cond_signal(&tail->ready);
pthread_mutex_unlock(&tail->mutex);
return 1;
}

int main(int argc, char *argv[])
{
pipe_t mypipe;
long value, result;
char line[128];

pipe_create(&mypipe, 10);

while(1) {
printf("Data>");
if ( fgets(line, sizeof(line), stdin) == NULL) exit(0);
if (strlen(line) <= 0) continue;
if (strlen(line) == 2 && line[0] == 'q') {
printf("results is: ");
while(pipe_result(&mypipe, &result)){ printf("%ld \n", result);}
printf("\n");
exit(0);
}
if (strlen(line) == 2 && line[0] == '=') {
if(pipe_result(&mypipe, &result)) printf("result is %ld\n", result);
else printf("the pipe is empty\n");
} else if(strncmp(line, "==", 2) == 0) {
printf("results is: ");
while(pipe_result(&mypipe, &result)){ printf("%ld \n", result);}
printf("\n");
} else {
if(sscanf(line, "%ld", &value) < 1) fprintf(stderr, " Enter an integer value\n");
else pipe_start(&mypipe, value);
        } 
    }
}



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: