您的位置:首页 > 其它

线程池实例讲解(原创)

2013-08-09 09:30 155 查看
  最近老看一些服务器网关的代码页看了一些开源的代码。一个重要的技术线程池。

  何为线程池,所谓线程池就是一组用来处理,客户请求的线程组 这里的客户指代的是线程池服务的对象。

  

  线程池的实现原理:

    (1)消息队列调用函数,当有消息到来时候,将消息封装插入消息队列。

    (2)有一个 线程池附服务线程,该线程负责检索消息队列,创建线程池线程,将该消息派发到线程池的某一线程处理。

    (3)线程池线程参数,该参数应该是一个结构体,该结构体中某项指向一个消息,处理完成之后删除消息,进入等待状态。

 

  源代码如下 代码注释有说明 故不在单独讲解代码:

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
#include <fcntl.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <sys/types.h>

//消息队列基本元素
typedef  struct _STPOOLMsgElement       //消息队列基本元素
{
char * param;                       //代表消息队列消息  实际情况中也可以使用结构体
struct _STPOOLMsgData *next;         //下一条消息

}STPOOLMsgData,*PSTPOOLMsgData;

//线程池基本元素
typedef struct _STPOOLThreadElement
{
pthread_t thread;                      //但前线程ID
pthread_mutex_t busy;                  // 用来同步线程池线程 和线程池服务线程
pthread_cond_t nodata;                 //
PSTPOOLMsgData pMsgData;               //当前线程需要处理的事件,如果为空线程池空闲
struct _STPOOLThreadElement *next;     //线程池的下一个线程

}STPOOLThreadElement,*PSTPOOLThreadElement;

//变量定义
//消息队列肯定有头有尾头出尾巴进  多个线程要访问必须有同步 以及初始化
PSTPOOLMsgData     pStMsgQuequeBegin = NULL;
PSTPOOLMsgData     pStMsgQuequeEnd = NULL;
pthread_mutex_t queueAccess ;//= PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t     queueEmpty; //= PTHREAD_COND_INITIALIZER;

//线程池链表  线程池的线程不管先后顺序,每次任务来了检索空闲线程,派遣任务
PSTPOOLThreadElement  pStPool = NULL;

//先来看消息队列相关函数  消息插入函数 负责消息封装插入消息队列
void InsertMsg(char * pMsg)
{
//将参数封装成消息队列元素
PSTPOOLMsgData pMsgData = (PSTPOOLMsgData)malloc(sizeof(STPOOLMsgData));
if(!pMsgData)  return;

char *pMsgDupli = malloc(strlen(pMsg) + 1);   //消息数据
if(!pMsgDupli)
{
free(pMsgData);
return;
}
strcpy(pMsgDupli,pMsg);
pMsgData->param = pMsgDupli;
pMsgData->next = NULL;

//插入消息
pthread_mutex_lock(&queueAccess);
if (pStMsgQuequeEnd)
{
pStMsgQuequeEnd->next = pMsgData;
}
pStMsgQuequeEnd = pMsgData;

if (!pStMsgQuequeBegin)
{
pStMsgQuequeBegin = pMsgData;
}

pthread_cond_broadcast(&queueEmpty);    //通知消息分发线程有消息到来
pthread_mutex_unlock(&queueAccess);
}

//再来看看消息取出函数  采用阻塞模式,因为没有消息的时候所有线程都处于空闲状态
PSTPOOLMsgData GetNextMsg()
{
PSTPOOLMsgData Result = NULL;

pthread_mutex_lock(&queueAccess);
//如果没有事件,一直等待直道有事件产生    取出一个事件
while (pStMsgQuequeBegin == NULL)
pthread_cond_wait(&queueEmpty, &queueAccess);

Result = pStMsgQuequeBegin;
pStMsgQuequeBegin = Result->next;
if (!pStMsgQuequeBegin)     //没有消息
{
pStMsgQuequeEnd = NULL;
}
Result->next = NULL;
pthread_mutex_unlock(&queueAccess);
return Result;
}

//接下来看线程池线程
void *PoolLDealMsgThread(PSTPOOLThreadElement listElement)
{
int threadId = 0;
pthread_mutex_lock(&listElement->busy);
while (1)
{
while (listElement->pMsgData == NULL)    //无事件处理 一直阻塞等待
pthread_cond_wait(&listElement->nodata, &listElement->busy);

//处理事件,清除事件占用空间

threadId = listElement->thread;
printf("threadID :%d  Msg: %s \n",threadId,listElement->pMsgData->param);

free(listElement->pMsgData->param);
free(listElement->pMsgData);

listElement->pMsgData = NULL;
}
pthread_mutex_unlock(&listElement->busy);
pthread_exit(NULL);
return NULL;
}

//接着看看线程池服务线程 这里调用DispatchMsg 函数分发消息
void DispatchMsg(PSTPOOLMsgData pMsgData)
{
int Result = 0;
PSTPOOLThreadElement cur = NULL;

//查找空闲线程处理消息
for (cur = pStPool; cur != NULL; cur = cur->next)
{
//双重锁定   提高程序性能
if (cur->pMsgData == NULL)
{
pthread_mutex_lock(&cur->busy);
if (cur->pMsgData != NULL)
{
pthread_mutex_unlock(&cur->busy);
continue;
}
cur->pMsgData = pMsgData;
pthread_cond_broadcast(&cur->nodata);   //通知线程池线程有事件到来
pthread_mutex_unlock(&cur->busy);
return;
}
}
//新建线程处理消息
cur = (PSTPOOLThreadElement)malloc(sizeof(STPOOLThreadElement));
if(!cur)
{
free(pMsgData->param);
free(pMsgData);
return;
}

memset(cur,0,sizeof(STPOOLThreadElement));

pthread_mutex_init(&cur->busy, NULL);
pthread_cond_init(&cur->nodata, NULL);
cur->pMsgData = pMsgData;
cur->next = NULL;
Result = pthread_create(&cur->thread, NULL, (void *(*)(void *)) PoolLDealMsgThread, cur);

if (0 != Result)
{
free(cur);
free(pMsgData->param);
free(pMsgData);
return;
}
//新线程插入线程池
cur->next = pStPool;
pStPool = cur;
}

void *QueueDealFuction(void *data)
{
PSTPOOLMsgData pMsgData = NULL;
while (1)
{
pMsgData = GetNextMsg();  //读取一条事件记录
DispatchMsg(pMsgData);
}
return NULL;
}

//好了基本线程池模型已经构建完毕 现在看看主函数
char testMsg[5][10]  = {"hello!",
"this",
"is",
"my",
"testApp",
} ;

int main()
{
//初始化开启线程池服务线程
int i = 0;
pthread_mutex_init(&queueAccess, NULL);
pthread_cond_init(&queueEmpty, NULL);

pthread_t queueHandler;
if(0 != pthread_create(&queueHandler, NULL, (void *(*)(void *)) QueueDealFuction, NULL) )
{
return -1;
}
//这里消息仅代表字符串
for(i=0 ;i < 5 ;i++)
{
InsertMsg(testMsg[i]);
}
sleep(10);
return 0;
}


编译

gcc -g main.c -lpthread -o main

连续运行程序三次 三次结果如下:



分析结果可以知道: 在线程池中并不是先分配处理线程的的消息先处理完成,这取决于操作系统线程调度。

由此可以得知如要顺序处理消息,这种方式不使用于线程池。

一些技巧:

  线程池常用于服务器网络编程,线程池参数中往往携带输出socket ,当我们处理完消息后,将结果通过socket 发送给客户端。

  如:web服务器,加载生成XML文件,通过socket 发送给客户端。

    

  

  

  

  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: