mesos-docker-executor的运行代码在src/docker/executor.cpp中
// Entry point of the mesos-docker-executor binary.
//
// Loads the executor flags from the environment and the command line,
// checks that every option dereferenced below was actually supplied,
// creates the Docker abstraction and finally runs a MesosExecutorDriver
// wrapped around a DockerExecutor.
//
// Returns EXIT_SUCCESS when --help was requested or when the driver
// finishes with DRIVER_STOPPED; returns EXIT_FAILURE on any flag or
// Docker creation error, or on any other driver status.
int main(int argc, char** argv)
{
  GOOGLE_PROTOBUF_VERIFY_VERSION;

  mesos::internal::docker::Flags flags;

  // Load flags from environment and command line.
  Try<Nothing> load = flags.load(None(), &argc, &argv);

  if (load.isError()) {
    cerr << flags.usage(load.error()) << endl;
    return EXIT_FAILURE;
  }

  // The parsed flags are dumped to stdout both before and after logging
  // is initialized.
  std::cout << stringify(flags) << std::endl;

  mesos::internal::logging::initialize(argv[0], flags, true); // Catch signals.

  if (flags.help) {
    cout << flags.usage() << endl;
    return EXIT_SUCCESS;
  }

  std::cout << stringify(flags) << std::endl;

  if (flags.docker.isNone()) {
    cerr << flags.usage("Missing required option --docker") << endl;
    return EXIT_FAILURE;
  }

  // `docker_socket` is dereferenced with `.get()` when creating the
  // Docker abstraction, so it is validated like every other required
  // option instead of failing inside `.get()` when it is unset.
  // NOTE(review): assumes `docker_socket` is an Option like the other
  // flags checked here -- confirm against the Flags declaration.
  if (flags.docker_socket.isNone()) {
    cerr << flags.usage("Missing required option --docker_socket") << endl;
    return EXIT_FAILURE;
  }

  if (flags.container.isNone()) {
    cerr << flags.usage("Missing required option --container") << endl;
    return EXIT_FAILURE;
  }

  if (flags.sandbox_directory.isNone()) {
    cerr << flags.usage("Missing required option --sandbox_directory") << endl;
    return EXIT_FAILURE;
  }

  if (flags.mapped_directory.isNone()) {
    cerr << flags.usage("Missing required option --mapped_directory") << endl;
    return EXIT_FAILURE;
  }

  if (flags.stop_timeout.isNone()) {
    cerr << flags.usage("Missing required option --stop_timeout") << endl;
    return EXIT_FAILURE;
  }

  if (flags.launcher_dir.isNone()) {
    cerr << flags.usage("Missing required option --launcher_dir") << endl;
    return EXIT_FAILURE;
  }

  // The 2nd argument for docker create is set to false so we skip
  // validation when creating a docker abstraction, as the slave
  // should have already validated docker.
  Try<Owned<Docker>> docker = Docker::create(
      flags.docker.get(),
      flags.docker_socket.get(),
      false);

  if (docker.isError()) {
    cerr << "Unable to create docker abstraction: " << docker.error() << endl;
    return EXIT_FAILURE;
  }

  mesos::internal::docker::DockerExecutor executor(
      docker.get(),
      flags.container.get(),
      flags.sandbox_directory.get(),
      flags.mapped_directory.get(),
      flags.stop_timeout.get(),
      flags.launcher_dir.get());

  mesos::MesosExecutorDriver driver(&executor);

  // Only a driver that ends in DRIVER_STOPPED counts as a clean exit.
  return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
}
|
正如上一篇文章对MesosExecutorDriver的分析,当Mesos-slave向Executor发送运行Task的消息时,会调用DockerExecutor的launchTask函数。
// Executor callback invoked to launch `task`.
//
// Does no work itself: it hands `driver` and `task` over to
// DockerExecutorProcess::launchTask through dispatch() on `process`.
virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
  dispatch(
      process.get(),
      &DockerExecutorProcess::launchTask,
      driver,
      task);
}
|
最后调用DockerExecutorProcess的launchTask函数。
// Launches `task` as a docker container and reports its status through
// `driver`.
//
// If a run has already been started (`run.isSome()`), a TASK_FAILED
// status update is sent for `task` and nothing else happens. Otherwise
// the task id is recorded, the container is started via `docker->run`,
// and `Self::reaped` is registered to be called when that run completes
// in any state. A TASK_RUNNING update is sent only once
// `docker->inspect` yields the container, and only if `killed` is not
// set; the health check is launched once `inspect` is ready.
//
// The task must carry a DOCKER container info and a command; this is
// enforced with CHECKs.
void launchTask(ExecutorDriver* driver, const TaskInfo& task)
{
  if (run.isSome()) {
    TaskStatus status;
    status.mutable_task_id()->CopyFrom(task.task_id());
    status.set_state(TASK_FAILED);
    // The inner quotes must be escaped, otherwise the literal is split
    // in two around `docker` and the statement does not compile.
    status.set_message(
        "Attempted to run multiple tasks using a \"docker\" executor");

    driver->sendStatusUpdate(status);
    return;
  }

  // Capture the TaskID.
  taskId = task.task_id();

  cout << "Starting task " << taskId.get() << endl;

  CHECK(task.has_container());
  CHECK(task.has_command());

  CHECK(task.container().type() == ContainerInfo::DOCKER);

  // We're adding task and executor resources to launch docker since
  // the DockerContainerizer updates the container cgroup limits
  // directly and it expects it to be the sum of both task and
  // executor resources. This does lead to a bit of unaccounted
  // resources for running this executor, but we are assuming
  // this is just a very small amount of overcommit.
  run = docker->run(
      task.container(),
      task.command(),
      containerName,
      sandboxDirectory,
      mappedDirectory,
      task.resources() + task.executor().resources(),
      None(),
      Subprocess::FD(STDOUT_FILENO),
      Subprocess::FD(STDERR_FILENO));

  // Have `reaped` called whenever the run completes, in any state.
  run->onAny(defer(self(), &Self::reaped, driver, lambda::_1));

  // Delay sending TASK_RUNNING status update until we receive
  // inspect output.
  inspect = docker->inspect(containerName, DOCKER_INSPECT_DELAY)
    .then(defer(self(), [=](const Docker::Container& container) {
      // No TASK_RUNNING update is sent once `killed` is set.
      if (!killed) {
        TaskStatus status;
        status.mutable_task_id()->CopyFrom(taskId.get());
        status.set_state(TASK_RUNNING);
        status.set_data(container.output);

        // The container IP address, when known, is published three
        // ways: as a label, as the NetworkInfo `ip_address` field and
        // as a NetworkInfo::IPAddress entry. The first two are marked
        // deprecated below.
        if (container.ipAddress.isSome()) {
          // TODO(karya): Deprecated -- Remove after 0.25.0 has shipped.
          Label* label = status.mutable_labels()->add_labels();
          label->set_key("Docker.NetworkSettings.IPAddress");
          label->set_value(container.ipAddress.get());

          NetworkInfo* networkInfo =
            status.mutable_container_status()->add_network_infos();

          // TODO(CD): Deprecated -- Remove after 0.27.0.
          networkInfo->set_ip_address(container.ipAddress.get());

          NetworkInfo::IPAddress* ipAddress =
            networkInfo->add_ip_addresses();
          ipAddress->set_ip_address(container.ipAddress.get());
        }

        driver->sendStatusUpdate(status);
      }

      return Nothing();
    }));

  // Start health checking only after the inspect chain above is ready.
  inspect.onReady(
      defer(self(), &Self::launchHealthCheck, containerName, task));
}
|
接着调用Docker类的run函数来启动容器。