Echo Sequence Prediction Problem
Long Short-Term Memory Networks With Python(2018/8/20)のつづき。
今回は、Echo Sequence Prediction Problemという単純なタスクを対象にKerasとPyTorchのVanilla LSTMの実装方法を比較してみます。
Echo Sequence PredictionProblem
Echo Sequence Prediction Problemとは、ランダムな整数の系列を入力とし、入力系列の特定の時刻の値を出力する単純なタスクです。時刻はネットワークへの入力とせずに固定します。例えば、入力系列として [5, 3, 2] を入力とした場合、時刻1の要素を返すモデルは 3 を出力します。
系列を入力として、要素を1つだけ返すのでMany-to-one型のLSTMです。こんなモデルが何の役に立つのか?というと・・・何の役にもたちません(笑)練習用のタスクです。
KerasによるLSTMの実装
先にKerasで実装してみます。まず、必要なライブラリをインポート。
from random import randint import numpy as np from keras.models import Sequential from keras.layers import LSTM, Dense import matplotlib.pyplot as plt
次に長さが length
の数字系列を生成します。
def generate_sequence(length, n_features): """長さがlengthで、ランダムな整数がn_featuresまでの系列を1つ生成する""" return [randint(0, n_features - 1) for _ in range(length)]
先の図ではネットワークへの入力も出力も整数値を使っていましたが、Kerasでは入力も出力もone-hot encodingして与えます。
def one_hot_encode(sequence, n_features): encoding = list() for value in sequence: vector = [0 for _ in range(n_features)] vector[value] = 1 encoding.append(vector) return np.array(encoding) def one_hot_decode(encoded_seq): return [np.argmax(vector) for vector in encoded_seq]
例えば、系列長が5で0から9までの乱数系列を生成したいときは下のようにします。
% sequence = generate_sequence(5, 10) [4, 6, 1, 7, 3] % encoded = one_hot_encode(sequence, 10) [[0 0 0 0 1 0 0 0 0 0] [0 0 0 0 0 0 1 0 0 0] [0 1 0 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 1 0 0] [0 0 0 1 0 0 0 0 0 0]] % decoded = one_hot_decode(encoded) [4, 6, 1, 7, 3]
例えば、0-9までの10個の数字を入力とする場合は、one-hotベクトル化すると10次元の特徴量ベクトルになります。
次に訓練データを生成する関数を実装します。
def generate_example(length, n_features, out_index): # 訓練データを1サンプル(1系列)だけ生成する sequence = generate_sequence(length, n_features) encoded = one_hot_encode(sequence, n_features) X = encoded.reshape((1, length, n_features)) y = encoded[out_index].reshape(1, n_features) return X, y
KerasのLSTMへの入力は3Dテンソルで与える必要があります。ドキュメントを見ると入力テンソルXは (batch_size, timesteps, input_dim)
で与えると書いてあります。また、出力テンソルyは (batch_size, units)
で与えると書いてあります。
今回は簡単のためバッチサイズは1で固定です。例えば、バッチサイズ1、系列長が5、特徴量の次元は10とすると入力系列Xと出力yは下のようになります。
% X, y = generate_example(5, 10, 2) % print(X.shape, y.shape) (1, 5, 10) (1, 10) % print(X) [[[0 0 0 1 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 0 1] [0 0 0 0 0 1 0 0 0 0] [0 0 1 0 0 0 0 0 0 0] [0 0 0 0 0 0 0 0 1 0]]] % print(y) [[0 0 0 0 0 1 0 0 0 0]]
この例では、入力系列が [3, 9, 5, 2, 8] で出力は2番目(インデックスは0から)を返すので 5 となります。
Model
LSTM層が1つ、分類のDense層が1つという単純なモデルを実装します。 LSTM層は系列データをまとめて入力することで、内部の隠れ状態が更新されていきます。KerasのLSTMのデフォルト設定では先のMany-to-oneモデルのように系列を全て入れたあとの最後の隠れ状態が出力されます。その隠れ状態を0-9の10分類のDense層に入力し、活性化関数をsoftmaxとすることで確率に変換します。
今回の実験では、長さ5の系列を入れて2番目の要素を出力するようにします。LSTMの隠れ状態は25次元です。
# length = 5 out_index = 2 # echo sequence predictionで入力の何番目の要素を返すか n_features = 10 hidden_size = 25 model = Sequential() model.add(LSTM(25, input_shape=(length, n_features))) model.add(Dense(n_features, activation='softmax')) model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['acc']) model.summary()
_________________________________________________________________ Layer (type) Output Shape Param # ================================================================= lstm_3 (LSTM) (None, 25) 3600 _________________________________________________________________ dense_3 (Dense) (None, 10) 260 ================================================================= Total params: 3,860 Trainable params: 3,860 Non-trainable params: 0 _________________________________________________________________
Train
最後にモデルの訓練コードです。
losses = [] for i in range(10000): X, y = generate_example(length, n_features, out_index) history = model.fit(X, y, epochs=1, verbose=0) losses.append(history.history['loss'][0]) plt.plot(losses)
今回は、各エポックで系列データを1つだけ生成し(ミニバッチ数1)、学習します。lossを下のようにエポックが進むにつれて下がることがわかります。
Evaluate
精度評価用のコードです。系列を100個生成して、正解と一致するか判定します。
# evaluate model correct = 0 for i in range(100): X, y = generate_example(length, n_features, out_index) yhat = model.predict(X) if one_hot_decode(yhat) == one_hot_decode(y): correct += 1 print('Accuracy: %f' % ((correct / 100) * 100.0))
精度は100%です。
Accuracy: 100.000000
Predict
最後に予測用のコードです。
# predict on new data X, y = generate_example(length, n_features, out_index) yhat = model.predict(X) print('Sequence: %s' % [one_hot_decode(x) for x in X]) print('Expected: %s' % one_hot_decode(y)) print('Predicted: %s' % one_hot_decode(yhat))
Sequence: [[7, 5, 1, 3, 6]] Expected: [1] Predicted: [1]
入力系列が [7, 5, 1, 3, 6] で2番目を返すように訓練したモデルなので正解は1です。予測も1で正解なことがわかります。
PyTorchによるLSTMの実装
次に同じタスクをPyTorchで実装してみます。
from random import randint import numpy as np import torch import torch.nn as nn import torch.optim as optim cuda = torch.cuda.is_available() if cuda: print('cuda available') device = torch.device('cuda' if cuda else 'cpu')
次にデータを生成する関数です。ここはKerasとちょっと違います。
def generate_example(length, n_features, out_index): sequence = generate_sequence(length, n_features) encoded = one_hot_encode(sequence, n_features) # ndarray => tensor # PyTorchでは入力はfloatにする必要あり encoded = torch.from_numpy(encoded).float() # LSTMへの入力は3Dテンソル (seq_len, batch, input_size) X = encoded.view(length, 1, n_features) # out_index番目の入力を出力するようにする # PyTorchは出力はone-hotではなくラベルそのものを返す y = torch.Tensor([sequence[out_index]]).long() return X, y
Kerasとの違いをまとめると
- 入力はone-hot encodingしますが、出力側の分類ラベルはone-hot encodingする必要がありません
- 入力テンソルはfloatで出力テンソルはlongに変換する必要があります
- PyTorchのLSTMへの入力はKerasと同じく3Dテンソルですが、次元の順番が
(seq_len, batch, input_size)
となります。Kerasと異なり、系列長とバッチサイズが入れ替わります
% generate_example(5, 10, 2) (tensor([[[ 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.]], [[ 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.]], [[ 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.]], [[ 0., 0., 0., 0., 0., 1., 0., 0., 0., 0.]], [[ 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.]]]), tensor([ 5]))
Model
PyTorchの nn.LSTM
もKerasと同じく、系列をまとめて入力して処理します。KerasのLSTMは最後の系列要素を与えた時の隠れ状態だけを出力するのに対し、PyTorchのLSTMは入力系列の各要素に対する隠れ状態を全て出力します(Kerasで同じことをするにはLSTMの引数として return_sequences = True
をセットします)。また、 隠れ状態(LSTMの場合はhidden stateとcell stateの2つ)は明示的に与える必要があります。
# model length = 5 n_features = 10 hidden_size = 25 out_index = 2 class EchoSequencePredictionModel(nn.Module): def __init__(self, input_size, hidden_size, target_size): super(EchoSequencePredictionModel, self).__init__() self.hidden_size = hidden_size self.lstm = nn.LSTM(input_size, hidden_size) self.out = nn.Linear(hidden_size, target_size) self.softmax = nn.LogSoftmax(dim=2) def forward(self, input, h, c): output, (h, c) = self.lstm(input, (h, c)) output = self.out(output) output = self.softmax(output) return output, (h, c) def init_hidden(self): # (num_layers, batch, hidden_size) h = torch.zeros(1, 1, self.hidden_size).to(device) c = torch.zeros(1, 1, self.hidden_size).to(device) return h, c model = EchoSequencePredictionModel(n_features, hidden_size, n_features).to(device) print(model)
EchoSequencePredictionModel( (lstm): LSTM(10, 25) (out): Linear(in_features=25, out_features=10, bias=True) (softmax): LogSoftmax() )
Train
訓練用のコードです。
criterion = nn.NLLLoss() optimizer = optim.Adam(model.parameters(), lr=0.001) losses = [] # training for i in range(10000): # 系列データを1つ生成 X, y = generate_example(length, n_features, out_index) # 系列を1つ入力してパラメータ更新したら勾配はリセット model.zero_grad() # 新しい系列を入力するたびに隠れ状態(hとc)はリセット # LSTMのパラメータは更新されたまま残る h0, c0 = model.init_hidden() # 系列を入力して出力系列を求める # output: (seq_len, batch, hidden_size) output, (h, c) = model(X, h0, c0) # many-to-oneのタスクなので出力系列の最後の要素のみ使う loss = criterion(output[-1], y) losses.append(loss.item()) loss.backward() optimizer.step() plt.plot(losses)
- モデルの出力に
LogSoftmax()
を使っているため交差エントロピー誤差を使う場合は、nn.NLLLoss()
を設定します - 系列を1つ入力して
optimizer.step()
でパラメータを更新したら、隠れ状態はリセットして、新しい系列を入力する準備をします - Many-to-oneモデルで系列の最後の出力のみを使う場合は
output[-1]
とし、それをLinear層に渡して分類します
この図がわかりやすいです。
Kerasのときと同じようなlossが得られます。
Predict
予測用のコードです。
X, y = generate_example(length, n_features, out_index) h0, c0 = model.init_hidden() yhat = model(X, h0, c0) print('input sequence:', [one_hot_decode(x) for x in X]) print('expected:', y) print('predicted:', torch.argmax(yhat[0][-1]))
input sequence: [[tensor(5)], [tensor(0)], [tensor(4)], [tensor(8)], [tensor(7)]] expected: tensor([ 4]) predicted: tensor(4)
入力系列が [5, 0, 4, 8, 7] で2番目を出力するモデルなので4が正解、予測も4なので合っています。
実験
最後に、いくつか実験してみました。
入力系列が長くなるとどうなるか?
def moving_average(data_set, periods=3): weights = np.ones(periods) / periods return np.convolve(data_set, weights, mode='valid') # model n_features = 10 hidden_size = 25 out_index = 2 for length in range(5, 11): model = EchoSequencePredictionModel(n_features, hidden_size, n_features).to(device) criterion = nn.NLLLoss() optimizer = optim.Adam(model.parameters(), lr=0.001) losses = [] # training for i in range(10000): # 系列データを1つ生成 X, y = generate_example(length, n_features, out_index) # 系列を1つ入力してパラメータ更新したら勾配はリセット model.zero_grad() # 新しい系列を入力するたびに隠れ状態(hとc)はリセット # LSTMのパラメータは更新されたまま残る h0, c0 = model.init_hidden() # 系列を入力して出力系列を求める # output: (seq_len, batch, hidden_size) output, (h, c) = model(X, h0, c0) # many-to-oneのタスクなので出力系列の最後の要素のみ使う loss = criterion(output[-1], y) losses.append(loss.item()) loss.backward() optimizer.step() losses = moving_average(losses, 100) plt.title('n_features=%d, hidden_size=%d, out_index=%d' % (n_features, hidden_size, out_index)) plt.plot(losses, label='length=%d' % length) plt.legend()
入力系列が長いほどlossが下がらず、予測が難しいことがわかります。
以下、コードは似たような感じなので省略します。
予測の場所が系列の最後より遠い方が難しい?
そのような傾向はあまりなさそう?
LSTMの隠れ状態のサイズを変えると?
大きすぎても小さすぎてもダメ。
入力特徴量の次元数を変えると?
次元数が大きいほど問題が難しくなるのでlossが下がりにくくなる。