线程池实例讲解(原创)
2013-08-09 09:30
155 查看
最近老看一些服务器网关的代码页看了一些开源的代码。一个重要的技术线程池。
何为线程池,所谓线程池就是一组用来处理,客户请求的线程组 这里的客户指代的是线程池服务的对象。
线程池的实现原理:
(1)消息队列调用函数,当有消息到来时候,将消息封装插入消息队列。
(2)有一个 线程池附服务线程,该线程负责检索消息队列,创建线程池线程,将该消息派发到线程池的某一线程处理。
(3)线程池线程参数,该参数应该是一个结构体,该结构体中某项指向一个消息,处理完成之后删除消息,进入等待状态。
源代码如下 代码注释有说明 故不在单独讲解代码:
编译
gcc -g main.c -lpthread -o main
连续运行程序三次 三次结果如下:
分析结果可以知道: 在线程池中并不是先分配处理线程的的消息先处理完成,这取决于操作系统线程调度。
由此可以得知如要顺序处理消息,这种方式不使用于线程池。
一些技巧:
线程池常用于服务器网络编程,线程池参数中往往携带输出socket ,当我们处理完消息后,将结果通过socket 发送给客户端。
如:web服务器,加载生成XML文件,通过socket 发送给客户端。
何为线程池,所谓线程池就是一组用来处理,客户请求的线程组 这里的客户指代的是线程池服务的对象。
线程池的实现原理:
(1)消息队列调用函数,当有消息到来时候,将消息封装插入消息队列。
(2)有一个 线程池附服务线程,该线程负责检索消息队列,创建线程池线程,将该消息派发到线程池的某一线程处理。
(3)线程池线程参数,该参数应该是一个结构体,该结构体中某项指向一个消息,处理完成之后删除消息,进入等待状态。
源代码如下 代码注释有说明 故不在单独讲解代码:
#include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <assert.h> #include <fcntl.h> #include <string.h> #include <time.h> #include <pthread.h> #include <sys/types.h> //消息队列基本元素 typedef struct _STPOOLMsgElement //消息队列基本元素 { char * param; //代表消息队列消息 实际情况中也可以使用结构体 struct _STPOOLMsgData *next; //下一条消息 }STPOOLMsgData,*PSTPOOLMsgData; //线程池基本元素 typedef struct _STPOOLThreadElement { pthread_t thread; //但前线程ID pthread_mutex_t busy; // 用来同步线程池线程 和线程池服务线程 pthread_cond_t nodata; // PSTPOOLMsgData pMsgData; //当前线程需要处理的事件,如果为空线程池空闲 struct _STPOOLThreadElement *next; //线程池的下一个线程 }STPOOLThreadElement,*PSTPOOLThreadElement; //变量定义 //消息队列肯定有头有尾头出尾巴进 多个线程要访问必须有同步 以及初始化 PSTPOOLMsgData pStMsgQuequeBegin = NULL; PSTPOOLMsgData pStMsgQuequeEnd = NULL; pthread_mutex_t queueAccess ;//= PTHREAD_MUTEX_INITIALIZER; pthread_cond_t queueEmpty; //= PTHREAD_COND_INITIALIZER; //线程池链表 线程池的线程不管先后顺序,每次任务来了检索空闲线程,派遣任务 PSTPOOLThreadElement pStPool = NULL; //先来看消息队列相关函数 消息插入函数 负责消息封装插入消息队列 void InsertMsg(char * pMsg) { //将参数封装成消息队列元素 PSTPOOLMsgData pMsgData = (PSTPOOLMsgData)malloc(sizeof(STPOOLMsgData)); if(!pMsgData) return; char *pMsgDupli = malloc(strlen(pMsg) + 1); //消息数据 if(!pMsgDupli) { free(pMsgData); return; } strcpy(pMsgDupli,pMsg); pMsgData->param = pMsgDupli; pMsgData->next = NULL; //插入消息 pthread_mutex_lock(&queueAccess); if (pStMsgQuequeEnd) { pStMsgQuequeEnd->next = pMsgData; } pStMsgQuequeEnd = pMsgData; if (!pStMsgQuequeBegin) { pStMsgQuequeBegin = pMsgData; } pthread_cond_broadcast(&queueEmpty); //通知消息分发线程有消息到来 pthread_mutex_unlock(&queueAccess); } //再来看看消息取出函数 采用阻塞模式,因为没有消息的时候所有线程都处于空闲状态 PSTPOOLMsgData GetNextMsg() { PSTPOOLMsgData Result = NULL; pthread_mutex_lock(&queueAccess); //如果没有事件,一直等待直道有事件产生 取出一个事件 while (pStMsgQuequeBegin == NULL) pthread_cond_wait(&queueEmpty, &queueAccess); Result = pStMsgQuequeBegin; pStMsgQuequeBegin = Result->next; if (!pStMsgQuequeBegin) //没有消息 { pStMsgQuequeEnd = NULL; } Result->next = NULL; pthread_mutex_unlock(&queueAccess); return Result; } //接下来看线程池线程 void *PoolLDealMsgThread(PSTPOOLThreadElement listElement) { int threadId = 0; pthread_mutex_lock(&listElement->busy); while (1) { while (listElement->pMsgData == NULL) //无事件处理 一直阻塞等待 pthread_cond_wait(&listElement->nodata, &listElement->busy); //处理事件,清除事件占用空间 threadId = listElement->thread; printf("threadID :%d Msg: %s \n",threadId,listElement->pMsgData->param); free(listElement->pMsgData->param); free(listElement->pMsgData); listElement->pMsgData = NULL; } pthread_mutex_unlock(&listElement->busy); pthread_exit(NULL); return NULL; } //接着看看线程池服务线程 这里调用DispatchMsg 函数分发消息 void DispatchMsg(PSTPOOLMsgData pMsgData) { int Result = 0; PSTPOOLThreadElement cur = NULL; //查找空闲线程处理消息 for (cur = pStPool; cur != NULL; cur = cur->next) { //双重锁定 提高程序性能 if (cur->pMsgData == NULL) { pthread_mutex_lock(&cur->busy); if (cur->pMsgData != NULL) { pthread_mutex_unlock(&cur->busy); continue; } cur->pMsgData = pMsgData; pthread_cond_broadcast(&cur->nodata); //通知线程池线程有事件到来 pthread_mutex_unlock(&cur->busy); return; } } //新建线程处理消息 cur = (PSTPOOLThreadElement)malloc(sizeof(STPOOLThreadElement)); if(!cur) { free(pMsgData->param); free(pMsgData); return; } memset(cur,0,sizeof(STPOOLThreadElement)); pthread_mutex_init(&cur->busy, NULL); pthread_cond_init(&cur->nodata, NULL); cur->pMsgData = pMsgData; cur->next = NULL; Result = pthread_create(&cur->thread, NULL, (void *(*)(void *)) PoolLDealMsgThread, cur); if (0 != Result) { free(cur); free(pMsgData->param); free(pMsgData); return; } //新线程插入线程池 cur->next = pStPool; pStPool = cur; } void *QueueDealFuction(void *data) { PSTPOOLMsgData pMsgData = NULL; while (1) { pMsgData = GetNextMsg(); //读取一条事件记录 DispatchMsg(pMsgData); } return NULL; } //好了基本线程池模型已经构建完毕 现在看看主函数 char testMsg[5][10] = {"hello!", "this", "is", "my", "testApp", } ; int main() { //初始化开启线程池服务线程 int i = 0; pthread_mutex_init(&queueAccess, NULL); pthread_cond_init(&queueEmpty, NULL); pthread_t queueHandler; if(0 != pthread_create(&queueHandler, NULL, (void *(*)(void *)) QueueDealFuction, NULL) ) { return -1; } //这里消息仅代表字符串 for(i=0 ;i < 5 ;i++) { InsertMsg(testMsg[i]); } sleep(10); return 0; }
编译
gcc -g main.c -lpthread -o main
连续运行程序三次 三次结果如下:
分析结果可以知道: 在线程池中并不是先分配处理线程的的消息先处理完成,这取决于操作系统线程调度。
由此可以得知如要顺序处理消息,这种方式不使用于线程池。
一些技巧:
线程池常用于服务器网络编程,线程池参数中往往携带输出socket ,当我们处理完消息后,将结果通过socket 发送给客户端。
如:web服务器,加载生成XML文件,通过socket 发送给客户端。
相关文章推荐
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解) (转载)
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- inux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- 【原创】C#导出数据到EXCEL方法谈(附实例源码和超级无敌详细讲解)
- Java5中的线程池实例讲解
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- JSF+Spring+Hibernate的实例讲解(原创翻译)
- [置顶] linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)
- linux中fork()函数详解(原创!!实例讲解)