您的位置:首页 > 编程语言

mxnet代码剖析之--thread_engine篇

2016-04-12 10:21 344 查看
线程引擎:

#############################################################################################################################

version 2.0

/// ---------------------------------------------------------------------

struct ThreadedOpr /// 线程操作子

Engine::AsyncFn fn; /// 操作函数,包括backward, forward, NDArray一元/二元/三元操作等

std::vector<ThreadedVar*> const_vars; /// 操作执行过程中的只读变量,只读变量之间的操作允许并发

std::vector<ThreadedVar*> mutable_vars; /// 操作过程中的读写变量,写操作需要对变量独占

FnProperty prop; /// 操作函数属性,主要将copy操作与其它常规计算分离,单独处理

说明:执行引擎调度时的入口,包括操作需要的输入/输出变量,操作函数入口,操作类型等

/// ---------------------------------------------------------------------

struct OprBlock /// 操作数据块

std::atomic<int> wait{0}; /// 变量块等待计数器,通常仅表示写等待计数,多个读操作可以并发

ThreadedOpr* opr{nullptr}; /// 变量对应的线程操作子指针,当等待计数器为零时,触发该操作

Context ctx; /// 数据块的上下文环境

int priority;

说明:主要负责多操作共享数据块的管理,实现读写互斥以及读并发

/// ---------------------------------------------------------------------

struct VersionedVarBlock

VersionedVarBlock* next{nullptr}; /// 变量块的链表结构,nxt指针

OprBlock* trigger{nullptr}; /// 实际操作数据块

bool write{false}; /// 操作数据块读/写属性

说明:通过链表方式排队多个操作对同一个数据块的读写访问,实现执行引擎高效调度

/// ---------------------------------------------------------------------

class ThreadedVar /// 线程变量

std::mutex m_; /// ...

int num_pending_reads_{0}; /// 当前可并发执行的数据块个数(读操作)

VersionedVarBlock* head_{nullptr}; /// VersionedVarBlock 链表头指针

VersionedVarBlock* pending_write_{nullptr}; /// 下一个写操作指针

bool to_delete_{false}; ///

static constexpr int kWriteTriggered = -1;

说明:管理VersionedVarBlock链表,实现链表的添加,删除,执行等操作

需要特别注意VersionedVarBlock的删除时机

1 void AppendReadDependency(OprBlock* opr_block): head_链表的添加读操作

如果链表当前队列前无写操作,则num_pending_reads++,对应的线程操作子计数器--,表示该操作子ready

否则,new VersionedVarBlock()节点,将opr_block添加到head链表末尾

2 void AppendWriteDependency(OprBlock* opr_block):head_链表添加写操作

new VersionedVarBlock()节点, 标志为写操作,添加操作块opr_block

如果当前链表无写操作,则pending_write_ = head_, 如果当前链表前无读操作,则wait--,表示该操作子ready

3 void CompleteReadDependency(Dispatcher dispatcher) :head_链表完成一次读操作

num_pending_reads--

如果所有的并发读操作完成(num_pending_reads == 0),如果链表中存在等待的写操作,则触发该写操作

4 inline bool CompleteWriteDependency(Dispatcher dispatcher):head_链表完成一次写操作

在链表中查找下一个写操作,同时统计下一个写操作之前共有读操作个数

如果读操作个数不为零,并发所有的读操作

如果下一个写操作之前无读操作,则触发下一个写操作

/// ---------------------------------------------------------------------

class ThreadedEngine

std::atomic<int> pending_{0}; /// 表示当前线程引擎共挂起操作的个数

std::atomic<bool> kill_{false};

std::atomic<bool> shutdown_phase_{false};

bool engine_info_{false};

std::mutex finished_m_;

std::condition_variable finished_cv_;

std::shared_ptr<common::ObjectPool<ThreadedOpr> > objpool_opr_ref_;

std::shared_ptr<common::ObjectPool<OprBlock> > objpool_blk_ref_;

std::shared_ptr<common::ObjectPool<VersionedVarBlock> > objpool_varblk_ref_;

std::shared_ptr<common::ObjectPool<ThreadedVar> > objpool_var_ref_;

说明:负责完成ThreadedOp的实际执行

1 ThreadedVar* NewVariable()

2 ThreadedOpr* NewOperator(AsyncFn fn, std::vector<VarHandle> const& const_vars, std::vector<VarHandle> const& mutable_vars, FnProperty prop)

3 void Push(OprHandle op, Context exec_ctx, int priority)

重置op->wait = const_vars.size() + mutable_vars.size() + 1:表示op的触发条件为其所有的变量ready

在操作依赖的所有变量上挂起操作句柄,实现操作对变量状态的依赖

如果操作依赖的所有变量全部ready,压入待执行队列

npending++

4 void WaitForVar(VarHandle var)

5 void WaitForAll() /// 等待执行引擎完成所有的挂起操作

6 void OnComplete(ThreadedOpr* threaded_opr):完成threaded_opr操作,作为ExecuteOprBlock操作的callback函数

操作的所有只读变量触发CompleteReadDependency()操作

操作的所有读写变量触发CompleteWriteDependency()操作

npending--,如果npending == 0, 线程引擎完成所有任务

/// ---------------------------------------------------------------------

class ThreadPool 线程池

std::vector<std::thread> worker_threads_;

class StreamManager

std::array<std::array<mshadow::Stream<gpu>*, kStreams>, kNumGpus> gpu_streams_;

std::array<mshadow::Stream<gpu>*, kNumGpus> gpu_io_streams_;

std::array<int, kNumGpus> gpu_cnt_;

说明:每个节点最多支持16张GPU卡,每个GPU卡允许同时维护16个常规计算stream,1个copy stream

任务调度时,采用round-robin 方法实现负载均衡

/// ---------------------------------------------------------------------

class ThreadedEnginePooled

static constexpr std::size_t kNumWorkingThreads = 16;

static constexpr std::size_t kMaxNumGpus = 16;

static constexpr std::size_t kNumStreamsPerGpu = 16;

StreamManager<kMaxNumGpus, kNumStreamsPerGpu> streams_;

dmlc::ConcurrentBlockingQueue<OprBlock*> task_queue_; /// 常规计算任务队列

dmlc::ConcurrentBlockingQueue<OprBlock*> io_task_queue_; /// io/copy操作任务队列

ThreadPool thread_pool_; /// 轮询方式完成task_queue中的所有任务,每个节点16个计算线程

ThreadPool io_thread_pool_; /// 轮询方式完成io_task_queue中的所有任务,每个节点1个拷贝线程

说明:

采用全局线程池访问所有的GPU设备,常规计算任务入task_queue队列,由16个thread_pool_线程轮询执行

每个GPU卡维护16个计算流,1个copy流,线程池采用round-robin算法将计算任务调度到相应GPU的流上,进行操作

copy任务入io_task_queue队列,由1个io_thread_pool_线程完成数据copy任务

PushToExecute()函数调用DoPushToQueue()函数,实现任务按照属性入队列

/// ---------------------------------------------------------------------

struct ThreadWorkerBlock

dmlc::ConcurrentBlockingQueue<OprBlock*, type> task_queue; /// 任务池

std::unique_ptr<ThreadPool> pool; /// 线程池,

class ThreadedEnginePerDevice

int cpu_worker_nthreads_; /// CPU常规处理线程数

int gpu_worker_nthreads_; /// 每个GPU绑定常规计算线程数

int gpu_copy_nthreads_; /// 每个GPU绑定copy操作线程数

common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > cpu_normal_workers_; /// CPU常规计算任务池/线程池

std::unique_ptr<ThreadWorkerBlock<kPriorityQueue> > cpu_priority_worker_; /// CPU优先处理任务池/线程池

common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > gpu_normal_workers_;///GPU计算任务池/线程池

common::LazyAllocArray<ThreadWorkerBlock<kCopyQueue> > gpu_copy_workers_; /// GPU数据拷贝任务池/线程池

#############################################################################################################################

version 1.0

1 变量描述:

/// -------------------------------------------------------------------------

struct OprBlock

std::atomic<int> wait{0};

ThreadedOpr* opr{nullptr};

Context ctx;

int priority;

1.1 OprBlock:操作与所依赖变量之间的量化指标,所有操作可以被执行的前提条件是其所依赖的变量全部准备就绪,操作完成后有义务通知其它所有操作(这些操作之间存在变量关联,read/write)

1.1.1 wait: 操作等待计数器,原子操作以保证多线程安全,每个线程操作块(函数)的执行条件为所有的constVar与mutableVar准备就绪,wait = constVar.size + mutableVar.size + 1.if(wait == 0)push opr to executer

1.1.2 opr: 线程操作块,当wait计数器置零时,允许进入threadPool, 等待threadWork的实际执行操作

1.1.3 ctx: opr 运行时上下文信息,CPU/GPU(id)

1.1.4 priority: 操作优先级

/// -------------------------------------------------------------------------

struct VersionedVarBlock

VersionedVarBlock* next{nullptr}

OprBlock* trigger{nullptr}

bool write{false}

1.2 VersionedVarBlock: 实现OprBlock链表,

1.2.1 next: OprBlock链表指针

1.2.2 trigger: myown ptr

1.2.3 write: 读操作/写操作

/// -------------------------------------------------------------------------

class ThreadedVar

std::mutex m_;

int num_pending_reads_{0};

VersionedVarBlock* head_{nullptr};

VersionedVarBlock* pending_write_{nullptr};

bool to_delete_{false}

static constexpr int kWriteTriggered = -1;

1.3 ThreadedVar: 维护VersionVarBlock链表,实现链表的添加与激活(?通知)操作,实现多线程对同一变量的读写互斥,基本规则如下:

添加一个读依赖操作A:

如果当前变量不存在写依赖,直接将操作A计数器减一(即A操作的当前变量可立即使用)
如果当前变量存在写依赖,将操作A压入head_队列,等待写操作完成后,重新调度

添加一个写依赖操作B:

如果当前变量不存在其它任何操作,直接将操作B计算器减一(即B操作的当前写入变量可立即使用)
如果当前变量存在其它写操作,将操作B压入head_队列,等待重新调度
如果当前变量不存在其它写操作,重围写操作指针pending_write_

完成一个读依赖操作: 读操作计数器num_pending_reads_减一,如果读操作计数器为零,并且存在待调度写操作,则调度写操作(所有的读操作已经完成,写操作可以开始执行)

完成一个写依赖操作B:查找head_队列中下一个写依赖操作C,同时统计两个写依赖操作之间的读依赖操作数num_pending_reads_

如果两个BC操作之间无任何读操作(即C操作之前无读操作队列),则直接调度C操作
如果C操作之前存在n个读操作,则首先完成n个读操作的调度

1.3.1 m_: 线程安全锁

1.3.2 num_pending_reads_:写操作前待调度读操作队列个数,如果= -1,表示当前正在执行写操作,完成每次写操作后,需要重新进行统计

1.3.3 head_:操作队列,队列中同时存在读写操作,所有的写操作需要等待它前面所有操作完成之后才可以调度,所有连续的读操作可以并发调度

1.3.4 pending_write_: 下一个等待调度的写操作指针

1.3.5 kWriteTriggered:变量当前调度标志位,= -1 表示当前正在调度写操作

/// -------------------------------------------------------------------------

struct ThreadedOpr

Engine::AsyncFn fn;

std::vector<ThreadedVar*> const_vars;

std::vector<ThreadedVar*> mutable_vars;

FnProperty prop;

bool temporary{false};

1.4 ThreadedOpr:表示一个具体的线程操作子,包括操作依赖的所有变量(constVars, mutableVars)和执行函数

1.4.1 fn: 操作对应的执行函数

1.4.2 const_vars:只读变量,函数执行过程中不改变变量的值

1.4.3 mutable_vars:写变量,函数执行过程中可能修改这些变量的值

1.4.4 temporary:是否属于临时操作,如果true,操作完成后可以立即删除本操作

/// -------------------------------------------------------------------------

class ThreadedEngine:

std::atomic<int> pending_{0};

std::atomic<bool> kill_{false};

std::atomic<bool> shutdown_phase_{false};

bool engine_info_{false};

std::mutex finished_m_;

std::condition_variable finished_cv_;

std::shared_ptr<common::ObjectPool<ThreadedOpr> > objpool_opr_ref_;

std::shared_ptr<common::ObjectPool<OprBlock> > objpool_blk_ref_;

std::shared_ptr<common::ObjectPool<VersionedVarBlock> > objpool_varblk_ref_;

std::shared_ptr<common::ObjectPool<ThreadedVar> > objpool_var_ref_;

1.5 ThreadedEngine:主要负责以下部分工作:

管理线程操作依赖队列,即各操作之间变量的读写依赖,生成任务池

管理threadWorker,具体实现方式包括ThreadedEnginePooled, ThreadedEnginePerDevice,生成线程池

根据任务池中各任务的属性及优先级,调度相应的线程完成具体作业

变量检查,CheckDuplicate(),保证所有变量的唯一性!防止ThreadedVar可能发生的调度异常!

监控VarHandle状态,WaitForVar()

添加新的操作A:重置A的等待计数器wait = constVar.size + mutableVar.size + 1

遍历A的constVars,加入ThreadedVar,实现与其它操作的变量依赖性分析(读依赖)
遍历A的mutableVars,加入ThreadedVar,实现与其它操作的变量依赖性分析(写依赖)
如果wait == 0, just Execute it!

完成操作A: 激活所有操作A关联constVars的threadOpr

激活所有操作A关联mutableVars的threadOpr
pending--, if(!pending)finished_cv.notify_all;

/// -------------------------------------------------------------------------

class ThreadedEnginePooled

static constexpr std::size_t kNumWorkingThreads = 16;

static constexpr std::size_t kMaxNumGpus = 16;

static constexpr std::size_t kNumStreamsPerGpu = 16

StreamManager<kMaxNumGpus, kNumStreamsPerGpu> streams_

dmlc::ConcurrentBlockingQueue<OprBlock*> task_queue_;

dmlc::ConcurrentBlockingQueue<OprBlock*> io_task_queue_;

ThreadPool thread_pool_;

ThreadPool io_thread_pool_;

1.6 ThreadedEnginePooled:

线程池任务池模式

构造函数初始化16个工作线程+1个io线程:轮询完成任务池中的threadOpr

PushToExecute()负责将所有的任务入队列(常规任务 + io任务)->DoPushToQueue()
DoExecute()完成具体的操作

1.6.1 thread_pool_ + task_queue_:常规任务池,将由kNumWorkingThreads个线程轮询完成

1.6.2 io_thread_pool_ + io_task_queue_:io任务池,每个节点仅启动1个IO线程,完成数据的CPU/GPU传输

/// -------------------------------------------------------------------------

class ThreadedEnginePerDevice

int cpu_worker_nthreads_;

int gpu_worker_nthreads_;

int gpu_copy_nthreads_;

common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > cpu_normal_workers_

std::unique_ptr<ThreadWorkerBlock<kPriorityQueue> > cpu_priority_worker_;

common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > gpu_normal_workers_;

common::LazyAllocArray<ThreadWorkerBlock<kCopyQueue> > gpu_copy_workers_;

1.7 ThreadedEnginePerDevice:

每个GPU指派1个计算线程 + 1个copy线程
优先级任务pushInto优先任务池
GPU任务分别pushInto常规任务池或者copy任务池

/// -------------------------------------------------------------------------

特殊的任务:pushThread()

思考:GPU端如果支持PCI上传下载通道分离,是否可以将upload/download分别指定到不同的stream?

GPU版本IO需要维护两个线程,分别完成数据的上传下载
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: