经常用到,保存一下,方便查看。
class Data(Dataset):
    """Image/label dataset driven by a plain-text index file.

    Each line of ``txt_path`` is expected to look like ``0001.jpg 1``:
    an image file name followed by an integer class label, separated by
    spaces.
    """

    def __init__(self, img_path, txt_path, dataset='', data_transforms=None,
                 loader=None):
        # img_path:        directory the names in txt_path are relative to.
        # txt_path:        index file, one "<name> <label>" entry per line.
        # dataset:         key into data_transforms ('train' / 'test' / ...).
        # data_transforms: dict of transform pipelines, or None to skip.
        # loader:          callable(path) -> image; None means default_loader.
        with open(txt_path) as input_file:
            lines = input_file.readlines()
        self.img_name = [os.path.join(img_path, line.strip().split(' ')[0])
                         for line in lines]
        self.img_label = [int(line.strip().split(' ')[-1]) for line in lines]
        self.data_transforms = data_transforms
        self.dataset = dataset
        # Resolve the default lazily so defining this class does not require
        # default_loader to already exist in the module.
        self.loader = loader if loader is not None else default_loader

    def __len__(self):
        return len(self.img_name)

    def __getitem__(self, item):
        img_name = self.img_name[item]
        label = self.img_label[item]
        img = self.loader(img_name)
        if self.data_transforms is not None:
            try:
                img = self.data_transforms[self.dataset](img)
            except Exception:
                # Best effort: report and return the untransformed image
                # rather than aborting the whole epoch.
                print("Cannot transform image: {}".format(img_name))
        return img, label
def default_loader(path):
    """Open *path* with PIL and force 3-channel RGB."""
    return Image.open(path).convert('RGB')


class _TxtImageDataset(Dataset):
    """Shared implementation behind Train/Test.

    Reads an index file whose lines look like ``train/0.jpg 1``; every
    image path is interpreted relative to ``SceneClassification/``.
    """

    # Channel statistics of the SceneClassification training images.
    _MEAN = [0.4309895, 0.4576381, 0.4534026]
    _STD = [0.2699252, 0.26827288, 0.29846913]

    def __init__(self, txt, transform=True, target_transform=None,
                 loader=default_loader):
        super(_TxtImageDataset, self).__init__()
        # NOTE(review): the incoming ``transform`` argument is ignored, as in
        # the original code — a fixed 32x32 resize/normalise pipeline is
        # always built here. The parameter is kept for interface compatibility.
        self.transforms = transforms.Compose([
            transforms.Resize((32, 32)),
            transforms.ToTensor(),
            transforms.Normalize(self._MEAN, self._STD),
        ])
        self.target_transform = target_transform
        self.loader = loader
        # 'with' closes the index file deterministically (the original
        # leaked the open handle).
        with open(txt, 'r') as fh:
            # Each line: "train/0.jpg 1" -> (path, int label).
            self.imgs = [
                ('SceneClassification/' + os.path.join(parts[0]),
                 int(parts[1]))
                for parts in (line.split() for line in fh)
            ]

    def __getitem__(self, index):
        img_path, label = self.imgs[index]
        img = self.loader(img_path)
        if self.transforms is not None:
            img = self.transforms(img)
        # NOTE: returns (label, image) — same ordering as the original code.
        return label, img

    def __len__(self):
        return len(self.imgs)


class Train(_TxtImageDataset):
    """Training split; behaviour is identical to the shared base."""
    pass


class Test(_TxtImageDataset):
    """Test split; behaviour is identical to the shared base."""
    pass
写了一个简单的分类任务(基于 PyTorch):
from __future__ import print_function, division

import copy
import os
import time

import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable  # kept for import compatibility
from torch.optim import lr_scheduler
from torch.utils.data import Dataset
from torchvision import models, transforms
from PIL import Image  # use PIL Image to read images


def default_loader(path):
    """Open *path* as a 3-channel RGB PIL image.

    Prints a message and returns None when the file cannot be read, so a
    single corrupt image does not abort the whole run.
    """
    try:
        img = Image.open(path)
        return img.convert('RGB')
    except Exception:
        print("Cannot read image: {}".format(path))


# Define your Dataset. Each line in the .txt file is
# "<image name> <label>", for example: 0001.jpg 1
class Data(Dataset):
    """Image/label dataset backed by a plain-text index file."""

    def __init__(self, img_path, txt_path, dataset='', data_transforms=None,
                 loader=default_loader):
        # img_path:        directory the names in txt_path are relative to.
        # txt_path:        index file, one "<name> <label>" entry per line.
        # dataset:         key selecting the transform pipeline ('train'/'test').
        # data_transforms: dict of torchvision pipelines, or None to skip.
        with open(txt_path) as input_file:
            # Skip blank lines so a trailing newline cannot crash parsing.
            lines = [line for line in input_file if line.strip()]
        # split() with no argument tolerates spaces AND tabs as separators.
        self.img_name = [os.path.join(img_path, line.split()[0])
                         for line in lines]
        self.img_label = [int(line.split()[-1]) for line in lines]
        self.data_transforms = data_transforms
        self.dataset = dataset
        self.loader = loader

    def __len__(self):
        return len(self.img_name)

    def __getitem__(self, item):
        img_name = self.img_name[item]
        label = self.img_label[item]
        img = self.loader(img_name)
        if self.data_transforms is not None:
            try:
                img = self.data_transforms[self.dataset](img)
            except Exception:
                # Best effort: report and fall through with the raw image.
                print("Cannot transform image: {}".format(img_name))
        return img, label


def train_model(model, criterion, optimizer, scheduler, num_epochs, use_gpu):
    """Train *model* and return it loaded with the best test-accuracy weights.

    Relies on the module-level globals ``dataloders``, ``dataset_sizes`` and
    ``batch_size`` created in the ``__main__`` section below.
    """
    since = time.time()
    # deepcopy is essential: a bare state_dict() aliases the live parameter
    # tensors, so the "best" snapshot would silently keep training.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        begin_time = time.time()
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and a validation (test) phase.
        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()   # training mode (dropout/batchnorm active)
            else:
                model.eval()    # evaluation mode

            running_loss = 0.0
            running_corrects = 0.0
            # Reset per phase so the every-10-batch averages are correct
            # (the original carried the counter across phases and epochs).
            count_batch = 0

            # Disable autograd during evaluation: faster and lighter.
            with torch.set_grad_enabled(phase == 'train'):
                for inputs, labels in dataloders[phase]:
                    count_batch += 1
                    if use_gpu:
                        inputs = inputs.cuda()
                        labels = labels.cuda()

                    # Zero the parameter gradients.
                    optimizer.zero_grad()

                    # Forward pass.
                    outputs = model(inputs)
                    _, preds = torch.max(outputs.data, 1)
                    loss = criterion(outputs, labels)

                    # Backward + optimize only in the training phase.
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                    # Statistics.
                    running_loss += loss.item()
                    running_corrects += torch.sum(preds == labels.data).to(torch.float32)

                    # Print running averages every 10 batches.
                    if count_batch % 10 == 0:
                        batch_loss = running_loss / (batch_size * count_batch)
                        batch_acc = running_corrects / (batch_size * count_batch)
                        print('{} Epoch [{}] Batch [{}] Loss: {:.4f} Acc: {:.4f} Time: {:.4f}s'.
                              format(phase, epoch, count_batch, batch_loss,
                                     batch_acc, time.time() - begin_time))
                        begin_time = time.time()

            # Step the LR schedule once per epoch, AFTER the optimizer has
            # run (stepping it first skips the initial LR and PyTorch warns).
            if phase == 'train':
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss,
                                                       epoch_acc))

            # Checkpoint the full model each training epoch.
            if phase == 'train':
                if not os.path.exists('output'):
                    os.makedirs('output')
                torch.save(model, 'output/resnet_epoch{}.pkl'.format(epoch))

            # Deep-copy the weights whenever test accuracy improves.
            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(
        time_elapsed // 60, time_elapsed % 60))
    print('Best test Acc: {:4f}'.format(best_acc))

    # Load the best weights back into the model before returning it.
    model.load_state_dict(best_model_wts)
    return model


if __name__ == '__main__':
    # Channel statistics of the SceneClassification training images.
    mean = [0.4309895, 0.4576381, 0.4534026]
    std = [0.2699252, 0.26827288, 0.29846913]
    data_transforms = {
        'train': transforms.Compose([
            transforms.Resize((150, 150)),
            transforms.ToTensor(),
            transforms.Normalize(mean, std),
        ]),
        'test': transforms.Compose([
            transforms.Resize((150, 150)),
            transforms.ToTensor(),
            transforms.Normalize(mean, std),
        ]),
    }

    use_gpu = torch.cuda.is_available()

    batch_size = 16
    num_class = 6

    image_datasets = {x: Data(img_path='SceneClassification',
                              txt_path='SceneClassification/' + x + '.txt',
                              data_transforms=data_transforms,
                              dataset=x)
                      for x in ['train', 'test']}

    # Wrap data and labels into batched Tensors.
    dataloders = {x: torch.utils.data.DataLoader(image_datasets[x],
                                                 batch_size=batch_size,
                                                 shuffle=True)
                  for x in ['train', 'test']}

    dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'test']}

    # Get a pretrained model and replace the original fc layer with ours.
    model_ft = models.resnet50(pretrained=True)
    num_ftrs = model_ft.fc.in_features
    model_ft.fc = nn.Linear(num_ftrs, num_class)

    if use_gpu:
        model_ft = model_ft.cuda()

    # Cost function.
    criterion = nn.CrossEntropyLoss()

    # Observe that all parameters are being optimized.
    optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.005, momentum=0.9)

    # Decay LR by a factor of 0.2 every 5 epochs.
    exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=5,
                                           gamma=0.2)

    # Multi-GPU (single device here, but keeps the DataParallel wrapper).
    model_ft = torch.nn.DataParallel(model_ft, device_ids=[0])

    # Train.
    model_ft = train_model(model=model_ft,
                           criterion=criterion,
                           optimizer=optimizer_ft,
                           scheduler=exp_lr_scheduler,
                           num_epochs=25,
                           use_gpu=use_gpu)

    # Save the best model.
    torch.save(model_ft, "output/best_resnet.pkl")