您的位置:首页 > 其它

Chromium线程模型、消息循环

2015-03-06 09:03 176 查看


多线程的麻烦

多线程编程是一件麻烦的事,相信很多人深有体会。执行顺序的不确定性,资源的并发访问一直困扰着众多程序员。解决多线程编程问题的方法分为两类:一是对并发访问的资源直接加锁;二是避免并发访问资源;Chromium采用第二种思想来设计多线程模型,通过在线程之间传递消息来实现跨进程通讯。


设计原则

Chromium希望尽量保持UI处于响应状态。为此遵循如下设计原则:

1 不在UI线程上执行任何阻塞I/O操作,以及其它耗时操作。

2 少用锁和线程安全对象

3 避免阻塞I/O线程

4 线程之间不要互相阻塞

5 在数据准备好更新到共享缓冲时才用锁(在准备数据期间不要用锁)

Chromium在程序启动时创建了很多预定用途的线程,我们要尽量使用已有线程,不要创建新的线程。为了实现非阻塞代码,Chromium的很多API都是异步的。

MessageLoop

Chromium将消息循环抽象为MessageLoop类,每个线程都有(只有)一个MessageLoop实例。MessageLoop提供了PostTask系列方法允许向特定线程添加任务。MessageLoop会安装“先进先出”的顺序执行任务,直到收到MessageLoop:Quit消息,消息循环才会退出。


MessageLoop类型

每个MessageLoop都有一个类型,所有类型的MessageLoop都能够处理tasks和timers,类型指定了MessageLoop除了处理tasks和timers外,还可以处理的消息类型。
TYPE_DEFAULT:默认的消息循环,只能处理异步任务和定时器timers
TYPE_UI:除了可以处理异步任何和定时器timers,还能够处理系统UI事件。主线程使用的就是该类型的MessageLoop。
TYPE_IO:支持处理异步I/O事件,Chromium创建的所有处理IPC消息的IO线程创建的MessageLoop都是这种类型。
TYPE_JAVA:只有Android支持,后端实现是Java层的消息处理器。它的行为和TYPE_UI一样,只是创建时不使用主线程上的MessagePump工厂方法。
TYPE_CUSTOM:构建时提供MessagePump


MessagePump::Delegate

Delegate定义了一组接口,由MessageLoop实现,MessagePump通过这组接口来触发MessageLoop执行特定的任务。


MessagePump

MessagePump用来从系统获取消息回调,触发MessageLoop执行Task类。不同类型的MessageLoop都有一个相对应的MessagePump,MessagePump的实现与平台相关。


MessageLoop, MessagePump, Delegate关系

MessageLoop具体的实现和平台相关,即使在相同的平台上,由于使用的事件处理库不同,其实现方式也可能不同。Chromium将平台相关的实现封装在MessagePump中。MessagePump的具体实现提供了平台相关的异步事件处理,而MessageLoop提供了轮询和调度异步任务的基本框架,两者通过MessagePump::Delegate抽象接口关联起来。主要类及它们的关系:



线程:创建到运行

下面以Browser Threads为例,来看看线程从创建到执行的过程。
内核初始化时调用BrowserMainLoop::CreateStartupTasks()创建一些启动任务,其中包括“创建Browser线程”的任务,然后调用startup_task_runner_执行启动任务:

 
StartupTask create_threads =
base::Bind(&BrowserMainLoop::CreateThreads, base::Unretained(this));
startup_task_runner_->AddTask(create_threads);


执行任务时,调用create_threads(),最终会调用系统API创建一个线程,并传入Thread.ThreadMain作为入口函数。线程创建后,就开始执行ThreadMain,该函数首先创建一个MessageLoop,然后开始执行消息循环,直到收到Quit消息。




向线程添加任务

任务最终通过调用MessageLoop:: PostTask*()把任务抛到MessageLoop执行。每个MessageLoop实例都属于一个线程,我们首先要知道需要哪个线程来执行任务。我们必定需要在某个地方维护着可用的线程的信息,例如:名称与线程ID的对应关系,这样我们可以通过一个名称来标识需要执行给定任务的线程。BrowserThread类提供了一组静态方法PostTask*来向Browser Threads添加任务:

 
staticbool PostTask(ID identifier,
consttracked_objects::Location& from_here,
constbase::Closure& task);
staticbool PostDelayedTask(ID identifier,
consttracked_objects::Location& from_here,
constbase::Closure& task,
base::TimeDelta delay);
staticbool PostNonNestableTask(ID identifier,
consttracked_objects::Location& from_here,
constbase::Closure& task);


第一个参数是一个线程的标识符,定义如下:

enumID {
// The main thread in the browser.
UI,
// This is the thread that interacts with the database.
DB,
// This is the thread that interacts with the file system.
FILE,
// Used for file system operations that block user interactions.
// Responsiveness of this thread affect users.
FILE_USER_BLOCKING,
// Used to launch and terminate Chrome processes.
PROCESS_LAUNCHER,
// This is the thread to handle slow HTTP cache operations.
CACHE,
// This is the thread that processes non-blocking IO, i.e. IPC and network.
// Blocking IO should happen on other threads like DB, FILE,
// FILE_USER_BLOCKING and CACHE depending on the usage.
IO,
// NOTE: do not add new threads here that are only used by a small number of
// files. Instead you should just use a Thread class and pass its
// MessageLoopProxy around. Named threads there are only for threads that
// are used in many places.
// This identifier does not represent a thread.  Instead it counts the
// number of well-known threads.  Insert new well-known threads before this
// identifier.
ID_COUNT
};


向DB线程添加一个任务就可以采用类似下面这样的代码:
BrowserThread::PostTask(
BrowserThread::DB, FROM_HERE,
base::Bind(
&GetUrlThumbnailTask, url_string, top_sites,
base::Owned(j_callback), lookup_success_callback,
lookup_failed_callback));


BrowserThread维护着一个全局变量g_globals,它的类型是BrowserThreadGlobals,定义如下:

structBrowserThreadGlobals {
BrowserThreadGlobals()
: blocking_pool(newbase::SequencedWorkerPool(3, "BrowserBlocking")) {
memset(threads, 0, BrowserThread::ID_COUNT * sizeof(threads[0]));
memset(thread_delegates, 0,
BrowserThread::ID_COUNT * sizeof(thread_delegates[0]));
}
// This lock protects |threads|. Do not read or modify that array
// without holding this lock. Do not block while holding this lock.
base::Lock lock;
// This array is protected by |lock|. The threads are not owned by this
// array. Typically, the threads are owned on the UI thread by
// BrowserMainLoop. BrowserThreadImpl objects remove themselves from this
// array upon destruction.
BrowserThreadImpl* threads[BrowserThread::ID_COUNT];
// Only atomic operations are used on this array. The delegates are not owned
// by this array, rather by whoever calls BrowserThread::SetDelegate.
BrowserThreadDelegate* thread_delegates[BrowserThread::ID_COUNT];
constscoped_refptr<base::SequencedWorkerPool> blocking_pool;
};
base::LazyInstance<BrowserThreadGlobals>::Leaky
g_globals = LAZY_INSTANCE_INITIALIZER;
} // namespace


其中的threads数组保存了创建的BrowserThread实例的指针。在初始化的时候会给threads赋值:

voidBrowserThreadImpl::Initialize() {
BrowserThreadGlobals& globals = g_globals.Get();
base::AutoLock lock(globals.lock);
DCHECK(identifier_ >= 0 && identifier_ < ID_COUNT);
DCHECK(globals.threads[identifier_] == NULL);
globals.threads[identifier_] = this;
}


PostTask()通过调用PostTaskHelper()来添加任务,其内部首先取得g_globals的值,然后根据传入的标识(例如:BrowserThread:: DB)找到对应的thread对象,就可以取得对象的message_loop,最后调用message_loop的PostTask方法把任务交由相应线程执行。

BrowserThreadGlobals& globals = g_globals.Get();
if(!target_thread_outlives_current)
globals.lock.Acquire();
base::MessageLoop* message_loop =
globals.threads[identifier] ? globals.threads[identifier]->message_loop()
: NULL;
if(message_loop) {
if(nestable) {
message_loop->PostDelayedTask(from_here, task, delay);
}else {
message_loop->PostNonNestableDelayedTask(from_here, task, delay);
}
}


 


消息循环工作原理

MessageLoop实际上就是一个循环,不断从任务队列中取出任务,并执行,那么当所有任务都执行完毕之后呢?最直接的做法是,采用忙等待,不断检查任务队列中是否有新的任务。Chromium当然不会采用这么拙劣的方法。以MessagePumpDefault的实现为例,当所有任务执行完毕之后,会执行event对象的wait()函数,等待事件或信号唤醒继续循环执行。

ThreadRestrictions::ScopedAllowWait allow_wait;
if(delayed_work_time_.is_null()) {
event_.Wait();
}else {
TimeDelta delay = delayed_work_time_ - TimeTicks::Now();
if(delay > TimeDelta()) {
event_.TimedWait(delay);
}else {
// It looks like delayed_work_time_ indicates a time in the past, so we
// need to call DoDelayedWork now.
delayed_work_time_ = TimeTicks();
}
}


唤醒的信号是怎么发送的呢?我们来看PostTask*的执行过程:



添加任务到队列时,如果发现任务队列为空,就会调用ScheduleWork启动消息循环,ScheduleWork具体的实现与采用的系统,以及采用的事件模型有关。还是以MessagePumpDefault为例,它的实现如下:

boolIncomingTaskQueue::PostPendingTask(PendingTask* pending_task) {
……
boolwas_empty = incoming_queue_.empty();
incoming_queue_.push(*pending_task);
pending_task->task.Reset();
if(always_schedule_work_ || (!message_loop_scheduled_ && was_empty)) {
// Wake up the message loop.
message_loop_->ScheduleWork();
// After we've scheduled the message loop, we do not need to do so again
// until we know it has processed all of the work in our queue and is
// waiting for more work again. The message loop will always attempt to
// reload from the incoming queue before waiting again so we clear this flag
// in ReloadWorkQueue().
message_loop_scheduled_ = true;
}
returntrue;
}
voidMessagePumpDefault::ScheduleWork() {
// Since this can be called on any thread, we need to ensure that our Run
// loop wakes up.
event_.Signal();
}


为了减少锁的使用和锁的范围,Chromium采用了一个比较巧妙的方法:简单来讲,MessageLoop维护有两个队列,一个work_queue,一个incoming_queue。消息循环不断从work_queue取任务并执行,新加入任务放入incoming_queue。当work_queue中的任务都执行完后,再把incoming_queue拷贝到work_queue(需要加锁)。这样避免了每执行一个任务都要去加锁。

 


参考

理解WebKit和Chromium: 消息循环(Message Loop)

Chromium on Android: Android系统上Chromium主消息循环的实现分析

Chrome学习笔记(一):线程模型,消息循环

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: