zoukankan      html  css  js  c++  java
  • caffe的data_reader.cpp分析一下干了点什么

    首先说明:下面的内容不一定对

    类body:

    变量:LayerParameter param_ :它里面放的是:body传进来的layerparameter的参数;

    BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_:这是一个队列,它里面放的是一个队列对指针,

    它的初始化:由传入的layerparameter参数赋值param_变量,并开启一个相应的进程;

    它的析构函数:让进程停下来;

    开启的这个进程它会干什么呢??用来读数据吧。

    类data_reader:

    变量:shared_ptr<QueuePair> queue_pair_:它是一个队列对的指针,并且用 prefetch*batch_size QueuePair 初始化了指向的queuePair 的实例 ,即里面的 free_ 队列的大小;

    shared_ptr<Body> body_: 它是一个类body的指针,

    map<const string, boost::weak_ptr<DataReader::Body> > bodies_: 它是一个map的容器,

                                                                                                     其中的它键值:网络层的名字+源数据的路径表示,

                                                                                                      而它对应的值:是一个指 针,指向了

    在data_reader类的初始化时,它传入一个参数LayerParameter,下面是它做的事情:

    1,把它的队列对指针queue_pair_ ,并且用 prefetch*batch_size QueuePair 初始化了指向的queuePair 的实例;

    2,让body_的指针指向一个用LayerParameter初始化的body指针,并且向它指向的body里的变量阻塞队列new_queue_pairs_里压入一个值:为queue_pair_

    3. 初始化上面的参数bodies_, 它的键值为相关的网络层的名字+源数据的路径表示,而值为:与body_相对应的弱指针。

    它的析构函数做的事:

    1,把body_指向的空间释放掉,2,把 bodies_ 内的键-值 删除掉,因为里面的弱指针已经过期了。

    类queuePair:

    变量: BlockingQueue<Datum*> free_; 它是一个存放 datum指针的阻塞队列;

             BlockingQueue<Datum*> full_;它也是一个存放 datum指针的阻塞队列;

    它的初始化为:初始化一定大小size(传入的参数)的free_的空间;

    它的析构函数做的事情:释放掉free里的指针所指向的内存空间,并且把free_的阻塞队列清空;

    (它都没有管full_的事情啊,)

    还有不懂的地方啊,先粘上吧;;;fuck.

    data_reader.hpp

    1 #ifndef CAFFE_DATA_READER_HPP_
      2 #define CAFFE_DATA_READER_HPP_
      3 
      4 #include <map>
      5 #include <string>
      6 #include <vector>
      7 
      8 #include "caffe/common.hpp"
      9 #include "caffe/internal_thread.hpp"
     10 #include "caffe/util/blocking_queue.hpp"
     11 #include "caffe/util/db.hpp"
     12 
     13 namespace caffe {                                                                                                                                                                          
     14 
     15 /**
     16  * @brief Reads data from a source to queues available to data layers.
     17  * A single reading thread is created per source, even if multiple solvers
     18  * are running in parallel, e.g. for multi-GPU training. This makes sure
     19  * databases are read sequentially, and that each solver accesses a different
     20  * subset of the database. Data is distributed to solvers in a round-robin
     21  * way to keep parallel training deterministic.
     22  */
     23 class DataReader {
     24  public:
     25   explicit DataReader(const LayerParameter& param);
     26   ~DataReader();
     27 
     28   inline BlockingQueue<Datum*>& free() const {       //返回queue_pair_指向的queuepair里的free_阻塞队列;
     29     return queue_pair_->free_; 
     30   }
     31   inline BlockingQueue<Datum*>& full() const {      //返回queue_pair_指向的queuepair里的full_阻塞队列;
     32     return queue_pair_->full_;
     33   }
     34 
     35  protected:
     36   // Queue pairs are shared between a body and its readers
     37   class QueuePair {
     38    public:
     39     explicit QueuePair(int size);    //它初始化时,会为free_阻塞队列里push进去size个 Datum*;
     40     ~QueuePair(); //做的就是:把free_与full_里的指针指向的空间释放掉;
     41 
     42     BlockingQueue<Datum*> free_;
     43     BlockingQueue<Datum*> full_;
     44 
     45   DISABLE_COPY_AND_ASSIGN(QueuePair);
     46   };
     47 
     48   // A single body is created per source
     49   class Body : public InternalThread {
     50    public:
     51     explicit Body(const LayerParameter& param);
     52     virtual ~Body();
     53 
     54    protected:
    55     void InternalThreadEntry();        //定义的入口函数,就是说对于body来说 ,这个线程是干什么的; ,它根据Layerparameter里的路径读取读据到new_queue_pairs_里的指针指向的queuepair中;
     56     void read_one(db::Cursor* cursor, QueuePair* qp);
     57 
     58     const LayerParameter param_;
     59     BlockingQueue<shared_ptr<QueuePair> > new_queue_pairs_; //这个阻塞队列里的不同的元素与solver_count有关系啊? ,初始化时,就把queue_pair_放进去了啊;
     60 
     61     friend class DataReader;
     62 
     63   DISABLE_COPY_AND_ASSIGN(Body);
     64   };
     65 
     66   // A source is uniquely identified by its layer name + path, in case
     67   // the same database is read from two different locations in the net.
     68   static inline string source_key(const LayerParameter& param) {  //它做的就是形成一个字符串;
     69     return param.name() + ":" + param.data_param().source();
     70   }
     71 
     72   const shared_ptr<QueuePair> queue_pair_;
     73   shared_ptr<Body> body_;
     74 
     75   static map<const string, boost::weak_ptr<DataReader::Body> > bodies_;
     76 
     77 DISABLE_COPY_AND_ASSIGN(DataReader);
     78 };
     79 
     80 }  // namespace caffe
     81 
     82 #endif  // CAFFE_DATA_READER_HPP_

    data_reader.cpp

    1 #include <boost/thread.hpp>                                                                                                                                                                
      2 #include <map>
      3 #include <string>
      4 #include <vector>
      5 
      6 #include "caffe/common.hpp"
      7 #include "caffe/data_reader.hpp"
      8 #include "caffe/layers/data_layer.hpp"
      9 #include "caffe/proto/caffe.pb.h"
     10 
     11 namespace caffe {
     12 
     13 using boost::weak_ptr;
     14 
     15 map<const string, weak_ptr<DataReader::Body> > DataReader::bodies_;
     16 static boost::mutex bodies_mutex_;
     17 
     18 DataReader::DataReader(const LayerParameter& param)
     19     : queue_pair_(new QueuePair(  //
     20         param.data_param().prefetch() * param.data_param().batch_size())) {
     21   // Get or create a body
     22   boost::mutex::scoped_lock lock(bodies_mutex_);
     23   string key = source_key(param);
     24   weak_ptr<Body>& weak = bodies_[key];
     25   //boost::weak_ptr 必定总是通过 boost::shared_ptr 来初始化的。一旦初始化之后,它基本上只提供一个有用的方法: lock()。
     26   //此方法返回的boost::shared_ptr 与用来初始化弱指针的共享指针共享所有权。 如果这个共享指针不含有任何对象,返回的共享指针也将是空的。
     27   //expired()用于检测所管理的对象是否已经释放;lock()用于获取所管理的对象的强引用指针。
     28   body_ = weak.lock();
     29   if (!body_) {
     30     body_.reset(new Body(param));
     31     bodies_[key] = weak_ptr<Body>(body_);
     32   }
     33   body_->new_queue_pairs_.push(queue_pair_);
     34 }
     35 
     36 DataReader::~DataReader() {
     37   string key = source_key(body_->param_);
     38   body_.reset();
     39   boost::mutex::scoped_lock lock(bodies_mutex_);
     40   if (bodies_[key].expired()) {
     41     bodies_.erase(key); //删除一个元素;
     42   }
     43 }
     44 
     45 //
     46 
     47 DataReader::QueuePair::QueuePair(int size) {
     48   // Initialize the free queue with requested number of datums
     49   for (int i = 0; i < size; ++i) {
     50     free_.push(new Datum());
     51   }
     52 }
     53 
     54 DataReader::QueuePair::~QueuePair() { //释放掉内存;
     54 DataReader::QueuePair::~QueuePair() { //释放掉内存;
     55   Datum* datum;
     56   while (free_.try_pop(&datum)) {
     57     delete datum;
     58   }
     59   while (full_.try_pop(&datum)) {
     60     delete datum;
     61   }
     62 }
     63 
     64 //
     65 
     66 DataReader::Body::Body(const LayerParameter& param)
     67     : param_(param),
     68       new_queue_pairs_() {
     69   StartInternalThread();
     70 }
     71 
     72 DataReader::Body::~Body() {
     73   StopInternalThread();
     74 }
     75 
     76 void DataReader::Body::InternalThreadEntry() {
     77   shared_ptr<db::DB> db(db::GetDB(param_.data_param().backend())); //我真没有看到db::DB的构造函数这样的初始化啊;
     78   db->Open(param_.data_param().source(), db::READ); //创建环境并打开;
     79   shared_ptr<db::Cursor> cursor(db->NewCursor());  //创建了一个cursor用于读取;
     80   vector<shared_ptr<QueuePair> > qps; //从下面的代码可以看出里面装的是阻塞队列 new_queue_pqirs里的指针;
     81   try {
     82     int solver_count = param_.phase() == TRAIN ? Caffe::solver_count() : 1;
     83 
     84     // To ensure deterministic runs, only start running once all solvers
     85     // are ready. But solvers need to peek on one item during initialization,
     86     // so read one item, then wait for the next solver.
     87     for (int i = 0; i < solver_count; ++i) { //这个第一个数据是不是特别呢?,还是为了让cursor移动到first position?
     88       shared_ptr<QueuePair> qp(new_queue_pairs_.pop()); //初始化为new_queue_pairs_队列里的第一个元素(里面放的是queuepair的指针;
     89       read_one(cursor.get(), qp.get()); //不同的solver对应的数据在database里是连续存储的??这个solver_count到底是什么东西?
     90       qps.push_back(qp); //在vector的尾部追加一个数据;
     91     }       
     92     // Main loop
     93     while (!must_stop()) { // 有点不明白什么时候退出循环;
     94       for (int i = 0; i < solver_count; ++i) {
     95         read_one(cursor.get(), qps[i].get());
     96       }
     97       // Check no additional readers have been created. This can happen if
     98       // more than one net is trained at a time per process, whether single
     99       // or multi solver. It might also happen if two data layers have same
    100       // name and same source.
    101       CHECK_EQ(new_queue_pairs_.size(), 0);
    102     }
    103   } catch (boost::thread_interrupted&) {        
    104     // Interrupted exception is expected on shutdown
    105   }
    106 } //
    107 
    108 void DataReader::Body::read_one(db::Cursor* cursor, QueuePair* qp) {
    109   Datum* datum = qp->free_.pop(); //可以看出free_与full_共用一组地址;
    110   // TODO deserialize in-place instead of copy?
    111   datum->ParseFromString(cursor->value()); //cursor的value函数返回string形式的data值;
    112   qp->full_.push(datum);
    113 
    114   // go to the next iter
    115   cursor->Next();
    116   if (!cursor->valid()) { //意思就是,当valid_值(valid()函数返回的)为false,说明没有找到,从数据开始,重新找)
    117     DLOG(INFO) << "Restarting data prefetching from start.";
    118     cursor->SeekToFirst(); //把curso移动到first位置;
    119   }
    120 }
    121 
    122 }  // namespace caffe
  • 相关阅读:
    PHP curl_setopt函数用法介绍补充篇
    Javascript的setTimeOut和setInterval的定时器用法
    PHP curl_setopt函数用法介绍上篇
    开启PHP的伪静态
    关于MySQL的几个命令之load
    使用PHP生成和获取XML格式数据
    WEB开发中常用的正则表达式
    WEB开发中的页面跳转方法总结
    PHP的serialize序列化数据与JSON格式化数据
    PHP防止重复提交表单
  • 原文地址:https://www.cnblogs.com/yinheyi/p/5985965.html
Copyright © 2011-2022 走看看