(七) POSIX 线程 ——线程池实例
2014-08-12 16:27
253 查看
相关数据结构:
1.线程池中线程要执行的任务CThread_worker 封装,所有的任务以单链表组织。
2.线程池结构。互斥量和条件变量同步全局的线程池pool。
static CThread_pool *pool = NULL;
相关函数:
线程池初始化
void pool_init(int max_thread_num)
{
pool = (CThread_pool*)malloc(sizeof(CThread_pool));
pthread_mutex_init(&(pool->queue_lock),NULL);
pthread_cond_init(&(pool->queue_ready),NULL);
pool->queue_head = NULL;
pool->max_thread_num = max_thread_num;
pool->cur_queue_size = 0;
pool->tid = (pthread_t *)malloc(max_thread_num*sizeof(pthread_t));
int i;
for(i = 0;i < max_thread_num;++i)
{
pthread_create(&(pool->tid[i]),NULL,thread_routine,NULL);
}
}
活动线程执行函数
void * thread_routine(void *arg)
{
printf("starting thread 0x%X\n",(unsigned int)pthread_self());
while(1)
{
//临界区保护,
pthread_mutex_lock(&(pool->queue_lock));
while(pool->cur_queue_size == 0 && !pool->shutdown)
{
printf("thread 0x%x is waiting\n",(unsigned int) pthread_self());
//条件变量queue->ready由互斥量queue->lock保护
//条件未满足则调用线程进入休眠状态,互斥量解锁
//把线程放入等待条件的线程列表上和互斥量解锁是原子操作
pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock));
}
//等待队列中有任务时,线程被唤醒,pthread_cond_wait返回
//销毁线程池则退出线程
if(pool->shutdown)
{
pthread_mutex_unlock(&(pool->queue_lock));
printf("thread 0x%x will exit\n",(unsigned int)pthread_self());
pthread_exit(NULL);
}
printf("thread 0x%x is starting to work\n",(unsigned int)pthread_self());
assert(pool->cur_queue_size != 0);
assert(pool->queue_head != NULL);
//等待队列长度减去1,并取出链表头元素
pool->cur_queue_size--;
CThread_worker *work = pool->queue_head;
pool->queue_head = work->next;
pthread_mutex_unlock(&(pool->queue_lock));
//回调函数,执行任务
(*(work->process))(work->arg);
free(work);
work = NULL;
}
pthread_exit(NULL);
}
向线程池中加入任务
int pool_add_worker(void *(*process)(void *arg),void *arg)
{
//构造一个新任务
CThread_worker *new_worker = (CThread_worker *)malloc(sizeof(CThread_worker));
new_worker->process = process;
new_worker->arg = arg;
new_worker->next = NULL;
pthread_mutex_lock(&(pool->queue_lock));
/*构造一个新任务*/
CThread_worker *member = pool->queue_head;
if(member != NULL)
{
while(member->next != NULL)
member = member->next;
member->next = new_worker;
}
else
pool->queue_head = new_worker;
assert(pool->queue_head != NULL);
pool->cur_queue_size++;
pthread_mutex_unlock(&(pool->queue_lock));
/*每ci添加一个任务,线程池为空,活动线程就有任务可以执行
向休眠进程发出唤醒信号*/
pthread_cond_signal(&(pool->queue_ready));
return 0;
}
销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出
主函数
int main(void)
{
pool_init(3);
int *workingnum = (int *)malloc(sizeof(int)*10);
sleep(3);
int i;
for(i = 0;i < 10;++i)
{
workingnum[i] = i;
pool_add_worker(myprocess,&workingnum[i]);
}
sleep(5);
pool_destroy();
free(workingnum);
return 0;
}
运行结果:
starting thread 0xC2B3E700
thread 0xc2b3e700 is waiting
starting thread 0xC233D700
thread 0xc233d700 is waiting
starting thread 0xC1B3C700
thread 0xc1b3c700 is waiting
thread 0xc1b3c700 is starting to work
thread id is 0xc1b3c700, working on task0
thread 0xc233d700 is starting to work
thread id is 0xc233d700, working on task1
thread 0xc2b3e700 is starting to work
thread id is 0xc2b3e700, working on task2
thread 0xc1b3c700 is starting to work
thread 0xc2b3e700 is starting to work
thread id is 0xc1b3c700, working on task3
thread id is 0xc2b3e700, working on task4
thread 0xc233d700 is starting to work
thread id is 0xc233d700, working on task5
thread 0xc1b3c700 is starting to work
thread id is 0xc1b3c700, working on task6
thread 0xc2b3e700 is starting to work
thread id is 0xc2b3e700, working on task7
thread 0xc233d700 is starting to work
thread id is 0xc233d700, working on task8
thread 0xc1b3c700 will exit
thread 0xc2b3e700 will exit
thread 0xc233d700 will exit
1.线程池中线程要执行的任务CThread_worker 封装,所有的任务以单链表组织。
typedef struct worker { /*回调函数,任务运行时会调用此函数汉调函数是具体要执行的工作*/ void *(*process) (void *arg); void *arg;/*回调函数的参数*/ struct worker *next; //下一任务 } CThread_worker;
2.线程池结构。互斥量和条件变量同步全局的线程池pool。
typedef struct threadpool { //互斥量和条件变量 pthread_mutex_t queue_lock; pthread_cond_t queue_ready; //任务等待队列 CThread_worker *queue_head; //是否销毁线程池 int shutdown; //活动线程的线程ID,动态申请其内存 pthread_t *tid; //线程池中允许的活动线程数目 int max_thread_num; //当前等待队列的任务数目 int cur_queue_size; }CThread_pool;
static CThread_pool *pool = NULL;
相关函数:
线程池初始化
void pool_init(int max_thread_num)
{
pool = (CThread_pool*)malloc(sizeof(CThread_pool));
pthread_mutex_init(&(pool->queue_lock),NULL);
pthread_cond_init(&(pool->queue_ready),NULL);
pool->queue_head = NULL;
pool->max_thread_num = max_thread_num;
pool->cur_queue_size = 0;
pool->tid = (pthread_t *)malloc(max_thread_num*sizeof(pthread_t));
int i;
for(i = 0;i < max_thread_num;++i)
{
pthread_create(&(pool->tid[i]),NULL,thread_routine,NULL);
}
}
活动线程执行函数
void * thread_routine(void *arg)
{
printf("starting thread 0x%X\n",(unsigned int)pthread_self());
while(1)
{
//临界区保护,
pthread_mutex_lock(&(pool->queue_lock));
while(pool->cur_queue_size == 0 && !pool->shutdown)
{
printf("thread 0x%x is waiting\n",(unsigned int) pthread_self());
//条件变量queue->ready由互斥量queue->lock保护
//条件未满足则调用线程进入休眠状态,互斥量解锁
//把线程放入等待条件的线程列表上和互斥量解锁是原子操作
pthread_cond_wait(&(pool->queue_ready),&(pool->queue_lock));
}
//等待队列中有任务时,线程被唤醒,pthread_cond_wait返回
//销毁线程池则退出线程
if(pool->shutdown)
{
pthread_mutex_unlock(&(pool->queue_lock));
printf("thread 0x%x will exit\n",(unsigned int)pthread_self());
pthread_exit(NULL);
}
printf("thread 0x%x is starting to work\n",(unsigned int)pthread_self());
assert(pool->cur_queue_size != 0);
assert(pool->queue_head != NULL);
//等待队列长度减去1,并取出链表头元素
pool->cur_queue_size--;
CThread_worker *work = pool->queue_head;
pool->queue_head = work->next;
pthread_mutex_unlock(&(pool->queue_lock));
//回调函数,执行任务
(*(work->process))(work->arg);
free(work);
work = NULL;
}
pthread_exit(NULL);
}
向线程池中加入任务
int pool_add_worker(void *(*process)(void *arg),void *arg)
{
//构造一个新任务
CThread_worker *new_worker = (CThread_worker *)malloc(sizeof(CThread_worker));
new_worker->process = process;
new_worker->arg = arg;
new_worker->next = NULL;
pthread_mutex_lock(&(pool->queue_lock));
/*构造一个新任务*/
CThread_worker *member = pool->queue_head;
if(member != NULL)
{
while(member->next != NULL)
member = member->next;
member->next = new_worker;
}
else
pool->queue_head = new_worker;
assert(pool->queue_head != NULL);
pool->cur_queue_size++;
pthread_mutex_unlock(&(pool->queue_lock));
/*每ci添加一个任务,线程池为空,活动线程就有任务可以执行
向休眠进程发出唤醒信号*/
pthread_cond_signal(&(pool->queue_ready));
return 0;
}
void * myprocess (void *arg) { printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg); sleep (1);/*休息一秒,延长任务的执行时间*/ return NULL; }
销毁线程池,等待队列中的任务不会再被执行,但是正在运行的线程会一直把任务运行完后再退出
int pool_destroy() { //防止重复调用 if(pool->shutdown) return -1; pool->shutdown = 1; //唤醒所有等待线程 pthread_cond_broadcast(&(pool->queue_ready)); //阻塞等待线程推出 int i; for(i = 0 ;i < pool->max_thread_num;++i) pthread_join(pool->tid[i],NULL); free(pool->tid); CThread_worker *head = NULL; //释放内存 while(pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } //条件变量和互斥量销毁 pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); free(pool); pool = NULL; return 0; }
主函数
int main(void)
{
pool_init(3);
int *workingnum = (int *)malloc(sizeof(int)*10);
sleep(3);
int i;
for(i = 0;i < 10;++i)
{
workingnum[i] = i;
pool_add_worker(myprocess,&workingnum[i]);
}
sleep(5);
pool_destroy();
free(workingnum);
return 0;
}
运行结果:
starting thread 0xC2B3E700
thread 0xc2b3e700 is waiting
starting thread 0xC233D700
thread 0xc233d700 is waiting
starting thread 0xC1B3C700
thread 0xc1b3c700 is waiting
thread 0xc1b3c700 is starting to work
thread id is 0xc1b3c700, working on task0
thread 0xc233d700 is starting to work
thread id is 0xc233d700, working on task1
thread 0xc2b3e700 is starting to work
thread id is 0xc2b3e700, working on task2
thread 0xc1b3c700 is starting to work
thread 0xc2b3e700 is starting to work
thread id is 0xc1b3c700, working on task3
thread id is 0xc2b3e700, working on task4
thread 0xc233d700 is starting to work
thread id is 0xc233d700, working on task5
thread 0xc1b3c700 is starting to work
thread id is 0xc1b3c700, working on task6
thread 0xc2b3e700 is starting to work
thread id is 0xc2b3e700, working on task7
thread 0xc233d700 is starting to work
thread id is 0xc233d700, working on task8
thread 0xc1b3c700 will exit
thread 0xc2b3e700 will exit
thread 0xc233d700 will exit
相关文章推荐
- JDK1.5中的线程池(java.util.concurrent.ThreadPoolExecutor)使用简介,线程邮件发送实例
- java线程及操作实例,线程池简单例子
- Daniel Robbins sed 实例 POSIX 线程详解
- 线程与线程池,实例比较。
- 线程实例,线程池,重入锁,队列
- 线程 线程池 线程同步 实例
- spring线程池在新的线程中获取不到注入的实例
- 基于线程池的线程管理(BlockingQueue生产者消费者方式)实例
- Linux 多线程编程( POSIX )( 二 )----->代码区 ( pthread_attr_t 线程属性实例 )
- 【多线程】 java线程实例(测试阻塞队列&&线程池)
- POSIX 线程详解(1)
- 利用C#线程机制实现应用程序的单实例运行
- 实例解析C++/CLI线程之多任务
- 线程生成者与消费者实例
- Windows服务器端编程-第二章 设备IO与线程间通信-9-I/O完成端口对线程池的管理
- 不知道怎么将一个线程添加到线程池?
- java线程实例详析
- 实例解析C++/CLI线程之线程状态持久性
- POSIX 线程详解(1)——一种支持内存共享的简捷工具
- 线程实例的收集