人工知能に関する断創録

人工知能、認知科学、心理学、ロボティクス、生物学などに興味を持っています。このブログでは人工知能のさまざまな分野について調査したことをまとめています。最近は、機械学習、Deep Learning、Keras、PyTorchに関する記事が多いです。



Damped Sine Wave Prediction Problem

Long Short-Term Memory Networks With Python(2018/8/20)のつづき。

今回は、Damped Sine Wave Prediction Problemという時系列予測のタスクを対象にKerasとPyTorchでStacked LSTMの実装方法を比較してみます。

減衰するサイン波を系列データとみなして、先の系列を予測するタスクです。例えば、下のような減衰サイン波に対して、時刻0から80までの系列を入力したときに、系列80から100の値を予測します。サイン波の周期や減衰率はランダムで与えます。

f:id:aidiary:20180902203854p:plain:w480

系列を入力して、系列を出力するseq2seq(Many-to-many)のタスクのようですが、出力を系列ではなくて特徴量とみなすことでMany-to-oneのタスクとみなすことができます。

f:id:aidiary:20180902201753p:plain:w480

1次元特徴量の長さ3の系列を入力して、1次元特徴量の長さ2の系列を出力するのではなく、2次元特徴量を出力するとみなします。

KerasによるStacked LSTMの実装

KerasのLSTMはデフォルト設定では、出力が (batch_size, units) となり、全ての系列を入力した後の最後の隠れ状態しか出力されません(上の図のLSTM2)。

# (timesteps, input_dim)
# 長さ3で特徴量の次元数が1の系列データを入力(ミニバッチサイズは省略)
model = Sequential()
model.add(LSTM(1, input_shape=(3, 1)))
model.compile(optimizer='adam', loss='mse')

# (nb_samples, timesteps, input_dim)の3Dテンソルに変換
data = np.array([0.1, 0.2, 0.3]).reshape((1, 3, 1))

# (nb_samples, output_dim)
result = model.predict(data)

print(result.shape)

この例では、系列長が3で特徴量の次元数が1の系列を入力し、隠れ状態のユニット数が1のLSTMを使っています。出力のサイズは (batch_size, units) = (1, 1) となり、予想通り最後の隠れ状態だけが出力されます。

KerasのLSTMで入力系列のそれぞれの要素を入力した時の隠れ状態を全て出力するには return_sequences=True を指定します。すると、LSTMの出力は (batch_size, timesteps, units) となります(上の図のLSTM1)。

model = Sequential()
model.add(LSTM(1, return_sequences=True, input_shape=(3, 1)))
model.compile(optimizer='adam', loss='mse')
data = np.array([0.1, 0.2, 0.3]).reshape((1, 3, 1))
result = model.predict(data)
print(result.shape)

この例では、出力は (batch_size, timesteps, units) = (1, 3, 1) となり、入力系列長と同じ長さの出力系列長が得られます。

Stacked LSTMは下の図のようにLSTM層を積み上げます。このとき、途中にあるLSTM層は return_sequences=True を指定して、入力系列の各要素に対する出力を全て出す必要があります。

f:id:aidiary:20180901192557p:plain:h480

なので、Stacked LSTMを実装するときは

model = Sequential()
model.add(LSTM(..., return_sequences=True, input_shape=(...)))
model.add(LSTM(..., return_sequences=True, input_shape=(...)))
model.add(LSTM(..., return_sequences=True, input_shape=(...)))
model.add(LSTM(...))
model.add(Dense(...))

のようになります。今回のタスクは、Meny-to-one型なので最後のLSTMは、return_sequences=False にして最後の隠れ状態のみを取り出します。

さっそく、実装してみます。まずは必要なライブラリをimport。

from keras.models import Sequential
from keras.layers import LSTM, Dense
import numpy as np
import random

次に減衰サイン波系列を1つだけ出力する関数を実装します。

def generate_sequence(length, period, decay):
    return [0.5 + 0.5 * math.sin(2 * math.pi * i / period) * math.exp(-decay * i) for i in range(length)]

系列長と周期と減衰率を指定するとサイン波が生成されます。

sequence = generate_sequence(100, 10, 0.05)
plt.plot(sequence)
plt.show()

f:id:aidiary:20180901182606p:plain

長さlengthのランダムな減衰サイン波系列をn_patterns個生成する関数です。

# periodとdecayはランダムな系列を生成する
# 長さがlengthでテスト用に長さがoutputの系列を付け加える
def generate_examples(length, n_patterns, output):
    X, y = list(), list()
    for _ in range(n_patterns):
        p = random.randint(10, 20)
        d = random.uniform(0.01, 0.1)
        sequence = generate_sequence(length + output, p, d)
        X.append(sequence[:-output])
        y.append(sequence[-output:])
    # input: (nb_samples, timesteps, input_dim)
    X = np.array(X).reshape(n_patterns, length, 1)
    # output: (nb_samples, output_dim)
    y = np.array(y).reshape(n_patterns, output)
    return X, y

試しに長さが50の系列を5個生成します。テスト用に継続する5の系列も作成します。

# 
X, y = generate_examples(50, 5, 5)
print(X.shape, y.shape)

描画して確認。

for i in range(len(X)):
    plt.plot([x for x in X[i, :, 0]] + [x for x in y[i]], '-o')

f:id:aidiary:20180901192127p:plain

Model

今回は、LSTMを積み上げて多層化するStacked LSTMを使います。多層パーセプトロンのように層を積み上げることで入力データを変換していくように、LSTM層を積み上げて時系列データを変換していくイメージです。多層パーセプトロンで層を深くすると表現力が上がるように、LSTMを多数積み上げたモデルも時系列データの表現力が上がります。

サイン波は1次元系列なので特徴量の次元数は1です。隠れ状態は20次元とします。

length = 50
output = 5

model = Sequential()
model.add(LSTM(20, return_sequences=True, input_shape=(length, 1)))
model.add(LSTM(20))
model.add(Dense(output))
model.compile(loss='mae', optimizer='adam')
model.summary()

損失関数は、元の実装に合わせてMean Absolute Error (MAE) を使いましたが、MSEでもよいと思います。

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm_9 (LSTM)                (None, 50, 20)            1760      
_________________________________________________________________
lstm_10 (LSTM)               (None, 20)                3280      
_________________________________________________________________
dense_3 (Dense)              (None, 5)                 105       
=================================================================
Total params: 5,145
Trainable params: 5,145
Non-trainable params: 0
_________________________________________________________________

Train

訓練用のコードです。全部で10000個のサイン波系列を生成し、ミニバッチ数10で1エポックだけ訓練します。

X, y = generate_examples(length, 10000, output)
model.fit(X, y, batch_size=10, epochs=1)
Epoch 1/1
10000/10000 [==============================] - 70s 7ms/step - loss: 0.0487

訓練が進むにつれて損失が小さくなることがわかります。

Evaluate

ランダムに1000個の系列をさらに生成して損失関数を評価します。MAE: 0.022444 なのでそこそこよい感じです。

X, y = generate_examples(length, 1000, output)
loss = model.evaluate(X, y, verbose=0)
print('MAE: %f' % loss)

Predict

MAEだけ見てもわかりにくいので実際に描画して確認します。正解の系列yと予測系列yhatはほぼ一致していて正しく予測できていることがわかります。縦軸のスケールをylimで指定しないとかなりズームされてしまって一致しているかわかりにくくなるので要注意!全然ずれてしまってしばらく悩んでました(笑)

X, y = generate_examples(length, 1, output)
yhat = model.predict(X, verbose=0)
plt.plot(y[0], label='y')
plt.plot(yhat[0], label='yhat')
plt.ylim((0.0, 1.0))
plt.legend()
plt.show()

f:id:aidiary:20180901214629p:plain

PyTorchによるStacked LSTMの実装

次は、PyTorchで同じのを実装してみます!ここからが本番。

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

cuda = torch.cuda.is_available()
if cuda:
    print('cuda available')
device = torch.device('cuda' if cuda else 'cpu')

def generate_sequence(length, period, decay):
    return [0.5 + 0.5 * math.sin(2 * math.pi * i / period) * math.exp(-decay * i) for i in range(length)]

def generate_examples(length, n_patterns, output):
    X, y = list(), list()
    for _ in range(n_patterns):
        p = random.randint(10, 20)
        d = random.uniform(0.01, 0.1)
        sequence = generate_sequence(length + output, p, d)
        X.append(sequence[:-output])
        y.append(sequence[-output:])

    # numpy to tensor
    # regressionタスクなので入力・出力ともにfloat
    X = torch.from_numpy(np.array(X)).float()
    y = torch.from_numpy(np.array(y)).float()

    # input: (seq_len, batch, input_size)
    X = X.view(length, n_patterns, 1)

    # output: (batch, seq_len)
    y = y.view(n_patterns, output)

    return X, y

Kerasと似てますが、PyTorchではnumpy arrayをtorch.Tensorに変換する必要があります。今回はregressionのタスクなので入力、出力ともfloatにします。また、入力の3DテンソルはKerasと順番が違って (seq_len, batch, input_size) になります。batchとseq_lenが入れ替わります。

Model

Stacked LSTMをPyTorchで実装するのは簡単です。Kerasのように自分でLSTMオブジェクトを複数積み上げる必要はありません。LSTMの num_layers 引数に層の数を指定するだけです

num_layers – Number of recurrent layers. E.g., setting num_layers=2 would mean stacking two LSTMs together to form a stacked LSTM, with the second LSTM taking in outputs of the first LSTM and computing the final results. Default: 1

length = 50
n_features = 1
hidden_size = 20
num_layers = 2
output_size = 5
batch_size = 10
    
class DampedSineWavePredictionModel(nn.Module):

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(DampedSineWavePredictionModel, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers)
        self.out = nn.Linear(hidden_size, output_size)
    
    def forward(self, input, h, c):
        output, (h, c) = self.lstm(input, (h, c))
        output = self.out(output)
        return output, (h, c)

model = DampedSineWavePredictionModel(n_features, hidden_size, num_layers, output_size)

今回、隠れ状態のhとcを初期化する関数は作りませんでした。この隠れ状態のサイズはミニバッチ数に依存するためモデルオブジェクトの外で初期化しています。モデルにミニバッチ数を渡すのはあまりきれいな実装ではなさそう。

DampedSineWavePredictionModel(
  (lstm): LSTM(1, 20, num_layers=2)
  (out): Linear(in_features=20, out_features=5, bias=True)
)

Train

訓練用のコードです。1000エポックのループを外で回して、各エポックをミニバッチ数10のデータを生成して訓練していますが、全部で10000個の系列で訓練しているのはKerasと同じです。ミニバッチの系列集合でモデルパラメータを更新したら、隠れ状態をリセットするのは前回と同様です。

KerasではMAE損失が使われていましたが、PyTorchでは実装がありませんでした。今回は、MAEの代わりにMSEで代替しています。

LSTMの隠れ状態hとcのサイズは、LSTMの層数とミニバッチサイズに依存します

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

losses = []

for i in range(1000):
    # ミニバッチの系列データを生成
    X, y = generate_examples(length, batch_size, output_size)

    # ミニバッチ系列を入力してパラメータを更新したら勾配はリセット
    model.zero_grad()
    
    # 新しいミニバッチを入れるたびに隠れ状態はリセット
    h0 = torch.zeros(num_layers, batch_size, hidden_size)
    c0 = torch.zeros(num_layers, batch_size, hidden_size)

    output, (h, c) = model(X, h0, c0)
    
    loss = criterion(output[-1], y)
    losses.append(loss.item())
    loss.backward()
    optimizer.step()

損失をプロットしてみると下のようになります。

f:id:aidiary:20180901215537p:plain

Predict

最後に系列を1つだけ生成して予測してみます。

# 予測
X, y = generate_examples(length, 1, output_size)
h0 = torch.zeros(num_layers, 1, hidden_size)
c0 = torch.zeros(num_layers, 1, hidden_size)
yhat, (h, c) = model(X, h0, c0)

PyTorchのyhatは最後の隠れ状態だけでなく、入力系列Xの全ての要素に対する隠れ状態が出力されるので最後の隠れ状態だけが欲しい場合は、yhat[-1] とします。また、yhatは勾配を持つのでnumpy arrayに変換する前に detach() が必要です。

plt.plot(y[0].numpy())
plt.plot(yhat[-1][0].detach().numpy())
plt.ylim((0.0, 1.0))

f:id:aidiary:20180901215759p:plain

正解のyと予測のyhatはほぼ一致することが確認できました。

Stacked LSTMの層数を増やしたり、減らしたりするとMSE誤差に変化が出るかと思って色々試してみましたが、これくらい単純なタスクだとほとんど差が出ませんでした。

大規模な翻訳、音声認識、音声合成のネットワークは当たり前のようにStacked LSTMが使われています。今回の実装で色々勉強になりました!