您的位置:首页 > 产品设计 > UI/UE

UE4线程池源码分析和线程池的封装

2016-11-25 18:33 615 查看
在游戏客户端开发过程中难免有些任务是需要另外开一个线程来执行的,这些任务的特点是会有一些阻塞操作,它的执行不能影响到游戏线程的正常执行。比如一款网络游戏的客户端,那么网络的收发最好是使用单独的线程来处理,又比如在线更新数据的操作,也最好是使用单独的线程来操作才比较合理。

我在封装线程池之前想了解下UE4的多线程是怎么实现的,所以最近我看了它的源码,刚开始没有找到一个很好的入口,所以看起来云里雾里,还好通过慢慢的啃,总算是看出点门路了,在此分享一下自己的经验,避免其他同行陷入我之前的困境。

UE4采用的是半同步半异步线程池模型,在本文中我使用源码+应用的方式来分析下UE4的线程池,由于UE4是个较复杂的引擎,所以文中如果有出现错误的地方欢迎指正。

我们首先来查看引擎里给的一个线程池的使用例子,代码在Engine\Source\Runtime\Core\Public\Async\AsyncWork.h

class ExampleAutoDeleteAsyncTask : public FNonAbandonableTask
{
// 最终会调用此友元类的DoThreadedWork()来执行任务
friend class FAutoDeleteAsyncTask<ExampleAutoDeleteAsyncTask>;

int32 ExampleData;

ExampleAutoDeleteAsyncTask(int32 InExampleData)
: ExampleData(InExampleData)
{
}

void DoWork()
{
... do the work here
// 执行真正的任务的地方
}

FORCEINLINE TStatId GetStatId() const
{
RETURN_QUICK_DECLARE_CYCLE_STAT(ExampleAutoDeleteAsyncTask, STATGROUP_ThreadPoolAsyncTasks);
}
};

void Example()
{
// start an example job
(new FAutoDeleteAsyncTask<ExampleAutoDeleteAsyncTask>(5)->StartBackgroundTask();

// do an example job now, on this thread
(new FAutoDeleteAsyncTask<ExampleAutoDeleteAsyncTask>(5)->StartSynchronousTask();
}


观察Example()函数中并没有哪里显示的创建一个线程池,这是因为官方的这个例子里面使用的UE4在启动的时候自己创建的一个全局线程池,还是上面那个文件,有

FAutoDeleteAsyncTask类的声明,我们需要关注的是Start()函数,它是在Example中的两个Start***中调用的

void Start(bool bForceSynchronous)
{
FPlatformMisc::MemoryBarrier();
FQueuedThreadPool* QueuedPool = GThreadPool;
if (bForceSynchronous)
{
QueuedPool = 0;
}
if (QueuedPool)
{
QueuedPool->AddQueuedWork(this);
}
else
{
// we aren't doing async stuff
DoWork();
}
}
我们可以看到如果选择的是异步执行的任务,会把自己塞入到GThreadPool的QueuedPool中,其中GThreadPool就是UE4在启动的时候创建的,继续进入到AddQueuedWork()中

void AddQueuedWork(IQueuedWork* InQueuedWork) override
{
if (TimeToDie)
{
InQueuedWork->Abandon();
return;
}
check(InQueuedWork != nullptr);
FQueuedThread* Thread = nullptr;
// Check to see if a thread is available. Make sure no other threads
// can manipulate the thread pool while we do this.
check(SynchQueue);
FScopeLock sl(SynchQueue);
if (QueuedThreads.Num() > 0)
{
// Cycle through all available threads to make sure that stats are up to date.
int32 Index = 0;
// Grab that thread to use
Thread = QueuedThreads[Index];
// Remove it from the list so no one else grabs it
QueuedThreads.RemoveAt(Index);
}
// Was there a thread ready?
if (Thread != nullptr)
{
// We have a thread, so tell it to do the work
Thread->DoWork(InQueuedWork);
}
else
{
// There were no threads available, queue the work to be done
// as soon as one does become available
QueuedWork.Add(InQueuedWork);
}
}

从QueuedThreads中取出了一个空闲线程,并且调用线程的DoWork(),线程池中如果没有空闲线程就放在任务等待队列中,当有空闲线程时候再执行。下面就来看看DoWork中又做了什么?

//DoWork的参数就是FAutoDeleteAsyncTask的指针(FAutoDeleteAsyncTask中有一个模板类型TTask的成员变量Task,在本例中就是ExampleAutoDeleteAsyncTask)
void DoWork(IQueuedWork* InQueuedWork)
{
DECLARE_SCOPE_CYCLE_COUNTER( TEXT( "FQueuedThread::DoWork" ), STAT_FQueuedThread_DoWork, STATGROUP_ThreadPoolAsyncTasks );

check(QueuedWork == nullptr && "Can't do more than one task at a time");
// Tell the thread the work to be done
QueuedWork = InQueuedWork;
FPlatformMisc::MemoryBarrier();
// Tell the thread to wake up and do its job
DoWorkEvent->Trigger();
}


  一个实现了DoThreadedWork 在DoWork中其实是进行了线程的唤醒操作,线程的唤醒是调用了DoWorkEvent->Trigger(),DoWorkEvent是一个FEvent类,此类其实就是在不同平台封装了线程等待函数的操作,到此,整个流程就断了。聪明的你一定有疑问了,这的流程已经断了,那这个任务在哪里,这就要再换一角度来看源码了,现在我们来看线程池的创建部分,源码在Engine\Source\Runtime\Launch\Private\LaunchEngineLoop.cpp,从PreInit(const
TCHAR* CmdLine)函数看起,函数前面的部分不用关心,直接拉到

if (FPlatformProcess::SupportsMultithreading())
{
GThreadPool	= FQueuedThreadPool::Allocate();
int32 NumThreadsInThreadPool = FPlatformMisc::NumberOfWorkerThreadsToSpawn();

// we are only going to give dedicated servers one pool thread
if (FPlatformProperties::IsServerOnly())
{
NumThreadsInThreadPool = 1;
}
verify(GThreadPool->Create(NumThreadsInThreadPool));

#if WITH_EDITOR
// when we are in the editor we like to do things like build lighting and such
// this thread pool can be used for those purposes
GLargeThreadPool = FQueuedThreadPool::Allocate();
int32 NumThreadsInLargeThreadPool = FPlatformMisc::NumberOfCoresIncludingHyperthreads() - 2;

verify(GLargeThreadPool->Create(NumThreadsInLargeThreadPool));
#endif
}
好,那么GThreadPool出来了,它通过FqueuedThreadPool的Allocate()创建了一个FQueuedThreadPoolBase实例,你可以进入到这个类中,会观察到它有三个数组,分别是全部线程、空闲线程、待执行任务,其中线程数组中的元素FQueuedThread就是操作系统底层线程与UE4交互的桥梁,调用GThreadPool->Create(NumThreadsInThreadPool),在函数中创建了QueuedThread

virtual bool Create(uint32 InNumQueuedThreads,uint32 StackSize = (32 * 1024),EThreadPriority ThreadPriority=TPri_Normal) override
{
省略
// Now create each thread and add it to the array
for (uint32 Count = 0; Count < InNumQueuedThreads && bWasSuccessful == true; Count++)
{
// Create a new queued thread
FQueuedThread* pThread = new FQueuedThread();
// Now create the thread and add it if ok
if (pThread->Create(this,StackSize,ThreadPriority) == true)
{
QueuedThreads.Add(pThread);
AllThreads.Add(pThread);
}
else
{
// Failed to fully create so clean up
bWasSuccessful = false;
delete pThread;
}
}
// Destroy any created threads if the full set was not successful
if (bWasSuccessful == false)
{
Destroy();
}
return bWasSuccessful;
}
首先调用了线程的Create函数,然后将创建的FQueuedThread对象放在了线程池的线程数组中,在Create函数里继续进行后面的创建与赋值

virtual bool Create(class FQueuedThreadPool* InPool,uint32 InStackSize = 0,EThreadPriority ThreadPriority=TPri_Normal)
{
static int32 PoolThreadIndex = 0;
const FString PoolThreadName = FString::Printf( TEXT( "PoolThread %d" ), PoolThreadIndex );
PoolThreadIndex++;

OwningThreadPool = InPool;
DoWorkEvent = FPlatformProcess::GetSynchEventFromPool();
Thread = FRunnableThread::Create(this, *PoolThreadName, InStackSize, ThreadPriority, FPlatformAffinity::GetPoolThreadMask());
check(Thread);
return true;
}
这时候又增加了一个类FRunnableThread,而且关注第一个参数,传递了一个this进去,这个this对于线程的执行来说是至关重要的,后面会说明。FRunnableThread就是真正帮助我们创建操作系统线程的类了,跟进去FRunnableThread的Create函数

FRunnableThread* FRunnableThread::Create(
class FRunnable* InRunnable,
const TCHAR* ThreadName,
uint32 InStackSize,
EThreadPriority InThreadPri,
uint64 InThreadAffinityMask)
{
FRunnableThread* NewThread = nullptr;
if (FPlatformProcess::SupportsMultithreading())
{
check(InRunnable);
// Create a new thread object
NewThread = FPlatformProcess::CreateRunnableThread();
if (NewThread)
{
// Call the thread's create method
if (NewThread->CreateInternal(InRunnable,ThreadName,InStackSize,InThreadPri,InThreadAffinityMask) == false)
{
// We failed to start the thread correctly so clean up
delete NewThread;
NewThread = nullptr;
}
}
}
省略
}


FPlatformProcess::CreateRunnableThread()创建了一个平台相关的并且继承于FRunnableThread的派生类,然后调用了此派生类的CreateInternal函数
virtual bool CreateInternal( FRunnable* InRunnable, const TCHAR* InThreadName,
uint32 InStackSize = 0,
EThreadPriority InThreadPri = TPri_Normal, uint64 InThreadAffinityMask = 0 ) override
{
省略

// Create the new thread
Thread = CreateThread(NULL,InStackSize,_ThreadProc,this,STACK_SIZE_PARAM_IS_A_RESERVATION,(::DWORD *)&ThreadID);

省略
}

到了这里调用的CreateThread就是windows下的创建线程接口了,入口函数是_ThreadProc,跟进去进步发现,它调用了Runnable的Run(),这个Runnable是在

调用FRunnableThread::Create的时候传递进来的,实际上Runnable就是FQueueThread,那么我们现在只需要关注FQueuedThread的Run函数都干了什么

virtual uint32 Run() override
{
while (!TimeToDie)
{
// This will force sending the stats packet from the previous frame.
SET_DWORD_STAT( STAT_ThreadPoolDummyCounter, 0 );
// We need to wait for shorter amount of time
bool bContinueWaiting = true;
while( bContinueWaiting )
{
DECLARE_SCOPE_CYCLE_COUNTER( TEXT( "FQueuedThread::Run.WaitForWork" ), STAT_FQueuedThread_Run_WaitForWork, STATGROUP_ThreadPoolAsyncTasks );
// Wait for some work to do
bContinueWaiting = !DoWorkEvent->Wait( 10 );
}

IQueuedWork* LocalQueuedWork = QueuedWork;
QueuedWork = nullptr;
FPlatformMisc::MemoryBarrier();
check(LocalQueuedWork || TimeToDie); // well you woke me up, where is the job or termination request?
while (LocalQueuedWork)
{
// Tell the object to do the work
LocalQueuedWork->DoThreadedWork();
// Let the object cleanup before we remove our ref to it
LocalQueuedWork = OwningThreadPool->ReturnToPoolOrGetNextJob(this);
}
}
return 0;
}
这时候应该都明白了吧?线程在启动了以后如果没有任务,会调用bContinueWaiting = !DoWorkEvent->Wait( 10 )进入阻塞等待的状态,当外部调用了DoWorkEvent->Trigger()以后会唤醒当前线程,然后执行QueuedWork的DoThreadedWork(),QueuedWork在本例中就是一个ExampleAutoDeleteAsyncTask,它是FAutoDeleteAsyncTask的一个友元类,在FAutoDeleteAsyncTask里的DoThreadedWork又回调用回ExampleAutoDeleteAsyncTask的DoWork函数,DoWork
中是执行真正的任务的地方,执行完了任务,会继续进行一些检查,如果正常,则现在继续wait,等待任务的到来。

UE4的线程池执行过程在这里就讲述完毕了,它的线程池还是实现的有一点绕,需要大家多看源码,去琢磨,可能我这里有说不透的地方,大家就自行脑补源码吧。

现在如果直接使用它的线程池,那么我们想要执行的任务必须将FAutoDeleteAsyncTask作为自己的友元类引入,然后实现DoWork方法,那我对线程池的使用想法是我只需要给线程池一个函数和一个类的实例指针,我的任务类不需要跟UE4的类有这种友元关系,并且我希望线程池是一个单例的存在,所以我自己封装了一个线程池,其核心做法就是在执行任务的地方使用了c++11的std::function,在此我就不详细描述了,可以直接跳到这里查看源码:http://git.oschina.net/amuer/UE4ThreadPool
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  UE4 Unreal Engine