PyTorch Distributed Training

    References
    https://pytorch.org/tutorials/intermediate/dist_tuto.html

    Code
    https://github.com/overfitover/pytorch-distributed
    Stars are welcome.

    Demo

    import os
    import torch
    import torch.distributed as dist
    from torch.multiprocessing import Process
    
    def run(rank, size):
        """ Distributed function to be implemented later. """
        pass
    
    def init_processes(rank, size, fn, backend='gloo'):  # the original 'tcp' backend has been removed from recent PyTorch releases
        """ Initialize the distributed environment. """
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'
        dist.init_process_group(backend, rank=rank, world_size=size)
        fn(rank, size)
    
    
    if __name__ == "__main__":
        size = 2
        processes = []
        for rank in range(size):
            p = Process(target=init_processes, args=(rank, size, run))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
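
    The run function in this demo is deliberately left empty; it is what every spawned process executes once the process group is initialized. As a minimal sanity check (not part of the original code), run could perform a single all_reduce so that each rank can verify it sees the others:

    def run(rank, size):
        """ Each rank contributes its own rank number; the sum is reduced back to all ranks. """
        tensor = torch.tensor([float(rank)])
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        # With size == 2, every rank prints 1.0 (= 0 + 1).
        print('rank', rank, 'of', size, 'all_reduce result:', tensor.item())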
    

    First implementation:

    #!/usr/bin/env python
    import os
    import torch
    import torch.distributed as dist
    from torch.multiprocessing import Process
    import torch.backends.cudnn as cudnn
    import time
    
    def allreduce(send, recv):
        """ Ring all-reduce: sums `send` across all ranks into `recv`.
        (Defined for reference; the training below uses DistributedDataParallel instead.) """
        rank = dist.get_rank()
        size = dist.get_world_size()
        send_buff = send.clone()
        recv_buff = send.clone()
        accum = send.clone()

        left = ((rank - 1) + size) % size
        right = (rank + 1) % size

        for i in range(size - 1):
            if i % 2 == 0:
                # Send send_buff, receive into recv_buff
                send_req = dist.isend(send_buff, right)
                dist.recv(recv_buff, left)
                accum[:] += recv_buff[:]
            else:
                # Send recv_buff, receive into send_buff
                send_req = dist.isend(recv_buff, right)
                dist.recv(send_buff, left)
                accum[:] += send_buff[:]
            send_req.wait()
        recv[:] = accum[:]
    
    
    def run(rank, size):
        """ Train a small linear-regression model with DistributedDataParallel. """
        model = Model()
        model = torch.nn.parallel.DistributedDataParallel(model.cuda())
        criterion = torch.nn.MSELoss(reduction='sum').cuda()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

        cudnn.benchmark = True
        x_data = torch.Tensor([[1.0], [2.0], [3.0]])
        y_data = torch.Tensor([[2.0], [4.0], [6.0]])

        for epoch in range(500000):
            # Forward pass
            y_pred = model(x_data.cuda())
            # Compute loss
            loss = criterion(y_pred, y_data.cuda())
            print(epoch, loss.item())
            # Zero gradients
            optimizer.zero_grad()
            # Backward pass (DDP averages gradients across processes here)
            loss.backward()
            # Update weights
            optimizer.step()

        hour_var = torch.Tensor([[7.0]])
        print("predict (after training)", 7, model(hour_var.cuda()).item())


    def init_processes(rank, size, backend='gloo'):
        """ Initialize the distributed environment, then run the training. """
        os.environ['MASTER_ADDR'] = '192.168.0.12'   # address of the master node
        os.environ['MASTER_PORT'] = '29555'
        dist.init_process_group(backend, rank=rank, world_size=size)

        run(rank, size)
    
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    
    def main():
    
        size = 2
    
        processes=[]
        for i in range(size):
            p = Process(target=init_processes, args=(i, size))
            p.start()
            processes.append(p)
            # init_processes(i, size)
    
        for p in processes:
            p.join()
    
    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        print("耗时:", end_time-start_time)
    
    

    Comparison without distributed training:

    import torch
    import time
    start_time = time.time()
    # train data
    x_data = torch.Tensor([[1.0], [2.0], [3.0]])
    y_data = torch.Tensor([[2.0], [4.0], [6.0]])
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    # our model
    model = Model().cuda()

    criterion = torch.nn.MSELoss(reduction='sum')
    # Defined loss function
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    # Defined optimizer
    #  Training: forward, loss, backward, step
    #  Training loop
    for epoch in range(500000):
        #  Forward pass
        y_pred = model(x_data.cuda())
        # Compute loss
        loss = criterion(y_pred.cuda(), y_data.cuda())
        print(epoch, loss.item())
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()
    
    # After training
    hour_var = torch.Tensor([[7.0]])
    print("predict (after training)", 7, model(hour_var.cuda()).item())
    end_time = time.time()
    print("elapsed time:", end_time - start_time)
    
    
Original post: https://www.cnblogs.com/o-v-o/p/9975356.html