zoukankan      html  css  js  c++  java
  • Mesos源码分析(15): Test Executor的运行

    Test Executor的代码在src/examples/test_executor.cpp中

     

    1. int main(int argc, char** argv)
    2. {
    3.   TestExecutor executor;
    4.   MesosExecutorDriver driver(&executor);
    5.   return driver.run() == DRIVER_STOPPED ? 0 : 1;
    6. }

     

    Executor的运行主要依赖于MesosExecutorDriver作为封装,和mesos-slave进行通信。

     

    MesosExecutorDriver的实现在src/exec/exec.cpp中

     

    1. Status MesosExecutorDriver::run()
    2. {
    3.   Status status = start();
    4.   return status != DRIVER_RUNNING ? status : join();
    5. }

     

    1. Status MesosExecutorDriver::start()
    2. {
    3.   synchronized (mutex) {
    4.     if (status != DRIVER_NOT_STARTED) {
    5.       return status;
    6.     }
    7.  
    8.     // Set stream buffering mode to flush on newlines so that we
    9.     // capture logs from user processes even when output is redirected
    10.     // to a file.
    11.     setvbuf(stdout, 0, _IOLBF, 0);
    12.     setvbuf(stderr, 0, _IOLBF, 0);
    13.  
    14.     bool local;
    15.  
    16.     UPID slave;
    17.     SlaveID slaveId;
    18.     FrameworkID frameworkId;
    19.     ExecutorID executorId;
    20.     string workDirectory;
    21.     bool checkpoint;
    22.  
    23.     Option<string> value;
    24.     std::istringstream iss;
    25.  
    26.     // Check if this is local (for example, for testing).
    27.     local = os::getenv("MESOS_LOCAL").isSome();
    28.  
    29.     // Get slave PID from environment.
    30.     value = os::getenv("MESOS_SLAVE_PID");
    31.     if (value.isNone()) {
    32.       EXIT(EXIT_FAILURE)
    33.         << "Expecting 'MESOS_SLAVE_PID' to be set in the environment";
    34.     }
    35.  
    36.     slave = UPID(value.get());
    37.     CHECK(slave) << "Cannot parse MESOS_SLAVE_PID '" << value.get() << "'";
    38.  
    39.     // Get slave ID from environment.
    40.     value = os::getenv("MESOS_SLAVE_ID");
    41.     if (value.isNone()) {
    42.       EXIT(EXIT_FAILURE)
    43.         << "Expecting 'MESOS_SLAVE_ID' to be set in the environment";
    44.     }
    45.     slaveId.set_value(value.get());
    46.  
    47.     // Get framework ID from environment.
    48.     value = os::getenv("MESOS_FRAMEWORK_ID");
    49.     if (value.isNone()) {
    50.       EXIT(EXIT_FAILURE)
    51.         << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
    52.     }
    53.     frameworkId.set_value(value.get());
    54.  
    55.     // Get executor ID from environment.
    56.     value = os::getenv("MESOS_EXECUTOR_ID");
    57.     if (value.isNone()) {
    58.       EXIT(EXIT_FAILURE)
    59.         << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
    60.     }
    61.     executorId.set_value(value.get());
    62.  
    63.     // Get working directory from environment.
    64.     value = os::getenv("MESOS_DIRECTORY");
    65.     if (value.isNone()) {
    66.       EXIT(EXIT_FAILURE)
    67.         << "Expecting 'MESOS_DIRECTORY' to be set in the environment";
    68.     }
    69.     workDirectory = value.get();
    70.  
    71.     // Get checkpointing status from environment.
    72.     value = os::getenv("MESOS_CHECKPOINT");
    73.     checkpoint = value.isSome() && value.get() == "1";
    74.  
    75.     Duration recoveryTimeout = RECOVERY_TIMEOUT;
    76.  
    77.     // Get the recovery timeout if checkpointing is enabled.
    78.     if (checkpoint) {
    79.       value = os::getenv("MESOS_RECOVERY_TIMEOUT");
    80.  
    81.       if (value.isSome()) {
    82.         Try<Duration> _recoveryTimeout = Duration::parse(value.get());
    83.  
    84.         if (_recoveryTimeout.isError()) {
    85.           EXIT(EXIT_FAILURE)
    86.             << "Cannot parse MESOS_RECOVERY_TIMEOUT '" << value.get() << "': "
    87.             << _recoveryTimeout.error();
    88.         }
    89.  
    90.         recoveryTimeout = _recoveryTimeout.get();
    91.       }
    92.     }
    93.  
    94.     CHECK(process == NULL);
    95.  
    96.     process = new ExecutorProcess(
    97.         slave,
    98.         this,
    99.         executor,
    100.         slaveId,
    101.         frameworkId,
    102.         executorId,
    103.         local,
    104.         workDirectory,
    105.         checkpoint,
    106.         recoveryTimeout,
    107.         &mutex,
    108.         latch);
    109.  
    110.     spawn(process);
    111.  
    112.     return status = DRIVER_RUNNING;
    113.   }
    114. }

     

    启动另一个线程ExecutorProcess,它的构造函数如下,注册了很多消息处理函数。

     

    1. ExecutorProcess(const UPID& _slave,
    2.                 MesosExecutorDriver* _driver,
    3.                 Executor* _executor,
    4.                 const SlaveID& _slaveId,
    5.                 const FrameworkID& _frameworkId,
    6.                 const ExecutorID& _executorId,
    7.                 bool _local,
    8.                 const string& _directory,
    9.                 bool _checkpoint,
    10.                 Duration _recoveryTimeout,
    11.                 std::recursive_mutex* _mutex,
    12.                 Latch* _latch)
    13.   : ProcessBase(ID::generate("executor")),
    14.     slave(_slave),
    15.     driver(_driver),
    16.     executor(_executor),
    17.     slaveId(_slaveId),
    18.     frameworkId(_frameworkId),
    19.     executorId(_executorId),
    20.     connected(false),
    21.     connection(UUID::random()),
    22.     local(_local),
    23.     aborted(false),
    24.     mutex(_mutex),
    25.     latch(_latch),
    26.     directory(_directory),
    27.     checkpoint(_checkpoint),
    28.     recoveryTimeout(_recoveryTimeout)
    29. {
    30.   LOG(INFO) << "Version: " << MESOS_VERSION;
    31.  
    32.   install<ExecutorRegisteredMessage>(
    33.       &ExecutorProcess::registered,
    34.       &ExecutorRegisteredMessage::executor_info,
    35.       &ExecutorRegisteredMessage::framework_id,
    36.       &ExecutorRegisteredMessage::framework_info,
    37.       &ExecutorRegisteredMessage::slave_id,
    38.       &ExecutorRegisteredMessage::slave_info);
    39.  
    40.   install<ExecutorReregisteredMessage>(
    41.       &ExecutorProcess::reregistered,
    42.       &ExecutorReregisteredMessage::slave_id,
    43.       &ExecutorReregisteredMessage::slave_info);
    44.  
    45.   install<ReconnectExecutorMessage>(
    46.       &ExecutorProcess::reconnect,
    47.       &ReconnectExecutorMessage::slave_id);
    48.  
    49.   install<RunTaskMessage>(
    50.       &ExecutorProcess::runTask,
    51.       &RunTaskMessage::task);
    52.  
    53.   install<KillTaskMessage>(
    54.       &ExecutorProcess::killTask,
    55.       &KillTaskMessage::task_id);
    56.  
    57.   install<StatusUpdateAcknowledgementMessage>(
    58.       &ExecutorProcess::statusUpdateAcknowledgement,
    59.       &StatusUpdateAcknowledgementMessage::slave_id,
    60.       &StatusUpdateAcknowledgementMessage::framework_id,
    61.       &StatusUpdateAcknowledgementMessage::task_id,
    62.       &StatusUpdateAcknowledgementMessage::uuid);
    63.  
    64.   install<FrameworkToExecutorMessage>(
    65.       &ExecutorProcess::frameworkMessage,
    66.       &FrameworkToExecutorMessage::slave_id,
    67.       &FrameworkToExecutorMessage::framework_id,
    68.       &FrameworkToExecutorMessage::executor_id,
    69.       &FrameworkToExecutorMessage::data);
    70.  
    71.   install<ShutdownExecutorMessage>(
    72.       &ExecutorProcess::shutdown);
    73. }

     

    在ExecutorProcess的initiailize的函数中,向mesos-slave发送消息进行注册。

    1. virtual void initialize()
    2. {
    3.   VLOG(1) << "Executor started at: " << self()
    4.           << " with pid " << getpid();
    5.  
    6.   link(slave);
    7.  
    8.   // Register with slave.
    9.   RegisterExecutorMessage message;
    10.   message.mutable_framework_id()->MergeFrom(frameworkId);
    11.   message.mutable_executor_id()->MergeFrom(executorId);
    12.   send(slave, message);
    13. }

     

    当Mesos-slave向TestExecutor发送RunTaskMessage消息的时候,ExecutorProcess调用runTask函数。

    1. void runTask(const TaskInfo& task)
    2. {
    3.   if (aborted.load()) {
    4.     VLOG(1) << "Ignoring run task message for task " << task.task_id()
    5.             << " because the driver is aborted!";
    6.     return;
    7.   }
    8.  
    9.   CHECK(!tasks.contains(task.task_id()))
    10.     << "Unexpected duplicate task " << task.task_id();
    11.  
    12.   tasks[task.task_id()] = task;
    13.  
    14.   VLOG(1) << "Executor asked to run task '" << task.task_id() << "'";
    15.  
    16.   Stopwatch stopwatch;
    17.   if (FLAGS_v >= 1) {
    18.     stopwatch.start();
    19.   }
    20.  
    21.   executor->launchTask(driver, task);
    22.  
    23.   VLOG(1) << "Executor::launchTask took " << stopwatch.elapsed();
    24. }

     

    最终调用executor的launchTask

    在src/examples/test_executor.cpp中

    1. virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
    2. {
    3.   cout << "Starting task " << task.task_id().value() << endl;
    4.  
    5.   TaskStatus status;
    6.   status.mutable_task_id()->MergeFrom(task.task_id());
    7.   status.set_state(TASK_RUNNING);
    8.  
    9.   driver->sendStatusUpdate(status);
    10.  
    11.   // This is where one would perform the requested task.
    12.  
    13.   cout << "Finishing task " << task.task_id().value() << endl;
    14.  
    15.   status.mutable_task_id()->MergeFrom(task.task_id());
    16.   status.set_state(TASK_FINISHED);
    17.  
    18.   driver->sendStatusUpdate(status);
    19. }
  • 相关阅读:
    android滤镜效果
    Android ListView的OnItemClickListener详解
    Categories
    利用Stack倒序List,利用Set使List不能添加重复元素
    IOS数据类型对应输出格式
    win7的dropbox无法启动 重新安装也没用
    记一次datatable的删除操作
    winform动态创建radio以及使用委托判断哪个选中
    临时表列的长度
    退出winform应用程序
  • 原文地址:https://www.cnblogs.com/popsuper1982/p/5727784.html
Copyright © 2011-2022 走看看