728x90

https://twobeach.tistory.com/62

 

[Pytorch]pytorch를 활용한 EfficientNet 학습 코드 [2/3]

https://twobeach.tistory.com/61 [Pytorch]pytorch를 활용한 EfficientNet 학습 코드 [1/3] 대학원 생활에 제가 직접 사용하였던 EfficientNet의 학습코드를 설명하기에 앞서 간단하게 설명을 드리면 일반적으로 이미

twobeach.tistory.com

기존에 위의 링크에서 진행했었던 코드와는 다른코드이다. 사용하기 편하게 코드가 되어있으며 Neptune ai를 사용하지 않으실 분들이 사용하기 편한코드이다.

 

출처 : 

https://github.com/lukemelas/EfficientNet-PyTorch

 

GitHub - lukemelas/EfficientNet-PyTorch: A PyTorch implementation of EfficientNet and EfficientNetV2 (coming soon!)

A PyTorch implementation of EfficientNet and EfficientNetV2 (coming soon!) - GitHub - lukemelas/EfficientNet-PyTorch: A PyTorch implementation of EfficientNet and EfficientNetV2 (coming soon!)

github.com

코드는 위의 링크에서 작성한 코드를 저의 데이터 세트에 맞게 변형한 것입니다.

저는 대학원에서 진행한 과제인 폭발물 분류 과제를 수행하기 위해 사용했었습니다.

이 링크는 이 코드의 원작자가 pip를 통한 설치나 git으로 인한 설치이후 사용하는 방법에 대해  설명해주며 코드에 대해 리뷰가 되어있습니다.

 

pip를 통한 사전학습 모델 라이브러리 설치

pip install efficientnet_pytorch

 git을 사용한 사전학습 모델 라이브러리 설치

git clone https://github.com/lukemelas/EfficientNet-PyTorch
cd EfficientNet-Pytorch
pip install -e .

 

먼저 설치가 끝나셨으면 라이브러리를 설치해주시고 model_name 부분에 efficientnet b0~b7 중 원하는 모델을 넣으시면 됩니다. 

class 부분에는 저의 경우 17개여서 17을 넣었지만 사용자가 사용하는 class의 개수에 맞추어서 진행하시면 됩니다.

import numpy as np
import json
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import transforms
import matplotlib.pyplot as plt
import time
import os
import copy
import random

from efficientnet_pytorch import EfficientNet
model_name = 'efficientnet-b0'# efficientnet b0~b7까지 원하는 모델 가지고 오면됩니다.

image_size = EfficientNet.get_image_size(model_name)
print(image_size)
model = EfficientNet.from_pretrained(model_name, num_classes=17)
# 저의 경우 class는 17이였지만 본인의 학습데이터 세트의 class에 맞게 조절하시면 됩니다.

 

이제 본인의 데이터세트를 class에 맞춰서 폴더에 저장한 후 data_path에 해당 경로중 가장 상위 경로를 입력해주시면 train, val, test 데이터 세트를 코드에서 자동으로 구성해 줍니다.

 batch_size의 경우 본인의 컴퓨터 사양에 맞추어서 조절해주셔야 합니다. 인공지능을 사용해본 사람들이라면 다들 아시겠지만 batch_size를 잘못 조절할경우 학습이 진행이 원할하게 돌아가지 못하고 error가 발생됩니다.

## 데이타 로드!!
batch_size  = 64 #batch size의 경우 본인의 컴퓨터 사양에 맞춰어서 조절해주셔야 합니다.
random_seed = 555
random.seed(random_seed)
torch.manual_seed(random_seed)

## make dataset
from torchvision import transforms, datasets
data_path = 'explosive/explosive_data'  # class 별 폴더로 나누어진걸 확 가져와서 라벨도 달아준다
explosive_dataset = datasets.ImageFolder(
                                data_path,
                                transforms.Compose([
                                    transforms.Resize((224, 224)),
                                    transforms.ToTensor(),
                                    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                                ]))
## data split
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
train_idx, tmp_idx = train_test_split(list(range(len(explosive_dataset))), test_size=0.2, random_state=random_seed)
datasets = {}
datasets['train'] = Subset(explosive_dataset, train_idx)
tmp_dataset       = Subset(explosive_dataset, tmp_idx)

val_idx, test_idx = train_test_split(list(range(len(tmp_dataset))), test_size=0.5, random_state=random_seed)
datasets['valid'] = Subset(tmp_dataset, val_idx)
datasets['test']  = Subset(tmp_dataset, test_idx)

## data loader 선언
dataloaders, batch_num = {}, {}
dataloaders['train'] = torch.utils.data.DataLoader(datasets['train'],
                                              batch_size=batch_size, shuffle=True,
                                              num_workers=4)
dataloaders['valid'] = torch.utils.data.DataLoader(datasets['valid'],
                                              batch_size=batch_size, shuffle=False,
                                              num_workers=4)
dataloaders['test']  = torch.utils.data.DataLoader(datasets['test'],
                                              batch_size=batch_size, shuffle=False,
                                              num_workers=4)
batch_num['train'], batch_num['valid'], batch_num['test'] = len(dataloaders['train']), len(dataloaders['valid']), len(dataloaders['test'])
print('batch_size : %d,  tvt : %d / %d / %d' % (batch_size, batch_num['train'], batch_num['valid'], batch_num['test']))

이제 데이터가 정상적으로 들어갔는지 몇개의 데이터만 선별해서 확인해봅시다.

여기서 class name의 경우 본인의 데이터 세트에 맞게 딕셔너리를 수정하셔야합니다!

## 데이타 체크
import torchvision
def imshow(inp, title=None):
    """Imshow for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated

num_show_img = 5

class_names = {
    "0": "ng_10",
    "1": "ng_50",
    "2": "ng_100",
    "3": "ng_200",
    "4": "petn_10",
    "5": "petn_50",
    "6": "petn_100",
    "7": "petn_200",
    "8": "rdx_10",
    "9": "rdx_50",
    "10": "rdx_100",
    "11": "rdx_200",
    "12": "tnt_10",
    "13": "tnt_50",
    "14": "tnt_100",
    "15": "tnt_200",
    "16": "nor"
}

# train check
inputs, classes = next(iter(dataloaders['train']))
out = torchvision.utils.make_grid(inputs[:num_show_img])  # batch의 이미지를 오려부친다
imshow(out, title=[class_names[str(int(x))] for x in classes[:num_show_img]])
# valid check
inputs, classes = next(iter(dataloaders['valid']))
out = torchvision.utils.make_grid(inputs[:num_show_img])  # batch의 이미지를 오려부친다
imshow(out, title=[class_names[str(int(x))] for x in classes[:num_show_img]])
# test check
inputs, classes = next(iter(dataloaders['test']))
out = torchvision.utils.make_grid(inputs[:num_show_img])  # batch의 이미지를 오려부친다
imshow(out, title=[class_names[str(int(x))] for x in classes[:num_show_img]])

이제 학습모델을 정의하여 봅시다.

def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    since = time.time()

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    train_loss, train_acc, valid_loss, valid_acc = [], [], [], []
   
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'valid']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss, running_corrects, num_cnt = 0.0, 0, 0
           
            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
                num_cnt += len(labels)
            if phase == 'train':
                scheduler.step()
           
            epoch_loss = float(running_loss / num_cnt)
            epoch_acc  = float((running_corrects.double() / num_cnt).cpu()*100)
           
            if phase == 'train':
                train_loss.append(epoch_loss)
                train_acc.append(epoch_acc)
            else:
                valid_loss.append(epoch_loss)
                valid_acc.append(epoch_acc)
            print('{} Loss: {:.2f} Acc: {:.1f}'.format(phase, epoch_loss, epoch_acc))
           
            # deep copy the model
            if phase == 'valid' and epoch_acc > best_acc:
                best_idx = epoch
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
#                 best_model_wts = copy.deepcopy(model.module.state_dict())
                print('==> best model saved - %d / %.1f'%(best_idx, best_acc))

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best valid Acc: %d - %.1f' %(best_idx, best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    torch.save(model.state_dict(), 'explosive_EffcientNet_e0_batch64.pt')
    #모델 저장부분입니다. 저장파일 이름을 본인이 변경하셔도 상관없습니다 확장자는 pt파일로 해주시길 바랍니다
    print('model saved')
    return model, best_idx, best_acc, train_loss, train_acc, valid_loss, valid_acc

 

device 부분에 cuda라고 뜨질 않으신다면 cuda를 다시 한번 설치해주시는 것을 추천드립니다.

 

이제 학습 모델 파라미터를 조절해 봅시다. lmbda의 값을 변형하셔도 좋지만 default로 지정해준 값을 사용하시는걸 추천드립니다.

optimizer 함수의 경우 SGD를 사용하였는데 다른것을 사용하셔도 괜찮습니다.

lr(learning rate)의 값도 저의 데이터 세트에 맞춘 값이므로 변형하셔서 사용하시기를 추천드립니다.

crossentropyloss 함수를 사용한 이유는 분류과제이기 때문에 사용한 것이므로 다른 것을 사용하셔도 무방합니다.

 

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")  # set gpu
print(device)

model = model.to(device)

criterion = nn.CrossEntropyLoss()

optimizer_ft = optim.SGD(model.parameters(),
                         lr = 0.05,
                         momentum=0.9,
                         weight_decay=1e-4)

lmbda = lambda epoch: 0.98739
exp_lr_scheduler = optim.lr_scheduler.MultiplicativeLR(optimizer_ft, lr_lambda=lmbda)

아래의 코드를 실행하면 학습이 시작됩니다! 저의 경우 학습 횟수(epoch)를 1000회 시행하였지만 efficientnet의 특성상 많은 학습 회수를 요구하지 않습니다.

학습횟수가 많아지면 많아질수록 과적합 현상(overfitting)이 발생되기 때문에 본인의 데이터 세트의 맞춰서 학습 횟수(epoch)을 조절합시다.

model, best_idx, best_acc, train_loss, train_acc, valid_loss, valid_acc = train_model(model, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=1000)

 

이제 학습이 끝났다면 결과 그래프를 코드를 통해 그려보겠습니다.

## 결과 그래프 그리기
print('best model : %d - %1.f / %.1f'%(best_idx, valid_acc[best_idx], valid_loss[best_idx]))
fig, ax1 = plt.subplots()

ax1.plot(train_acc, 'b-')
ax1.plot(valid_acc, 'r-')
plt.plot(best_idx, valid_acc[best_idx], 'ro')
ax1.set_xlabel('epoch')
# Make the y-axis label, ticks and tick labels match the line color.
ax1.set_ylabel('acc', color='k')
ax1.tick_params('y', colors='k')

ax2 = ax1.twinx()
ax2.plot(train_loss, 'g-')
ax2.plot(valid_loss, 'k-')
plt.plot(best_idx, valid_loss[best_idx], 'ro')
ax2.set_ylabel('loss', color='k')
ax2.tick_params('y', colors='k')

fig.tight_layout()
plt.show()

원하는 대로 그래프에서 학습이 잘 이루어진 것을 확인하셨다면 이제 테스트를 해봅시다.

def test_and_visualize_model(model, phase = 'test', num_images=4):
    # phase = 'train', 'valid', 'test'
   
    was_training = model.training
    model.eval()
    fig = plt.figure()
   
    running_loss, running_corrects, num_cnt = 0.0, 0, 0

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders[phase]):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)  # batch의 평균 loss 출력

            running_loss    += loss.item() * inputs.size(0)
            running_corrects+= torch.sum(preds == labels.data)
            num_cnt += inputs.size(0)  # batch size

    #         if i == 2: break

        test_loss = running_loss / num_cnt
        test_acc  = running_corrects.double() / num_cnt      
        print('test done : loss/acc : %.2f / %.1f' % (test_loss, test_acc*100))

    # 예시 그림 plot
    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders[phase]):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)        

            # 예시 그림 plot
            for j in range(1, num_images+1):
                ax = plt.subplot(num_images//2, 2, j)
                ax.axis('off')
                ax.set_title('%s : %s -> %s'%(
                    'True' if class_names[str(labels[j].cpu().numpy())]==class_names[str(preds[j].cpu().numpy())] else 'False',
                    class_names[str(labels[j].cpu().numpy())], class_names[str(preds[j].cpu().numpy())]))
                imshow(inputs.cpu().data[j])          
            if i == 0 : break


    model.train(mode=was_training);  # 다시 train모드로
   
    ## TEST!
test_and_visualize_model(model, phase = 'test')

테스트 코드를 통해 학습이 잘되었는지 확인하실 수 있습니다. 원하는 결과를 얻지 못하셨다면 파라미터를 잘 조절해가시면서 다시한번 학습을 권해드리며 efficientnet 모델 특성상 안좋은 환경에서도 충분히 잘돌아가도록 설계된 모델입니다. 굳이 무거운 가중치를 가지는 모델인 b7까지 학습을 진행하시지 않으셔도 충분히 좋은 결과를 얻으실 수 있으니 낮은 모델부터 차근차근 학습시키시는 것을 추천드립니다. 아래의 사진들은 논문에서 제공하는 결과표입니다. 참고하실 분들은 참고해주세요

 

 

728x90

+ Recent posts