(Caffe)基本类DataReader、QueuePair、Body(四)
2016-04-07 18:48
471 查看
本文地址:/article/10918244.html
说明:
一个
说明:
1.
2.
说明:
一个数据库只可能有
所以有,一个
此外有,一个
由代码5,6行及16行可知,每一个DataReader对应一个读的任务,即从数据库(如examples/mnist/mnist_train_lmdb)中读取param.data_param().prefetch() * param.data_param().batch_size()(LeNet5中默认为4×64)个样本
由此可见,一个DataReader为一个任务,通过QueuePair(也对应于该任务)“通知”Body某个数据库中读去N个样本
由代码13行可知,某个数据库(如examples/mnist/mnist_train_lmdb)对应的Body若不存在,将新建一个Body来处理该数据库,也可以理解成新建一个唯一对应于该数据库的线程来处理该数据可。
说明:
由第4节16行可知,一个新的任务(
9到13行从每个
17行循环读数据中,由于
LeNet5中默认为
该线程何时停止呢?
1 简介
QueuePair与Body是
DataReader的内部类。一个
DataReader对应一个任务,一个Body生成一个线程来读取数据库(如
examples/mnist/mnist_train_lmdb)。
QueuePair为前面两者之间的衔接、通信。
2 源代码
/** * @brief Reads data from a source to queues available to data layers. * A single reading thread is created per source, even if multiple solvers * are running in parallel, e.g. for multi-GPU training. This makes sure * databases are read sequentially, and that each solver accesses a different * subset of the database. Data is distributed to solvers in a round-robin * way to keep parallel training deterministic. */ class DataReader { public: ... protected: // Queue pairs are shared between a body and its readers class QueuePair { public: explicit QueuePair(int size); ~QueuePair(); BlockingQueue<Datum*> free_; BlockingQueue<Datum*> full_; }; // A single body is created per source class Body : public InternalThread { public: ... protected: void InternalThreadEntry(); void read_one(db::Cursor* cursor, QueuePair* qp); const LayerParameter param_; BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_; ... }; ... const shared_ptr<QueuePair> queue_pair_; shared_ptr<Body> body_; static map<const string, boost::weak_ptr<DataReader::Body> > bodies_; };
2 类QueuePair
DataReader::QueuePair::QueuePair(int size) { // Initialize the free queue with requested number of datums for (int i = 0; i < size; ++i) { free_.push(new Datum()); } }
说明:
一个
QueuePair对应一个任务队列,从数据库(如
examples/mnist/mnist_train_lmdb)中读取
size个样本
BlockingQueue为一个线程安全的队列容器,其模板类型可能是
Datum,
Batch等。此处装的是
Datum。
BlockingQueue<Datum*> free_为Datum队列,均为新
new出来的,没有包含原始数据(图像)信息
BlockingQueue<Datum*> full_为从数据库读取信息后的队列,包含了原始数据(图像)信息
Datum为一个样本单元,关于
Datum的定义,参见
caffe.proto文件,一般来说,
Datum对应于一张图像(及其
label)
3 类Body
DataReader::Body::Body(const LayerParameter& param) : param_(param), new_queue_pairs_() { StartInternalThread(); }
说明:
1.
Body类继承了
InternalThread(详见博文)。在构造函数了开启这个线程
2.
Body类重载了
DataReader::Body::InternalThreadEntry()函数,从数据库读取数据的操作在该函数中实现,见本文第5节
4 类DataReader
DataReader类的构造函数如下:
map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_; static boost::mutex bodies_mutex_; DataReader::DataReader(const LayerParameter& param) : queue_pair_(new QueuePair( // param.data_param().prefetch() * param.data_param().batch_size())) { // Get or create a body boost::mutex::scoped_lock lock(bodies_mutex_); string key = source_key(param); weak_ptr<Body>& weak = bodies_[key]; body_ = weak.lock(); if (!body_) { body_.reset(new Body(param)); bodies_[key] = weak_ptr<Body>(body_); } body_->new_queue_pairs_.push(queue_pair_); }
说明:
一个数据库只可能有
Body对象,如
examples/mnist/mnist_train_lmdb不管在任何线程的任何
DataReader对象中,都只会有一个
Body对象,因为
bodies_是静态的:
所以有,一个
Body的对象也可以有多个
DataReader对象
此外有,一个
DataReader对象可以有多个
Body对象,即
map<string,weak_ptr<Body>> bodies_
由代码5,6行及16行可知,每一个DataReader对应一个读的任务,即从数据库(如examples/mnist/mnist_train_lmdb)中读取param.data_param().prefetch() * param.data_param().batch_size()(LeNet5中默认为4×64)个样本
由此可见,一个DataReader为一个任务,通过QueuePair(也对应于该任务)“通知”Body某个数据库中读去N个样本
由代码13行可知,某个数据库(如examples/mnist/mnist_train_lmdb)对应的Body若不存在,将新建一个Body来处理该数据库,也可以理解成新建一个唯一对应于该数据库的线程来处理该数据可。
5 函数DataReader::Body::InternalThreadEntry
void DataReader::Body::InternalThreadEntry() { ... vector<shared_ptr<QueuePair> > qps; try { ... // To ensure deterministic runs, only start running once all solvers // are ready. But solvers need to peek on one item during initialization, // so read one item, then wait for the next solver. for (int i = 0; i < solver_count; ++i) { shared_ptr<QueuePair> qp(new_queue_pairs_.pop()); read_one(cursor.get(), qp.get()); qps.push_back(qp); } // Main loop while (!must_stop()) { for (int i = 0; i < solver_count; ++i) { read_one(cursor.get(), qps[i].get()); } ... } } catch (boost::thread_interrupted&) { // Interrupted exception is expected on shutdown } }
说明:
read_one()从
QueuePair的
free_中取出一个
Datum,从数据库读入数据至
Datum,然后放入
full_中
由第4节16行可知,一个新的任务(
DataReader)到来时,将把一个命令队列(
QueuePair)放入到某个数据库(
Body)的缓冲命令队列中(
new_queue_pairs_)
9到13行从每个
solver的任务中读取一个
Datum,在15到18行从数据库中循环读出数据
17行循环读数据中,由于
QueuePair中的
BlockingQueue<Datum*> free_是线程安全的,当
free_不够用时候,该线程挂起,等待其他地方使用了
full_释放出
free_再继续从数据库读数据至
free_中的
Datum,然后把该
Datum重新放入
full_中。
LeNet5中默认为
free_加
full_的
Datum个数为4×64
该线程何时停止呢?
相关文章推荐
- Android高级UI之ViewPager实现页卡的最新方法-简洁的TabLayout
- uint8_t / uint16_t / uint32_t /uint64_t
- ie浏览器强制开启怪异模式(Quirks Mode)的解决方法
- CoAP Request and Response Rules
- Could not find com.android.tools.build:gradle:1.3.0.
- android studio 构建系统基础build
- 3D Slicer+Qt-easy-build+VS2013
- Android学习笔记---第五天---基础UI组件---AnalogClock&TextClock&Chronometer(时钟与简单的计时器)
- 利用UEditor上传图片的注意点
- Access restriction: The type JFrame is not accessible due to restriction on required 错误
- UUID和UDID的常识
- UVA_10583_Ubiquitous Religions
- ios —— UIViewAdditions 布局坐标类库
- Ural 1183 Brackets Sequence(区间DP+记忆化搜索)
- UIColor *_color[5] UIColor数组
- DuiLib(7)——CTreeViewUI使用攻略
- requirejs加载css
- 79.iOS 设备的UI规范和iOS各控件默认高度
- 通用页面框架CmPage(一):简介
- [leetcode] 23. Merge k Sorted Lists