caffe的data_reader.cpp分析一下干了点什么
2016-10-21 20:26
302 查看
首先说明:下面的内容不一定对
类body:
变量:LayerParameter param_ :它里面放的是:body传进来的layerparameter的参数;
BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_:这是一个队列,它里面放的是一个队列对指针,
它的初始化:由传入的layerparameter参数赋值param_变量,并开启一个相应的进程;
它的析构函数:让进程停下来;
开启的这个进程它会干什么呢??用来读数据吧。
类data_reader:
变量:shared_ptr<QueuePair> queue_pair_:它是一个队列对的指针,并且用 prefetch*batch_size QueuePair 初始化了指向的queuePair 的实例 ,即里面的 free_ 队列的大小;
shared_ptr<Body> body_: 它是一个类body的指针,
map<const string, boost::weak_ptr<DataReader::Body> > bodies_: 它是一个map的容器,
其中的它键值:网络层的名字+源数据的路径表示,
而它对应的值:是一个指 针,指向了
在data_reader类的初始化时,它传入一个参数LayerParameter,下面是它做的事情:
1,把它的队列对指针queue_pair_ ,并且用 prefetch*batch_size QueuePair 初始化了指向的queuePair 的实例;
2,让body_的指针指向一个用LayerParameter初始化的body指针,并且向它指向的body里的变量阻塞队列new_queue_pairs_里压入一个值:为queue_pair_。
3. 初始化上面的参数bodies_, 它的键值为相关的网络层的名字+源数据的路径表示,而值为:与body_相对应的弱指针。
它的析构函数做的事:
1,把body_指向的空间释放掉,2,把 bodies_ 内的键-值 删除掉,因为里面的弱指针已经过期了。
类queuePair:
变量: BlockingQueue<Datum*> free_; 它是一个存放 datum指针的阻塞队列;
BlockingQueue<Datum*> full_;它也是一个存放 datum指针的阻塞队列;
它的初始化为:初始化一定大小size(传入的参数)的free_的空间;
它的析构函数做的事情:释放掉free里的指针所指向的内存空间,并且把free_的阻塞队列清空;
(它都没有管full_的事情啊,)
还有不懂的地方啊,先粘上吧;;;fuck.
data_reader.hpp
data_reader.cpp
类body:
变量:LayerParameter param_ :它里面放的是:body传进来的layerparameter的参数;
BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_:这是一个队列,它里面放的是一个队列对指针,
它的初始化:由传入的layerparameter参数赋值param_变量,并开启一个相应的进程;
它的析构函数:让进程停下来;
开启的这个进程它会干什么呢??用来读数据吧。
类data_reader:
变量:shared_ptr<QueuePair> queue_pair_:它是一个队列对的指针,并且用 prefetch*batch_size QueuePair 初始化了指向的queuePair 的实例 ,即里面的 free_ 队列的大小;
shared_ptr<Body> body_: 它是一个类body的指针,
map<const string, boost::weak_ptr<DataReader::Body> > bodies_: 它是一个map的容器,
其中的它键值:网络层的名字+源数据的路径表示,
而它对应的值:是一个指 针,指向了
在data_reader类的初始化时,它传入一个参数LayerParameter,下面是它做的事情:
1,把它的队列对指针queue_pair_ ,并且用 prefetch*batch_size QueuePair 初始化了指向的queuePair 的实例;
2,让body_的指针指向一个用LayerParameter初始化的body指针,并且向它指向的body里的变量阻塞队列new_queue_pairs_里压入一个值:为queue_pair_。
3. 初始化上面的参数bodies_, 它的键值为相关的网络层的名字+源数据的路径表示,而值为:与body_相对应的弱指针。
它的析构函数做的事:
1,把body_指向的空间释放掉,2,把 bodies_ 内的键-值 删除掉,因为里面的弱指针已经过期了。
类queuePair:
变量: BlockingQueue<Datum*> free_; 它是一个存放 datum指针的阻塞队列;
BlockingQueue<Datum*> full_;它也是一个存放 datum指针的阻塞队列;
它的初始化为:初始化一定大小size(传入的参数)的free_的空间;
它的析构函数做的事情:释放掉free里的指针所指向的内存空间,并且把free_的阻塞队列清空;
(它都没有管full_的事情啊,)
还有不懂的地方啊,先粘上吧;;;fuck.
data_reader.hpp
1 #ifndef CAFFE_DATA_READER_HPP_ 2 #define CAFFE_DATA_READER_HPP_ 3 4 #include <map> 5 #include <string> 6 #include <vector> 7 8 #include "caffe/common.hpp" 9 #include "caffe/internal_thread.hpp" 10 #include "caffe/util/blocking_queue.hpp" 11 #include "caffe/util/db.hpp" 12 13 namespace caffe { 14 15 /** 16 * @brief Reads data from a source to queues available to data layers. 17 * A single reading thread is created per source, even if multiple solvers 18 * are running in parallel, e.g. for multi-GPU training. This makes sure 19 * databases are read sequentially, and that each solver accesses a different 20 * subset of the database. Data is distributed to solvers in a round-robin 21 * way to keep parallel training deterministic. 22 */ 23 class DataReader { 24 public: 25 explicit DataReader(const LayerParameter& param); 26 ~DataReader(); 27 28 inline BlockingQueue<Datum*>& free() const { //返回queue_pair_指向的queuepair里的free_阻塞队列; 29 return queue_pair_->free_; 30 } 31 inline BlockingQueue<Datum*>& full() const { //返回queue_pair_指向的queuepair里的full_阻塞队列; 32 return queue_pair_->full_; 33 } 34 35 protected: 36 // Queue pairs are shared between a body and its readers 37 class QueuePair { 38 public: 39 explicit QueuePair(int size); //它初始化时,会为free_阻塞队列里push进去size个 Datum*; 40 ~QueuePair(); //做的就是:把free_与full_里的指针指向的空间释放掉; 41 42 BlockingQueue<Datum*> free_; 43 BlockingQueue<Datum*> full_; 44 45 DISABLE_COPY_AND_ASSIGN(QueuePair); 46 }; 47 48 // A single body is created per source 49 class Body : public InternalThread { 50 public: 51 explicit Body(const LayerParameter& param); 52 virtual ~Body(); 53 54 protected: 55 void InternalThreadEntry(); //定义的入口函数,就是说对于body来说 ,这个线程是干什么的; ,它根据Layerparameter里的路径读取读据到new_queue_pairs_里的指针指向的queuepair中; 56 void read_one(db::Cursor* cursor, QueuePair* qp); 57 58 const LayerParameter param_; 59 BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_; //这个阻塞队列里的不同的元素与solver_count有关系啊? ,初始化时,就把queue_pair_放进去了啊; 60 61 friend class DataReader; 62 63 DISABLE_COPY_AND_ASSIGN(Body); 64 }; 65 66 // A source is uniquely identified by its layer name + path, in case 67 // the same database is read from two different locations in the net. 68 static inline string source_key(const LayerParameter& param) { //它做的就是形成一个字符串; 69 return param.name() + ":" + param.data_param().source(); 70 } 71 72 const shared_ptr<QueuePair> queue_pair_; 73 shared_ptr<Body> body_; 74 75 static map<const string, boost::weak_ptr<DataReader::Body> > bodies_; 76 77 DISABLE_COPY_AND_ASSIGN(DataReader); 78 }; 79 80 } // namespace caffe 81 82 #endif // CAFFE_DATA_READER_HPP_
data_reader.cpp
1 #include <boost/thread.hpp> 2 #include <map> 3 #include <string> 4 #include <vector> 5 6 #include "caffe/common.hpp" 7 #include "caffe/data_reader.hpp" 8 #include "caffe/layers/data_layer.hpp" 9 #include "caffe/proto/caffe.pb.h" 10 11 namespace caffe { 12 13 using boost::weak_ptr; 14 15 map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_; 16 static boost::mutex bodies_mutex_; 17 18 DataReader::DataReader(const LayerParameter& param) 19 : queue_pair_(new QueuePair( // 20 param.data_param().prefetch() * param.data_param().batch_size())) { 21 // Get or create a body 22 boost::mutex::scoped_lock lock(bodies_mutex_); 23 string key = source_key(param); 24 weak_ptr<Body>& weak = bodies_[key]; 25 //boost::weak_ptr 必定总是通过 boost::shared_ptr 来初始化的。一旦初始化之后,它基本上只提供一个有用的方法: lock()。 26 //此方法返回的boost::shared_ptr 与用来初始化弱指针的共享指针共享所有权。 如果这个共享指针不含有任何对象,返回的共享指针也将是空的。 27 //expired()用于检测所管理的对象是否已经释放;lock()用于获取所管理的对象的强引用指针。 28 body_ = weak.lock(); 29 if (!body_) { 30 body_.reset(new Body(param)); 31 bodies_[key] = weak_ptr<Body>(body_); 32 } 33 body_->new_queue_pairs_.push(queue_pair_); 34 } 35 36 DataReader::~DataReader() { 37 string key = source_key(body_->param_); 38 body_.reset(); 39 boost::mutex::scoped_lock lock(bodies_mutex_); 40 if (bodies_[key].expired()) { 41 bodies_.erase(key); //删除一个元素; 42 } 43 } 44 45 // 46 47 DataReader::QueuePair::QueuePair(int size) { 48 // Initialize the free queue with requested number of datums 49 for (int i = 0; i < size; ++i) { 50 free_.push(new Datum()); 51 } 52 } 53 54 DataReader::QueuePair::~QueuePair() { //释放掉内存; 54 DataReader::QueuePair::~QueuePair() { //释放掉内存; 55 Datum* datum; 56 while (free_.try_pop(&datum)) { 57 delete datum; 58 } 59 while (full_.try_pop(&datum)) { 60 delete datum; 61 } 62 } 63 64 // 65 66 DataReader::Body::Body(const LayerParameter& param) 67 : param_(param), 68 new_queue_pairs_() { 69 StartInternalThread(); 70 } 71 72 DataReader::Body::~Body() { 73 StopInternalThread(); 74 } 75 76 void DataReader::Body::InternalThreadEntry() { 77 shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend())); //我真没有看到db::DB的构造函数这样的初始化啊; 78 db->Open(param_.data_param().source(), db::READ); //创建环境并打开; 79 shared_ptr<db::Cursor> cursor(db->NewCursor()); //创建了一个cursor用于读取; 80 vector<shared_ptr<QueuePair> > qps; //从下面的代码可以看出里面装的是阻塞队列 new_queue_pqirs里的指针; 81 try { 82 int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1; 83 84 // To ensure deterministic runs, only start running once all solvers 85 // are ready. But solvers need to peek on one item during initialization, 86 // so read one item, then wait for the next solver. 87 for (int i = 0; i < solver_count; ++i) { //这个第一个数据是不是特别呢?,还是为了让cursor移动到first position? 88 shared_ptr<QueuePair> qp(new_queue_pairs_.pop()); //初始化为new_queue_pairs_队列里的第一个元素(里面放的是queuepair的指针; 89 read_one(cursor.get(), qp.get()); //不同的solver对应的数据在database里是连续存储的??这个solver_count到底是什么东西? 90 qps.push_back(qp); //在vector的尾部追加一个数据; 91 } 92 // Main loop 93 while (!must_stop()) { // 有点不明白什么时候退出循环; 94 for (int i = 0; i < solver_count; ++i) { 95 read_one(cursor.get(), qps[i].get()); 96 } 97 // Check no additional readers have been created. This can happen if 98 // more than one net is trained at a time per process, whether single 99 // or multi solver. It might also happen if two data layers have same 100 // name and same source. 101 CHECK_EQ(new_queue_pairs_.size(), 0); 102 } 103 } catch (boost::thread_interrupted&) { 104 // Interrupted exception is expected on shutdown 105 } 106 } // 107 108 void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) { 109 Datum* datum = qp->free_.pop(); //可以看出free_与full_共用一组地址; 110 // TODO deserialize in-place instead of copy? 111 datum->ParseFromString(cursor->value()); //cursor的value函数返回string形式的data值; 112 qp->full_.push(datum); 113 114 // go to the next iter 115 cursor->Next(); 116 if (!cursor->valid()) { //意思就是,当valid_值(valid()函数返回的)为false,说明没有找到,从数据开始,重新找) 117 DLOG(INFO) << "Restarting data prefetching from start."; 118 cursor->SeekToFirst(); //把curso移动到first位置; 119 } 120 } 121 122 } // namespace caffe
相关文章推荐
- caffe源码分析--data_layer.cpp
- C1083 无法打开源文件: “..\..\src\caffe\data_reader.cpp”: No such file or directory libcaffe
- caffe源代码分析--data_layer.cpp
- 调试错误 debug assertion failed; file:dlgdata.cpp; line 43 分析析
- 有哪位高手帮忙解决一下问题!!!DataContract 的using引用指令用什么?
- caffe caffe.cpp 程序入口分析
- caffe里的blocking_queue.hpp与.cpp干了点什么呢???
- 传感器数据分析(Sensor Data Analytics)是什么?
- 那位高手能给解释一下,在SqlDataReader读数据和直接返回DataTable(DataSet)之类的那个好?
- Caffe源码(十一):io.cpp 分析
- 在 caffe 的 data_later.cpp 中做数据增强
- 求助 服务器主板5520 主板 装的是 SERVER 2008系统 显示两个网卡驱动都装上了 ,但第二个网口连不上网 ,请大家帮忙分析一下什么原因 !!
- ramdisk 是什么? amdisk.img system.img userdata.img 分析
- 分析一下android2.3中SensorBase.cpp的程序流程
- caffe代码阅读3:data_reader、internalthread以及blocking_queue的实现细节-2016.3.15
- caffe data_transformer.cpp 函数中的一个小问题
- SqlDataReader 中的read 方法是什么意思?
- sqlconnection,sqlcommand,sqldataadapter,sqldatareader,dataset都是做什么用的?
- caffe源码分析--softmax_layer.cpp
- caffe源码分析:softmax_layer.cpp && softmax_loss_layer.cpp