您的位置:首页 > 运维架构 > Linux

linux多线程示例

2014-04-30 18:33 302 查看
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

typedef void* (*fun)(void*);

fun fun1, fun2;

pthread_mutex_t pmu = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond;
pthread_t pid1, pid2;
int flag = 0;
int gnum = 0;
int gsub = 100;

void *  func1(void * para)
{
int k = (int)para;
printf("func1, ******\n");
while(gnum<=100)
{
pthread_mutex_lock(&pmu);
printf("gnum == %d", gnum);
while(gnum==50)
{
printf("suspend thread1 at gnum==50 !!! \n");
pthread_cond_wait(&cond, &pmu);
gnum++;
}
++gnum;
++flag;
++k;
//printf("flag = %d, k = %d\n", flag, k);
pthread_mutex_unlock(&pmu);
printf("I am func1\n");
}
pthread_exit((void*)0);

}

void * func2(void * para)
{
int f = (int)para;
printf("f == %d\n", f);
printf("pthread2 start running !\n");
void * ret = NULL;
while(gsub>=0)
{
pthread_mutex_lock(&pmu);
gsub--;
printf("gsub= %d ", gsub);
if(gsub == 20)
{
printf("now gsnb ==20, and send signal\n");
pthread_cond_signal(&cond);
}
++flag;
++f;
printf("flag = %d, f = %d\n", flag, f);
pthread_mutex_unlock(&pmu);
printf("I am func2 \n");
}
//pthread_join(pid1, &ret);
pthread_exit((void*)0);
}

int main()
{
int id = 0;
void * ret = NULL;
int key = 5;

pthread_cond_init(&cond, NULL);  //属性设置NULL默认属性
id = pthread_create(&pid1, NULL, func1, (void*)key);
if(id != 0)
{
printf("pthread_create error !\n");
exit(0);
}

if(pthread_create(&pid2, NULL, func2, (void*)key))
{
printf("pthread_create error ! \n");
exit(0);
}
87     pthread_join(pid2, &ret);      //等待pid2线程退出
88     pthread_join(pid1, &ret);      //等待pid1线程退出

    //pthread_detach(pid1);        //主线程与pid1线程进行分离,一般用来实现异步返回
    //pthread_detach(pid2);        //同上
     pthread_exit((void*)0);
}


gcc test_thread.c -lpthread
./a.out

线程池实例代码:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <pthread.h>
#include <assert.h>

typedef struct worker
{
//回调函数,任务运行时会调用此函数,也可以声明为其他形式;
void * (*process)(void *arg);   //该函数返回值是任意类型的;参数也是任意类型的;`
void *arg;  //回调函数的参数;
struct worker *next;
}CThread_worker;

//线程池结构
typedef struct
{
pthread_mutex_t queue_lock;     //互斥量
pthread_cond_t queue_ready;     //条件变量

//链表结构, 线程池中所有等待任务
CThread_worker *queue_head;

//是否销毁线程池
int shutdown;
pthread_t *threadid;

//线程池中允许的活动线程数目;
//线程池中允许的活动线程数目;
int max_thread_num;
//当前等待队列的任务数目;
int  cur_queue_size;

}CThread_pool;

int pool_add_worker(void * (*process)(void *arg), void *arg);
void * thread_routine(void *arg);

static CThread_pool *pool = NULL;
void pool_init(int max_thread_num)
{
pool = (CThread_pool*)malloc(sizeof(CThread_pool));

//初始化互斥量;
pthread_mutex_init(&(pool->queue_lock), NULL);
//初始化条件变量
pthread_cond_init(&(pool->queue_ready), NULL);

pool->queue_head = NULL;

//最大线程数目
pool->max_thread_num = max_thread_num;
//当前线程数目
pool->cur_queue_size = 0;

pool->shutdown = 0;
pool->threadid = (pthread_t*)malloc(max_thread_num * sizeof(pthread_t));
int i = 0;
for(i=0; i<max_thread_num;i++)
{
pthread_create(&(pool->threadid[i]), NULL, thread_routine, NULL);
}
}

//向线程池中加入任务
int pool_add_worker(void*(*process)(void *arg), void *arg)
{
//构建一个新任务
CThread_worker *newworker = (CThread_worker *)malloc(sizeof(CThread_worker));
newworker->process = process;
newworker->arg = arg;
//别忘了置空
newworker->next = NULL;

//加锁互斥量
pthread_mutex_lock(&(pool->queue_lock));
//将任务加入到等待队列中
CThread_worker *member = pool->queue_head;
if(member !=NULL)
{
while(member->next != NULL)
member = member->next;
member->next = newworker;
}
else
{
pool->queue_head = newworker;
}

assert(pool->queue_head != NULL);
pool->cur_queue_size++;
pthread_mutex_unlock(&(pool->queue_lock));

//好了,等待队列中有任务了,唤醒一个等待线程;
//     注意如果所有线程都在忙碌,这句没有任何作用
pthread_cond_signal(&(pool->queue_ready));
return 0;
}

/*销毁线程池,等待队列中的任务不会再被执行,
*但是正在运行的线程会一直 把任务运行完后 再退出;
*/

int pool_destroy()
{
if(pool->shutdown)
return -1;   //防止两次调用
pool->shutdown = 1;

//唤醒所有等待线程,线程池要销毁了
pthread_cond_broadcast(&(pool->queue_ready));

//阻塞等待线程退出, 否则就成僵尸了
int i;
for(i=0; i<pool->max_thread_num; i++)
{
pthread_join(pool->threadid[i], NULL);
}

free(pool->threadid);

//销毁等待队列
CThread_worker *head = NULL;
while(pool->queue_head != NULL)
{
head=pool->queue_head;
pool->queue_head = pool->queue_head->next;
free(head);
}

//条件变量和互斥量也别忘了销毁
pthread_mutex_destroy(&(pool->queue_lock));
pthread_cond_destroy(&(pool->queue_ready));

free(pool);
/*销毁后指针置空是个好习惯*/
pool = NULL;
return 0;
}

void* thread_routine(void *arg)
{
printf("start thread 0x%x\n", pthread_self());
while(1)
{
pthread_mutex_lock(&(pool->queue_lock));
/*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
*pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/
while(pool->cur_queue_size == 0 && !pool->shutdown)
{
printf("thread 0x%x is waiting \n", pthread_self());
pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock));
}

//线程池要销毁了;
if(pool->shutdown)
{
//遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
pthread_mutex_unlock(&(pool->queue_lock));
printf("thread 0x %x will exit \n", pthread_self());
pthread_exit(NULL);
}

printf("thread 0x %x is starting to work \n", pthread_self());

//使用断言
assert(pool->cur_queue_size!= 0);
assert(pool->queue_head!= NULL);

//等待队列长度减去1,并取出链表中的头元素
pool->cur_queue_size--;
CThread_worker *worker = pool->queue_head;
pool->queue_head = worker->next;
pthread_mutex_unlock(&(pool->queue_lock));

//调用回调函数,执行任务
(*(worker->process))(worker->arg);
free(worker);
worker = NULL;
}
//这一句正常情况下是不可达的
pthread_exit(NULL);
}

//test code
void *myprocess(void *arg)
{
printf("threadid is 0x%x, working on task %d\n", pthread_self(), *(int*)arg);
sleep(1);  //休息一秒,延长任务的执行时间
return NULL;
}

int main(int argc, char** argv)
{
pool_init(3); /*线程池中最多三个活动线程*/

//连续向线程池中放入10个任务;
int *workingnum = (int*)malloc(sizeof(int)*10);
int i;
for(i=0; i< 10;i++)
{
workingnum[i] = i;
pool_add_worker(myprocess, &workingnum[i]);
}

sleep(5);
//销毁线程池;
pool_destroy();
free(workingnum);

return 0;
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: