8. PyTorch Lightning
[ ]:
#!pip install pytorch_lightning optuna mlflow
[ ]:
import numpy as np
import scipy.stats as stats
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from torch.utils.data import random_split, TensorDataset, DataLoader
import pickle
from copy import deepcopy
import pytorch_lightning as pl
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
import tempfile
import os
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.preprocessing import StandardScaler
import optuna
from optuna.integration import PyTorchLightningPruningCallback
%matplotlib inline
Let’s start by generating some random data: a linear model produces a continuous target for the regression example, which we then discretize into four ordinal labels for the classification example.
[ ]:
torch.manual_seed(1)
beta = torch.rand(10, 1)
train_inputv = torch.randn(700, 10)
train_target = torch.mm(train_inputv, beta)
test_inputv = torch.randn(200, 10)
test_target = torch.mm(test_inputv, beta)

# discretize the continuous target into 4 ordinal labels, using the
# 10%, 70% and 90% quantiles of the training target as cutpoints
cutpoints = [torch.quantile(train_target, x).item() for x in [.1, .7, .9]]
train_target_label = sum([0 + (train_target > cutpoint) for cutpoint in cutpoints], 0)
train_target_label = train_target_label.flatten()
test_target_label = sum([0 + (test_target > cutpoint) for cutpoint in cutpoints], 0)
test_target_label = test_target_label.flatten()
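As a quick sanity check we can count how many observations fall into each label; with cutpoints at the 10%, 70% and 90% quantiles we expect roughly 10%, 60%, 20% and 10% of the training points per class. A minimal sketch:
[ ]:
# count the observations per ordinal label (sketch)
print(torch.bincount(train_target_label, minlength=4))
print(torch.bincount(test_target_label, minlength=4))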
Let’s scale our data to help the neural network training process.
[ ]:
scaler = StandardScaler().fit(train_inputv.numpy())
train_inputv = torch.as_tensor(scaler.transform(train_inputv), dtype=torch.float32)
test_inputv = torch.as_tensor(scaler.transform(test_inputv), dtype=torch.float32)
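A quick check that the standardization behaved as expected (per-feature means near 0 and standard deviations near 1 on the training set); a minimal sketch:
[ ]:
# verify the standardization of the training inputs (sketch)
print(train_inputv.mean(dim=0).abs().max().item())
print(train_inputv.std(dim=0).mean().item())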
8.1. Defining main classes
[ ]:
class LitNN(pl.LightningModule):
    def __init__(self, nfeatures, n_classification_labels, hsizes=[50, 10],
                 lr=0.01, weight_decay=0, batch_size=50, dropout=0.5):
        super().__init__()
        assert n_classification_labels != 1
        self.lr = lr
        self.batch_size = batch_size
        self.weight_decay = weight_decay
        self.n_classification_labels = n_classification_labels

        # build a stack of Linear -> ELU -> BatchNorm -> Dropout blocks
        input_size = nfeatures
        modules_list = []
        for hsize in hsizes:
            modules_list.extend([
                nn.Linear(input_size, hsize),
                nn.ELU(),
                nn.BatchNorm1d(hsize),
                nn.Dropout(dropout),
            ])
            input_size = hsize
        # one output per class for classification, a single output for regression
        out_size = n_classification_labels if n_classification_labels else 1
        modules_list.append(self._initialize_layer(nn.Linear(input_size, out_size)))
        self.modules_list = nn.ModuleList(modules_list)

    def forward(self, x):
        for module in self.modules_list:
            x = module(x)
        return x

    def _initialize_layer(self, layer):
        nn.init.constant_(layer.bias, 0)
        gain = nn.init.calculate_gain('relu')
        nn.init.xavier_normal_(layer.weight, gain=gain)
        return layer

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.lr,
                                     weight_decay=self.weight_decay)
        return optimizer

    def training_step(self, train_batch, batch_idx):
        inputv, target = train_batch
        output = self.forward(inputv)
        if self.n_classification_labels:
            loss = F.cross_entropy(output, target)
            self.log('train_loss_ce', loss.item())
        else:
            loss = F.mse_loss(output, target)
            self.log('train_loss_rmse', np.sqrt(loss.item()))
        return loss

    def test_validation_step(self, batch, batch_idx, name):
        inputv, target = batch
        output = self.forward(inputv)
        if self.n_classification_labels:
            loss_ce = F.cross_entropy(output, target).item()
            loss_zo = (torch.argmax(output, 1) != target) + 0.
            loss_zo = loss_zo.mean().item()
            self.log(f'{name}_loss_ce', loss_ce)
            self.log(f'{name}_loss_zo', loss_zo)
        else:
            loss_mse = F.mse_loss(output, target).item()
            loss_mae = F.l1_loss(output, target).item()
            self.log(f'{name}_loss_rmse', np.sqrt(loss_mse))
            self.log(f'{name}_loss_mae', loss_mae)

    def validation_step(self, val_batch, batch_idx):
        self.test_validation_step(val_batch, batch_idx, 'val')

    def test_step(self, test_batch, batch_idx):
        self.test_validation_step(test_batch, batch_idx, 'test')
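Before wiring the module into a Trainer, it can help to sanity-check it in isolation. The sketch below instantiates a small classifier (hypothetical layer sizes) and runs a forward pass on a dummy batch to confirm the output shape:
[ ]:
# shape check of LitNN on a dummy batch (sketch)
_m = LitNN(nfeatures=10, n_classification_labels=4, hsizes=[16, 8])
_m.eval()  # eval mode so BatchNorm/Dropout behave deterministically
with torch.no_grad():
    print(_m(torch.randn(5, 10)).shape)  # expected: torch.Size([5, 4])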
[ ]:
class DataModule(pl.LightningDataModule):
    def __init__(self, train_inputv, train_target,
                 test_inputv=None, test_target=None,
                 n_classification_labels=None, batch_size=50,
                 num_workers=2, train_val_split_seed=0):
        super().__init__()
        assert n_classification_labels is not None
        assert n_classification_labels != 1
        self.batch_size = min(batch_size, len(train_target))
        self.n_classification_labels = n_classification_labels
        # classification targets must be long, regression targets float
        y_dtype = torch.long if n_classification_labels else torch.float32
        self.train_inputv = torch.as_tensor(train_inputv, dtype=torch.float32)
        self.train_target = torch.as_tensor(train_target, dtype=y_dtype)
        self.test_inputv = test_inputv
        self.test_target = test_target
        if test_inputv is not None:
            self.test_inputv = torch.as_tensor(test_inputv, dtype=torch.float32)
        if test_target is not None:
            self.test_target = torch.as_tensor(test_target, dtype=y_dtype)
        self.num_workers = num_workers
        self.train_val_split_seed = train_val_split_seed

    def setup(self, stage):
        if stage == 'fit':
            full_dataset = TensorDataset(self.train_inputv, self.train_target)
            generator = torch.Generator().manual_seed(self.train_val_split_seed)
            # hold out 10% of the training data for validation
            partitions = [len(full_dataset) - len(full_dataset) // 10, len(full_dataset) // 10]
            full_dataset = torch.utils.data.random_split(full_dataset, partitions,
                                                         generator=generator)
            self.train_dataset, self.val_dataset = full_dataset
        if stage == 'test':
            if self.test_inputv is not None:
                self.test_dataset = TensorDataset(self.test_inputv, self.test_target)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, drop_last=True,
                          shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size,
                          num_workers=self.num_workers)

    def test_dataloader(self):
        if self.test_inputv is None:
            raise RuntimeError("Test data not set")
        return DataLoader(self.test_dataset, batch_size=self.batch_size,
                          num_workers=self.num_workers)
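The DataModule holds out 10% of the training data for validation. A minimal sketch of how it behaves on its own, using the classification labels defined earlier:
[ ]:
# inspect the train/validation split produced by the DataModule (sketch)
_dm = DataModule(train_inputv, train_target_label, n_classification_labels=4)
_dm.setup('fit')
print(len(_dm.train_dataset), len(_dm.val_dataset))  # expected: 630 70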
8.2. Classification example
Let’s check the performance of an Extra Trees classifier as a simple baseline for our models. The cell below reports its zero-one (misclassification) error on the test set; a cross-entropy comparison follows.
[ ]:
# For comparison
clf = ExtraTreesClassifier(n_estimators=1000, random_state=0)
clf.fit(train_inputv, train_target_label)
(clf.predict(test_inputv) != test_target_label.numpy()).mean()
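Since the neural networks below are trained with a cross-entropy loss, it is also useful to record the baseline’s cross-entropy. A sketch using sklearn.metrics.log_loss (not imported above):
[ ]:
# cross-entropy of the Extra Trees baseline on the test set (sketch)
from sklearn.metrics import log_loss
log_loss(test_target_label.numpy(), clf.predict_proba(test_inputv))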
Now, we train a neural network with fixed hyperparameters.
[ ]:
datamodule = DataModule(train_inputv, train_target_label,
                        test_inputv, test_target_label,
                        n_classification_labels=4)
smodel = LitNN(nfeatures=train_inputv.shape[1], n_classification_labels=4)
early_stop_callback = EarlyStopping(
    monitor='val_loss_ce',
    min_delta=0.00,
    patience=30,
    verbose=False,
    mode='min',
)

# use MLflow as the logger if available; see other options at
# https://pytorch-lightning.readthedocs.io/en/latest/common/loggers.html
# you can start the MLflow server with:
# mlflow server --backend-store-uri=./mlruns
try:
    from pytorch_lightning.loggers import MLFlowLogger
    logger = MLFlowLogger(
        experiment_name="Default",
        tracking_uri="file:./mlruns",
    )
except ImportError:
    # default: TensorBoard, which you can start with:
    # tensorboard --logdir lightning_logs
    logger = True

trainer = pl.Trainer(
    precision=32,
    gpus=torch.cuda.device_count(),
    tpu_cores=None,
    logger=logger,
    val_check_interval=0.25,  # do the validation check 4 times per epoch
    #auto_scale_batch_size=True,
    #auto_lr_find=True,
    callbacks=early_stop_callback,
    max_epochs=100,
)

# find "best" batch_size and lr
#trainer.tune(smodel, datamodule=datamodule)

# fit smodel
trainer.fit(smodel, datamodule=datamodule)

# test smodel
trainer.test(smodel, datamodule=datamodule)

# predict with smodel
test_pred = np.vstack(deepcopy(trainer).predict(deepcopy(smodel), DataLoader(test_inputv)))

# check that smodel is picklable
_ = pickle.dumps(smodel)

smodel.trainer.callback_metrics
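test_pred holds the raw network outputs (logits) for each test point, so we can also derive the predicted labels ourselves and compare the misclassification rate directly with the Extra Trees baseline; a short sketch:
[ ]:
# misclassification rate of the network, computed from the raw logits (sketch)
nn_labels = test_pred.argmax(axis=1)
(nn_labels != test_target_label.numpy()).mean()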
8.3. Hyperparameters optimization using Optuna
Let’s optimize the hyperparameters using the Optuna library.
[ ]:
try:
    study
except NameError:
    study = optuna.create_study(direction="minimize",
                                pruner=optuna.pruners.SuccessiveHalvingPruner())

try:
    tempdir
except NameError:
    tempdir = tempfile.TemporaryDirectory().name
    os.mkdir(tempdir)
print(tempdir)
[ ]:
def objective(trial: optuna.trial.Trial) -> float:
    # sample the hyperparameters for this trial
    hsize1 = trial.suggest_int("hsize1", 10, 1000)
    hsize2 = trial.suggest_int("hsize2", 10, max(20, 1000 - hsize1))
    batch_size = trial.suggest_int("batch_size", 50, 400)
    lr = trial.suggest_float("lr", 1e-5, 0.1)
    dropout = trial.suggest_float("dropout", 0.0, 0.5)
    weight_decay = trial.suggest_float("weight_decay", 0.0, 0.01)
    hyperparameters = dict(
        hsize1=hsize1, hsize2=hsize2,
        batch_size=batch_size, lr=lr,
        dropout=dropout, weight_decay=weight_decay,
    )

    model = LitNN(hsizes=[hsize1, hsize2], lr=lr, batch_size=batch_size, dropout=dropout,
                  weight_decay=weight_decay, nfeatures=train_inputv.shape[1],
                  n_classification_labels=4)
    datamodule = DataModule(train_inputv, train_target_label, batch_size=batch_size,
                            n_classification_labels=4)
    early_stop_callback = EarlyStopping(
        monitor='val_loss_ce',
        min_delta=0.00,
        patience=30,
        verbose=False,
        mode='min',
    )
    try:
        from pytorch_lightning.loggers import MLFlowLogger
        logger = MLFlowLogger(
            experiment_name="Default",
            tracking_uri="file:./mlruns",
        )
    except ImportError:
        logger = True
    trainer = pl.Trainer(
        precision=32,
        gpus=torch.cuda.device_count(),
        logger=logger,
        val_check_interval=0.25,
        callbacks=[early_stop_callback,
                   PyTorchLightningPruningCallback(trial, monitor="val_loss_ce")],
        max_epochs=100,
    )
    trainer.fit(model, datamodule=datamodule)
    trainer.logger.log_hyperparams(hyperparameters)

    # save the fitted model so the best one can be recovered later
    with open(f"{os.path.join(tempdir, str(trial.number))}.pkl", "wb") as f:
        pickle.dump(model, f)

    return trainer.callback_metrics["val_loss_ce"].item()

study.optimize(objective, n_trials=10000, timeout=6)  # timeout is in seconds

print("Number of finished trials: {}".format(len(study.trials)))
print("Best trial:", study.best_params)

with open(f"{os.path.join(tempdir, str(study.best_trial.number))}.pkl", "rb") as f:
    best_model = pickle.load(f)
Let’s compare the results with our previous model:
[ ]:
best_model.trainer.test(best_model, datamodule = datamodule)
best_model.trainer.callback_metrics
[ ]:
smodel.trainer.callback_metrics
Let’s summarize the results:
[ ]:
# save the study to disk
with open(f"{os.path.join(tempdir, 'study')}.pkl", "wb") as f:
    pickle.dump(study, f)

print("Number of finished trials: {}".format(len(study.trials)))
print("Best trial:", study.best_params)

with open(f"{os.path.join(tempdir, str(study.best_trial.number))}.pkl", "rb") as f:
    best_model = pickle.load(f)
[ ]:
trials_summary = sorted(study.trials, key=lambda x: np.inf if x.value is None else x.value)
trials_summary = [dict(trial_number=trial.number, loss=trial.value, **trial.params) for trial in trials_summary]
trials_summary = pd.DataFrame(trials_summary)
trials_summary.iloc[:200]
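Optuna can also estimate how much each hyperparameter influenced the objective. A sketch using optuna.importance (the result depends on how many trials actually finished within the timeout):
[ ]:
# relative importance of each hyperparameter in the study (sketch)
optuna.importance.get_param_importances(study)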
8.4. Regression
[ ]:
datamodule = DataModule(train_inputv, train_target,
                        test_inputv, test_target,
                        n_classification_labels=0)
smodel = LitNN(nfeatures=train_inputv.shape[1], n_classification_labels=0)
early_stop_callback = EarlyStopping(
    monitor='val_loss_rmse',
    min_delta=0.00,
    patience=30,
    verbose=False,
    mode='min',
)
try:
    from pytorch_lightning.loggers import MLFlowLogger
    logger = MLFlowLogger(
        experiment_name="Default",
        tracking_uri="file:./mlruns",
    )
except ImportError:
    logger = True
trainer = pl.Trainer(
    precision=32,
    gpus=torch.cuda.device_count(),
    tpu_cores=None,
    logger=logger,
    val_check_interval=0.25,  # do validation check 4 times for each epoch
    #auto_scale_batch_size=True,
    #auto_lr_find=True,
    callbacks=early_stop_callback,
    max_epochs=100,
)
trainer.fit(smodel, datamodule=datamodule)
trainer.test(smodel, datamodule=datamodule)
test_pred = np.vstack(deepcopy(trainer).predict(deepcopy(smodel), DataLoader(test_inputv)))
_ = pickle.dumps(smodel)
smodel.trainer.callback_metrics
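As in the classification example, test_pred contains the raw predictions, so the test RMSE can be recomputed by hand; a short sketch (the logged value averages per-batch RMSEs, so small differences are expected):
[ ]:
# recompute the test RMSE from the raw predictions in test_pred (sketch)
np.sqrt(((test_pred - test_target.numpy()) ** 2).mean())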