機械学習の「学習」で用いるデータは、バッチと呼ばれる単位で処理することが必要になります。
以前に、『TensorFlow推奨フォーマット「TFRecord」の作成と読み込み方法』という記事で紹介した方法では、TFRecordファイルの作成と読み込みを行いましたが、データを1レコードずつ順番に取り出すことしかできていません。
そこで、今回は、TensorFlowで推奨されている Dataset API を利用して、マルチスレッド(並列処理)で、バッチ単位にデータを取得する手法で、(機械学習の)「学習」を行っていきます。
※データの扱い方について、以前の記事では、キューを扱うことを明示的に記載したコーディング(本記事では、以後「キューベースパイプライン」と呼ぶこととします)となっていましたが、TensorFlowバージョン1.4(2017年11月リリース)から、Dataset APIを利用することが推奨されています。
参考:TensorFlow API Documentation「Threading and Queues」
※この記事では、(機械学習の)「学習」を以下の条件で行うものとします。
・入力データとなる、モデルの学習に用いる「学習用データ」と、学習済みモデルを評価するのに用いる「評価用データ」を、別に分けて用意します。
・同じモデルを利用して、上記2つの入力データを切り替えて、「学習」と「評価」を実行します。
「キューベースパイプライン」の場合、入力データの切り替えがなかなか難しく、困ったのですが、今回のDataset APIではその切り替えも簡単に行えます。
TFRecordの学習をDataset APIで行う流れ
- MNISTデータ(10000件)から、学習用データと評価用データをTFRecord形式で準備します。
(※TFRecordの作成方法は、こちらの記事に記載しているのでご覧ください)- 学習用データ: MNISTの先頭から7000件目までのデータ
- 評価用データ: MNISTの7001件目から最後までのデータ
- 上記1のデータを用いて、学習と評価を実施します。
(※後述のサンプルコードは、この内容のみを含んでいます)- 入力データの解析処理
- 学習用データのTFRecordファイルを解析し、読み込める状態にします。
- モデルの作成
- TFRecordファイルを入力にしたモデルを構築します。
- 入力ファイルは、100レコードを1バッチサイズとして扱います。
- 学習と評価
- このモデルでの学習は、1000ステップ行います。
- 入力ファイル「学習用データ」の取り扱いイメージは、次の通り。
- ファイルの先頭レコードから、ステップ毎にバッチサイズ分のレコードを、順次「学習」に利用します。
- ステップが進み、レコードの最後まで到達すると、先頭からもう一度取得するようにします。
- 入力ファイル「学習用データ」の取り扱いイメージは、次の通り。
- 学習が済んだら、評価を行います。
- 前述のように学習済みである同じモデルを用います。
- 入力ファイルを「評価用データ」に切り替えます。
- 推定値と正解ラベルで、答え合わせをします。
- このモデルでの学習は、1000ステップ行います。
- 入力データの解析処理
サンプルコード(全体)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
import numpy as np import tensorflow as tf import time # -------------------------------------- # パラメータ設定 # -------------------------------------- MAX_STEPS = 1000 # 学習回数 BATCH_SIZE = 100 # バッチサイズ IMAGE_WIDTH = 28 # 画像サイズ:幅 IMAGE_HEIGHT = 28 # 画像サイズ:高さ IMAGE_CHANNE = 1 # 画像チャネル数 TARGET_SIZE = 10 # 教師画像の種類数 CONV1_FILTER_SIZE = 3 # フィルターサイズ(幅、高さ) CONV1_FEATURES = 32 # 特徴マップ数 CONV1_STRIDE = 1 # ストライドの設定 MAX_POOL_SIZE1 = 2 # マップサイズ(幅、高さ) POOL1_STRIDE = 2 # ストライドの設定 AFFINE1_OUTPUT_SIZE = 100 # 全結合層(1)の出力数 NUM_THREADS = 4 # スレッド INPUT_TFRECORD_TRAIN = "training.tfrecords" # TFRecordファイル名(学習用) INPUT_TFRECORD_TEST = "test.tfrecords" # TFRecordファイル名(評価用) # -------------------------------------- # 入力データの解析処理 # -------------------------------------- # データ解析(1) def _parse_function(example_proto): features={ 'label': tf.FixedLenFeature((), tf.int64, default_value=0), 'image': tf.FixedLenFeature((), tf.string, default_value="") } parsed_features = tf.parse_single_example(example_proto, features) # データ構造を解析 return parsed_features["image"], parsed_features["label"] # データ解析(2) def read_image(images, labels): label = tf.cast(labels, tf.int32) label = tf.one_hot(label, TARGET_SIZE) image = tf.decode_raw(images, tf.uint8) image = tf.cast(image, tf.float32) image = image / 255 # 画像データを、0~1の範囲に変換する image = tf.reshape(image, [IMAGE_HEIGHT,IMAGE_WIDTH,IMAGE_CHANNE]) return image, label # -------------------------------------- # モデルの作成 # -------------------------------------- # 入力層 filenames = tf.placeholder(tf.string, shape=[None]) dataset = tf.data.TFRecordDataset(filenames) dataset = dataset.map(_parse_function, NUM_THREADS) # レコードを解析し、テンソルに変換 dataset = dataset.map(read_image, NUM_THREADS) # データの形式、形状を変更 dataset = dataset.batch(BATCH_SIZE) # 連続するレコードをバッチに結合 dataset = dataset.repeat(-1) # 無限に繰り返す iterator = tf.contrib.data.Iterator.from_structure(dataset.output_types, dataset.output_shapes) # イテレータを作成 x, y_ = iterator.get_next() # イテレータの次の要素を取得 init_op = iterator.make_initializer(dataset) # イテレータを初期化 # CONV層 conv1_weight = tf.Variable(tf.truncated_normal([CONV1_FILTER_SIZE, CONV1_FILTER_SIZE, 1, CONV1_FEATURES], stddev=0.1)) conv1_bias = tf.Variable(tf.zeros([CONV1_FEATURES])) conv1 = tf.nn.bias_add( tf.nn.conv2d(x, conv1_weight, strides=[1, CONV1_STRIDE, CONV1_STRIDE, 1], padding='SAME') ,conv1_bias) relu1 = tf.nn.relu(conv1) # POOL層 pool1 = tf.nn.max_pool(relu1, ksize=[1, MAX_POOL_SIZE1, MAX_POOL_SIZE1, 1], strides=[1, POOL1_STRIDE, POOL1_STRIDE, 1], padding='SAME') pool1_shape = pool1.get_shape().as_list() pool1_flat_shape = pool1_shape[1]*pool1_shape[2]*pool1_shape[3] pool1_flat = tf.reshape(pool1, [-1, pool1_flat_shape] ) # 2次元に変換 # 全結合層1 W1 = tf.Variable(tf.truncated_normal([pool1_flat_shape, AFFINE1_OUTPUT_SIZE])) b1 = tf.Variable(tf.zeros([AFFINE1_OUTPUT_SIZE])) affine1 = tf.matmul(pool1_flat, W1) + b1 sigmoid1 = tf.sigmoid(affine1) # 全結合層2 W2 = tf.Variable(tf.truncated_normal([AFFINE1_OUTPUT_SIZE, TARGET_SIZE])) b2 = tf.Variable(tf.zeros([TARGET_SIZE])) affine2 = tf.matmul(sigmoid1, W2) + b2 # 出力層 y = tf.nn.softmax(affine2) # 誤差関数(loss) loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=y_, logits=affine2)) # 最適化手段(最急降下法) train_step = tf.train.GradientDescentOptimizer(0.05).minimize(loss) # 正答率 correct_prediction = tf.equal(tf.argmax(y,1), tf.argmax(y_,1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) # -------------------------------------- # 学習と評価 # -------------------------------------- with tf.Session() as sess: try: # 学習 init = tf.global_variables_initializer() sess.run(init) # 変数の初期化処理 sess.run(init_op, feed_dict={filenames: [INPUT_TFRECORD_TRAIN]}) # データの初期化 for step in range(MAX_STEPS): start_time = time.time() _, l, acr = sess.run([train_step, loss, accuracy]) # 最急勾配法でパラメータ更新 duration = time.time() - start_time if (step + 1) % 100 == 0: print("step={:4d}, loss={:5.2f}, Accuracy={:5.2f} ({:.3f} sec)".format(step + 1, l, acr, duration)) # 評価 sess.run(init_op, feed_dict={filenames: [INPUT_TFRECORD_TEST]}) # データの初期化 est_accuracy, est_y, new_y_ = sess.run([accuracy, y, y_]) print("Accuracy (for test data): {:5.2f}".format(est_accuracy)) print("True Label:", np.argmax(new_y_[0:15,], 1)) print("Est Label:", np.argmax(est_y[0:15, ], 1)) except tf.errors.OutOfRangeError: print('Done training -- epoch limit reached') print("step={:4d}".format(step + 1)) |
サンプルコードの解説
1)TFRecordファイルを解析する
TFRecordファイルを作成した際に各項目を定義した内容で、バイナリーデータを解析し、読み込めるようにします。
◇32~40行目のコード
32 33 34 35 36 37 38 39 40 |
# データ解析(1) def _parse_function(example_proto): features={ 'label': tf.FixedLenFeature((), tf.int64, default_value=0), 'image': tf.FixedLenFeature((), tf.string, default_value="") } parsed_features = tf.parse_single_example(example_proto, features) # データ構造を解析 return parsed_features["image"], parsed_features["label"] |
TFRecordの各要素の定義を指定して、値を取得します。
-
features:TFRecord形式でファイルを作成した時の各要素の定義情報。
-
「label」要素(フィーチャ):MINSTの各画像の教師ラベル。
-
「image」要素(フィーチャ):MINSTの各画像データ。
-
-
tf.parse_single_example:各要素の定義情報を基に、データ構造を解析。
◇42~52行目のコード
42 43 44 45 46 47 48 49 50 51 52 |
# データ解析(2) def read_image(images, labels): label = tf.cast(labels, tf.int32) label = tf.one_hot(label, TARGET_SIZE) image = tf.decode_raw(images, tf.uint8) image = tf.cast(image, tf.float32) image = image / 255 # 画像データを、0~1の範囲に変換する image = tf.reshape(image, [IMAGE_HEIGHT,IMAGE_WIDTH,IMAGE_CHANNE]) return image, label |
TFRecordから取得した値を加工します。
- label:
- one-hot表現に変換します。
つまり、分類10種類の内、該当する値を1にし、その他を0にしたテンソルにします。
- one-hot表現に変換します。
- image:
- 値を255で割って、0~1の範囲に収まるように正規化しています。
- データの形状を、[ 高さ, 幅 , チャネル数 ] に変形しています。
2)モデルの作成
入力画像に対して、畳み込みニューラルネットワークで処理するようにしています。
- 畳み込み層
- 重み:3(高さ)×3(幅)×1(チャネル数)×32
- プーリング層
- 最大値プーリング
- パディングあり
- 出力結果をフラット化
- 全結合層(1)
- 出力:100
- 全結合層(2)
- 出力:10
- 出力層
- ソフトマックス関数
◇57~67行目のコード
57 58 59 60 61 62 63 64 65 66 67 |
# 入力層 filenames = tf.placeholder(tf.string, shape=[None]) dataset = tf.data.TFRecordDataset(filenames) dataset = dataset.map(_parse_function, NUM_THREADS) # レコードを解析し、テンソルに変換 dataset = dataset.map(read_image, NUM_THREADS) # データの形式、形状を変更 dataset = dataset.batch(BATCH_SIZE) # 連続するレコードをバッチに結合 dataset = dataset.repeat(-1) # 無限に繰り返す iterator = tf.contrib.data.Iterator.from_structure(dataset.output_types, dataset.output_shapes) # イテレータを作成 x, y_ = iterator.get_next() # イテレータの次の要素を取得 init_op = iterator.make_initializer(dataset) # イテレータを初期化 |
- 入力ファイルを、「tf.placeholder」を用いて指定しています。
- これにより、入力ファイルの切り替えが可能となります。
- 従来の方法「キューベースパイプライン」では、エラーが発生して利用できませんでした。
-
dataset.map(_parse_function, NUM_THREADS)
-
TFRecordのデータ定義に従い、データを取得しています。
- 前述で定義した関数「_parse_function」で処理します。
- スレッド数(= NUM_THREADS)を指定し、マルチスレッド(並列処理)を行っています。
-
-
dataset.map(read_image, NUM_THREADS)
-
データに対して、自分でカスタマイズした加工を行っています。
- 前述で定義した関数「read_image」で処理します。
- スレッド数(= NUM_THREADS)を指定し、マルチスレッド(並列処理)を行っています。
-
-
dataset.batch(BATCH_SIZE)
-
バッチサイズ(= BATCH_SIZE)を指定し、まとまったデータを取得しています。
-
-
dataset.repeat(-1)
-
データセットを繰り返す回数を指定しています。
-
「-1」を指定することで、無限に繰り返すことを意味しています。
-
-
tf.contrib.data.Iterator.from_structure(dataset.output_types, dataset.output_shapes)
- データセットを繰り返し取得するためのイテレータを、データ型とデータ形状を指定し、作成します。
- この時点で、データは含んでいません。
- データの初期化は、以下で行っています。
-
iterator.make_initializer(dataset)
-
3)学習と評価
これまでの内容で、入力ファイルを読み込む準備と、そのファイルを利用したモデルの作成が終わりましたので、これから実際に学習を行います。学習が済んだら、入力ファイルを評価用データに切り替えて、評価を行います。
◇116行目のコード
116 |
sess.run(init_op, feed_dict={filenames: [INPUT_TFRECORD_TRAIN]}) # データの初期化 |
◇128行目のコード
128 |
sess.run(init_op, feed_dict={filenames: [INPUT_TFRECORD_TEST]}) # データの初期化 |
sess.run(init_op, feed_dict={filenames: [(ファイル名)]})
-
ここで、入力ファイルを初期化しています。
- このコードを実行することで、入力に用いる TFRecordファイルを、切り替えることができます。
- このコードは for ループの外側で実行しています。
※ループ内で実行してしまうと、毎回初期化され、毎回先頭レコードからバッチサイズ分のデータが、「学習」に繰り返し利用されてしまいます。
まとめ
ここまで見てきたように、TFRecordとDataset APIを用いることで、大量のデータを扱った、マルチスレッド(並列処理)によるミニバッチ学習を、簡潔なコードでコーディングできるうえ、入力データを切り替えて、同じモデルを「学習」処理と「評価」処理などで再利用することが簡単にできるようになります。
執筆者プロフィール
- 入社以来、C/S型の業務システム開発に従事してきました。ここ数年は、SalesforceやOutSystemsなどの製品や、スクラム開発手法に取り組み、現在のテーマは、DeepLearning/機械学習です。
この執筆者の最新記事
- Pick UP!2021.11.11VoTTを複数人で使って、アノテーションを行いたい!(ファイル移行を用いて)
- Pick UP!2020.11.20AIoTデバイス「M5StickV」、はじめの一歩
- RPA2019.08.15「OSSのRPA」+「自作の三目並べマシン」でGoogleに挑む!
- AI2019.04.22暗記学習(Rote Learning)で三目並べを強くする