实现一个线程池
2016-03-31 19:43
615 查看
一.线程最主要的三个同步机制
1.信号量
2.互斥锁
3.条件变量
二.对三个同步机制分别实现一个包装类
三.实现线程池
动态创建线程十分消耗时间,如果有一个线程池,用户请求到来时,从线程池取一个空闲的线程来处理用户的请求,请求处理完后,线程又变为空闲状态,等待下次被使用。
核心数据结构有两个:线程容器 、请求队列
1.线程容器
这里用一个vector容器来存放线程池里面所有线程的id
2.请求队列
这里用list容器来存放所有请求,请求处理按fifo的顺序
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "locker.h"
template< typename T >
class threadpool
{
public:
threadpool( int thread_number = 8, int max_requests = 10000 );
~threadpool();
bool append( T* request );
private:
static void* worker( void* arg );
void run();
private:
int thread_number_like;//当前线程池中的线程个数
int max_requests_like;//最大请求数
//pthread_t* threads_like;
vector< pthread> threads_like;//线程容器
std::list< T* > workqueue_like;//请求队列
locker queuelocker_like;//请求队列的访问互斥锁
sem queuestat_like;//用于请求队列与空闲线程同步的信号量
bool stop_like;//结束所有线程,线程池此时没有线程
};
template< typename T >
threadpool< T >::threadpool( int thread_number, int max_requests ) :
m_thread_number( thread_number ), m_max_requests( max_requests ), m_stop( false ), m_threads( NULL )
{
if( ( thread_number <= 0 ) || ( max_requests <= 0 ) )
{
throw std::exception();
}
threads_like.resize( thread_number_like);
if( thread_number_like!= threads_like.size() )
{
throw std::exception();
}
for ( int i = 0; i < thread_number_like; ++i )
{
printf( "create the %dth thread\n", i );
if( pthread_create( &threads_like [i], NULL, worker, this ) != 0 )//创建线程
{
threads_like.resize(0);
throw std::exception();
}
if( pthread_detach( m_threads[i] ) )//设置为脱离线程
{
threads_like.resize(0);
throw std::exception();
}
}
}
template< typename T >
threadpool< T >::~threadpool()
{
stop_like = true;
}
template< typename T >
bool threadpool< T >::append( T* request )
{
queuelocker_like.lock();
if ( workqueue_like.size() > max_requests_like )
{
queuelocker_like.unlock();
return false;
}
workqueue_like.push_back( request );
queuelocker_like.unlock();
queuestat_like.post();
return true;
}
template< typename T >
void* threadpool< T >::worker( void* arg )
{
threadpool* pool = ( threadpool* )arg;//静态函数要调用动态成员run,必须通过参数arg得到
pool->run();//线程的执行体
return pool;
}
template< typename T >
void threadpool< T >::run()
{
while ( ! m_stop )
{
queuestat_like.wait();
queuelocker_like.lock();
if ( workqueue_like.empty() )
{
queuelocker_like.unlock();
continue;
}
T* request = workqueue_like.front();
workqueue_like.pop_front();
queuelocker_like.unlock();
if ( ! request )
{
continue;
}
request->process();//执行当前请求所对应的处理函数
}
}
#endif
注:1.这里的线程池模型中,每一个线程对应一个请求
2.这种方式保证了用户请求的及时处理,对请求的处理函数性能要求更小,因为这种模型并不要求请求处理过程是非堵塞的,因为一个请求的处理时延不会影响到系统对其他请求的处理(当然线程数必须能动态增加)。
3.这种方式对于高并发服务器并不是最优的,类似于nginx的一个进程对应多个用户请求的方式更有优势,nginx模型的优势主要有两个:一:进程数固定,不会因为同时有很多线程或者进程而占用过多的内存。二:nginx的工作进程数一般与cpu的核数一致,并可以把一个进程绑定到一个核上,这样就节省了进程切换或线程切换带来的系统开销
1.信号量
2.互斥锁
3.条件变量
二.对三个同步机制分别实现一个包装类
#ifdef LOCKER_H #define LOCKER_H #include <pthread.h> #include <semaphore.h> /*信号量的封装*/ class sem { public: sem() { if( sem_init( &sem_like, 0, 0)) { throw std::exception(); } } ~sem() { sem_destroy( &sem_like); } bool wait() { return sem_wait( &sem_like)== 0; } bool post() { return sem_post( &sem_like)== 0; } private: sem_t sem_like; } /*互斥锁的封装*/ class locker { public: locker() { if( pthread_mutex_init( &mutex_like,NULL) !=0) { throw std::exception(); } } ~locker() { pthread_mutex_destroy( &mutex_like); } bool lock() { return pthread_mutex_lock( &mutex_like)== 0; } bool unlock() { return pthread_mutex_unlock( &mutex_like); } private: pthread_mutex_t mutex_like; } /*条件变量的封装*/ class cond { public: cond() { if( pthread_mutex_init( &mutex_like,NULL)!= 0) { throw std::exception; } if( pthread_cond_init( &cond_like, NULL)!= 0) { //释放对应的互斥锁 pthread_mutex_destroy( &mutex_like); throw std::exception; } } ~cond() { pthread_mutex_destroy( &mutex_like); pthread_cond_destroy( &cond_like); } bool wait() { int flag= 0; pthread_mutex_lock( &mutex_like); flag= pthread_cond_wait( &cond_like, &mutex_like); pthread_mutex_unlock( &mutex_like); return flag== 0; } bool signal() { return pthread_cond_signal( &cond_like)== 0; } private: pthread_mutex_t mutex_like; pthread_cond_t cond_like; } #endif
三.实现线程池
动态创建线程十分消耗时间,如果有一个线程池,用户请求到来时,从线程池取一个空闲的线程来处理用户的请求,请求处理完后,线程又变为空闲状态,等待下次被使用。
核心数据结构有两个:线程容器 、请求队列
1.线程容器
这里用一个vector容器来存放线程池里面所有线程的id
2.请求队列
这里用list容器来存放所有请求,请求处理按fifo的顺序
#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "locker.h"
template< typename T >
class threadpool
{
public:
threadpool( int thread_number = 8, int max_requests = 10000 );
~threadpool();
bool append( T* request );
private:
static void* worker( void* arg );
void run();
private:
int thread_number_like;//当前线程池中的线程个数
int max_requests_like;//最大请求数
//pthread_t* threads_like;
vector< pthread> threads_like;//线程容器
std::list< T* > workqueue_like;//请求队列
locker queuelocker_like;//请求队列的访问互斥锁
sem queuestat_like;//用于请求队列与空闲线程同步的信号量
bool stop_like;//结束所有线程,线程池此时没有线程
};
template< typename T >
threadpool< T >::threadpool( int thread_number, int max_requests ) :
m_thread_number( thread_number ), m_max_requests( max_requests ), m_stop( false ), m_threads( NULL )
{
if( ( thread_number <= 0 ) || ( max_requests <= 0 ) )
{
throw std::exception();
}
threads_like.resize( thread_number_like);
if( thread_number_like!= threads_like.size() )
{
throw std::exception();
}
for ( int i = 0; i < thread_number_like; ++i )
{
printf( "create the %dth thread\n", i );
if( pthread_create( &threads_like [i], NULL, worker, this ) != 0 )//创建线程
{
threads_like.resize(0);
throw std::exception();
}
if( pthread_detach( m_threads[i] ) )//设置为脱离线程
{
threads_like.resize(0);
throw std::exception();
}
}
}
template< typename T >
threadpool< T >::~threadpool()
{
stop_like = true;
}
template< typename T >
bool threadpool< T >::append( T* request )
{
queuelocker_like.lock();
if ( workqueue_like.size() > max_requests_like )
{
queuelocker_like.unlock();
return false;
}
workqueue_like.push_back( request );
queuelocker_like.unlock();
queuestat_like.post();
return true;
}
template< typename T >
void* threadpool< T >::worker( void* arg )
{
threadpool* pool = ( threadpool* )arg;//静态函数要调用动态成员run,必须通过参数arg得到
pool->run();//线程的执行体
return pool;
}
template< typename T >
void threadpool< T >::run()
{
while ( ! m_stop )
{
queuestat_like.wait();
queuelocker_like.lock();
if ( workqueue_like.empty() )
{
queuelocker_like.unlock();
continue;
}
T* request = workqueue_like.front();
workqueue_like.pop_front();
queuelocker_like.unlock();
if ( ! request )
{
continue;
}
request->process();//执行当前请求所对应的处理函数
}
}
#endif
注:1.这里的线程池模型中,每一个线程对应一个请求
2.这种方式保证了用户请求的及时处理,对请求的处理函数性能要求更小,因为这种模型并不要求请求处理过程是非堵塞的,因为一个请求的处理时延不会影响到系统对其他请求的处理(当然线程数必须能动态增加)。
3.这种方式对于高并发服务器并不是最优的,类似于nginx的一个进程对应多个用户请求的方式更有优势,nginx模型的优势主要有两个:一:进程数固定,不会因为同时有很多线程或者进程而占用过多的内存。二:nginx的工作进程数一般与cpu的核数一致,并可以把一个进程绑定到一个核上,这样就节省了进程切换或线程切换带来的系统开销
相关文章推荐
- nginx代理指定目录
- 访问Nginx发生SSL connection error的一种情况
- Nginx+Naxsi部署专业级Web应用防火墙
- CentOS 6.2实战部署Nginx+MySQL+PHP
- nginx中http核心模块的配置指令2
- nginx中http核心模块的配置指令3
- nginx中http核心模块的配置指令4
- nginx中http的fastcgi模块的配置指令1
- Nginx 学习笔记(一)
- 网站502与504错误分析
- 用zabbix监控nginx_status状态
- 艰难完成 nginx + puma 部署 rails 4的详细记录
- C#线程间不能调用剪切板的解决方法
- 把Lua编译进nginx步骤方法
- C#多线程学习之(四)使用线程池进行多线程的自动管理
- C#线程同步的三类情景分析
- C#获取进程或线程相关信息的方法
- C#停止线程的方法
- C#子线程更新UI控件的方法实例总结