zoukankan      html  css  js  c++  java
  • Mesos源码分析(13): MesosContainerier运行一个Task

    MesosContainerizer的实现在文件src/slave/containerizer/mesos/containerizer.cpp中

     

    1. Future<bool> MesosContainerizer::launch(
    2.     const ContainerID& containerId,
    3.     const TaskInfo& taskInfo,
    4.     const ExecutorInfo& executorInfo,
    5.     const string& directory,
    6.     const Option<string>& user,
    7.     const SlaveID& slaveId,
    8.     const PID<Slave>& slavePid,
    9.     bool checkpoint)
    10. {
    11.   return dispatch(process.get(),
    12.                   &MesosContainerizerProcess::launch,
    13.                   containerId,
    14.                   taskInfo,
    15.                   executorInfo,
    16.                   directory,
    17.                   user,
    18.                   slaveId,
    19.                   slavePid,
    20.                   checkpoint);
    21. }

     

    转而调用MesosContainerizerProcess::launch,只有executorInfo.has_container()是Mesos的时候,才使用MesosContainerizer.

    1. // Launching an executor involves the following steps:
    2. // 1. Call prepare on each isolator.
    3. // 2. Fork the executor. The forked child is blocked from exec'ing until it has
    4. // been isolated.
    5. // 3. Isolate the executor. Call isolate with the pid for each isolator.
    6. // 4. Fetch the executor.
    7. // 5. Exec the executor. The forked child is signalled to continue. It will
    8. // first execute any preparation commands from isolators and then exec the
    9. // executor.
    10. Future<bool> MesosContainerizerProcess::launch(
    11.     const ContainerID& containerId,
    12.     const Option<TaskInfo>& taskInfo,
    13.     const ExecutorInfo& _executorInfo,
    14.     const string& directory,
    15.     const Option<string>& user,
    16.     const SlaveID& slaveId,
    17.     const PID<Slave>& slavePid,
    18.     bool checkpoint)
    19. {
    20.   if (containers_.contains(containerId)) {
    21.     return Failure("Container already started");
    22.   }
    23.  
    24.   if (taskInfo.isSome() &&
    25.       taskInfo.get().has_container() &&
    26.       taskInfo.get().container().type() != ContainerInfo::MESOS) {
    27.     return false;
    28.   }
    29.  
    30.   // NOTE: We make a copy of the executor info because we may mutate
    31.   // it with default container info.
    32.   ExecutorInfo executorInfo = _executorInfo;
    33.  
    34.   if (executorInfo.has_container() &&
    35.       executorInfo.container().type() != ContainerInfo::MESOS) {
    36.     return false;
    37.   }
    38.  
    39.   // Add the default container info to the executor info.
    40.   // TODO(jieyu): Rename the flag to be default_mesos_container_info.
    41.   if (!executorInfo.has_container() &&
    42.       flags.default_container_info.isSome()) {
    43.     executorInfo.mutable_container()->CopyFrom(
    44.         flags.default_container_info.get());
    45.   }
    46.  
    47.   LOG(INFO) << "Starting container '" << containerId
    48.             << "' for executor '" << executorInfo.executor_id()
    49.             << "' of framework '" << executorInfo.framework_id() << "'";
    50.  
    51.   Container* container = new Container();
    52.   container->directory = directory;
    53.   container->state = PROVISIONING;
    54.   container->resources = executorInfo.resources();
    55.  
    56.   // We need to set the `launchInfos` to be a ready future initially
    57.   // before we starting calling isolator->prepare() because otherwise,
    58.   // the destroy will wait forever trying to wait for this future to
    59.   // be ready , which it never will. See MESOS-4878.
    60.   container->launchInfos = list<Option<ContainerLaunchInfo>>();
    61.  
    62.   containers_.put(containerId, Owned<Container>(container));
    63.  
    64.   if (!executorInfo.has_container()) {
    65.     return prepare(containerId, taskInfo, executorInfo, directory, user, None())
    66.       .then(defer(self(),
    67.                   &Self::__launch,
    68.                   containerId,
    69.                   executorInfo,
    70.                   directory,
    71.                   user,
    72.                   slaveId,
    73.                   slavePid,
    74.                   checkpoint,
    75.                   lambda::_1));
    76.   }
    77.  
    78.   // Provision the root filesystem if needed.
    79.   CHECK_EQ(executorInfo.container().type(), ContainerInfo::MESOS);
    80.  
    81.   if (!executorInfo.container().mesos().has_image()) {
    82.     return _launch(containerId,
    83.                    taskInfo,
    84.                    executorInfo,
    85.                    directory,
    86.                    user,
    87.                    slaveId,
    88.                    slavePid,
    89.                    checkpoint,
    90.                    None());
    91.   }
    92.  
    93.   const Image& image = executorInfo.container().mesos().image();
    94.  
    95.   Future<ProvisionInfo> future =
    96.     provisioner->provision(containerId, image);
    97.  
    98.   container->provisionInfos.push_back(future);
    99.  
    100.   return future
    101.     .then(defer(PID<MesosContainerizerProcess>(this),
    102.                 &MesosContainerizerProcess::_launch,
    103.                 containerId,
    104.                 taskInfo,
    105.                 executorInfo,
    106.                 directory,
    107.                 user,
    108.                 slaveId,
    109.                 slavePid,
    110.                 checkpoint,
    111.                 lambda::_1));
    112. }

     

    大家注意ExecutorInfo里面的ContainerInfo和TaskInfo里面的ContainerInfo不同。

    如果大家看protocol buffer的定义文件include/mesos/mesos.proto里面,ExecutorInfo里面有一个ContainerInfo

    1. message ExecutorInfo {
    2.   required ExecutorID executor_id = 1;
    3.   optional FrameworkID framework_id = 8; // TODO(benh): Make this required.
    4.   required CommandInfo command = 7;
    5.   // Executor provided with a container will launch the container
    6.   // with the executor's CommandInfo and we expect the container to
    7.   // act as a Mesos executor.
    8.   optional ContainerInfo container = 11;
    9.   repeated Resource resources = 5;
    10.   optional string name = 9;
    11.  
    12.   // 'source' is an identifier style string used by frameworks to
    13.   // track the source of an executor. This is useful when it's
    14.   // possible for different executor ids to be related semantically.
    15.   //
    16.   // NOTE: 'source' is exposed alongside the resource usage of the
    17.   // executor via JSON on the slave. This allows users to import usage
    18.   // information into a time series database for monitoring.
    19.   optional string source = 10;
    20.  
    21.   optional bytes data = 4;
    22.  
    23.   // Service discovery information for the executor. It is not
    24.   // interpreted or acted upon by Mesos. It is up to a service
    25.   // discovery system to use this information as needed and to handle
    26.   // executors without service discovery information.
    27.   optional DiscoveryInfo discovery = 12;
    28. }

     

    如果ExecutorInfo的ContainerInfo有值,则executor会启动在这个container里面。

    那marathon里面的container info放在哪里呢?

    TaskInfo里面也有一个ContainerInfo

    1. message TaskInfo {
    2.   required string name = 1;
    3.   required TaskID task_id = 2;
    4.   required SlaveID slave_id = 3;
    5.   repeated Resource resources = 4;
    6.   optional ExecutorInfo executor = 5;
    7.   optional CommandInfo command = 7;
    8.   // Task provided with a container will launch the container as part
    9.   // of this task paired with the task's CommandInfo.
    10.   optional ContainerInfo container = 9;
    11.   optional bytes data = 6;
    12.   // A health check for the task (currently in *alpha* and initial
    13.   // support will only be for TaskInfo's that have a CommandInfo).
    14.   optional HealthCheck health_check = 8;
    15.  
    16.   // Labels are free-form key value pairs which are exposed through
    17.   // master and slave endpoints. Labels will not be interpreted or
    18.   // acted upon by Mesos itself. As opposed to the data field, labels
    19.   // will be kept in memory on master and slave processes. Therefore,
    20.   // labels should be used to tag tasks with light-weight meta-data.
    21.   // Labels should not contain duplicate key-value pairs.
    22.   optional Labels labels = 10;
    23.  
    24.   // Service discovery information for the task. It is not interpreted
    25.   // or acted upon by Mesos. It is up to a service discovery system
    26.   // to use this information as needed and to handle tasks without
    27.   // service discovery information.
    28.   optional DiscoveryInfo discovery = 11;
    29. }

     

    如果TaskInfo里面的ContainerInfo有值,才是真正的运行容器,容器里面运行任务。

     

    最终会调用MesosContainerizerProcess::__launch

    1. Future<bool> MesosContainerizerProcess::__launch(
    2.     const ContainerID& containerId,
    3.     const ExecutorInfo& executorInfo,
    4.     const string& directory,
    5.     const Option<string>& user,
    6.     const SlaveID& slaveId,
    7.     const PID<Slave>& slavePid,
    8.     bool checkpoint,
    9.     const list<Option<ContainerLaunchInfo>>& launchInfos)
    10. {
    11. ……
    12.   // Prepare environment variables for the executor.
    13.   map<string, string> environment = executorEnvironment(
    14.       executorInfo,
    15.       directory,
    16.       slaveId,
    17.       slavePid,
    18.       checkpoint,
    19.       flags);
    20.  
    21.   // Determine the root filesystem for the container. Only one
    22.   // isolator should return the container root filesystem.
    23.   Option<string> rootfs;
    24.  
    25.   // Determine the executor launch command for the container.
    26.   // At most one command can be returned from docker runtime
    27.   // isolator if a docker image is specifed.
    28.   Option<CommandInfo> executorLaunchCommand;
    29.   Option<string> workingDirectory;
    30.  
    31.   foreach (const Option<ContainerLaunchInfo>& launchInfo, launchInfos) {
    32.     if (launchInfo.isSome() && launchInfo->has_rootfs()) {
    33.       if (rootfs.isSome()) {
    34.         return Failure("Only one isolator should return the container rootfs");
    35.       } else {
    36.         rootfs = launchInfo->rootfs();
    37.       }
    38.     }
    39.  
    40.     if (launchInfo.isSome() && launchInfo->has_environment()) {
    41.       foreach (const Environment::Variable& variable,
    42.                launchInfo->environment().variables()) {
    43.         const string& name = variable.name();
    44.         const string& value = variable.value();
    45.  
    46.         if (environment.count(name)) {
    47.           VLOG(1) << "Overwriting environment variable '"
    48.                   << name << "', original: '"
    49.                   << environment[name] << "', new: '"
    50.                   << value << "', for container "
    51.                   << containerId;
    52.         }
    53.  
    54.         environment[name] = value;
    55.       }
    56.     }
    57.  
    58.     if (launchInfo.isSome() && launchInfo->has_command()) {
    59.       if (executorLaunchCommand.isSome()) {
    60.         return Failure("At most one command can be returned from isolators");
    61.       } else {
    62.         executorLaunchCommand = launchInfo->command();
    63.       }
    64.     }
    65.  
    66.     if (launchInfo.isSome() && launchInfo->has_working_directory()) {
    67.       if (workingDirectory.isSome()) {
    68.         return Failure(
    69.             "At most one working directory can be returned from isolators");
    70.       } else {
    71.         workingDirectory = launchInfo->working_directory();
    72.       }
    73.     }
    74.   }
    75.  
    76.   // TODO(jieyu): Consider moving this to 'executorEnvironment' and
    77.   // consolidating with docker containerizer.
    78.   environment["MESOS_SANDBOX"] =
    79.     rootfs.isSome() ? flags.sandbox_directory : directory;
    80.  
    81.   // Include any enviroment variables from CommandInfo.
    82.   foreach (const Environment::Variable& variable,
    83.            executorInfo.command().environment().variables()) {
    84.     environment[variable.name()] = variable.value();
    85.   }
    86.  
    87.   JSON::Array commandArray;
    88.   int namespaces = 0;
    89.   foreach (const Option<ContainerLaunchInfo>& launchInfo, launchInfos) {
    90.     if (!launchInfo.isSome()) {
    91.       continue;
    92.     }
    93.  
    94.     // Populate the list of additional commands to be run inside the container
    95.     // context.
    96.     foreach (const CommandInfo& command, launchInfo->commands()) {
    97.       commandArray.values.emplace_back(JSON::protobuf(command));
    98.     }
    99.  
    100.     // Process additional environment variables returned by isolators.
    101.     if (launchInfo->has_environment()) {
    102.       foreach (const Environment::Variable& variable,
    103.           launchInfo->environment().variables()) {
    104.         environment[variable.name()] = variable.value();
    105.       }
    106.     }
    107.  
    108.     if (launchInfo->has_namespaces()) {
    109.       namespaces |= launchInfo->namespaces();
    110.     }
    111.   }
    112.  
    113.   // TODO(jieyu): Use JSON::Array once we have generic parse support.
    114.   JSON::Object commands;
    115.   commands.values["commands"] = commandArray;
    116.  
    117.   return logger->prepare(executorInfo, directory)
    118.     .then(defer(
    119.         self(),
    120.         [=](const ContainerLogger::SubprocessInfo& subprocessInfo)
    121.           -> Future<bool> {
    122.     // Use a pipe to block the child until it's been isolated.
    123.     int pipes[2];
    124.  
    125.     // We assume this should not fail under reasonable conditions so
    126.     // we use CHECK.
    127.     CHECK(pipe(pipes) == 0);
    128.  
    129.     // Prepare the flags to pass to the launch process.
    130.     MesosContainerizerLaunch::Flags launchFlags;
    131.  
    132.     launchFlags.command = executorLaunchCommand.isSome()
    133.       ? JSON::protobuf(executorLaunchCommand.get())
    134.       : JSON::protobuf(executorInfo.command());
    135.  
    136.     launchFlags.sandbox = rootfs.isSome()
    137.       ? flags.sandbox_directory
    138.       : directory;
    139.  
    140.     // NOTE: If the executor shares the host filesystem, we should not
    141.     // allow them to 'cd' into an arbitrary directory because that'll
    142.     // create security issues.
    143.     if (rootfs.isNone() && workingDirectory.isSome()) {
    144.       LOG(WARNING) << "Ignore working directory '" << workingDirectory.get()
    145.                    << "' specified in container launch info for container "
    146.                    << containerId << " since the executor is using the "
    147.                    << "host filesystem";
    148.     } else {
    149.       launchFlags.working_directory = workingDirectory;
    150.     }
    151.  
    152.     launchFlags.pipe_read = pipes[0];
    153.     launchFlags.pipe_write = pipes[1];
    154.     launchFlags.commands = commands;
    155.  
    156.     // Fork the child using launcher.
    157.     vector<string> argv(2);
    158.     argv[0] = MESOS_CONTAINERIZER;
    159.     argv[1] = MesosContainerizerLaunch::NAME;
    160.  
    161.     Try<pid_t> forked = launcher->fork(
    162.         containerId,
    163.         path::join(flags.launcher_dir, MESOS_CONTAINERIZER),
    164.         argv,
    165.         Subprocess::FD(STDIN_FILENO),
    166.         (local ? Subprocess::FD(STDOUT_FILENO)
    167.                : Subprocess::IO(subprocessInfo.out)),
    168.         (local ? Subprocess::FD(STDERR_FILENO)
    169.                : Subprocess::IO(subprocessInfo.err)),
    170.         launchFlags,
    171.         environment,
    172.         None(),
    173.         namespaces); // 'namespaces' will be ignored by PosixLauncher.
    174.  
    175.     if (forked.isError()) {
    176.       return Failure("Failed to fork executor: " + forked.error());
    177.     }
    178.     pid_t pid = forked.get();
    179.  
    180.     // Checkpoint the executor's pid if requested.
    181.     if (checkpoint) {
    182.       const string& path = slave::paths::getForkedPidPath(
    183.           slave::paths::getMetaRootDir(flags.work_dir),
    184.           slaveId,
    185.           executorInfo.framework_id(),
    186.           executorInfo.executor_id(),
    187.           containerId);
    188.  
    189.       LOG(INFO) << "Checkpointing executor's forked pid " << pid
    190.                 << " to '" << path << "'";
    191.  
    192.       Try<Nothing> checkpointed =
    193.         slave::state::checkpoint(path, stringify(pid));
    194.  
    195.       if (checkpointed.isError()) {
    196.         LOG(ERROR) << "Failed to checkpoint executor's forked pid to '"
    197.                    << path << "': " << checkpointed.error();
    198.  
    199.         return Failure("Could not checkpoint executor's pid");
    200.       }
    201.     }
    202.  
    203.     // Monitor the executor's pid. We keep the future because we'll
    204.     // refer to it again during container destroy.
    205.     Future<Option<int>> status = process::reap(pid);
    206.     status.onAny(defer(self(), &Self::reaped, containerId));
    207.     containers_[containerId]->status = status;
    208.  
    209.     return isolate(containerId, pid)
    210.       .then(defer(self(),
    211.                   &Self::fetch,
    212.                   containerId,
    213.                   executorInfo.command(),
    214.                   directory,
    215.                   user,
    216.                   slaveId))
    217.       .then(defer(self(), &Self::exec, containerId, pipes[1]))
    218.       .onAny(lambda::bind(&os::close, pipes[0]))
    219.       .onAny(lambda::bind(&os::close, pipes[1]));
    220.   }));
    221. }

     

    最终运行的二进制文件为const char MESOS_CONTAINERIZER[] = "mesos-containerizer";

     

    Mesos-containerizer是一个独立运行的二进制文件,它的main函数在src/slave/containerizer/mesos/main.c

    1. int main(int argc, char** argv)
    2. {
    3.   return Subcommand::dispatch(
    4.       None(),
    5.       argc,
    6.       argv,
    7.       new MesosContainerizerLaunch(),
    8.       new MesosContainerizerMount());
    9. }

     

    Src/slave/containerizer/mesos/launch.cpp中MesosContainerizerLaunch::execute()函数最终调用

    1. if (command.get().shell()) {
    2.   // Execute the command using shell.
    3.   execlp("sh", "sh", "-c", command.get().value().c_str(), (char*) NULL);
    4. } else {
    5.   // Use os::execvpe to launch the command.
    6.   char** argv = new char*[command.get().arguments().size() + 1];
    7.   for (int i = 0; i < command.get().arguments().size(); i++) {
    8.     argv[i] = strdup(command.get().arguments(i).c_str());
    9.   }
    10.   argv[command.get().arguments().size()] = NULL;
    11.  
    12.   execvp(command.get().value().c_str(), argv);
    13. }

     

    来运行executor的二进制文件。

     

    如果是前面叙述的TestFramework,则运行的executor是TestExecutor,也就要求mesos-slave的相应目录下有这个二进制文件。

     

  • 相关阅读:
    恶意代码 第三章作业3
    openGauss使用指南
    Latex从入门到入门(不再更新,原因是博客园不支持latex语法,写的太累了)
    《网络对抗技术》Exp4 恶意代码分析
    恶意代码 第三章作业2
    《网络对抗技术》Exp5 信息搜集与漏洞扫描
    第三章作业数据查询
    实验一密码引擎商用密码算法实现1中遇到的问题
    buuctf学习笔记
    网页设计中的默认字体样式详解
  • 原文地址:https://www.cnblogs.com/popsuper1982/p/5724373.html
Copyright © 2011-2022 走看看