zoukankan      html  css  js  c++  java
  • cartographer-sensor-internal-ordered_muti_queue

    ordered_multi_queue.cc

    OrderedMultiQueue主要函数为;

    类中定义一个queues_,包含很多个带有key的Queue,每个Queue负责一种传感器数据收集和处理。
    void OrderedMultiQueue::Add(const QueueKey& queue_key, std::unique_ptr data);

    如果了解ROS ,会经常用到call_back,怎样实现的内,本类即可说明。

     using Callback = std::function<void(std::unique_ptr<Data>)>;//回调函数
    /*
      struct QueueKey{
        int trajectory_id; // 轨线id;
        std::string sensor_id; //传感器id;
      }
      struct Queue {
        common::BlockingQueue<std::unique_ptr<Data>> queue;
        Callback callback;
        bool finished = false;
      };
      std::map<int, common::Time> common_start_time_per_trajectory_;//轨迹id及对应创建轨迹时间
      std::map<QueueKey, Queue> queues_;//多队列主体,本类最大的内存占用量: first 是QueueKey,second 是 Queue。
      QueueKey blocker_;
    */
    
    // Number of items that can be queued up before we log which queues are waiting
    // for data.
    const int kMaxQueueSize = 500;
    
    }  // namespace
    // 重载QueueKey的<<输出运算符,友元函数
    inline std::ostream& operator<<(std::ostream& out, const QueueKey& key) {
      return out << '(' << key.trajectory_id << ", " << key.sensor_id << ')';
    }
    
    
    //添加一个关键词是key的队列,并用比较函数Callback排序
    void OrderedMultiQueue::AddQueue(const QueueKey& queue_key, Callback callback) {
      CHECK_EQ(queues_.count(queue_key), 0);
      queues_[queue_key].callback = std::move(callback);
    }
      /*
       某一key标识的【队列】Queue已经完成入队,因此不能再入队列,并在map中移除key.
      */
    void OrderedMultiQueue::MarkQueueAsFinished(const QueueKey& queue_key) {
      auto it = queues_.find(queue_key);
      CHECK(it != queues_.end()) << "Did not find '" << queue_key << "'.";
      auto& queue = it->second;
      CHECK(!queue.finished);//检查状态
      queue.finished = true;//标记本队列已完成,别的数据不能再入队.
      Dispatch(); //调用一次MarkQueueAsFinished()就要调用一次Dispatch()
    }
    
    //根据key找到队列,并添加data元素
    void OrderedMultiQueue::Add(const QueueKey& queue_key,
                                std::unique_ptr<Data> data) {
      auto it = queues_.find(queue_key);
      if (it == queues_.end()) {//没有key时,警告
        LOG_EVERY_N(WARNING, 1000)
            << "Ignored data for queue: '" << queue_key << "'";
        return;
      }
      it->second.queue.Push(std::move(data));
      Dispatch();//调用一次Add()就要调用一次Dispatch()
    }
    //先找到没有finished的队列,然后再对这些队列标记finished.已完成的则不作任何处理
    void OrderedMultiQueue::Flush() {
      std::vector<QueueKey> unfinished_queues;
      for (auto& entry : queues_) {
        if (!entry.second.finished) {
          unfinished_queues.push_back(entry.first);
        }
      }
      for (auto& unfinished_queue : unfinished_queues) {
        MarkQueueAsFinished(unfinished_queue);
      }
    }
    
    QueueKey OrderedMultiQueue::GetBlocker() const {
      CHECK(!queues_.empty());
      return blocker_;
    }
    
    //处理最早的数据
    void OrderedMultiQueue::Dispatch() {
      while (true) {
        //首先处理的数据,也即最早采集的数据
        const Data* next_data = nullptr;
        Queue* next_queue = nullptr;
        QueueKey next_queue_key;
        //遍历队列中的每一个key:填充上面3个变量值。如果某一key对应的data为空,则直接return
        for (auto it = queues_.begin(); it != queues_.end();) {  // 通过for循环,找到所有队列中最早的data,赋值给next_data
          const auto* data = it->second.queue.Peek<Data>();//获取某一队的队首data
          if (data == nullptr) {
            if (it->second.finished) {//it对应的队列为空且为finished,故删除it对应的key
              queues_.erase(it++);
              continue;
            }
            CannotMakeProgress(it->first);//此时什么也不做
            return;
          }
          //找到next_data数据: 即采集时间最早的数据,理论上应该最先处理它。 next_data改为 earlest_data更好理解
          if (next_data == nullptr || data->GetTime() < next_data->GetTime()) {
            next_data = data;
            next_queue = &it->second;
            next_queue_key = it->first;
          }
          CHECK_LE(last_dispatched_time_, next_data->GetTime())
              << "Non-sorted data added to queue: '" << it->first << "'";
          ++it;
        } 
        if (next_data == nullptr) {
          CHECK(queues_.empty());//只有多队列为空,才可能next_data==nullptr
          return;
        }
    
        // If we haven't dispatched any data for this trajectory yet, fast forward
        // all queues of this trajectory until a common start time has been reached.
        const common::Time common_start_time = //common_start_time即所有的sensor都开始有data的时间点。
            GetCommonStartTime(next_queue_key.trajectory_id);
        
        if (next_data->GetTime() >= common_start_time) { //大多数情况,happy case
          // Happy case, we are beyond the 'common_start_time' already.
          last_dispatched_time_ = next_data->GetTime();//调用回调函数处理data。
          next_queue->callback(next_queue->queue.Pop()); //处理并删除最新数据队列定端数据,也就是最新数据
        } else if (next_queue->queue.Size() < 2) { // 罕见
          if (!next_queue->finished) {
            // We cannot decide whether to drop or dispatch this yet.
            CannotMakeProgress(next_queue_key);
            return;
          }
          last_dispatched_time_ = next_data->GetTime();
          next_queue->callback(next_queue->queue.Pop());
        } else {
          // We take a peek at the time after next data. If it also is not beyond
          // 'common_start_time' we drop 'next_data', otherwise we just found the
          // first packet to dispatch from this queue.
          std::unique_ptr<Data> next_data_owner = next_queue->queue.Pop();
          if (next_queue->queue.Peek<Data>()->GetTime() > common_start_time) {
            last_dispatched_time_ = next_data->GetTime();
            next_queue->callback(std::move(next_data_owner));
          }
        }
    
      }
    }
    //queue_key赋值给blocker_,表示
    void OrderedMultiQueue::CannotMakeProgress(const QueueKey& queue_key) {
      blocker_ = queue_key;//标识该队列Queue已经阻塞
      for (auto& entry : queues_) {
        if (entry.second.queue.Size() > kMaxQueueSize) {
          LOG_EVERY_N(WARNING, 60) << "Queue waiting for data: " << queue_key;
          return;
        }
      }
    }
    
    common::Time OrderedMultiQueue::GetCommonStartTime(const int trajectory_id) {
       //map.emplace():Construct and insert element,根据trajectory_id构造一个map。
      //std::map<int, common::Time> common_start_time_per_trajectory_;//轨迹id及对应创建轨迹时间
      auto emplace_result = common_start_time_per_trajectory_.emplace(
          trajectory_id, common::Time::min());
      common::Time& common_start_time = emplace_result.first->second;//first是插入元素指针 
      if (emplace_result.second) {
        for (auto& entry : queues_) {
          //entry是map的pair<,>.本循环求得所有传感器中的maxtime
          if (entry.first.trajectory_id == trajectory_id) {
            common_start_time = std::max(
                common_start_time, entry.second.queue.Peek<Data>()->GetTime());//peak 最先元素,也就是最早时间
          }
        }
        LOG(INFO) << "All sensor data for trajectory " << trajectory_id
                  << " is available starting at '" << common_start_time << "'.";
      }
      return common_start_time;
    }
    
  • 相关阅读:
    WinForm 窗体应用程序(初步)之一
    ADO.NET
    面向对象思想
    数据库原理
    HTML学习总结
    c# 学习心得(2)
    c# 学习心得(1)
    《大话数据结构》读书笔记(2)
    《大话数据结构》读书笔记(1)
    ASP.NET Core学习总结(3)
  • 原文地址:https://www.cnblogs.com/heimazaifei/p/12439009.html
Copyright © 2011-2022 走看看