POCO线程池分析
2017-12-01 11:14
253 查看
一、接口
创建个minCapacity线程的线程池,线程池最多拥有maxCapacity个线程,在基本的minCapacity线程外的线程空闲时间超过idleTimes,将自动被杀死。
二、创建过程
先创建minCapacity个PooledThread对象并保存到对象容器中,并同时执行start()函数:
_thread是Poco::Thread对象,具体构造函数:
Thread类继承于Poco里面ThreadImpl类,而这个ThreadImpl类是对线程创建等相关函数的封装。看看Thread::strart()接口:
这里的startImpl调用的就是:
我们最后看下,createImpl接口:
经过一连串的调用,我们终于看到最底层的创建线程方法,和我们普通写应用层线程创建一致,采用_beginthreadex方法。
我们可以很轻易的创建一个线程池:
三、它是如何动态管理所创建的线程?
我们先使用掉一个线程,调用方法:
我们使用线程方法,是采用ThreadPool::start()方法:
关键的函数出现了getThread(),获取创建好的线程,执行start()方法。
这个函数很简单,首先遍历之前创建并存储到容器的线程,判断哪个线程是处理空闲状态,如果有则直接取出对象赋值。如果不存在空闲的线程,则判断当前已创建线程的总数量,是否超过预设的最大值,如果没有则创建新的线程。
最后直接激活
整体分析下来,Poco采用了更加严谨的逻辑处理线程池的分配。那如何增大一开始设置的最大线程数呢,我们采用
在我们增加最大线程数的同时,还执行了housekeep()函数:重新整理已有线程池,这边就涉及到自动清除空闲太久的线程方法:
直接遍历已创建的线程,空闲线程则比对空闲的时间是否超过设置的时间,如果超过则添加到另一个容器中expiredThreads,非空闲线程都加入activeThreads中。这里面有个设计,可能大伙看的不是很明白,为什么都区分超时和非超时的线程,最后还要将超时的expiredThreads内容插入到,非超时的idleThreads里面呢?其实,是为了防止已激活的和未超时的总数达不到limit值,并且我们是将expiredThreads插入到的是idleThreads的尾部,即使后面遍历的时候,也是先遍历未超时的。
最后遍历idleThreads,当总数小于limit时,我们先插入空闲的线程,总数达到limit后,剩下的空闲线程直接执行release()释放。最后再将已激活的线程加入到_threads尾部。着就是线程池的重新整理。
…
五、调用实例
ThreadPool(int minCapacity = 2, int maxCapacity = 16, int idleTime = 60, int stackSize = POCO_THREAD_STACK_SIZE); /// Creates a thread pool with minCapacity threads. /// If required, up to maxCapacity threads are created /// a NoThreadAvailableException exception is thrown. /// If a thread is running idle for more than idleTime seconds, /// and more than minCapacity threads are running, the thread /// is killed. Threads are created with given stack size.
创建个minCapacity线程的线程池,线程池最多拥有maxCapacity个线程,在基本的minCapacity线程外的线程空闲时间超过idleTimes,将自动被杀死。
二、创建过程
ThreadPool::ThreadPool(int minCapacity, int maxCapacity, int idleTime, int stackSize): _minCapacity(minCapacity), _maxCapacity(maxCapacity), _idleTime(idleTime), _serial(0), _age(0), _stackSize(stackSize) { poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0); for (int i = 0; i < _minCapacity; i++) { PooledThread* pThread = createThread(); _threads.push_back(pThread); pThread->start(); } }
先创建minCapacity个PooledThread对象并保存到对象容器中,并同时执行start()函数:
void PooledThread::start() { _thread.start(*this); _started.wait(); }
_thread是Poco::Thread对象,具体构造函数:
Thread::Thread(): _id(uniqueId()), _name(makeName()), _pTLS(0) { }
Thread类继承于Poco里面ThreadImpl类,而这个ThreadImpl类是对线程创建等相关函数的封装。看看Thread::strart()接口:
void Thread::start(Runnable& target) { startImpl(target); }
这里的startImpl调用的就是:
void ThreadImpl::startImpl(Runnable& target) { if (isRunningImpl()) throw SystemException("thread already running"); _pRunnableTarget = ⌖ createImpl(runnableEntry, this); }
我们最后看下,createImpl接口:
void ThreadImpl::createImpl(Entry ent, void* pData) { #if defined(_DLL) _thread = CreateThread(NULL, _stackSize, ent, pData, 0, &_threadId); #else unsigned threadId; _thread = (HANDLE) _beginthreadex(NULL, _stackSize, ent, this, 0, &threadId); _threadId = static_cast<DWORD>(threadId); #endif if (!_thread) throw SystemException("cannot create thread"); if (_prio != PRIO_NORMAL_IMPL && !SetThreadPriority(_thread, _prio)) throw SystemException("cannot set thread priority"); }
经过一连串的调用,我们终于看到最底层的创建线程方法,和我们普通写应用层线程创建一致,采用_beginthreadex方法。
我们可以很轻易的创建一个线程池:
ThreadPool pool(2, 3, 3);
三、它是如何动态管理所创建的线程?
我们先使用掉一个线程,调用方法:
RunnableAdapter<CThreadPoolTest> ra(*this, &CThreadPoolTest::Count); pool.start(ra); //!<用掉一个
我们使用线程方法,是采用ThreadPool::start()方法:
void ThreadPool::start(Runnable& target) { getThread()->start(Thread::PRIO_NORMAL, target); }
关键的函数出现了getThread(),获取创建好的线程,执行start()方法。
PooledThread* ThreadPool::getThread() { FastMutex::ScopedLock lock(_mutex); if (++_age == 32) housekeep(); PooledThread* pThread = 0; for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it) { if ((*it)->idle()) pThread = *it; } if (!pThread) { if (_threads.size() < _maxCapacity) { pThread = createThread(); try { pThread->start(); _threads.push_back(pThread); } catch (...) { delete pThread; throw; } } else throw NoThreadAvailableException(); } pThread->activate(); return pThread; }
这个函数很简单,首先遍历之前创建并存储到容器的线程,判断哪个线程是处理空闲状态,如果有则直接取出对象赋值。如果不存在空闲的线程,则判断当前已创建线程的总数量,是否超过预设的最大值,如果没有则创建新的线程。
最后直接激活
pThread->activate();就是将空闲状态置为false
void PooledThread::activate() { FastMutex::ScopedLock lock(_mutex); poco_assert (_idle); _idle = false; _targetCompleted.reset(); }
整体分析下来,Poco采用了更加严谨的逻辑处理线程池的分配。那如何增大一开始设置的最大线程数呢,我们采用
pool.addCapacity(1);的方法,将最大线程数+1。
void ThreadPool::addCapacity(int n) { FastMutex::ScopedLock lock(_mutex); poco_assert (_maxCapacity + n >= _minCapacity); _maxCapacity += n; housekeep(); }
在我们增加最大线程数的同时,还执行了housekeep()函数:重新整理已有线程池,这边就涉及到自动清除空闲太久的线程方法:
void ThreadPool::housekeep() { _age = 0; if (_threads.size() <= _minCapacity) return; ThreadVec idleThreads; ThreadVec expiredThreads; ThreadVec activeThreads; idleThreads.reserve(_threads.size()); activeThreads.reserve(_threads.size()); for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it) { if ((*it)->idle()) { if ((*it)->idleTime() < _idleTime) idleThreads.push_back(*it); else expiredThreads.push_back(*it); } else activeThreads.push_back(*it); } int n = (int) activeThreads.size(); int limit = (int) idleThreads.size() + n; if (limit < _minCapacity) limit = _minCapacity; idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end()); _threads.clear(); for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it) { if (n < limit) { _threads.push_back(*it); ++n; } else (*it)->release(); } _threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end()); }
直接遍历已创建的线程,空闲线程则比对空闲的时间是否超过设置的时间,如果超过则添加到另一个容器中expiredThreads,非空闲线程都加入activeThreads中。这里面有个设计,可能大伙看的不是很明白,为什么都区分超时和非超时的线程,最后还要将超时的expiredThreads内容插入到,非超时的idleThreads里面呢?其实,是为了防止已激活的和未超时的总数达不到limit值,并且我们是将expiredThreads插入到的是idleThreads的尾部,即使后面遍历的时候,也是先遍历未超时的。
最后遍历idleThreads,当总数小于limit时,我们先插入空闲的线程,总数达到limit后,剩下的空闲线程直接执行release()释放。最后再将已激活的线程加入到_threads尾部。着就是线程池的重新整理。
…
五、调用实例
#pragma once
#include <assert.h>
#include "OutputDebugString.h"
#include "Poco/ThreadPool.h"
#include "Poco/RunnableAdapter.h"
#include "Poco/Exception.h"
#include "Poco/Thread.h"
#include "Poco/Event.h"
#include "Poco/Mutex.h"
using Poco::ThreadPool;
using Poco::RunnableAdapter;
using Poco::Thread;
class CThreadPoolTest
{
public:
CThreadPoolTest(): _count(0), _event(false){};
~CThreadPoolTest(){};
void Count()
{
_event.wait();
odprintf("Test run.....");
for (int i = 0; i < 10000; ++i)
{
_mutex.lock();
++_count;
//odprintf("values:%d", _count);
printf("values:%d\n", _count);
_mutex.unlock();
}
};
void Test()
{
ThreadPool pool(2, 3, 3); //!<创建个2线程的线程池,线程池最多拥有3个线程,在基本的2线程外的线程空闲时间超过3s,将被杀死
pool.setStackSize(1);
assert (pool.allocated() == 2); //!<判断当前是不是只有2线程
assert (pool.used() == 0); //!<判断当前是否线程在工作
assert (pool.capacity() == 3); //!<判断最大的线程数
assert (pool.available() == 3);
pool.addCapacity(1);
assert (pool.allocated() == 2);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);
RunnableAdapter<CThreadPoolTest> ra(*this, &CThreadPoolTest::Count); pool.start(ra); //!<用掉一个
assert (pool.allocated() == 2);
assert (pool.used() == 1);
assert (pool.capacity() == 4);
assert (pool.available() == 3);
pool.start(ra); //!<用掉一个
assert (pool.allocated() == 2);
assert (pool.used() == 2);
assert (pool.capacity() == 4);
assert (pool.available() == 2);
pool.start(ra); //!<用掉一个
assert (pool.allocated() == 3);
assert (pool.used() == 3);
assert (pool.capacity() == 4);
assert (pool.available() == 1);
pool.start(ra); //!<用掉一个
assert (pool.allocated() == 4);
assert (pool.used() == 4);
assert (pool.capacity() == 4);
assert (pool.available() == 0);
try
{
pool.start(ra); //!<用掉一个,此时没有可用将发生异常
odprintf("thread pool exhausted - must throw exception");
}
catch (Poco::NoThreadAvailableException&)
{
odprintf("NoThreadAvailable exception thrown");
}
catch (...)
{
odprintf("wrong exception thrown");
}
_event.set(); // go!!! Run 开始执行循环
pool.joinAll(); //!<等待所有线程完成
assert (_count == 40000); //!<4个线程在累加,总的值为4W
//!<所有线程全部退出了
assert (pool.allocated() == 4);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);
Thread::sleep(4000);
pool.collect(); //!<开始回收了,最终只会剩下2个线程
assert (pool.allocated() == 2);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);
_count = 0;
_event.reset(); //!<Run 进入wait
pool.start(ra); //!<用掉一个
assert (pool.allocated() == 2);
assert (pool.used() == 1);
assert (pool.capacity() == 4);
assert (pool.available() == 3);
pool.start(ra); //!<用掉一个
assert (pool.allocated() == 2);
assert (pool.used() == 2);
assert (pool.capacity() == 4);
assert (pool.available() == 2);
_event.set(); // go!!!
pool.joinAll();
assert (_count == 20000);
assert (pool.allocated() == 2);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);
};
private:
Poco::Event _event;
Poco::FastMutex _mutex;
int _count;
};
相关文章推荐
- 基于POCO框架的C++库 源码分析
- 线程池原理分析
- JAVA线程池的分析和使用
- python threadpool 源码分析以及自己封装的简易版线程池
- Dubbo源代码分析八:再说Provider线程池被EXHAUSTED
- 深入浅出多线程(6)分析并行包线程池的设计与实现
- Tomcat源码分析(三)------ 可携带状态的线程池 .
- POCO C++库学习和分析 -- 随机数和数字摘要
- 多线程进阶与源码分析--线程池相关(一)
- 也来分析下Java 线程池(ThreadPoolExecutor)的原理
- [转载] Java线程池框架源码分析
- Java 线程池框架核心代码分析
- JDK源码分析之concurrent包(二) -- 线程池ThreadPoolExecutor
- 从源代码分析Universal-Image-Loader中的线程池
- w3af代码分析,w3af线程池实现,w3af 调适环境配置,w3af win7开发环境
- Android AsyncTask内部线程池异步执行任务机制简要分析
- 线程池的使用及ThreadPoolExecutor的分析(一)
- 线程池ThreadPoolExecutor分析
- ThreadPoolExecutor线程池的分析和使用
- POCO中的Thread类分析