    PyTorch distributed training

    References

    https://pytorch.org/tutorials/intermediate/dist_tuto.html

    Code

    https://github.com/overfitover/pytorch-distributed
    Feel free to star it.

    demo

    import os
    import torch
    import torch.distributed as dist
    from torch.multiprocessing import Process
    
    def run(rank, size):
        """ Distributed function to be implemented later. """
        pass
    
    def init_processes(rank, size, fn, backend='gloo'):  # the old 'tcp' backend is no longer available; 'gloo' works on CPU
        """ Initialize the distributed environment. """
        os.environ['MASTER_ADDR'] = '127.0.0.1'
        os.environ['MASTER_PORT'] = '29500'
        dist.init_process_group(backend, rank=rank, world_size=size)
        fn(rank, size)
    
    
    if __name__ == "__main__":
        size = 2
        processes = []
        for rank in range(size):
            p = Process(target=init_processes, args=(rank, size, run))
            p.start()
            processes.append(p)
    
        for p in processes:
            p.join()
    
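    The run function above is intentionally left empty. As a quick check that the two processes can actually talk to each other, it can be filled in with a single collective call; the following is a minimal sketch (not part of the original post) that sums a tensor across both ranks with dist.all_reduce:

    def run(rank, size):
        """ Each rank contributes its own rank value; all_reduce leaves the sum on every rank. """
        tensor = torch.ones(1) * rank            # rank 0 holds 0.0, rank 1 holds 1.0
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        print('Rank', rank, 'has data', tensor[0])

    With size = 2 both processes should print 1.0.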

    First implementation:

    #!/usr/bin/env python
    import os
    import torch
    import torch as th
    import torch.distributed as dist
    from torch.multiprocessing import Process
    import torch.backends.cudnn as cudnn
    from torch.autograd import Variable
    import time
    
    def allreduce(send, recv):
        """ Ring all-reduce: after the call, recv holds the sum of send across all ranks. """
        rank = dist.get_rank()
        size = dist.get_world_size()
        send_buff = send.clone()   # buffer that is forwarded around the ring
        recv_buff = send.clone()
        accum = send.clone()       # running sum, starts with this rank's own data
        # th.cuda.synchronize()

        left = ((rank - 1) + size) % size
        right = (rank + 1) % size

        for i in range(size - 1):
            if i % 2 == 0:
                # Send send_buff, accumulate what arrives from the left neighbour
                send_req = dist.isend(send_buff, right)
                dist.recv(recv_buff, left)
                accum[:] += recv_buff[:]
            else:
                # Send recv_buff, accumulate what arrives from the left neighbour
                send_req = dist.isend(recv_buff, right)
                dist.recv(send_buff, left)
                accum[:] += send_buff[:]
            send_req.wait()
        # th.cuda.synchronize()
        recv[:] = accum[:]
    
    
    def run(rank, size):
        """ Training loop executed by every process. """
        model = Model()
        model = torch.nn.parallel.DistributedDataParallel(model.cuda())
        criterion = torch.nn.MSELoss(reduction='sum').cuda()
        optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

        cudnn.benchmark = True
        x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
        y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))

        for epoch in range(500000):
            y_pred = model(x_data.cuda())
            # Compute loss
            loss = criterion(y_pred, y_data.cuda())
            print(epoch, loss.item())
            # Zero gradients
            optimizer.zero_grad()
            # Perform backward pass
            loss.backward()
            # Update weights
            optimizer.step()

        hour_var = Variable(torch.Tensor([[7.0]]))
        print("predict (after training)", 7, model(hour_var.cuda()).item())
    
    
    
    
    def init_processes(rank, size, backend='gloo'):
        """ Initialize the distributed environment, then run the training loop. """
        os.environ['MASTER_ADDR'] = '192.168.0.12'
        os.environ['MASTER_PORT'] = '29555'
        dist.init_process_group(backend, rank=rank, world_size=size)
        run(rank, size)
    
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    
    def main():
    
        size = 2
    
        processes = []
        for i in range(size):
            p = Process(target=init_processes, args=(i, size))
            p.start()
            processes.append(p)
            # init_processes(i, size)
    
        for p in processes:
            p.join()
    
    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        print("耗时:", end_time-start_time)
    
    
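    Note that the allreduce function above is defined but never called: DistributedDataParallel already averages gradients across ranks on its own. To drive gradient averaging by hand instead, one could train a plain Model().cuda() (without the DDP wrapper) and call a helper such as the hypothetical average_gradients below between loss.backward() and optimizer.step(). This is only a sketch built on the ring allreduce above, not code from the original post:

    def average_gradients(model):
        """ Average gradients across all ranks with the hand-written ring allreduce (sketch). """
        world_size = float(dist.get_world_size())
        for param in model.parameters():
            reduced = th.zeros(param.grad.data.size())
            allreduce(param.grad.data.cpu(), reduced)          # gloo ring pass on CPU tensors
            param.grad.data = (reduced / world_size).to(param.grad.data.device)

    Calling it from every rank after each backward pass keeps the model replicas in sync, which is exactly what the DDP wrapper automates.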

    For comparison, the same training without the distributed setup:

    import torch
    from torch.autograd import Variable
    import time
    start_time = time.time()
    # train data
    x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
    y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))
    
    class Model(torch.nn.Module):
        def __init__(self):
            super(Model, self).__init__()
            self.linear = torch.nn.Linear(1, 1)
            # One in and one out
        def forward(self, x):
            y_pred = self.linear(x)
            return y_pred
    
    # our model
    model = Model().cuda()

    criterion = torch.nn.MSELoss(reduction='sum')
    # Defined loss function
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    # Defined optimizer
    #  Training: forward, loss, backward, step
    #  Training loop
    for epoch in range(500000):
        #  Forward pass
        y_pred = model(x_data.cuda())
        # Compute loss
        loss = criterion(y_pred.cuda(), y_data.cuda())
        print(epoch, loss.item())
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()
    
    # After training
    hour_var = Variable(torch.Tensor([[7.0]]))
    print("predict (after training)", 7, model.forward(hour_var.cuda()).data[0][0])
    end_time=time.time()
    print("耗时: ", end_time-start_time)
    
    
    Original post: https://www.cnblogs.com/o-v-o/p/9975356.html