
    PyTorch distributed training

    References

    https://pytorch.org/tutorials/intermediate/dist_tuto.html
    Code
    https://github.com/overfitover/pytorch-distributed
    Stars are welcome.

    Demo

    import os
    import torch
    import torch.distributed as dist
    from torch.multiprocessing import Process
    
    def run(rank, size):
        """ Distributed function to be implemented later. """
        pass
    
    def init_processes(rank, size, fn, backend='gloo'):
        """ Initialize the distributed environment. """
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'
        dist.init_process_group(backend, rank=rank, world_size=size)
        fn(rank, size)
    
    
    if __name__ == "__main__":
        size = 2
        processes = []
        for rank in range(size):
            p = Process(target=init_processes, args=(rank, size, run))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
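
    The run function above is deliberately left as a placeholder. As a minimal sketch of what it might do (following the collective-communication example in the tutorial linked above), each process can all-reduce a one-element tensor and print the result, which is a quick way to verify that the process group is set up correctly:

    def run(rank, size):
        """ All-reduce a tensor across all processes. """
        tensor = torch.ones(1)
        # After all_reduce, every rank holds the sum over all ranks (equal to size here).
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        print('Rank', rank, 'has data', tensor[0])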
    

    First implementation:

    #!/usr/bin/env python
    import os
    import torch
    import torch as th
    import torch.distributed as dist
    from torch.multiprocessing import Process
    import torch.backends.cudnn as cudnn
    from torch.autograd import Variable
    import time
    
    def allreduce(send, recv):
        """ Ring all-reduce: on return, recv holds the element-wise sum of send across all ranks. """
        rank = dist.get_rank()
        size = dist.get_world_size()
        send_buff = send.clone()
        recv_buff = send.clone()
        accum = send.clone()
        # th.cuda.synchronize()

        left = ((rank - 1) + size) % size
        right = (rank + 1) % size

        for i in range(size - 1):
            if i % 2 == 0:
                # Send send_buff, receive into recv_buff
                send_req = dist.isend(send_buff, right)
                dist.recv(recv_buff, left)
                accum[:] += recv_buff[:]
            else:
                # Send recv_buff, receive into send_buff
                send_req = dist.isend(recv_buff, right)
                dist.recv(send_buff, left)
                accum[:] += send_buff[:]
            send_req.wait()
        # th.cuda.synchronize()
        recv[:] = accum[:]
    
    
    def run(rank, size):
        """ Distributed function to be implemented later. """
        model = Model()
        model = torch.nn.parallel.DistributedDataParallel(model.cuda())
        criterion = torch.nn.MSELoss(reduction='sum').cuda()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    
        cudnn.benchmark = True
        x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
        y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))
    
        for epoch in range(500000):
            y_pred = model(x_data.cuda())
            # Compute loss
            loss = criterion(y_pred.cuda(), y_data.cuda())
            print(epoch, loss.item())
            # Zero gradients
            optimizer.zero_grad()
            # perform backward pass
            loss.backward()
            # update weights
            optimizer.step()
    
        hour_var = Variable(torch.Tensor([[7.0]]))
        print("predict (after training)", 7, model(hour_var.cuda()).item())
    
    
    
    
    def init_processes(rank, size, backend='gloo'):
        """ Initialize the distributed environment. """
        os.environ['MASTER_ADDR'] = '192.168.0.12'
        os.environ['MASTER_PORT'] = '29555'
        dist.init_process_group(backend, rank=rank, world_size=size)
        # print("MM")
    
        ## training code (same as run() above)
        model = Model()
        model = torch.nn.parallel.DistributedDataParallel(model.cuda())
        criterion = torch.nn.MSELoss(reduction='sum').cuda()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    
        cudnn.benchmark = True
        x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
        y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))
    
        for epoch in range(500000):
            y_pred = model(x_data.cuda())
            # Compute loss
            loss = criterion(y_pred.cuda(), y_data.cuda())
            print(epoch, loss.item())
            # Zero gradients
            optimizer.zero_grad()
            # perform backward pass
            loss.backward()
            # update weights
            optimizer.step()
    
        hour_var = Variable(torch.Tensor([[7.0]]))
        print("predict (after training)", 7, model(hour_var.cuda()).item())
    
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    
    def main():
    
        size = 2
    
        processes=[]
        for i in range(size):
            p = Process(target=init_processes, args=(i, size))
            p.start()
            processes.append(p)
            # init_processes(i, size)
    
        for p in processes:
            p.join()
    
    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        print("Elapsed time:", end_time - start_time)
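
    Note that the hand-written allreduce above is defined but never called; in this script the gradient synchronization is done by DistributedDataParallel. As a sketch of the alternative described in the linked tutorial, the gradients can instead be averaged manually after loss.backward() (with dist.all_reduce, or with the custom allreduce in its place):

    def average_gradients(model):
        """ Average gradients across all processes (what DDP otherwise does automatically). """
        size = float(dist.get_world_size())
        for param in model.parameters():
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
            param.grad.data /= size

    Calling average_gradients(model) between loss.backward() and optimizer.step() keeps the model replicas in sync without the DistributedDataParallel wrapper. Also note that with two processes on one machine, both .cuda() calls place the model on GPU 0; if more than one GPU is available, each process is usually pinned to its own device, e.g. with torch.cuda.set_device(rank).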
    
    

    For comparison, the same model trained without the distributed setup:

    import torch
    from torch.autograd import Variable
    import time
    start_time=time.time()
    # train data
    x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
    y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    # our model
    model = Model().cuda()
    
    criterion = torch.nn.MSELoss(reduction='sum')
    # Defined loss function
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    # Defined optimizer
    #  Training: forward, loss, backward, step
    #  Training loop
    for epoch in range(500000):
        #  Forward pass
        y_pred = model(x_data.cuda())
        # Compute loss
        loss = criterion(y_pred.cuda(), y_data.cuda())
        print(epoch, loss.item())
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()
    
    # After training
    hour_var = Variable(torch.Tensor([[7.0]]))
    print("predict (after training)", 7, model(hour_var.cuda()).item())
    end_time = time.time()
    print("Elapsed time: ", end_time - start_time)
    
    
    Original post: https://www.cnblogs.com/o-v-o/p/9975356.html