zoukankan      html  css  js  c++  java
  • [源码解析] 深度学习分布式训练框架 horovod (13) --- 弹性训练之 Driver

    [源码解析] 深度学习分布式训练框架 horovod (13) --- 弹性训练之 Driver

    0x00 摘要

    Horovod 是Uber于2017年发布的一个易于使用的高性能的分布式训练框架,在业界得到了广泛应用。

    本系列将通过源码分析来带领大家了解 Horovod。本文是系列第十三篇,看看 horovod 弹性实现中 的 Driver 角色。

    本部分对应架构图中的 Driver main 部分,因为这部分和 Host discovery 强相关,所以一起展示出来。因为弹性训练的主体是 Driver,所以本文是把 调用代码 和 Driver 一起分析。

    本系列其他文章链接如下:

    [源码解析] 深度学习分布式训练框架 Horovod (1) --- 基础知识

    [源码解析] 深度学习分布式训练框架 horovod (2) --- 从使用者角度切入

    [源码解析] 深度学习分布式训练框架 horovod (3) --- Horovodrun背后做了什么

    [源码解析] 深度学习分布式训练框架 horovod (4) --- 网络基础 & Driver

    [源码解析] 深度学习分布式训练框架 horovod (5) --- 融合框架

    [源码解析] 深度学习分布式训练框架 horovod (6) --- 后台线程架构

    [源码解析] 深度学习分布式训练框架 horovod (7) --- DistributedOptimizer

    [源码解析] 深度学习分布式训练框架 horovod (8) --- on spark

    [源码解析] 深度学习分布式训练框架 horovod (9) --- 启动 on spark

    [源码解析] 深度学习分布式训练框架 horovod (10) --- run on spark

    [源码解析] 深度学习分布式训练框架 horovod (11) --- on spark --- GLOO 方案

    [源码解析] 深度学习分布式训练框架 horovod (12) --- 弹性训练总体架构

    0x01 角色

    我们首先要回忆一下弹性训练中的角色设定。

    1.1 角色设定

    Horovod 的弹性训练包含两个角色,driver 进程和 worker 进程。driver 进程运行在 CPU 节点上,worker 进程可运行在 CPU 或者 GPU 节点上

    这两个角色和 Spark 的 Driver -- Executor 依然很类似。Driver 进程就可以认为是 Spark 的 Driver,或者说是 master 节点。Worker 就类似于 Spark 的 Executor。

    具体如图:

                     +------------------------------+
                     |                              |
                     |            Driver            |
                     |                              |
                     |                              |
                     +-----+-------+--------+-------+
                           ^       ^        ^
                           |       |        |
                           |       |        |
             +-------------+       |        +--------------+
             |                     |                       |
             |                     |                       |
             |                     |                       |
             v                     v                       v
    +--------+----+        +-------+------+           +----+--------+
    |  Worker     |        |  Worker      |           |  Worker     |
    |             |        |              |           |             |
    |      host1  |        |      host2   |           |     host3   |
    +-------------+        +--------------+           +-------------+
    

    1.2 职责

    角色的职责如下:

    master(控制节点)职责:

    • 负责实时检测现有 worker(工作节点)是否有变化,掉线情况;
    • 负责通过脚本来实时监控 host 是否有变化;
    • 负责分配任务到存活的worker(工作节点);
    • 在有AllReduce 调用失败导致进程失败的情况下,master 通过 blacklist 机制 组织剩下的活着的进程构造一个新的环。
    • 如果有新 host 加入,则在新host之上生成新的 worker,新 worker 和 旧 worker 一起构造成一个新的通信环。

    worker(工作节点)职责:

    • 负责汇报(其实是被动的,没有主动机制)自己的状态(就是训练完成情况);

    • 负责在该worker(工作节点)负责的数据上执行训练。

    0x02 调用部分

    我们首先分析调用部分,弹性调用具体是从普适到特殊,一点点深入。

    2.1 _run

    前文介绍了 horovod 程序的入口是 _run 函数。可以看到,会依据是否是弹性训练来选择不同的路径。我们本文开始介绍 _run_elastic。

    def _run(args):
        # if hosts are not specified, either parse from hostfile, or default as
        # localhost
        if not args.hosts and not args.host_discovery_script:
            if args.hostfile:
                args.hosts = hosts.parse_host_files(args.hostfile)
            else:
                # Set hosts to localhost if not specified
                args.hosts = 'localhost:{np}'.format(np=args.np)
    
        # Convert nics into set
        args.nics = set(args.nics.split(',')) if args.nics else None
    
        if _is_elastic(args):
            return _run_elastic(args) # 本文在这里
        else:
            return _run_static(args)
    

    2.2 _run_elastic

    此部分逻辑如下:

    • 首先,如果参数配置了“获取参数的脚本”,则调用 discovery.HostDiscoveryScript 得到一个object(目前只是一个object,未来在构建 ElasticDriver 的时候会获取 host 信息)。否则就直接读取固定 host 配置;
    • 其次,使用 host 配置以及其他信息来配置 ElasticSettings;
    • 最后,调用 gloo_run_elastic 来进行弹性训练;

    代码如下:

    def _run_elastic(args):
        # construct host discovery component
        if args.host_discovery_script:
            discover_hosts = discovery.HostDiscoveryScript(args.host_discovery_script, args.slots)
        elif args.hosts:
            _, available_host_slots = hosts.parse_hosts_and_slots(args.hosts)
            discover_hosts = discovery.FixedHosts(available_host_slots)
        ......
    
        # horovodrun has to finish all the checks before this timeout runs out.
        settings = elastic_settings.ElasticSettings(discovery=discover_hosts,
                                                    min_np=args.min_np or args.np,
                                                    max_np=args.max_np,
                                                    elastic_timeout=args.elastic_timeout,
                                                    reset_limit=args.reset_limit,
                                                    num_proc=args.np,
                                                    verbose=2 if args.verbose else 0,
                                                    ssh_port=args.ssh_port,
                                                    ssh_identity_file=args.ssh_identity_file,
                                                    extra_mpi_args=args.mpi_args,
                                                    key=secret.make_secret_key(),
                                                    start_timeout=tmout,
                                                    output_filename=args.output_filename,
                                                    run_func_mode=args.run_func is not None,
                                                    nics=args.nics,...)
    
        env = os.environ.copy()
        config_parser.set_env_from_args(env, args)
        gloo_run_elastic(settings, env, args.command)
    

    2.3 gloo_run_elastic

    这部分开始,就是架构图中第一部分:

    主要做了如下:

    • 定义了 get_common_interfaces,这是一个可以获取网络路由信息以及host能力的函数;需要注意的一点是:会等待所需的最小节点数目,然后才会开始获取网络路由。
    • _exec_command_fn 我们在之前介绍过,就是提供了一种运行命令的能力,或者说是运行环境;
    • 建立了一个 RendezvousServer,用来保存各种host信息;
    • 使用以上这些参数和 command 参数来运行 launch_gloo_elastic,command参数就是类似 python train.py
    def gloo_run_elastic(settings, env, command):
    
        def get_common_interfaces(driver):
            # Host-to-host common interface detection requires at least 2 hosts in an elastic job.
            min_hosts = _get_min_start_hosts(settings)
            current_hosts = driver.wait_for_available_slots(settings.num_proc, min_hosts=min_hosts)
            return driver_service.get_common_interfaces(settings, current_hosts.host_assignment_order)
    
        exec_command = _exec_command_fn(settings)
        rendezvous = RendezvousServer(settings.verbose)
        launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous)
    

    2.4 get_common_interfaces

    get_common_interfaces 可以获取网络路由信息以及host。

    • 如果配置了远端host,则会分布式执行,在各个 host 之上执行,获取每个 host 的网卡和路由信息。
    • 否则就获取本地网卡等信息。

    具体函数在:runner/driver/driver_service.py

    def get_common_interfaces(settings, all_host_names, remote_host_names=None, fn_cache=None):
        if remote_host_names is None:
            remote_host_names = network.filter_local_addresses(all_host_names)
    
        if len(remote_host_names) > 0:
            if settings.nics:
                # If args.nics is provided, we will use those interfaces. All the workers
                # must have at least one of those interfaces available.
                nics = settings.nics
            else:
                # Find the set of common, routed interfaces on all the hosts (remote
                # and local) and specify it in the args to be used by NCCL. It is
                # expected that the following function will find at least one interface
                # otherwise, it will raise an exception.
                local_host_names = set(all_host_names) - set(remote_host_names)
                nics = _driver_fn(all_host_names, local_host_names, settings, fn_cache=fn_cache)
    
        else:
            nics = get_local_interfaces(settings)
        return nics
    

    get_local_interfaces 是获取本地 host 的网卡信息。

    def get_local_interfaces(settings):
        if settings.verbose >= 2:
            print('All hosts are local, finding the interfaces '
                  'with address 127.0.0.1')
        # If all the given hosts are local, find the interfaces with address
        # 127.0.0.1
        nics = set()
        for iface, addrs in net_if_addrs().items():
            if settings.nics and iface not in settings.nics:
                continue
            for addr in addrs:
                if addr.family == AF_INET and addr.address == '127.0.0.1':
                    nics.add(iface)
                    break
    
        return nics
    

    2.5 获取异地网卡信息

    此处信息在前文中已经讲述,我们精简如下:

    _driver_fn 的作用是分布式执行 探寻函数,作用是:

    • 启动 service 服务;
    • 使用 driver.addresses() 获取 Driver 服务的地址(使用self._addresses = self._get_local_addresses()完成);
    • 使用 _launch_task_servers(利用 Driver 服务的地址)在每个 worker 之中启动 task 服务,然后 task 服务会在 service 服务中注册;
    • 因为是一个环形,每个 worker 会探测 worker index + 1 的所有网络接口;
    • 最后 _run_probe 返回一个所有 workers 上的所有路由接口的交集;
    @cache.use_cache()
    def _driver_fn(all_host_names, local_host_names, settings):
        """
        launches the service service, launches the task service on each worker and
        have them register with the service service. Each worker probes all the
        interfaces of the worker index + 1 (in a ring manner) and only keeps the
        routed interfaces. Function returns the intersection of the set of all the
        routed interfaces on all the workers.
        :param all_host_names: list of addresses. for example,
            ['worker-0','worker-1']
            ['10.11.11.11', '10.11.11.12']
        :type all_host_names: list(string)
        :param local_host_names: host names that resolve into a local addresses.
        :type local_host_names: set
        :param settings: the object that contains the setting for running horovod
        :type settings: horovod.runner.common.util.settings.Settings
        :return: example: ['eth0', 'eth1']
        :rtype: list[string]
        """
        # Launch a TCP server called service service on the host running horovod
        num_hosts = len(all_host_names)
        driver = HorovodRunDriverService(num_hosts, settings.key, settings.nics)
    
        # Have all the workers register themselves with the service service.
        _launch_task_servers(all_host_names, local_host_names,
                             driver.addresses(), settings)
        try:
            return _run_probe(driver, settings, num_hosts)
        finally:
            driver.shutdown()
    

    2.6 launch_gloo_elastic

    到了这里,才是正式调用起 gloo 弹性系统,就是生成 Driver 相关部分 & 建立 弹性训练的 worker。

    运行之中,只有一个 RendezvousServer,launch_gloo_elastic 也只运行一次。

    逻辑如下:

    • 如果需要配置输出文件,则创建;
    • 使用"发现脚本"等作为参数 建立 ElasticDriver;
    • 使用 create_rendezvous_handler 作为 handler 来启动 RendezvousServer;
    • 使用 driver.wait_for_available_slots 来等待所需的最小数目 slots;
    • 如果等到了,就调用 get_common_interfaces 获取网络路由等,从而得到 server ip;
    • 注册 shutdown event;
    • 利用 get_run_command 得到运行的命令;
    • 利用 _create_elastic_worker_fn 建立 弹性训练的 worker;

    简单代码如下:

    def launch_gloo_elastic(command, exec_command, settings, env, get_common_interfaces, rendezvous):
        # Make the output directory if it does not exist
        if settings.output_filename:
            _mkdir_p(settings.output_filename)
    
        # 使用"发现脚本"等作为参数 建立 ElasticDriver
        driver = ElasticDriver(rendezvous, settings.discovery,
                               settings.min_np, settings.max_np,
                               timeout=settings.elastic_timeout,
                               reset_limit=settings.reset_limit,
                               verbose=settings.verbose)
    
        handler = create_rendezvous_handler(driver)
        global_rendezv_port = rendezvous.start(handler) # 启动 RendezvousServer
        driver.wait_for_available_slots(settings.num_proc)
    
        nics = get_common_interfaces(driver) # 获取网络路由等
        server_ip = network.get_driver_ip(nics)
    
        event = register_shutdown_event()
        run_command = get_run_command(command, server_ip, nics, global_rendezv_port, elastic=True)
    
        # 建立 弹性训练的 worker
        create_worker = _create_elastic_worker_fn(exec_command, run_command, env, event)
    
        driver.start(settings.num_proc, create_worker)
        res = driver.get_results()
        driver.stop()
    
        for name, value in sorted(res.worker_results.items(), key=lambda item: item[1][1]):
            exit_code, timestamp = value
    

    这里很复杂,我们需要逐一分析。

    首先,我们看看 get_run_command,这个在前文spark gloo之中介绍过,这里再说一下。

    它会调用 create_run_env_vars 得到gloo需要信息,并据此构建 run_command,其格式如下:

    HOROVOD_GLOO_RENDEZVOUS_ADDR=1.1.1.1 HOROVOD_GLOO_RENDEZVOUS_PORT=2222 HOROVOD_CPU_OPERATIONS=gloo HOROVOD_GLOO_IFACE=lo HOROVOD_CONTROLLER=gloo python 
    

    可以看到,elastic 和 spark gloo 版本很类似,都是使用 RendezvousServer 来完成一些master的控制功能。

    其次,我们看看Driver主体。

    0x03 Driver Main

    3.1 ElasticDriver

    定义如下,基本成员是:

    • _rendezvous :driver 会根据当前正在运行的节点重新执行一个 RendezvousServer,这个 rendezvous 会存储每个 worker 的地址和给其在逻辑通信环分配的序号 rank;

    • _host_manager :HostManager 负责发现,管理各种 host;

    • _worker_registry :WorkerStateRegistry

    • _discovery_thread :负责后台定期探寻 host,具体会调用 _host_manager 完成功能;

    • _worker_clients :WorkerNotificationClient,每一个 worker 对应一个;

    • _host_assignments :host 分配信息;

    • _rank_assignments :rank 分配信息。rank 可以认为是代表分布式任务里的一个执行训练的进程。Rank 0 在Horovod中通常具有特殊的意义:它是负责此同步的设备。

    • _world_size :进程总数量,会等到所有world_size个进程就绪之后才会开始训练;

    • _wait_hosts_cond :类型是 threading.Condition,目的是等待 训练所需最小的 host 数目;

    具体定义如下:

    class ElasticDriver(object):
        def __init__(self, rendezvous, discovery, min_np, max_np, timeout=None, reset_limit=None, verbose=0):
            self._rendezvous = rendezvous
            self._host_manager = HostManager(discovery)
    
            self._host_assignments = {}
            self._rank_assignments = {}
            self._world_size = 0
    
            self._wait_hosts_cond = threading.Condition()
    
            self._create_worker_fn = None
            self._worker_clients = {}
    
            self._worker_registry = WorkerStateRegistry(self, self._host_manager, reset_limit=reset_limit)
            self._results = ResultsRecorder()
            self._shutdown = threading.Event()
    
            self._discovery_thread = threading.Thread(target=self._discover_hosts)
            self._discovery_thread.daemon = True
            self._discovery_thread.start()
    

    创建Driver之后,接下来的主要动作是:

    • 等待最小数目 host。
    • 配置 worker。
    • 启动 driver,其内部会启动worker。
    • Driver 等待 worker 的运行结果。

    我们逐步分析。

    3.2 等待最小数目 host

    启动之后,会调用 driver.wait_for_available_slots(settings.num_proc) 等待最小数目host。

    可以看到,这里就是无限循环等待,如果 avail_slots >= min_np and avail_hosts >= min_hosts 才会返回。其实,就是看 self._host_manager.current_hosts 的数目是否已经达到了 所需最小的 host 数目,而且 slot 也达到了所需最小数目。

    def wait_for_available_slots(self, min_np, min_hosts=1):
        tmout = timeout.Timeout(self._timeout,  message='')
        self._wait_hosts_cond.acquire()
    
        try:
            while True: # 无限循环等待
                current_hosts = self._host_manager.current_hosts
                
                avail_slots = current_hosts.count_available_slots()
                avail_hosts = len(current_hosts.available_hosts)
                
                if avail_slots >= min_np and avail_hosts >= min_hosts:
                    return current_hosts
                if self._shutdown.is_set():
                    raise RuntimeError('Job has been shutdown, see above error messages for details.')
                self._wait_hosts_cond.wait(tmout.remaining())
                tmout.check_time_out_for('minimum number of slots to become available')
        finally:
            self._wait_hosts_cond.release()
    

    逻辑如下:

          launch_gloo_elastic
                   +
                   |
                   |
                   |
                   |                                +----------------+
                   v                                |   HostManager  |
    +--------------+------------------+   wait      |                |
    | driver.wait_for_available_slots | +---------> |                |
    +---------------------------------+             |  current_hosts |
                                                    +----------------+
    

    3.3 配置 worker

    配置过程 是由 _create_elastic_worker_fn 完成。

    _create_elastic_worker_fn 分为两部分:

    • _slot_info_to_command_fn 会建立 slot_info_to_command,套路和之前文章中类似,就是把各种环境变量和运行命令 run_command 糅合起来,得到一个可以在 “某个 host and slot” 之上运行的命令文本;
    • 返回 create_worker。
      • create_worker 是利用 exec_command 和 命令文本 构建的函数。
      • exec_command 我们在之前介绍过,就是提供了一种运行命令的能力,或者说是运行环境;
      • 所以 create_worker 就是提供一个在某个环境下运行某个命令的能力;

    这几个概念关系具体如下:

    3.4 启动 driver

    driver.start(settings.num_proc, create_worker)
    

    具体启动经历了以下几个步骤。

    3.4.1 start

    def start(self, np, create_worker_fn):
        self._create_worker_fn = create_worker_fn
        self._activate_workers(np)
    

    3.4.2 _activate_workers

    ElasticDriver 的 resume / start 函数会调用到 _activate_workers,其定义如下,可以看到,如果此时 discovery 脚本已经发现了新节点,进而返回了 pending_slotspending_slots 就是可以在这些 slot 之上启动新 worker 的,于是 就会 调用 _start_worker_processes

    def _activate_workers(self, min_np):
        current_hosts = self.wait_for_available_slots(min_np)
        pending_slots = self._update_host_assignments(current_hosts)
        self._worker_registry.reset(self.world_size())
        self._start_worker_processes(pending_slots)
    

    3.4.3 _start_worker_processes

    启动之后,在一个线程中通过run_worker启动worker,然后使用 self._results.expect(thread) 向 ResultsRecorder 放入 worker 线程。这是等待结果的关键。

    def _start_worker_processes(self, pending_slots):
        for slot_info in pending_slots:
            self._start_worker_process(slot_info)
    
    def _start_worker_process(self, slot_info):
        create_worker_fn = self._create_worker_fn
        shutdown_event = self._shutdown
        host_event = self._host_manager.get_host_event(slot_info.hostname)
    
        def run_worker():
            res = create_worker_fn(slot_info, [shutdown_event, host_event])
            exit_code, timestamp = res
            self._handle_worker_exit(slot_info, exit_code, timestamp)
    
        thread = threading.Thread(target=run_worker) # 启动训练线程
        thread.daemon = True
        thread.start()
        self._results.expect(thread) # 等待运行结果
    

    3.5 等待运行结果

    Driver 使用 如下得到结果。

    def get_results(self):
        return self._results.get_results()
    

    _results是 ResultsRecorder 类型,所以我们需要看看其实现。

    3.5.1 ResultsRecorder

    几个功能如下:

    • expect 等待 thread:采用 expect 来 self._worker_threads.put(worker_thread),这样就知道应该等待哪些 thread。
    • add_result 添加结果:_handle_worker_exit 会 在 record 之后,调用 self._results.add_result(name, (exit_code, timestamp)) 纪录结果;
    • get_results 获取结果:driver 就是调用此函数,获取结果,利用了 join。
    class ResultsRecorder(object):
        def __init__(self):
            self._error_message = None
            self._worker_results = {}
            self._worker_threads = queue.Queue()
    
        def expect(self, worker_thread):
            self._worker_threads.put(worker_thread)
    
        def add_result(self, key, value):
            if key in self._worker_results:
                return
            self._worker_results[key] = value
    
        def get_results(self):
            while not self._worker_threads.empty():
                worker_thread = self._worker_threads.get()
                worker_thread.join()
            return Results(self._error_message, self._worker_results)
    

    3.5.2 worker 结束

    Driver 使用 _handle_worker_exit 来等待具体 worker结束。根据 worker 的返回来决定如何处理。

    _handle_worker_exit 是运行在 worker thread 之中,运行时候,会通过self._results.add_result 往 ResultsRecorder 注册信息。

    def _handle_worker_exit(self, slot_info, exit_code, timestamp):
        if not self.has_rank_assignment(slot_info.hostname, slot_info.local_rank):
            # Ignore hosts that are not assigned a rank
            return
    
        if exit_code == 0: # 顺利完成记录
            rendezvous_id = self._worker_registry.record_success(slot_info.hostname, slot_info.local_rank)
        else: # 否则记录失败
            rendezvous_id = self._worker_registry.record_failure(slot_info.hostname, slot_info.local_rank)
    
        if self.finished() and self._worker_registry.last_rendezvous() == rendezvous_id:
            name = '{}[{}]'.format(slot_info.hostname, slot_info.local_rank)
            self._results.add_result(name, (exit_code, timestamp)) # 往ResultsRecorder注册信息
    

    具体如下:

    +-----------------------------+
    | ElasticDriver               |
    |                             |
    |        start                |
    |          +                  |
    |          | 1                |
    |          |                  |
    |          v                  |
    |  _activate_workers          |
    |          +                  |                 +-------------------------+
    |          |                  |                 | Thread                  |
    |          | 2                |                 |                         |
    |          v                  |                 |        run_worker       |
    | _start_worker_processes     |                 |            +            |
    |          +                  |                 |            |            |
    |          |                  |                 |            | 7          |
    |          | 3                |                 |            v            |
    |          v                  |                 |     create_worker_fn    |
    |  _start_worker_process      |                 |            +            |
    |          +                  |                 |            |            |
    |          |                  |                 |            | 8          |
    |          | 4                |                 |            v            |  results.add_result
    |          v                  | thread.start()  |   _handle_worker_exit +---------------------------------+
    |      run_worker +---------------------------> |                         |                9              |
    |          +                  |      5          +-------------------------+                               |
    |          |                  |                                                                           |
    |          |                  |                 +------------------------------------------------------+  |
    |          v                  | expect(thread)  | ResultsRecorder                                      |  |
    |     self._results  +------------------------> |                                                      |  |
    |                             |      6          | _worker_results = [thread]                           |  |
    |                             |                 |                                                      |  |
    |                             |                 | _worker_threads = [name : (exit_code, timestamp)] <-----+
    +-----------------------------+                 |                                                      |
                                                    +------------------------------------------------------+
    

    或者手机如下:

    我们接下来会具体看看弹性训练的其他部分。

    因为 Driver 是弹性训练主要框架,所以不可避免的在其他文章中也会出现 本文部分文字,敬请谅解。

    0xEE 个人信息

    ★★★★★★关于生活和技术的思考★★★★★★

    微信公众账号:罗西的思考

    如果您想及时得到个人撰写文章的消息推送,或者想看看个人推荐的技术资料,敬请关注。

    在这里插入图片描述

    0xFF 参考

    ElasticDL调用 Horovod 在Kubernetes上实现弹性 AllReduce(一)

    kubernetes 培训_在Kubernetes上使用horovod进行分布式深度学习培训

    在 Kubernetes 上弹性深度学习训练利器 -- Elastic Training Operator

    ElasticHorovod - 弹性、容错的分布式训练 (尝鲜版)

    Horovod 弹性训练

  • 相关阅读:
    Asp.Net Web API 2第八课——Web API 2中的属性路由
    Asp.Net Web API 2第七课——Web API异常处理
    Asp.Net Web API 2第六课——Web API路由和动作选择
    Asp.Net Web API 2第五课——Web API路由
    开始学习python
    BMI 小程序 购物车
    深浅copy 文件操作
    字典 dict 集合set
    基本数据类型 (str,int,bool,tuple,)
    python 运算符
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/14995117.html
Copyright © 2011-2022 走看看