zoukankan      html  css  js  c++  java
  • pytorch中基于DistributedDataParallel实现多卡并行计算

    torch.nn.parallel.DistributedDataParallel提供了更为高效的单机多卡和多机多卡训练接口。

    DistributedDataParallel并行库接口参数参考:

    https://www.cnblogs.com/jiangkejie/p/13256115.html

    这里给出单机多卡的具体实现步骤

    参考:

    https://github.com/tczhangzhi/pytorch-distributed

    https://blog.csdn.net/zwqjoy/article/details/89415933

    https://www.cnblogs.com/yh-blog/p/12877922.html

     

    几个基本概念

    • group:

      即进程组。默认情况下,只有一个组,一个 job 即为一个组,也即一个 world。

      当需要进行更加精细的通信时,可以通过 new_group 接口,使用 word 的子集,创建新组,用于集体通信等。

    • world size :

      表示全局进程个数。

    • rank:

      表示进程序号,用于进程间通讯,表征进程优先级。rank = 0 的主机为 master 节点。

    • local_rank:

      节点内,GPU 编号,非显式参数,由 torch.distributed.launch 内部指定。比方说, rank = 3,local_rank = 0 表示第 3 个节点内的第 1 块 GPU。节点内每个进程会分配一个 local_rank 参数,表示当前进程在当前主机上的编号。例如:rank=2, local_rank=0 表示第 3 个节点上的第 1 个进程。

    1:  torch.nn.parallel.DistributedDataParallel

    这个从名字上就能看出来与DataParallel相类似,也是一个模型wrapper。这个包是实现多机多卡分布训练最核心东西,它可以帮助我们在不同机器的多个模型拷贝之间平均梯度。

    2: torch.utils.data.distributed.DistributedSampler

    在多机多卡情况下分布式训练数据的读取也是一个问题,不同的卡读取到的数据应该是不同的。dataparallel的做法是直接将batch切分到不同的卡,这种方法对于多机来说不可取,因为多机之间直接进行数据传输会严重影响效率。于是有了利用sampler确保dataloader只会load到整个数据集的一个特定子集的做法。DistributedSampler就是做这件事的。它为每一个子进程划分出一部分数据集,以避免不同进程之间数据重复

    使用流程

    Pytorch 中分布式的基本使用流程如下:

    1. 在使用 distributed 包的任何其他函数之前,需要使用 init_process_group 初始化进程组,同时初始化 distributed 包。

    2. 如果需要进行小组内集体通信,用 new_group 创建子分组

    3. 创建分布式并行(DistributedDataParallel)模型 DDP(model, device_ids=device_ids)

    4. 为数据集创建distributed Sampler
    5. 使用启动工具 torch.distributed.launch 在每个主机上执行一次脚本,开始训练

    6. 使用 destory_process_group() 销毁进程组

    单机多卡--DistributedDataParallel

    import torch
    import torch.nn as nn
    from torch.autograd import Variable
    from torch.utils.data import Dataset, DataLoader
    import os
    from torch.utils.data.distributed import DistributedSampler
    # 1) 初始化
    torch.distributed.init_process_group(backend="nccl")
     
    input_size = 5
    output_size = 2
    batch_size = 30
    data_size = 90
     
    # 2) 配置每个进程的gpu
    local_rank = torch.distributed.get_rank()  # 也可以通过设置args.local_rank得到(见下文)
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)
     
    class RandomDataset(Dataset):
        def __init__(self, size, length):
            self.len = length
            self.data = torch.randn(length, size).to('cuda')
     
        def __getitem__(self, index):
            return self.data[index]
     
        def __len__(self):
            return self.len
     
    dataset = RandomDataset(input_size, data_size)
    # 3)使用DistributedSampler
    rand_loader = DataLoader(dataset=dataset,
                             batch_size=batch_size,
                             sampler=DistributedSampler(dataset))
     
    class Model(nn.Module):
        def __init__(self, input_size, output_size):
            super(Model, self).__init__()
            self.fc = nn.Linear(input_size, output_size)
     
        def forward(self, input):
            output = self.fc(input)
            print("  In Model: input size", input.size(),
                  "output size", output.size())
            return output
     
    model = Model(input_size, output_size)
     
    # 4) 封装之前要把模型移到对应的gpu
    model.to(device)
     
    if torch.cuda.device_count() > 1:
        print("Let's use", torch.cuda.device_count(), "GPUs!")
        # 5) 封装
        model = torch.nn.parallel.DistributedDataParallel(model,
                                                          device_ids=[local_rank],
                                                          output_device=local_rank)
     
    for data in rand_loader:
        if torch.cuda.is_available():
            input_var = data
        else:
            input_var = data
     
        output = model(input_var)
        print("Outside: input size", input_var.size(), "output_size", output.size())
    

    torch.distributed.launch 会给模型分配一个args.local_rank的参数,也可以通过torch.distributed.get_rank()获取进程id。

    # 这个参数是torch.distributed.launch传递过来的,我们设置位置参数来接受,local_rank代表当前程序进程使用的GPU标号
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', default=-1, type=int, help='node rank for distributed training')
    args = parser.parse_args()
    print(args.local_rank))
    

      

    怎么将程序跑起来。这里也有两种方法:

    1. 用 torch.distributed.launch

    CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nproc_per_node=4 main.py

    参考:https://pytorch.org/docs/stable/distributed.html#distributed-launch

    torch.distributed package在torch. distribued .launch中提供了一个启动实用程序。

    这个辅助工具可以用于为每个节点启动多个进程,以进行分布式训练。它是在每个训练节点上生成多个分布式训练进程的模块。

    一些Notes参见:https://pytorch.org/docs/stable/distributed.html#distributed-launch

    2. 用 torch.multiprocessing:

    import torch.multiprocessing as mp
     
    def main(rank, your_custom_arg_1, your_custom_arg_2): # 这里第一个 rank 变量会被 mp.spawn 函数自动填充,可以充当 local_rank 来用(参见前面的代码)
        pass # 将前面那一堆东西包装成一个 main 函数
     
    mp.spawn(main, nprocs=how_many_process, args=(your_custom_arg_1, your_custom_arg_2))
    

    多机多卡训练

    参考:https://blog.csdn.net/zwqjoy/article/details/89415933

    Save and Load Checkpoints

    在训练期间,通常使用torch.save和torch.load来设置检查点模块并从检查点恢复。有关更多详细信息,请参见保存和加载模型

    使用DDP时,一种优化方法是仅在一个进程中保存模型,然后将其加载到所有进程中,从而减少写入开销。这是正确的,因为所有进程都从相同的参数开始,并且梯度在向后传递中同步,因此优化程序应将不同位置的模型参数设置为相同的值。

    如果使用此优化,请确保在保存完成之前所有进程不要开始加载(使用dist.barrier()命令)。

    此外,在加载模块时,您需要提供适当的map_location参数,以防止进程进入其他的设备。

    如果缺少map_location,torch.load将首先将模块加载到CPU,然后将每个参数复制到保存该参数的位置,这将导致同一台计算机上的所有进程使用相同的设备集。

    有关更高级的故障恢复和弹性支持,请参阅TorchElastic

    示例:https://pytorch.org/tutorials/intermediate/ddp_tutorial.html

    import os
    import sys
    import tempfile
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    import torch.optim as optim
    import torch.multiprocessing as mp
    
    from torch.nn.parallel import DistributedDataParallel as DDP
    
    # On Windows platform, the torch.distributed package only
    # supports Gloo backend, FileStore and TcpStore.
    # For FileStore, set init_method parameter in init_process_group
    # to a local file. Example as follow:
    # init_method="file:///f:/libtmp/some_file"
    # dist.init_process_group(
    #    "gloo",
    #    rank=rank,
    #    init_method=init_method,
    #    world_size=world_size)
    # For TcpStore, same way as on Linux.
    
    def setup(rank, world_size):
        os.environ['MASTER_ADDR'] = 'localhost'
        os.environ['MASTER_PORT'] = '12355'
    
        # initialize the process group
        dist.init_process_group("gloo", rank=rank, world_size=world_size)
    
    def cleanup():
        dist.destroy_process_group()
    
    
    
    class ToyModel(nn.Module):
        def __init__(self):
            super(ToyModel, self).__init__()
            self.net1 = nn.Linear(10, 10)
            self.relu = nn.ReLU()
            self.net2 = nn.Linear(10, 5)
    
        def forward(self, x):
            return self.net2(self.relu(self.net1(x)))
    
    
    def demo_basic(rank, world_size):
        print(f"Running basic DDP example on rank {rank}.")
        setup(rank, world_size)
    
        # create model and move it to GPU with id rank
        model = ToyModel().to(rank)
        ddp_model = DDP(model, device_ids=[rank])
    
        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    
        optimizer.zero_grad()
        outputs = ddp_model(torch.randn(20, 10))
        labels = torch.randn(20, 5).to(rank)
        loss_fn(outputs, labels).backward()
        optimizer.step()
    
        cleanup()
    
    
    def run_demo(demo_fn, world_size):
        mp.spawn(demo_fn,
                 args=(world_size,),
                 nprocs=world_size,
                 join=True)
    
    
    
    
    def demo_checkpoint(rank, world_size):
        print(f"Running DDP checkpoint example on rank {rank}.")
        setup(rank, world_size)
    
        model = ToyModel().to(rank)
        ddp_model = DDP(model, device_ids=[rank])
    
        loss_fn = nn.MSELoss()
        optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)
    
        CHECKPOINT_PATH = tempfile.gettempdir() + "/model.checkpoint"
        if rank == 0:
            # All processes should see same parameters as they all start from same
            # random parameters and gradients are synchronized in backward passes.
            # Therefore, saving it in one process is sufficient.
            torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
    
        # Use a barrier() to make sure that process 1 loads the model after process
        # 0 saves it.
        dist.barrier()
        # configure map_location properly
        map_location = {'cuda:%d' % 0: 'cuda:%d' % rank}
        ddp_model.load_state_dict(
            torch.load(CHECKPOINT_PATH, map_location=map_location))
    
        optimizer.zero_grad()
        outputs = ddp_model(torch.randn(20, 10))
        labels = torch.randn(20, 5).to(rank)
        loss_fn = nn.MSELoss()
        loss_fn(outputs, labels).backward()
        optimizer.step()
    
        # Not necessary to use a dist.barrier() to guard the file deletion below
        # as the AllReduce ops in the backward pass of DDP already served as
        # a synchronization.
    
        if rank == 0:
            os.remove(CHECKPOINT_PATH)
    
        cleanup()
    

      

    快去成为你想要的样子!
  • 相关阅读:
    使用脚本改变树控件的行为 (转)点文本 收..
    (面包屑)SiteMapPath控件简化Web网站导航 (转)
    Web.config详解(转)
    SiteMap(站点地图)示例(转)
    url传递中文的解决方案总结(转)
    INI文件编程,WINAPI函数WritePrivateProfileString,GetPrivateProfileString(转帖)
    hdu 2708
    hdu 1709
    hdu 1045
    hdu 2714
  • 原文地址:https://www.cnblogs.com/jiangkejie/p/14603479.html
Copyright © 2011-2022 走看看