新基準となるか?カプセルネットワーク(CapsNet)をKeras で構築してMNISTをやってみました

従来のニューラルネットワークを超えるとまで言われている、最新鋭のアルゴリズム「カプセルネットワーク」。人工知能・機械学習をやっている方であれば、末に耳にした方も多いかと思います。

年末年始のお休みでCapsNetを紐解いてみようとお考えの方も多いはず!本記事では「速報」として、カプセルネットワーク(CapsNet)の簡単な概要、さらにはKeras(TensorFlow Backend)を使ってCapsNetの構築を行い、MNISTの結果を確認するチュートリアルとなります。(参考ソース:こちら

CapsNetの論文(英語)を紐解く前に、まずはコードを一緒に動かしてみましょう!

カプセルネットワーク(CapsNet)とは?

Googleに所属する天才AI研究者「ジェフ・ヒントン」が生み出した、「ニューラルネット」を超える可能性がある新しい深層学習のアルゴリズムです。

ジェフ・ヒントンですが、現在はGoogleとトロント大学で働いており、既に人工知能・機械学習のスタンダードとなりつつある「ニューラルネットワーク」を生み出した研究者の一人でもあります。そんな天才が約40年もの年月を費やして考え発表したのが「カプセルネットワーク」です。

カプセルネットワークの概要の前に、まずはCNN(畳み込みニューラルネットワーク )の理解が必要です。CNNですが、機械学習の画像を扱うアルゴリズムで、画像の解析を行う際に最も頻繁に使われるアルゴリズムです。人工知能の技術でよく騒がれる、自動車の自動運転や工場での検品など、画像を扱う多くの場合で、このCNNが使われていました。

CNNですが非常に優れたアルゴリズムですが、それでも克服しなくてはいけない弱点がありました。その弱点を克服したのが、この新しいカプセル・ネットワークとなります。

より詳しいカプセルネットワークの説明は、下記のリンクをご参考ください!(特に最後のYouTubeがシンプルで非常にわかりやすかったです。)

では、今回の記事の目的でもある、CapsNetをKerasを使ってに構築してみましょう。

チュートリアルの概要

本チュートリアルの概要や環境のバージョンなどをまとめています。あくまで、CapsNetを全て理解するという趣旨ではなく、まずは手っ取り早く実際に構築してみて、触ってみたいという方向けのチュートリアルとなります。

概要
カプセル・ネットワーク(CapsNet)をKeras(TensorFlow Backend)を使って構築して、MNIST(手書き数字データセット)の結果をみてみます。実行環境はJupyter Notebookを使用しています。

必要な時間
1時間〜3時間程度

実行環境

  • macOS High Sierra 10.13.1
  • プロセッサ 2.4GHz Intel Core i5
  • メモリ 8GB
  • Python 3.6.1
  • Numpy 1.12.1
  • Matplotlib 2.0.2
  • iPython 6.2.1
  • Keras 2.1.2
  • TensorFlow 1.4.1

チュートリアルの流れ

  1. カプセル層の作成
  2. モデル構築
  3. MNISTデータの読み込み
  4. 結果確認

では、早速、Jupyter Notebookを立ち上げてやってみましょう。

カプセル層の作成

kerasとTensorFlowを使ってカプセルネットワークのレイヤーを作成します。まずは必要なライブラリをインポートしていきましょう。

import numpy as np
import os
import pandas as pd
from keras.preprocessing.image import ImageDataGenerator
from keras import callbacks
from keras.utils.vis_utils import plot_model
import keras.backend as K
import tensorflow as tf
from keras import initializers, layers

まずはベクトルの長さを計算する処理です。margin_lossのy_trueと同じサイズのテンソルを計算します。

  • 入力 : shape = [dim_1, …, dim_{n-1}, dim_n}
  • 出力 :shape = [dim_1, …, dim_{n-1}}
import keras.backend as K
import tensorflow as tf
from keras import initializers, layers

class Length(layers.Layer):
    def call(self, inputs, **kwargs):
        return K.sqrt(K.sum(K.square(inputs), -1))

    def compute_output_shape(self, input_shape):
        return input_shape[:-1]

次にテンソルをマスクするクラスの作成です。

class Mask(layers.Layer):
    def call(self, inputs, **kwargs):
        if type(inputs) is list: 
            assert len(inputs) == 2
            inputs, mask = inputs
        else:
            x = inputs

            x = (x - K.max(x, 1, True)) / K.epsilon() + 1
            mask = K.clip(x, 0, 1)

        inputs_masked = K.batch_dot(inputs, mask, [1, 1])
        return inputs_masked

    def compute_output_shape(self, input_shape):
        if type(input_shape[0]) is tuple:
            return tuple([None, input_shape[0][-1]])
        else:
            return tuple([None, input_shape[-1]])

次が非線形の活性化関数「squash」を作りましょう。計算式ですが、下記となります。より詳しく調べてみたい方は、冒頭で触れましたが論文または説明動画をお勧めいたします。基本的に大きなベクトルをほぼ1へ、小さいベクトルをほぼ0として処理をします。

def squash(vectors, axis=-1):
    s_squared_norm = K.sum(K.square(vectors), axis, keepdims=True)
    scale = s_squared_norm / (1 + s_squared_norm) / K.sqrt(s_squared_norm)
    return scale * vectors

さて、いよいよカプセルレイヤー(カプセル層)の作成をします。デンスレイヤーと似ています。デンスレイヤーでは、インプットをスカラーのin_num(一つ前の層の出力ニューロン)として、アウトプットはout_num(出力ニューロン)でした。カプセルネットワークも似たような構造ですが、出力ニューロンがスカラーではなく、ベクトルの部分が異なります。

カプセルネットワークの入出力
インプットのサイズ:[None, input_num_capsule, input_dim_vector] アウトプットのサイズ:[None, num_capsule, dim_vector]

パラメーター
num_capsule:レイヤーのカプセル数
dim_vector:レイヤーのカプセルベクトルの出力のディメンション
num_routing:ルーティングアルゴリズムの反復回数

class CapsuleLayer(layers.Layer):
    def __init__(self, num_capsule, dim_vector, num_routing=3,
                 kernel_initializer='glorot_uniform',
                 bias_initializer='zeros',
                 **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_vector = dim_vector
        self.num_routing = num_routing
        self.kernel_initializer = initializers.get(kernel_initializer)
        self.bias_initializer = initializers.get(bias_initializer)

    def build(self, input_shape):
        assert len(input_shape) >= 3,
        self.input_num_capsule = input_shape[1]
        self.input_dim_vector = input_shape[2]

        self.W = self.add_weight(shape=[self.input_num_capsule, self.num_capsule, self.input_dim_vector, self.dim_vector],
                                 initializer=self.kernel_initializer,
                                 name='W')

        self.bias = self.add_weight(shape=[1, self.input_num_capsule, self.num_capsule, 1, 1],
                                    initializer=self.bias_initializer,
                                    name='bias',
                                    trainable=False)
        self.built = True

    def call(self, inputs, training=None):
        inputs_expand = K.expand_dims(K.expand_dims(inputs, 2), 2)

        inputs_tiled = K.tile(inputs_expand, [1, 1, self.num_capsule, 1, 1])

        inputs_hat = tf.scan(lambda ac, x: K.batch_dot(x, self.W, [3, 2]),
                             elems=inputs_tiled,
                             initializer=K.zeros([self.input_num_capsule, self.num_capsule, 1, self.dim_vector]))

        assert self.num_routing > 0, 'The num_routing should be > 0.'
        for i in range(self.num_routing):
            c = tf.nn.softmax(self.bias, dim=2)

            outputs = squash(K.sum(c * inputs_hat, 1, keepdims=True))


            if i != self.num_routing - 1:
                self.bias += K.sum(inputs_hat * outputs, -1, keepdims=True)
        return K.reshape(outputs, [-1, self.num_capsule, self.dim_vector])

    def compute_output_shape(self, input_shape):
        return tuple([None, self.num_capsule, self.dim_vector])

def PrimaryCap(inputs, dim_vector, n_channels, kernel_size, strides, padding):
    output = layers.Conv2D(filters=dim_vector*n_channels, kernel_size=kernel_size, strides=strides, padding=padding)(inputs)
    outputs = layers.Reshape(target_shape=[-1, dim_vector])(output)
    return layers.Lambda(squash)(outputs)

これでカプセル層の作成は完了です!次のステップへいきましょう。

モデル構築

さて、次は前項で作成した層を使ってモデルの構築を行いましょう。カプセルネットワークのモデルですが、一般的なX → yと少々異なり、(X, y) → (y, X)となっています。この方法ですが、cGAN(Conditional Generative Adversarial Nets)と似ていています。

from keras import layers, models
from keras import backend as K
from keras.utils import to_categorical
def CapsNet(input_shape, n_class, num_routing):
    x = layers.Input(shape=input_shape)
    conv1 = layers.Conv2D(filters=256, kernel_size=9, strides=1, padding='valid', activation='relu', name='conv1')(x)
    primarycaps = PrimaryCap(conv1, dim_vector=8, n_channels=32, kernel_size=9, strides=2, padding='valid')
    digitcaps = CapsuleLayer(num_capsule=n_class, dim_vector=16, num_routing=num_routing, name='digitcaps')(primarycaps)
    out_caps = Length(name='out_caps')(digitcaps)

    y = layers.Input(shape=(n_class,))
    masked = Mask()([digitcaps, y])
    x_recon = layers.Dense(512, activation='relu')(masked)
    x_recon = layers.Dense(1024, activation='relu')(x_recon)
    x_recon = layers.Dense(784, activation='sigmoid')(x_recon)
    x_recon = layers.Reshape(target_shape=[28, 28, 1], name='out_recon')(x_recon)

    return models.Model([x, y], [out_caps, x_recon])

次にマージンロスの処理を作ります。

def margin_loss(y_true, y_pred):
    L = y_true * K.square(K.maximum(0., 0.9 - y_pred)) + \
        0.5 * (1 - y_true) * K.square(K.maximum(0., y_pred - 0.1))

    return K.mean(K.sum(L, 1))

さて、最後にモデルの宣言をしましょう。最後にモデルの概要をプリントして確認してみます。

model = CapsNet(input_shape=[28, 28, 1],
                n_class=10,
                num_routing=3)
model.summary()
try:
    plot_model(model, to_file='model.png', show_shapes=True)
except Exception as e:
    print('No fancy plot {}'.format(e))

これでCapsNet(カプセルネットワーク)の構築が完了です!次はMNISTのデータを読み込んで、訓練をしてみましょう!

MNIST読み込み&カプセルネットワークの訓練

いよいよMNISTのデータを読み込んで、カプセルネットワークのトレーニングを行います。MNISTのデータをお持ちでない方は、Kaggle MNISTデータからダウロードをお願いします。(Kaggleへの無料会員登録が必要です。参考:Kaggleとは

では、早速データを読み込んで、train_test_splitを使ってテストデータと訓練データに分けてあげましょう。

from sklearn.model_selection import train_test_split
data_train = pd.read_csv('train.csv') # 自身の環境に合わせてパスを指定してください
X_full = data_train.iloc[:,1:]
y_full = data_train.iloc[:,:1]
x_train, x_test, y_train, y_test = train_test_split(X_full, y_full, test_size = 0.3)

次にデータ型と簡単な事前処理を行います。

x_train = x_train.values.reshape(-1, 28, 28, 1).astype('float32') / 255.
x_test = x_test.values.reshape(-1, 28, 28, 1).astype('float32') / 255.
y_train = to_categorical(y_train.astype('float32'))
y_test = to_categorical(y_test.astype('float32'))

いよいよCapsNet(カプセルネットワーク)のトレーニングの処理を書きましょう。

def train(model, data, epoch_size_frac=1.0):

    (x_train, y_train), (x_test, y_test) = data

    log = callbacks.CSVLogger('log.csv')
    checkpoint = callbacks.ModelCheckpoint('weights-{epoch:02d}.h5',
                                           save_best_only=True, save_weights_only=True, verbose=1)
    lr_decay = callbacks.LearningRateScheduler(schedule=lambda epoch: 0.001 * np.exp(-epoch / 10.))

    model.compile(optimizer='adam',
                  loss=[margin_loss, 'mse'],
                  loss_weights=[1., 0.0005],
                  metrics={'out_caps': 'accuracy'})

    # -----------------------------------Begin: Training with data augmentation -----------------------------------#
    def train_generator(x, y, batch_size, shift_fraction=0.):
        train_datagen = ImageDataGenerator(width_shift_range=shift_fraction,
                                           height_shift_range=shift_fraction)
        generator = train_datagen.flow(x, y, batch_size=batch_size)
        while 1:
            x_batch, y_batch = generator.next()
            yield ([x_batch, y_batch], [y_batch, x_batch])

    model.fit_generator(generator=train_generator(x_train, y_train, 64, 0.1),
                        steps_per_epoch=int(epoch_size_frac*y_train.shape[0] / 64),
                        epochs=1,
                        validation_data=[[x_test, y_test], [y_test, x_test]],
                        callbacks=[log, checkpoint, lr_decay])
    # -----------------------------------End: Training with data augmentation -----------------------------------#

    model.save_weights('trained_model.h5')
    print('Trained model saved to \'trained_model.h5\'')

    return model

さて、これで全ての準備が整いました!いよいよ、MNISTのデータをカプセルネットワークでトレーニングしてみましょう!私の環境(Mac、 Core i5、8GBRAM)で30分程度かかりました。GPUが欲しい・・・。

train(model=model, data=((x_train, y_train), (x_test[:60], y_test[:60])), 
      epoch_size_frac = 0.5)
# 上記コードの出力内容
Epoch 1/1
228/229 [============================>.] - ETA: 7s - loss: 0.2209 - out_caps_loss: 0.2208 - out_recon_loss: 0.1189 - out_caps_acc: 0.7514 Epoch 00001: val_loss improved from inf to 0.04321, saving model to weights-01.h5
229/229 [==============================] - 1799s 8s/step - loss: 0.2203 - out_caps_loss: 0.2203 - out_recon_loss: 0.1187 - out_caps_acc: 0.7524 - val_loss: 0.0432 - val_out_caps_loss: 0.0432 - val_out_recon_loss: 0.0650 - val_out_caps_acc: 0.9833
Trained model saved to 'trained_model.h5'
Out[11]:
<keras.engine.training.Model at 0x1194ff978>

これでCapsNetのトレーニングが完了です。次はいよいよデータを渡して精度を確認してみましょう。

CapsNetの精度を確認してみよう

MNISTの予測をする前に、テストとMNIST画像を簡単に処理する関数を作っておきましょう。テストデータでの精度(Accuracy)、さらにx_testとx_reconの画像をプロッティングしてみましょう。

def combine_images(generated_images):
    num = generated_images.shape[0]
    width = int(np.sqrt(num))
    height = int(np.ceil(float(num)/width))
    shape = generated_images.shape[1:3]
    image = np.zeros((height*shape[0], width*shape[1]),
                     dtype=generated_images.dtype)
    for index, img in enumerate(generated_images):
        i = int(index/width)
        j = index % width
        image[i*shape[0]:(i+1)*shape[0], j*shape[1]:(j+1)*shape[1]] = \
            img[:, :, 0]
    return image

def test(model, data):
    x_test, y_test = data
    y_pred, x_recon = model.predict([x_test, y_test], batch_size=100)
    print('-'*50)
    print('Test acc:', np.sum(np.argmax(y_pred, 1) == np.argmax(y_test, 1))/y_test.shape[0])

    import matplotlib.pyplot as plt
    from PIL import Image

    img = combine_images(np.concatenate([x_test[:50],x_recon[:50]]))
    image = img * 255
    Image.fromarray(image.astype(np.uint8)).save("real_and_recon.png")
    print()
    print('Reconstructed images are saved to ./real_and_recon.png')
    print('-'*50)
    plt.imshow(plt.imread("real_and_recon.png", ))
    plt.show()

では、テスト用データの最初の100件を使って訓練したCapsNetのテストを行ってみましょう!新しいアルゴリズムを確認するのって、ワクワクしますよね(笑)

test(model=model, data=(x_test[:100], y_test[:100]))

精度が0.97!驚くほど高い精度ですね!

CapsNet(カプセルネットワーク)まとめ

いかがでしたでしょうか?今回はKerasを使って実際にCapsNet(カプセルネットワーク)の構築、さらにはMNISTのデータセットでテストを行ってみました。個人的にも、まだまだ紐解きが必要な部分が多数ありますので、今回を皮切りに論文などを読み漁ってみようかと思いました!

機械学習初心者向けのハンズオンチュートリアルを多数掲載しています。今回はMNISTを使って画像の処理でしたが、基本的なランダムフォレストやXGBoostなどのチュートリアルも掲載していますので、興味のある方はぜひ下記のチュートリアルもやってみてください!

以上、最新アルゴリズム「カプセルネットワーク(CapsNet)」の速報チュートリアルでした!

 

codexaチーム: 人工知能関連(機械学習・深層学習)に関連するニュースやチュートリアルを掲載しています。また先端技術を扱っているスタートアップや企業様向けにインタビューも常に募集しています!お気軽にお声掛けください。 >> お問い合わせ