PyTorch 分布式训练
参考文献
https://pytorch.org/tutorials/intermediate/dist_tuto.html
代码
https://github.com/overfitover/pytorch-distributed
欢迎到 GitHub 给这个项目点个 star。
demo
import os
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
def run(rank, size):
    """Placeholder for the per-process distributed workload.

    Args:
        rank: Index of the calling process within the group.
        size: Total number of processes in the group.

    Returns:
        None. The body is intentionally empty and ignores both arguments.
    """
    return None
def init_processes(rank, size, fn, backend='gloo'):
    """Initialize the distributed environment, then run ``fn``.

    Args:
        rank: Rank of the calling process, passed to ``init_process_group``.
        size: World size, i.e. the total number of participating processes.
        fn: Callable invoked as ``fn(rank, size)`` once the group is ready.
        backend: Name of the ``torch.distributed`` backend. Defaults to
            ``'gloo'``; the former ``'tcp'`` default is no longer accepted by
            current PyTorch releases.
    """
    # Every process in the group rendezvous at this address and port.
    os.environ['MASTER_ADDR'] = '127.0.0.1'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
if __name__ == "__main__":
    # Launch one worker process per rank, then block until all of them exit.
    size = 2
    processes = [
        Process(target=init_processes, args=(rank, size, run))
        for rank in range(size)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
第一种实现方式:
#!/usr/bin/env python
import os
import torch
import torch as th
import torch.distributed as dist
from torch.multiprocessing import Process
import torch.backends.cudnn as cudnn
from torch.autograd import Variable
import time
def allreduce(send, recv):
    """Sum ``send`` over every rank with a ring reduce; write it to ``recv``.

    Each rank passes a buffer to its right-hand neighbour and receives one
    from its left-hand neighbour ``world_size - 1`` times, adding every
    received buffer to a running total.

    Args:
        send: Tensor holding this rank's contribution. It is not modified.
        recv: Tensor of the same shape that is overwritten in place with the
            accumulated sum.

    Note:
        The process group must already be initialized. The tensors must be
        at least one-dimensional because slices (``[:]``) are used.
    """
    rank = dist.get_rank()
    size = dist.get_world_size()

    # Start every buffer from this rank's own data. The first message sent
    # around the ring must carry ``send``; the original sent zeros.
    send_buff = send.clone()
    recv_buff = send.clone()
    accum = send.clone()

    left = ((rank - 1) + size) % size
    right = (rank + 1) % size

    for i in range(size - 1):
        # The two buffers swap roles each step so that the data received in
        # the previous step is what gets forwarded in this one.
        if i % 2 == 0:
            # Send send_buff
            send_req = dist.isend(send_buff, right)
            dist.recv(recv_buff, left)
            # Accumulate what was just received, not the caller's ``recv``.
            accum[:] += recv_buff[:]
        else:
            # Send recv_buff
            send_req = dist.isend(recv_buff, right)
            dist.recv(send_buff, left)
            accum[:] += send_buff[:]
        # Do not reuse the outgoing buffer until the async send has finished.
        send_req.wait()
    recv[:] = accum[:]
def run(rank, size):
    """Train a 1-in/1-out linear model wrapped in DistributedDataParallel.

    The process group must already be initialized and a CUDA device must be
    available. The loss is printed every epoch and a prediction for the
    input 7.0 is printed after training.

    Args:
        rank: Rank of the calling process (not used by the body).
        size: World size (not used by the body).
    """
    model = Model()
    model = torch.nn.parallel.DistributedDataParallel(model.cuda())
    # reduction='sum' replaces the deprecated size_average=False.
    criterion = torch.nn.MSELoss(reduction='sum').cuda()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    cudnn.benchmark = True

    # Move the fixed training data to the GPU once instead of every epoch.
    x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]])).cuda()
    y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]])).cuda()

    for epoch in range(500000):
        y_pred = model(x_data)
        # Compute loss
        loss = criterion(y_pred, y_data)
        # The loss is a 0-dim tensor: indexing it with [0] raises on current
        # PyTorch, so read the Python scalar with .item().
        print(epoch, loss.item())
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()

    # The model lives on the GPU, so the probe input must be moved there too.
    hour_var = Variable(torch.Tensor([[7.0]])).cuda()
    print("predict (after training)", 7, model(hour_var).item())
def init_processes(rank, size, backend='gloo'):
    """Initialize the distributed environment and train the model.

    Joins the process group at the hard-coded master address and port, then
    trains a 1-in/1-out linear model wrapped in DistributedDataParallel on
    the GPU. The loss is printed every epoch and a prediction for the input
    7.0 is printed after training.

    Args:
        rank: Rank of the calling process, passed to ``init_process_group``.
        size: World size, i.e. the total number of participating processes.
        backend: Name of the ``torch.distributed`` backend.
    """
    # NOTE(review): the rendezvous address is a fixed LAN IP. It presumably
    # has to be changed to the rank-0 host of the machine(s) actually used.
    os.environ['MASTER_ADDR'] = '192.168.0.12'
    os.environ['MASTER_PORT'] = '29555'
    dist.init_process_group(backend, rank=rank, world_size=size)

    # Implementation
    model = Model()
    model = torch.nn.parallel.DistributedDataParallel(model.cuda())
    # reduction='sum' replaces the deprecated size_average=False.
    criterion = torch.nn.MSELoss(reduction='sum').cuda()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
    cudnn.benchmark = True

    # Move the fixed training data to the GPU once instead of every epoch.
    x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]])).cuda()
    y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]])).cuda()

    for epoch in range(500000):
        y_pred = model(x_data)
        # Compute loss
        loss = criterion(y_pred, y_data)
        # The loss is a 0-dim tensor: indexing it with [0] raises on current
        # PyTorch, so read the Python scalar with .item().
        print(epoch, loss.item())
        # Zero gradients
        optimizer.zero_grad()
        # perform backward pass
        loss.backward()
        # update weights
        optimizer.step()

    # The model lives on the GPU, so the probe input must be moved there too.
    hour_var = Variable(torch.Tensor([[7.0]])).cuda()
    print("predict (after training)", 7, model(hour_var).item())
class Model(torch.nn.Module):
    """Linear regression module with one input and one output feature."""

    def __init__(self):
        super().__init__()
        # One in and one out
        self.linear = torch.nn.Linear(1, 1)

    def forward(self, x):
        """Return the output of the linear layer applied to ``x``."""
        return self.linear(x)
def main():
    """Start one ``init_processes`` worker per rank and wait for all to exit."""
    world_size = 2
    workers = []
    for worker_rank in range(world_size):
        worker = Process(target=init_processes, args=(worker_rank, world_size))
        worker.start()
        workers.append(worker)

    # Block until every worker has finished.
    for worker in workers:
        worker.join()
if __name__ == "__main__":
    # Measure the wall-clock time of the whole multi-process run.
    start_time = time.time()
    main()
    print("耗时:", time.time() - start_time)
对比：不使用分布式训练的单进程实现：
import torch
from torch.autograd import Variable
import time
# Record the wall-clock start time of the script.
start_time=time.time()
# train data: inputs 1, 2, 3 with targets 2, 4, 6 (i.e. y = 2 * x)
x_data = Variable(torch.Tensor([[1.0], [2.0], [3.0]]))
y_data = Variable(torch.Tensor([[2.0], [4.0], [6.0]]))
class Model(torch.nn.Module):
    """Linear regression module with one input and one output feature."""

    def __init__(self):
        super().__init__()
        # One in and one out
        self.linear = torch.nn.Linear(1, 1)

    def forward(self, x):
        """Return the output of the linear layer applied to ``x``."""
        return self.linear(x)
# Single-process baseline: train the linear model on the GPU, print the loss
# every epoch, then print a prediction for 7.0 and the elapsed time.

# our model
model = Model().cuda()
# Defined loss function; reduction='sum' replaces the deprecated
# size_average=False.
criterion = torch.nn.MSELoss(reduction='sum')
# Defined optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# Move the fixed training data to the GPU once instead of every epoch.
x_gpu = x_data.cuda()
y_gpu = y_data.cuda()

# Training: forward, loss, backward, step
# Training loop
for epoch in range(500000):
    # Forward pass
    y_pred = model(x_gpu)
    # Compute loss
    loss = criterion(y_pred, y_gpu)
    # The loss is a 0-dim tensor: indexing it with [0] raises on current
    # PyTorch, so read the Python scalar with .item().
    print(epoch, loss.item())
    # Zero gradients
    optimizer.zero_grad()
    # perform backward pass
    loss.backward()
    # update weights
    optimizer.step()

# After training
hour_var = Variable(torch.Tensor([[7.0]]))
print("predict (after training)", 7, model(hour_var.cuda()).item())
end_time=time.time()
print("耗时: ", end_time-start_time)