SPServer中线程池实现部分分析
2015-01-05 13:50
232 查看
原文链接:http://blog.sina.com.cn/s/blog_67b570090100jsda.html
一次偶遇,看到了Half-Sync/Half-async相关论文,提到SPServer,以下它的英文描述:SPServer is a server framework library written on C++ that implements the Half-Sync/Half-Async andLeader/Follower patterns.
It's based on libevent in order to utilize the best I/O loop on any platform。
对SPServer,我感兴趣的还是它的线程池实现,可能是因为应用场景关系,SPServer对线程池实现相对简单,没有一些复杂的花样。
对线程池的封装主要提供了dispatch函数,将一个将要调用的函数分配给线程池里面的线程。
class SP_ThreadPool {
public:
typedef void ( * DispatchFunc_t )( void * );
SP_ThreadPool( int maxThreads, const char * tag = 0 );
~SP_ThreadPool();
/// @return 0 : OK, -1 : cannot create thread
int dispatch( DispatchFunc_t dispatchFunc, void *arg );
int getMaxThreads();
private:
char * mTag;
int mMaxThreads;
int mIndex;
int mTotal;
int mIsShutdown;
pthread_mutex_t mMainMutex;
pthread_cond_t mIdleCond;
pthread_cond_t mFullCond;
pthread_cond_t mEmptyCond;
SP_Thread_t ** mThreadList;
static void * wrapperFunc( void * );
int saveThread( SP_Thread_t * thread );
};
下面是dispatch函数代码:
int SP_ThreadPool :: dispatch( DispatchFunc_t dispatchFunc, void *arg )
{
int ret = 0;
pthread_attr_t attr;
SP_Thread_t * thread = NULL;
pthread_mutex_lock( &mMainMutex );
if( mIndex <= 0 && mTotal >= mMaxThreads ) {
pthread_cond_wait( &mIdleCond, &mMainMutex );
}
if( mIndex <= 0 ) {
SP_Thread_t * thread = ( SP_Thread_t * )malloc( sizeof( SP_Thread_t ) );
thread->mId = 0;
pthread_mutex_init( &thread->mMutex, NULL );
pthread_cond_init( &thread->mCond, NULL );
thread->mFunc = dispatchFunc;
thread->mArg = arg;
thread->mParent = this;
pthread_attr_init( &attr );
pthread_attr_setdetachstate( &attr,PTHREAD_CREATE_DETACHED );
if( 0 == pthread_create( &( thread->mId ), &attr, wrapperFunc, thread ) ) {
mTotal++;
syslog( LOG_NOTICE, "[tp@%s] create thread#%ld\n", mTag, thread->mId );
} else {
ret = -1;
syslog( LOG_WARNING, "[tp@%s] cannot create thread\n", mTag );
pthread_mutex_destroy( &thread->mMutex );
pthread_cond_destroy( &thread->mCond );
free( thread );
}
} else {
mIndex--;
thread = mThreadList[ mIndex ];
mThreadList[ mIndex ] = NULL;
thread->mFunc = dispatchFunc;
thread->mArg = arg;
thread->mParent = this;
pthread_mutex_lock( &thread->mMutex );
pthread_cond_signal( &thread->mCond ) ;
pthread_mutex_unlock ( &thread->mMutex );
}
pthread_mutex_unlock( &mMainMutex );
return ret;
}
如果当前没有空闲的线程,那么创建一个新线程来处理该事务。每个线程运行一个包裹函数
void * SP_ThreadPool :: wrapperFunc( void * arg )
{
SP_Thread_t * thread = ( SP_Thread_t * )arg;
for( ; 0 == thread->mParent->mIsShutdown; ) {
thread->mFunc( thread->mArg );
pthread_mutex_lock( &thread->mMutex );
if( 0 == thread->mParent->saveThread( thread ) ) {
pthread_cond_wait( &thread->mCond, &thread->mMutex );
pthread_mutex_unlock( &thread->mMutex );
} else {
pthread_mutex_unlock( &thread->mMutex );
pthread_cond_destroy( &thread->mCond );
pthread_mutex_destroy( &thread->mMutex );
free( thread );
break;
}
}
pthread_mutex_lock( &thread->mParent->mMainMutex );
thread->mParent->mTotal--;
if( thread->mParent->mTotal <= 0 ) {
pthread_cond_signal( &thread->mParent->mEmptyCond );
}
pthread_mutex_unlock( &thread->mParent->mMainMutex );
return NULL;
}
在该函数中,如果处里完事务,那么调用pthread_cond_wait等待新任务。
从上面实现中,从线程池角度来说,它并没有缓冲待处理的作务,这是因为在spserver中,采用了一个线程来接收对msgquene队列中的消息,对于接收到的每一个消息,调用threadPool中dispatch函数来处理该任务,如果线程池中没有空闲的线程而且线程数达到最大线程数,当前线程就等待。这里或许就是所谓的Leader/Followers模式应用吧。
由于任务入队列与从队列中接收队列在不同线程中,添加任务线程与接收队列线程Worker通信主要采用unix域套接字,当有任务添加到任务队列中时,通过写文件通知读端,有任务添加到任务队列中。
一次偶遇,看到了Half-Sync/Half-async相关论文,提到SPServer,以下它的英文描述:SPServer is a server framework library written on C++ that implements the Half-Sync/Half-Async andLeader/Follower patterns.
It's based on libevent in order to utilize the best I/O loop on any platform。
对SPServer,我感兴趣的还是它的线程池实现,可能是因为应用场景关系,SPServer对线程池实现相对简单,没有一些复杂的花样。
对线程池的封装主要提供了dispatch函数,将一个将要调用的函数分配给线程池里面的线程。
class SP_ThreadPool {
public:
typedef void ( * DispatchFunc_t )( void * );
SP_ThreadPool( int maxThreads, const char * tag = 0 );
~SP_ThreadPool();
/// @return 0 : OK, -1 : cannot create thread
int dispatch( DispatchFunc_t dispatchFunc, void *arg );
int getMaxThreads();
private:
char * mTag;
int mMaxThreads;
int mIndex;
int mTotal;
int mIsShutdown;
pthread_mutex_t mMainMutex;
pthread_cond_t mIdleCond;
pthread_cond_t mFullCond;
pthread_cond_t mEmptyCond;
SP_Thread_t ** mThreadList;
static void * wrapperFunc( void * );
int saveThread( SP_Thread_t * thread );
};
下面是dispatch函数代码:
int SP_ThreadPool :: dispatch( DispatchFunc_t dispatchFunc, void *arg )
{
int ret = 0;
pthread_attr_t attr;
SP_Thread_t * thread = NULL;
pthread_mutex_lock( &mMainMutex );
if( mIndex <= 0 && mTotal >= mMaxThreads ) {
pthread_cond_wait( &mIdleCond, &mMainMutex );
}
if( mIndex <= 0 ) {
SP_Thread_t * thread = ( SP_Thread_t * )malloc( sizeof( SP_Thread_t ) );
thread->mId = 0;
pthread_mutex_init( &thread->mMutex, NULL );
pthread_cond_init( &thread->mCond, NULL );
thread->mFunc = dispatchFunc;
thread->mArg = arg;
thread->mParent = this;
pthread_attr_init( &attr );
pthread_attr_setdetachstate( &attr,PTHREAD_CREATE_DETACHED );
if( 0 == pthread_create( &( thread->mId ), &attr, wrapperFunc, thread ) ) {
mTotal++;
syslog( LOG_NOTICE, "[tp@%s] create thread#%ld\n", mTag, thread->mId );
} else {
ret = -1;
syslog( LOG_WARNING, "[tp@%s] cannot create thread\n", mTag );
pthread_mutex_destroy( &thread->mMutex );
pthread_cond_destroy( &thread->mCond );
free( thread );
}
} else {
mIndex--;
thread = mThreadList[ mIndex ];
mThreadList[ mIndex ] = NULL;
thread->mFunc = dispatchFunc;
thread->mArg = arg;
thread->mParent = this;
pthread_mutex_lock( &thread->mMutex );
pthread_cond_signal( &thread->mCond ) ;
pthread_mutex_unlock ( &thread->mMutex );
}
pthread_mutex_unlock( &mMainMutex );
return ret;
}
如果当前没有空闲的线程,那么创建一个新线程来处理该事务。每个线程运行一个包裹函数
void * SP_ThreadPool :: wrapperFunc( void * arg )
{
SP_Thread_t * thread = ( SP_Thread_t * )arg;
for( ; 0 == thread->mParent->mIsShutdown; ) {
thread->mFunc( thread->mArg );
pthread_mutex_lock( &thread->mMutex );
if( 0 == thread->mParent->saveThread( thread ) ) {
pthread_cond_wait( &thread->mCond, &thread->mMutex );
pthread_mutex_unlock( &thread->mMutex );
} else {
pthread_mutex_unlock( &thread->mMutex );
pthread_cond_destroy( &thread->mCond );
pthread_mutex_destroy( &thread->mMutex );
free( thread );
break;
}
}
pthread_mutex_lock( &thread->mParent->mMainMutex );
thread->mParent->mTotal--;
if( thread->mParent->mTotal <= 0 ) {
pthread_cond_signal( &thread->mParent->mEmptyCond );
}
pthread_mutex_unlock( &thread->mParent->mMainMutex );
return NULL;
}
在该函数中,如果处里完事务,那么调用pthread_cond_wait等待新任务。
从上面实现中,从线程池角度来说,它并没有缓冲待处理的作务,这是因为在spserver中,采用了一个线程来接收对msgquene队列中的消息,对于接收到的每一个消息,调用threadPool中dispatch函数来处理该任务,如果线程池中没有空闲的线程而且线程数达到最大线程数,当前线程就等待。这里或许就是所谓的Leader/Followers模式应用吧。
由于任务入队列与从队列中接收队列在不同线程中,添加任务线程与接收队列线程Worker通信主要采用unix域套接字,当有任务添加到任务队列中时,通过写文件通知读端,有任务添加到任务队列中。
相关文章推荐
- SP短信平台-线程池实现
- BlogEngine.Net架构与源代码分析系列part14:实现分析(下)——网站页面上值得参考的部分
- 网络服务器开发框架spserver源码分析 (二)
- 第三部分 MediaPlayer的主要实现分析
- Linux 实时技术与典型实现分析, 第 2 部分: Ingo Molnar 的实时补丁
- 使用线程池实现Server端,Socket编程?
- Asp.net MVC源码分析--Model Validation(Server端)实现(1)
- 第二章(契约 实现一个双向服务契约的server 部分)
- SPServer源码分析(四): 核心服务器类SP_Server分析
- MPEG-2复用器PSI信息分析部分的FPGA实现
- ArcGIS.Server.9.2.DotNet实现点、线、面的缓冲分析Buffer
- 优秀的轻量级网络开发框架spserver源码分析(一)
- 终于凑出点时间,将偷懒很久的bas服务器框架实现了,有点类似于spserver
- 网络服务器开发框架spserver源码分析 (一)
- 优秀的轻量级网络开发框架spserver源码分析(二)
- Linux 实时技术与典型实现分析, 第 1 部分: 介绍
- ArcGIS.Server.9.2.DotNet实现点、线、面的缓冲分析Buffer
- SPServer : 一个基于线程池(包括HAHS和LF)的高并发 server 框架
- spserver 架构分析(一)
- 分布式文件系统KFS源码阅读与分析(三):RPC实现机制(MetaServer端)