zoukankan      html  css  js  c++  java
  • licode学习之erizo篇--Worker

    erizo使用Worker来管理Task,每个Task是一个函数片段,其执行完全由Worker来接管。这次主要学习Worker的结构定义和实现机制

     1 class Worker : public std::enable_shared_from_this<Worker> {
     2  public:
     3   typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
     4   typedef std::function<void()> Task; //返回值为空的函数为Task
     5   typedef std::function<bool()> ScheduledTask;  //返回值为bool的函数为scheduletask
     6 
     7   explicit Worker(std::weak_ptr<Scheduler> scheduler,
     8                   std::shared_ptr<Clock> the_clock = std::make_shared<SteadyClock>());
     9   ~Worker();
    10 
    11   virtual void task(Task f); //设置运行Task
    12 
    13   virtual void start(); //开启线程
    14   virtual void start(std::shared_ptr<std::promise<void>> start_promise);  //同步方式开启线程,即确定线程启动后,调用者才会返回
    15   virtual void close(); //停止线程
    16 
    17   virtual std::shared_ptr<ScheduledTaskReference> scheduleFromNow(Task f, duration delta); //定时器,可以取消的定时器
    18   virtual void unschedule(std::shared_ptr<ScheduledTaskReference> id); //取消定时器
    19 
    20   virtual void scheduleEvery(ScheduledTask f, duration period);  //循环定时器,f返回false时停止执行
    21 
    22  private:
    23   void scheduleEvery(ScheduledTask f, duration period, duration next_delay);
    24   std::function<void()> safeTask(std::function<void(std::shared_ptr<Worker>)> f);
    25 
    26  protected:
    27   int next_scheduled_ = 0;
    28 
    29  private:
    30   std::weak_ptr<Scheduler> scheduler_;
    31   std::shared_ptr<Clock> clock_;
    32   boost::asio::io_service service_;
    33   asio_worker service_worker_;
    34   boost::thread_group group_;
    35   std::atomic<bool> closed_;
    36 };

    先来研究一下构造函数

    Worker::Worker(std::weak_ptr<Scheduler> scheduler, std::shared_ptr<Clock> the_clock)
        : scheduler_{scheduler}, //构造定时器变量
          clock_{the_clock}, //构造自己的时钟变量
          service_{}, //构造io_service对象
          service_worker_{new asio_worker::element_type(service_)}, //为io_service注入service_worker,避免直接退出
          closed_{false} { //线程控制变量,为true时,退出
    }

    在构造函数中,使用boost io service,构建了基本的线程架构。

    研究一下start

    void Worker::start() {
      auto promise = std::make_shared<std::promise<void>>();
      start(promise);
    }
    
    void Worker::start(std::shared_ptr<std::promise<void>> start_promise) {
      auto this_ptr = shared_from_this();
      auto worker = [this_ptr, start_promise] { //创建一个代理worker,准备好执行过程
        start_promise->set_value(); //通知promise,线程已经就绪
        if (!this_ptr->closed_) { //如果不是close状态
          return this_ptr->service_.run(); //调用io service的run函数,开启线程过程
        }
        return size_t(0);
      };
      group_.add_thread(new boost::thread(worker)); //实际创建线程,并将之添加到group里面
    }

    提供了两个start函数,无参的直接创建一个promise,调用有参数的,并且并未使用get_future.wait进行流程控制。

    这里就可以理解为:无参数start,不关心线程是否创建成功,如果在线程没有创建成功时,调用了task函数,则可能出现异常错误。有参数的start为外面控制线程存在,优化处理流程提供了可能。

    看一下close函数

    void Worker::close() {
      closed_ = true;
      service_worker_.reset();
      group_.join_all();
      service_.stop();
    }

    在close函数中,将变量设为true,并调用各种析构。

    从start和close的控制可以看到,Worker的start和close只能成功调用一次,如果close以后,再start,线程就会直接退出了。这应该也是一个小弊端了。

    看task函数

    void Worker::task(Task f) {
      service_.post(f);
    }

    task调用io service的post,直接投递任务。也就是说task实际上就是一个基础的处理,让任务进行投递然后执行。

    std::shared_ptr<ScheduledTaskReference> Worker::scheduleFromNow(Task f, duration delta) {
      auto delta_ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta);
      auto id = std::make_shared<ScheduledTaskReference>();
      if (auto scheduler = scheduler_.lock()) {
        scheduler->scheduleFromNow(safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一个task,给scheduler的scheduleFromNow做参数传递
          this_ptr->task(this_ptr->safeTask([f, id](std::shared_ptr<Worker> this_ptr) { //使用safeTask生成一个task,生成的task功能是如果id->isCancelled为true,直接返回,否则执行f,并将这个task传递给自己的任务投递方法
            {
              if (id->isCancelled()) {
                return;
              }
            }
            f();
          }));
        }), delta_ms);
      }
      return id;
    }

     在scheduleFromNow里面,调用了scheduler的scheduleFromNow方法,在scheduler里面,进入定时线程,到达时间后,执行Worker的task方法,投递一个Task,进而激活Worker,运行Task内容,完成定时执行操作。

    停止定时器:

    void Worker::unschedule(std::shared_ptr<ScheduledTaskReference> id) {
      id->cancel();
    }

    设置cancel,停止定时器

    循环定时器:

    void Worker::scheduleEvery(ScheduledTask f, duration period) {
      scheduleEvery(f, period, period);
    }
    
    void Worker::scheduleEvery(ScheduledTask f, duration period, duration next_delay) {
      time_point start = clock_->now();
      std::shared_ptr<Clock> clock = clock_;
    
      scheduleFromNow(safeTask([start, period, next_delay, f, clock](std::shared_ptr<Worker> this_ptr) {
        if (f()) {
          duration clock_skew = clock->now() - start - next_delay;
          duration delay = period - clock_skew;
          this_ptr->scheduleEvery(f, period, delay); //循环递归调用
        }
      }), std::max(next_delay, duration{0}));
    }

    循环定时器,使用递归调用,来实现循环定时器,其停止依托于ScheduledTask的返回值为false,停止循环。

    总结:

    Worker提供了基本的线程管理,提供了Task执行机制以及定时器控制机制,但是没有提供资源重复使用的机制,即多次调用close,start的机制

    使用例子:

    #include <thread/Worker.h>
    
    void sample_worker_task()
    {
        std::shared_ptr<erizo::Scheduler> schedule = std::make_shared<erizo::Scheduler>(2);
        std::shared_ptr<erizo::Worker> worker_no_promise = std::make_shared<erizo::Worker>(schedule);
        worker_no_promise->start();
        //maybe, you should do a sleep.
        int index = 0;
        worker_no_promise->task([index] {//here may corruption because the thread maybe not ok when task push
            printf("index is %d
    ", index);
        });
        index++;
        worker_no_promise->task([index] {
            printf("second task index is %d
    ", index);
        });
        //worker_no_promise->close();
        //worker_no_promise->reset();
    
        std::shared_ptr<erizo::Worker> worker_with_promise = std::make_shared<erizo::Worker>(schedule);
        std::promise pro = std::make_shared<std::promise<void>>();
        worker_with_promise->start(pro);
        pro.get_future().wait();//wait util the thread is ok
        index++;
        worker_with_promise->task([index] {//here is safe
            printf("index is %d
    ", index);
        });
        index++;
        worker_with_promise->task([index] {
            printf("second task index is %d
    ", index);
        });
    
        //worker_with_promise->close();
        //worker_with_promise->reset();
        //schedule->reset();
    }
    
    void sample_woker_schedule_task()
    {
        std::shared_ptr<erizo::Scheduler> schedule = std::make_shared<erizo::Scheduler>(2);
        std::shared_ptr<erizo::Worker> worker_with_promise = std::make_shared<erizo::Worker>(schedule);
        std::promise pro = std::make_shared<std::promise<void>>();
        worker_with_promise->start(pro);
        pro.get_future().wait();//wait util the thread is ok
        
        int index = 0;
        std::shared_ptr<Clock> clk = std::make_shared<SteadyClock>();
        printf("now is %u", clk->now().time_point);
        worker_with_promise->scheduleFromNow([clk] {//schedule once after 10 secondes
            printf("delay 10, now is %u", clk->now().time_point);
        }, 10000);
    
        worker_with_promise->scheduleEvery([index, clk] {//schedule multi per 3 secondes
            printf("schedule evevry index:%d, now:%u", index, clk->now().time_point);
            index++;
            if (index > 3)
            {
                return true;
            }
            return false;
        }, 3000);
        //worker_with_promise->close();
        //worker_with_promise->reset();
        //schedule->reset();
    }
  • 相关阅读:
    当前信息型强人工智能发展缺失条件--规则
    假象篇(1)-动态可变参数的神经网络
    02梦断代码阅读笔记
    结队开发之NABCD
    01梦断代码阅读笔记
    03构建之法阅读笔记
    进度3
    02构建之法阅读笔记
    01构建之法阅读笔记
    关于最大子序和的算法问题(二)
  • 原文地址:https://www.cnblogs.com/limedia/p/licode_erizo_worker.html
Copyright © 2011-2022 走看看