사실상 2020년을 끝으로 dataloader부터 metric까지 내 손으로 코드를 짠 적이 없다.
20년 말부터 올 5월까지 일할 때는 기존의 모델을 embedding용으로 사용했고, 또 얼마 전까지 일하던 회사에서도 기존에 있던 코드를 수정하거나, pre processing/post processing을 처리한 수준이었으니까!
학습을 시키려 하면 호흡이 턱턱 막히더라..
마침 pytorch-lightning이라는 간편하고 멋진 도구도 생겼으니 석사 진학 / 취업 전에 얼른 구현 능력을 올리려 한다.
1998년에 나온 논문입니다.
Document Recognition 문제를 풀기 위해 제안된 방법론인가 봅니다.
제목이 "Gradient-Based Learning Applied to Document Recognition'이네요.
다만 오늘은 MNIST dataset 을 분류하는 용도로 사용해보려 합니다.
구조는 위와 같습니다.
convNet - subsample(pooling) - convNet - subsample - FC - FC
간단하네요. 얼른 구현하고 자러 가겠습니다!
pytorch-lightning의 멋진 부분 중 하나는, LightningDataModule이라는 도구입니다.
기존의 pytorch에 비해 구성이 좀더 직관적이고 코드를 묶어주는 느낌이 들고,
multi gpu 할당이 쉬워졌다고 합니다. -> 다만 이는 경험이 없어 확답은 못 합니다.
우선 데이터를 다운 받아보겠습니다.
데이터와 풀 문제가 있어 모델이 존재하지, 모델이 있어 데이터가 존재하지 않으니까요.
ModuleNotFoundError: No module named 'pytorch_lightning'
라네요.
colab에 pytorch-lightning이 설치되어있지 않나 봅니다. 설치해줍시다.
잘 설치되고, import 되었습니다.
pytorch-lightning의 매력적인 점은, 역시 torch의 후예답게 도큐먼트가 무척 친절하다는 부분입니다.
LigtningDataModule documentation 에 나온 코드를 참고하였습니다.
class MNISTDataModule(LightningDataModule):
def __init__(self, data_dir: str = "path/to/dir", batch_size: int = 32):
super().__init__()
self.data_dir = data_dir
self.batch_size = batch_size
self.transform = transforms.Compose([transforms.ToTensor()])
def prepare_data(self):
"""
- download
- tokenize
- etc ...
"""
MNIST(self.data_dir, train=True, download=True) # just download at data_dir
MNIST(self.data_dir, train=False, download=True) # just download at data_dir
def setup(self, stage:Optional[str]) -> None:
"""
- count number of classes
- build vocabulary
- perform train/val/test split
- apply transforms
- etc...
"""
mnist_full = MNIST(self.data_dir, train=True, transform=self.transform)
self.mnist_train, self.mnist_val = random_split(mnist_full, [55000, 5000]) # split data
self.mnist_test = MNIST(self.data_dir, train=False, transform=self.transform)
def train_dataloader(self):
return DataLoader(self.mnist_train, batch_size=self.batch_size, shuffle=True)
def val_dataloader(self):
return DataLoader(self.mnist_val, batch_size=self.batch_size, shuffle=False)
def test_dataloader(self):
return DataLoader(self.mnist_test, batch_size=self.batch_size, shuffle=False)
LightningDataModule에서 미리 작성해준 틀에 맞추어 MNIST dataloader를 구성하는 코드를 작성해보았습니다.
현재 train_set의 크기가 55000, validation_set 의 크기가 5000 이고 이를 32로 나누면, 1718.75, 156.25 이니 맞게 나왔습니다.
label들을 뽑아봤는데, 32개가 잘 출력되는 것을 확인할 수 있었습니다.
자 그럼 이제, model을 작성하러 가보도록 하겠습니다.
모델 구조를 다시 보겠습니다.
오래된 논문이라 layer에 대한 정보가 없이 feature map의 shape만 줘서.. 제가 유츄해야 합니다.
그런데 오타가 있단 사실을 알았습니다.
첫 번째 입력에서 32 32 -> 28 28 이 일반적인 구조로 나올 수 없거든요.
그래서 제가 맘대로 짜기로 했습니다.
MNIST 데이터셋은 28 28 의 크기를 가지니 아래의 순서대로 계산을 하도록 하겠습니다.
그래도 큰 구조를 바꾸진 않았습니다. 그러면 여기 필요한 layer들
conv1, conv2, subsample, fc1, fc2 를 각각 구현하고, forward함수까지 작성해보도록 하겠습니다.
class LeNet(nn.Module):
def __init__(self):
super(LeNet, self).__init__()
self.conv1 = nn.Conv2d(in_channels=2,
out_channels=6,
kernel_size=3,
stride=1,
padding=0,
bias=True)
self.conv2 = nn.Conv2d(in_channels=3,
out_channels=6,
kernel_size=3,
stride=1,
padding=0,
bias=True)
self.subsample = nn.MaxPool2d(kernel_size=3)
self.fc1 = nn.Linear(588, 1024)
self.fc2 = nn.Linear(1024, 10)
def forward(self, x):
bs = x.size[0]
fm1 = self.conv1(x)
fm2 = self.subsample(fm1)
fm3 = self.conv2(fm2)
fm4 = self.subsample(fm3)
flattend = fm4.view(bs, -1)
em1 = fc1(flattend)
em2 = fc2(em1)
return F.log_softmax(em1)
class LeNetModule(pl.LightningModule):
def __init__(self,
init_lr : int = 1e-2):
super().__init__()
self.init_lr = init_lr
self.net = LeNet()
print("init done!")
def forward(self, X):
return self.net(X)
def training_step(self, batch, batch_idx):
X, y = batch
prediction = self.forward(X)
loss = F.cross_entropy(prediction, y)
return {'loss': loss}
def validation_step(self, batch, batch_idx):
X, y = batch
prediction = self.forward(X)
loss = F.cross_entropy(prediction, y)
acc = torch.sum(prediction.max(1, keepdim=True)[1] == y)
metrics = {'val_acc': acc, 'val_loss': loss}
self.log_dict(metrics)
def test_step(self, batch, batch_idx):
X, y= batch
prediction = self.forward(X)
loss = F.cross_entropy(prediction, y)
acc = torch.sum(prediction.max(1)[1] == y) * 100 / len(y)
metrics = {'test_acc': acc, 'test_loss': loss}
self.log_dict(metrics)
def configure_optimizers(self):
return torch.optim.SGD(self.parameters(), lr=self.init_lr)
특별한 함수는 없습니다.
LightningModule의 정말 멋진 점은, 채워야 될 코드들이 블럭처럼 비워져있어서 이를 채우기만 하면 된다는 점입니다.
실제로 이전에 datamodule 사용 시 그런 부분을 놓쳐 삽질한 적이 있는데, 그저 믿고 따라가면 예쁘게 코드가 짜집니다.
training_step, validation_step, test_step에서는 각 stage에 맞게 코드를 넣어주시면 됩니다.
다만 조금 헷갈렸던 부분의 코드를 공유하자면 acc를 구하는 부분이었습니다.
import numpy as np
a = [[1 if j % 10 == i else 0 for i in range(10)] for j in range(32)] # a.shape = tensor.Size([32, 10])
y = [i % 10 for i in range(32)] #
y = np.array(y)
y = torch.tensor(y) # y.shape = tensor.Size([32])
a = np.array(a)
b = torch.tensor(a)
c = b.max(1)[1] # c.shape = tensor.Size([32])
result = (y==c) # result.shape = tensor.Size([32])
이렇게 실행하면, result에는 하나의 batch의 각 idx에 해당하는 label과 prediction값이 같으면 1, 다르면 0이 들어갑니다.
이를 sum해주면 하나의 batch에 대해 몇 개의 예측이 맞았는지 계산할 수 있습니다.
거의 다 왔습니다.
우선 class로 선언만 해둔 LightningModule 객체를 만듭니다.lenet_module = LeNetModule(init_lr=1e-2)
우선은 초기 lr만 입력으로 받아 객체를 생성합니다.
추후 모델과 task가 복잡해지면 많은 인자를 받아 생성하게 되겠죠.
device = 1 if torch.cuda.is_available() else 0
max_epochs=10
trainer = pl.Trainer(gpus=device,
max_epochs=max_epochs,
progress_bar_refresh_rate=20,
num_sanity_val_steps=0)
trainer.fit(model=lenet_module, datamodule=mnist_datamodule)
앞서 선언해준 device, max_epochs를 입력으로 넣어 trainer 객체를 만듭니다.
그 뒤, LightningModule, DataModule을 넣어 trainer.fit 함수를 실행하면 train, validation 의 stage에 맞게 코드가 실행됩니다.
마지막으로, 같은 LightningModule, DataModule 을 입력으로 trainer.test 함수를 실행하면 test가 진행됩니다.
간단한 모델임에도 96%라는 좋은 수치를 보이며 test가 마무리 됩니다.
중간에 얼른 짜고 자러 가겠다고는 허세를 부려놓긴 했지만
1. pytorch-lightning에 대한 미숙함
2. 간만에 짜보는 학습 코드에 온 뇌정지
등의 이유로 글을 쓰기 시작한지 약 20시간 만에.. 마무리를 짓습니다.
한 번 짜봤으니 속도가 좀 붙겠죠.
다음은 VggNet을 구현해볼 예정입니다.
전체 코드는 아래 링크에 있습니다.
코드 링크
질문이 있거나 지적할 부분이 있다면 편하게 댓글 달아주세요. 감사합니다.
댓글 영역