zoukankan      html  css  js  c++  java
  • Mesos源码分析(16): mesos-docker-executor的运行

    mesos-docker-executor的运行代码在src/docker/executor.cpp中

     

    1. int main(int argc, char** argv)
    2. {
    3.   GOOGLE_PROTOBUF_VERIFY_VERSION;
    4.  
    5.   mesos::internal::docker::Flags flags;
    6.  
    7.   // Load flags from environment and command line.
    8.   Try<Nothing> load = flags.load(None(), &argc, &argv);
    9.  
    10.   if (load.isError()) {
    11.     cerr << flags.usage(load.error()) << endl;
    12.     return EXIT_FAILURE;
    13.   }
    14.  
    15.   std::cout << stringify(flags) << std::endl;
    16.  
    17.   mesos::internal::logging::initialize(argv[0], flags, true); // Catch signals.
    18.  
    19.   if (flags.help) {
    20.     cout << flags.usage() << endl;
    21.     return EXIT_SUCCESS;
    22.   }
    23.  
    24.   std::cout << stringify(flags) << std::endl;
    25.  
    26.   if (flags.docker.isNone()) {
    27.     cerr << flags.usage("Missing required option --docker") << endl;
    28.     return EXIT_FAILURE;
    29.   }
    30.  
    31.   if (flags.container.isNone()) {
    32.     cerr << flags.usage("Missing required option --container") << endl;
    33.     return EXIT_FAILURE;
    34.   }
    35.  
    36.   if (flags.sandbox_directory.isNone()) {
    37.     cerr << flags.usage("Missing required option --sandbox_directory") << endl;
    38.     return EXIT_FAILURE;
    39.   }
    40.  
    41.   if (flags.mapped_directory.isNone()) {
    42.     cerr << flags.usage("Missing required option --mapped_directory") << endl;
    43.     return EXIT_FAILURE;
    44.   }
    45.  
    46.   if (flags.stop_timeout.isNone()) {
    47.     cerr << flags.usage("Missing required option --stop_timeout") << endl;
    48.     return EXIT_FAILURE;
    49.   }
    50.  
    51.   if (flags.launcher_dir.isNone()) {
    52.     cerr << flags.usage("Missing required option --launcher_dir") << endl;
    53.     return EXIT_FAILURE;
    54.   }
    55.  
    56.   // The 2nd argument for docker create is set to false so we skip
    57.   // validation when creating a docker abstraction, as the slave
    58.   // should have already validated docker.
    59.   Try<Owned<Docker>> docker = Docker::create(
    60.       flags.docker.get(),
    61.       flags.docker_socket.get(),
    62.       false);
    63.  
    64.   if (docker.isError()) {
    65.     cerr << "Unable to create docker abstraction: " << docker.error() << endl;
    66.     return EXIT_FAILURE;
    67.   }
    68.  
    69.   mesos::internal::docker::DockerExecutor executor(
    70.       docker.get(),
    71.       flags.container.get(),
    72.       flags.sandbox_directory.get(),
    73.       flags.mapped_directory.get(),
    74.       flags.stop_timeout.get(),
    75.       flags.launcher_dir.get());
    76.  
    77.   mesos::MesosExecutorDriver driver(&executor);
    78.   return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
    79. }

     

    如上一篇文章对MesosExecutorDriver的分析所述,Mesos-slave向Executor发送消息要求运行Task时,会调用DockerExecutor的launchTask函数。

    1. virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
    2. {
    3.   dispatch(process.get(), &DockerExecutorProcess::launchTask, driver, task);
    4. }

     

    最后调用DockerExecutorProcess的launchTask函数。

    1. void launchTask(ExecutorDriver* driver, const TaskInfo& task)
    2. {
    3.   if (run.isSome()) {
    4.     TaskStatus status;
    5.     status.mutable_task_id()->CopyFrom(task.task_id());
    6.     status.set_state(TASK_FAILED);
    7.     status.set_message(
    8.         "Attempted to run multiple tasks using a "docker" executor");
    9.  
    10.     driver->sendStatusUpdate(status);
    11.     return;
    12.   }
    13.  
    14.   // Capture the TaskID.
    15.   taskId = task.task_id();
    16.  
    17.   cout << "Starting task " << taskId.get() << endl;
    18.  
    19.   CHECK(task.has_container());
    20.   CHECK(task.has_command());
    21.  
    22.   CHECK(task.container().type() == ContainerInfo::DOCKER);
    23.  
    24.   // We're adding task and executor resources to launch docker since
    25.   // the DockerContainerizer updates the container cgroup limits
    26.   // directly and it expects it to be the sum of both task and
    27.   // executor resources. This does leave to a bit of unaccounted
    28.   // resources for running this executor, but we are assuming
    29.   // this is just a very small amount of overcommit.
    30.   run = docker->run(
    31.       task.container(),
    32.       task.command(),
    33.       containerName,
    34.       sandboxDirectory,
    35.       mappedDirectory,
    36.       task.resources() + task.executor().resources(),
    37.       None(),
    38.       Subprocess::FD(STDOUT_FILENO),
    39.       Subprocess::FD(STDERR_FILENO));
    40.  
    41.   run->onAny(defer(self(), &Self::reaped, driver, lambda::_1));
    42.  
    43.   // Delay sending TASK_RUNNING status update until we receive
    44.   // inspect output.
    45.   inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY)
    46.     .then(defer(self(), [=](const Docker::Container& container) {
    47.       if (!killed) {
    48.         TaskStatus status;
    49.         status.mutable_task_id()->CopyFrom(taskId.get());
    50.         status.set_state(TASK_RUNNING);
    51.         status.set_data(container.output);
    52.         if (container.ipAddress.isSome()) {
    53.           // TODO(karya): Deprecated -- Remove after 0.25.0 has shipped.
    54.           Label* label = status.mutable_labels()->add_labels();
    55.           label->set_key("Docker.NetworkSettings.IPAddress");
    56.           label->set_value(container.ipAddress.get());
    57.  
    58.           NetworkInfo* networkInfo =
    59.             status.mutable_container_status()->add_network_infos();
    60.  
    61.           // TODO(CD): Deprecated -- Remove after 0.27.0.
    62.           networkInfo->set_ip_address(container.ipAddress.get());
    63.  
    64.           NetworkInfo::IPAddress* ipAddress =
    65.             networkInfo->add_ip_addresses();
    66.           ipAddress->set_ip_address(container.ipAddress.get());
    67.         }
    68.         driver->sendStatusUpdate(status);
    69.       }
    70.  
    71.       return Nothing();
    72.     }));
    73.  
    74.   inspect.onReady(
    75.       defer(self(), &Self::launchHealthCheck, containerName, task));
    76. }

     

    最终调用Docker的run函数启动容器。

  • 相关阅读:
    Live2d Test Env
    Live2d Test Env
    Live2d Test Env
    Live2d Test Env
    Live2d Test Env
    偷东西的学问-背包问题
    HMM-前向后向算法理解与实现(python)
    详解数组分段和最大值最小问题(最小m段和问题)
    打家劫舍系列
    面试题56
  • 原文地址:https://www.cnblogs.com/popsuper1982/p/5727791.html
Copyright © 2011-2022 走看看