    PyTorch Distributed Training

    References

    https://pytorch.org/tutorials/intermediate/dist_tuto.html

    Code
    https://github.com/overfitover/pytorch-distributed
    Stars welcome.

    Demo: a template that spawns two processes and initializes the process group; run() is a placeholder filled in later.

    import os
    import torch
    import torch.distributed as dist
    from torch.multiprocessing import Process
    
    def run(rank, size):
        """ Distributed function to be implemented later. """
        pass
    
    # The legacy 'tcp' backend has been removed from PyTorch; 'gloo' is the portable choice.
    def init_processes(rank, size, fn, backend='gloo'):
        """ Initialize the distributed environment. """
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'
        dist.init_process_group(backend, rank=rank, world_size=size)
        fn(rank, size)
    
    
    if __name__ == "__main__":
        size = 2
        processes = []
        for rank in range(size):
            p = Process(target=init_processes, args=(rank, size, run))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
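
    Following the linked tutorial, run() is then filled in with communication
    code. For example, a one-line collective where every rank ends up with the
    global sum (a minimal sketch):

    def run(rank, size):
        """ Simple collective communication. """
        tensor = torch.ones(1)
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)  # sum across all ranks
        print('Rank ', rank, ' has data ', tensor[0])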
    

    First implementation: train a one-layer linear model with DistributedDataParallel across two processes (a hand-written ring all-reduce is included for reference):

    #!/usr/bin/env python
    import os
    import torch
    import torch as th
    import torch.distributed as dist
    from torch.multiprocessing import Process
    import torch.backends.cudnn as cudnn
    import time
    
    def allreduce(send, recv):
        """ Hand-written ring all-reduce, shown for reference; the training
        loop below relies on DistributedDataParallel instead. """
        rank = dist.get_rank()
        size = dist.get_world_size()
        send_buff = th.zeros(send.size())
        recv_buff = th.zeros(send.size())
        accum = th.zeros(send.size())
        accum[:] = send[:]
        # th.cuda.synchronize()
    
        left = ((rank - 1) + size) % size
        right = (rank + 1) % size
    
        for i in range(size - 1):
            if i % 2 == 0:
                # Send send_buff
                send_req = dist.isend(send_buff, right)
                dist.recv(recv_buff, left)
                accum[:] += recv[:]
            else:
                # Send recv_buff
                send_req = dist.isend(recv_buff, right)
                dist.recv(send_buff, left)
                accum[:] += send[:]
            send_req.wait()
        # th.cuda.synchronize()
        recv[:] = accum[:]
    
    
    def run(rank, size):
        """ Training loop executed by every process. """
        model = Model()
        # DDP averages gradients across processes during backward();
        # with several GPUs each process would normally pin its own device first.
        model = torch.nn.parallel.DistributedDataParallel(model.cuda())
        # size_average=False is deprecated; reduction='sum' is the equivalent
        criterion = torch.nn.MSELoss(reduction='sum').cuda()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    
        cudnn.benchmark = True
        x_data = torch.Tensor([[1.0], [2.0], [3.0]])
        y_data = torch.Tensor([[2.0], [4.0], [6.0]])
    
        for epoch in range(500000):
            y_pred = model(x_data.cuda())
            # Compute loss
            loss = criterion(y_pred, y_data.cuda())
            print(epoch, loss.item())  # loss.data[0] is deprecated; use .item()
            # Zero gradients
            optimizer.zero_grad()
            # Perform backward pass
            loss.backward()
            # Update weights
            optimizer.step()
    
        hour_var = torch.Tensor([[7.0]]).cuda()  # input must be on the model's device
        print("predict (after training)", 7, model(hour_var).item())
    
    
    
    
    def init_processes(rank, size, backend='gloo'):
        """ Initialize the distributed environment, then hand off to training. """
        os.environ['MASTER_ADDR'] = '192.168.0.12'
        os.environ['MASTER_PORT'] = '29555'
        dist.init_process_group(backend, rank=rank, world_size=size)
        run(rank, size)
    
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    
    def main():
    
        size = 2
    
        processes = []
        for i in range(size):
            p = Process(target=init_processes, args=(i, size))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
    
    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        print("耗时:", end_time-start_time)
    
    

    For comparison, the same model trained without any distributed machinery:

    import torch
    import time
    start_time = time.time()
    # train data (Variable is deprecated; plain tensors work directly)
    x_data = torch.Tensor([[1.0], [2.0], [3.0]])
    y_data = torch.Tensor([[2.0], [4.0], [6.0]])
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    # our model
    model = Model().cuda()
    
    # Loss function; size_average=False is deprecated, reduction='sum' is equivalent
    criterion = torch.nn.MSELoss(reduction='sum')
    # Optimizer
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    #  Training: forward, loss, backward, step
    #  Training loop
    for epoch in range(500000):
        #  Forward pass
        y_pred = model(x_data.cuda())
        # Compute loss
        loss = criterion(y_pred.cuda(), y_data.cuda())
        print(epoch, loss.item())  # loss.data[0] is deprecated; use .item()
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()
    
    # After training
    hour_var = torch.Tensor([[7.0]])
    print("predict (after training)", 7, model(hour_var.cuda()).item())
    end_time = time.time()
    print("elapsed time: ", end_time - start_time)
    
    