Theron, a lightweight C++ concurrency library, 源码分析(二)
2012-12-03 22:56
281 查看
Framework class
Framework类在Theron应该可以看作是一个中枢神经系统。他管理着它内部各个actor之间的交互。我们知道,Windows是基于消息的系统。因此,它定义了消息队列,以及处理消息的基本流程:
while (GetMessage(&msg, NULL, 0, 0))
{
if (!TranslateAccelerator(msg.hwnd, hAccelTable, &msg))
{
TranslateMessage(&msg);
DispatchMessage(&msg);
}
}
Actor Model同样以消息为基础,那么也应该存在一个消息队列和消息派发的过程。
这个过程就是在Framework中定义的。我们来看Framework::Initialize()(略去了部分非关键代码):
void Framework::Initialize()
{
mRunning = true;
mManagerThread.Start(ManagerThreadEntryPoint, this);
uint32_t backoff(0);
while (mThreadCount.Load() < mTargetThreadCount.Load())
{
Detail::Utils::Backoff(backoff);
}
mIndex = Detail::StaticDirectory<Framework>::Register(this);
if (mName.IsNull())
{
mName = Detail::NameGenerator::Generate(mIndex);
}
}
有3个亮点:
mRunning = true和ManagerThreadEntryPoint
Utils::Backoff
mIndex和mName
mManagerThread是一个Thread类型,那么我们只有关注ManagerThreadEntryPoint就可以了。其实ManagerThreadEntryPoint很简单,不过它引入了另外一个函数,ManagerThreadProc
Backoff是一个和Sleep(0)类似的东西,不过和Sleep有一点细微的差别。Backoff调用了YieldProcessor。大家可以自己google下YieldProcessor vs Sleep(0)。
mIndex是给Framework分配了一个全局的索引值。
Framework::ManagerThreadProc
这个函数里,我们首先可以看到一个while循环:while (mRunning){ …}
可以很明确地肯定,在Framework还没有销毁时,它就是个死循环。这基本上就对等了Windows里的那个while。
while里面的循环稍微有点复杂了。我们需要分块一点一点来剥离。
while (mThreadCount.Load() < mTargetThreadCount.Load())
{
WorkerThreadStore* store = new WorkerThreadStore(…);
ThreadContext* threadContext = new ThreadContext(store);
ThreadPool::CreateThread(threadContext);
ThreadPool::StartThread(threadContext, &mWorkQueue, mNodeMask, mProcessorMask);
mThreadContexts.Insert(threadContext);
}
首先,这个函数是在Initialize里被调用的,因此必定要做一些初始化的操作。初始化操作的一个重头戏就是创建出足够多的线程。
在前面已经说了,线程被启动后,就会一直运行。因此初始化操作会创建16个(默认mTargetThreadCount是16)不停运行的线程。这些线程都是从mWorkQueue(SafeThreadQueue<Mailbox>类型)中去取相应的Mailbox,然后把关联的消息地送给actor执行。
所以,消息队列就是mWorkQueue。
当然这个函数虽然是在Initialize里被调用起来的,但是作为消息泵,它还有其他的职责。从函数的实现细节来看,整个过程可以用下面几条来总结:
mManagerThread还在运行时:
如果有空闲线程,那么启用空闲线程来执行任务
若可用线程数还没有达到上限,创建出足够多的线程
若线程数超过上限(线程数的上限可以动态调整),停止一些线程
mManagerThread停止运行时:
如果mThreadContext非空,则销毁ThreadContext管理的线程对象和掌握的资源
到这里,基本上把整个消息机制讲解完毕了。接下来我们要说一说Framework和Actor之间的关系。
Framework and Actor
Theron里的Framework有点像设计模式中的中介者模式。它管理着Actor的相关信息。怎么说呢,事实上和Actor对象关联的Mailbox和Address信息都是从Framework这里生产的。Framework::RegisterActor完成了这个工作。void Framework::RegisterActor(Actor* actor, const char* name)
{
const uint32_t mailboxIndex(mMailboxes.Allocate());
Mailbox& mailbox(mMailboxes.GetEntry(mailboxIndex));
String mailboxName(name);
mailbox.RegisterActor(actor);
const Index index(mIndex, mailboxIndex);
const Address mailboxAddress(mailboxName, index);
actor->mAddress = mailboxAddress;
}
所以,Framework像邮局,他知道他管辖的区域中有那些邮箱地址,而且也只有它才知道。不同的邮局管辖不同的区域,因此跨域通信必然是两个Framework之间协同完成的。
在上面这段代码里有一个mIndex变量,这个是Framework的成员变量,用来在全局表征一个Framework的索引值。这个指会在后面在提到。
到这里,我们基本可以把Framework和Actor之间的关系理个清楚了。接下来要说的就是actor和actor之间是如何发送消息的。
为了能够把问题简单化,我们这里只讨论在同一个Framework中的不同actor之间通信的情况。这个过程要涉及2个要点:
Framework::Send函数
ProcessorContext结构
Framework::Send
Send函数的原型是:Framework::Send(const ValueType &value, const Address &from, const Address &address),是一个模板函数。这个函数的作用就是把消息投递到对应地址的mailbox中。Send函数串联了不同actor之间的交互。整个消息的发送过程其实是由MessageSender::Send(EndPoint* endPoint, ProcessorContext* processorContext, const uint32_t localFrameworkIndex, IMessage* message, const Address &address)来负责完成的。
这个函数有很多参数,我们已经假定了只考虑同一个Framework中不同actors之间的通信。在这种情况下,我们可以不用考虑endPoint。另外,我们看到只有一个Address类型的参数了,要记得,发送者的address在message里。所以,这里的address是接收消息的actor的address。因此,Framework::Send中的value还不能算消息,只能算消息内容。因为前面说了,消息是要包含发送者的地址的。所以,在Send函数里,我们会看到它调用了MessageCreator::Create来创建一个消息结构,将value和发送消息的actor地址绑定在了一起。
接下来要的是processorContext和localFrameworkIndex两个变量。
ProcessorContext and Index
ProcessorContext事实上就是保存了某个Framework的相关信息。在发送消息的函数里(MessageSender::Send)附带上这个参数,可以明确当前的消息是从哪个Framework中发出来的。localFrameworkIndex是Index类型的。在MessageSender::Send中就是指接收消息的Framework的Index。
从RegisterActor函数里我们也可以看到,每一个Address对象都会关联一个Index,而这个Index又是和Framework::mIndex有一定的关联关系的。因此在MessageSender::Send中,我们可以通过address和localFrameworkIndex来判断当前的消息是在同一Framework下的actor之间相互沟通还是在不同Framework之间来回。
bool MessageSender::DeliverByIndex(ProcessorContext* processorContext,
const uint32_t localFrameworkIndex,
IMessage* message,
const Address &address)
{
const Index index(address.mIndex);
// Which framework is the addressed entity in?
const uint32_t targetFrameworkIndex(index.mComponents.mFramework);
if (targetFrameworkIndex == localFrameworkIndex)
{
// Is the message addressed to an actor in this framework?
delivered = DeliverToActorInThisFramework(processorContext,
message,
address);
}
else
{
delivered = DeliverToLocalMailbox(message, address);
}
}
DeliverToActorInThisFramework就不细说了,可以自己看。大致的逻辑就是从address中找出关联的mailbox对象,然后将message push到该mailbox中。如果当前的mailbox在填充消息前是空的,那么这个mailbox肯定不在该ProcessorContext的workQueue中。因此在放入消息后,要将它加入的workQueue中,让线程池在处理workQueue时,能处理到这条消息。
Actor class
Actor类其实是很简单的。在分析代码前,我们先把之前提到的和actor相关的概念再深化下。一个actor在响应消息的同时可以:
发送有限量的消息给其他actor
创建有线量的其他actor
指定下一次收到消息时具体响应动作/行为
也就是说在actor的消息处理函数里,actor可以给其他actor发送消息,可以创建新的actor,可以设置当前actor的一些属性。
所以,Actor类必定要有一个发送消息的函数:Actor::Send。在开始分析Send函数之前,我们先看一眼Actor类的成员函数。
Address mAddress;
Framework* mFramework;
HandlerCollection mMessageHandlers;
DefaultHandlerCollection mDefaultHandlers;
ProcessorContext* mProcessorContext;
Address和Framework暂时没啥好说的。mMessageHandlers就是当前Actor注册了的消息处理函数集合。
那继续我们的成员函数Send。
Actor::Send(const ValueType& value, const Address& address) const
Send函数就是把消息(value)发送到指定的address那里去。和Windows里的PostMessage一样,是异步的。事实上,这个函数的内部实现也是调用MessageSender::Send来完成的。这种情况下,再回头来看ProcessorContext和localFrameworkIndex你应该会有更多的体会。对这个函数的细致分析,就不多补充了。
基本到这里,该说的都说完了。
相关文章推荐
- Theron, a lightweight C++ concurrency library, 源码分析(一)
- Theron (C++ concurrency library) 读后感
- 《C++ Concurrency in Action》笔记9 std::unique_lock源码分析
- 从零开始学C++之STL(九):函数适配器bind2nd 、mem_fun_ref 源码分析、函数适配器应用举例
- C++ 源码分析网址
- C++ - memset的效率和源码分析
- Android源码分析实战之JNI so库加载System.loadLibrary流程分析
- MySQL源码分析及核心内幕之3 -- 源码调试之Eclipse C/C++
- OpenCV人脸识别C++源码分析
- C++:浅谈c++资源管理以及对[STL]智能指针auto_ptr源码分析,左值与右值
- C++STL内存配置的设计思想与关键源码分析
- TFS 客户端源码分析(c++)
- TreeFrog (C++ Web Framework)开发之源码分析
- spserver - SPServer is a high concurrency server framework library written on C++ - Google Project Hosting
- C++ STL源码解析:空间配置器allocator分析
- Spark SQL Catalyst源码分析之TreeNode Library
- TreeFrog (C++ Web Framework)开发之源码分析
- IDEA查看源码时提示:Library source does not match the bytecode for class的问题分析
- [置顶] 从零开始学C++之boost库(一):详解 boost 库智能指针(scoped_ptr<T> 、shared_ptr<T> 、weak_ptr<T> 源码分析)
- 短小精悍的C++单元测试框架CppUnitLite源码分析