tensorflow2 Subclassing APIとGradientTapeでシンプルな線形回帰!【tensorflow】

tensorflowにはSequential API, Functional API, Subclassing APIと3つの書き方があります!

この中でSequential API, Functional APIは比較的わかりやすくて初めてTensorflowを触る方でもすぐにモデルを構築することができます!実際、Tensorflowで検索するとSequential APIかFunctional APIで書かれた記事が多く見つかります!

しかし、Subclassing APIは比較的情報が少なくて手を出すのが難しいと感じておられる方も多いのではないでしょうか?実際私もSubclassing APIを試して見たかったのですが参考記事も少なく苦労しました。

そこで今回はSubclassing APIを使って、かなり簡単な線形回帰モデルを作成してみたいと思います!

また、model.fit()による学習ではなく、GradientTapeを用いて学習の操作を自分で実装してみたいと思います!

 

Subclassing APIとは?

Subclassing APIはモデルのパーツと順伝播をオブジェクト指向的に自分自身でカスタマイズして実装することのできるAPIです!最大のメリットはそのカスタマイズ性で、非常に複雑な分岐を持つモデルや、Functional APIでは実装が不可能な動的なモデルも実装可能になります!

Tensorflowを初めて触る人から上級者まで使用するSequential APIやFunctional APIと比較してやや直感的には理解が難しいため中級者〜上級者向けのAPIと言えるかもしれません!

回帰モデルの実装

まずこの記事のコードで使用するライブラリをインポートします!

import tensorflow as tf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import collections
print(tf.__version__)
# 2.5.0

 

今回はかなりシンプルな回帰モデルを作成します!

1次元の入力xからy = a*x + bを計算して正解ラベルを予測するモデルです!

学習するパラメータは傾きaと切片bの2つだけです!

さて、早速Subclassing APIで回帰モデルを実装してみましょう!

具体的な手順は以下です!

  1. tf.keras.Modelを継承する
  2. __init__メソッドの中でレイヤーなどの学習する重み(今回はaとb)を定義する
  3. callメソッドで順伝播を定義する
class MyLinearModel(tf.keras.Model):
    def __init__(self):
        super(MyLinearModel, self).__init__()
        self.a = tf.Variable(0.)
        self.b = tf.Variable(0.)

    def call(self, x):
        y = self.a * x + self.b
        return y

 

まずclass MyLinearModel(tf.keras.Model):でtf.keras.Modelを継承したクラスを作成します!

そしてdef __init__(self)の中で傾きaと切片bを定義します!self.aが傾きaを、self.bが切片bです!

tf.Variable(0.)は訓練可能な変数で初期値を0とおいています!tf.Variableで重みを定義することで、このパラメータを訓練の中で少しづつ変化させる(学習する)ことができます!今回は傾きのような1次元のパラメータですが、Denseレイヤーなどもこの中で定義することができます!(次回紹介します)

ちなみに定数(学習する必要のないパラメータ)を定義したいときにはtf.constantによって定義します!

最後にdef call(self, x):の中でモデルがどのような計算をして予測値を出力するか(順伝播)を定義します!

今回は入力xからself.a * x + self.bを計算して予測値を出力するモデルを定義しました!

モデルが定義できたので以下のようにしてモデルのインスタンス化ができます!

model = MyLinearModel()

 

サンプルデータの作成

今回は傾きaと切片bの真の値をそれぞれ2と100として設定して、それに正規分布に従うノイズを加えたものを訓練データとして作成します!

この節はあまり重要ではないのでこういうデータを作ってるんだなくらいに思って読み飛ばして頂いても大丈夫です!

np.random.seed(seed=32)
a_true, b_true = 2, 100
x = np.random.normal(50, 100, 200)
noise = np.random.normal(0,100,200)
y = a_true * x + b_true + noise
plt.scatter(x, y)

次に上記で作成した訓練データの標準化を行います!今回はsklearnのStandardScalerを使用します!

from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()

xy = np.hstack([x.reshape(-1, 1), y.reshape(-1, 1)])
scaler.fit(xy)
print(f"scaler.mean_: {scaler.mean_[0]:.3}\nscaler.var_: {scaler.var_[0]:.3}")
# scaler.mean_: 60.3
# scaler.var_: 1.01e+04

上記で訓練したscalerを用いて訓練データを標準化します!

xy_scaled = scaler.transform(xy)
x_scaled, y_scaled = xy_scaled[:,0], xy_scaled[:,1]

fig = plt.figure(figsize=(10,5))
ax1 = fig.add_subplot(1,2,1)
ax1.hist(x_scaled)
ax1.set_title('x_scaled')
ax2 = fig.add_subplot(1,2,2)
ax2.hist(y_scaled)
ax2.set_title('y_scaled');

上が標準化後の訓練データのxとyそれぞれのヒストグラムです!平均0分散1に標準化できていることが確認できました!

plt.scatter(x_scaled, y_scaled)
plt.title("x_scaled vs y_scaled")
plt.xlabel("x_scaled")
plt.ylabel("y_scaled")
plt.grid(True)

標準化したあとの訓練データの散布図が上です!元のデータの形はそのままにxとyそれぞれに平均0分散1に標準化できていますね!

これを用いて線形モデルの訓練を行います!

 

GradientTapeを用いた学習の実装

 

GradientTapeとは?

Gradient Tape(勾配テープ)は自動微分を計算するために用意されたAPIです!具体的にGradient Tapeで何ができるかというと、その節の中で行われた演算すべてをテープに記録することでその演算に関連する勾配を計算できます!参考:自動微分と勾配テープ

その名前の通り、「勾配」を「テープ」に記録するイメージです!

これは説明を読むよりも実際にコードを見てみるほうが理解が進むと思います!早速今回の線形回帰の例を見てみましょう!

上記で実装したモデルによる予測値の計算と、予測値と正解ラベルを用いて損失関数であるMSEを計算する部分をGradientTapeの内部で記述して、その勾配を計算します!

model = MyLinearModel()
learning_rate = 1e-3

print(f"Initial value:\n\tmodel.a: {model.a.numpy():.3}\n\tmodel.b: {model.b.numpy():.3}\n")
# Initial value:
# 	model.a: 0.0
# 	model.b: 0.0
with tf.GradientTape() as tape:
    y_pred = model(x_scaled)
    loss = calculate_mse_loss(y_pred, y_scaled)
gradients = tape.gradient(loss, model.trainable_weights)
model.a.assign_sub(learning_rate * gradients[0])
model.b.assign_sub(learning_rate * gradients[1])

print(f"After 1epoch training:\n\tmodel.a: {model.a.numpy():.3}\n\tmodel.b: {model.b.numpy():.3}\n")
# After 1epoch training:
# 	model.a: 0.00179
# 	model.b: -3.73e-12

with tf.GradientTape as tape:節内でモデルによる予測値の計算と、予測値と正解ラベルを用いて損失関数であるMSEを計算を行いました!

そしてwith tf.GradientTape as tape:節を抜けてtape.gradient(loss, model.trainable_weights)で上記の演算の勾配を計算します!

model.trainable_weightはmodelの訓練可能なパラメータで今回はmodel.aとmodel.bを指します!model.trainable_weightはtf.Variableで定義された変数を取り出します!ちなみに訓練可能なパラメータがどれだけ増えたとしてもmodel.trainable_weightですべてを取り出すことができます!

tape.gradient(loss, model.trainable_weights)はlossをmodel.trainable_weightについて微分した値を計算することで「Lossを減少させるにはmodel.aとmodel.bをどちらの方向へ変化させれば良いのか?」を知ることができます!

そして、aに関する勾配(gradients[0])に学習率をかけたものをmodel.aから引き算することでmodel.aを更新します!同様の操作をmodel.bについても行うことでパラメータの更新を行います!

aとbの更新後の値を確認してみます!model.a: 0.00179、model.b: -3.73e-12に更新されました!(model.aとmodel.bの初期値は0.0でした)

さて、上記で1エポックの操作を実装することができたのでこれを複数エポック行うコードを作成します!

epochs = 20
learning_rate = 1e-1

model = MyLinearModel()

log = pd.DataFrame()
for epoch in range(epochs):
    with tf.GradientTape() as tape:
        y_pred = model(x_scaled)
        loss = calculate_mse_loss(y_pred, y_scaled)
    gradients = tape.gradient(loss, [model.a, model.b])
    model.a.assign_sub(learning_rate * gradients[0])
    model.b.assign_sub(learning_rate * gradients[1])

    if (epoch + 1) % 5 == 0:
        print(f"epoch: {epoch+1}, mse_loss: {loss.numpy():1f}\n\t a: {model.a.numpy():4f}, b: {model.b.numpy():4f}")

    log = log.append({'epoch': epoch, 'loss': loss.numpy(), 'model.a': model.a.numpy(), 'model.b': model.b.numpy()},ignore_index=True)

# epoch: 5, mse_loss: 0.330925
# 	 a: 0.602826, b: -0.000000
# epoch: 10, mse_loss: 0.210526
# 	 a: 0.800361, b: -0.000000
# epoch: 15, mse_loss: 0.197598
# 	 a: 0.865089, b: -0.000000
# epoch: 20, mse_loss: 0.196210
# 	 a: 0.886299, b: -0.000000

 

モデルの予測精度の改善の様子

エポックごとのLossの減少を確認してみます!

plt.plot('epoch', 'loss', data=log)
plt.title("Training History")
plt.xlabel("epoch)")
plt.ylabel("MSE loss")

10エポック程度でプラトーに達していることが確認できました!

さて、20エポック後の訓練後のmodel.aとmodel.bを使って回帰直線を描画してみます!

a, b = model.a.numpy(), model.b.numpy()

plt.scatter(x_scaled, y_scaled)
plt.plot([-3,4], [-3 * a + b, 4 * a + b])
plt.title("x_scaled vs y_scaled")
plt.xlabel("x_scaled")
plt.ylabel("y_scaled")
plt.grid(True)

かなりいい感じにデータにフィットしていっていることが確認できました!

最後に、エポックごとの回帰直線の改善の様子を確認してみます!

lines = [[(-3, -3 * row['model.a'] + row['model.b']), (4, 4 * row['model.a'] + row['model.b'])] 
    for _, row in log.iterrows()]
n_lines = len(lines)
regression_line_collection = collections.LineCollection(lines)

fig, ax = plt.subplots()
regression_line_collection.set_array(np.arange(n_lines))
ax.add_collection(regression_line_collection)
ax.scatter(x_scaled, y_scaled)
axcb = fig.colorbar(regression_line_collection)
axcb.set_label('Epoch')
ax.set_title('Regression line of each epoch')

紫色が1~5エポック後の回帰直線で、黄色のものが20エポック後の最終的な回帰直線です!エポックごとに回帰直線が改善していく様子が確認できますね!

 

Optimizerを用いたモデルのパラメータの更新

上ではパラメータの更新の理解のために、勾配に学習率をかけたものを使ってパラメータの更新を行いました!しかし、この方法はそれほど効率の良いパラメータの更新方法(学習方法)ではありません。

この方法よりも早く最適解に達することのできる方法がいくつも提案されていて、Optimizerとして実装してくれています!今回はそれを試してみます!

また上記のコードだと以下のようにパラメータごとにassign_sub(learning_rate * gradients[0])を行うことでパラメータの更新を行って来ましたが、これはパラメータの数が多くなると全部書くのは不可能になります。

gradients = tape.gradient(loss, model.trainable_weights) 
model.a.assign_sub(learning_rate * gradients[0]) 
model.b.assign_sub(learning_rate * gradients[1])

そこで,パラメータの数がいくつになっても1行で実装が可能なoptimizerを使用したパラメータの更新を実装してみます!

今回はAdam Optimizerを使用してモデルのパラメータ更新を行ってみます!

epochs = 20
learning_rate = 1e-1

model = MyLinearModel()
optimizer = tf.keras.optimizers.Adam(learning_rate)

log = pd.DataFrame()
for epoch in range(epochs):
    with tf.GradientTape() as tape:
        y_pred = model(x_scaled)
        loss = calculate_mse_loss(y_pred, y_scaled)
    gradients = tape.gradient(loss, model.trainable_weights)
    optimizer.apply_gradients(zip(gradients, model.trainable_weights))

    if (epoch + 1) % 5 == 0:
        print(f"epoch: {epoch+1}, mse_loss: {loss.numpy():1f}\n\t a: {model.a.numpy():4f}, b: {model.b.numpy():4f}")

    log = log.append({'epoch': epoch, 'loss': loss.numpy(), 'model.a': model.a.numpy(), 'model.b': model.b.numpy()},ignore_index=True)

# epoch: 5, mse_loss: 0.449824
# 	 a: 0.490501, b: -0.060856
# epoch: 10, mse_loss: 0.201932
# 	 a: 0.906225, b: 0.041341
# epoch: 15, mse_loss: 0.242080
# 	 a: 1.135318, b: -0.031943
# epoch: 20, mse_loss: 0.262648
# 	 a: 1.139602, b: 0.025384

 

上記ではモデルのパラメータをmodel.a.assign_sub(learning_rate * gradients[0])のように一つ一つ行っていましたが、今回は optimizer.apply_gradients(zip(gradients, model.trainable_weights))を使って一行ですべてのパラメータの更新を行いました!

この実装の1番のメリットは、モデルがどれほど複雑になってもパラメータが増えても変更する必要がないという点です!

 

plt.plot('epoch', 'loss', data=log)
plt.title("Training History")
plt.xlabel("epoch)")
plt.ylabel("MSE loss")

Adam Optimizerを使用すると少しだけ早く最適解に達することが確認できました!しかしその後はLossが少し大きくなってしまっているのでこの場合は9エポック目のモデルを使用するのがいいかもしれません!

まとめ

今回はSubclassing APIとGradientTapeを使ってかなりシンプルな線形回帰を実装してみました!

Sequential APIでは無視してしまう順伝播の実装や、モデルのパラメータの更新の部分を自分で実装するにはどのようにすればいいのかの参考になりましたら幸いです!

今回は線形回帰を題材にしたのでDenseレイヤーのようなNeural Networkの実装の主役になる部分の実装ができなかったので、次回はDenseレイヤーを使ったより実践的なモデルを実装してみたいと思います!

 

https://www.tensorflow.org/tutorials/customization/autodiff?hl=ja

NO IMAGE
最新情報をチェックしよう!