zoukankan      html  css  js  c++  java
  • 使用horovod构建分布式深度学习框架

      最近一直在尝试着分布式深度学习的架构,主要的原因一方面是几台机子全是1060卡,利用深度网络在较大数据样本上训练的效率极其低下,所以尝试着将几台机子做成分布式,看看能否提高训练效率;第二方面是有人习惯使用tensorflow,有人习惯使用keras,也有人喜欢使用pytorch等,虽然这些框架各自都有分布式的实现,但总的来说不能统一到一个平台上,造成使用上有不好的体验。在查资料的时候正好看到了horovod这个框架,它是集成了多个深度框架的一个统一平台,搭建和使用起来都比较方便,所以打算尝试基于horovod搭建一个分布式环境,供后期使用。可惜没有使用docker去部署,其中配置的过程中遇到不少坑,还好都解决了。

      一.分布式中的ps架构和ring-allreduce架构

      1.ps架构

        在Parameter server架构(PS架构)中,集群中的节点被分为parameter serverworker两类。其中parameter server收集、更新和存放模型的参数,而worker负责计算模型参  数的梯度。在每个迭代过程ps聚合所有worker传回的梯度,然后更新参数,并将新的参数广播给worker缺点是集群通信不均衡问题,类似木桶效应。

      ps架构同步随机梯度下降SGD如下图:

      

       

      2.ring-allreduce架构

        在Ring-allreduce架构中,各个设备都是worker,并且形成一个环,如上图所示,没有中心节点来聚合所有worker计算的梯度。在一个迭代过程,每个worker完成自己的mini-  batch训练,计算出梯度,并将梯度传递给环中的下一个worker,同时它也接收从上一个worker的梯度。对于一个包含Nworker的环,各个worker需要收到其它N-1worker的梯度  后就可以更新模型参数。其实这个过程需要两个部分:scatter-reduceallgather,百度开发了自己的allreduce框架,并将其用在了深度学习的分布式训练中。如下图。

      相比PS架构,Ring-allreduce架构有如下优点:

      a. 带宽优化,因为集群中每个节点的带宽都被充分利用。而PS架构,所有的worker计算节点都需要聚合给parameter server,这会造成一种通信瓶颈。parameter server的带宽瓶颈  会影响整个系统性能,随着worker数量的增加,其加速比会迅速的恶化。  

       b.此外,在深度学习训练过程中,计算梯度采用BP算法,其特点是后面层的梯度先被计算,而前面层的梯度慢于前面层,Ring-allreduce架构可以充分利用这个特点,在前面层梯  度计算的同时进行后面层梯度的传递,从而进一步减少训练时间。在百度的实验中,他们发现训练速度基本上线性正比于GPUs数目(worker数)。

       

      

      二.准备工作:

           1.一开始是利用虚拟机虚拟了三台机子进行了cpu版的成功测试,可惜GPU无法用在虚拟环境上。

              2.后来弄了三两台真实的物理机,将原windows都改为linux系统。

            a.安装nvidia驱动

            b.安装cuda

            c.安装cudnn

            d.安装pytorch和tf

            e.两台机子ssh相互免密码登录

            f.nfs共享文件系统(将脚本和样本放在这里),共它机器挂载在此目录下

            g.openmpi的安装配置

            h.nccl的安装配置

            i.horovod的安装配置

     (我这里使用的版本是:ubuntu16.0.4、nvidia384.130、cuda9.0、cudnn7.6.4、pytorch1.1.0、tf1.12、nccl2(nccl2.x版本为多机多卡)、openmpi4.0.0...)

      注意:在安装这些工具的时候,需要注意各自匹配的版本以及环境的配置,否则安装不成功,需要耐心。  

      环境在配置好后,进行了测试。有个问题是在指定命令运行的时候,多机多gpu会出现bash: orted: command not found,单机可以运行。这里解决的办法有2种:第一      是在安装 openmpi时指定--prefix=openmpi安装目录;第二种是在运行时指定命令--prefix openmpi安装目录。

     

      三. 利用mnist数字识别进行分布式测试,将测试脚本放在nfs共享目录里,mnist会自动下载训练样本

        

    import argparse
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from torch.utils.tensorboard import SummaryWriter
    from torchvision import datasets,transforms
    import torch.utils.data.distributed
    import horovod.torch as hvd
    
    # Training settings
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=42, metavar='S',
                        help='random seed (default: 42)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                        help='use fp16 compression during allreduce')
    
    args = parser.parse_args()
    args.cuda = not args.no_cuda and torch.cuda.is_available()
    
    # Horovod: initialize library.
    hvd.init()
    torch.manual_seed(args.seed)
    
    if args.cuda:
        # horovod: pin GPU to local rank.
        torch.cuda.set_device(hvd.local_rank())
        torch.cuda.manual_seed(args.seed)
    #用来记录训练的loss情况,利用tensorboard可视化loss
    if hvd.rank() == 0:
        writer = SummaryWriter('/home/user/share/log/mnist_test')
    
    # horovod: limit # of cpu threads to be used per worker.
    torch.set_num_threads(1)
    kwargs = {'num_workers' : 1, 'pin_memory' : True} if args.cuda else {}
    train_dataset = datasets.MNIST('/home/user/share/data/', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ]))
    # Horovod: use DistributedSampler to partition the training data.
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, 
                                                                    num_replicas=hvd.size(),
                                                                    rank=hvd.rank())
    train_loader = torch.utils.data.DataLoader(train_dataset,
                                              batch_size=args.batch_size,
                                              sampler=train_sampler, **kwargs)
    
    test_dataset = datasets.MNIST('/home/user/share/data/', train=False, download=True,transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ]))
    # Horovod: use DistributedSampler to partition the test data.
    test_sampler = torch.utils.data.distributed.DistributedSampler(test_dataset,
                                                                    num_replicas=hvd.size(),
                                                                    rank=hvd.rank())
    test_loader = torch.utils.data.DataLoader(test_dataset,
                                             batch_size=args.test_batch_size,
                                             sampler=test_sampler, **kwargs)
    
    class Net(nn.Module):
        def __init__(self):
            super(Net,self).__init__()
            self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
            self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
            self.conv2_drop = nn.Dropout2d()
            self.fc1 = nn.Linear(320, 50)
            self.fc2 = nn.Linear(50, 10)
        
        def forward(self,x):
            x = F.relu(F.max_pool2d(self.conv1(x), 2))
            x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)),2))
            x = x.view(-1, 320)
            x = F.relu(self.fc1(x))
            x = F.dropout(x, training=self.training)
            x = self.fc2(x)
            return F.log_softmax(x)
        
    model = Net()
    
    if args.cuda:
        #move model to GPU
        model.cuda()
    
    # Horovod: scale learning rate by the number of GPUs.由于是多机运行,batch较大,所以这里将学习率增大
    optimizer = optim.SGD(model.parameters(),
                         lr=args.lr*hvd.size(),
                         momentum=args.momentum)
    
    # Horovod: 广播初始化参数和优化器状态
    hvd.broadcast_parameters(model.state_dict(), root_rank=0)
    hvd.broadcast_optimizer_state(optimizer, root_rank=0)
    
    # Horovod: (optional) compression algorithm.
    compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
    
    # Horovod: wrap optimizer with DistributedOptimizer
    optimizer = hvd.DistributedOptimizer(optimizer,
                                        named_parameters=model.named_parameters(),
                                        compression=compression)
    
    def train(epoch):
        model.train()
        # Horovod: set epoch to sampler for shuffling.
        train_sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if (batch_idx % args.log_interval == 0) and (hvd.rank() == 0):
                # Horovod: use train_sampler to determine the number of examples in
                # this worker's partition.
                print('Train Epoch: {} [{}/{} ({:.0f}%)]	Loss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_sampler),
                    100. * batch_idx / len(train_loader), loss.item()))
                niter = epoch * len(train_loader) + batch_idx
                writer.add_scalar('Train/Loss',loss.item(),niter)
    
    
    def metric_average(val, name):
        tensor = torch.tensor(val)
        avg_tensor = hvd.allreduce(tensor, name=name)
        return avg_tensor.item()
    
    
    def test():
        model.eval()
        test_loss = 0.
        test_accuracy = 0.
        for data, target in test_loader:
            if args.cuda:
                data, target = data.cuda(), target.cuda()
            output = model(data)
            # sum up batch loss
            test_loss += F.nll_loss(output, target, size_average=False).item()
            # get the index of the max log-probability
            pred = output.data.max(1, keepdim=True)[1]
            test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()
    
        # Horovod: use test_sampler to determine the number of examples in
        # this worker's partition.
        test_loss /= len(test_sampler)
        test_accuracy /= len(test_sampler)
    
        # Horovod: average metric values across workers.
        test_loss = metric_average(test_loss, 'avg_loss')
        test_accuracy = metric_average(test_accuracy, 'avg_accuracy')
    
        # Horovod: 在first rank上打印输出
        if hvd.rank() == 0:
            print('
    Test set: Average loss: {:.4f}, Accuracy: {:.2f}%
    '.format(
                test_loss, 100. * test_accuracy))
    
    
    for epoch in range(1, args.epochs + 1):
        train(epoch)
        test()
        
    if hvd.rank() == 0:
        print('hvd.rank:{}'.format(hvd.rank()))
        save_path = '/home/user/model/mnist_model.pkl'
        # 将模型保存在rank为0的机子上
        torch.save(model.state_dict(), save_path)
        print('model save success! path = {}'.format(save_path))

    在linux中执行以上脚本的命令是:

        nohup mpirun -np 2 -H work_1:1,work_2:1 -bind-to none -map-by slot -x NCCL_DEBUG=INFO -x LD_LIBRARY_PATH -x PATH -mca pml ob1 -mca btl ^openib -prefix /home/user/openmpi python -u /home/user/share/torch/test_mnist.py  > /home/user/share/log/test_mnist.log 2>&1 &

    其中两台机器为work_1和work_2,分别使用一个GPU;脚本名为test_mnist.py;test_mnist.log为日志。

    参考:

    1.https://blog.csdn.net/zwqjoy/article/details/81013527

    2.https://www.cnblogs.com/goya/p/11790387.html

  • 相关阅读:
    数据结构(动态树):UOJ 207 共价大爷游长沙
    字符串(后缀自动机):NOI 2016 优秀的拆分
    数学(矩阵乘法):HDU 4565 So Easy!
    数据结构(线段树):NOI 2016 区间
    动态规划:NOI2013 快餐店
    图论(网络流):UVa 1659
    数学(矩阵乘法,随机化算法):POJ 3318 Matrix Multiplication
    数学(莫比乌斯反演):YY的GCD
    数学(莫比乌斯反演):HAOI 2011 问题B
    字符串(后缀自动机):USACO Dec10 恐吓信
  • 原文地址:https://www.cnblogs.com/little-horse/p/12026145.html
Copyright © 2011-2022 走看看