
    [Source code analysis] PyTorch Distributed (10) ------ DistributedDataParallel: Static Architecture of the Reducer

    0x00 Abstract

    The previous post analyzed DDP's basic architecture and its initialization. This post looks at the static architecture of its core component, the Reducer, which provides the core implementation of gradient synchronization in the backward pass.

    Other articles in this series:

    Deep Learning Tools: Automatic Differentiation (1)

    Deep Learning Tools: Automatic Differentiation (2)

    [Source code analysis] Deep Learning Tools: Automatic Differentiation (3) --- Walking Through an Example

    [Source code analysis] How PyTorch Implements Forward Propagation (1) --- Basic Classes (Part 1)

    [Source code analysis] How PyTorch Implements Forward Propagation (2) --- Basic Classes (Part 2)

    [Source code analysis] How PyTorch Implements Forward Propagation (3) --- Concrete Implementation

    [Source code analysis] How PyTorch Implements Backward Propagation (1) ---- Invoking the Engine

    [Source code analysis] How PyTorch Implements Backward Propagation (2) ---- Static Structure of the Engine

    [Source code analysis] How PyTorch Implements Backward Propagation (3) ---- Dynamic Logic of the Engine

    [Source code analysis] How PyTorch Implements Backward Propagation (4) ---- The Concrete Algorithm

    [Source code analysis] PyTorch Distributed (1) ------ History and Overview

    [Source code analysis] PyTorch Distributed (2) ----- DataParallel (Part 1)

    [Source code analysis] PyTorch Distributed (3) ----- DataParallel (Part 2)

    [Source code analysis] PyTorch Distributed (4) ------ Basic Concepts of Distributed Applications

    [Source code analysis] PyTorch Distributed (5) ------ DistributedDataParallel: Overview & How to Use It

    [Source code analysis] PyTorch Distributed (6) --- DistributedDataParallel -- Initialization & Store

    [Source code analysis] PyTorch Distributed (7) ----- DistributedDataParallel: Process Groups

    [Source code analysis] PyTorch Distributed (8) -------- DistributedDataParallel: The Paper

    [Source code analysis] PyTorch Distributed (9) ----- DistributedDataParallel: Initialization

    0x01 Introduction

    1.1 Invocation

    The Reducer is created inside _ddp_init_helper, as follows.

            # Note: reverse list of buckets because we want to approximate the
            # order in which their gradients are produced, and assume they
            # are used in the forward pass in the order they are defined.
            self.reducer = dist.Reducer(
                parameters, # parameters[0] is the list of tensors
                list(reversed(bucket_indices)), # bucket information
                self.process_group,
                expect_sparse_gradient,
                self.bucket_bytes_cap,
                self.find_unused_parameters,
                self.gradient_as_bucket_view,
                param_to_name_mapping,
            )
    

    An example of the parameters argument: parameters[0] is the parameters of the model on rank 0. Only element [0] is meaningful; in this example it contains 20 elements:

    parameters = {list: 1} 
    0 = {list: 4}           
     0 = {Parameter: 10} Parameter containing:\ntensor([[-4.0381e-02,  3.8828e-02, 1  )   
     1 = {Parameter: 10} Parameter containing:\ntensor([-0.0438, -0.2033,  0.2771,  0.0721,  ) 
     2 = {Parameter: 5} Parameter containing:\ntensor([[-0.0094, -0.1319,  0.0713,  0.3155,  )
     3 = {Parameter: 5} Parameter containing:\ntensor([-0.0008,  0.0582, -0.1245, -0.2538, )
     ...
     20 = {Parameter: 5} Parameter containing:\ntensor([-0.0008,  0.0582, -0.1245, -0.2538, )                                                   
     __len__ = {int} 20
    __len__ = {int} 1
    

    An example of bucket_indices:

    Tensor indices give every tensor an index, increasing from 0 up to tensors.size() - 1. If the model's parameters consist of 20 tensors, the tensor indices run from 0 to 19 and are split into 6 buckets; across those 6 buckets, every tensor index is unique and appears exactly once.

    +-----------------------------------------------------------------------+
    |                                                                       |
    |  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
    |                                                                       |
    |                                                                       |
    |  <tensor index 4, tensor index 5, tensor 6>                           |
    |                                                                       |
    |                                                                       |
    |  ......                                                               |
    |                                                                       |
    |                                                                       |
    |  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
    |                                                                       |
    +-----------------------------------------------------------------------+
    
    
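    Concretely, bucket_indices is just a list of lists of tensor indices. A toy Python example, one possible split consistent with the figure above (the middle buckets are made up for illustration):

    # 20 tensor indices split into 6 buckets; every index appears in exactly one bucket.
    bucket_indices = [
        [0, 1, 2, 3],
        [4, 5, 6],
        [7, 8, 9],
        [10, 11, 12],
        [13, 14, 15],
        [16, 17, 18, 19],
    ]
    assert sorted(i for bucket in bucket_indices for i in bucket) == list(range(20))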

    The Python class tells us nothing by itself (it is only a pybind11 stub), so we have to look at the C++ code.

    class Reducer(__pybind11_builtins.pybind11_object):
        def __init__(self, replicas, *args, **kwargs): 
            """ __init__(self: torch._C._distributed_c10d.Reducer, replicas: List[List[at::Tensor]], bucket_indices: List[List[int]], process_group: c10d::ProcessGroup, expect_sparse_gradients: List[List[bool]] = [], bucket_bytes_cap: int = 26214400, find_unused_parameters: bool = False, gradient_as_bucket_view: bool = False, param_to_name_mapping: Dict[int, str] = {}) -> None """
            pass
    

    This brings us to torch/lib/c10d/reducer.h and torch/lib/c10d/reducer.cpp.

    0x02 Reducer Definition

    The Reducer provides the core implementation of gradient synchronization in the backward pass. Its definition is fairly complex; some less important member variables are omitted below to keep it readable:

    class Reducer {
     public:
      // The constructor takes a list of variables for every model replica.
      // The bucket assignment for this reducer is specified as a list of
      // buckets, each of which is specified as a list of indices into the
      // variables list for **a single replica** (i.e. `variables[0]`).
      explicit Reducer(
          std::vector<std::vector<at::Tensor>> replicas,
          std::vector<std::vector<size_t>> bucket_indices,
          c10::intrusive_ptr<c10d::ProcessGroup> process_group,
          std::vector<std::vector<bool>> expect_sparse_gradients,
          int64_t bucket_bytes_cap,
          bool find_unused_parameters,
          bool gradient_as_bucket_view,
          std::unordered_map<size_t, std::string>
              paramNames);
    
     protected:
      // Forward declaration.
      struct Bucket;
    
      void push_rebuilt_params(const VariableIndex& index);
    
      mutable std::mutex mutex_;
      const std::vector<std::vector<at::Tensor>> replicas_;
      const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_;
      std::vector<std::vector<bool>> expect_sparse_gradients_;
    
      std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
          grad_accumulators_;
      std::unordered_map<torch::autograd::Node*, VariableIndex>
          gradAccToVariableMap_;
      std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
          hooks_;
    
      bool expect_autograd_hooks_;
      bool require_finalize_;
      size_t next_bucket_;
    
      bool has_marked_unused_parameters_;
      const bool find_unused_parameters_;
      const bool gradient_as_bucket_view_;
      std::vector<VariableIndex> unused_parameters_; // parameters that were not used are marked ready directly; this does not change after the first iteration
      // Locally used parameter maps indicating if parameters are used locally
      // during the current iteration or no_sync session if no_sync is on. One
      // tensor for each model replica and each tensor is one-dim int32 tensor of
      // number of parameters. These tensors are marked in autograd_hook to indicate
      // the corresponding param has been used, and get allreduced in the end of
      // backward of current iteration or no_sync session for figuring out the
      // globally unused parameters.
      //
      // local_used_maps_:     CPU tensors for bookkeeping locally used params
      // local_used_maps_dev_: dev tensors for reducing globally unused params
      std::vector<at::Tensor> local_used_maps_;
      std::vector<at::Tensor> local_used_maps_dev_;
      // Indicate that reduction is done and D2H copy is done as well.
      bool local_used_maps_reduced_;
    
      using GradCallback =
          torch::distributed::autograd::DistAutogradContext::GradCallback;
    
      // A bucket replica represents [1..N] gradients to be reduced,
      // with the same dtype, on the same device.
      //
      // Batching gradients together before reducing them can result in lower
      // overhead and/or faster time to completion. Only gradients of the same type
      // and on the same device can be batched. The tensor that represents the
      // flattened gradient uses the same type and is placed on the same device.
      // Buckets are filled as the gradients they hold are computed (triggered by
      // autograd hooks). Buckets are reduced in a predetermined order that is
      // identical across processes.
      struct BucketReplica {
        // Flattened (1 dimensional) contents of bucket.
        at::Tensor contents;
    
        // Views into contents for each grad.  Each view will be created with
        // layout (sizes + strides) matching the grad's expected layout
        // ("Gradient Layout Contract" in torch/csrc/autograd/AccumulateGrad.h).
        // `bucket_views_in[i].copy_(grad)` and
        // `grad.copy_(bucket_views_out[i])`
        // provide convenient ways to move grad data in/out of contents.
        // The reason we keep two states for bucket_views is that if DDP
        // communication hook was registered, `bucket_views_out` could be
        // re-initialized with the value of hook's `future_work`. We still need to
        // keep a separate view reference to replica's original contents for
        // `bucket_views_in[i].copy_(grad)` call.
        std::vector<at::Tensor> bucket_views_in;
        std::vector<at::Tensor> bucket_views_out;
    
        // Variables that contribute to this bucket replica. Use refcounted value
        // here so that we can easily unflatten the bucket contents into the
        // participating variables after reduction has completed.
        std::vector<at::Tensor> variables;
    
        // Per-variable offset/length into the flat bucket contents tensor and grad
        // bucket.
        std::vector<size_t> offsets;
        std::vector<size_t> lengths;
    
        // Per-variable sizes into the grad bucekt.
        std::vector<c10::IntArrayRef> sizes_vec;
    
        // Number of tensors to be added before this bucket is complete.
        // This is reset to `variables.size()` every iteration.
        size_t pending;
    
        // TODO(@pietern)
        // Memory copies from gradient tensors into the bucket are potentially
        // done on different CUDA streams. We record an event for every copy
        // so that we can synchronize with them prior to kicking off the reduction.
        // std::vector<at::cuda::CUDAEvent> events;
      };
      // A bucket holds N bucket replicas (1 per model replica).
      //
      // If every bucket in this struct is ready, the reduction can be kicked off.
      // One bucket per replica. Reduction is kicked off when every bucket is ready.
      //
      struct Bucket {
        std::vector<BucketReplica> replicas;
    
        // Global indices of participating variables in the bucket
        std::vector<size_t> variable_indices;
    
        // Number of replicas to be marked done before this bucket is ready.
        size_t pending;
    
        // Keep work handle around when this set of buckets is being reduced.
        c10::intrusive_ptr<c10d::ProcessGroup::Work> work;
    
        // Keep future work handle around if DDP comm hook is registered.
        c10::intrusive_ptr<torch::jit::Future> future_work;
    
        // If this bucket should expect a single sparse gradient.
        // Implies: replicas[i].variables.size() == 1.
        bool expect_sparse_gradient = false;
      };
    
      std::vector<Bucket> buckets_;
    
      // A variable locator locates a particular variable in the bucket
      // structure. The `bucket_index` field points to the bucket in the `buckets_`
      // vector. The `intra_bucket_index` field points to the index of the variable
      // in any of the vector fields in the bucket replica.
      struct VariableLocator {
        // Index into the `buckets_` variable.
        size_t bucket_index;
        // Index of parameter in single bucket replica.
        size_t intra_bucket_index;
    
        VariableLocator() = default;
    
        VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
          bucket_index = bucket_index_;
          intra_bucket_index = intra_bucket_index_;
        }
      };
    
      // Map the index of a variable to its location in the bucket structure.
      std::vector<VariableLocator> variable_locators_;
    
      // track the number of iterations to synchronize grads in training so far.
      long num_iterations_;
      // track the number of buckets that have been ready for
      // communication calls like allReduce or communication hooks.
      int num_buckets_ready_;
    
      // We collect the relative timestamp of every gradient being ready
      // when executing autograd. This can be used to derive a timeline of
      // the point in time buckets were ready, or ideal bucket assignment/ordering.
      std::vector<std::vector<int64_t>> backward_stats_;
    
      int ddp_runtime_logging_sample_rate_ = kDDPRuntimeLoggingSampleRate;
    
      bool is_multi_device_module_ = false;
    
      // Following variables are to help build dynamic bucket order
      bool has_rebuilt_bucket_;
      std::vector<at::Tensor> rebuilt_params_;
      std::vector<int64_t> rebuilt_param_indices_;
      const int64_t bucket_bytes_cap_;
    
      struct RpcContext {
        using ContextPtr = torch::distributed::autograd::ContextPtr;
        // The shared_ptr is to hold the context instance.
        ContextPtr context_ptr_holder;
        std::atomic<ContextPtr::element_type*> context_ptr{nullptr};
    
        void set(ContextPtr&& new_context_ptr);
      };
      RpcContext rpc_context_;
    
      // A struct containing work handle and tensor for allreduce scheduled in
      // forward pass, if applicable.
      struct ForwardPassAllreduceWork {
        c10::intrusive_ptr<c10d::ProcessGroup::Work> workHandle;
        at::Tensor resultTensor;
        // whether we should divide by the initial world_size or the no. of
        // remaining DDP ranks.
        bool useStaticWorldSize;
      };
    
      // Handle for the currently scheduled allreduce in the forward pass, if
      // applicable.
      ForwardPassAllreduceWork forwardPassWorkHandle_;
    
      // Division factor for reduction of gradients.
      int divFactor_;
    
      bool static_graph_;
    
      // Key: VariableIndex, Value: the number of times that a variable's autograd_hook()
      // should be triggered before marking this variable's grad as ready for communication.
      // Map will not change after 1st iteration.
      std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>> numGradHooksTriggeredMap_;
      // Key: VariableIndex, Value: the number of times that a variable's autograd_hook()
      // are left to be triggered before marking this variable's grad as ready for communication.
      // Map will change after 1st iteration to track a grad is ready for communication or not.
      std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>> numGradHooksTriggeredMapPerIteration_;
    
     private:
      // comm_hook_ is used to access the DDP communication hook if registered.
      std::unique_ptr<CommHookInterface> comm_hook_;
      // Current thread local state
      at::ThreadLocalState thread_local_state_;
      // Debug level setting. It is parsed once when Reducer is constructed, and
      // remains the same across a single invocation of DDP training.
      DistributedDebugLevel ddp_debug_level_;
      // Mapping of variable index to fully qualified name of model to notify users
      // about errors when certain parameters do not get gradient.
      std::unordered_map<size_t, std::string> param_names_;
      // Per iteration set of parameter indices that have been marked ready.
      std::unordered_set<size_t> perIterationReadyParams_;
      // Retrieves parameter names that have not been marked as ready as part of
      // previous iteration.
      std::vector<std::string> getUnmarkedParamsForIteration();
      // Retrives parameter indices that have not been marked as ready as part of
      // previous iteration.
      std::vector<size_t> getUnmarkedParamIndicesForIteration();
      // Raises appropriate error if mark_variable_ready is called on the same
      // variable twice, which is unexpected.
      void checkAndRaiseMarkedTwiceError(size_t curVariableIndex);
    
      friend class Logger;
    };
    

    The key member variables of Reducer are as follows.

      std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
          grad_accumulators_; // indexed by tensor index: stores each tensor's grad_accumulator
      std::unordered_map<torch::autograd::Node*, VariableIndex>
          gradAccToVariableMap_; // maps a grad_accumulator to its index, which makes it easy to find unused parameters in the autograd graph later
      std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
          hooks_;
    
      std::vector<Bucket> buckets_;
    
      const std::vector<std::vector<at::Tensor>> replicas_; // the tensors passed in
      const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_; // the process group
    

    We now analyze these member variables one by one.

    0x03 Bucket

    3.1 Design

    Batching gradients together before reducing them can lower overhead and/or shorten time to completion, but only gradients of the same dtype on the same device can be batched.

    A bucket is a collection of gradients: gradients of the same dtype on the same device are placed into the same bucket. In the code, the Bucket struct embodies this concept.

    In every backward pass, the gradient tensors of all parameters are copied into buckets, and after AllReduce the averaged gradients are copied back from them. To speed up these copies, a bucket is always created on the same device as its parameters. If the model spans multiple devices, DDP takes device affinity into account so that all parameters in the same bucket live on the same device. The order of AllReduce also affects the result, because it determines how much communication can overlap with computation; DDP launches AllReduce in the reverse order of model.parameters().
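
    To make the bucketing idea concrete, here is a minimal, illustrative Python sketch (not the actual DDP algorithm, whose bucket assignment is computed in C++) that greedily assigns parameter indices to buckets in the reverse order of model.parameters(), grouping by (device, dtype) and capping each bucket by a byte budget:

    import torch

    def greedy_buckets(params, cap_bytes=25 * 1024 * 1024):
        """Illustrative only: group parameter indices into buckets by (device, dtype),
        walking parameters in reverse definition order and capping each bucket's size."""
        buckets, current, current_bytes, key = [], [], 0, None
        for idx, p in reversed(list(enumerate(params))):
            k = (p.device, p.dtype)
            nbytes = p.numel() * p.element_size()
            if current and (k != key or current_bytes + nbytes > cap_bytes):
                buckets.append(current)
                current, current_bytes = [], 0
            current.append(idx)
            current_bytes += nbytes
            key = k
        if current:
            buckets.append(current)
        return buckets

    model = torch.nn.Sequential(torch.nn.Linear(10, 10), torch.nn.Linear(10, 5))
    print(greedy_buckets(list(model.parameters()), cap_bytes=256))  # e.g. [[3, 2], [1], [0]]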

    3.2 Definition

    3.2.1 How many BucketReplicas are there?

    To explain this properly, we first need to work out what a BucketReplica is, starting from the comments.

    First, a Bucket holds multiple BucketReplicas, one per model replica.

    // A bucket holds N bucket replicas (1 per model replica).
    

    However, only element [0] is actually used: since single-process multi-device mode is no longer supported, we can assume there is only one replica per bucket.

        GradBucket grad_bucket(
            next_bucket_,
            tensors[0],
            // the comment here points out that SPMD is not supported
            // Since currently we do not support single-process multiple-device
            // mode, we can assume only one replica in the bucket.
            bucket.replicas[0].offsets,
            bucket.replicas[0].lengths,
            bucket.replicas[0].sizes_vec);
        bucket.future_work = comm_hook_->runHook(grad_bucket);
    

    Combined with the earlier code, SPMD will not be supported going forward. parameters is the parameter set of the model list [ToyModel], and parameters[0] is the parameters of ToyModel.

        # the comment below points out that SPMD will not be supported in the future either
        # TODO(wayi@): Remove this field since SPMD is no longer supported,
        # and also remove all the relevant unnecessary loops.
        # Module replication within process (single-process multi device)
        
        self._module_copies = [self.module] # build a list such as [ToyModel]
        # Build parameters for reducer.
        parameters, expect_sparse_gradient = self._build_params_for_reducer()
    

    Putting this together, we know:

    • DDP originally intended to support SPMD the way DP does, so the process would maintain the parameters of multiple model replicas across multiple GPUs; that is, parameters is an array whose elements are the parameter sets of the individual model replicas.
    • parameters is assigned to Reducer.replicas_, and Reducer.replicas_ is used to populate bucket.replicas.
    • Since SPMD is no longer supported, only parameters[0] is meaningful.

    So we conclude:

    • A BucketReplica is (part of) one model's set of gradient-requiring parameters. A replica corresponds to (part of) the parameters of a model replica on one device (GPU): one replica represents [1..N] gradients to be reduced, all with the same dtype and on the same device.
    • In practice only bucket.replicas[0] is meaningful; it corresponds to a subset of the gradient-requiring tensors of [self.module] in the code above, i.e., parameters[0].

    3.2.2 Key points

    Let's summarize the key points of Bucket:

    • The replicas member holds the BucketReplicas of this bucket. A BucketReplica represents [1..N] gradients to be reduced, all with the same dtype and on the same device.

      • Only bucket.replicas[0] is meaningful; it corresponds to the tensors of this bucket among the model's gradient-requiring parameters.
      • How is it assigned? From Reducer.replicas_, which is the parameters argument; this is covered below.
    • The variable_indices member records the indices of the variables contained in this bucket.

      How is it assigned? From the bucket_indices argument introduced earlier.

      bucket.variable_indices = std::move(bucket_indices[bucket_index]);
      

      How is it used? intra_bucket_index is a position within bucket.variable_indices; the real variable index is obtained from that position. This is explained with code later on.

      size_t variable_index = bucket.variable_indices[intra_bucket_index];
      

    3.2.3 Full definition

    Finally, the full definition of Bucket is:

      // A bucket holds N bucket replicas (1 per model replica).
      //
      // If every bucket in this struct is ready, the reduction can be kicked off.
      // One bucket per replica. Reduction is kicked off when every bucket is ready.
      //
      struct Bucket {
        std::vector<BucketReplica> replicas; // one BucketReplica per model replica
    
        // Global indices of participating variables in the bucket
        std::vector<size_t> variable_indices; // which variables this bucket contains
    
        // Number of replicas to be marked done before this bucket is ready.
        size_t pending; // counter
    
        // Keep work handle around when this set of buckets is being reduced.
        c10::intrusive_ptr<c10d::ProcessGroup::Work> work;
    
        // Keep future work handle around if DDP comm hook is registered.
        c10::intrusive_ptr<torch::jit::Future> future_work;
    
        // If this bucket should expect a single sparse gradient.
        // Implies: replicas[i].variables.size() == 1.
        bool expect_sparse_gradient = false;
      };
    

    3.3 Setup

    The Reducer member variable buckets_ is the key one: it holds all of the Reducer's buckets.

    std::vector<Bucket> buckets_;
    

    buckets_ is set up in the initialization function; the core steps are (a small Python sketch of the flattening follows the code below):

    • Find this bucket's entry in bucket_indices.
    • Look up the tensors in parameters that correspond to those indices.
    • Configure those tensors, the ones this bucket is responsible for reducing, into the BucketReplica.
    void Reducer::initialize_buckets(
        std::vector<std::vector<size_t>> bucket_indices) {
      
      buckets_.reserve(bucket_count);
      
      for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) {
        Bucket bucket;
        
        // Variables that expect sparse gradients must have their own bucket.
        if (bucket_indices[bucket_index].size() == 1) {
          const auto variable_index = bucket_indices[bucket_index].front();
          bucket.expect_sparse_gradient = // set up the bucket
              expect_sparse_gradients_[0][variable_index];
        }     
        // Iterate over model replicas.
        for (size_t replica_index = 0; replica_index < replica_count;
             replica_index++) {
          
          BucketReplica replica; // set up the replica
    
          if (bucket.expect_sparse_gradient) {
            const auto variable_index = bucket_indices[bucket_index].front();
            // find the tensor for this index
            const auto& variable = replicas_[replica_index][variable_index];
            replica.variables = {variable};
          } else {
    
            // Iterate over bucket variables.
            for (const auto variable_index : bucket_indices[bucket_index]) {
              // find the tensor for this index
              const auto& variable = replicas_[replica_index][variable_index];
              if (!options.has_device()) {
                options = options.device(variable.device());
              } 
              if (!options.has_dtype()) {
                options = options.dtype(variable.dtype());
              } 
              
              const auto length = variable.numel();
              replica.variables.push_back(variable); // append the tensor
              replica.offsets.push_back(offset);
              replica.lengths.push_back(length);
              replica.sizes_vec.push_back(variable.sizes());
              offset += length;
            }
    
            // Allocate bucket contents tensor.
             initialize_bucket_views(replica, replica.contents);
          }
    
          // Add bucket replica to enclosing bucket.
          bucket.replicas.push_back(std::move(replica)); // add the replica to the bucket
        }   
        
        bucket.variable_indices = std::move(bucket_indices[bucket_index]);
        buckets_.push_back(std::move(bucket)); // append to the bucket list
      }  
    }
    
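    The offset/length bookkeeping above can be illustrated with a small Python sketch (plain tensors, no DDP; the names are made up):

    import torch

    def flatten_bucket(variables):
        """Sketch of what initialize_buckets records for one replica: per-variable
        offset/length/size into a flat contents tensor."""
        offsets, lengths, sizes = [], [], []
        offset = 0
        for v in variables:
            offsets.append(offset)
            lengths.append(v.numel())
            sizes.append(v.size())
            offset += v.numel()
        contents = torch.empty(offset, dtype=variables[0].dtype, device=variables[0].device)
        return contents, offsets, lengths, sizes

    vs = [torch.randn(2, 3), torch.randn(4), torch.randn(3, 3)]
    contents, offsets, lengths, sizes = flatten_bucket(vs)
    print(offsets, lengths)   # [0, 6, 10] [6, 4, 9]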

    This is illustrated below. Here we assume the bucket index is 1, i.e. the second bucket, so variable_indices corresponds to the matching entry of bucket_indices. For example, BucketReplica[0] holds Tensor 4, 5, 6, and variable_indices holds the indices of Tensor 4, 5, and 6 respectively.

    The bucket_indices in the figure below is one of the arguments to the Reducer constructor.

    +--------------------------------+   +------------------------------------+
    |Reducer                         |   |                                    |
    |                                |   |bucket 0, bucket 1, ...... bucket n |
    |      vector<Bucket> buckets_ +---> |    +                               |
    |                                |   |    |                               |
    +--------------------------------+   +------------------------------------+
                                              |
                              +---------------+              +------------------------------+
                              |                         +--> | Tensor 4, Tensor 5, Tensor 6 |
                              |                         |    +------------------------------+
                              |                         |
                              v                   +-----------------------------------------+
    +-------------------------+-----------+       |     |                                   |
    | Bucket                              |       | +---+-----------+     +---------------+ |
    |                                     |       | | BucketReplica |     | BucketReplica | |
    |                                     |       | |               | ... |               | |
    |   vector<BucketReplica> replicas +--------> | +---------------+     +---------------+ |
    |                                     |       +-----------------------------------------+
    |                                     |
    |   vector<size_t> variable_indices +------->  <tensor index 4, tensor index 5, tensor 6>
    |                                     |
    +-------------------------------------+
    
    
    
    
    
    bucket_indices    +-----------------------------------------------------------------------+
         +            |                                                                       |
         |            |  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
         |            |                                                                       |
         +----------> |                                                                       |
                      |  <tensor index 4, tensor index 5, tensor 6>                           |
                      |                                                                       |
                      |                                                                       |
                      |  ......                                                               |
                      |                                                                       |
                      |                                                                       |
                      |  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
                      |                                                                       |
                      +-----------------------------------------------------------------------+
    
    
    

    0x04 BucketReplica

    As discussed above, a BucketReplica represents [1..N] gradients to be reduced, all with the same dtype and on the same device. It is one part of the model's gradient-requiring parameters; exactly which part is determined by the bucket's variable_indices.

    Its key member variables are:

    • std::vector<at::Tensor> variables holds the variables that make up this bucket replica. Refcounted values are used here so that the bucket contents can easily be unflattened back into the participating variables once the reduction has completed.
    • at::Tensor contents: the flattened (1-dimensional) contents of the bucket.
    • std::vector<at::Tensor> bucket_views_in: views into contents for each gradient, from the input side.
    • std::vector<at::Tensor> bucket_views_out: views into contents for each gradient, from the output side.

    See the following comment:

    Views serve as entry points to copy_ each grad's data in/out of the flat contents tensor.
    

    4.1 Views

    More on std::vector<at::Tensor> bucket_views_in and std::vector<at::Tensor> bucket_views_out:

    • In PyTorch, a view is a tensor that shares memory with the original data: it does not copy anything, it just reorganizes the existing data so that part of it, or a re-laid-out version of it, can be accessed directly.
    • Each view is created with a layout (sizes + strides) matching the gradient's expected layout.
    • bucket_views_in and bucket_views_out provide ways to manipulate each gradient inside contents; in other words, they are views onto each tensor's gradient within contents, and they serve as the entry points for moving each gradient's data into and out of contents.
    • The reason two sets of bucket views are kept is that if a DDP communication hook is registered, bucket_views_out may be re-initialized with the value of the hook's future_work; a separate view reference onto the replica's original contents must therefore be kept for the bucket_views_in[i].copy_(grad) call.
    • bucket_views_in[i].copy_(grad) and grad.copy_(bucket_views_out[i]) provide convenient ways to move gradient data into and out of contents.

    In addition, the following three member variables store per-variable information about the flat bucket; for example, offsets records each tensor's offset within the flat bucket contents. A short PyTorch sketch of these views follows the snippet below.

    // Per-variable offset/length into the flat bucket contents tensor and grad
    // bucket.
    std::vector<size_t> offsets;
    std::vector<size_t> lengths;
    // Per-variable sizes into the grad bucekt.
    std::vector<c10::IntArrayRef> sizes_vec;
    
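    A minimal PyTorch sketch of the view idea (the real views are built in C++ by initialize_bucket_views): create a flat contents tensor and per-variable views via as_strided / narrow, then move gradient data through the views with copy_:

    import torch

    # Flat bucket contents for two parameters: a 2x3 dense weight and a length-4 bias.
    contents = torch.zeros(10)

    w = torch.randn(2, 3)
    b = torch.randn(4)

    # View for the dense parameter: match its sizes/strides at offset 0.
    view_w = contents.as_strided(w.size(), w.stride(), 0)
    # Fallback C-contiguous view for the second parameter at offset 6.
    view_b = contents.narrow(0, 6, 4).view(b.size())

    view_w.copy_(torch.ones(2, 3))   # bucket_views_in[i].copy_(grad)
    grad_b = torch.empty(4)
    grad_b.copy_(view_b)             # grad.copy_(bucket_views_out[i])

    print(contents)                  # the first six entries are now 1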

    4.2 Definition

    BucketReplica is defined as:

    // A bucket replica represents [1..N] gradients to be reduced,
    // with the same dtype, on the same device.
    //
    // Batching gradients together before reducing them can result in lower
    // overhead and/or faster time to completion. Only gradients of the same type
    // and on the same device can be batched. The tensor that represents the
    // flattened gradient uses the same type and is placed on the same device.
    // Buckets are filled as the gradients they hold are computed (triggered by
    // autograd hooks). Buckets are reduced in a predetermined order that is
    // identical across processes.
    struct BucketReplica {
      // Flattened (1 dimensional) contents of bucket.
      at::Tensor contents; // flattened here
    
      // Views into contents for each grad.  Each view will be created with
      // layout (sizes + strides) matching the grad's expected layout
      // ("Gradient Layout Contract" in torch/csrc/autograd/AccumulateGrad.h).
      // `bucket_views_in[i].copy_(grad)` and
      // `grad.copy_(bucket_views_out[i])`
      // provide convenient ways to move grad data in/out of contents.
      // The reason we keep two states for bucket_views is that if DDP
      // communication hook was registered, `bucket_views_out` could be
      // re-initialized with the value of hook's `future_work`. We still need to
      // keep a separate view reference to replica's original contents for
      // `bucket_views_in[i].copy_(grad)` call.
      std::vector<at::Tensor> bucket_views_in; // input-side views into contents
      std::vector<at::Tensor> bucket_views_out; // output-side views into contents
    
      // Variables that contribute to this bucket replica. Use refcounted value
      // here so that we can easily unflatten the bucket contents into the
      // participating variables after reduction has completed.
      std::vector<at::Tensor> variables;
    
      // Per-variable offset/length into the flat bucket contents tensor and grad
      // bucket.
      std::vector<size_t> offsets;
      std::vector<size_t> lengths;
    
      // Per-variable sizes into the grad bucekt.
      std::vector<c10::IntArrayRef> sizes_vec;
    
      // Number of tensors to be added before this bucket is complete.
      // This is reset to `variables.size()` every iteration.
      size_t pending;
    
      // TODO(@pietern)
      // Memory copies from gradient tensors into the bucket are potentially
      // done on different CUDA streams. We record an event for every copy
      // so that we can synchronize with them prior to kicking off the reduction.
      // std::vector<at::cuda::CUDAEvent> events;
    };
    

    So far the structure looks like the figure below; as noted earlier, only replicas[0] of each bucket is meaningful.

                                        +-----------------------------------------------------+
    +----------------------------+      | +-------+      +----------------------------------+ |
    | Reducer                    |      | |Bucket |      |Bucket                            | |
    |                            |      | |       |      |                                  | |
    |                            |      | |       |      |            Future  future_work   | |
    |  vector<Bucket> buckets_ +------> | |       | ...  |                                  | |
    |                            |      | |       |      |       ProcessGroup::Work  work   | |
    |                            |      | |       |      |                                  | |
    |                            |      | |       |      | vector<size_t> variable_indices  | |
    |                            |      | |       |      |                                  | |
    |                            |      | |       |      |  vector<BucketReplica> replicas  | |
    |                            |      | |       |      |                          +       | |
    |                            |      | |       |      |                          |       | |
    |                            |      | |       |      |                          |       | |
    +----------------------------+      | +-------+      +----------------------------------+ |
                                        +-----------------------------------------------------+
                                                                                    |
                                                                                    |
                                                                                    v
                               +--------------------------------------------------------------+
                               | +---------------+       +----------------------------------+ |
                               | |BucketReplica  |       | BucketReplica                    | |
                               | |               |       |                                  | |
                               | |               |       |                                  | |
                               | |               |       |  vector<Tensor> bucket_views_in  | |
                               | |               |  ...  |                                  | |
                               | |               |       |  vector<Tensor> bucket_views_out | |
                               | |               |       |                                  | |
                               | |               |       |  Tensor contents                 | |
                               | |               |       |                                  | |
                               | |               |       |  vector<Tensor> variables        | |
                               | |               |       |                                  | |
                               | |               |       |                                  | |
                               | +---------------+       +----------------------------------+ |
                               +--------------------------------------------------------------+
    
    

    4.3 Initialization

    Part of the initialization code lives in Reducer::initialize_buckets.

    // Allocate bucket contents tensor (i.e. allocate the memory).
    replica.contents = at::empty({static_cast<long>(offset)}, options);
    
    initialize_bucket_views(replica, replica.contents);
    

    The code of initialize_bucket_views is shown below; two PyTorch functions used there deserve a brief explanation first.

    • as_strided: creates a view (still a tensor) over an existing tensor with the given sizes and strides. It shares memory with the original data and stores nothing of its own, so the result is only a view, not separate storage.
    • narrow: returns a tensor that is a narrowed slice of the original tensor along one dimension (also a view).

    The main logic of initialize_bucket_views is:

    • Iterate over the replica's tensors; handle each one according to whether its memory layout is dense or not, and append the resulting view to replica.bucket_views_in.

    • Set replica.bucket_views_out to replica.bucket_views_in; normally the two are the same.

    • If gradient_as_bucket_view_ is set to true, two cases need to be handled:

      • initialize_bucket_views may be called inside initialize_buckets when rebuild_buckets rebuilds the buckets. If the grad was already defined/computed in the previous iteration, the old grad has to be copied into the new bucket_view, and the grad must then point at the new bucket_view.

      • initialize_bucket_views may also be called inside initialize_buckets during construction. Grads are not defined at construction time; in that case, do not make the grad point at the bucket_view, because for globally unused parameters the grad should remain undefined.

    The code is as follows:

    // (see Note:  "Gradient Layout Contract" in initialize_buckets).
    void Reducer::initialize_bucket_views(
        Reducer::BucketReplica& replica,
        at::Tensor& contents) {
      for (size_t i = 0; i < replica.variables.size(); i++) { // iterate over the replica's tensors
        auto& v = replica.variables[i];
        const auto offset = replica.offsets[i];
        const auto length = replica.lengths[i];
        if (v.is_non_overlapping_and_dense()) {
          // If the param's memory is dense, match its layout, anticipating
          // the autograd engine (AccumulateGrad) will also create gradients
          // matching its layout.
          replica.bucket_views_in.push_back( // dense layout
              contents.as_strided(v.sizes(), v.strides(), offset));
        } else {
          // Fall back to a C-style contiguous view, again anticipating
          // AccumulateGrad will do the same when stashing grads for non-dense
          // params.
          replica.bucket_views_in.push_back( // non-dense fallback
              contents.narrow(0, offset, length).view(v.sizes()));
        }
        // By default `bucket_views_out` and `bucket_views_in` are
        // essentially the same thing.
        replica.bucket_views_out = replica.bucket_views_in;
    
        // If gradient_as_bucket_view_ is set as true, then there are two cases to
        // handle: initialize_bucket_views could be called inside initialize_buckets
        // when rebuild_buckets, if grad has already been defined/calculated in
        // previous iteration, old grad needs to be copied into new bucket_view and
        // let grad point to the new bucket_view, initialize_bucket_views could also
        // be called inside initialize_buckets during construction. Grads are not
        // defined during construction time, in this case, do not let grad point to
        // bucket_view, because grads should be kept as being undefined for globally
        // unused parameters.
        if (gradient_as_bucket_view_) {
          auto& bucket_view = replica.bucket_views_in.back();
          runGradCallbackForVariable(v, [&](auto& grad) {
            if (grad.defined() && !grad.is_alias_of(bucket_view)) {
              bucket_view.copy_(grad);
              grad = bucket_view;
              // The grad is modified and needs to be written back.
              return true;
            }
            // The grad is not modified and does not need to be written back.
            return false;
          });
        }
      }
    }
    

    As shown in the figure below:

    +------------------------------------------+
    | BucketReplica                            |
    |                                          |
    |       vector<Tensor> bucket_views_in +--------------------+
    |                                          |                |
    |                                          |                |
    |       vector<Tensor> bucket_views_out +--------------+    |
    |                                          |           |    |
    |                                          |           |    |
    |                                          |           v    v
    |                                          |     +-----+----+--------------------------+
    |       Tensor contents  +---------------------> |Flattened (Tensor1, Tensor2, Tensor3)|
    |                                          |     +-------------------------------------+
    |                                          |
    |                                          |
    |       vector<Tensor> variables  +------------>  [Tensor1,Tensor2,Tensor3]
    |                                          |
    |                                          |
    |                                          |
    +------------------------------------------+
    
    

    In addition, mark_variable_ready_sparse, mark_variable_ready_dense, and finalize_backward all write into contents.
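
    To illustrate the gradient_as_bucket_view_ branch above, here is a minimal sketch (plain tensors, outside DDP) of copying an existing grad into its bucket view and then rebinding the grad so that it aliases the bucket storage:

    import torch

    contents = torch.zeros(6)                           # flat bucket contents
    bucket_view = contents.narrow(0, 0, 4).view(2, 2)   # view for one 2x2 parameter

    grad = torch.ones(2, 2)          # gradient left over from the previous iteration
    bucket_view.copy_(grad)          # copy the old grad into the new bucket view
    grad = bucket_view               # rebind: grad now aliases the bucket storage

    assert grad.data_ptr() == contents.data_ptr()       # same underlying storage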

    0x05 Lookup classes

    The following two classes are used so that the autograd hook functions can determine which bucket a tensor corresponds to.

    5.1 VariableIndex

    VariableIndex pins down which tensor we are dealing with. This matters for the autograd hook: when the callback fires it only knows its own gradient tensor, but it also needs to know which replica that tensor belongs to and its index within that replica before the reduction can proceed.

    5.1.1 Member variables

    Within the Reducer class, the only standalone member variable whose type is built directly from VariableIndex is:

    std::vector<VariableIndex> unused_parameters_
    

    VariableIndex mostly appears as part of other member variables or as a function argument. For example, gradAccToVariableMap_ in Reducer uses VariableIndex:

    std::unordered_map<torch::autograd::Node*, VariableIndex>
          gradAccToVariableMap_; // maps a grad_accumulator to its index, which makes it easy to find unused parameters in the autograd graph later
    

    5.1.2 Definition

    VariableIndex is defined as follows:

    // Locates a specific variable by replica index and variable index.
    struct VariableIndex {
      size_t replica_index; // which replica it belongs to
      size_t variable_index; // the variable's index over all of a replica's variables (e.g., with 10 parameters, variable_index ranges from 0 to 9); note that this is not the position inside a bucket; that position is determined by VariableLocator below
    
      VariableIndex() = default;
    
      VariableIndex(size_t replica_index_, size_t variable_index_) {
        replica_index = replica_index_;
        variable_index = variable_index_;
      }
    
      static size_t hash(const VariableIndex& key) {
        return c10::get_hash(key.replica_index, key.variable_index);
      }
    };
    
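    A rough Python analogue of VariableIndex, just to show how such a pair can serve as a hashable map key (the names are hypothetical):

    from typing import NamedTuple

    class VariableIndex(NamedTuple):
        replica_index: int
        variable_index: int

    # NamedTuple instances are hashable, so they can key a dict much like the C++
    # struct keys numGradHooksTriggeredMap_ through c10::get_hash.
    num_grad_hooks_triggered = {VariableIndex(0, i): 0 for i in range(20)}
    print(num_grad_hooks_triggered[VariableIndex(0, 3)])   # 0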

    In the Reducer constructor, the following code sets up autograd_hook; it installs a hook for every tensor of every replica. If the autograd hook did not know which bucket a gradient belongs to, it could not tell DDP that the bucket as a whole is ready.

    How is the bucket found? That requires VariableLocator, described below.

    auto& variable = replicas_[replica_index][variable_index];
    const auto index = VariableIndex(replica_index, variable_index); // construct the VariableIndex
            hooks_.emplace_back(
                grad_accumulator->add_post_hook(
                    torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                        [=](const torch::autograd::variable_list& outputs,
                            const torch::autograd::variable_list& /* unused */) {
    #ifndef _WIN32
                          this->rpc_context_.set(  
                              ThreadLocalDistAutogradContext::getContextPtr());
    #endif
                          this->autograd_hook(index); // the hook receives the VariableIndex so it can locate the tensor
                          return outputs;
                        })),
                grad_accumulator);
    

    5.2 VariableLocator

    5.2.1 Definition

    VariableLocator locates a particular variable within the bucket structure. To find a tensor's position, we need to know which bucket it is in and where it sits among that bucket's tensors.

    • Which bucket: bucket_index is a position in the Reducer.buckets_ list, identifying one bucket in buckets_.
    • Which position within the bucket replica: intra_bucket_index is the index of the variable in the vector fields of the bucket replica.
    // A variable locator locates a particular variable in the bucket
    // structure. The `bucket_index` field points to the bucket in the `buckets_`
    // vector. The `intra_bucket_index` field points to the index of the variable
    // in any of the vector fields in the bucket replica.
    struct VariableLocator {
      // Index into the `buckets_` variable.
      size_t bucket_index; // which bucket
      // Index of parameter in single bucket replica.
      size_t intra_bucket_index; // which position within the bucket replica
    
      VariableLocator() = default;
    
      VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
        bucket_index = bucket_index_;
        intra_bucket_index = intra_bucket_index_;
      }
    };
    

    5.2.2 Member variables

    The corresponding Reducer member variable is:

    // Map the index of a variable to its location in the bucket structure.
    std::vector<VariableLocator> variable_locators_;
    
    5.2.2.1 Initialization

    How is it initialized?

    void Reducer::initialize_buckets(
        std::vector<std::vector<size_t>> bucket_indices) {
      // Clear current bucket assignment.
      buckets_.clear();
      variable_locators_.clear();
      // Ensure we have a bucket index for every variable.
      variable_locators_.resize(replicas_[0].size());
      
      // Iterate over buckets.
      const auto bucket_count = bucket_indices.size();
      const auto replica_count = replicas_.size();
      buckets_.reserve(bucket_count);
      
      for (size_t bucket_index = 0; bucket_index < bucket_count; bucket_index++) { // iterate over buckets
        // Map participating variables to this bucket.
        // This is identical across replicas so we only need to do this once.
        size_t intra_bucket_index = 0;
        for (const auto variable_index : bucket_indices[bucket_index]) { // iterate over the tensors in this bucket; every tensor index is unique across all buckets
          variable_locators_[variable_index] =
              VariableLocator(bucket_index, intra_bucket_index++); // intra_bucket_index simply increments
        }
    	}
    }
    

    Question: can variable_locators_[variable_index] collide across different buckets? No: every variable index appears in exactly one bucket, and the (bucket_index, intra_bucket_index) combination produced by VariableLocator(bucket_index, intra_bucket_index++) is unique.

    Here is an example. Tensor indices give every tensor an index, increasing from 0 up to tensors.size() - 1. If the model's parameters consist of 20 tensors, the tensor indices run from 0 to 19; split into 6 buckets, every tensor index is unique across those 6 buckets.

    +-----------------------------------------------------------------------+
    |                                                                       |
    |  <tensor index 0, tensor index 1, tensor index 2, tensor index 3>     |
    |                                                                       |
    |                                                                       |
    |  <tensor index 4, tensor index 5, tensor 6>                           |
    |                                                                       |
    |                                                                       |
    |  ......                                                               |
    |                                                                       |
    |                                                                       |
    |  <tensor index 16, tensor index 17, tensor index 18, tensor index 19> |
    |                                                                       |
    +-----------------------------------------------------------------------+
    
    

    The corresponding variable_locators_ entries are then:

    variable_locators_[tensor index 0] =  VariableLocator(bucket 0, 0), i.e. tensor index 0 is the first variable of bucket 0.
    
    variable_locators_[tensor index 1] =  VariableLocator(bucket 0, 1), i.e. tensor index 1 is the second variable of bucket 0.
    
    variable_locators_[tensor index 2] =  VariableLocator(bucket 0, 2), i.e. tensor index 2 is the third variable of bucket 0.
    
    variable_locators_[tensor index 3] =  VariableLocator(bucket 0, 3), i.e. tensor index 3 is the fourth variable of bucket 0.
    
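    The same mapping can be mimicked in a few lines of Python (illustrative only):

    bucket_indices = [[0, 1, 2, 3], [4, 5, 6]]   # toy example

    variable_locators = {}
    for bucket_index, indices in enumerate(bucket_indices):
        for intra_bucket_index, variable_index in enumerate(indices):
            variable_locators[variable_index] = (bucket_index, intra_bucket_index)

    print(variable_locators[5])   # (1, 1): tensor index 5 is the 2nd variable of bucket 1
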
    5.2.2.2 Usage

    How is it used? Take the following as an example.

    When the autograd hook fires, it calls back with a VariableIndex index:

    this->autograd_hook(index)
    

    autograd_hook eventually reaches mark_variable_ready_dense, which uses variable_locators_ to find the bucket and then carries out the subsequent processing.

    void Reducer::mark_variable_ready_dense(VariableIndex index) {
      const auto replica_index = index.replica_index;
      const auto variable_index = index.variable_index;
      const auto& bucket_index = variable_locators_[variable_index]; // find the bucket index for this tensor
      auto& bucket = buckets_[bucket_index.bucket_index]; // find the bucket
      auto& replica = bucket.replicas[replica_index]; // then find the corresponding replica within the bucket
      auto& variable = replica.variables[bucket_index.intra_bucket_index]; // found the tensor
      const auto offset = replica.offsets[bucket_index.intra_bucket_index]; // plus its offset
      const auto length = replica.lengths[bucket_index.intra_bucket_index];
      auto& bucket_view = replica.bucket_views_in[bucket_index.intra_bucket_index];
    
      // now the actual processing can proceed
      
      // Copy contents of gradient tensor to bucket tensor.
      // If the gradient is not set, we assume it wasn't computed
      // as part of the current backwards pass, and zero the part
      // of the bucket it would otherwise hold.
      runGradCallbackForVariable(variable, [&](auto& grad) {
        if (grad.defined()) {
          this->check_grad_layout(grad, bucket_view);
          // When gradient_as_bucket_view_ is false, or even when
          // gradient_as_bucket_view_ is true, in rare cases users may set grad to
          // be None after every iteration. In these cases, grad and bucket_view are
          // pointing to different storages and thus need to copy grads to
          // bucket_view. If gradient_as_bucket_view_ is set as true, let grad point
          // to bucket_view. If grad has already been set as views of buckets in
          // previous iterations, no copy is needed.
          if (!grad.is_alias_of(bucket_view)) {
            this->copy_grad_to_bucket(grad, bucket_view);
            if (gradient_as_bucket_view_) {
              // Let grad point to bucket_view buffer.
              grad = bucket_view;
              // The grad is modified and need to be written back.
              return true;
            }
          } else {
            // If grad and bucket view point to the same storage, no need to copy
            if (comm_hook_ == nullptr) {
              bucket_view.div_(divFactor_);
            }
          }
        } else {
          bucket_view.zero_();
        }
        // The grad is not modified and doesn't need to be written back.
        return false;
      });
    }
    
    

    0x06 Gradient accumulation classes

    The following classes relate to gradient accumulation.

    6.1 grad_accumulators_

    grad_accumulators_ can be thought of as a matrix in which every item is an AccumulateGrad (a Node), the function that accumulates the gradient for a parameter. For now it appears to serve only a bookkeeping purpose.

    std::vector<std::vector<std::shared_ptr<torch::autograd::Node>>>
        grad_accumulators_;
    

    As shown below, variable1 is an actual tensor, and one item in grad_accumulators_ points to variable1's AccumulateGrad.

                                            variable1 +----+
                                                           |
                                                           |
                                                           v
    +-----------------------------------+    +-------------+-----------+
    |grad_accumulators_                 |    | Variable                |
    |                                   |    |                         |
    |                                   |    |   +------------------+  |
    | [replica_index][variable_index]+---------->+ AccumulateGrad   |  |
    |                                   |    |   |                  |  |
    |                                   |    |   |                  |  |
    +-----------------------------------+    |   |    post_hooks_+--------> autograd_hook(index)
                                             |   |                  |  |
                                             |   |                  |  |
                                             |   +------------------+  |
                                             |                         |
                                             +-------------------------+
    
    
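    In Python, the effect of these per-parameter post-hooks can be roughly approximated with Tensor.register_hook, which fires once the gradient for a tensor has been computed (this is only an analogue of the post-hook installed on AccumulateGrad, and the names are made up):

    import torch

    model = torch.nn.Linear(4, 2)
    ready = set()

    def make_hook(variable_index):
        def hook(grad):
            # analogous to Reducer::autograd_hook(index): mark this variable as ready
            ready.add(variable_index)
            return grad
        return hook

    for variable_index, p in enumerate(model.parameters()):
        p.register_hook(make_hook(variable_index))

    model(torch.randn(3, 4)).sum().backward()
    print(ready)   # {0, 1}: both the weight and the bias produced a gradient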

    6.1.1 Initialization

    How is it initialized? In the Reducer constructor:

      {
        const auto replica_count = replicas_.size();
    
        // the two nested for loops below iterate over all tensors
        for (size_t replica_index = 0; replica_index < replica_count;
             replica_index++) {
    
          for (size_t variable_index = 0; variable_index < variable_count;
               variable_index++) {
            
            auto& variable = replicas_[replica_index][variable_index];
            const auto index = VariableIndex(replica_index, variable_index);
    
            // The gradient accumulator function is lazily initialized once.
            // Therefore we can use its presence in the autograd graph as
            // evidence that the parameter has participated in an iteration.
            
            auto grad_accumulator = // obtain this tensor's grad_accumulator
                torch::autograd::impl::grad_accumulator(variable);
    
            // Hook to execute after the gradient accumulator has executed.
            hooks_.emplace_back(
                grad_accumulator->add_post_hook(
                    torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                        [=](const torch::autograd::variable_list& outputs,
                            const torch::autograd::variable_list& /* unused */) {
    #ifndef _WIN32
                          this->rpc_context_.set(
                              ThreadLocalDistAutogradContext::getContextPtr());
    #endif
                          this->autograd_hook(index);
                          return outputs;
                        })),
                grad_accumulator);
    
            // Map raw function pointer to replica index and parameter index.
            // This is used later on when the autograd graph is traversed
            // to check for parameters for which no gradient is computed, if
            // find_unused_parameters=True.
            // Note that the mapping of gradient accumulator to variable should be
            // one to one as we deduplicate shared parameters before constructing
            // Reducer.
            if (find_unused_parameters_) {
              gradAccToVariableMap_[grad_accumulator.get()] = index;
            }
    
            numGradHooksTriggeredMap_[index] = 0;
    
            // The gradient accumulator is stored as weak_ptr in the autograd
            // metadata of the variable, so we have to keep it alive here for
            // the raw pointer to be valid.
            grad_accumulators_[replica_index][variable_index] =
                std::move(grad_accumulator); // move this tensor's grad_accumulator into grad_accumulators_
          }
        }
      }
    

    6.1.2 Usage

    grad_accumulator returns a Node, namely an AccumulateGrad (a Node subtype); the validation code has been removed from the excerpt below.

    std::shared_ptr<Node> grad_accumulator(const Variable& self) {
      auto autograd_meta = get_autograd_meta(self);
    
      std::lock_guard<std::mutex> lock(autograd_meta->mutex_);
    
      auto result = autograd_meta->grad_accumulator_.lock();
      if (result)
        return result;
    
      c10::raw::intrusive_ptr::incref(self.unsafeGetTensorImpl());
      auto intrusive_from_this = c10::intrusive_ptr<at::TensorImpl>::reclaim(self.unsafeGetTensorImpl());
      result = std::make_shared<AccumulateGrad>(Variable(std::move(intrusive_from_this)));
      autograd_meta->grad_accumulator_ = result;
      return result;
    }
    

    6.2 gradAccToVariableMap_

    gradAccToVariableMap_ is defined as follows:

    std::unordered_map<torch::autograd::Node*, VariableIndex> gradAccToVariableMap_;
    

    Its purpose is to associate each Node with a VariableIndex; as shown below, variable 1 is given index 1:

                                                            +--------------+
                                                            | Variable     |
                                                      +---> |              |
                                                      |     |              |
                                                      |     +--------------+
                                                      |
                                                      |
    +-------------------------------------+           |
    | gradAccToVariableMap_               |           |
    |                                     |           |
    |                                     |           +
    |         <Node*, VariableIndex> +---------> [variable1 :index1, variable2 : index2]
    |                                     |                     +
    |                                     |                     |
    |                                     |                     |
    +-------------------------------------+                     |
                                                                |
                                                                v
                                                      +---------+-----------------------------+
                                                      |VariableIndex                          |
                                                      |                                       |
                                                      |          replica_index of Variable1   |
                                                      |                                       |
                                                      |          variable_index of Variable1  |
                                                      |                                       |
                                                      +---------------------------------------+
    

    6.2.1 Initialization

    How is it initialized? The Reducer constructor contains the following, which gives every gradient-requiring Variable a VariableIndex.

    auto& variable = replicas_[replica_index][variable_index];
    const auto index = VariableIndex(replica_index, variable_index);
    auto grad_accumulator = torch::autograd::impl::grad_accumulator(variable);
    
    if (find_unused_parameters_) {
      gradAccToVariableMap_[grad_accumulator.get()] = index;
    }
    

    6.2.2 Usage

    gradAccToVariableMap_ is used as follows: search_unused_parameters traverses the autograd graph and then walks gradAccToVariableMap_; if an accumulator function does not show up in the traversed graph, no gradient will be computed for its parameter, so it is recorded as unused.

    // Traverse the autograd graph starting at the specified output.
    // All parameters for which we have a pointer to their gradient accumulation
    // functions, but don't show up in the autograd graph will be marked ready for
    // for reduction as soon as the first autograd hook is called. This is not
    // done immediately because the model output may be ignored, and we only
    // want to start performing reductions on `torch.autograd.backward()`.
    void Reducer::search_unused_parameters(
        const std::vector<torch::autograd::Variable>& outputs) {
      std::unordered_set<torch::autograd::Node*> seen;
      std::vector<torch::autograd::Node*> queue;
    
      // Seed queue with the grad functions of all outputs.
      for (const auto& output : outputs) {
        const auto& grad_fn = output.grad_fn();
        if (grad_fn) {
          queue.push_back(grad_fn.get());
        }
      }
    
      // Traverse the autograd graph starting at the specified output.
      while (!queue.empty()) {
        auto fn = queue.back();
        queue.pop_back();
        for (const auto& edge : fn->next_edges()) {
          if (auto next_ptr = edge.function.get()) {
            const bool was_inserted = seen.insert(next_ptr).second;
            if (was_inserted) {
              queue.push_back(next_ptr);
            }
          }
        }
      }
    
      // Iterate: if an accumulator function does not show up in this graph, its parameter's gradient will not be computed.
      // Find accumulator functions that don't show up in this graph.
      for (const auto& it : gradAccToVariableMap_) {
        // If the accumulator function is present in the graph, we know
        // a gradient will be computed for the corresponding parameter.
        if (seen.count(it.first) == 0) {
          unused_parameters_.push_back(it.second);
        }
      }
    }
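
    From the Python side, this is the machinery behind find_unused_parameters=True. Below is a minimal, hypothetical single-process sketch (gloo backend, world_size=1; the Toy module and its names are made up for illustration): one submodule never takes part in the forward pass, so its gradient accumulator never appears in the autograd graph and search_unused_parameters records its parameters as unused.

    import os
    import torch
    import torch.distributed as dist
    import torch.nn as nn

    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29500")
    dist.init_process_group("gloo", rank=0, world_size=1)

    class Toy(nn.Module):
        def __init__(self):
            super().__init__()
            self.used = nn.Linear(10, 10)
            self.unused = nn.Linear(10, 10)   # defined but never called in forward

        def forward(self, x):
            return self.used(x)

    # Without find_unused_parameters=True the Reducer would keep waiting for
    # gradients of self.unused and complain; with it, those parameters are
    # detected by search_unused_parameters and marked ready immediately.
    model = nn.parallel.DistributedDataParallel(Toy(), find_unused_parameters=True)
    model(torch.randn(4, 10)).sum().backward()
    dist.destroy_process_group()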
    

    5.3 numGradHooksTriggeredMap_

    Records how many times a tensor's autograd_hook should be triggered before the tensor's gradient is ready. The map stops changing after the first iteration, so each value ends up as 0 or 1. It is used to set unused_parameters_ and to populate numGradHooksTriggeredMapPerIteration_.

    // Key: VariableIndex, Value: the number of times that a variable's autograd_hook()
    // should be triggered before marking this variable's grad as ready for communication.
    // Map will not change after 1st iteration.
    std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>> numGradHooksTriggeredMap_;
    

    5.3.1 Initialization

    How is it initialized? In the constructor:

    numGradHooksTriggeredMap_[index] = 0;
    

    During the first iteration of a static graph, each call to autograd_hook increments the corresponding entry by one.

    // The function `autograd_hook` is called after the gradient for a
    // model parameter has been accumulated into its gradient tensor.
    // This function is only to be called from the autograd thread.
    void Reducer::autograd_hook(VariableIndex index) {
    
      // some code omitted
      
      if (static_graph_first_iteration()) {
        numGradHooksTriggeredMap_[index] += 1; // in the first static-graph iteration this is incremented by 1
        return; // and then we return right away -- note this!
      }
    
      // If `find_unused_parameters_` is true there may be model parameters that
      // went unused when computing the model output, they won't be part of the
      // autograd graph, and won't receive gradients. These parameters are
      // discovered in the `prepare_for_backward` function and their indexes stored
      // in the `unused_parameters_` vector.
      if (!has_marked_unused_parameters_) {
        has_marked_unused_parameters_ = true;
        for (const auto& unused_index : unused_parameters_) {
          mark_variable_ready(unused_index);
        }
      }
    
      // If it is static graph, after 1st iteration, check a avariable
      // is ready for communication based on numGradHooksTriggeredMap_.
      if (static_graph_after_first_iteration()) {
        if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
          // Finally mark variable for which this function was originally called.
          mark_variable_ready(index); // 
        }
      } else {
        // Finally mark variable for which this function was originally called.
        mark_variable_ready(index);
      }
    }
    

    5.3.2 Usage

    How is it used? It is used here to reset numGradHooksTriggeredMapPerIteration_:

    void Reducer::reset_bucket_counting() {
      next_bucket_ = 0;
      // Reset num_buckets_ready_ at the beginning of backward computation
      // in each iteration.
      num_buckets_ready_ = 0;
    
      for (auto& bucket : buckets_) {
        for (auto& replica : bucket.replicas) {
          replica.pending = replica.variables.size();
        }
        bucket.pending = bucket.replicas.size();
      }
    
      if (static_graph_) {
        numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
      }
    }
    

    It is also consulted in delay_all_reduce: if an entry is 0, the corresponding index is pushed into unused_parameters_.

    // Right now delay_all_reduce is only called when static_graph_=true and
    // num_iterations_==1.
    void Reducer::delay_all_reduce() {
    
      // some code omitted
      
      // copy all gradients to buckets
      for (size_t replica_index = 0; replica_index < replicas_.size();
           replica_index++) {
        for (size_t variable_index = 0; variable_index < replicas_[replica_index].size();
             variable_index++) {
          const auto index = VariableIndex(replica_index, variable_index);
          // set unused_parameters_
          if (numGradHooksTriggeredMap_[index] == 0) { // if 0, add it to unused_parameters_
            unused_parameters_.push_back(index);
          }
          require_finalize_ = true;
          set_divide_factor();
          if (expect_sparse_gradients_[replica_index][variable_index]) {
            mark_variable_ready_sparse(index);
          } else {
            mark_variable_ready_dense(index);
          }
        }
      }
    
      // launch all reduces for all buckets
      for (auto & bucket : buckets_) {
        all_reduce_bucket(bucket);
      }
    
      finalize_backward();
    }
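
    To make the role of numGradHooksTriggeredMap_ in delay_all_reduce concrete, here is a tiny Python sketch of the bookkeeping only (an illustration with made-up indices, not the actual C++ code): after the first static-graph iteration, any parameter whose hook count is still 0 is treated as unused.

    # Counts filled in by autograd_hook during the first static-graph iteration.
    num_grad_hooks_triggered = {0: 1, 1: 1, 2: 0}   # plays the role of numGradHooksTriggeredMap_

    # delay_all_reduce: a parameter whose hook never fired is unused.
    unused_parameters = [idx for idx, count in num_grad_hooks_triggered.items() if count == 0]
    print(unused_parameters)   # [2] -> variable 2 never produced a gradient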
    

    5.4 numGradHooksTriggeredMapPerIteration_

    Records how many more times a tensor's autograd_hook still needs to be triggered before its gradient is ready for communication. When an entry reaches 0, that variable is marked ready (and once every variable in a bucket is ready, the bucket as a whole can be reduced).

    This member is reset from numGradHooksTriggeredMap_.

    // Key: VariableIndex, Value: the number of times that a variable's autograd_hook()
    // are left to be triggered before marking this variable's grad as ready for communication.
    // Map will change after 1st iteration to track a grad is ready for communication or not.
    std::unordered_map<VariableIndex, int, c10::hash<VariableIndex>> numGradHooksTriggeredMapPerIteration_;
    

    5.4.1 Usage

    How is it used? For a static graph, after the first iteration (i.e. once a gradient has just been produced), numGradHooksTriggeredMapPerIteration_[index] is decremented; when it reaches 0, the variable is ready and the collective reduction of its gradient can proceed.

    // The function `autograd_hook` is called after the gradient for a
    // model parameter has been accumulated into its gradient tensor.
    // This function is only to be called from the autograd thread.
    void Reducer::autograd_hook(VariableIndex index) {
      
      // other code omitted
      
      // If it is static graph, after 1st iteration, check a avariable
      // is ready for communication based on numGradHooksTriggeredMap_.
      if (static_graph_after_first_iteration()) {
        if (--numGradHooksTriggeredMapPerIteration_[index] == 0) {
          // Finally mark variable for which this function was originally called.
          mark_variable_ready(index);
        }
      } else {
        // Finally mark variable for which this function was originally called.
        mark_variable_ready(index);
      }
    }
    

    When a new iteration starts, this value is reset: prepare_for_backward calls reset_bucket_counting,

    which again copies numGradHooksTriggeredMap_ into it:

    void Reducer::reset_bucket_counting() {
      next_bucket_ = 0;
      // Reset num_buckets_ready_ at the beginning of backward computation
      // in each iteration.
      num_buckets_ready_ = 0;
    
      for (auto& bucket : buckets_) {
        for (auto& replica : bucket.replicas) {
          replica.pending = replica.variables.size();
        }
        bucket.pending = bucket.replicas.size();
      }
    
      if (static_graph_) {
        numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_;
      }
    }
    

    The concrete logic is illustrated below; a small Python simulation follows the diagram.

    • Tensor 2 is never used, so delay_all_reduce puts it straight into unused_parameters_.
    • Tensor 1:
      • numGradHooksTriggeredMap_ is initialized to 0.
      • After the first iteration it becomes 1.
      • During backward, prepare_for_backward calls reset_bucket_counting, which copies numGradHooksTriggeredMap_ into numGradHooksTriggeredMapPerIteration_.
      • autograd_hook decrements the entry; once it reaches 0, the variable is marked ready and can be reduced.
       Variable 2
    
                                         delay_all_reduce
    
       numGradHooksTriggeredMap_[2] = 0  +---------------> unused_parameters_.push_back(2)
    
    
    +----------------------------------------------------------------------------------------+
    
       Variable 1
    
    
    
        numGradHooksTriggeredMap_[1] = 0
    
                       +
                       |
                       |  first_iteration
                       |
                       v
    
        numGradHooksTriggeredMap_[1] = 1
    
                       +
                       |  prepare_for_backward
                       |
                       |  reset_bucket_counting
                       v
    
     numGradHooksTriggeredMapPerIteration_ = numGradHooksTriggeredMap_
                       +
                       |
                       |
                       | backward
                       |
                       | autograd_hook
                       v
                                                                   YES
     if (--numGradHooksTriggeredMapPerIteration_[index] == 0) ? +------->  mark_variable_ready(1)
                       +
                       |  NO
                       |
                       v
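                    (not zero yet: wait for the remaining autograd_hook calls)

    The flow above can be condensed into a small Python simulation of the bookkeeping (an illustration only, not the real C++ implementation):

    from collections import defaultdict

    num_grad_hooks_triggered = defaultdict(int)   # numGradHooksTriggeredMap_
    num_grad_hooks_this_iter = {}                 # numGradHooksTriggeredMapPerIteration_
    ready = []

    def autograd_hook(index, first_iteration):
        if first_iteration:
            num_grad_hooks_triggered[index] += 1  # iteration 1: only count, reduction is delayed
            return
        num_grad_hooks_this_iter[index] -= 1
        if num_grad_hooks_this_iter[index] == 0:  # all expected hooks have fired
            ready.append(index)                   # i.e. mark_variable_ready(index)

    def reset_bucket_counting():
        # called from prepare_for_backward at the start of every backward pass
        num_grad_hooks_this_iter.update(num_grad_hooks_triggered)

    autograd_hook(1, first_iteration=True)        # iteration 1: count the hook
    reset_bucket_counting()                       # iteration 2 starts
    autograd_hook(1, first_iteration=False)       # decrement reaches 0 -> ready
    print(ready)                                  # [1]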
    
    

    5.5 perIterationReadyParams_

    Within each iteration, perIterationReadyParams_ records the parameters that have been marked ready.

    // Per iteration set of parameter indices that have been marked ready.
    std::unordered_set<size_t> perIterationReadyParams_;
    

    5.5.1 Setting

    When a variable becomes ready, its index is inserted into perIterationReadyParams_.

    void Reducer::mark_variable_ready(VariableIndex index) {
    
      if (should_rebuild_buckets()) {
        push_rebuilt_params(index);
      }
    
      const auto replica_index = index.replica_index;
      const auto variable_index = index.variable_index;
    
      if (replica_index == 0) {
        checkAndRaiseMarkedTwiceError(variable_index);
        perIterationReadyParams_.insert(variable_index);
      }
    }
    

    5.5.2 Reset

    The set is cleared before each backward pass.

    void Reducer::prepare_for_backward(
        const std::vector<torch::autograd::Variable>& outputs) {
    
      // Reset per iteration marked ready parameters.
      perIterationReadyParams_.clear();
    
    }
    

    5.5.3 Usage

    These helpers iterate over all parameters and collect those that are not in perIterationReadyParams_, i.e. the parameters that were not marked ready in this iteration.

    rebuild_buckets calls ensure_prior_reduction_finished, which in turn uses these two methods for its sanity checks and error messages:

    std::vector<std::string> Reducer::getUnmarkedParamsForIteration() {
      std::vector<std::string> unMarkedParamNames;
      for (const auto& it : param_names_) {
        if (perIterationReadyParams_.find(it.first) ==
            perIterationReadyParams_.end()) {
          unMarkedParamNames.push_back(it.second);
        }
      }
      return unMarkedParamNames;
    }
    
    std::vector<size_t> Reducer::getUnmarkedParamIndicesForIteration() {
      std::vector<size_t> unmarked_param_indices;
      const auto variable_count = replicas_[0].size();
      for (size_t variable_index = 0; variable_index < variable_count; variable_index++) {
        if (perIterationReadyParams_.find(variable_index) == perIterationReadyParams_.end()) {
          unmarked_param_indices.push_back(variable_index);
        }
      }
      return unmarked_param_indices;
    }
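
    These two helpers feed the error message that reports which parameters never received a gradient in the previous iteration. Conceptually they are just a set difference, as in this tiny Python sketch (hypothetical names, illustration only):

    param_names = {0: "layer1.weight", 1: "layer1.bias", 2: "layer2.weight"}   # param_names_
    ready = {0, 1}                                                             # perIterationReadyParams_

    unmarked = [name for idx, name in param_names.items() if idx not in ready]
    print(unmarked)   # ['layer2.weight'] -> reported as not having received a gradient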
    

    5.6 Locally used parameters

    The following two member variables record which parameters have been used locally, i.e. whether a parameter was used in the current iteration or, when no_sync is on, in the current no_sync session.

    Each model replica corresponds to one tensor in the map, and each tensor is a one-dimensional int32 tensor whose length equals the number of parameters.

    These tensors are marked in autograd_hook to indicate that the corresponding parameter has been used. They are allreduced at the end of the backward pass of the current iteration (or of the no_sync session) in order to figure out the globally unused parameters.

    // Locally used parameter maps indicating if parameters are used locally
    // during the current iteration or no_sync session if no_sync is on. One
    // tensor for each model replica and each tensor is one-dim int32 tensor of
    // number of parameters. These tensors are marked in autograd_hook to indicate
    // the corresponding param has been used, and get allreduced in the end of
    // backward of current iteration or no_sync session for figuring out the
    // globally unused parameters.
    //
    // local_used_maps_:     CPU tensors for bookkeeping locally used params
    // local_used_maps_dev_: dev tensors for reducing globally unused params
    std::vector<at::Tensor> local_used_maps_; // set in autograd_hook; this is the bitmap described in the paper
    std::vector<at::Tensor> local_used_maps_dev_; // device-side (e.g. GPU) copy used for the allreduce
    

    5.6.1 The paper

    This is best read alongside the paper.

    Gradients of globally unused parameters should stay untouched during the forward and backward passes. Detecting unused parameters requires global information, because a parameter may be absent from one DDP process yet participate in training in another process during the same iteration. DDP therefore keeps information about locally unused parameters in a bitmap and launches an additional AllReduce to gather a global bitmap. Since the bitmap is much smaller than tensor sizes, all parameters in the model share one bitmap instead of creating per-bucket bitmaps. The bitmap lives on the CPU so that a dedicated CUDA kernel does not have to be launched for every update. However, some ProcessGroup backends may not be able to run AllReduce on CPU tensors; for example, ProcessGroupNCCL only supports CUDA tensors. Moreover, since DDP should work with any custom ProcessGroup backend, it cannot assume that every backend supports CPU tensors. To solve this, DDP maintains another bitmap on the same device as the first model parameter and invokes a non-blocking copy to move the CPU bitmap to the device bitmap for the collective communication.
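
    The idea can be sketched with plain tensors (an illustration of the algorithm with made-up data, not DDP's internal code): every rank fills a local bitmap, the bitmaps are allreduced with SUM, and any parameter whose global count is 0 is globally unused.

    import torch

    # Locally-used bitmaps of two hypothetical ranks (1 = used in this iteration).
    rank0_used = torch.tensor([1, 1, 0, 0], dtype=torch.int32)
    rank1_used = torch.tensor([1, 0, 1, 0], dtype=torch.int32)

    # An allreduce(SUM) across the two ranks would produce:
    global_used = rank0_used + rank1_used
    globally_unused = (global_used == 0).nonzero(as_tuple=True)[0]
    print(globally_unused)   # tensor([3]) -> parameter 3 is unused on every rank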

    5.6.2 Initialization

    The initialization function is as follows:

    void Reducer::initialize_local_used_map() {
      const auto replica_count = replicas_.size();
      const auto variable_count = replicas_[0].size();
      local_used_maps_.resize(replica_count);
      local_used_maps_dev_.resize(replica_count);
    
      for (size_t i = 0; i < replica_count; i++) {
        at::TensorOptions options;
        options = options.dtype(at::kInt);
    
        // Deliberately don't pin the memory even if local_used_maps_dev_ will
        // be cuda. See Note [local_used_maps_ -> local_used_maps_dev copying]
        local_used_maps_[i] =
            at::zeros({static_cast<long>(variable_count)}, options);
    
        // This tensor needs to be on the same device as replica because backend
        // such as NCCL may not support CPU tensors, and hence it might not work
        // if we always put it on CPU.
        options = options.device(replicas_[i][0].device());
        local_used_maps_dev_[i] =
            at::empty({static_cast<long>(variable_count)}, options);
      }
    }
    

    5.6.3 Reset

    Both finalize_bucket_dense and finalize_backward reset it:

    void Reducer::finalize_backward() {
      if (dynamic_graph_find_unused()) {
        // Reset unused parameter accounting.
        // See Note [local_used_maps_ -> local_used_maps_dev copying]
        for (auto& local_used : local_used_maps_) {
          local_used.fill_(0);
        }
        local_used_maps_reduced_ = false;
      }  
    

    5.6.4 Setting

    In autograd_hook, when a parameter has been used, the corresponding entry is set to 1:

    void Reducer::autograd_hook(VariableIndex index) {
    
      // record here that this parameter has been used
      // See Note [Skip allreducing local_used_maps_dev]
      if (dynamic_graph_find_unused() || static_graph_first_iteration()) {
        // Since it gets here, this param has been used for this iteration. We want
        // to mark it in local_used_maps_. During no_sync session, the same var can
        // be set multiple times, which is OK as does not affect correctness. As
        // long as it is used once during no_sync session, it is marked as used.
        local_used_maps_[index.replica_index][index.variable_index] = 1;
      }
    

    5.6.5 Usage

    mark_variable_ready calls all_reduce_local_used_map, which performs the synchronization when it is needed. The comments in the code are worth restating:

    • DDP uses an asynchronous H2D copy to avoid blocking overhead. The async copy and the allreduce respect the current stream, so they are sequenced correctly.

    • Correct sequencing with respect to host operations also matters: the H2D copy_ is stream ordered, while the host's changes to local_used_maps_ are host ordered.

    • If a large backlog of CUDA-stream work pushes the copy_ far into the future, and no blocking call happens between now and finalize_backward, then finalize_backward may re-zero local_used_maps_ on the host before the stream executes the copy; in that case copy_ reads those zeros instead of the values we told it to read here.

    • Copying local_used_maps_[i] to a pinned temporary (which the pinned caching allocator should supply asynchronously) avoids this nasty, rare race condition.

    • In the hoped-for case where all parameters are used, DDP itself does no blocking work between now and the re-zeroing, so the danger is real.

    • So the Reducer acts defensively to ensure that local_used_maps_tmp is distinct from local_used_maps_[i].

    void Reducer::all_reduce_local_used_map() {
      // See Note [Skip allreducing local_used_maps_dev]
        // H2D from local_used_maps_ to local_used_maps_dev_
        for (size_t i = 0; i < local_used_maps_.size(); i++) {
          if (local_used_maps_dev_[i].is_cuda()) {
            // Note [local_used_maps_ -> local_used_maps_dev copying]
            // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
            // We do async H2D to avoid the blocking overhead. The async copy and
            // allreduce respect the current stream, so will be sequenced
            // correctly.
            //
            // Correct sequencing with respect to host operations is also
            // essential. The H2D copy_ is stream ordered, while the host's
            // changes to local_used_maps_ are host ordered. If a large backlog of
            // cuda-stream work pushes the copy_ far into the future, and if no
            // blocking calls occur between now and finalize_backward()** such
            // that finalize_backward() re-zeroes local_used_maps_ on the host
            // before the stream executes the copy_, copy_ will read those zeros
            // instead of the values we thought we told it to read here. Copying
            // local_used_maps_[i] to a pinned temporary (which the pinned caching
            // allocator should supply asynchronously) avoids this nasty, rare
            // race condition.
            //
            // ** In the hoped-for case where all params are used, DDP itself
            // won't do any blocking work between now and the re-zeroing, so the
            // danger is real.
            //
            // Defensively ensures local_used_maps_tmp is distinct from
            // local_used_maps_[i]
            auto local_used_maps_tmp = at::native::empty_like(
                local_used_maps_[i],
                optTypeMetaToScalarType(local_used_maps_[i].options().dtype_opt()),
                local_used_maps_[i].options().layout_opt(),
                local_used_maps_[i].options().device_opt(),
                true /* pinned_memory */);
            // Paranoid asserts here because in some workloads, the pinned
            // allocator behaves in a way we don't understand, and may be bugged.
            // See https://github.com/pytorch/pytorch/pull/54474
            TORCH_INTERNAL_ASSERT(local_used_maps_tmp.is_pinned());
            TORCH_INTERNAL_ASSERT(
                local_used_maps_tmp.data_ptr() != local_used_maps_[i].data_ptr());
            local_used_maps_tmp.copy_(local_used_maps_[i]);
            local_used_maps_dev_[i].copy_(local_used_maps_tmp, true);
          } else {
            local_used_maps_dev_[i].copy_(local_used_maps_[i], true);
          }
        }
        local_used_work_ = process_group_->allreduce(local_used_maps_dev_);
    }
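
    The pinned-temporary trick described in the comment corresponds to an ordinary PyTorch pattern. A minimal sketch (illustration only; it needs a CUDA device and merely stands in for local_used_maps_[i] / local_used_maps_dev_[i]):

    import torch

    if torch.cuda.is_available():
        cpu_map = torch.zeros(8, dtype=torch.int32)     # plays the role of local_used_maps_[i]
        cpu_map[3] = 1
        # Copy into a pinned temporary first, so that later host-side writes to
        # cpu_map cannot race with the stream-ordered H2D copy below.
        pinned_tmp = cpu_map.pin_memory()               # pinned copy with distinct storage
        dev_map = torch.empty(8, dtype=torch.int32, device="cuda")
        dev_map.copy_(pinned_tmp, non_blocking=True)    # asynchronous H2D copy
        torch.cuda.synchronize()
        print(dev_map)                                  # the device-side bitmap to be allreduced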
    

    5.7 Supporting classes for gradient computation

    Next we look at some basic functions and supporting classes involved in gradient computation.

    5.7.1 RpcContext

    This class wraps distributed::autograd::ContextPtr.

    struct RpcContext {
      using ContextPtr = torch::distributed::autograd::ContextPtr;
      // The shared_ptr is to hold the context instance.
      ContextPtr context_ptr_holder;
      std::atomic<ContextPtr::element_type*> context_ptr{nullptr};
    
      void set(ContextPtr&& new_context_ptr);
    };
    RpcContext rpc_context_;
    

    5.7.2 hooks_

    It holds the registered autograd hooks (together with their grad accumulators), which also serves a bookkeeping purpose.

    std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
        hooks_;
    

    It is initialized as follows:

            // Hook to execute after the gradient accumulator has executed.
            hooks_.emplace_back(
                grad_accumulator->add_post_hook(
                    torch::make_unique<torch::autograd::utils::LambdaPostHook>(
                        [=](const torch::autograd::variable_list& outputs,
                            const torch::autograd::variable_list& /* unused */) {
    #ifndef _WIN32
                          this->rpc_context_.set(
                              ThreadLocalDistAutogradContext::getContextPtr());
    #endif
                          this->autograd_hook(index);
                          return outputs;
                        })),
                grad_accumulator);
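
    From Python there is a well-known (if unofficial) trick to reach the same AccumulateGrad node and attach a hook to it, which mirrors what the C++ code above does with add_post_hook. A minimal sketch, for illustration only (not DDP's actual code path):

    import torch

    p = torch.nn.Parameter(torch.randn(3))
    # p.expand_as(p) creates a view whose grad_fn points back at p's AccumulateGrad node.
    grad_acc = p.expand_as(p).grad_fn.next_functions[0][0]
    grad_acc.register_hook(lambda *unused: print("AccumulateGrad for p has run"))

    (p * 2).sum().backward()   # the hook fires when p's gradient is accumulated
    print(p.grad)              # tensor([2., 2., 2.])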
    

    5.7.3 comm_hook_

    5.7.3.1 Concept

    Let's look at the concept through the Note [DDP Communication Hook] in the source.

    A DDP communication hook is an enhancement that provides a hook which can be used to override how DDP communicates gradients across ranks; it can be used for algorithms such as gradient compression or GossipGrad. The hook is registered via the Python API register_comm_hook.

    If no DDP communication hook is registered, the reducer reduces the buckets simply by calling allreduce. If one is registered, the reducer calls the hook and works with the future work handle it returns; it also skips the "divide gradients by world size" step. The rationale is that the communication hook is expected to completely override how communication is performed, so the user has full control over how the gradients are handled.

    PythonCommHook is a subclass of CommHookInterface and enables registering a Python hook. In addition, there are some built-in C++ hook implementations that can be specified by calling register_builtin_comm_hook from the Python API.

    // Note [DDP Communication Hook]
    // ~~~~~~~~~~~~~~~~~~~~~~~~~~
    // If DDP communication hook is not registered, the reducer reduces the buckets
    // by just calling allreduce. If registered, it calls the hook and uses future
    // work handle. If registered, reducer also skips dividing grads by world size.
    // The reason for this is that the communication hook is expected to completely
    // override how we perform communication and the user should have complete
    // control over how the grads are handled.
    //
    // DDP communication hook is an enhancement that provides a hook which can be
    // used to override how DDP communicates gradients across ranks, this can be
    // used for algorithms like Gradient Compression/GossipGrad. This hook can be
    // registered from Python API using `register_comm_hook`. `PythonCommHook`
    // enables registering a Python hook and is a subclass of `CommHookInterface`.
    // Additionally, there are also some built-in C++ hook implementations that can
    // be specified by calling `register_builtin_comm_hook` from Python API.
    
    5.7.3.2 Usage

    Let's look at torch/distributed/algorithms/ddp_comm_hooks/default_hooks.py.

    The hook below performs its own special processing before and after the all-reduce. To use it, call ddp_model.register_comm_hook(process_group, fp16_compress_hook).

    def fp16_compress_hook(
        process_group: dist.ProcessGroup, bucket: dist.GradBucket
    ) -> torch.futures.Future:
        """
        This DDP communication hook implements a simple gradient compression
        approach that casts ``GradBucket`` tensors to half-precision floating-point format (``torch.float16``)
        and then divides it by the process group size.
        It allreduces those ``float16`` gradient tensors. Once compressed gradient
        tensors are allreduced, the chained callback ``decompress`` casts it back to the input data type (such as ``float32``).
    
        Example::
            >>> ddp_model.register_comm_hook(process_group, fp16_compress_hook)
        """
        group_to_use = process_group if process_group is not None else dist.group.WORLD
        world_size = group_to_use.size()
    
        compressed_tensor = bucket.get_tensor().to(torch.float16).div_(world_size)
    
        fut = dist.all_reduce(
            compressed_tensor, group=group_to_use, async_op=True
        ).get_future()
    
        def decompress(fut):
            decompressed_tensor = bucket.get_tensor()
            # Decompress in place to reduce the peak memory.
            # See: https://github.com/pytorch/pytorch/issues/45968
            decompressed_tensor.copy_(fut.value()[0])
            return [decompressed_tensor]
    
        return fut.then(decompress)
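
    As another illustration, a hook that just reproduces the default allreduce-and-average behaviour can be written in the same style (a sketch that follows the same GradBucket API as the excerpt above; the name allreduce_avg_hook is hypothetical, and it would be registered with ddp_model.register_comm_hook(process_group, allreduce_avg_hook)):

    import torch
    import torch.distributed as dist

    def allreduce_avg_hook(
        process_group: dist.ProcessGroup, bucket: dist.GradBucket
    ) -> torch.futures.Future:
        """Sketch of a hook roughly equivalent to DDP's default behaviour:
        divide the bucket by the world size and allreduce it."""
        group_to_use = process_group if process_group is not None else dist.group.WORLD
        world_size = group_to_use.size()

        tensor = bucket.get_tensor().div_(world_size)
        fut = dist.all_reduce(tensor, group=group_to_use, async_op=True).get_future()

        def finish(fut):
            # The reducer expects a list containing the reduced bucket tensor.
            return [fut.value()[0]]

        return fut.then(finish)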
    

    5.7.4 runGradCallbackForVariable

    The mark_variable_ready_dense function calls runGradCallbackForVariable.

    5.7.4.1 Reducer

    Reducer::runGradCallbackForVariable is shown below; under distributed autograd it delegates to distributed::autograd::ContextPtr's runGradCallbackForVariable.

    void Reducer::runGradCallbackForVariable(
        at::Tensor& variable,
        GradCallback&& cb) {
      // load the rpc context
      auto context_ptr = rpc_context_.context_ptr.load();
      if (context_ptr == nullptr) {
        cb(variable.mutable_grad());
      } else {
        // Under distributed autograd
    #ifndef _WIN32
        // analyzed below
        context_ptr->runGradCallbackForVariable(variable, std::move(cb));
    #endif
      }
    }
    
    5.7.4.2 DistAutogradContext

    Following the call, we arrive at DistAutogradContext.

    Inside the accumulated gradients, it looks up the gradient grad corresponding to the tensor in accumulatedGrads_, processes that gradient with the supplied callback, and finally copies the processed gradient back into accumulatedGrads_. This closes the loop that starts when the hook picks up the gradient and ends when the reduced gradient is written back.

    void DistAutogradContext::runGradCallbackForVariable(
        const torch::autograd::Variable& variable,
        GradCallback&& cb) {
      torch::Tensor grad;
      {
        std::lock_guard<std::mutex> guard(lock_);
        auto it = accumulatedGrads_.find(variable); // find the gradient corresponding to the tensor
        TORCH_INTERNAL_ASSERT(
            it != accumulatedGrads_.end(),
            "The grad for the variable should exist in dist_autograd context.");
        grad = it->value();
      }
      if (cb(grad)) { // process grad with the supplied callback
        std::lock_guard<std::mutex> guard(lock_);
        auto device = grad.device();
        // Needs to update the grad in the map.
        accumulatedGrads_.insert_or_assign(variable, std::move(grad)); // finally copy the processed gradient back into accumulatedGrads_
        recordGradEvent(device);
      }
    }
    

    accumulatedGrads_ in DistAutogradContext records the current gradient for each tensor.

    // DistAutogradContext which stores information for a single distributed
    // autograd pass on a worker.
    class TORCH_API DistAutogradContext {
     public:
      // Gradients accumulated in this context so far. The key is the variable on
      // which the gradient needs to be accumulated and the value is the gradient
      // that needs to be accumulated on that variable..
      c10::Dict<torch::Tensor, torch::Tensor> accumulatedGrads_;  
    }
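
    For context, this per-context storage is what the distributed autograd front end exposes in Python: gradients accumulated under a context are fetched with get_gradients rather than read from param.grad. A self-contained single-process sketch (illustration only; the worker name, rank and port are made up):

    import os
    import torch
    import torch.distributed.autograd as dist_autograd
    import torch.distributed.rpc as rpc

    os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
    os.environ.setdefault("MASTER_PORT", "29501")
    rpc.init_rpc("worker0", rank=0, world_size=1)   # also initializes distributed autograd

    t = torch.randn(3, requires_grad=True)
    with dist_autograd.context() as context_id:
        loss = (t * 2).sum()
        dist_autograd.backward(context_id, [loss])
        grads = dist_autograd.get_gradients(context_id)   # held in the context's accumulatedGrads_
        print(grads[t])                                   # tensor([2., 2., 2.])

    rpc.shutdown()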
    

    This concludes our first pass over these basic classes; the next chapter continues the introduction (there really are too many of them...).

