您的位置:首页 > 其它

POCO线程池分析

2017-12-01 11:14 253 查看
一、接口

ThreadPool(int minCapacity = 2,
int maxCapacity = 16,
int idleTime = 60,
int stackSize = POCO_THREAD_STACK_SIZE);
/// Creates a thread pool with minCapacity threads.
/// If required, up to maxCapacity threads are created
/// a NoThreadAvailableException exception is thrown.
/// If a thread is running idle for more than idleTime seconds,
/// and more than minCapacity threads are running, the thread
/// is killed. Threads are created with given stack size.


创建个minCapacity线程的线程池,线程池最多拥有maxCapacity个线程,在基本的minCapacity线程外的线程空闲时间超过idleTimes,将自动被杀死。

二、创建过程

ThreadPool::ThreadPool(int minCapacity,
int maxCapacity,
int idleTime,
int stackSize):
_minCapacity(minCapacity),
_maxCapacity(maxCapacity),
_idleTime(idleTime),
_serial(0),
_age(0),
_stackSize(stackSize)
{
poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);

for (int i = 0; i < _minCapacity; i++)
{
PooledThread* pThread = createThread();
_threads.push_back(pThread);
pThread->start();
}
}


先创建minCapacity个PooledThread对象并保存到对象容器中,并同时执行start()函数:

void PooledThread::start()
{
_thread.start(*this);
_started.wait();
}


_thread是Poco::Thread对象,具体构造函数:

Thread::Thread():
_id(uniqueId()),
_name(makeName()),
_pTLS(0)
{
}


Thread类继承于Poco里面ThreadImpl类,而这个ThreadImpl类是对线程创建等相关函数的封装。看看Thread::strart()接口:

void Thread::start(Runnable& target)
{
startImpl(target);
}


这里的startImpl调用的就是:

void ThreadImpl::startImpl(Runnable& target)
{
if (isRunningImpl())
throw SystemException("thread already running");

_pRunnableTarget = ⌖

createImpl(runnableEntry, this);
}


我们最后看下,createImpl接口:

void ThreadImpl::createImpl(Entry ent, void* pData)
{
#if defined(_DLL)
_thread = CreateThread(NULL, _stackSize, ent, pData, 0, &_threadId);
#else
unsigned threadId;
_thread = (HANDLE) _beginthreadex(NULL, _stackSize, ent, this, 0, &threadId);
_threadId = static_cast<DWORD>(threadId);
#endif
if (!_thread)
throw SystemException("cannot create thread");
if (_prio != PRIO_NORMAL_IMPL && !SetThreadPriority(_thread, _prio))
throw SystemException("cannot set thread priority");
}


经过一连串的调用,我们终于看到最底层的创建线程方法,和我们普通写应用层线程创建一致,采用_beginthreadex方法。

我们可以很轻易的创建一个线程池:
ThreadPool pool(2, 3, 3);


三、它是如何动态管理所创建的线程?

我们先使用掉一个线程,调用方法:

RunnableAdapter<CThreadPoolTest> ra(*this, &CThreadPoolTest::Count);
pool.start(ra); //!<用掉一个


我们使用线程方法,是采用ThreadPool::start()方法:

void ThreadPool::start(Runnable& target)
{
getThread()->start(Thread::PRIO_NORMAL, target);
}


关键的函数出现了getThread(),获取创建好的线程,执行start()方法。

PooledThread* ThreadPool::getThread()
{
FastMutex::ScopedLock lock(_mutex);

if (++_age == 32)
housekeep();

PooledThread* pThread = 0;
for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it)
{
if ((*it)->idle()) pThread = *it;
}
if (!pThread)
{
if (_threads.size() < _maxCapacity)
{
pThread = createThread();
try
{
pThread->start();
_threads.push_back(pThread);
}
catch (...)
{
delete pThread;
throw;
}
}
else throw NoThreadAvailableException();
}
pThread->activate();
return pThread;
}


这个函数很简单,首先遍历之前创建并存储到容器的线程,判断哪个线程是处理空闲状态,如果有则直接取出对象赋值。如果不存在空闲的线程,则判断当前已创建线程的总数量,是否超过预设的最大值,如果没有则创建新的线程。

最后直接激活
pThread->activate();
就是将空闲状态置为false

void PooledThread::activate()
{
FastMutex::ScopedLock lock(_mutex);

poco_assert (_idle);
_idle = false;
_targetCompleted.reset();
}


整体分析下来,Poco采用了更加严谨的逻辑处理线程池的分配。那如何增大一开始设置的最大线程数呢,我们采用
pool.addCapacity(1);
的方法,将最大线程数+1。

void ThreadPool::addCapacity(int n)
{
FastMutex::ScopedLock lock(_mutex);

poco_assert (_maxCapacity + n >= _minCapacity);
_maxCapacity += n;
housekeep();
}


在我们增加最大线程数的同时,还执行了housekeep()函数:重新整理已有线程池,这边就涉及到自动清除空闲太久的线程方法:

void ThreadPool::housekeep()
{
_age = 0;
if (_threads.size() <= _minCapacity)
return;

ThreadVec idleThreads;
ThreadVec expiredThreads;
ThreadVec activeThreads;
idleThreads.reserve(_threads.size());
activeThreads.reserve(_threads.size());

for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
{
if ((*it)->idle())
{
if ((*it)->idleTime() < _idleTime)
idleThreads.push_back(*it);
else
expiredThreads.push_back(*it);
}
else activeThreads.push_back(*it);
}
int n = (int) activeThreads.size();
int limit = (int) idleThreads.size() + n;
if (limit < _minCapacity) limit = _minCapacity;
idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());
_threads.clear();
for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it)
{
if (n < limit)
{
_threads.push_back(*it);
++n;
}
else (*it)->release();
}
_threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end());
}


直接遍历已创建的线程,空闲线程则比对空闲的时间是否超过设置的时间,如果超过则添加到另一个容器中expiredThreads,非空闲线程都加入activeThreads中。这里面有个设计,可能大伙看的不是很明白,为什么都区分超时和非超时的线程,最后还要将超时的expiredThreads内容插入到,非超时的idleThreads里面呢?其实,是为了防止已激活的和未超时的总数达不到limit值,并且我们是将expiredThreads插入到的是idleThreads的尾部,即使后面遍历的时候,也是先遍历未超时的。

最后遍历idleThreads,当总数小于limit时,我们先插入空闲的线程,总数达到limit后,剩下的空闲线程直接执行release()释放。最后再将已激活的线程加入到_threads尾部。着就是线程池的重新整理。



五、调用实例

#pragma once
#include <assert.h>
#include "OutputDebugString.h"
#include "Poco/ThreadPool.h"
#include "Poco/RunnableAdapter.h"
#include "Poco/Exception.h"
#include "Poco/Thread.h"
#include "Poco/Event.h"
#include "Poco/Mutex.h"

using Poco::ThreadPool;
using Poco::RunnableAdapter;
using Poco::Thread;

class CThreadPoolTest
{
public:
CThreadPoolTest(): _count(0), _event(false){};
~CThreadPoolTest(){};

void Count()
{
_event.wait();
odprintf("Test run.....");
for (int i = 0; i < 10000; ++i)
{
_mutex.lock();
++_count;
//odprintf("values:%d", _count);
printf("values:%d\n", _count);
_mutex.unlock();
}
};

void Test()
{
ThreadPool pool(2, 3, 3); //!<创建个2线程的线程池,线程池最多拥有3个线程,在基本的2线程外的线程空闲时间超过3s,将被杀死
pool.setStackSize(1);

assert (pool.allocated() == 2); //!<判断当前是不是只有2线程
assert (pool.used() == 0); //!<判断当前是否线程在工作
assert (pool.capacity() == 3); //!<判断最大的线程数
assert (pool.available() == 3);
pool.addCapacity(1);
assert (pool.allocated() == 2);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);

RunnableAdapter<CThreadPoolTest> ra(*this, &CThreadPoolTest::Count); pool.start(ra); //!<用掉一个
assert (pool.allocated() == 2);
assert (pool.used() == 1);
assert (pool.capacity() == 4);
assert (pool.available() == 3);

pool.start(ra); //!<用掉一个
assert (pool.allocated() == 2);
assert (pool.used() == 2);
assert (pool.capacity() == 4);
assert (pool.available() == 2);

pool.start(ra); //!<用掉一个
assert (pool.allocated() == 3);
assert (pool.used() == 3);
assert (pool.capacity() == 4);
assert (pool.available() == 1);

pool.start(ra); //!<用掉一个
assert (pool.allocated() == 4);
assert (pool.used() == 4);
assert (pool.capacity() == 4);
assert (pool.available() == 0);

try
{
pool.start(ra); //!<用掉一个,此时没有可用将发生异常
odprintf("thread pool exhausted - must throw exception");
}
catch (Poco::NoThreadAvailableException&)
{
odprintf("NoThreadAvailable exception thrown");
}
catch (...)
{
odprintf("wrong exception thrown");
}

_event.set(); // go!!! Run 开始执行循环
pool.joinAll(); //!<等待所有线程完成

assert (_count == 40000); //!<4个线程在累加,总的值为4W

//!<所有线程全部退出了
assert (pool.allocated() == 4);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);

Thread::sleep(4000);

pool.collect(); //!<开始回收了,最终只会剩下2个线程
assert (pool.allocated() == 2);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);

_count = 0;
_event.reset(); //!<Run 进入wait
pool.start(ra); //!<用掉一个
assert (pool.allocated() == 2);
assert (pool.used() == 1);
assert (pool.capacity() == 4);
assert (pool.available() == 3);

pool.start(ra); //!<用掉一个
assert (pool.allocated() == 2);
assert (pool.used() == 2);
assert (pool.capacity() == 4);
assert (pool.available() == 2);
_event.set(); // go!!!
pool.joinAll();

assert (_count == 20000);

assert (pool.allocated() == 2);
assert (pool.used() == 0);
assert (pool.capacity() == 4);
assert (pool.available() == 4);
};

private:
Poco::Event _event;
Poco::FastMutex _mutex;
int _count;
};
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  线程池 poco