zoukankan      html  css  js  c++  java
  • [源码解析] PyTorch 分布式(3) DataParallel(下)

    [源码解析] PyTorch 分布式(3) ----- DataParallel(下)

    0x00 摘要

    本文是 PyTorch 分布式的第三篇,继续上文,介绍 DataPrallel 的并行操作和反向传播。

    本系列其他文章如下:

    深度学习利器之自动微分(1)

    深度学习利器之自动微分(2)

    [源码解析]深度学习利器之自动微分(3) --- 示例解读

    [源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)

    [源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)

    [源码解析] PyTorch如何实现前向传播(3) --- 具体实现

    [源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎

    [源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构

    [源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑

    [源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法

    [源码解析] PyTorch 分布式(1)------历史和概述

    [源码解析] PyTorch 如何使用GPU

    [源码解析] PyTorch 分布式(2) ----- DataParallel(上)

    0x01 前向操作

    我们先回忆一下目前的前向图,replicate 调用了Broadcast.forward,同时往其context 存储了input_device和num_inputs。

    +----------------------------------------------------------------------------------------+
    | DataParallel.forward                                                                   |
    |                                                                                        |
    |                                                                                        |
    |              replicate +--------------->   parallel_apply             gather           |
    |                                                                                        |
    +----------------------------------------------------------------------------------------+
    
         +---------------------------+
         | Broadcast                 |
         |                           |
         |                           |
         |                           |
         |          forward()  +----------->
         |                           |
         |                           |
         |  +---------------------+  |
         |  | ctx                 |  |
         |  |       input_device  |  |
         |  |                     |  |
         |  |       num_inputs    |  |
         |  |                     |  |
         |  +---------------------+  |
         |                           |
         |                           |
         |                           |
         |                           |
         |                           |
         |                           |
         +---------------------------+
    
    

    1.1 并行

    目前,我们已经使用 Scatter 函数将数据从 device[0] 分配并复制到不同的卡,用 Replicate 函数将模型从 device[0] 复制到不同的卡,这样各个卡都有了同样的模型和不同的数据,现在就要分别调用 forward 计算损失和梯度。也就是 parallel_apply 部分。

    # 分发数据
    inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)      
    # 分发模型
    replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
    
    # 并行训练
    outputs = self.parallel_apply(replicas, inputs, kwargs)
    

    对应我们传播图是:

    parallel_apply 是基于threading 实现,用前面准备好的 replica 和输入数据,然后for 循环启动多线程进行前向传播,最后输出传播结果。

    def parallel_apply(modules, inputs, kwargs_tup=None, devices=None):
    
        # 确保模型和输入大小一致
        assert len(modules) == len(inputs)
    
        # 确保每个 GPU 都有相应的元数据,如没有就空白补全
        if kwargs_tup is not None:
            # 在前面已经补全
            assert len(modules) == len(kwargs_tup)
        else:
            kwargs_tup = ({},) * len(modules)
    
        # 确保模型数目和CPU数目一致    
        if devices is not None:
            assert len(modules) == len(devices)
        else:
            devices = [None] * len(modules)
    
        devices = [_get_device_index(x, True) for x in devices]
    
        # 基于threading多线程实现
        lock = threading.Lock()
        results = {}
        grad_enabled, autocast_enabled = torch.is_grad_enabled(), torch.is_autocast_enabled()
    
        # 定义 worker
        def _worker(i, module, input, kwargs, device=None):
            torch.set_grad_enabled(grad_enabled)
            if device is None:
                device = get_a_var(input).get_device()
            try:
                # 设置当前的设备
                with torch.cuda.device(device), autocast(enabled=autocast_enabled):
                    # this also avoids accidental slicing of `input` if it is a Tensor
                    if not isinstance(input, (list, tuple)):
                        input = (input,)
                    output = module(*input, **kwargs) # 前向操作
                with lock:
                    # 并行计算得到输出
                    results[i] = output
            except Exception:
                with lock:
                    results[i] = ExceptionWrapper(
                        where="in replica {} on device {}".format(i, device))
    
        if len(modules) > 1:
            # 如有一个进程控制多个 GPU ,起多个线程
            # 注意,这里就是每个 worker 调用了 modules 数组中的一个模型copy
            threads = [threading.Thread(target=_worker,
                                        args=(i, module, input, kwargs, device))
                       for i, (module, input, kwargs, device) in
                       enumerate(zip(modules, inputs, kwargs_tup, devices))]
    
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
        else:
            # 一个GPU对应一个进程
            _worker(0, modules[0], inputs[0], kwargs_tup[0], devices[0])
    
        outputs = []
        for i in range(len(inputs)):
            output = results[i]
    
            # error handle
            if isinstance(output, ExceptionWrapper):
                output.reraise()
            outputs.append(output)
            
        # 输出 n 个计算结果
        return outputs
    

    此时前向传播具体对应如下图,现在并行操作调用了 module 的forward方法。

    +----------------------------------------------------------------------------------------+
    | DataParallel.forward                                                                   |
    |                                                                                        |
    |                  1                               2                      3              |
    |              replicate +--------------->   parallel_apply             gather           |
    |                                                                                        |
    +----------------------------------------------------------------------------------------+
    
         +---------------------------+       +-------------------+
         | Broadcast                 |       | module            |
         |                           |       |                   |
         |                           |       |                   |
         |              1            |       |         2         |
         |          forward()  +-----------> |      forward() +--------->
         |                           |       |                   |
         |                           |       |                   |
         |  +---------------------+  |       |                   |
         |  | ctx                 |  |       |                   |
         |  |       input_device  |  |       |                   |
         |  |                     |  |       |                   |
         |  |       num_inputs    |  |       |                   |
         |  |                     |  |       |                   |
         |  +---------------------+  |       |                   |
         |                           |       |                   |
         |                           |       |                   |
         |                           |       |                   |
         |                           |       |                   |
         |                           |       |                   |
         |                           |       |                   |
         +---------------------------+       +-------------------+
    
    

    1.2 Gather

    目前,我们已经使用 Scatter 函数将数据从 device[0] 分配并复制到不同的卡,用 Replicate 函数将模型从 device[0] 复制到不同的卡,这样各个卡都有了同样的模型和不同的数据,然后分别调用 forward 计算损失和梯度。也就是 parallel_apply 部分。

    现在要做的就是把分布式计算的梯度合并到 device[0],就是 self.output_device。

    # 分发数据
    inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)      
    # 分发模型
    replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
    # 并行训练
    outputs = self.parallel_apply(replicas, inputs, kwargs)
    # 收集到 devices[0]
    return self.gather(outputs, self.output_device)
    

    对应我们传播图是:

    我们看看如何把结果收集到 device[0],以及device[0]如何作为参数服务器。

    1.2.1 Python世界

    gather 主要是调用 Gather.apply(target_device, dim, *outputs) 完成收集工作。

    def gather(outputs, target_device, dim=0): # target_device 就是 device[0]
        r"""
        Gathers tensors from different GPUs on a specified device
          (-1 means the CPU).
        """
        def gather_map(outputs):
            out = outputs[0]
            if isinstance(out, torch.Tensor):
                return Gather.apply(target_device, dim, *outputs) # 调用下面的 Gather
            if out is None:
                return None
            if isinstance(out, dict):
                return type(out)(((k, gather_map([d[k] for d in outputs]))
                                  for k in out))
            return type(out)(map(gather_map, zip(*outputs)))
    
        # Recursive function calls like this create reference cycles.
        # Setting the function to None clears the refcycle.
        try:
            res = gather_map(outputs)
        finally:
            gather_map = None
        return res
    

    Gather 则调用了 comm.gather 完成工作,而 comm.gather 则会带领我们进入到 C++世界。

    我们省略一些校验代码。

    # Gather 源码
    class Gather(Function):
    
        @staticmethod
        def forward(ctx, target_device, dim, *inputs): # target_device 就是 device[0]
    
            # 下面会往 context 内部存放几个变量,后续会用到
            target_device = _get_device_index(target_device, True)
            ctx.target_device = target_device
            ctx.dim = dim
            ctx.input_gpus = tuple(i.get_device() for i in inputs)
    
            if all(t.dim() == 0 for t in inputs) and dim == 0:
                inputs = tuple(t.view(1) for t in inputs)
                ctx.unsqueezed_scalar = True
            else:
                ctx.unsqueezed_scalar = False
                
            ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
            return comm.gather(inputs, ctx.dim, ctx.target_device) # 这里会进入C++世界
    
        @staticmethod
        def backward(ctx, grad_output): # 注意,这里后续会用到
            scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
            if ctx.unsqueezed_scalar:
                scattered_grads = tuple(g[0] for g in scattered_grads)
            return (None, None) + scattered_grads
    

    现在前向计算如图:

    gather 调用到了Gather的forward 函数,forward 方法在 ctx 存储了 input_gpus, input_sizes, dim 这三个变量,这些变量后续会用到。

    +-----------------------------------------------------------------------------------------+
    | DataParallel.forward                                                                    |
    |                                                                                         |
    |                  1                               2                           3          |
    |              replicate +--------------->   parallel_apply +--------------> gather       |
    |                                                                                         |
    +-----------------------------------------------------------------------------------------+
    
         +---------------------------+       +-------------------+       +--------------------+
         | Broadcast                 |       | module            |       |Gather              |
         |                           |       |                   |       |                    |
         |                           |       |                   |       |                    |
         |              1            |       |         2         |       |         3          |
         |          forward()  +-----------> |      forward() +--------> |      forward()     |
         |                           |       |                   |       |                    |
         |                           |       |                   |       |                    |
         |  +---------------------+  |       |                   |       | +----------------+ |
         |  | ctx                 |  |       |                   |       | |ctx             | |
         |  |       input_device  |  |       |                   |       | |     input_gpus | |
         |  |                     |  |       |                   |       | |                | |
         |  |       num_inputs    |  |       |                   |       | |     input_sizes| |
         |  |                     |  |       |                   |       | |                | |
         |  +---------------------+  |       |                   |       | |     dim        | |
         |                           |       |                   |       | +----------------+ |
         |                           |       |                   |       |                    |
         |                           |       |                   |       |                    |
         |                           |       |                   |       |                    |
         |                           |       |                   |       |                    |
         |                           |       |                   |       |                    |
         +---------------------------+       +-------------------+       +--------------------+
    
    

    1.2.2 C++世界

    gather 函数调用了 _gather_out_impl 来完成拷贝操作。

    at::Tensor gather(
        at::TensorList tensors,
        int64_t dim,
        c10::optional<int32_t> destination_index) { // destination_index 就是 device[0] 的index
    
      int64_t total_size = 0;
      auto& first = tensors.front();
      const auto first_size = first.sizes();
      dim = at::maybe_wrap_dim(dim, first);
      std::vector<int64_t> expected_size(first_size.begin(), first_size.end());
      auto memory_format = first.suggest_memory_format();
      for (size_t i = 0; i < tensors.size(); i++) {
        const auto& tensor = tensors[i];
        expected_size[dim] = tensor.size(dim);
        total_size += tensor.size(dim);
        if (memory_format != MemoryFormat::Contiguous &&
            tensor.suggest_memory_format() != memory_format) {
          memory_format = MemoryFormat::Contiguous;
        }
      }
      expected_size[dim] = total_size;
      at::Device device(DeviceType::CPU);
      // 根据 index 得到输出的目标设备
      if (!destination_index || *destination_index != -1) {
        // device 就是 GPU 0 这个设备
        device = at::Device(
            DeviceType::CUDA, destination_index ? *destination_index : -1);
      }
    
      //首先,构建一个空的目标tensor建立在目标设备之上,命名为result
      at::Tensor result =
          at::empty(expected_size, first.options().device(device), memory_format);
      
      return _gather_out_impl(tensors, result, dim); // 然后对result进行gather
    }
    

    _gather_out_impl 执行了具体的gather 操作,就是把 输入的tensors 拷贝到 目标 tensor 之上,即拷贝到 GPU0 之上。

    // ***************** Gather *******************
    //
    // Gather a list of CUDA tensors on one or more devices to a target tensor or
    // device, either CPU or CUDA.
    
    // no checks
    static inline at::Tensor& _gather_out_impl(
        at::TensorList tensors,
        at::Tensor& out_tensor,
        int64_t dim) {
      std::vector<int64_t> chunk_sizes;
      chunk_sizes.reserve(tensors.size());
      for (auto& tensor : tensors) {
        chunk_sizes.push_back(tensor.size(dim));
      }
      auto chunks =
          out_tensor.split_with_sizes(/*split_sizes=*/chunk_sizes, /*dim=*/dim);
      for (size_t i = 0; i < tensors.size(); i++) { // 拷贝到GPU 0 之上
        chunks[i].copy_(tensors[i], /*non_blocking=*/out_tensor.is_cuda());
      }
      return out_tensor;
    }
    

    0x02 计算损失

    现在,我们已经把梯度收集到 device[0] 之上,现在我们开始进行反向传播,其整体逻辑如上图所示。首先是在 device[0] 计算损失。其实这步计算损失算是前向计算和后向传播的中间环节,这里把它算成是反向传播的开端,如下图。

    我们找出来示例代码看看,里面关键的几点:

    1. 数据已经放到了默认GPU,即GPU 0上。
    2. prediction 是gather到 GPU 0 的前向计算输出
    3. 使用 loss = criterion(prediction,target_var) 在默认GPU之上计算loss。
    4. 使用 loss.backward() 开始反向传播。
    for batch_idx, (data, label) in pbar:   
        if args.cuda:
            data,label= data.cuda(),label.cuda(); # 1. 数据已经放到了默认GPU上
        data_v = Variable(data)
        target_var = Variable(label)
        prediction= model(data_v,target_var,args) # 2. prediction 是gather到 GPU 0 的前向计算输出
        
        # 到目前为止,我们完成了DataParallel.forward()
        #这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在于前向传播里
        #前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果聚合到主gpu里
    
        criterion = nn.CrossEntropyLoss()
        loss = criterion(prediction,target_var)  # 3. 在默认GPU之上计算loss
        optimizer.zero_grad()
        loss.backward()   # 4. 开始反向传播
        optimizer.step()
    

    0x03 后向传播

    我们前面运行的是上面的 Forward 部分,计算损失,接下来就运行上面代码中 loss.backward() 部分。

    3.1 分发梯度

    我们首先来到分发梯度部分,这部分作用是:把损失在 GPUs 之间 scatter,这样后续才可以在每个GPU之上独立进行后向传播。对应下图:

    3.1.1 Gather.backward

    前面有提到,prediction 是gather到 GPU 0 的前向计算输出。而 loss 又是根据 prediction 计算出来,所以从 loss.backward() 开始反向传播,从后向前的第一个步骤就来到了 gather 的传播操作,对应的就是 Gather 的 backward 函数,其中的核心代码是 Scatter.apply。

    class Gather(Function):
    
        # 这里前向传播用到了,为了对照,我们依然贴出来
        @staticmethod
        def forward(ctx, target_device, dim, *inputs): # target_device 就是 device[0]
    
            # 下面会往 context 内部存放几个变量,后续会用到
            target_device = _get_device_index(target_device, True)
            ctx.target_device = target_device
            ctx.dim = dim
            ctx.input_gpus = tuple(i.get_device() for i in inputs)
    
            if all(t.dim() == 0 for t in inputs) and dim == 0:
                inputs = tuple(t.view(1) for t in inputs)
                ctx.unsqueezed_scalar = True
            else:
                ctx.unsqueezed_scalar = False
                
            ctx.input_sizes = tuple(i.size(ctx.dim) for i in inputs)
            # 这里会进入C++世界,把输出聚集到 GPU 0。
            return comm.gather(inputs, ctx.dim, ctx.target_device) 
    
        @staticmethod
        def backward(ctx, grad_output): # 这里现在后向传播用到了!
            # 把前向传播在 context 之中存放的变量取出,作为 Scatter 的输入 
            scattered_grads = Scatter.apply(ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output)
            if ctx.unsqueezed_scalar:
                scattered_grads = tuple(g[0] for g in scattered_grads)
            return (None, None) + scattered_grads
    

    具体如下,可以看到,backward 使用了之前前向传播时候存储的 ctx.input_gpus, ctx.input_sizes, ctx.dim, grad_output,以此调用 Scatter.apply。

    图中,最上面是前向传播过程,最下面是反向传播过程,中间是某些在前后传播中都用到的代码模块。

    +--------------------------------------------------------------------------------------+
    | DataParallel.forward                                                                 |
    |                                                                                      |
    |               1                               2                           3          |
    |           replicate +--------------->   parallel_apply +--------------> gather       |
    |                                                                                      |
    +--------------------------------------------------------------------------------------+
    
      +---------------------------+       +-------------------+       +--------------------+
      | Broadcast                 |       | module            |       |Gather              |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |              1            |       |         2         |       |         3          |
      |          forward()  +-----------> |      forward() +--------> |      forward()     |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |  +---------------------+  |       |                   |       | +----------------+ |
      |  | ctx                 |  |       |                   |       | |ctx             | |
      |  |       input_device  |  |       |                   |       | |     input_gpus | |
      |  |                     |  |       |                   |       | |                | |
      |  |       num_inputs    |  |       |                   |       | |     input_sizes| |
      |  |                     |  |       |                   |       | |                | |
      |  +---------------------+  |       |                   |       | |     dim        | |
      |                           |       |                   |       | +----------------+ |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |                           |       |                   | <---------+ backward()     |
      |                           |       |                   |       |         3          |
      |                           |       |                   |       |                    |
      +---------------------------+       +-------------------+       +--------------------+
    
    +--------------------------------------------------------------------------------------+
    | loss.backward()                                                                      |
    |                                                                           3          |
    |                                                           <--------------------+     |
    |                                                                                      |
    |                                                                                      |
    +--------------------------------------------------------------------------------------+
    
    

    3.1.2 Scatter

    Scatter.apply 实际上调用到了其 forward 方法。

    • 首先从上下文之中提取之前存储的变量,这里主要是输入设备 input_device(源设备)和 target_gpus(目标设备)。
    • 获取到目标设备的流。
    • 调用 comm.scatter 把梯度分发到目标设备。
    class Scatter(Function):
    
        @staticmethod
        def forward(ctx, target_gpus, chunk_sizes, dim, input):
            target_gpus = [_get_device_index(x, True) for x in target_gpus]
            ctx.dim = dim
            ctx.input_device = input.get_device() if input.device.type != "cpu" else -1
            streams = None
            if torch.cuda.is_available() and ctx.input_device == -1:
                # Perform CPU to GPU copies in a background stream
                streams = [_get_stream(device) for device in target_gpus]
             
            # 分发到其他GPU
            outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams)
            
            # Synchronize with the copy stream
            if streams is not None:
                for i, output in enumerate(outputs):
                    with torch.cuda.device(target_gpus[i]):
                        main_stream = torch.cuda.current_stream()
                        main_stream.wait_stream(streams[i])
                        output.record_stream(main_stream)
            return outputs
    
        @staticmethod
        def backward(ctx, *grad_output):
            return None, None, None, Gather.apply(ctx.input_device, ctx.dim, *grad_output)
    

    3.1.3 C++

    上面python代码 outputs = comm.scatter(input, target_gpus, chunk_sizes, ctx.dim, streams) 会直接进入到C++世界。具体代码位于 torch/csrc/cuda/comm.cpp。

    scatter 的作用就是把tensor进行split,然后分发给各个设备的流。

    std::vector<at::Tensor> scatter(
        const at::Tensor& tensor,
        at::IntArrayRef devices,
        const c10::optional<std::vector<int64_t>>& chunk_sizes,
        int64_t dim,
        const c10::optional<std::vector<c10::optional<at::cuda::CUDAStream>>>&
            streams) {
      dim = at::maybe_wrap_dim(dim, tensor);
      
      // 把tensor进行split
      std::vector<at::Tensor> chunks = chunk_sizes
          ? tensor.split_with_sizes(/*split_sizes=*/*chunk_sizes, /*dim=*/dim)
          : tensor.chunk(/*chunks=*/devices.size(), /*dim=*/dim);
      
      at::cuda::OptionalCUDAStreamGuard cuda_guard;
      for (size_t i = 0; i < chunks.size(); ++i) {
        const auto device_index = static_cast<int16_t>(devices[i]);
        if (device_index != tensor.get_device()) {
          if (i < (streams ? streams->size() : 0U) && (*streams)[i]) {
            cuda_guard.reset_stream(*(*streams)[i]);
          }
          // 发送给各个设备的流
          chunks[i] = chunks[i].to( 
              {DeviceType::CUDA, device_index},
              /*non_blocking=*/true,
              /*copy=*/false,
              /*memory_format=*/at::MemoryFormat::Preserve);
        }
      }
      return chunks;
    }
    

    3.2 并行后向传播

    现在梯度已经分发到各个 GPU,接下来正式进入并行后向传播,这部分作用是:在各个GPU之上并行运行后向传播,计算参数梯度。对应下图:

    这部分调用到了原始模型的 backward,具体如下图中的数值 4:

    +--------------------------------------------------------------------------------------+
    | DataParallel.forward                                                                 |
    |                                                                                      |
    |               1                               2                           3          |
    |           replicate +--------------->   parallel_apply +--------------> gather       |
    |                                                                                      |
    +--------------------------------------------------------------------------------------+
    
      +---------------------------+       +-------------------+       +--------------------+
      | Broadcast                 |       | module            |       |Gather              |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |              1            |       |         2         |       |         3          |
      |          forward()  +-----------> |      forward() +--------> |      forward()     |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |  +---------------------+  |       |                   |       | +----------------+ |
      |  | ctx                 |  |       |                   |       | |ctx             | |
      |  |       input_device  |  |       |                   |       | |     input_gpus | |
      |  |                     |  |       |                   |       | |                | |
      |  |       num_inputs    |  |       |                   |       | |     input_sizes| |
      |  |                     |  |       |                   |       | |                | |
      |  +---------------------+  |       |                   |       | |     dim        | |
      |                           |       |                   |       | +----------------+ |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |                           | <---------+  backward()   | <---------+ backward()     |
      |                           |       |          4        |       |         3          |
      |                           |       |                   |       |                    |
      +---------------------------+       +-------------------+       +--------------------+
    
    +--------------------------------------------------------------------------------------+
    | loss.backward()                                                                      |
    |                                                4                          3          |
    |                                     <------------------+  <--------------------+     |
    |                                                                                      |
    |                                                                                      |
    +--------------------------------------------------------------------------------------+
    
    

    3.3 归并梯度

    这部分作用是 在 GPU 0 之上归并梯度,总体流程拓展对应下图:

    3.3.1 Broadcast.backward

    这部分对应了 Broadcast 的 反向传播。

    class Broadcast(Function):
    
        @staticmethod
        def forward(ctx, target_gpus, *inputs):
            target_gpus = [_get_device_index(x, True) for x in target_gpus]
            
            # 前向传播时候,向上下文存入了一些变量
            ctx.target_gpus = target_gpus
            if len(inputs) == 0:
                return tuple()
            ctx.num_inputs = len(inputs)
            # input 放在 device[0],所以 input_device 就是 GPU 0
            ctx.input_device = inputs[0].get_device()
            # 和 detach 的情形一样
            outputs = comm.broadcast_coalesced(inputs, ctx.target_gpus)
            non_differentiables = []
            
            # 在上下文中设置哪些不需要梯度
            for idx, input_requires_grad in enumerate(ctx.needs_input_grad[1:]):
                if not input_requires_grad:
                    for output in outputs:
                        non_differentiables.append(output[idx])
            ctx.mark_non_differentiable(*non_differentiables)
            return tuple([t for tensors in outputs for t in tensors])
    
        @staticmethod
        def backward(ctx, *grad_outputs):
            # 反向传播来到这里,取出之前在上下文存放的变量作为输入。ctx.input_device 就是之前存储的 GPU 0。
            return (None,) + ReduceAddCoalesced.apply(ctx.input_device, ctx.num_inputs, *grad_outputs)
    

    因此,我们可以拓展流程图:

    +--------------------------------------------------------------------------------------+
    | DataParallel.forward                                                                 |
    |                                                                                      |
    |               1                               2                           3          |
    |           replicate +--------------->   parallel_apply +--------------> gather       |
    |                                                                                      |
    +--------------------------------------------------------------------------------------+
    
      +---------------------------+       +-------------------+       +--------------------+
      | Broadcast                 |       | module            |       |Gather              |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |              1            |       |         2         |       |         3          |
      |          forward()  +-----------> |      forward() +--------> |      forward()     |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |  +---------------------+  |       |                   |       | +----------------+ |
      |  | ctx                 |  |       |                   |       | |ctx             | |
      |  |       input_device  |  |       |                   |       | |     input_gpus | |
      |  |                     |  |       |                   |       | |                | |
      |  |       num_inputs    |  |       |                   |       | |     input_sizes| |
      |  |                     |  |       |                   |       | |                | |
      |  +---------------------+  |       |                   |       | |     dim        | |
      |                           |       |                   |       | +----------------+ |
      |                           |       |                   |       |                    |
      |                           |       |                   |       |                    |
      |          backward()       | <---------+  backward()   | <---------+ backward()     |
      |              5            |       |          4        |       |         3          |
      |                           |       |                   |       |                    |
      +---------------------------+       +-------------------+       +--------------------+
    
    +--------------------------------------------------------------------------------------+
    | loss.backward()                                                                      |
    |                5                               4                          3          |
    |         <------------------------+  <------------------+  <--------------------+     |
    |                                                                                      |
    |                                                                                      |
    +--------------------------------------------------------------------------------------+
    
    

    3.3.2 ReduceAddCoalesced

    Broadcast.backward 调用了 ReduceAddCoalesced.apply,其对应了 ReduceAddCoalesced 的 forward 方法,目的是把梯度归并到目标设备 destination,就是GPU 0。

    class ReduceAddCoalesced(Function):
    
        @staticmethod
        # 会调用到这里,destination 是GPU 0
        def forward(ctx, destination, num_inputs, *grads): 
            # 从梯度之中提取所在的设备
            ctx.target_gpus = [grads[i].get_device() for i in range(0, len(grads), num_inputs)]
    
            grads_ = [grads[i:i + num_inputs]
                      for i in range(0, len(grads), num_inputs)]
            # 把梯度归并到目标设备 destination,就是GPU 0
            return comm.reduce_add_coalesced(grads_, destination)
    
        @staticmethod
        def backward(ctx, *grad_outputs):
            return (None, None,) + Broadcast.apply(ctx.target_gpus, *grad_outputs)
    

    3.3.3 c++

    看注释就是:从多个 GPU 来相加梯度,代码之中就是归并相加。

    def reduce_add_coalesced(inputs, destination=None, buffer_size=10485760):
        """Sums tensors from multiple GPUs.
    
        Small tensors are first coalesced into a buffer to reduce the number
        of synchronizations.
    
        Args:
            inputs (Iterable[Iterable[Tensor]]): iterable of iterables that
                contain tensors from a single device.
            destination (int, optional): a device on which the output will be
                placed (default: current device).
            buffer_size (int): maximum size of the buffer used for coalescing
    
        Returns:
            A tuple of tensors containing an elementwise sum of each group of
            inputs, placed on the ``destination`` device.
        """
        dense_tensors: List[List] = [[] for _ in inputs]  # shape (num_gpus, num_tensors)
        output = []
        ref_order = []
        # process sparse ones first since they may have different sizes on different gpus
        for tensor_at_gpus in zip(*inputs):
            if all(t.is_sparse for t in tensor_at_gpus):
                # 进行归并
                result = reduce_add(tensor_at_gpus, destination)  # this will be sparse too
                output.append(result)
                ref_order.append(tensor_at_gpus[0])
            else:
                for coll, t in zip(dense_tensors, tensor_at_gpus):
                    coll.append(t.to_dense() if t.is_sparse else t)
                ref_order.append(dense_tensors[0][-1])
        itrs = [_take_tensors(tensors, buffer_size) for tensors in dense_tensors]
        # now the dense ones, which have consistent sizes
        for chunks in zip(*itrs):
            flat_tensors = [_flatten_dense_tensors(chunk) for chunk in chunks]  # (num_gpus,)
            # 进行归并
            flat_result = reduce_add(flat_tensors, destination)
            for t in _unflatten_dense_tensors(flat_result, chunks[0]):
                # The unflattened tensors do not share storage, and we don't expose
                # base flat tensor anyways, so give them different version counters.
                # See NOTE [ Version Counter in comm.*_coalesced ]
                output.append(t.data)
        return tuple(_reorder_tensors_as(output, ref_order))
    

    3.4 更新模型参数

    这部分功能是:更新梯度参数。进行梯度下降,并更新主GPU上的模型参数。

    另外,由于模型参数仅在主GPU上更新,而其他从属GPU此时并不是同步更新的,所以需要将更新后的模型参数复制到剩余的从属 GPU 中,以此来实现并行。这就是在下一次for循环之中进行,以此循环反复。

    对应示例代码是:

    for batch_idx, (data, label) in pbar:   # 6. 下一次迭代会继续从分发开始
        if args.cuda:
            data,label= data.cuda(),label.cuda(); # 1. 数据已经放到了默认GPU上
        data_v = Variable(data)
        target_var = Variable(label)
        prediction= model(data_v,target_var,args) # 2. prediction 是gather到 GPU 0 的前向计算输出
        
        # 到目前为止,我们完成了DataParallel.forward()
        #这里的prediction 预测结果是由两个gpu合并过的,并行计算只存在在前向传播里
        #前向传播每个gpu计算量为 batch_size/len(device_ids),等前向传播完了将结果和到主gpu里
    
        criterion = nn.CrossEntropyLoss()
        loss = criterion(prediction,target_var)  # 3. 在默认GPU之上计算loss
        optimizer.zero_grad()
        loss.backward()   # 4. 开始反向传播
        optimizer.step() # 5. 更新模型
    

    0x04 总结

    我们总结一下流程,起初数据和模型被放入到默认GPU,就是 GPU 0,然后迭代如下:

    1. scatter 会把数据分发到其他 GPU。
    2. replicate 会把模型分发到其他 GPU。
    3. parallel_apply 会启动多个线程进行前向计算。
    4. gather 会把计算输出收集到 GPU 0。
    5. GPU 0 会计算损失。
    6. 把梯度 scatter 到其他 GPU。
    7. 模型调用 backward 计算。
    8. 把梯度归并到 GPU 0。
    9. optimizer.step 更新模型。

    具体对应下图之中的数字。

                         +-----+                   +-------+
                         |GPU1 |                   | GPU1  |
    main thread          +-----+                   +-------+
     +-----> Forward----> scatter +--------------> replicate------->  parallel_apply  +-------->  gather +---------+
                            +                           +                     +                                    |
                          1 |                         2 |                   3 |                                    |
                            |                           |                     |                                    |
                            |  +---------+----------+---+                     |                                    |
                            |  |         |          |                         |                                    |
                            +---------+----------+  |               +--------------------+                         |
                            |  |      |  |       |  |               |         |          |                         |
                            |  | 2    |  | 2     |  | 2       thread|1     thread 2    thread 3                    |
                          1 |  |    1 |  |     1 |  |               |         |          |                         |
                            |  v      |  v       |  v               |         |          |                         |
                            v         v          v                  v         v          v                         |
                         +--+---+  +--+---+   +--+---+           +--+---+  +--+---+   +--+---+    +-------+        |
                         | GPU1 |  | GPU2 |   | GPU3 |           | GPU1 |  | GPU2 |   | GPU3 |    | GPU1  |        |
                         +------+  +------+   +------+           +--+---+  +-+----+   +---+--+    +-+-+--++        |
                                                                    |        |            |         ^ ^  ^         |
                                                                    |        |            |   4     | |  |         |
                                                                    |        |            ----------^ |  |         |
                                                                    |        |                4       |  |         |
                                                                    |        +------------------------+  |         |
                                                                    |                                    |         |
                                                                    +------------------------------------+         |
            +------------------------------------------------------------------------------------------------------+
            |                               +------+
            |                               | GPU1 |
            |                               +------+                                                                     main thread
            +-> loss = criterion(...)+-----> scatter   +-------------->  model.backward() +---------->  reduce gradient +-------> optimizer.step
                         +                      +                               +                          +------+         9
                         | 5                    | 6                             | 7                        | GPU1 |
                         |                      |                               |                          +--+---+
                         |              v---------------v             +--------------------+                  ^
                         |              |       |       |             |         |          |                  | 8
                         |              |       |       |         thread 1    thread 2   thread 3             |
                         |              |       |       |             +         |          |           +-------------+
                         |              |       |       |             |         |          |           |      |      |
                         v              v       v       v             v         v          v           |      |      |
                      +--+---+      +---+-+  +--+--+  +-+---+      +--+--+  +---+--+    +--+--+     +--+--+ +-+--+ +-+---+
                      | GPU1 |      | GPU1|  | GPU2|  |GPU3 |      | GPU1|  | GPU2 |    |GPU3 |     | GPU1| |GPU2| | GPU3|
                      +------+      +-----+  +-----+  +-----+      +-----+  +------+    +-----+     +-----+ +----+ +-----+
    
    

    手机如下:

    至此,DP 分析完毕,我们下一篇要介绍 DDP 的一些相关知识。

    0xFF 参考

    PyTorch 源码解读之 torch.optim:优化算法接口详解

    pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel

    Pytorch的nn.DataParallel

    PyTorch 源码解读之分布式训练了解一下?

    https://discuss.pytorch.org/t/dataparallel-imbalanced-memory-usage/22551/20

    [原创][深度][PyTorch] DDP系列第二篇:实现原理与源代码解析

    Pytorch-CUDA从入门到放弃(二)

    Pytorch踩坑记:赋值、浅拷贝、深拷贝三者的区别以及model.state_dict()和model.load_state_dict()的坑点

    PyTorch 源码解读之 DP & DDP:模型并行和分布式训练解析

  • 相关阅读:
    ServiceStack支持跨域提交
    CookiesHelper
    poj 3669 线段树成段更新+区间合并
    poj2528 线段树+离散化
    hdu3308 线段树 区间合并
    hdu1542矩阵的并 线段树+扫描线
    hdu1255 矩阵的交 线段树+扫描线
    简单单点更新线段树
    树状数组模版
    hdu1873优先队列
  • 原文地址:https://www.cnblogs.com/rossiXYZ/p/15538332.html
Copyright © 2011-2022 走看看