mxnet代码剖析之--thread_engine篇
2016-04-12 10:21
344 查看
线程引擎:
#############################################################################################################################
version 2.0
/// ---------------------------------------------------------------------
struct ThreadedOpr /// 线程操作子
Engine::AsyncFn fn; /// 操作函数,包括backward, forward, NDArray一元/二元/三元操作等
std::vector<ThreadedVar*> const_vars; /// 操作执行过程中的只读变量,只读变量之间的操作允许并发
std::vector<ThreadedVar*> mutable_vars; /// 操作过程中的读写变量,写操作需要对变量独占
FnProperty prop; /// 操作函数属性,主要将copy操作与其它常规计算分离,单独处理
说明:执行引擎调度时的入口,包括操作需要的输入/输出变量,操作函数入口,操作类型等
/// ---------------------------------------------------------------------
struct OprBlock /// 操作数据块
std::atomic<int> wait{0}; /// 变量块等待计数器,通常仅表示写等待计数,多个读操作可以并发
ThreadedOpr* opr{nullptr}; /// 变量对应的线程操作子指针,当等待计数器为零时,触发该操作
Context ctx; /// 数据块的上下文环境
int priority;
说明:主要负责多操作共享数据块的管理,实现读写互斥以及读并发
/// ---------------------------------------------------------------------
struct VersionedVarBlock
VersionedVarBlock* next{nullptr}; /// 变量块的链表结构,nxt指针
OprBlock* trigger{nullptr}; /// 实际操作数据块
bool write{false}; /// 操作数据块读/写属性
说明:通过链表方式排队多个操作对同一个数据块的读写访问,实现执行引擎高效调度
/// ---------------------------------------------------------------------
class ThreadedVar /// 线程变量
std::mutex m_; /// ...
int num_pending_reads_{0}; /// 当前可并发执行的数据块个数(读操作)
VersionedVarBlock* head_{nullptr}; /// VersionedVarBlock 链表头指针
VersionedVarBlock* pending_write_{nullptr}; /// 下一个写操作指针
bool to_delete_{false}; ///
static constexpr int kWriteTriggered = -1;
说明:管理VersionedVarBlock链表,实现链表的添加,删除,执行等操作
需要特别注意VersionedVarBlock的删除时机
1 void AppendReadDependency(OprBlock* opr_block): head_链表的添加读操作
如果链表当前队列前无写操作,则num_pending_reads++,对应的线程操作子计数器--,表示该操作子ready
否则,new VersionedVarBlock()节点,将opr_block添加到head链表末尾
2 void AppendWriteDependency(OprBlock* opr_block):head_链表添加写操作
new VersionedVarBlock()节点, 标志为写操作,添加操作块opr_block
如果当前链表无写操作,则pending_write_ = head_, 如果当前链表前无读操作,则wait--,表示该操作子ready
3 void CompleteReadDependency(Dispatcher dispatcher) :head_链表完成一次读操作
num_pending_reads--
如果所有的并发读操作完成(num_pending_reads == 0),如果链表中存在等待的写操作,则触发该写操作
4 inline bool CompleteWriteDependency(Dispatcher dispatcher):head_链表完成一次写操作
在链表中查找下一个写操作,同时统计下一个写操作之前共有读操作个数
如果读操作个数不为零,并发所有的读操作
如果下一个写操作之前无读操作,则触发下一个写操作
/// ---------------------------------------------------------------------
class ThreadedEngine
std::atomic<int> pending_{0}; /// 表示当前线程引擎共挂起操作的个数
std::atomic<bool> kill_{false};
std::atomic<bool> shutdown_phase_{false};
bool engine_info_{false};
std::mutex finished_m_;
std::condition_variable finished_cv_;
std::shared_ptr<common::ObjectPool<ThreadedOpr> > objpool_opr_ref_;
std::shared_ptr<common::ObjectPool<OprBlock> > objpool_blk_ref_;
std::shared_ptr<common::ObjectPool<VersionedVarBlock> > objpool_varblk_ref_;
std::shared_ptr<common::ObjectPool<ThreadedVar> > objpool_var_ref_;
说明:负责完成ThreadedOp的实际执行
1 ThreadedVar* NewVariable()
2 ThreadedOpr* NewOperator(AsyncFn fn, std::vector<VarHandle> const& const_vars, std::vector<VarHandle> const& mutable_vars, FnProperty prop)
3 void Push(OprHandle op, Context exec_ctx, int priority)
重置op->wait = const_vars.size() + mutable_vars.size() + 1:表示op的触发条件为其所有的变量ready
在操作依赖的所有变量上挂起操作句柄,实现操作对变量状态的依赖
如果操作依赖的所有变量全部ready,压入待执行队列
npending++
4 void WaitForVar(VarHandle var)
5 void WaitForAll() /// 等待执行引擎完成所有的挂起操作
6 void OnComplete(ThreadedOpr* threaded_opr):完成threaded_opr操作,作为ExecuteOprBlock操作的callback函数
操作的所有只读变量触发CompleteReadDependency()操作
操作的所有读写变量触发CompleteWriteDependency()操作
npending--,如果npending == 0, 线程引擎完成所有任务
/// ---------------------------------------------------------------------
class ThreadPool 线程池
std::vector<std::thread> worker_threads_;
class StreamManager
std::array<std::array<mshadow::Stream<gpu>*, kStreams>, kNumGpus> gpu_streams_;
std::array<mshadow::Stream<gpu>*, kNumGpus> gpu_io_streams_;
std::array<int, kNumGpus> gpu_cnt_;
说明:每个节点最多支持16张GPU卡,每个GPU卡允许同时维护16个常规计算stream,1个copy stream
任务调度时,采用round-robin 方法实现负载均衡
/// ---------------------------------------------------------------------
class ThreadedEnginePooled
static constexpr std::size_t kNumWorkingThreads = 16;
static constexpr std::size_t kMaxNumGpus = 16;
static constexpr std::size_t kNumStreamsPerGpu = 16;
StreamManager<kMaxNumGpus, kNumStreamsPerGpu> streams_;
dmlc::ConcurrentBlockingQueue<OprBlock*> task_queue_; /// 常规计算任务队列
dmlc::ConcurrentBlockingQueue<OprBlock*> io_task_queue_; /// io/copy操作任务队列
ThreadPool thread_pool_; /// 轮询方式完成task_queue中的所有任务,每个节点16个计算线程
ThreadPool io_thread_pool_; /// 轮询方式完成io_task_queue中的所有任务,每个节点1个拷贝线程
说明:
采用全局线程池访问所有的GPU设备,常规计算任务入task_queue队列,由16个thread_pool_线程轮询执行
每个GPU卡维护16个计算流,1个copy流,线程池采用round-robin算法将计算任务调度到相应GPU的流上,进行操作
copy任务入io_task_queue队列,由1个io_thread_pool_线程完成数据copy任务
PushToExecute()函数调用DoPushToQueue()函数,实现任务按照属性入队列
/// ---------------------------------------------------------------------
struct ThreadWorkerBlock
dmlc::ConcurrentBlockingQueue<OprBlock*, type> task_queue; /// 任务池
std::unique_ptr<ThreadPool> pool; /// 线程池,
class ThreadedEnginePerDevice
int cpu_worker_nthreads_; /// CPU常规处理线程数
int gpu_worker_nthreads_; /// 每个GPU绑定常规计算线程数
int gpu_copy_nthreads_; /// 每个GPU绑定copy操作线程数
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > cpu_normal_workers_; /// CPU常规计算任务池/线程池
std::unique_ptr<ThreadWorkerBlock<kPriorityQueue> > cpu_priority_worker_; /// CPU优先处理任务池/线程池
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > gpu_normal_workers_;///GPU计算任务池/线程池
common::LazyAllocArray<ThreadWorkerBlock<kCopyQueue> > gpu_copy_workers_; /// GPU数据拷贝任务池/线程池
#############################################################################################################################
version 1.0
1 变量描述:
/// -------------------------------------------------------------------------
struct OprBlock
std::atomic<int> wait{0};
ThreadedOpr* opr{nullptr};
Context ctx;
int priority;
1.1 OprBlock:操作与所依赖变量之间的量化指标,所有操作可以被执行的前提条件是其所依赖的变量全部准备就绪,操作完成后有义务通知其它所有操作(这些操作之间存在变量关联,read/write)
1.1.1 wait: 操作等待计数器,原子操作以保证多线程安全,每个线程操作块(函数)的执行条件为所有的constVar与mutableVar准备就绪,wait = constVar.size + mutableVar.size + 1.if(wait == 0)push opr to executer
1.1.2 opr: 线程操作块,当wait计数器置零时,允许进入threadPool, 等待threadWork的实际执行操作
1.1.3 ctx: opr 运行时上下文信息,CPU/GPU(id)
1.1.4 priority: 操作优先级
/// -------------------------------------------------------------------------
struct VersionedVarBlock
VersionedVarBlock* next{nullptr}
OprBlock* trigger{nullptr}
bool write{false}
1.2 VersionedVarBlock: 实现OprBlock链表,
1.2.1 next: OprBlock链表指针
1.2.2 trigger: myown ptr
1.2.3 write: 读操作/写操作
/// -------------------------------------------------------------------------
class ThreadedVar
std::mutex m_;
int num_pending_reads_{0};
VersionedVarBlock* head_{nullptr};
VersionedVarBlock* pending_write_{nullptr};
bool to_delete_{false}
static constexpr int kWriteTriggered = -1;
1.3 ThreadedVar: 维护VersionVarBlock链表,实现链表的添加与激活(?通知)操作,实现多线程对同一变量的读写互斥,基本规则如下:
添加一个读依赖操作A:
如果当前变量不存在写依赖,直接将操作A计数器减一(即A操作的当前变量可立即使用)
如果当前变量存在写依赖,将操作A压入head_队列,等待写操作完成后,重新调度
添加一个写依赖操作B:
如果当前变量不存在其它任何操作,直接将操作B计算器减一(即B操作的当前写入变量可立即使用)
如果当前变量存在其它写操作,将操作B压入head_队列,等待重新调度
如果当前变量不存在其它写操作,重围写操作指针pending_write_
完成一个读依赖操作: 读操作计数器num_pending_reads_减一,如果读操作计数器为零,并且存在待调度写操作,则调度写操作(所有的读操作已经完成,写操作可以开始执行)
完成一个写依赖操作B:查找head_队列中下一个写依赖操作C,同时统计两个写依赖操作之间的读依赖操作数num_pending_reads_
如果两个BC操作之间无任何读操作(即C操作之前无读操作队列),则直接调度C操作
如果C操作之前存在n个读操作,则首先完成n个读操作的调度
1.3.1 m_: 线程安全锁
1.3.2 num_pending_reads_:写操作前待调度读操作队列个数,如果= -1,表示当前正在执行写操作,完成每次写操作后,需要重新进行统计
1.3.3 head_:操作队列,队列中同时存在读写操作,所有的写操作需要等待它前面所有操作完成之后才可以调度,所有连续的读操作可以并发调度
1.3.4 pending_write_: 下一个等待调度的写操作指针
1.3.5 kWriteTriggered:变量当前调度标志位,= -1 表示当前正在调度写操作
/// -------------------------------------------------------------------------
struct ThreadedOpr
Engine::AsyncFn fn;
std::vector<ThreadedVar*> const_vars;
std::vector<ThreadedVar*> mutable_vars;
FnProperty prop;
bool temporary{false};
1.4 ThreadedOpr:表示一个具体的线程操作子,包括操作依赖的所有变量(constVars, mutableVars)和执行函数
1.4.1 fn: 操作对应的执行函数
1.4.2 const_vars:只读变量,函数执行过程中不改变变量的值
1.4.3 mutable_vars:写变量,函数执行过程中可能修改这些变量的值
1.4.4 temporary:是否属于临时操作,如果true,操作完成后可以立即删除本操作
/// -------------------------------------------------------------------------
class ThreadedEngine:
std::atomic<int> pending_{0};
std::atomic<bool> kill_{false};
std::atomic<bool> shutdown_phase_{false};
bool engine_info_{false};
std::mutex finished_m_;
std::condition_variable finished_cv_;
std::shared_ptr<common::ObjectPool<ThreadedOpr> > objpool_opr_ref_;
std::shared_ptr<common::ObjectPool<OprBlock> > objpool_blk_ref_;
std::shared_ptr<common::ObjectPool<VersionedVarBlock> > objpool_varblk_ref_;
std::shared_ptr<common::ObjectPool<ThreadedVar> > objpool_var_ref_;
1.5 ThreadedEngine:主要负责以下部分工作:
管理线程操作依赖队列,即各操作之间变量的读写依赖,生成任务池
管理threadWorker,具体实现方式包括ThreadedEnginePooled, ThreadedEnginePerDevice,生成线程池
根据任务池中各任务的属性及优先级,调度相应的线程完成具体作业
变量检查,CheckDuplicate(),保证所有变量的唯一性!防止ThreadedVar可能发生的调度异常!
监控VarHandle状态,WaitForVar()
添加新的操作A:重置A的等待计数器wait = constVar.size + mutableVar.size + 1
遍历A的constVars,加入ThreadedVar,实现与其它操作的变量依赖性分析(读依赖)
遍历A的mutableVars,加入ThreadedVar,实现与其它操作的变量依赖性分析(写依赖)
如果wait == 0, just Execute it!
完成操作A: 激活所有操作A关联constVars的threadOpr
激活所有操作A关联mutableVars的threadOpr
pending--, if(!pending)finished_cv.notify_all;
/// -------------------------------------------------------------------------
class ThreadedEnginePooled
static constexpr std::size_t kNumWorkingThreads = 16;
static constexpr std::size_t kMaxNumGpus = 16;
static constexpr std::size_t kNumStreamsPerGpu = 16
StreamManager<kMaxNumGpus, kNumStreamsPerGpu> streams_
dmlc::ConcurrentBlockingQueue<OprBlock*> task_queue_;
dmlc::ConcurrentBlockingQueue<OprBlock*> io_task_queue_;
ThreadPool thread_pool_;
ThreadPool io_thread_pool_;
1.6 ThreadedEnginePooled:
线程池任务池模式
构造函数初始化16个工作线程+1个io线程:轮询完成任务池中的threadOpr
PushToExecute()负责将所有的任务入队列(常规任务 + io任务)->DoPushToQueue()
DoExecute()完成具体的操作
1.6.1 thread_pool_ + task_queue_:常规任务池,将由kNumWorkingThreads个线程轮询完成
1.6.2 io_thread_pool_ + io_task_queue_:io任务池,每个节点仅启动1个IO线程,完成数据的CPU/GPU传输
/// -------------------------------------------------------------------------
class ThreadedEnginePerDevice
int cpu_worker_nthreads_;
int gpu_worker_nthreads_;
int gpu_copy_nthreads_;
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > cpu_normal_workers_
std::unique_ptr<ThreadWorkerBlock<kPriorityQueue> > cpu_priority_worker_;
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > gpu_normal_workers_;
common::LazyAllocArray<ThreadWorkerBlock<kCopyQueue> > gpu_copy_workers_;
1.7 ThreadedEnginePerDevice:
每个GPU指派1个计算线程 + 1个copy线程
优先级任务pushInto优先任务池
GPU任务分别pushInto常规任务池或者copy任务池
/// -------------------------------------------------------------------------
特殊的任务:pushThread()
思考:GPU端如果支持PCI上传下载通道分离,是否可以将upload/download分别指定到不同的stream?
GPU版本IO需要维护两个线程,分别完成数据的上传下载
#############################################################################################################################
version 2.0
/// ---------------------------------------------------------------------
struct ThreadedOpr /// 线程操作子
Engine::AsyncFn fn; /// 操作函数,包括backward, forward, NDArray一元/二元/三元操作等
std::vector<ThreadedVar*> const_vars; /// 操作执行过程中的只读变量,只读变量之间的操作允许并发
std::vector<ThreadedVar*> mutable_vars; /// 操作过程中的读写变量,写操作需要对变量独占
FnProperty prop; /// 操作函数属性,主要将copy操作与其它常规计算分离,单独处理
说明:执行引擎调度时的入口,包括操作需要的输入/输出变量,操作函数入口,操作类型等
/// ---------------------------------------------------------------------
struct OprBlock /// 操作数据块
std::atomic<int> wait{0}; /// 变量块等待计数器,通常仅表示写等待计数,多个读操作可以并发
ThreadedOpr* opr{nullptr}; /// 变量对应的线程操作子指针,当等待计数器为零时,触发该操作
Context ctx; /// 数据块的上下文环境
int priority;
说明:主要负责多操作共享数据块的管理,实现读写互斥以及读并发
/// ---------------------------------------------------------------------
struct VersionedVarBlock
VersionedVarBlock* next{nullptr}; /// 变量块的链表结构,nxt指针
OprBlock* trigger{nullptr}; /// 实际操作数据块
bool write{false}; /// 操作数据块读/写属性
说明:通过链表方式排队多个操作对同一个数据块的读写访问,实现执行引擎高效调度
/// ---------------------------------------------------------------------
class ThreadedVar /// 线程变量
std::mutex m_; /// ...
int num_pending_reads_{0}; /// 当前可并发执行的数据块个数(读操作)
VersionedVarBlock* head_{nullptr}; /// VersionedVarBlock 链表头指针
VersionedVarBlock* pending_write_{nullptr}; /// 下一个写操作指针
bool to_delete_{false}; ///
static constexpr int kWriteTriggered = -1;
说明:管理VersionedVarBlock链表,实现链表的添加,删除,执行等操作
需要特别注意VersionedVarBlock的删除时机
1 void AppendReadDependency(OprBlock* opr_block): head_链表的添加读操作
如果链表当前队列前无写操作,则num_pending_reads++,对应的线程操作子计数器--,表示该操作子ready
否则,new VersionedVarBlock()节点,将opr_block添加到head链表末尾
2 void AppendWriteDependency(OprBlock* opr_block):head_链表添加写操作
new VersionedVarBlock()节点, 标志为写操作,添加操作块opr_block
如果当前链表无写操作,则pending_write_ = head_, 如果当前链表前无读操作,则wait--,表示该操作子ready
3 void CompleteReadDependency(Dispatcher dispatcher) :head_链表完成一次读操作
num_pending_reads--
如果所有的并发读操作完成(num_pending_reads == 0),如果链表中存在等待的写操作,则触发该写操作
4 inline bool CompleteWriteDependency(Dispatcher dispatcher):head_链表完成一次写操作
在链表中查找下一个写操作,同时统计下一个写操作之前共有读操作个数
如果读操作个数不为零,并发所有的读操作
如果下一个写操作之前无读操作,则触发下一个写操作
/// ---------------------------------------------------------------------
class ThreadedEngine
std::atomic<int> pending_{0}; /// 表示当前线程引擎共挂起操作的个数
std::atomic<bool> kill_{false};
std::atomic<bool> shutdown_phase_{false};
bool engine_info_{false};
std::mutex finished_m_;
std::condition_variable finished_cv_;
std::shared_ptr<common::ObjectPool<ThreadedOpr> > objpool_opr_ref_;
std::shared_ptr<common::ObjectPool<OprBlock> > objpool_blk_ref_;
std::shared_ptr<common::ObjectPool<VersionedVarBlock> > objpool_varblk_ref_;
std::shared_ptr<common::ObjectPool<ThreadedVar> > objpool_var_ref_;
说明:负责完成ThreadedOp的实际执行
1 ThreadedVar* NewVariable()
2 ThreadedOpr* NewOperator(AsyncFn fn, std::vector<VarHandle> const& const_vars, std::vector<VarHandle> const& mutable_vars, FnProperty prop)
3 void Push(OprHandle op, Context exec_ctx, int priority)
重置op->wait = const_vars.size() + mutable_vars.size() + 1:表示op的触发条件为其所有的变量ready
在操作依赖的所有变量上挂起操作句柄,实现操作对变量状态的依赖
如果操作依赖的所有变量全部ready,压入待执行队列
npending++
4 void WaitForVar(VarHandle var)
5 void WaitForAll() /// 等待执行引擎完成所有的挂起操作
6 void OnComplete(ThreadedOpr* threaded_opr):完成threaded_opr操作,作为ExecuteOprBlock操作的callback函数
操作的所有只读变量触发CompleteReadDependency()操作
操作的所有读写变量触发CompleteWriteDependency()操作
npending--,如果npending == 0, 线程引擎完成所有任务
/// ---------------------------------------------------------------------
class ThreadPool 线程池
std::vector<std::thread> worker_threads_;
class StreamManager
std::array<std::array<mshadow::Stream<gpu>*, kStreams>, kNumGpus> gpu_streams_;
std::array<mshadow::Stream<gpu>*, kNumGpus> gpu_io_streams_;
std::array<int, kNumGpus> gpu_cnt_;
说明:每个节点最多支持16张GPU卡,每个GPU卡允许同时维护16个常规计算stream,1个copy stream
任务调度时,采用round-robin 方法实现负载均衡
/// ---------------------------------------------------------------------
class ThreadedEnginePooled
static constexpr std::size_t kNumWorkingThreads = 16;
static constexpr std::size_t kMaxNumGpus = 16;
static constexpr std::size_t kNumStreamsPerGpu = 16;
StreamManager<kMaxNumGpus, kNumStreamsPerGpu> streams_;
dmlc::ConcurrentBlockingQueue<OprBlock*> task_queue_; /// 常规计算任务队列
dmlc::ConcurrentBlockingQueue<OprBlock*> io_task_queue_; /// io/copy操作任务队列
ThreadPool thread_pool_; /// 轮询方式完成task_queue中的所有任务,每个节点16个计算线程
ThreadPool io_thread_pool_; /// 轮询方式完成io_task_queue中的所有任务,每个节点1个拷贝线程
说明:
采用全局线程池访问所有的GPU设备,常规计算任务入task_queue队列,由16个thread_pool_线程轮询执行
每个GPU卡维护16个计算流,1个copy流,线程池采用round-robin算法将计算任务调度到相应GPU的流上,进行操作
copy任务入io_task_queue队列,由1个io_thread_pool_线程完成数据copy任务
PushToExecute()函数调用DoPushToQueue()函数,实现任务按照属性入队列
/// ---------------------------------------------------------------------
struct ThreadWorkerBlock
dmlc::ConcurrentBlockingQueue<OprBlock*, type> task_queue; /// 任务池
std::unique_ptr<ThreadPool> pool; /// 线程池,
class ThreadedEnginePerDevice
int cpu_worker_nthreads_; /// CPU常规处理线程数
int gpu_worker_nthreads_; /// 每个GPU绑定常规计算线程数
int gpu_copy_nthreads_; /// 每个GPU绑定copy操作线程数
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > cpu_normal_workers_; /// CPU常规计算任务池/线程池
std::unique_ptr<ThreadWorkerBlock<kPriorityQueue> > cpu_priority_worker_; /// CPU优先处理任务池/线程池
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > gpu_normal_workers_;///GPU计算任务池/线程池
common::LazyAllocArray<ThreadWorkerBlock<kCopyQueue> > gpu_copy_workers_; /// GPU数据拷贝任务池/线程池
#############################################################################################################################
version 1.0
1 变量描述:
/// -------------------------------------------------------------------------
struct OprBlock
std::atomic<int> wait{0};
ThreadedOpr* opr{nullptr};
Context ctx;
int priority;
1.1 OprBlock:操作与所依赖变量之间的量化指标,所有操作可以被执行的前提条件是其所依赖的变量全部准备就绪,操作完成后有义务通知其它所有操作(这些操作之间存在变量关联,read/write)
1.1.1 wait: 操作等待计数器,原子操作以保证多线程安全,每个线程操作块(函数)的执行条件为所有的constVar与mutableVar准备就绪,wait = constVar.size + mutableVar.size + 1.if(wait == 0)push opr to executer
1.1.2 opr: 线程操作块,当wait计数器置零时,允许进入threadPool, 等待threadWork的实际执行操作
1.1.3 ctx: opr 运行时上下文信息,CPU/GPU(id)
1.1.4 priority: 操作优先级
/// -------------------------------------------------------------------------
struct VersionedVarBlock
VersionedVarBlock* next{nullptr}
OprBlock* trigger{nullptr}
bool write{false}
1.2 VersionedVarBlock: 实现OprBlock链表,
1.2.1 next: OprBlock链表指针
1.2.2 trigger: myown ptr
1.2.3 write: 读操作/写操作
/// -------------------------------------------------------------------------
class ThreadedVar
std::mutex m_;
int num_pending_reads_{0};
VersionedVarBlock* head_{nullptr};
VersionedVarBlock* pending_write_{nullptr};
bool to_delete_{false}
static constexpr int kWriteTriggered = -1;
1.3 ThreadedVar: 维护VersionVarBlock链表,实现链表的添加与激活(?通知)操作,实现多线程对同一变量的读写互斥,基本规则如下:
添加一个读依赖操作A:
如果当前变量不存在写依赖,直接将操作A计数器减一(即A操作的当前变量可立即使用)
如果当前变量存在写依赖,将操作A压入head_队列,等待写操作完成后,重新调度
添加一个写依赖操作B:
如果当前变量不存在其它任何操作,直接将操作B计算器减一(即B操作的当前写入变量可立即使用)
如果当前变量存在其它写操作,将操作B压入head_队列,等待重新调度
如果当前变量不存在其它写操作,重围写操作指针pending_write_
完成一个读依赖操作: 读操作计数器num_pending_reads_减一,如果读操作计数器为零,并且存在待调度写操作,则调度写操作(所有的读操作已经完成,写操作可以开始执行)
完成一个写依赖操作B:查找head_队列中下一个写依赖操作C,同时统计两个写依赖操作之间的读依赖操作数num_pending_reads_
如果两个BC操作之间无任何读操作(即C操作之前无读操作队列),则直接调度C操作
如果C操作之前存在n个读操作,则首先完成n个读操作的调度
1.3.1 m_: 线程安全锁
1.3.2 num_pending_reads_:写操作前待调度读操作队列个数,如果= -1,表示当前正在执行写操作,完成每次写操作后,需要重新进行统计
1.3.3 head_:操作队列,队列中同时存在读写操作,所有的写操作需要等待它前面所有操作完成之后才可以调度,所有连续的读操作可以并发调度
1.3.4 pending_write_: 下一个等待调度的写操作指针
1.3.5 kWriteTriggered:变量当前调度标志位,= -1 表示当前正在调度写操作
/// -------------------------------------------------------------------------
struct ThreadedOpr
Engine::AsyncFn fn;
std::vector<ThreadedVar*> const_vars;
std::vector<ThreadedVar*> mutable_vars;
FnProperty prop;
bool temporary{false};
1.4 ThreadedOpr:表示一个具体的线程操作子,包括操作依赖的所有变量(constVars, mutableVars)和执行函数
1.4.1 fn: 操作对应的执行函数
1.4.2 const_vars:只读变量,函数执行过程中不改变变量的值
1.4.3 mutable_vars:写变量,函数执行过程中可能修改这些变量的值
1.4.4 temporary:是否属于临时操作,如果true,操作完成后可以立即删除本操作
/// -------------------------------------------------------------------------
class ThreadedEngine:
std::atomic<int> pending_{0};
std::atomic<bool> kill_{false};
std::atomic<bool> shutdown_phase_{false};
bool engine_info_{false};
std::mutex finished_m_;
std::condition_variable finished_cv_;
std::shared_ptr<common::ObjectPool<ThreadedOpr> > objpool_opr_ref_;
std::shared_ptr<common::ObjectPool<OprBlock> > objpool_blk_ref_;
std::shared_ptr<common::ObjectPool<VersionedVarBlock> > objpool_varblk_ref_;
std::shared_ptr<common::ObjectPool<ThreadedVar> > objpool_var_ref_;
1.5 ThreadedEngine:主要负责以下部分工作:
管理线程操作依赖队列,即各操作之间变量的读写依赖,生成任务池
管理threadWorker,具体实现方式包括ThreadedEnginePooled, ThreadedEnginePerDevice,生成线程池
根据任务池中各任务的属性及优先级,调度相应的线程完成具体作业
变量检查,CheckDuplicate(),保证所有变量的唯一性!防止ThreadedVar可能发生的调度异常!
监控VarHandle状态,WaitForVar()
添加新的操作A:重置A的等待计数器wait = constVar.size + mutableVar.size + 1
遍历A的constVars,加入ThreadedVar,实现与其它操作的变量依赖性分析(读依赖)
遍历A的mutableVars,加入ThreadedVar,实现与其它操作的变量依赖性分析(写依赖)
如果wait == 0, just Execute it!
完成操作A: 激活所有操作A关联constVars的threadOpr
激活所有操作A关联mutableVars的threadOpr
pending--, if(!pending)finished_cv.notify_all;
/// -------------------------------------------------------------------------
class ThreadedEnginePooled
static constexpr std::size_t kNumWorkingThreads = 16;
static constexpr std::size_t kMaxNumGpus = 16;
static constexpr std::size_t kNumStreamsPerGpu = 16
StreamManager<kMaxNumGpus, kNumStreamsPerGpu> streams_
dmlc::ConcurrentBlockingQueue<OprBlock*> task_queue_;
dmlc::ConcurrentBlockingQueue<OprBlock*> io_task_queue_;
ThreadPool thread_pool_;
ThreadPool io_thread_pool_;
1.6 ThreadedEnginePooled:
线程池任务池模式
构造函数初始化16个工作线程+1个io线程:轮询完成任务池中的threadOpr
PushToExecute()负责将所有的任务入队列(常规任务 + io任务)->DoPushToQueue()
DoExecute()完成具体的操作
1.6.1 thread_pool_ + task_queue_:常规任务池,将由kNumWorkingThreads个线程轮询完成
1.6.2 io_thread_pool_ + io_task_queue_:io任务池,每个节点仅启动1个IO线程,完成数据的CPU/GPU传输
/// -------------------------------------------------------------------------
class ThreadedEnginePerDevice
int cpu_worker_nthreads_;
int gpu_worker_nthreads_;
int gpu_copy_nthreads_;
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > cpu_normal_workers_
std::unique_ptr<ThreadWorkerBlock<kPriorityQueue> > cpu_priority_worker_;
common::LazyAllocArray<ThreadWorkerBlock<kWorkerQueue> > gpu_normal_workers_;
common::LazyAllocArray<ThreadWorkerBlock<kCopyQueue> > gpu_copy_workers_;
1.7 ThreadedEnginePerDevice:
每个GPU指派1个计算线程 + 1个copy线程
优先级任务pushInto优先任务池
GPU任务分别pushInto常规任务池或者copy任务池
/// -------------------------------------------------------------------------
特殊的任务:pushThread()
思考:GPU端如果支持PCI上传下载通道分离,是否可以将upload/download分别指定到不同的stream?
GPU版本IO需要维护两个线程,分别完成数据的上传下载
相关文章推荐
- Eclipse Problems中提示没有build问题
- Struts2---入门
- SpringMVC-配置静态资源
- 关于在win7系统下zend studio5.5.0启动闪退问题的解决
- leetcode 7: Reverse integer (C#版)
- 如何将Eclipse中的项目迁移到Android Studio 中
- java基础随笔-内部类
- Python基础 变量和数据类型
- Github上的原文XMPP环境搭建步骤,英语能力差不多的可以看看
- PhantomJS 与python的结合
- Github上的原文XMPP环境搭建步骤,英语能力差不多的可以看看
- java 缓冲流的操作
- php 判断文件/目录是否存的方法
- C++库汇总
- BC404学习笔记-ABAP面向对象编程(一)-基础
- struts2文件上传
- SSM框架——详细整合教程(Spring+SpringMVC+MyBatis)【转载】
- SpringMVC ExceptionHandler 可以传递的参数及返回值
- hihocode #1033 : 交错和
- Struts2中的OGNL详解