人工知能に関する断創録

人工知能、認知科学、心理学、ロボティクス、生物学などに興味を持っています。このブログでは人工知能のさまざまな分野について調査したことをまとめています。最近は、機械学習、Deep Learning、Keras、PyTorchに関する記事が多いです。



Echo Sequence Prediction Problem

Long Short-Term Memory Networks With Python(2018/8/20)のつづき。

今回は、Echo Sequence Prediction Problemという単純なタスクを対象にKerasとPyTorchのVanilla LSTMの実装方法を比較してみます。

Echo Sequence PredictionProblem

Echo Sequence Prediction Problemとは、ランダムな整数の系列を入力とし、入力系列の特定の時刻の値を出力する単純なタスクです。時刻はネットワークへの入力とせずに固定します。例えば、入力系列として [5, 3, 2] を入力とした場合、時刻1の要素を返すモデルは 3 を出力します。

f:id:aidiary:20180827200522p:plain

系列を入力として、要素を1つだけ返すのでMany-to-one型のLSTMです。こんなモデルが何の役に立つのか?というと・・・何の役にもたちません(笑)練習用のタスクです。

KerasによるLSTMの実装

先にKerasで実装してみます。まず、必要なライブラリをインポート。

from random import randint
import numpy as np

from keras.models import Sequential
from keras.layers import LSTM, Dense

import matplotlib.pyplot as plt

次に長さが length の数字系列を生成します。

def generate_sequence(length, n_features):
    """長さがlengthで、ランダムな整数がn_featuresまでの系列を1つ生成する"""
    return [randint(0, n_features - 1) for _ in range(length)]

先の図ではネットワークへの入力も出力も整数値を使っていましたが、Kerasでは入力も出力もone-hot encodingして与えます。

def one_hot_encode(sequence, n_features):
    encoding = list()
    for value in sequence:
        vector = [0 for _ in range(n_features)]
        vector[value] = 1
        encoding.append(vector)
    return np.array(encoding)

def one_hot_decode(encoded_seq):
    return [np.argmax(vector) for vector in encoded_seq]

例えば、系列長が5で0から9までの乱数系列を生成したいときは下のようにします。

% sequence = generate_sequence(5, 10)
[4, 6, 1, 7, 3]

% encoded = one_hot_encode(sequence, 10)
[[0 0 0 0 1 0 0 0 0 0]
 [0 0 0 0 0 0 1 0 0 0]
 [0 1 0 0 0 0 0 0 0 0]
 [0 0 0 0 0 0 0 1 0 0]
 [0 0 0 1 0 0 0 0 0 0]]

% decoded = one_hot_decode(encoded)
[4, 6, 1, 7, 3]

例えば、0-9までの10個の数字を入力とする場合は、one-hotベクトル化すると10次元の特徴量ベクトルになります。

次に訓練データを生成する関数を実装します。

def generate_example(length, n_features, out_index):
    # 訓練データを1サンプル(1系列)だけ生成する
    sequence = generate_sequence(length, n_features)
    encoded = one_hot_encode(sequence, n_features)
    X = encoded.reshape((1, length, n_features))
    y = encoded[out_index].reshape(1, n_features)
    return X, y

KerasのLSTMへの入力は3Dテンソルで与える必要があります。ドキュメントを見ると入力テンソルXは (batch_size, timesteps, input_dim) で与えると書いてあります。また、出力テンソルyは (batch_size, units) で与えると書いてあります。

今回は簡単のためバッチサイズは1で固定です。例えば、バッチサイズ1、系列長が5、特徴量の次元は10とすると入力系列Xと出力yは下のようになります。

% X, y = generate_example(5, 10, 2)
% print(X.shape, y.shape)
(1, 5, 10) (1, 10)
% print(X)
[[[0 0 0 1 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0 0 1]
  [0 0 0 0 0 1 0 0 0 0]
  [0 0 1 0 0 0 0 0 0 0]
  [0 0 0 0 0 0 0 0 1 0]]]
% print(y)
[[0 0 0 0 0 1 0 0 0 0]]

この例では、入力系列が [3, 9, 5, 2, 8] で出力は2番目(インデックスは0から)を返すので 5 となります。

Model

LSTM層が1つ、分類のDense層が1つという単純なモデルを実装します。 LSTM層は系列データをまとめて入力することで、内部の隠れ状態が更新されていきます。KerasのLSTMのデフォルト設定では先のMany-to-oneモデルのように系列を全て入れたあとの最後の隠れ状態が出力されます。その隠れ状態を0-9の10分類のDense層に入力し、活性化関数をsoftmaxとすることで確率に変換します。

f:id:aidiary:20180827203522p:plain:h320

今回の実験では、長さ5の系列を入れて2番目の要素を出力するようにします。LSTMの隠れ状態は25次元です。

# length = 5
out_index = 2  # echo sequence predictionで入力の何番目の要素を返すか
n_features = 10
hidden_size = 25

model = Sequential()
model.add(LSTM(25, input_shape=(length, n_features)))
model.add(Dense(n_features, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc'])
model.summary()
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm_3 (LSTM)                (None, 25)                3600      
_________________________________________________________________
dense_3 (Dense)              (None, 10)                260       
=================================================================
Total params: 3,860
Trainable params: 3,860
Non-trainable params: 0
_________________________________________________________________

Train

最後にモデルの訓練コードです。

losses = []
for i in range(10000):
    X, y = generate_example(length, n_features, out_index)
    history = model.fit(X, y, epochs=1, verbose=0)
    losses.append(history.history['loss'][0])
plt.plot(losses)

今回は、各エポックで系列データを1つだけ生成し(ミニバッチ数1)、学習します。lossを下のようにエポックが進むにつれて下がることがわかります。

f:id:aidiary:20180827204410p:plain

Evaluate

精度評価用のコードです。系列を100個生成して、正解と一致するか判定します。

# evaluate model
correct = 0
for i in range(100):
    X, y = generate_example(length, n_features, out_index)
    yhat = model.predict(X)
    if one_hot_decode(yhat) == one_hot_decode(y):
        correct += 1
print('Accuracy: %f' % ((correct / 100) * 100.0))

精度は100%です。

Accuracy: 100.000000

Predict

最後に予測用のコードです。

# predict on new data
X, y = generate_example(length, n_features, out_index)
yhat = model.predict(X)
print('Sequence: %s' % [one_hot_decode(x) for x in X])
print('Expected: %s' % one_hot_decode(y))
print('Predicted: %s' % one_hot_decode(yhat))
Sequence: [[7, 5, 1, 3, 6]]
Expected: [1]
Predicted: [1]

入力系列が [7, 5, 1, 3, 6] で2番目を返すように訓練したモデルなので正解は1です。予測も1で正解なことがわかります。

PyTorchによるLSTMの実装

次に同じタスクをPyTorchで実装してみます。

from random import randint
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

cuda = torch.cuda.is_available()
if cuda:
    print('cuda available')
device = torch.device('cuda' if cuda else 'cpu')

次にデータを生成する関数です。ここはKerasとちょっと違います。

def generate_example(length, n_features, out_index):
    sequence = generate_sequence(length, n_features)
    encoded = one_hot_encode(sequence, n_features)
    
    # ndarray => tensor
    # PyTorchでは入力はfloatにする必要あり
    encoded = torch.from_numpy(encoded).float()
    
    # LSTMへの入力は3Dテンソル (seq_len, batch, input_size)
    X = encoded.view(length, 1, n_features)

    # out_index番目の入力を出力するようにする
    # PyTorchは出力はone-hotではなくラベルそのものを返す
    y = torch.Tensor([sequence[out_index]]).long()

    return X, y

Kerasとの違いをまとめると

  • 入力はone-hot encodingしますが、出力側の分類ラベルはone-hot encodingする必要がありません
  • 入力テンソルはfloatで出力テンソルはlongに変換する必要があります
  • PyTorchのLSTMへの入力はKerasと同じく3Dテンソルですが、次元の順番が (seq_len, batch, input_size) となります。Kerasと異なり、系列長とバッチサイズが入れ替わります
% generate_example(5, 10, 2)
(tensor([[[ 0.,  1.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.]],
 
         [[ 1.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.]],
 
         [[ 0.,  0.,  0.,  0.,  0.,  1.,  0.,  0.,  0.,  0.]],
 
         [[ 0.,  0.,  0.,  0.,  0.,  1.,  0.,  0.,  0.,  0.]],
 
         [[ 1.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.]]]), tensor([ 5]))

Model

PyTorchの nn.LSTM もKerasと同じく、系列をまとめて入力して処理します。KerasのLSTMは最後の系列要素を与えた時の隠れ状態だけを出力するのに対し、PyTorchのLSTMは入力系列の各要素に対する隠れ状態を全て出力します(Kerasで同じことをするにはLSTMの引数として return_sequences = True をセットします)。また、 隠れ状態(LSTMの場合はhidden stateとcell stateの2つ)は明示的に与える必要があります

# model
length = 5
n_features = 10
hidden_size = 25
out_index = 2

class EchoSequencePredictionModel(nn.Module):
    
    def __init__(self, input_size, hidden_size, target_size):
        super(EchoSequencePredictionModel, self).__init__()
        self.hidden_size = hidden_size
        self.lstm = nn.LSTM(input_size, hidden_size)
        self.out = nn.Linear(hidden_size, target_size)
        self.softmax = nn.LogSoftmax(dim=2)

    def forward(self, input, h, c):
        output, (h, c) = self.lstm(input, (h, c))
        output = self.out(output)
        output = self.softmax(output)
        return output, (h, c)

    def init_hidden(self):
        # (num_layers, batch, hidden_size)
        h = torch.zeros(1, 1, self.hidden_size).to(device)
        c = torch.zeros(1, 1, self.hidden_size).to(device)
        return h, c

model = EchoSequencePredictionModel(n_features, hidden_size, n_features).to(device)
print(model)
EchoSequencePredictionModel(
  (lstm): LSTM(10, 25)
  (out): Linear(in_features=25, out_features=10, bias=True)
  (softmax): LogSoftmax()
)

Train

訓練用のコードです。

criterion = nn.NLLLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

losses = []

# training
for i in range(10000):
    # 系列データを1つ生成
    X, y = generate_example(length, n_features, out_index)

    # 系列を1つ入力してパラメータ更新したら勾配はリセット
    model.zero_grad()
    
    # 新しい系列を入力するたびに隠れ状態(hとc)はリセット
    # LSTMのパラメータは更新されたまま残る
    h0, c0 = model.init_hidden()

    # 系列を入力して出力系列を求める
    # output: (seq_len, batch, hidden_size)
    output, (h, c) = model(X, h0, c0)

    # many-to-oneのタスクなので出力系列の最後の要素のみ使う
    loss = criterion(output[-1], y)
    losses.append(loss.item())
    loss.backward()
    optimizer.step()

plt.plot(losses)
  • モデルの出力に LogSoftmax()を使っているため交差エントロピー誤差を使う場合は、nn.NLLLoss() を設定します
  • 系列を1つ入力して optimizer.step() でパラメータを更新したら、隠れ状態はリセットして、新しい系列を入力する準備をします
  • Many-to-oneモデルで系列の最後の出力のみを使う場合は output[-1] とし、それをLinear層に渡して分類します

この図がわかりやすいです。

f:id:aidiary:20180827210908p:plain

Kerasのときと同じようなlossが得られます。

f:id:aidiary:20180827211135p:plain

Predict

予測用のコードです。

X, y = generate_example(length, n_features, out_index)
h0, c0 = model.init_hidden()
yhat = model(X, h0, c0)

print('input sequence:', [one_hot_decode(x) for x in X])
print('expected:', y)
print('predicted:', torch.argmax(yhat[0][-1]))
input sequence: [[tensor(5)], [tensor(0)], [tensor(4)], [tensor(8)], [tensor(7)]]
expected: tensor([ 4])
predicted: tensor(4)

入力系列が [5, 0, 4, 8, 7] で2番目を出力するモデルなので4が正解、予測も4なので合っています。

実験

最後に、いくつか実験してみました。

入力系列が長くなるとどうなるか?

def moving_average(data_set, periods=3):
    weights = np.ones(periods) / periods
    return np.convolve(data_set, weights, mode='valid')

# model
n_features = 10
hidden_size = 25
out_index = 2

for length in range(5, 11):
    model = EchoSequencePredictionModel(n_features, hidden_size, n_features).to(device)
    criterion = nn.NLLLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    losses = []

    # training
    for i in range(10000):
        # 系列データを1つ生成
        X, y = generate_example(length, n_features, out_index)

        # 系列を1つ入力してパラメータ更新したら勾配はリセット
        model.zero_grad()

        # 新しい系列を入力するたびに隠れ状態(hとc)はリセット
        # LSTMのパラメータは更新されたまま残る
        h0, c0 = model.init_hidden()

        # 系列を入力して出力系列を求める
        # output: (seq_len, batch, hidden_size)
        output, (h, c) = model(X, h0, c0)

        # many-to-oneのタスクなので出力系列の最後の要素のみ使う
        loss = criterion(output[-1], y)
        losses.append(loss.item())

        loss.backward()
        optimizer.step()

    losses = moving_average(losses, 100)
    plt.title('n_features=%d, hidden_size=%d, out_index=%d' % (n_features, hidden_size, out_index))
    plt.plot(losses, label='length=%d' % length)
    plt.legend()

f:id:aidiary:20180827211530p:plain

入力系列が長いほどlossが下がらず、予測が難しいことがわかります。

以下、コードは似たような感じなので省略します。

予測の場所が系列の最後より遠い方が難しい?

f:id:aidiary:20180827211637p:plain

そのような傾向はあまりなさそう?

LSTMの隠れ状態のサイズを変えると?

f:id:aidiary:20180827211806p:plain

大きすぎても小さすぎてもダメ。

入力特徴量の次元数を変えると?

f:id:aidiary:20180827211901p:plain

次元数が大きいほど問題が難しくなるのでlossが下がりにくくなる。