Python】Optunaパッケージの紹介!PostgresDBを使った並列分散処理で高速化!【Optuna】

前回はOptunaを使った簡単な例の最適化とニューラルネットワークの最適化を行ってみました!

Optunaの凄さは前回だけでも十分伝わったと思うのですが、実はOptunaの本領はPostgressDBなどをバックエンドに使用したとき発揮されます!

PostgressDBをOptunaと一緒に使うと以下のことができるようになります!

データベースに最適化過程を保存することができる!
最適化を途中でやめたり再開したりできる!
Trialを並列に実行することができる!

つまりは結果の保存や確認が簡単になってかつ倍速で実行できるようになるということです!最適化の中断や再開ができるようになるのも実用上はかなり大きなメリットです!

以下のコードを実行するためにはPostgressDBをインストールする必要があります!インストールに関してはこちらのサイトなどをご参照ください!

それでは早速やってみましょう!

前回の記事で扱ったのと同じ最適化をPostgressDBを使ってやってみます!まずは100Trialを実行してみます!

A new study created in RDB with name: example2_postgressと出ているので、DBにexample2_postgressというStudyが新しく作成されていることがわかります!

 

DBをからの最適化結果の取り出し

さて早速DBに保存されている最適化の結果を取り出してみましょう!

DBからの結果の取り出しはoptuna.load_studyでstorage=DATABASE_URIを指定するだけでできます!

study = optuna.load_study(study_name=study_name, storage=DATABASE_URI)
study.trials_dataframe()

studyオブジェクトを作成してくれるのであとは前回紹介した可視化関数を使ったり今回使用したstudy.trials_dataframeで結果を確認できます!

ちなみにstudy.trials_dataframeでPandasのDataFrameとして結果を返してくれます!扱いが簡単になってとてもいいですね!

 

最適化の再開

さて次は最適化の再開を試してみます!今度は50Trialだけ実行してみます!

# もう一度同じstudy_nameとstorageを指定することで最適化の再開が可能
DATABASE_URI =  'postgresql://{user}:{password}@{host}:{port}/{database_name}'
study_name = 'example2_postgress'

study = optuna.create_study(
    study_name=study_name,
    storage=DATABASE_URI,
    load_if_exists=True
)
study.optimize(objective, n_trials=50)

Using an existing study with name ‘example2_postgress’ instead of creating a new one.と出ているので、先程のように新しく作成するのでなくて、すでに存在しているexample2_postgressを使用して最適化を再開してくれているのがわかります!

 

study = optuna.load_study(study_name=study_name, storage=DATABASE_URI)
study.trials_dataframe()

前回の100Trialと今回の50Trialで合計150Trial分の結果を返してくれました!

 

最適化プロセスの並列化

さて!次は最適化プロセスの分散並列処理をやってみます!

これはメチャクチャ簡単に最適化を数倍高速化することのできる方法です!

まずは必要なライブラリをインポートして、使用可能なCPUのコア数を確認します!

# 使用可能なCPUの数を確認
import os 
from multiprocessing import Process
max_cpu = os.cpu_count()
max_cpu # 12

私の環境だと12コア使用可能みたいです!

今回の並列化はmultiprocessingのProcess関数を使います!これに関してはこちらのサイトが参考になります!

以下のようにProcess関数に並列で実行したい関数とその引数を渡して、worker.start()を実行することでProcessに渡した関数を実行することができます!

worker = Process(target=optimize, args=(study_name, DATABASE_URI, n_trials_per_cpu))

 

Process関数の使い方がわかったところでこれまでと同様の簡単な例での最適化をやってみます!

今回はoptuna.create_studyしてstudy.optimizeまで実行するoptimize関数を並列化します!

def objective(trial):
    x = trial.suggest_uniform('x', -10, 10)
    return (x - 2) ** 2 

def optimize(study_name, storage, n_trials):
    study = optuna.create_study(
        study_name=study_name,
        storage=storage,
        load_if_exists=True
    )
    study.optimize(objective, n_trials=n_trials)

 

並列処理にはいくつか注意する点があります!

一つはif __name__ == ‘__main__’:節の中で実行することです!この節の外部で並列化処理を実装するとエラーが起こって実行できないときがあります!

またJupyter Notebookだとうまく実行できない可能性があるので.pyファイルを作成して実行してみるのが確実だと思います!

## jupyter notebookだと動かない可能性がある
# 並列化処理は if __name__ == '__main__': の中で書かないと動かない
if __name__ == '__main__':
    DATABASE_URI = 'postgresql://{user}:{password}@{host}:{port}/{database_name}'
    study_name = 'example3_distributed'

    n_trials = 300
    concurrency = 2  
    # max_cpuより使うCPUの数が多くないことを確認
    assert concurrency <= max_cpu
    n_trials_per_cpu = n_trials / concurrency


    # 並列化
    workers = [Process(target=optimize, args=(study_name, DATABASE_URI, n_trials_per_cpu)) for _ in range(concurrency)]
    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()

 

上記を実行することで2コア並列で最適化を実行することができます!

つまりほぼ2倍の速度で最適化が行えます!めちゃくちゃ簡単ですね!

 

ニューラルネットワークの最適化を並列化してみる!

あまりに簡単な例だけではつまらないので前回と同様のニューラルネットワークの構造最適化のタスクを並列処理して高速化してみます!

タスクの詳細については前回の記事をご参照ください!

タスク自体が多少複雑になっても並列処理の部分は全く変わりません!

基本的にはobjective関数とoptimize関数の中身を多少変えるだけです!

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import optuna 

import os 
from multiprocessing import Process
import numpy as np
import random

# 引数でレイヤー数、ノード数を変更可能なモデルクラス
class VariableModel(nn.Module):
    def __init__(self, n_layers, n_nodes):
        super(VariableModel, self).__init__()
        self.input_layer = nn.Linear(5,n_nodes)
        layer = [nn.Linear(n_nodes,n_nodes) for _ in range(n_layers)]
        self.layer = nn.ModuleList(layer)
        self.output_layer = nn.Linear(n_nodes,1)

    def forward(self, x):
        x = self.input_layer(x)
        x = F.relu(x)
        for i in range(len(self.layer)):
            x = self.layer[i](x)
            x = F.relu(x)

        out = self.output_layer(x)
        return out

# 正解ラベル生成用関数
def f(x):
    return x[0] + x[0]**2 + 2*x[1]**3 + 3*x[2]  + 4*x[3] + 5*x[4]

def objective(trial):
    # チューニングするパラメータの範囲の設定
    n_layers = trial.suggest_int('n_layers', 1, 4)
    n_nodes = trial.suggest_int('n_nodes', 2, 64)
    lr = trial.suggest_loguniform('lr', 1e-5, 1e-1)
    optimizer_name = trial.suggest_categorical('optimizer_name', ['SGD', 'Adam'])

    # その他の定数
    n_epochs = 1000
    n_sample = 10000
    seed = 42

    # インプットと正解ラベルの生成
    torch.manual_seed(seed)
    X = torch.randn(n_sample, 5) 
    y = torch.FloatTensor([f(x_i) for x_i in X]).reshape(n_sample, 1)
    # Trainセット Testセットの分割
    X_train, y_train = X[:int(n_sample * 0.8)], y[:int(n_sample * 0.8)]
    X_test, y_test = X[int(n_sample * 0.8):], y[int(n_sample * 0.8):]

    # モデルのインスタンス化
    model = VariableModel(n_layers=n_layers, n_nodes=n_nodes)
    # 最適化手法のパラメータ設定
    if optimizer_name == 'SGD':
        optimizer = optim.SGD(model.parameters(), lr=lr, momentum=0.9)
    elif optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=lr)
    else:
        raise ValueError('Invalid optimizer_name')
    # loss関数の定義
    criterion = nn.MSELoss()

    # 学習ループ
    model.train()
    for epoch in range(n_epochs):
        optimizer.zero_grad()
        preds = model(X_train)
        loss = criterion(preds, y_train)
        loss.backward()
        optimizer.step()
        if (epoch + 1) % 100 == 0:
            print(f'Epoch [{epoch + 1}/{n_epochs}], Loss: {loss.item():.2f}')
    
    # テストセットで評価
    preds_test = model(X_test)
    test_loss = criterion(preds_test, y_test)
    # テストセットのLossを返す
    return test_loss.item()

def optimize(study_name, storage, n_trials):
    study = optuna.create_study(
        study_name=study_name,
        storage=storage,
        load_if_exists=True
    )
    study.optimize(objective, n_trials=n_trials)

if __name__ == '__main__':

    seed = 42
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)

    # postgresql関連の設定
    DATABASE_URI =  'postgresql://{user}:{password}@{host}:{port}/{database_name}'
    study_name = 'example4_nn'

    # 探索の回数と並列する数を設定
    n_trials = 2
    concurrency = 2
    n_trials_per_cpu = n_trials / concurrency
    # max_cpuより使うCPUの数が多くないことを確認
    max_cpu = os.cpu_count()
    if not concurrency <= max_cpu: raise ValueError('concurrency > max_cpu')

    # optimizeを並列に実行
    workers = [
        Process(
            target=optimize, 
            args=(study_name, DATABASE_URI, n_trials_per_cpu)
        ) for _ in range(concurrency)
    ]
    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()

    # 最適化の結果を確認
    study = optuna.load_study(study_name=study_name, storage=DATABASE_URI)
    best_trial = study.best_trial
    print(f"""Number of finished trials: {len(study.trials)}
    Best trial:
        Value: {best_trial.value:.6f}
    Params: """)
    for k, v in best_trial.params.items():
        if type(v) is float:
            print(f'        {k}: {v:.6f}')
        else:
            print(f'        {k}: {v}')

 

参照

ハイパーパラメータ自動最適化ツール「Optuna」公開

https://optuna.readthedocs.io/en/v2.2.0/tutorial/004_distributed.html

https://dot-blog.jp/news/python-multiprocessing-basic/

https://www.dbonline.jp/postgresql/install/

NO IMAGE
最新情報をチェックしよう!