人工知能に関する断創録

このブログでは人工知能のさまざまな分野について調査したことをまとめています(更新停止: 2019年12月31日)

Theanoによるロジスティック回帰の実装

Theanoによる2クラスロジスティック回帰の実装(2015/5/19)のつづき。

今回からDeep Learning Tutorialの内容にそって実装していく。このチュートリアルのコードやデータはGithub上で公開されているのでcloneしておいた。

git clone https://github.com/lisa-lab/DeepLearningTutorials

さっそく、最初の例のロジスティック回帰(Logistic Regression)でMNISTの手書き数字認識を試してみよう。載せているソースコードは上のリポジトリとほとんど同じ(コメントを日本語で補足したくらい)。ここでは全部掲載したりせずに自分がポイントだと思ったところだけまとめておきたい。

ソースコード全体はここに置いた。

MNISTデータのロード

MNISTは手書き数字の画像データセット。前に多層パーセプトロンで使用したことがあった。

このチュートリアルでは、MNISTのデータをPythonのcPickleモジュールでロードできる形式に圧縮したmnist.pkl.gzが提供されているのでそちらを使う。このデータはMNISTの70000サンプルが

  • 50000の訓練セット
  • 10000のバリデーションセット
  • 10000のテストセット

にあらかじめ分割されている。各サンプルは28x28=784次元のデータで値は0.0から1.0に正規化されている。

f:id:aidiary:20150525203441p:plain

学習データをロードする関数

def load_data(dataset):
    """データセットをロードしてGPUの共有変数に格納"""
    f = gzip.open(dataset, 'rb')
    train_set, valid_set, test_set = cPickle.load(f)
    f.close()

    def shared_dataset(data_xy, borrow=True):
        data_x, data_y = data_xy

        # 共有変数には必ずfloat型で格納
        shared_x = theano.shared(np.asarray(data_x, dtype=theano.config.floatX), borrow=borrow)
        shared_y = theano.shared(np.asarray(data_y, dtype=theano.config.floatX), borrow=borrow)

        # ラベルはint型なのでキャストして返す
        return shared_x, T.cast(shared_y, 'int32')

    test_set_x, test_set_y = shared_dataset(test_set)
    valid_set_x, valid_set_y = shared_dataset(valid_set)
    train_set_x, train_set_y = shared_dataset(train_set)

    rval = [(train_set_x, train_set_y),
            (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]

    return rval

ファイルからcPickleでロードした後に共有変数に格納している。共有変数にするとGPUのメモリ領域に保存されるため学習時に高速に読み込めるという利点がある。共有変数の値は、get_value()で取得できる。また、shapeで配列のサイズが調べられる。

train_set_x.get_value().shape
(50000, 784)

行にサンプル、列に特徴量という一般的な形式で格納されていることがわかる。

訓練データを共有変数に格納するときは必ずfloatX型にする必要がある。本来は整数型のラベル(今回は数字認識なので0から9)も同様。ただし、ラベルを使うときは整数型の方が都合がよいためT.cast()で整数型にキャストしてから返している。キャストするとSharedVariableTensorVariableになってしまい、get_value()で値が取得できなくなるみたい・・・

先の数字の画像を描画するには、shared_dataset()の前に下のコードを挿入すればOK。共有変数に入れてからget_value()で取り出して描画しようとするとなぜかMemoryErrorで描画できなかった。

    import pylab
    train_set_x, train_set_y = train_set
    pylab.figure()
    for index in range(100):
        pylab.subplot(10, 10, index + 1)
        pylab.axis('off')
        pylab.imshow(train_set_x[index, :].reshape(28, 28), cmap=pylab.cm.gray_r, interpolation='nearest')
    pylab.show()
    exit()

多クラスロジスティック回帰

ロジスティック回帰のパラメータは、重み行列Wとバイアスベクトルbの2つ*1。パラメータは適当な値(今回は0)で初期化し、あとで更新できるように共有変数で定義しておく。

        # 重み行列を初期化
        self.W = theano.shared(value=np.zeros((n_in, n_out), dtype=theano.config.floatX),
                               name='W',
                               borrow=True)

        # バイアスベクトルを初期化
        self.b = theano.shared(value=np.zeros((n_out,), dtype=theano.config.floatX),
                               name='b',
                               borrow=True)

ロジスティック回帰では、入力ベクトルxを与えたとき、それがクラスiに分類される確率は

 \displaystyle P(Y=i|x, W, b) = {\rm softmax}_{i} (W x + b) = \frac{e^{W_i x + b_i}}{\sum_{j} e^{W_j x + b_j}}

で定義される。この数式をTheanoのシンボルで表すと

        # 各サンプルが各クラスに分類される確率を計算するシンボル
        # 全データを行列化してまとめて計算している
        # 出力は(n_samples, n_out)の行列
        self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W) + self.b)

最初に見たとき、T.dot()の中身が数式と逆じゃないかと思ったのだがこれで正しかった。先の数式はサンプルxを1つだけ与えたときの確率を計算しているのに対し、このコードは複数のサンプル(ミニバッチ単位)を行列化してinputに与え、それらのサンプルの確率をまとめて計算している。

確率が求まったら最終的に一番高い確率が得られるクラスに分類する。

 y_{pred} = {\rm argmax}_{i} P(Y=i|x,W,b)

TheanoではT.argmax()axis=1を指定することでp_y_given_xの各行(サンプルに相当)において一番確率が高いインデックス(クラスに相当)がまとめて取得できる。

        # 確率が最大のクラスのインデックスを計算
        # 出力は(n_samples,)のベクトル
        self.y_pred = T.argmax(self.p_y_given_x, axis=1)

self.y_predは現在のパラメータにおける各サンプルの分類クラスを意味している。

Theanoの厄介なところはself.p_y_given_xなどのシンボルをprintしてみても具体的な値が表示されないのでデバッグしにくいところだと思う。一応、デバッグのコツがあるのであとでTheanoの使い方としてまとめたい。

コスト関数の定義

2クラスロジスティック回帰のコスト関数は交差エントロピー誤差だったが、多クラスロジスティック回帰のコスト関数は負の対数尤度(NLL)を使う(追記)ここによるとどうやら同じものらしい・・・要確認。

 \displaystyle NLL(\theta=\{W, b\}, D) = - \sum_{i=0}^{|D|} \log (P(Y=y^{(i)} | x^{(i)}, W, b))

TheanoでNLLを実装すると下のようになる。

    def negative_log_likelihood(self, y):
        """誤差関数である負の対数尤度を計算するシンボルを返す
        yにはinputに対応する正解ラベルを渡す"""
        # 式通りに計算するとsumだがmeanの方がよい
        return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]), y])

NLLを計算するためには訓練データ集合 (x^{(i)}, y^{(i)})が必要なので正解クラスを表すシンボルyを引数で与えている。対応するxはNLLで使われているシンボルself.p_y_given_xを計算するときにすでにinputとして与えているためこの関数で渡す必要はない(ここら辺がややこしく感じた)。

行列をスライスしている[T.arange(y.shape[0]), y]の理解が少し難しい。self.p_y_given_xは各訓練データが各クラスに分類される確率を表した行列になる。そのT.logをとっても同じ形状の行列になる。 その行列から訓練データの正解のクラスy^{(i)}に対応する確率P(Y=y^{(i)})のみを集めているのが先のスライスの意味。

数式ではスライスした確率の和を取っているが、コードでは平均を取っている。チュートリアル資料によると平均にすると学習率の設定がミニバッチのサイズに依存しにくくなるという利点があるらしい。これは初めて知った。

この関数の戻り値はNLLを計算するためのシンボルであることに注意が必要。何か具体的な値が計算されて戻されるわけではない。具体的な計算をするためにはこのシンボルをtheano.function()で使う必要がある。

このNLLを最小化するパラメータWとbを求めるのがロジスティック回帰の学習に当たる。勾配降下法を使うためにはコスト関数NLLの各パラメータ(Wとb)での偏微分が必要になるが、Theanoの自動微分で求められるため自分で定義しなくてよい。

モデルの訓練

モデルの訓練にはミニバッチ確率的勾配降下法(MSGD)を使う。確率的勾配降下法(SGD)はただ1つのサンプルで1回だけパラメータを更新するのに対し、MGSDでは複数のサンプルをまとめて1回パラメータを更新する。

何番目のミニバッチを用いるかはシンボルindexで表す。たとえば、訓練データのindex番目のサンプル集合は下のコードで取得できる。

  train_set_x[index * batch_size: (index + 1) * batch_size]
  train_set_y[index * batch_size: (index + 1) * batch_size]

train_set_xtrain_set_yもシンボルなのでprintしても値は表示できない。

確率的勾配降下法では、コスト関数とその微分が必要になる。コスト関数は先に述べたようにNLL(negative_log_likelihood)を使う。コスト関数の微分は、Theanoの自動微分を使えば下記のように簡単に書ける。これはすごい!

    cost = classifier.negative_log_likelihood(y)
    g_W = T.grad(cost=cost, wrt=classifier.W)
    g_b = T.grad(cost=cost, wrt=classifier.b)

パラメータの更新式は、

    # パラメータ更新
    updates = [(classifier.W, classifier.W - learning_rate * g_W),
               (classifier.b, classifier.b - learning_rate * g_b)]

となる。今回はパラメータがWbの2つあるため更新式も2つ指定している。ここまでいろいろシンボルを定義してきてようやくミニバッチ確率的勾配降下法を行う関数が定義できる。

    # index番目の訓練バッチを入力し、パラメータを更新する関数を定義
    # 戻り値としてコストが返される
    # この関数の呼び出し時にindexに具体的な値が初めて渡される
    train_model = theano.function(
        inputs=[index],
        outputs=cost,
        updates=updates,
        givens={
            x: train_set_x[index * batch_size: (index + 1) * batch_size],
            y: train_set_y[index * batch_size: (index + 1) * batch_size]
        })

関数の入力には何番目のミニバッチを使って訓練するかを表すインデックスを与える。関数の出力は更新されたパラメータでのコストが返される。givensは関数を実行する際にシンボルを置き換えるオプションである。ここでは、xindex番目のミニバッチの訓練データのシンボル、yindex番目のミニバッチのラベルのシンボルに置き換えている。関数の引数のindexgivensで使われるだけみたい。

yはシンボルcostの計算に使われていたのにすぐ気付いたが、xがどこで使われているかすぐにわからなかった。いろいろ調べていくとupdatesで使われるclassifierオブジェクトの引数として渡されていいたのに気付いた。

    # MNISTの手書き数字を分類するロジスティック回帰モデル
    # 入力は28ピクセルx28ピクセルの画像、出力は0から9のラベル
    # 入力はシンボルxを割り当てておいてあとで具体的なデータに置換する
    classifier = LogisticRegression(input=x, n_in=28*28, n_out=10)

結局のところxgivensでミニバッチのデータを表すシンボルに置き換えられ、それが引数inputに渡されるという仕組みになっている。このinputは前に見たように出力確率(classifier.p_y_given_x)の計算で使われていた。これは慣れないときつい・・・

モデルの評価

モデルの良さはエラー率で評価する。まずはエラー率を計算するシンボルを返す関数を定義する。

    def errors(self, y):
        """分類の誤差率を計算するシンボルを返す
        yにはinputに対応する正解クラスを渡す"""
        return T.mean(T.neq(self.y_pred, y))

self.y_predは前に書いたように現在のパラメータでの各サンプルの予測クラスを表すシンボル。yは正解クラスを意味する。T.neq()で両者が等しくない要素だけ1になるベクトルが返る。これの平均を取ればエラー率になる。T.sum()で和を取れば分類エラーになったサンプル数(Zero-One Loss)になる。

このerrors()の戻り値もシンボルのため関数化しないと使えない。バリデーションデータとテストデータのそれぞれについてエラー率を計算する関数を定義する。バリデーションデータはEarly-stoppingによる収束判定で使われる。最終的なモデルの評価はテストデータのエラー率で評価する。

    # index番目のバリデーション用ミニバッチを入力してエラー率を返す関数を定義
    validate_model = theano.function(
        inputs=[index],
        outputs=classifier.errors(y),
        givens={
            x: valid_set_x[index * batch_size: (index + 1) * batch_size],
            y: valid_set_y[index * batch_size: (index + 1) * batch_size]
        })


    # index番目のテスト用ミニバッチを入力してエラー率を返す関数を定義
    test_model = theano.function(
        inputs=[index],
        outputs=classifier.errors(y),
        givens={  # ここで初めてシンボル x, y を具体的な値で置き換える
            x: test_set_x[index * batch_size: (index + 1) * batch_size],
            y: test_set_y[index * batch_size: (index + 1) * batch_size]
        })

Early-Stoppingによる収束判定

Early-Stopping(早期終了)は、バリデーションセットを用いてモデルのパフォーマンスをモニタリングすることで過学習を防ぐ手法とのこと。WikipediaのEarly-Stoppingの項目を読むといくつか実装の仕方があるようだがこのチュートリアルではpatienceを利用した収束判定をしている。

    # モデル訓練
    print 'training the model ...'

    # eary-stoppingのパラメータ
    patience = 5000
    patience_increase = 2
    improvement_threshold = 0.995
    validation_frequency = min(n_train_batches, patience / 2)

    best_validation_loss = np.inf
    test_score = 0
    start_time = time.clock()

    done_looping = False
    epoch = 0

    while (epoch < n_epochs) and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in xrange(n_train_batches):
            # minibatch_index番目の訓練データのミニバッチを用いてパラメータ更新
            minibatch_avg_cost = train_model(minibatch_index)

            # validation_frequency回の更新ごとにバリデーションセットによるモデル検証が入る
            iter = (epoch - 1) * n_train_batches + minibatch_index
            if (iter + 1) % validation_frequency == 0:
                # バリデーションセットの平均エラー率を計算
                validation_losses = [validate_model(i) for i in xrange(n_valid_batches)]
                this_validation_loss = np.mean(validation_losses)
                print "epoch %i, minibatch %i/%i, validation error %f %%" % (epoch, minibatch_index + 1, n_train_batches, this_validation_loss * 100)

                # エラー率が十分改善したならまだモデル改善の余地があるためpatienceを上げてより多くループを回せるようにする
                if this_validation_loss < best_validation_loss:
                    if this_validation_loss < best_validation_loss * improvement_threshold:
                        patience = max(patience, iter * patience_increase)
                        print "*** iter %d / patience %d" % (iter, patience)

                    best_validation_loss = this_validation_loss

                    # テストセットを用いたエラー率も求めておく
                    test_losses = [test_model(i) for i in xrange(n_test_batches)]
                    test_score = np.mean(test_losses)
                    print "    epoch %i, minibatch %i/%i, test error of best model %f %%" % (epoch, minibatch_index + 1, n_train_batches, test_score * 100)

            # patienceを超えたらループを終了
            if patience <= iter:
                done_looping = True
                break

    end_time = time.clock()
    print "Optimization complete with best validation score of %f %%, with test performance %f %%" % (best_validation_loss * 100, test_score * 100)
    print "The code run for %d epochs, with %f epochs/sec" % (epoch, 1.0 * epoch / (end_time - start_time))

このコードはモデル訓練をしている一番重要なループだけれど読み解くのが少し大変。私は以下のように理解した。

  • モデルの訓練は訓練データのミニバッチ単位で行う。
  • エポック epoch は訓練データのミニバッチを使い切ったら+1される。訓練データを一通り使いきっても収束していなければまた最初のミニバッチに戻って同じデータを繰り返し使ってパラメータ更新する。
  • 最初の while ループは、最大n_epochs 回エポックを繰り返されたら更新を終えることを意味する。これによりたとえ収束しなくてもプログラムはいつかは終わることが保証される。
  • iterはこれまで訓練に使用したミニバッチの数が入る。つまり、パラメータ更新回数を意味する。
  • パラメータ更新回数がpatienceを超えたらもうこれ以上更新してもモデルは改善しないと判断されてループを抜ける。逆に言うと、patienceの値が上がるとまだまだパラメータ更新によるモデル改善の余地があると判断されてことになる
  • patienceを上げるかどうかの判断はバリデーションセットを用いて決められる
  • validation_frequency回更新されるたびにモデルの検証が行われる。
  • バリデーションセットを用いて平均エラー率が計算され、この平均エラー率がこれまでの最良の平均エラー率より十分な程度下がっていればまだまだモデル改善の余地ありと判断してpatienceを引き上げる。十分かどうかはimprovement_thresholdで調整できる。
  • この検証の際にテストセットを用いたモデルのエラー率の評価も合わせて行う。

デフォルトで patience は5000になっているけど経過をたどると6140まで引き上げられていた。

実行時間の計測

最後にCPUとGPUで実行時間を比較してみよう。まず、CPUを使った場合。

Optimization complete with best validation score of 7.500000 %, with test performance 7.489583 %
The code run for 74 epochs, with 5.222703 epochs/sec

real    0m17.656s
user    0m0.015s
sys     0m0.031s

学習に17秒かかった。CPUでもかなり早い。エラー率は7.49%なので正解率は92.51%。単純なロジスティック回帰でもかなりの精度が出ていることがわかる。次にGPUを使った場合。

Using gpu device 0: GeForce GTX 760 Ti OEM

Optimization complete with best validation score of 7.500000 %, with test performance 7.489583 %
The code run for 74 epochs, with 9.571053 epochs/sec

real    0m11.927s
user    0m0.000s
sys     0m0.031s

17秒が11秒に改善されている。ロジスティック回帰はパラメータの収束が早いため改善効果はあまり大きくない。1秒あたりの更新エポック数を比較すると5エポックから9エポックになっているためGPUを使うことで着実に速くなっていることが確認できる。

次回は多層パーセプトロン。

参考

うちと同じようにDeep Learning Tutorialに沿って実装を解説されている。Theanoの検証例が豊富で非常に参考になった。今後も参考にさせてもらう予定。

*1:入力データに1を追加することでバイアスベクトルbを重み行列Wに含めることもできるが今回はわけている