zoukankan      html  css  js  c++  java
  • Mesos源码分析(14): DockerContainerizer运行一个Task

    DockerContainerizer的实现在文件src/slave/containerizer/docker.cpp

     

    // Launch entry point that takes no TaskInfo.
    //
    // Does no work itself: it forwards every argument to
    // DockerContainerizerProcess::launch through dispatch() on `process`,
    // passing None() in the TaskInfo position, and returns the resulting
    // future.
    1. Future<bool> DockerContainerizer::launch(
    2.     const ContainerID& containerId,
    3.     const ExecutorInfo& executorInfo,
    4.     const string& directory,
    5.     const Option<string>& user,
    6.     const SlaveID& slaveId,
    7.     const PID<Slave>& slavePid,
    8.     bool checkpoint)
    9. {
    10.   return dispatch(
    11.       process.get(),
    12.       &DockerContainerizerProcess::launch,
    13.       containerId,
    14.       None(),
    15.       executorInfo,
    16.       directory,
    17.       user,
    18.       slaveId,
    19.       slavePid,
    20.       checkpoint);
    21. }

     

    转而调用DockerContainerizerProcess::launch,无论是TaskInfo里面有ContainerInfo,还是ExecutorInfo里面有ContainerInfo,都由这个函数处理,只不过分支不同。

    // Launches a Docker-backed container for a task and/or an executor.
    //
    // ContainerInfo selection: the task's ContainerInfo is used when a
    // TaskInfo is given and has one; otherwise the executor's is used when
    // present. The function returns false (without failing) when no
    // ContainerInfo is found or when its type is not ContainerInfo::DOCKER.
    //
    // On the Docker path it creates a Container, stores it in containers_,
    // logs the start, and invokes the pre-launch Docker hook when
    // HookManager::hooksAvailable() is true. It then builds one of two
    // future chains, stores the chain in container->launch and returns it:
    //
    //   * TaskInfo given and flags.docker_mesos_image not set:
    //       fetch -> pull -> mountPersistentVolumes
    //             -> launchExecutorProcess -> reapExecutor
    //   * otherwise:
    //       fetch -> pull -> mountPersistentVolumes
    //             -> launchExecutorContainer -> checkpointExecutor
    //             -> reapExecutor
    //     where the container name is executorName() when that is set and
    //     name() otherwise.
    //
    // Returns a Failure when Container::create reports an error.
    //
    // NOTE(review): containers_[containerId] is assigned without first
    // checking for an existing entry in this listing - confirm whether a
    // duplicate-launch guard is expected here.
    1. Future<bool> DockerContainerizerProcess::launch(
    2.     const ContainerID& containerId,
    3.     const Option<TaskInfo>& taskInfo,
    4.     const ExecutorInfo& executorInfo,
    5.     const string& directory,
    6.     const Option<string>& user,
    7.     const SlaveID& slaveId,
    8.     const PID<Slave>& slavePid,
    9.     bool checkpoint)
    10. {
    11.   Option<ContainerInfo> containerInfo;
    12.  
    13.   if (taskInfo.isSome() && taskInfo.get().has_container()) {
    14.     containerInfo = taskInfo.get().container();
    15.   } else if (executorInfo.has_container()) {
    16.     containerInfo = executorInfo.container();
    17.   }
    18.  
    19.   if (containerInfo.isNone()) {
    20.     LOG(INFO) << "No container info found, skipping launch";
    21.     return false;
    22.   }
    23.  
    24.   if (containerInfo.get().type() != ContainerInfo::DOCKER) {
    25.     LOG(INFO) << "Skipping non-docker container";
    26.     return false;
    27.   }
    28.  
    29.   Try<Container*> container = Container::create(
    30.       containerId,
    31.       taskInfo,
    32.       executorInfo,
    33.       directory,
    34.       user,
    35.       slaveId,
    36.       slavePid,
    37.       checkpoint,
    38.       flags);
    39.  
    40.   if (container.isError()) {
    41.     return Failure("Failed to create container: " + container.error());
    42.   }
    43.  
    44.   containers_[containerId] = container.get();
    45.  
    46.   if (taskInfo.isSome()) {
    47.     LOG(INFO) << "Starting container '" << containerId
    48.               << "' for task '" << taskInfo.get().task_id()
    49.               << "' (and executor '" << executorInfo.executor_id()
    50.               << "') of framework '" << executorInfo.framework_id() << "'";
    51.   } else {
    52.     LOG(INFO) << "Starting container '" << containerId
    53.               << "' for executor '" << executorInfo.executor_id()
    54.               << "' and framework '" << executorInfo.framework_id() << "'";
    55.   }
    56.  
    57.   if (HookManager::hooksAvailable()) {
    58.     HookManager::slavePreLaunchDockerHook(
    59.         container.get()->container,
    60.         container.get()->command,
    61.         taskInfo,
    62.         executorInfo,
    63.         container.get()->name(),
    64.         container.get()->directory,
    65.         flags.sandbox_directory,
    66.         container.get()->resources,
    67.         container.get()->environment);
    68.   }
    69.  
    70.   if (taskInfo.isSome() && flags.docker_mesos_image.isNone()) {
    71.     // Launching task by forking a subprocess to run docker executor.
    72.     return container.get()->launch = fetch(containerId, slaveId)
    73.       .then(defer(self(), [=]() { return pull(containerId); }))
    74.       .then(defer(self(), [=]() {
    75.         return mountPersistentVolumes(containerId);
    76.       }))
    77.       .then(defer(self(), [=]() { return launchExecutorProcess(containerId); }))
    78.       .then(defer(self(), [=](pid_t pid) {
    79.         return reapExecutor(containerId, pid);
    80.       }));
    81.   }
    82.  
    83.   string containerName = container.get()->name();
    84.  
    85.   if (container.get()->executorName().isSome()) {
    86.     // Launch the container with the executor name as we expect the
    87.     // executor will launch the docker container.
    88.     containerName = container.get()->executorName().get();
    89.   }
    90.  
    91.   // Launching task or executor by launching a separate docker
    92.   // container to run the executor.
    93.   // We need to do so for launching a task because as the slave
    94.   // is running in a container (via docker_mesos_image flag)
    95.   // we want the executor to keep running when the slave container
    96.   // dies.
    97.   return container.get()->launch = fetch(containerId, slaveId)
    98.     .then(defer(self(), [=]() { return pull(containerId); }))
    99.     .then(defer(self(), [=]() {
    100.       return mountPersistentVolumes(containerId);
    101.     }))
    102.     .then(defer(self(), [=]() {
    103.       return launchExecutorContainer(containerId, containerName);
    104.     }))
    105.     .then(defer(self(), [=](const Docker::Container& dockerContainer) {
    106.       return checkpointExecutor(containerId, dockerContainer);
    107.     }))
    108.     .then(defer(self(), [=](pid_t pid) {
    109.       return reapExecutor(containerId, pid);
    110.     }));
    111. }

     

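    如果传入了TaskInfo,并且没有设置docker_mesos_image标志(即slave本身没有运行在Docker容器里),则调用launchExecutorProcess(containerId),以子进程的方式运行docker executor。

    否则(只传入了ExecutorInfo,或者设置了docker_mesos_image标志),则调用launchExecutorContainer(containerId, containerName),在一个单独的Docker容器里运行Executor。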

     

    DockerContainerizerProcess::launchExecutorProcess实现如下:

    // Starts the executor as a local `mesos-docker-executor` subprocess and
    // yields the subprocess pid.
    //
    // Steps, in the order written below:
    //   - look up the Container for `containerId` and mark it RUNNING;
    //   - build the environment from executorEnvironment(), overlay the
    //     variables listed in the ExecutorInfo's command, and forward GLOG_v
    //     when it is set in this process's environment;
    //   - once logger->prepare() completes, spawn the binary found under
    //     flags.launcher_dir with a pipe as `in` and the logger-provided
    //     out/err;
    //   - checkpoint the pid, then write a single byte to the subprocess's
    //     `in` pipe so that it can continue.
    //
    // The returned future fails when the subprocess cannot be created, when
    // checkpointing fails, or when the one-byte write does not complete.
    //
    // NOTE(review): containers_[containerId] is used here without a
    // contains() check, unlike launchExecutorContainer - confirm that the
    // container cannot have been destroyed by the time this runs.
    // NOTE(review): `parentHooks` is passed empty in this listing although a
    // comment below mentions systemd, and the byte written from `c` is never
    // initialized (presumably only the write itself matters) - confirm.
    1. Future<pid_t> DockerContainerizerProcess::launchExecutorProcess(
    2.     const ContainerID& containerId)
    3. {
    4.   Container* container = containers_[containerId];
    5.   container->state = Container::RUNNING;
    6.  
    7.   // Prepare environment variables for the executor.
    8.   map<string, string> environment = executorEnvironment(
    9.       container->executor,
    10.       container->directory,
    11.       container->slaveId,
    12.       container->slavePid,
    13.       container->checkpoint,
    14.       flags,
    15.       false);
    16.  
    17.   // Include any environment variables from ExecutorInfo.
    18.   foreach (const Environment::Variable& variable,
    19.            container->executor.command().environment().variables()) {
    20.     environment[variable.name()] = variable.value();
    21.   }
    22.  
    23.   // Pass GLOG flag to the executor.
    24.   const Option<string> glog = os::getenv("GLOG_v");
    25.   if (glog.isSome()) {
    26.     environment["GLOG_v"] = glog.get();
    27.   }
    28.  
    29.   vector<string> argv;
    30.   argv.push_back("mesos-docker-executor");
    31.  
    32.   return logger->prepare(container->executor, container->directory)
    33.     .then(defer(
    34.         self(),
    35.         [=](const ContainerLogger::SubprocessInfo& subprocessInfo)
    36.           -> Future<pid_t> {
    37.     // If we are on systemd, then extend the life of the executor. Any
    38.     // grandchildren's lives will also be extended.
    39.     std::vector<Subprocess::Hook> parentHooks;
    40.     // Construct the mesos-docker-executor using the "name" we gave the
    41.     // container (to distinguish it from Docker containers not created
    42.     // by Mesos).
    43.     Try<Subprocess> s = subprocess(
    44.         path::join(flags.launcher_dir, "mesos-docker-executor"),
    45.         argv,
    46.         Subprocess::PIPE(),
    47.         subprocessInfo.out,
    48.         subprocessInfo.err,
    49.         dockerFlags(flags, container->name(), container->directory),
    50.         environment,
    51.         lambda::bind(&setup, container->directory),
    52.         None(),
    53.         parentHooks);
    54.  
    55.     if (s.isError()) {
    56.       return Failure("Failed to fork executor: " + s.error());
    57.     }
    58.  
    59.     // Checkpoint the executor's pid (if necessary).
    60.     Try<Nothing> checkpointed = checkpoint(containerId, s.get().pid());
    61.  
    62.     if (checkpointed.isError()) {
    63.       return Failure(
    64.           "Failed to checkpoint executor's pid: " + checkpointed.error());
    65.     }
    66.  
    67.     // Checkpointing complete, now synchronize with the process so that it
    68.     // can continue to execute.
    69.     CHECK_SOME(s.get().in());
    70.     char c;
    71.     ssize_t length;
    72.     while ((length = write(s.get().in().get(), &c, sizeof(c))) == -1 &&
    73.            errno == EINTR);
    74.  
    75.     if (length != sizeof(c)) {
    76.       return Failure("Failed to synchronize with child process: " +
    77.                      os::strerror(errno));
    78.     }
    79.  
    80.     return s.get().pid();
    81.   }));
    82. }

     

    这个函数最终运行一个名为mesos-docker-executor的子进程,这是一个独立的二进制进程。这也是大多数使用mesos运行Docker的方式。

    1. [root@a061f582-9be2-45a8-bda5-2280926f825c ~]# ps aux | grep mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04 | grep -v grep
    2. root 13538 0.2 0.1 802432 18120 ? Ssl Jul27 16:03 mesos-docker-executor --container=mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04 --docker=docker --docker_socket=/var/run/docker.sock --help=false --launcher_dir=/usr/libexec/mesos --mapped_directory=/mnt/mesos/sandbox --sandbox_directory=/tmp/mesos/slaves/13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2/frameworks/13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-0000/executors/linkerdcos_cluster_mongodb.a50b6205-53a3-11e6-a67a-024214d517fa/runs/838a1dc0-cc13-4bd0-9380-77809f95ad04 --stop_timeout=0ns
    3. root 13548 0.0 0.0 126860 14916 ? Sl Jul27 0:00 docker -H unix:///var/run/docker.sock run --privileged --cpu-shares 102 --memory 536870912 -e MARATHON_APP_VERSION=2016-07-06T10:44:54.554Z -e HOST=10.25.161.248 -e MARATHON_APP_RESOURCE_CPUS=0.1 -e MONGODB_NODES=10.25.161.248 -e MARATHON_APP_DOCKER_IMAGE=linkerrepository/linkerdcos_mongodb_repl:1.0.1 -e PORT_10001=31166 -e MESOS_TASK_ID=linkerdcos_cluster_mongodb.a50b6205-53a3-11e6-a67a-024214d517fa -e PORT=31166 -e MARATHON_APP_RESOURCE_MEM=512.0 -e ENNAME=eth0 -e PORTS=31166 -e MARATHON_APP_RESOURCE_DISK=0.0 -e MARATHON_APP_LABELS= -e MARATHON_APP_ID=/linkerdcos/cluster/mongodb -e PORT0=31166 -e MESOS_SANDBOX=/mnt/mesos/sandbox -e MESOS_CONTAINER_NAME=mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04 -v /opt:/data:rw -v /tmp/mesos/slaves/13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2/frameworks/13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-0000/executors/linkerdcos_cluster_mongodb.a50b6205-53a3-11e6-a67a-024214d517fa/runs/838a1dc0-cc13-4bd0-9380-77809f95ad04:/mnt/mesos/sandbox --net host --name mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04 linkerrepository/linkerdcos_mongodb_repl:1.0.1
    4. [root@a061f582-9be2-45a8-bda5-2280926f825c ~]# docker ps | grep mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04
    5. 43a45be25f37 linkerrepository/linkerdcos_mongodb_repl:1.0.1 "/scripts/run.sh " 5 days ago Up 5 days mesos-13beba1f-dcdf-4e6c-b88c-8fcdf56559bd-S2.838a1dc0-cc13-4bd0-9380-77809f95ad04

     

    DockerContainerizerProcess::launchExecutorContainer实现如下:

    // Runs the executor inside a Docker container named `containerName` and
    // yields the Docker::Container obtained from docker->inspect().
    //
    // Fails right away with "Container is already destroyed" when
    // `containerId` is no longer present in containers_. Otherwise it marks
    // the container RUNNING, waits for logger->prepare(), and then starts
    // both docker->run() and docker->inspect().
    //
    // The returned future belongs to a promise that is associated with the
    // inspect result once inspect completes; if run fails, inspect is
    // discarded and the promise is failed with run's failure message.
    1. Future<Docker::Container> DockerContainerizerProcess::launchExecutorContainer(
    2.     const ContainerID& containerId,
    3.     const string& containerName)
    4. {
    5.   if (!containers_.contains(containerId)) {
    6.     return Failure("Container is already destroyed");
    7.   }
    8.  
    9.   Container* container = containers_[containerId];
    10.   container->state = Container::RUNNING;
    11.  
    12.   return logger->prepare(container->executor, container->directory)
    13.     .then(defer(
    14.         self(),
    15.         [=](const ContainerLogger::SubprocessInfo& subprocessInfo)
    16.           -> Future<Docker::Container> {
    17.     // Start the executor in a Docker container.
    18.     // This executor could either be a custom executor specified by an
    19.     // ExecutorInfo, or the docker executor.
    20.     Future<Nothing> run = docker->run(
    21.         container->container,
    22.         container->command,
    23.         containerName,
    24.         container->directory,
    25.         flags.sandbox_directory,
    26.         container->resources,
    27.         container->environment,
    28.         subprocessInfo.out,
    29.         subprocessInfo.err);
    30.  
    31.     Owned<Promise<Docker::Container>> promise(new Promise<Docker::Container>());
    32.     // We like to propagate the run failure when run fails so slave can
    33.     // send this failure back to the scheduler. Otherwise we return
    34.     // inspect's result or its failure, which should not fail when
    35.     // the container isn't launched.
    36.     Future<Docker::Container> inspect =
    37.       docker->inspect(containerName, slave::DOCKER_INSPECT_DELAY)
    38.         .onAny([=](Future<Docker::Container> f) {
    39.             // We cannot associate the promise outside of the callback
    40.             // because we like to propagate run's failure when
    41.             // available.
    42.             promise->associate(f);
    43.         });
    44.  
    45.     run.onFailed([=](const string& failure) mutable {
    46.       inspect.discard();
    47.       promise->fail(failure);
    48.     });
    49.  
    50.     return promise->future();
    51.   }));
    52. }

     

    这个函数会启动一个Docker容器,并在这个容器里面运行Executor。

  • 相关阅读:
    foxmail邮箱在代理环境下不能使用解决方法。
    Win7下IE8无法打开https类型的网站解决方法笔记
    重新注册IE组件
    Web开发者的六个代码调试平台
    仿Material UI框架的动画特效
    JS几种数组遍历方式以及性能分析对比
    js 函数提升和变量提升
    彻底掌握this,call,apply
    深入理解requestAnimationFrame
    基于iscroll.js实现下拉刷新和上拉加载特效
  • 原文地址:https://www.cnblogs.com/popsuper1982/p/5724379.html
Copyright © 2011-2022 走看看