import torch
import torch.nn.functional as F
import torch.optim as optim
import horovod.torch as hvd

# Initialize Horovod
hvd.init()

# Pin each process to a single GPU, indexed by local rank (one GPU per process)
torch.cuda.set_device(hvd.local_rank())

# Define dataset...
train_dataset = ...
# Partition the dataset among workers using torch.utils.data.distributed.DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=..., sampler=train_sampler)
# Build model...
model = ...
model.cuda()
# Older PyTorch versions require an explicit lr; 0.01 is an illustrative base
# value here, scaled by the worker count as Horovod recommends.
optimizer = optim.SGD(model.parameters(), lr=0.01 * hvd.size())

# Wrap the original optimizer with Horovod's DistributedOptimizer
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=model.named_parameters())
# Broadcast parameters from rank 0 to all other processes.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
for epoch in range(100):
    for batch_idx, (data, target) in enumerate(train_loader):
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % args.log_interval == 0:
            print('Train Epoch: {} [{}/{}] Loss: {}'.format(
                epoch, batch_idx * len(data), len(train_sampler), loss.item()))
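With this skeleton saved to a file (say `train.py`, a name assumed here), Horovod training is typically launched with the `horovodrun` wrapper, e.g. `horovodrun -np 4 python train.py` to start four processes on one machine, each pinned to its own GPU by the `hvd.local_rank()` call above. The full ImageNet example below applies the same pattern to ResNet-50 training.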
from __future__ import print_function

import torch
import argparse
import torch.backends.cudnn as cudnn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import datasets, transforms, models
import horovod.torch as hvd
import os
import math
from tqdm import tqdm
from distutils.version import LooseVersion

# Training settings
parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
                                 formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
                    help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
                    help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
                    help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
                    help='checkpoint file format')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
                    help='use fp16 compression during allreduce')
parser.add_argument('--batches-per-allreduce', type=int, default=1,
                    help='number of batches processed locally before '
                         'executing allreduce across workers; it multiplies '
                         'total batch size.')
parser.add_argument('--use-adasum', action='store_true', default=False,
                    help='use adasum algorithm to do reduction')

# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
                    help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
                    help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
                    help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
                    help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
                    help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
                    help='weight decay')

parser.add_argument('--no-cuda', action='store_true', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42,
                    help='random seed')

args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()

allreduce_batch_size = args.batch_size * args.batches_per_allreduce

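# Note (added for clarity): each rank draws allreduce_batch_size samples per
# optimizer step, so the effective global batch is
# batch_size * batches_per_allreduce * hvd.size()
# (e.g. 32 * 1 * 8 = 256 on eight workers with the defaults).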
hvd.init()
torch.manual_seed(args.seed)

if args.cuda:
    # Horovod: pin GPU to local rank.
    torch.cuda.set_device(hvd.local_rank())
    torch.cuda.manual_seed(args.seed)

cudnn.benchmark = True

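# Note (added): cudnn.benchmark lets cuDNN auto-tune convolution kernels; it
# helps here because the input shape (224x224 crops) stays fixed throughout.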
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
    if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
        resume_from_epoch = try_epoch
        break

# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
                                  name='resume_from_epoch').item()

# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0

# Horovod: write TensorBoard logs on first worker.
try:
    if LooseVersion(torch.__version__) >= LooseVersion('1.2.0'):
        from torch.utils.tensorboard import SummaryWriter
    else:
        from tensorboardX import SummaryWriter
    log_writer = SummaryWriter(args.log_dir) if hvd.rank() == 0 else None
except ImportError:
    log_writer = None

# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(4)

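# Note (added): the thread cap above prevents CPU oversubscription when
# several Horovod ranks share one host; pin_memory below enables faster
# asynchronous host-to-GPU copies.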
kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}
train_dataset = \
    datasets.ImageFolder(args.train_dir,
                         transform=transforms.Compose([
                             transforms.RandomResizedCrop(224),
                             transforms.RandomHorizontalFlip(),
                             transforms.ToTensor(),
                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                  std=[0.229, 0.224, 0.225])
                         ]))
# Horovod: use DistributedSampler to partition data among workers. Manually specify
# `num_replicas=hvd.size()` and `rank=hvd.rank()`.
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=allreduce_batch_size,
    sampler=train_sampler, **kwargs)

val_dataset = \
    datasets.ImageFolder(args.val_dir,
                         transform=transforms.Compose([
                             transforms.Resize(256),
                             transforms.CenterCrop(224),
                             transforms.ToTensor(),
                             transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                                  std=[0.229, 0.224, 0.225])
                         ]))
val_sampler = torch.utils.data.distributed.DistributedSampler(
    val_dataset, num_replicas=hvd.size(), rank=hvd.rank())
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.val_batch_size,
                                         sampler=val_sampler, **kwargs)

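# Note (added): the validation set is sharded the same way, so each rank
# evaluates only 1/hvd.size() of it; the Metric class below allreduce-averages
# the per-rank results into global statistics.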

# Set up standard ResNet-50 model.
model = models.resnet50()

# By default, Adasum doesn't need scaling up learning rate.
# For sum/average with gradient accumulation: scale learning rate by batches_per_allreduce.
lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1

if args.cuda:
    # Move model to GPU.
    model.cuda()
    # If using GPU Adasum allreduce, scale learning rate by local_size.
    if args.use_adasum and hvd.nccl_built():
        lr_scaler = args.batches_per_allreduce * hvd.local_size()

# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(),
                      lr=(args.base_lr * lr_scaler),
                      momentum=args.momentum, weight_decay=args.wd)

# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none

# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
    optimizer, named_parameters=model.named_parameters(),
    compression=compression,
    backward_passes_per_step=args.batches_per_allreduce,
    op=hvd.Adasum if args.use_adasum else hvd.Average)

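# Note (added for clarity): backward_passes_per_step tells Horovod to
# accumulate gradients locally over that many backward() calls before firing
# the allreduce; this is what makes the sub-batch loop inside train() correct.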
# Restore from a previous checkpoint, if one was found above.
# Horovod: restore on the first worker, which will broadcast weights to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
    filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
    checkpoint = torch.load(filepath)
    model.load_state_dict(checkpoint['model'])
    optimizer.load_state_dict(checkpoint['optimizer'])

# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)

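# Note (added): the two broadcasts above guarantee that every rank starts from
# identical weights and optimizer state, whether freshly initialized on rank 0
# or restored from a checkpoint.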
def train(epoch):
    model.train()
    # Re-seed the sampler so each epoch gets a different shuffle.
    train_sampler.set_epoch(epoch)
    train_loss = Metric('train_loss')
    train_accuracy = Metric('train_accuracy')

    with tqdm(total=len(train_loader),
              desc='Train Epoch #{}'.format(epoch + 1),
              disable=not verbose) as t:
        for batch_idx, (data, target) in enumerate(train_loader):
            adjust_learning_rate(epoch, batch_idx)

            if args.cuda:
                data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            # Split data into sub-batches of size batch_size
            for i in range(0, len(data), args.batch_size):
                data_batch = data[i:i + args.batch_size]
                target_batch = target[i:i + args.batch_size]
                output = model(data_batch)
                train_accuracy.update(accuracy(output, target_batch))
                loss = F.cross_entropy(output, target_batch)
                train_loss.update(loss)
                # Average gradients among sub-batches: backward() accumulates
                # into .grad, so scale each sub-batch loss by the sub-batch count.
                loss.div_(math.ceil(float(len(data)) / args.batch_size))
                loss.backward()
            # Gradient is applied across all ranks
            optimizer.step()
            t.set_postfix({'loss': train_loss.avg.item(),
                           'accuracy': 100. * train_accuracy.avg.item()})
            t.update(1)

    if log_writer:
        log_writer.add_scalar('train/loss', train_loss.avg, epoch)
        log_writer.add_scalar('train/accuracy', train_accuracy.avg, epoch)


def validate(epoch):
    model.eval()
    val_loss = Metric('val_loss')
    val_accuracy = Metric('val_accuracy')

    with tqdm(total=len(val_loader),
              desc='Validate Epoch #{}'.format(epoch + 1),
              disable=not verbose) as t:
        with torch.no_grad():
            for data, target in val_loader:
                if args.cuda:
                    data, target = data.cuda(), target.cuda()
                output = model(data)

                val_loss.update(F.cross_entropy(output, target))
                val_accuracy.update(accuracy(output, target))
                t.set_postfix({'loss': val_loss.avg.item(),
                               'accuracy': 100. * val_accuracy.avg.item()})
                t.update(1)

    if log_writer:
        log_writer.add_scalar('val/loss', val_loss.avg, epoch)
        log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)


# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
# After the warmup, reduce the learning rate by 10x at the 30th, 60th, and 80th epochs.
def adjust_learning_rate(epoch, batch_idx):
    if epoch < args.warmup_epochs:
        epoch += float(batch_idx + 1) / len(train_loader)
        lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
    elif epoch < 30:
        lr_adj = 1.
    elif epoch < 60:
        lr_adj = 1e-1
    elif epoch < 80:
        lr_adj = 1e-2
    else:
        lr_adj = 1e-3
    for param_group in optimizer.param_groups:
        param_group['lr'] = args.base_lr * hvd.size() * args.batches_per_allreduce * lr_adj

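# Note (added for clarity): during warmup, lr_adj ramps linearly from
# 1/hvd.size() (first batch of epoch 0) to 1.0 (end of warmup), so the
# effective learning rate grows from roughly base_lr to base_lr * hvd.size().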

def accuracy(output, target):
    # get the index of the max log-probability
    pred = output.max(1, keepdim=True)[1]
    return pred.eq(target.view_as(pred)).cpu().float().mean()


def save_checkpoint(epoch):
    # Only rank 0 writes checkpoints, to avoid ranks clobbering each other.
    if hvd.rank() == 0:
        filepath = args.checkpoint_format.format(epoch=epoch + 1)
        state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
        }
        torch.save(state, filepath)


# Horovod: average metrics from distributed training.
class Metric(object):
    def __init__(self, name):
        self.name = name
        self.sum = torch.tensor(0.)
        self.n = torch.tensor(0.)

    def update(self, val):
        self.sum += hvd.allreduce(val.detach().cpu(), name=self.name)
        self.n += 1

    @property
    def avg(self):
        return self.sum / self.n

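# Note (added): hvd.allreduce averages across ranks by default, so each
# update() folds in the mean value over all workers and avg is a global,
# not per-rank, statistic.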

for epoch in range(resume_from_epoch, args.epochs):
    train(epoch)
    validate(epoch)
    save_checkpoint(epoch)
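Assuming the script is saved as `pytorch_imagenet_resnet50.py` (the name used in the official Horovod examples), an eight-GPU run on a single machine would be launched as `horovodrun -np 8 python pytorch_imagenet_resnet50.py`. Thanks to the checkpoint scan at the top of the script, an interrupted run restarted with the same command resumes from the latest saved epoch automatically.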