人工知能に関する断創録

このブログでは人工知能のさまざまな分野について調査したことをまとめています(更新停止: 2019年12月31日)

PyTorch (11) Variational Autoencoder

今回は、Variational Autoencoder (VAE) の実験をしてみよう。

実は自分が始めてDeep Learningに興味を持ったのがこのVAEなのだ!VAEの潜在空間をいじって多様な顔画像を生成するデモ(Morphing Faces)を見て、これを音声合成の声質生成に使いたいと思ったのが興味のきっかけだった。

今回の実験は、PyTorchの公式にあるVAEのスクリプト を自分なりに読み解いてまとめてみた結果になっている。

180221-variational-autoencoder.ipynb - Google ドライブ

さっそく実験!いつものimport。

import os
import numpy as np
import torch
import torch.nn as nn
import torch.utils.data
import torch.optim as optim
from torch.autograd import Variable
from torch.nn import functional as F
from torchvision import datasets, transforms
from torchvision.utils import save_image

batch_size = 128
num_epochs = 100
seed = 1
out_dir = './vae_2'

出力先のディレクトリを vae_2 としている。潜在空間が2次元であるVAEの結果を意味する。

cuda = torch.cuda.is_available()
if cuda:
    print('cuda is available!')

if not os.path.exists(out_dir):
    os.mkdir(out_dir)

torch.manual_seed(seed)
if cuda:
    torch.cuda.manual_seed(seed)

MNISTのデータをロードする。前回と同じだけど標準化は [0, 1] にしている。

train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('data',
                   train=True,
                   download=True,
                   transform=transforms.ToTensor()),
    batch_size=batch_size,
    shuffle=True
)

test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('data',
                   train=False,
                   transform=transforms.ToTensor()),
    batch_size=batch_size,
    shuffle=True
)

VAEのアーキテクチャ

VAEは、Autoencoderと似ているが、Encoderの出力が正規分布の平均と共分散行列になり、潜在表現zがその正規分布からサンプリングされる点が異なる。潜在表現zはランダムサンプリングされるため同じ入力画像Xを入れても毎回異なるzにマッピングされる。

f:id:aidiary:20180228212323p:plain

上図のようにEncoderは正規分布の平均ベクトル(mu)と分散共分散行列の対数(logvar)を出力する。ここでは、潜在表現 z は可視化しやすいように2次元としたので、平均ベクトルのサイズは2となる。分散共分散行列は対角行列と仮定しているため対角成分のみ取ってこちらも2次元ベクトルとなる。対数を取る理由がはっきりしないけどアンダーフローを防ぐためかな?

Encoderが出力した平均と分散を持つ正規分布から入力Xの潜在表現zをサンプリングする。

 z \sim N(\mu(X), \Sigma(X))

ただ、これを普通にやると誤差逆伝搬ができないので Reparameterization Trick というのを使う。上の式でサンプリングするのではなく、

 \epsilon \sim N(0, I)

 z = \mu(X) + \Sigma(X)^{\frac{1}{2}} \epsilon

でzを計算する。このように計算すると和と積の演算だけで構成されるので計算グラフが構築でき、誤差逆伝搬ができるとのこと。PyTorchには normal_(mean=0, std=1) という正規乱数を生成するTensor Operationが実装されている。

ここがちょっとわからない。meanとstdを指定しても乱数生成できるみたいだけどReparameterization Trick必要なのかな?多次元正規分布になるとできないのだろうか?

上のアーキテクチャをPyTorchのコードで書くと下のようになる。

class VAE(nn.Module):

    def __init__(self):
        super(VAE, self).__init__()
        
        self.fc1 = nn.Linear(784, 512)
        self.fc21 = nn.Linear(512, 2)  # mu
        self.fc22 = nn.Linear(512, 2)  # logvar

        self.fc3 = nn.Linear(2, 512)
        self.fc4 = nn.Linear(512, 784)
        
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()
    
    def encode(self, x):
        h = self.relu(self.fc1(x))
        return self.fc21(h), self.fc22(h)

    def reparameterize(self, mu, logvar):
        if self.training:
            std = logvar.mul(0.5).exp_()
            eps = Variable(std.data.new(std.size()).normal_())
            return eps.mul(std).add_(mu)
        else:
            return mu

    def decode(self, z):
        h = self.relu(self.fc3(z))
        return self.sigmoid(self.fc4(h))
    
    def forward(self, x):
        x = x.view(-1, 784)
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        return self.decode(z), mu, logvar

model = VAE()
if cuda:
    model.cuda()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

ちょっと数式とコードの対応関係を補足すると logvar \log \Sigma にあたる。

std = logvar.mul(0.5).exp_() は、

 std = \exp(0.5 * \log \Sigma) = \exp(\log \Sigma ^ \frac{1}{2}) = \Sigma^{\frac{1}{2}}

となる。よって、eps.mul(std).add_(mu) は、

 \epsilon * \Sigma^{\frac{1}{2}} + \mu

となり、先の式とコードが一致することがわかる!数式だとεはスカラーっぽいけど実装ではベクトルになっている。潜在表現の各次元ごとに異なる乱数をかけるようだ。

スカラーでもよいのかな?

VAEの損失関数

VAEの損失関数は既存のものではなく、独自の定義が必要。

def loss_function(recon_x, x, mu, logvar):
    # size_average=Falseなのでバッチ内のサンプルの合計lossを求める
    # reconstruction loss 入力画像をどのくらい正確に復元できたか?
    # 数式では対数尤度の最大化だが交差エントロピーlossの最小化と等価
    recon = F.binary_cross_entropy(recon_x, x.view(-1, 784), size_average=False)

    # 潜在空間zに対する正則化項
    # P(z|x) が N(0, I)に近くなる(KL-distanceが小さくなる)ようにする
    kld = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

    return recon + kld

このコードを理解するのがとても難しかった。数式の導出は

を見ていただくほうがよいと思う。最終的には下の目的関数が出て来るのだがそこまでのたどり着き方にはいくつかアプローチがあるようだ。

VAEの最終的な目的関数は、

 \log P(X) - D_{KL} [ Q(z|X) || P(z|X) ] = E [\log P(X|z) ] - D_{KL} [Q(z|X)||P(z)]

となる。左辺のKL divergenceは  D_{KL} [ Q(z|X) || P(z|X) ] \ge 0 なので

 \log P(X) \ge E [\log P(X|z) ] - D_{KL} [Q(z|X)||P(z)]

が成り立つ。たとえば、12 - 2 = 10 のとき 12 >= 10。

左辺がデータXの対数尤度なので生成モデルにおいて最大化したい値になる。右辺は 変分下限(ELBO: evidence lower bound) と呼び、対数尤度の下限となる。ここで、対数尤度を最大化する問題を変分下限を最大化する問題に置き換えるのがポイント。変分下限をできるだけ大きくしてやれば、それより大きい対数尤度も大きくなるというわけ。変分下限を最大化するには、

 E_{Q(z|X)} [\log P(X|z) ] \to \max

 D_{KL} [Q(z|X) || P(z) ] \to \min

とすればよい。簡単のため期待値の分布は省略してたけど最初の式をきちんと導出していくと Q(z|X) になることがわかる。

Reconstruction Loss

実はここの理解がちょっと怪しい。尤度最大化って交差エントロピーの最小化と等価で正しい?ときどき一部の論文でlossの式に尤度が書いてあってちょっと混乱する。

 E_{Q(z|X)} [\log P(X|z) ] \to \max

から見ていく。確率分布で書かれているのでわかりにくいが、Q(z|X) は入力画像Xを潜在空間zにマッピングしているためEncoderとみなせる。また、P(X|z) は潜在空間zから元の画像XにマッピングしているためDecoderとみなせる。つまり、この式は入力画像Xを潜在空間zに落としてそこからXに戻したときの対数尤度を最大化しろという意味だと解釈できる。

この対数尤度の最大化は入力画像 Xと再構成画像 \hat{X} の交差エントロピーの最小化とみなせる。つまり、数式で表すと下の部分と一致する。これをReconstruction Lossと呼ぶ。

recon = F.binary_cross_entropy(recon_x, x.view(-1, 784), size_average=False)

KL Divergence

次はこの項。

 D_{KL} [Q(z|X) || P(z) ] \to \min

先に述べたように Q(z|X) は入力画像Xを潜在空間zにマッピングしているためEncoderとみなせる。つまり、Encoderで入力画像Xをマッピングした分布 Q(z|X) が P(z) に近くなるようにしろという制約 と解釈できる。

ここで、P(z) は簡単のため平均0、分散1の多次元正規分布 N(0, I) と仮定する

先に書いたようにEncoderの出力は N(μ(X), Σ(X)) の正規分布に従うようにしたため、上の式は

 D_{KL} [N(\mu(X), \Sigma (X)) || N(0, I) ]

となる。多次元正規分布間のKL Divergenceは下の簡単な式で求まる。

 \displaystyle D_{KL} [N(\mu(X), \Sigma(X)) || N(0, I) ] = - \frac{1}{2} \sum_{k} (1 + \log \Sigma(X) - \mu(X)^2 - \Sigma(X))

参考: 多変量正規分布の場合のKullback Leibler Divergenceの導出 - Qiita

これをコードで書くと

kld = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())

となる。この kld潜在空間zが N(0, I) にちらばるようにする正則化項とみなせる。あとで実際に潜在空間を描画してみるとこの正則化が正しく機能していることがわかる。

訓練ループ

あとはいつもの訓練ループなので比較的簡単。

def train(epoch):
    model.train()
    train_loss = 0
    for batch_idx, (data, _) in enumerate(train_loader):
        if cuda:
            data = Variable(data.cuda())
        else:
            data = Variable(data)
        optimizer.zero_grad()
        recon_batch, mu, logvar = model(data)
        loss = loss_function(recon_batch, data, mu, logvar)
        loss.backward()
        train_loss += loss.data[0]
        optimizer.step()
    
    # loss_function() は平均ではなく全サンプルの合計lossを返すのでサンプル数で割る
    train_loss /= len(train_loader.dataset)

    return train_loss    
    

def test(epoch):
    model.eval()
    test_loss = 0
    for batch_idx, (data, _) in enumerate(test_loader):
        if cuda:
            data = Variable(data.cuda(), volatile=True)
        else:
            data = Variable(data, volatile=True)
        recon_batch, mu, logvar = model(data)
        loss = loss_function(recon_batch, data, mu, logvar)
        test_loss += loss.data[0]
        
        if epoch % 10 == 0:
            # 10エポックごとに最初のminibatchの入力画像と復元画像を保存
            if batch_idx == 0:
                n = 8
                comparison = torch.cat([data[:n],
                                        recon_batch.view(batch_size, 1, 28, 28)[:n]])
                save_image(comparison.data.cpu(),
                           '{}/reconstruction_{}.png'.format(out_dir, epoch), nrow=n)

    test_loss /= len(test_loader.dataset)

    return test_loss

loss_list = []
test_loss_list = []
for epoch in range(1, num_epochs + 1):
    loss = train(epoch)
    test_loss = test(epoch)

    print('epoch [{}/{}], loss: {:.4f} test_loss: {:.4f}'.format(
        epoch + 1,
        num_epochs,
        loss,
        test_loss))

    # logging
    loss_list.append(loss)
    test_loss_list.append(test_loss)

# save the training model
np.save('loss_list.npy', np.array(loss_list))
np.save('test_loss_list.npy', np.array(test_loss_list))
torch.save(model.state_dict(), 'vae.pth')

学習曲線

学習曲線を描いてみよう。

import matplotlib.pyplot as plt
%matplotlib inline
loss_list = np.load('{}/loss_list.npy'.format(out_dir))
plt.plot(loss_list)
plt.xlabel('epoch')
plt.ylabel('loss')
plt.grid()

f:id:aidiary:20180228225157p:plain

test_loss_list = np.load('{}/test_loss_list.npy'.format(out_dir))
plt.plot(test_loss_list)
plt.xlabel('epoch')
plt.ylabel('test loss')
plt.grid()

f:id:aidiary:20180228225210p:plain

テストlossも減っており、学習が進んでいることが確認できる。

入力画像と再構成画像の比較

上が入力画像で下が再構成画像。

from IPython.display import Image
Image('vae_2/reconstruction_10.png')

f:id:aidiary:20180228225258p:plain

Image('vae_2/reconstruction_100.png')

f:id:aidiary:20180228225314p:plain

10エポック目だとあまり再現できてないが、100エポック目だとある程度は再現できている。ただ、潜在空間を2次元とかなり絞ったのであまりくっきりと再現できていない。潜在空間を20次元にすると

f:id:aidiary:20180228225426p:plain

こんな感じでほぼ再構成できることがわかる。潜在空間が2次元だとテストlossは150くらいで収束するが、20次元にするとテストlossは90台まで下がる。

潜在空間の可視化

最後にテストデータを使って潜在空間を可視化しよう。

model.load_state_dict(torch.load('{}/vae.pth'.format(out_dir),
                                 map_location=lambda storage,
                                 loc: storage))
test_dataset = datasets.MNIST('./data', download=True, train=False, transform=transforms.ToTensor())
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=10000, shuffle=False)
images, labels = iter(test_loader).next()
images = images.view(10000, -1)

# 784次元ベクトルを2次元ベクトルにencode
z = model.encode(Variable(images, volatile=True))
mu, logvar = z
mu, logvar = mu.data.numpy(), logvar.data.numpy()
print(mu.shape, logvar.shape)

各入力画像Xごとに正規分布のパラメータ μ(X), Σ(X) が出力されるがここでは平均 μ(X) の場所に点をプロットすることにする。ランダムサンプリングすると平均の場所にマッピングされる可能性が一番高い。

import pylab
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 10))
plt.scatter(mu[:, 0], mu[:, 1], marker='.', c=labels.numpy(), cmap=pylab.cm.jet)
plt.colorbar()
plt.xlim((-6, 6))
plt.ylim((-6, 6))
plt.grid()

f:id:aidiary:20180228225730p:plain

潜在空間zは平均0で分散Iの正規分布 P(z) = N(0, I) 上にデータが散らばっており、損失関数のKL divergenceの正則化項が効いていることがわかる。

次はConditional VAEいってみよう!

参考文献

PyTorch (10) Autoencoder

Autoencoderの実験!MNISTで試してみよう。

180221-autoencoder.ipynb - Google ドライブ

28x28の画像 x をencoder(ニューラルネット)で2次元データ z にまで圧縮し、その2次元データから元の画像をdecoder(別のニューラルネット)で復元する。ただし、一度情報を圧縮してしまうので完全に元の画像には戻らず再構成した画像 xhat は入力画像の近似となる。

f:id:aidiary:20180225095657p:plain

さっそくやってみよう。まずはいつもの。

import os
import numpy as np
import torch
import torchvision
from torch import nn
from torch.autograd import Variable
from torch.utils.data import DataLoader
from torchvision import transforms
from torchvision.datasets import MNIST
from torchvision.utils import save_image

cuda = torch.cuda.is_available()
if cuda:
    print('cuda is available!')

num_epochs = 100
batch_size = 128
learning_rate = 0.001
out_dir = './autoencoder'

if not os.path.exists(out_dir):
    os.mkdir(out_dir)

データ変換関数。MNISTは ToTensor() すると [0, 1] になるので0.5を引いて0.5で割って [-1, 1] の範囲に変換する。

img_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))  # [0,1] => [-1,1]
])
train_dataset = MNIST('./data', download=True, transform=img_transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

Autoencoderのモデルを作成。encoderdecoder を定義する。別々に定義しておくとEncoder・Decoderを独立して使えるので便利(あとでEncoderのみ使う例を示す)。

Encoderは普通の多層ニューラルネットで784 => 128 => 64 => 12 => 2 と2次元まで圧縮してみた。2次元まで圧縮すると潜在空間を描画できる。逆にDecoderは2 => 12 => 64 => 128 => 784 と元の画像サイズまで戻す。入力画像は [-1, 1] に標準化したので出力の活性化関数には tanh を使う(tanhは出力の範囲が [-1, 1])。EncoderとDecoderで重みは共有せずにそれぞれ別に学習する*1

class Autoencoder(nn.Module):
    
    def __init__(self):
        super(Autoencoder, self).__init__()
        self.encoder = nn.Sequential(
            nn.Linear(28 * 28, 128),
            nn.ReLU(True),
            nn.Linear(128, 64),
            nn.ReLU(True),
            nn.Linear(64, 12),
            nn.ReLU(True),
            nn.Linear(12, 2))
        
        self.decoder = nn.Sequential(
            nn.Linear(2, 12),
            nn.ReLU(True),
            nn.Linear(12, 64),
            nn.ReLU(True),
            nn.Linear(64, 128),
            nn.ReLU(True),
            nn.Linear(128, 28 * 28),
            nn.Tanh()
        )

    def forward(self, x):
        x = self.encoder(x)
        x = self.decoder(x)
        return x

model = Autoencoder()
if cuda:
    model.cuda()
Autoencoder(
  (encoder): Sequential(
    (0): Linear(in_features=784, out_features=128)
    (1): ReLU(inplace)
    (2): Linear(in_features=128, out_features=64)
    (3): ReLU(inplace)
    (4): Linear(in_features=64, out_features=12)
    (5): ReLU(inplace)
    (6): Linear(in_features=12, out_features=2)
  )
  (decoder): Sequential(
    (0): Linear(in_features=2, out_features=12)
    (1): ReLU(inplace)
    (2): Linear(in_features=12, out_features=64)
    (3): ReLU(inplace)
    (4): Linear(in_features=64, out_features=128)
    (5): ReLU(inplace)
    (6): Linear(in_features=128, out_features=784)
    (7): Tanh()
  )
)

変換したテンソルを元の画像に戻す関数。

def to_img(x):
    x = 0.5 * (x + 1)  # [-1,1] => [0, 1]
    x = x.clamp(0, 1)
    x = x.view(x.size(0), 1, 28, 28)
    return x

訓練ループ。Autoencoderはラベルは使わない。ターゲットは入力画像そのものとなる。Loss関数は入力と出力の間の平均二乗誤差とする。つまり、出力が入力に近づくように学習する

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(),
                             lr=learning_rate,
                             weight_decay=1e-5)

loss_list = []

for epoch in range(num_epochs):
    for data in train_loader:
        img, _ = data
        x = img.view(img.size(0), -1)
        if cuda:
            x = Variable(x).cuda()
        else:
            x = Variable(x)
        
        xhat = model(x)
    
        # 出力画像(再構成画像)と入力画像の間でlossを計算
        loss = criterion(xhat, x)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # logging
        loss_list.append(loss.data[0])
    
    print('epoch [{}/{}], loss: {:.4f}'.format(
        epoch + 1,
        num_epochs,
        loss.data[0]))

    # 10エポックごとに再構成された画像(xhat)を描画する
    if epoch % 10 == 0:
        pic = to_img(xhat.cpu().data)
        save_image(pic, './{}/image_{}.png'.format(out_dir, epoch))

np.save('./{}/loss_list.npy'.format(out_dir), np.array(loss_list))
torch.save(model.state_dict(), './{}/autoencoder.pth'.format(out_dir))

実験結果

学習曲線。普段はエポックごとにロギングするんだけど今回はミニバッチごとにロギングしてた・・・その場合は横軸はepochではなく、iterationになる。

loss_list = np.load('{}/loss_list.npy'.format(out_dir))
plt.plot(loss_list)
plt.xlabel('iteration')
plt.ylabel('loss')
plt.grid()

f:id:aidiary:20180224172928p:plain

Jupyter Notebook上で保存した画像を描画してみよう。今回は再構成した画像のみ。入力画像も一緒に表示したほうがわかりやすかったかも。

from IPython.display import Image
Image('autoencoder/image_0.png')

f:id:aidiary:20180224172609p:plain

Image('autoencoder/image_90.png')

f:id:aidiary:20180224172744p:plain

潜在空間の可視化

テストデータ10000画像をEncoderで潜在空間にマッピングして各画像がどのように分布するか可視化してみよう。ここでは、Encoderしか使わない。先のようにモデルを定義しておくと、model.encoder() でエンコーダだけ呼び出せる

model.load_state_dict(torch.load('{}/autoencoder.pth'.format(out_dir),
                                 map_location=lambda storage,
                                 loc: storage))

test_dataset = MNIST('./data', download=True, train=False, transform=img_transform)
test_loader = DataLoader(test_dataset, batch_size=10000, shuffle=False)

images, labels = iter(test_loader).next()
images = images.view(10000, -1)

# 784次元ベクトルを2次元ベクトルにencode
z = model.encoder(Variable(images, volatile=True)).data.numpy()

zは (10000, 2) となり、784次元の画像が2次元データになっていることがわかる。

import pylab
import matplotlib.pyplot as plt
%matplotlib inline

plt.figure(figsize=(10, 10))
plt.scatter(z[:, 0], z[:, 1], marker='.', c=labels.numpy(), cmap=pylab.cm.jet)
plt.colorbar()
plt.grid()

f:id:aidiary:20180224173419p:plain

各数字のデータがばらついて分布していることがわかる。つまり、2次元の潜在空間上では各数字を分離しやすいことを意味する。

ここでは潜在空間の分布の範囲にも注目!x軸方向が -30〜20 でy軸方向が -40〜40 あたりに散らばっていることがわかる。次回、AutoencoderをVariational Autoencoder (VAE)に拡張する予定だがVAEだと潜在空間が正規分布 N(0, I) で散らばるようになる。

参考

*1:tied weightsというencoderとdecoderが重みを共有する実装もある

PyTorch (9) Transfer Learning (Dogs vs Cats)

前回(2018/2/17)は、アリとハチだったけど、今回はイヌとネコでやってみよう。

180209-dogs-vs-cats.ipynb - Google ドライブ

f:id:aidiary:20180221225005j:plain:medium vs. f:id:aidiary:20180221225052j:plain:medium (*^_^*)

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torchvision
from torchvision import datasets, models, transforms

データ準備

Kaggleのイヌとネコのコンペサイトから画像 train.ziptest.zip をダウンロードする。kaggle-cliというコマンドラインツールを使うと、Kaggleにログインした上でコマンドでデータのダウンロードができるのでとても便利!

各種ディレクトリのパスを定義する。

import os
current_dir = os.getcwd()
data_dir = os.path.join(current_dir, 'data', 'dogscats')
train_dir = os.path.join(data_dir, 'train')
valid_dir = os.path.join(data_dir, 'valid')
test_dir = os.path.join(data_dir, 'test')

下のような感じでデータを解凍。fast.ai の受け売りだけどこういうデータ操作もJupyter Notebook上で記録しておくのはよい習慣*1。Jupyter Notebook上でシェルコマンドを使うには ! をコマンドの前につける。

!mkdir $data_dir
!unzip train.zip -d $data_dir
!unzip test.zip -d $data_dir

訓練画像は25000枚、テスト画像は15000枚。訓練データからランダムに選んだ2000枚をバリデーションデータとする。

!mkdir $valid_dir
%cd $train_dir

import os
from glob import glob
import numpy as np
g = glob('*.jpg')
shuf = np.random.permutation(g)
for i in range(2000):
    os.rename(shuf[i], os.path.join(valid_dir, shuf[i]))

PyTorchで読み込みやすいようにクラスごとにサブディレクトリを作成する。Kaggleのテストデータは正解ラベルがついていないため unknown というサブディレクトリにいれる.

# train
%cd $train_dir
%mkdir cats dogs
%mv cat.*.jpg cats/
%mv dog.*.jpg dogs/

# valid
%cd $valid_dir
%mkdir cats dogs
%mv cat.*.jpg cats/
%mv dog.*.jpg dogs/

# test
%cd $test_dir
%mkdir unknown
%mv *.jpg unknown

最終的にデータはこんな階層構造になる。

data
  dogscats
    train
      cats
        cat.1.jpg
        cat.2.jpg
      dogs
        dog.1.jpg
        dog.2.jpg
    valid
      cats
        cat.10.jpg
        cat.21.jpg
      dogs
        dog.8.jpg
        dog.52.jpg
    test
      unknown
        1.jpg
        2.jpg

VGG16のFine-tuning

前回は、ResNet50を使ったので今回はVGG16でやってみた。

vgg16 = models.vgg16(pretrained=True)
print(vgg16)
VGG(
  (features): Sequential(
    (0): Conv2d (3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace)
    (2): Conv2d (64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace)
    (4): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), dilation=(1, 1))
    (5): Conv2d (64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace)
    (7): Conv2d (128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace)
    (9): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), dilation=(1, 1))
    (10): Conv2d (128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace)
    (12): Conv2d (256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace)
    (14): Conv2d (256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace)
    (16): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), dilation=(1, 1))
    (17): Conv2d (256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (18): ReLU(inplace)
    (19): Conv2d (512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (20): ReLU(inplace)
    (21): Conv2d (512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (22): ReLU(inplace)
    (23): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), dilation=(1, 1))
    (24): Conv2d (512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (25): ReLU(inplace)
    (26): Conv2d (512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (27): ReLU(inplace)
    (28): Conv2d (512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (29): ReLU(inplace)
    (30): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), dilation=(1, 1))
  )
  (classifier): Sequential(
    (0): Linear(in_features=25088, out_features=4096)
    (1): ReLU(inplace)
    (2): Dropout(p=0.5)
    (3): Linear(in_features=4096, out_features=4096)
    (4): ReLU(inplace)
    (5): Dropout(p=0.5)
    (6): Linear(in_features=4096, out_features=1000)
  )
)

(classifier)(6) の出力が1000クラスになっているのでここをイヌとネコの2クラスにしたい。最初、下のように (6) Linear だけ置き換えられないかトライしてみたがこれはダメなようだ。

num_features = vgg16.classifier[6].in_features
vgg16.classifier[6] = nn.Linear(num_features, 2)  # <= この代入はできない!

1つ上の (classifier) を丸ごと置き換える必要があるみたい。今回は (classifier) より上のレイヤの重みはすべて固定した。(classifier) のみ学習対象とする。

# 全層のパラメータを固定
for param in vgg16.parameters():
    param.requires_grad = False

vgg16.classifier = nn.Sequential(
    nn.Linear(25088, 4096),
    nn.ReLU(inplace=True),
    nn.Dropout(0.5),
    nn.Linear(4096, 4096),
    nn.ReLU(inplace=True),
    nn.Dropout(0.5),
    nn.Linear(4096, 2)    
)

use_gpu = torch.cuda.is_available()
if use_gpu:
    vgg16 = vgg16.cuda()

print(vgg16)

データのロード

まずデータ変換関数を定義する。訓練用とテスト用を分けたが同じ内容でOK。train_preprocess の方にデータ拡張を入れるとさらに精度が上がるかも?

train_preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

test_preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

各ディレクトリから画像をロードするDataSetを作成。

train_dataset = datasets.ImageFolder(train_dir, train_preprocess)
valid_dataset = datasets.ImageFolder(valid_dir, test_preprocess)
test_dataset = datasets.ImageFolder(test_dir, test_preprocess)

下のコードでクラスラベルを確認できる。

classes = train_dataset.classes
print(train_dataset.classes)
print(valid_dataset.classes)
print(test_dataset.classes)
['cats', 'dogs']
['cats', 'dogs']
['unknown']

サブディレクトリの cats がラベル0で dogs がラベル1になっていることがわかる。おそらくアルファベット順にラベルが割り当てられるのだろう。unknownはラベル0だけどこれはテスト時にしか使わないので問題ない。

データをミニバッチ単位で取得するDataLoaderを定義。バッチサイズは128にし、訓練データだけシャッフルする。

train_loader = torch.utils.data.DataLoader(train_dataset,
                                           batch_size=128,
                                           shuffle=True)
valid_loader = torch.utils.data.DataLoader(valid_dataset,
                                           batch_size=128,
                                           shuffle=False)

1バッチだけデータを描画してみよう。

def imshow(images, title=None):
    images = images.numpy().transpose((1, 2, 0))  # (h, w, c)
    # denormalize
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    images = std * images + mean
    images = np.clip(images, 0, 1)
    plt.imshow(images)
    if title is not None:
        plt.title(title)

images, classes = next(iter(train_loader))
print(images.size(), classes.size())
grid = torchvision.utils.make_grid(images[:25], nrow=5)
imshow(grid)

f:id:aidiary:20180221230323p:plain

モデルの訓練

optimizerには、更新対象のパラメータだけ渡す必要があるので注意。requires_grad = False で固定しているパラメータを含むvgg16.parameters()を指定するとエラーになる。

Fine-tuningなので学習率を小さめにしたSGDを使う。

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(vgg16.classifier.parameters(), lr=0.001, momentum=0.9)

訓練とバリデーションの関数を定義。ここはいつもの。

def train(model, criterion, optimizer, train_loader):
    model.train()
    running_loss = 0
    for batch_idx, (images, labels) in enumerate(train_loader):
        if use_gpu:
            images = Variable(images.cuda())
            labels = Variable(labels.cuda())
        else:
            images = Variable(images)
            labels = Variable(labels)
        
        optimizer.zero_grad()
        outputs = model(images)
        
        loss = criterion(outputs, labels)
        running_loss += loss.data[0]
        
        loss.backward()
        optimizer.step()
    
    train_loss = running_loss / len(train_loader)
    
    return train_loss


def valid(model, criterion, valid_loader):
    model.eval()
    running_loss = 0
    correct = 0
    total = 0
    for batch_idx, (images, labels) in enumerate(valid_loader):
        if use_gpu:
            images = Variable(images.cuda(), volatile=True)
            labels = Variable(labels.cuda(), volatile=True)
        else:
            images = Variable(images, volatile=True)
            labels = Variable(labels, volatile=True)

        outputs = model(images)

        loss = criterion(outputs, labels)
        running_loss += loss.data[0]

        _, predicted = torch.max(outputs.data, 1)
        correct += (predicted == labels.data).sum()
        total += labels.size(0)

    val_loss = running_loss / len(valid_loader)
    val_acc = correct / total
    
    return val_loss, val_acc

出力の logs を予め作成しておく。エポックガンガン回したあとに出力先ディレクトリがなくてエラーになると悲惨・・・あとでTensorBoardXを導入してTensorBoardでロギングするとこういうディレクトリ作成がいらなくなる。

%mkdir logs

Fine-tuningなので5エポックと少しだけ回した。

num_epochs = 5
log_dir = './logs'

best_acc = 0
loss_list = []
val_loss_list = []
val_acc_list = []
for epoch in range(num_epochs):
    loss = train(vgg16, criterion, optimizer, train_loader)
    val_loss, val_acc = valid(vgg16, criterion, valid_loader)

    print('epoch %d, loss: %.4f val_loss: %.4f val_acc: %.4f'
          % (epoch, loss, val_loss, val_acc))
    
    if val_acc > best_acc:
        print('val_acc improved from %.5f to %.5f!' % (best_acc, val_acc))
        best_acc = val_acc
        model_file = 'epoch%03d-%.3f-%.3f.pth' % (epoch, val_loss, val_acc)
        torch.save(vgg16.state_dict(), os.path.join(log_dir, model_file))

    # logging
    loss_list.append(loss)
    val_loss_list.append(val_loss)
    val_acc_list.append(val_acc)
epoch 0, loss: 0.1068 val_loss: 0.0346 val_acc: 0.9905
val_acc improved from 0.00000 to 0.99050!
epoch 1, loss: 0.0351 val_loss: 0.0316 val_acc: 0.9925
val_acc improved from 0.99050 to 0.99250!
epoch 2, loss: 0.0267 val_loss: 0.0315 val_acc: 0.9930
val_acc improved from 0.99250 to 0.99300!
epoch 3, loss: 0.0224 val_loss: 0.0317 val_acc: 0.9930
epoch 4, loss: 0.0175 val_loss: 0.0311 val_acc: 0.9920

バリデーションデータに対する認識率は 99.3%。かなり高い!

テストデータに対する評価

最後にテストデータで評価してみよう。Kaggleのテストデータはラベルがついていないので認識率を求められない。なのでいくつかの画像を描画して目視で確認してみた。

まずは学習済みモデルをロード。PyTorchは重みファイルだけ保存するのが推奨になっていたので、対応するモデル構造は予め用意する必要がある。モデルと重みを両方保存することもできるのかな?

weight_file = 'logs/dogsvscats_ft/epoch003-0.032-0.993.pth'

model = models.vgg16(pretrained=False)
model.classifier = nn.Sequential(
    nn.Linear(25088, 4096),
    nn.ReLU(inplace=True),
    nn.Dropout(0.5),
    nn.Linear(4096, 4096),
    nn.ReLU(inplace=True),
    nn.Dropout(0.5),
    nn.Linear(4096, 2)    
)

model.load_state_dict(torch.load(weight_file,
                                 map_location=lambda storage,
                                 loc: storage))

テストデータのDataLoaderを作成して最初の128枚の画像をロードする。

test_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=128,
                                          shuffle=False)

images, _ = iter(test_loader).next()
images = Variable(images, volatile=True)
print(images.size())

正しく読めてるか最初の25枚を描画して確認しよう。

imshow(torchvision.utils.make_grid(images.data[:25], nrow=5))

OK。test_loadershuffle=False なので順番は固定のはず。でも順番どおりでないな?と思ったら 1.jpg, 10.jpg, 100.jpg, 1000.jpg のようにアルファベット順に並んでいた・・・

f:id:aidiary:20180221231251p:plain

ネットワークに通して予測しよう。

outputs = model(images)
_, predicted = torch.max(outputs.data, 1)

predicted には0(ネコ)、1(イヌ)の予測ラベルが入っている。

print(predicted.numpy())
array([0, 1, 0, 0, 0, 1, 0, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 0, 1, 0, 1, 0,
       1, 1, 1, 0, 0, 1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 1, 1,
       1, 0, 0, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 0, 0, 1])

numpy() してTensorからndarrayに戻したのは出力が上のようにコンパクトに表示されるからで深い意味はない。

この結果から1(イヌ)と予測された画像だけ取り出すには下のように書けばよい。

pred_dogs_images = images[predicted.nonzero().view(-1), :, :, :]
imshow(torchvision.utils.make_grid(pred_dogs_images.data[:25], nrow=5))
  • predicted.nonzero() で0でない(つまり1)のインデックスが取得できる
  • ただし、サイズが (62, 1) のように2D tensorで返ってくるので、view(-1) で1Dtensorにしてからインデキシング

f:id:aidiary:20180221231957p:plain

逆に0(ネコ)と予測された画像だけ取り出すには下のように書けばよい。

pred_cats_images = images[(predicted != 1).nonzero().view(-1), :, :, :]
imshow(torchvision.utils.make_grid(pred_cats_images.data[:25], nrow=5))

f:id:aidiary:20180221232010p:plain

テストデータでもかなりの精度で予測できていることがわかる。

参考

*1:ノートブック再度開いたときにうっかりセル実行しちゃうと大惨事だけど