
    PyTorch distributed training

    References

    https://pytorch.org/tutorials/intermediate/dist_tuto.html
    Code
    https://github.com/overfitover/pytorch-distributed
    Feel free to star it.

    demo

    import os
    import torch
    import torch.distributed as dist
    from torch.multiprocessing import Process
    
    def run(rank, size):
        """ Distributed function to be implemented later. """
        pass
    
    def init_processes(rank, size, fn, backend='gloo'):
        """ Initialize the distributed environment. """
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'
        dist.init_process_group(backend, rank=rank, world_size=size)
        fn(rank, size)
    
    
    if __name__ == "__main__":
        size = 2
        processes = []
        for rank in range(size):
            p = Process(target=init_processes, args=(rank, size, run))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
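
    The `run` function above is intentionally left as a stub ("to be implemented later").
    As an illustrative sketch (not part of the original post), a minimal `run` that exercises
    the process group with a single all-reduce could look like the following; the tensor
    values are arbitrary and only show that both ranks end up with the same sum:

    def run(rank, size):
        """ Each rank contributes a tensor; all_reduce leaves the element-wise
        sum of all contributions on every rank. """
        tensor = torch.ones(1) * rank        # rank 0 -> [0.], rank 1 -> [1.]
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        print("Rank", rank, "has data", tensor[0], "after all_reduce")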
    

    First implementation:

    #!/usr/bin/env python
    import os
    import torch
    import torch as th
    import torch.distributed as dist
    from torch.multiprocessing import Process
    import torch.backends.cudnn as cudnn
    from torch.autograd import Variable
    import time
    
    def allreduce(send, recv):
        """ Implementation of a ring-reduce. """
        rank = dist.get_rank()
        size = dist.get_world_size()
        send_buff = send.clone()
        recv_buff = send.clone()
        accum = send.clone()
        # th.cuda.synchronize()
    
        left = ((rank - 1) + size) % size
        right = (rank + 1) % size
    
        for i in range(size - 1):
            if i % 2 == 0:
                # Send send_buff
                send_req = dist.isend(send_buff, right)
                dist.recv(recv_buff, left)
            accum[:] += recv_buff[:]
            else:
                # Send recv_buff
                send_req = dist.isend(recv_buff, right)
                dist.recv(send_buff, left)
            accum[:] += send_buff[:]
            send_req.wait()
        # th.cuda.synchronize()
        recv[:] = accum[:]
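
    # Illustrative sketch, not part of the original post: when a plain (non-DDP) model is
    # trained with one process per rank, gradients can be synchronized manually by
    # all-reducing them after loss.backward(). DistributedDataParallel already does this
    # internally, so this helper only makes the mechanism explicit.
    def average_gradients(model):
        """ Average each parameter's gradient across all ranks. """
        world_size = float(dist.get_world_size())
        for param in model.parameters():
            if param.grad is None:
                continue
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
            param.grad.data /= world_size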
    
    
    def run(rank, size):
        """ Distributed function to be implemented later. """
        model = Model()
        model = torch.nn.parallel.DistributedDataParallel(model.cuda())
        criterion = torch.nn.MSELoss(size_average=False).cuda()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    
        cudnn.benchmark = True
        x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
        y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))
    
        for epoch in range(500000):
            y_pred = model(x_data.cuda())
            # Compute loss
            loss = criterion(y_pred.cuda(), y_data.cuda())
            print(epoch, loss.data[0])
            # Zero gradients
            optimizer.zero_grad()
            # perform backward pass
            loss.backward()
            # update weights
            optimizer.step()
    
        hour_var = Variable(torch.Tensor([[7.0]]))
        print("predict (after training)", 7, model.forward(hour_var).data[0][0])
    
    
    
    
    def init_processes(rank, size, backend='gloo'):
        """ Initialize the distributed environment. """
        os.environ['MASTER_ADDR'] = '192.168.0.12'
        os.environ['MASTER_PORT'] = '29555'
        dist.init_process_group(backend, rank=rank, world_size=size)
        # print("MM")
    
        ## training code
        model = Model()
        model = torch.nn.parallel.DistributedDataParallel(model.cuda())
        criterion = torch.nn.MSELoss(size_average=False).cuda()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    
        cudnn.benchmark = True
        x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
        y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))
    
        for epoch in range(500000):
            y_pred = model(x_data.cuda())
            # Compute loss
            loss = criterion(y_pred.cuda(), y_data.cuda())
            print(epoch, loss.data[0])
            # Zero gradients
            optimizer.zero_grad()
            # perform backward pass
            loss.backward()
            # update weights
            optimizer.step()
    
        hour_var = Variable(torch.Tensor([[7.0]]))
        print("predict (after training)", 7, model.forward(hour_var).data[0][0])
    
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    
    def main():
    
        size = 2
    
        processes=[]
        for i in range(size):
            p = Process(target=init_processes, args=(i, size))
            p.start()
            processes.append(p)
            # init_processes(i, size)
    
        for p in processes:
            p.join()
    
    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        print("耗时:", end_time-start_time)
    
    

    For comparison, the same model trained without distributed training:

    import torch
    from torch.autograd import Variable
    import time
    start_time=time.time()
    # train data
    x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
    y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    # our model
    model = Model().cuda()
    
    criterion = torch.nn.MSELoss(size_average=False)
    # Defined loss function
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    # Defined optimizer
    #  Training: forward, loss, backward, step
    #  Training loop
    for epoch in range(500000):
        #  Forward pass
        y_pred = model(x_data.cuda())
        # Compute loss
        loss = criterion(y_pred.cuda(), y_data.cuda())
        print(epoch, loss.data[0])
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()
    
    # After training
    hour_var = Variable(torch.Tensor([[7.0]]))
    print("predict (after training)", 7, model.forward(hour_var.cuda()).data[0][0])
    end_time=time.time()
    print("耗时: ", end_time-start_time)
    
    
    Original post: https://www.cnblogs.com/o-v-o/p/9975356.html