您的位置:首页 > 编程语言 > C语言/C++

Theron, a lightweight C++ concurrency library, 源码分析(二)

2012-12-03 22:56 281 查看

Framework class

Framework类在Theron应该可以看作是一个中枢神经系统。他管理着它内部各个actor之间的交互。

我们知道,Windows是基于消息的系统。因此,它定义了消息队列,以及处理消息的基本流程:

while (GetMessage(&msg, NULL, 0, 0))


{


if (!TranslateAccelerator(msg.hwnd, hAccelTable, &msg))


{


TranslateMessage(&msg);


DispatchMessage(&msg);


}


}


Actor Model同样以消息为基础,那么也应该存在一个消息队列和消息派发的过程。

这个过程就是在Framework中定义的。我们来看Framework::Initialize()(略去了部分非关键代码):

void Framework::Initialize()


{


mRunning = true;


mManagerThread.Start(ManagerThreadEntryPoint, this);




 uint32_t backoff(0);


while (mThreadCount.Load() < mTargetThreadCount.Load())


{


 Detail::Utils::Backoff(backoff);


}




 mIndex = Detail::StaticDirectory<Framework>::Register(this);




 if (mName.IsNull())


{


 mName = Detail::NameGenerator::Generate(mIndex);


}


}


有3个亮点:

mRunning = true和ManagerThreadEntryPoint

Utils::Backoff

mIndex和mName

mManagerThread是一个Thread类型,那么我们只有关注ManagerThreadEntryPoint就可以了。其实ManagerThreadEntryPoint很简单,不过它引入了另外一个函数,ManagerThreadProc

Backoff是一个和Sleep(0)类似的东西,不过和Sleep有一点细微的差别。Backoff调用了YieldProcessor。大家可以自己google下YieldProcessor vs Sleep(0)。

mIndex是给Framework分配了一个全局的索引值。

Framework::ManagerThreadProc

这个函数里,我们首先可以看到一个while循环:

while (mRunning){ …}

可以很明确地肯定,在Framework还没有销毁时,它就是个死循环。这基本上就对等了Windows里的那个while。

while里面的循环稍微有点复杂了。我们需要分块一点一点来剥离。

while (mThreadCount.Load() < mTargetThreadCount.Load())


{


 WorkerThreadStore* store = new WorkerThreadStore(…);


 ThreadContext* threadContext = new ThreadContext(store);




 ThreadPool::CreateThread(threadContext);


 ThreadPool::StartThread(threadContext, &mWorkQueue, mNodeMask, mProcessorMask);




 mThreadContexts.Insert(threadContext);


}


首先,这个函数是在Initialize里被调用的,因此必定要做一些初始化的操作。初始化操作的一个重头戏就是创建出足够多的线程。

在前面已经说了,线程被启动后,就会一直运行。因此初始化操作会创建16个(默认mTargetThreadCount是16)不停运行的线程。这些线程都是从mWorkQueue(SafeThreadQueue<Mailbox>类型)中去取相应的Mailbox,然后把关联的消息地送给actor执行。

所以,消息队列就是mWorkQueue。

当然这个函数虽然是在Initialize里被调用起来的,但是作为消息泵,它还有其他的职责。从函数的实现细节来看,整个过程可以用下面几条来总结:

mManagerThread还在运行时:

如果有空闲线程,那么启用空闲线程来执行任务

若可用线程数还没有达到上限,创建出足够多的线程

若线程数超过上限(线程数的上限可以动态调整),停止一些线程

mManagerThread停止运行时:

如果mThreadContext非空,则销毁ThreadContext管理的线程对象和掌握的资源

到这里,基本上把整个消息机制讲解完毕了。接下来我们要说一说Framework和Actor之间的关系。

Framework and Actor

Theron里的Framework有点像设计模式中的中介者模式。它管理着Actor的相关信息。怎么说呢,事实上和Actor对象关联的Mailbox和Address信息都是从Framework这里生产的。Framework::RegisterActor完成了这个工作。

void Framework::RegisterActor(Actor* actor, const char* name)


{


 const uint32_t mailboxIndex(mMailboxes.Allocate());


 Mailbox& mailbox(mMailboxes.GetEntry(mailboxIndex));




 String mailboxName(name);


 mailbox.RegisterActor(actor);




 const Index index(mIndex, mailboxIndex);


 const Address mailboxAddress(mailboxName, index);




 actor->mAddress = mailboxAddress;


}


所以,Framework像邮局,他知道他管辖的区域中有那些邮箱地址,而且也只有它才知道。不同的邮局管辖不同的区域,因此跨域通信必然是两个Framework之间协同完成的。

在上面这段代码里有一个mIndex变量,这个是Framework的成员变量,用来在全局表征一个Framework的索引值。这个指会在后面在提到。

到这里,我们基本可以把Framework和Actor之间的关系理个清楚了。接下来要说的就是actor和actor之间是如何发送消息的。

为了能够把问题简单化,我们这里只讨论在同一个Framework中的不同actor之间通信的情况。这个过程要涉及2个要点:

Framework::Send函数

ProcessorContext结构

Framework::Send

Send函数的原型是:Framework::Send(const ValueType &value, const Address &from, const Address &address),是一个模板函数。这个函数的作用就是把消息投递到对应地址的mailbox中。

Send函数串联了不同actor之间的交互。整个消息的发送过程其实是由MessageSender::Send(EndPoint* endPoint, ProcessorContext* processorContext, const uint32_t localFrameworkIndex, IMessage* message, const Address &address)来负责完成的。

这个函数有很多参数,我们已经假定了只考虑同一个Framework中不同actors之间的通信。在这种情况下,我们可以不用考虑endPoint。另外,我们看到只有一个Address类型的参数了,要记得,发送者的address在message里。所以,这里的address是接收消息的actor的address。因此,Framework::Send中的value还不能算消息,只能算消息内容。因为前面说了,消息是要包含发送者的地址的。所以,在Send函数里,我们会看到它调用了MessageCreator::Create来创建一个消息结构,将value和发送消息的actor地址绑定在了一起。

接下来要的是processorContext和localFrameworkIndex两个变量。

ProcessorContext and Index

ProcessorContext事实上就是保存了某个Framework的相关信息。在发送消息的函数里(MessageSender::Send)附带上这个参数,可以明确当前的消息是从哪个Framework中发出来的。

localFrameworkIndex是Index类型的。在MessageSender::Send中就是指接收消息的Framework的Index。

从RegisterActor函数里我们也可以看到,每一个Address对象都会关联一个Index,而这个Index又是和Framework::mIndex有一定的关联关系的。因此在MessageSender::Send中,我们可以通过address和localFrameworkIndex来判断当前的消息是在同一Framework下的actor之间相互沟通还是在不同Framework之间来回。

bool MessageSender::DeliverByIndex(ProcessorContext* processorContext,


const uint32_t localFrameworkIndex,


IMessage* message,


const Address &address)


{


 const Index index(address.mIndex);




 // Which framework is the addressed entity in?


 const uint32_t targetFrameworkIndex(index.mComponents.mFramework);


 if (targetFrameworkIndex == localFrameworkIndex)


{


 // Is the message addressed to an actor in this framework?


 delivered = DeliverToActorInThisFramework(processorContext,


 message,


 address);


}


 else


{


 delivered = DeliverToLocalMailbox(message, address);


}


}


DeliverToActorInThisFramework就不细说了,可以自己看。大致的逻辑就是从address中找出关联的mailbox对象,然后将message push到该mailbox中。如果当前的mailbox在填充消息前是空的,那么这个mailbox肯定不在该ProcessorContext的workQueue中。因此在放入消息后,要将它加入的workQueue中,让线程池在处理workQueue时,能处理到这条消息。

Actor class

Actor类其实是很简单的。在分析代码前,我们先把之前提到的和actor相关的概念再深化下。

一个actor在响应消息的同时可以:

发送有限量的消息给其他actor

创建有线量的其他actor

指定下一次收到消息时具体响应动作/行为

也就是说在actor的消息处理函数里,actor可以给其他actor发送消息,可以创建新的actor,可以设置当前actor的一些属性。

所以,Actor类必定要有一个发送消息的函数:Actor::Send。在开始分析Send函数之前,我们先看一眼Actor类的成员函数。

Address mAddress;

Framework* mFramework;

HandlerCollection mMessageHandlers;

DefaultHandlerCollection mDefaultHandlers;

ProcessorContext* mProcessorContext;

Address和Framework暂时没啥好说的。mMessageHandlers就是当前Actor注册了的消息处理函数集合。

那继续我们的成员函数Send。

Actor::Send(const ValueType& value, const Address& address) const

Send函数就是把消息(value)发送到指定的address那里去。和Windows里的PostMessage一样,是异步的。

事实上,这个函数的内部实现也是调用MessageSender::Send来完成的。这种情况下,再回头来看ProcessorContext和localFrameworkIndex你应该会有更多的体会。对这个函数的细致分析,就不多补充了。

基本到这里,该说的都说完了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: