8. PyTorch Lightning

[ ]:
#!pip install pytorch_lightning optuna mlflow
[ ]:
import numpy as np
import scipy.stats as stats
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn import functional as F
from torch.utils.data import random_split, TensorDataset, DataLoader
import pickle
from copy import deepcopy
import pytorch_lightning as pl
from pytorch_lightning.callbacks.early_stopping import EarlyStopping
import tempfile
import os
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.preprocessing import StandardScaler
import optuna
from optuna.integration import PyTorchLightningPruningCallback

%matplotlib inline

Let’s start by generating some random data

[ ]:
# Synthetic data: targets are a noiseless linear function of the inputs.
# The random draws happen in the same order (beta, train inputs, test inputs)
# so the seed keeps producing the same tensors.
torch.manual_seed(1)
beta = torch.rand(10, 1)

train_inputv = torch.randn(700, 10)
train_target = train_inputv.mm(beta)

test_inputv = torch.randn(200, 10)
test_target = test_inputv.mm(beta)

# Class boundaries: the 10%, 70% and 90% quantiles of the training targets
cutpoints = [torch.quantile(train_target, q).item() for q in (.1, .7, .9)]

# A label is the number of cutpoints strictly below the target value (0 to 3)
train_target_label = torch.stack(
    [train_target > cutpoint for cutpoint in cutpoints]
).sum(dim=0).flatten()

test_target_label = torch.stack(
    [test_target > cutpoint for cutpoint in cutpoints]
).sum(dim=0).flatten()

Let’s scale our data to help the neural network training process.

[ ]:
# Standardize the features with statistics estimated on the training split only,
# then apply the same transformation to both splits.
scaler = StandardScaler()
scaler.fit(train_inputv.numpy())
train_inputv, test_inputv = (
    torch.as_tensor(scaler.transform(split), dtype=torch.float32)
    for split in (train_inputv, test_inputv)
)

8.1. Defining main classes

[ ]:
class LitNN(pl.LightningModule):
    """Feed-forward network usable for classification or regression.

    Every hidden layer is ``Linear -> ELU -> BatchNorm1d -> Dropout``. The
    final ``Linear`` layer outputs ``n_classification_labels`` logits, or a
    single value when ``n_classification_labels`` is falsy (regression).

    Args:
        nfeatures: number of input features.
        n_classification_labels: number of classes; a falsy value (``0`` or
            ``None``) selects regression. The value ``1`` is rejected.
        hsizes: sizes of the hidden layers, in order.
        lr: learning rate passed to Adam.
        weight_decay: weight decay passed to Adam.
        batch_size: stored on the module; the module itself does not read it.
        dropout: dropout probability applied after every hidden layer.

    Raises:
        ValueError: if ``n_classification_labels == 1``.
    """

    def __init__(self, nfeatures, n_classification_labels, hsizes=(50, 10),
                 lr=0.01, weight_decay=0, batch_size=50, dropout=0.5):
        super().__init__()

        # Explicit check instead of `assert`, which is stripped under `python -O`
        if n_classification_labels == 1:
            raise ValueError(
                "n_classification_labels must not be 1; "
                "use 0 for regression or >= 2 for classification"
            )
        self.lr = lr
        self.batch_size = batch_size
        self.weight_decay = weight_decay
        self.n_classification_labels = n_classification_labels

        input_size = nfeatures

        modules_list = []
        for hsize in hsizes:
            modules_list.extend([
                nn.Linear(input_size, hsize),
                nn.ELU(),
                nn.BatchNorm1d(hsize),
                nn.Dropout(dropout),
            ])
            input_size = hsize

        out_size = n_classification_labels if n_classification_labels else 1
        # Only the output layer gets the custom initialization below
        modules_list.append(self._initialize_layer(nn.Linear(input_size, out_size)))
        self.modules_list = nn.ModuleList(modules_list)

    def forward(self, x):
        """Apply every layer in order and return the raw output (logits or value)."""
        for module in self.modules_list:
            x = module(x)
        return x

    def _initialize_layer(self, layer):
        """Zero the bias, Xavier-normal-initialize the weight and return the layer."""
        nn.init.constant_(layer.bias, 0)
        gain = nn.init.calculate_gain('relu')
        nn.init.xavier_normal_(layer.weight, gain=gain)
        return layer

    @staticmethod
    def _match_target_shape(output, target):
        """Reshape ``target`` to ``output``'s shape when only the layout differs.

        Prevents a ``(batch,)`` target from broadcasting against a
        ``(batch, 1)`` output inside the regression losses.
        """
        if target.shape != output.shape and target.numel() == output.numel():
            return target.reshape(output.shape)
        return target

    def configure_optimizers(self):
        """Return an Adam optimizer over all parameters."""
        return torch.optim.Adam(
            self.parameters(), lr=self.lr, weight_decay=self.weight_decay
        )

    def training_step(self, train_batch, batch_idx):
        """Compute, log and return the training loss for one batch."""
        inputv, target = train_batch
        output = self(inputv)
        if self.n_classification_labels:
            loss = F.cross_entropy(output, target)
            self.log('train_loss_ce', loss.item())
        else:
            loss = F.mse_loss(output, self._match_target_shape(output, target))
            self.log('train_loss_rmse', np.sqrt(loss.item()))

        return loss

    def test_validation_step(self, batch, batch_idx, name):
        """Log evaluation metrics for one batch under the ``{name}_loss_*`` keys.

        Classification logs cross entropy (``_ce``) and zero-one loss
        (``_zo``); regression logs RMSE (``_rmse``) and MAE (``_mae``).
        """
        inputv, target = batch
        output = self(inputv)
        if self.n_classification_labels:
            loss_ce = F.cross_entropy(output, target).item()
            # Zero-one loss: fraction of samples whose arg-max class is wrong
            loss_zo = (torch.argmax(output, 1) != target).float().mean().item()
            self.log(f'{name}_loss_ce', loss_ce)
            self.log(f'{name}_loss_zo', loss_zo)
        else:
            target = self._match_target_shape(output, target)
            loss_mse = F.mse_loss(output, target).item()
            loss_mae = F.l1_loss(output, target).item()
            self.log(f'{name}_loss_rmse', np.sqrt(loss_mse))
            self.log(f'{name}_loss_mae', loss_mae)

    def validation_step(self, val_batch, batch_idx):
        """Log validation metrics with the ``val`` prefix."""
        self.test_validation_step(val_batch, batch_idx, 'val')

    def test_step(self, test_batch, batch_idx):
        """Log test metrics with the ``test`` prefix."""
        self.test_validation_step(test_batch, batch_idx, 'test')
[ ]:
class DataModule(pl.LightningDataModule):
    """In-memory data module with a seeded 90/10 train/validation split.

    Args:
        train_inputv: training features (converted to ``float32`` tensors).
        train_target: training targets; converted to ``long`` for
            classification and ``float32`` for regression.
        test_inputv: optional test features.
        test_target: optional test targets.
        n_classification_labels: number of classes, or ``0`` for regression.
            Must be given explicitly and must not be ``1``.
        batch_size: requested batch size, capped at the number of training
            samples.
        num_workers: worker processes used by every data loader.
        train_val_split_seed: seed of the generator used for the split.

    Raises:
        ValueError: if ``n_classification_labels`` is ``None`` or ``1``.
    """

    def __init__(self, train_inputv, train_target,
                 test_inputv=None, test_target=None,
                 n_classification_labels=None, batch_size = 50,
                 num_workers=2, train_val_split_seed=0):
        super().__init__()

        # Explicit checks instead of `assert`, which is stripped under `python -O`
        if n_classification_labels is None:
            raise ValueError("n_classification_labels must be set (use 0 for regression)")
        if n_classification_labels == 1:
            raise ValueError("n_classification_labels must not be 1")

        self.batch_size = min(batch_size, len(train_target))
        self.n_classification_labels = n_classification_labels

        y_dtype = torch.long if n_classification_labels else torch.float32

        self.train_inputv = torch.as_tensor(train_inputv, dtype=torch.float32)
        self.train_target = torch.as_tensor(train_target, dtype=y_dtype)

        self.test_inputv = test_inputv
        self.test_target = test_target
        if test_inputv is not None:
            self.test_inputv = torch.as_tensor(test_inputv, dtype=torch.float32)
        if test_target is not None:
            self.test_target = torch.as_tensor(test_target, dtype=y_dtype)

        self.num_workers = num_workers
        self.train_val_split_seed = train_val_split_seed

    def _has_test_data(self):
        """Return True when both test inputs and test targets were provided."""
        return self.test_inputv is not None and self.test_target is not None

    def setup(self, stage=None):
        """Build the datasets needed for ``stage``.

        ``None`` prepares everything; ``'fit'`` and ``'validate'`` prepare
        the train/validation split; ``'test'`` prepares the test dataset.
        """
        if stage in (None, 'fit', 'validate'):
            full_dataset = TensorDataset(self.train_inputv, self.train_target)

            # A fixed seed makes the split identical on every call
            generator = torch.Generator().manual_seed(self.train_val_split_seed)
            n_val = len(full_dataset) // 10
            partitions = [len(full_dataset) - n_val, n_val]
            self.train_dataset, self.val_dataset = torch.utils.data.random_split(
                full_dataset, partitions, generator=generator)

        if stage in (None, 'test'):
            if self._has_test_data():
                self.test_dataset = TensorDataset(self.test_inputv, self.test_target)

    def train_dataloader(self):
        """Return the shuffled training loader; incomplete last batches are dropped."""
        # self.batch_size was capped against the full dataset, but the training
        # split is smaller; with drop_last=True a batch larger than the split
        # would produce zero batches, so cap it against the split as well.
        batch_size = max(1, min(self.batch_size, len(self.train_dataset)))
        return DataLoader(self.train_dataset, batch_size=batch_size, drop_last=True,
                          shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self):
        """Return the validation loader."""
        return DataLoader(self.val_dataset, batch_size=self.batch_size,
                          num_workers = self.num_workers)

    def test_dataloader(self):
        """Return the test loader.

        Raises:
            RuntimeError: if test inputs or test targets were not provided.
        """
        if not self._has_test_data():
            raise RuntimeError("Test data not set")
        return DataLoader(self.test_dataset, batch_size=self.batch_size,
                          num_workers = self.num_workers)

8.2. Classification example

Let’s check the misclassification (zero-one) error of an Extra Trees classifier as a simple baseline for our models

[ ]:
# Baseline for comparison: misclassification rate of an Extra Trees ensemble
clf = ExtraTreesClassifier(n_estimators=1000, random_state=0).fit(
    train_inputv, train_target_label
)
np.mean(clf.predict(test_inputv) != test_target_label.numpy())

Now, we train a neural network with fixed hyperparameters

[ ]:
# Data and model for the 4-class problem
datamodule = DataModule(train_inputv, train_target_label,
                        test_inputv, test_target_label,
                        n_classification_labels=4)
smodel = LitNN(nfeatures=train_inputv.shape[1], n_classification_labels=4)

# Stop training when the validation cross entropy stops improving
early_stop_callback = EarlyStopping(
   monitor='val_loss_ce',
   min_delta=0.00,
   patience=30,
   verbose=False,
   mode='min'
)

# use MLFlow as logger if available, see other options at
# https://pytorch-lightning.readthedocs.io/en/latest/common/loggers.html
# you can start MLFLow server with:
# mlflow server --backend-store-uri=./ml-runs
try:
    from pytorch_lightning.loggers import MLFlowLogger
    logger = MLFlowLogger(
        experiment_name="Default",
        tracking_uri="file:./mlruns"
    )
except ImportError:
    # default: Tensorboard, you can start with:
    # tensorboard --logdir lightning_logs
    logger = True

trainer = pl.Trainer(
                     precision=32,
                     gpus=torch.cuda.device_count(),
                     tpu_cores=None,
                     logger=logger,
                     val_check_interval=0.25, # do validation check 4 times for each epoch
                     #auto_scale_batch_size=True,
                     #auto_lr_find=True,
                     callbacks=early_stop_callback,
                     max_epochs = 100,
                    )

# find "best" batch_size and lr
#trainer.tune(smodel, datamodule = datamodule)

# fit smodel
trainer.fit(smodel, datamodule = datamodule)

# test smodel
trainer.test(smodel, datamodule = datamodule)

# predict smodel; batch the inputs instead of relying on DataLoader's default
# batch_size=1, which would run one forward pass per test row
test_pred = np.vstack(deepcopy(trainer).predict(
    deepcopy(smodel), DataLoader(test_inputv, batch_size=datamodule.batch_size)))

# check if smodel is picklable
_ = pickle.dumps(smodel)

smodel.trainer.callback_metrics

8.3. Hyperparameters optimization using Optuna

Let’s optimize the hyperparameters using the Optuna library

[ ]:
# Create the study only if this session does not already have one, so that
# re-running the cell keeps the trials accumulated so far.
if "study" not in globals():
    study = optuna.create_study(
        direction="minimize",
        pruner=optuna.pruners.SuccessiveHalvingPruner(),
    )
# Directory where each trial's pickled model is stored; created only once so
# that re-running the cell keeps pointing at the same files.
try:
    tempdir
except NameError:
    # mkdtemp creates the directory and leaves it in place. The previous
    # TemporaryDirectory().name + os.mkdir() combination only worked because
    # the temporary object was finalized (removing the directory) before mkdir ran.
    tempdir = tempfile.mkdtemp()
print(tempdir)
[ ]:
def objective(trial: optuna.trial.Trial) -> float:
    """Train one ``LitNN`` with hyperparameters sampled from ``trial``.

    The fitted model is pickled to ``<tempdir>/<trial.number>.pkl`` so the
    best one can be reloaded after the study finishes.

    Args:
        trial: Optuna trial used to sample the hyperparameters and, through
            the pruning callback, to report ``val_loss_ce``.

    Returns:
        The last ``val_loss_ce`` value recorded by the trainer.
    """
    hsize1 = trial.suggest_int("hsize1", 10, 1000)
    # The upper bound of the second layer shrinks as the first layer grows
    hsize2 = trial.suggest_int("hsize2", 10, max(20, 1000 - hsize1))
    batch_size = trial.suggest_int("batch_size", 50, 400)
    lr = trial.suggest_float("lr", 1e-5, 0.1)
    dropout = trial.suggest_float("dropout", 0.0, 0.5)
    weight_decay = trial.suggest_float("weight_decay", 0.0, 0.01)

    hyperparameters = dict(
                           hsize1=hsize1, hsize2=hsize2,
                           batch_size=batch_size, lr=lr,
                           dropout=dropout, weight_decay=weight_decay,
                          )

    model = LitNN(hsizes = [hsize1, hsize2], lr=lr, batch_size=batch_size, dropout=dropout,
                  weight_decay = weight_decay, nfeatures=train_inputv.shape[1],
                  n_classification_labels=4)
    datamodule = DataModule(train_inputv, train_target_label, batch_size=batch_size,
                            n_classification_labels=4)
    early_stop_callback = EarlyStopping(
       monitor='val_loss_ce',
       min_delta=0.00,
       patience=30,
       verbose=False,
       mode='min'
    )
    # Prefer MLflow when it can be imported; otherwise use Lightning's default logger
    try:
        from pytorch_lightning.loggers import MLFlowLogger
        logger = MLFlowLogger(
            experiment_name="Default",
            tracking_uri="file:./mlruns"
        )
    except ImportError:
        logger = True
    trainer = pl.Trainer(
                         precision=32,
                         gpus=torch.cuda.device_count(),
                         logger=logger,
                         val_check_interval=0.25,
                         callbacks=[early_stop_callback,
                                    PyTorchLightningPruningCallback(trial, monitor="val_loss_ce")
                                   ],
                         max_epochs = 100,
                        )

    # Log the hyperparameters before training: a pruned or failing trial
    # leaves fit() through an exception and would otherwise never record them.
    trainer.logger.log_hyperparams(hyperparameters)

    trainer.fit(model, datamodule = datamodule)

    with open(f"{os.path.join(tempdir, str(trial.number))}.pkl", "wb") as f:
        pickle.dump(model, f)

    return trainer.callback_metrics["val_loss_ce"].item()

# Run the search, bounded by both a trial count and a timeout in seconds
study.optimize(objective, n_trials=10000, timeout=6)

print(f"Number of finished trials: {len(study.trials)}")
print("Best trial:", study.best_params)

# Reload the model that objective() pickled for the best trial
with open(os.path.join(tempdir, f"{study.best_trial.number}.pkl"), "rb") as f:
    best_model = pickle.load(f)

Let’s compare the results with our previous model:

[ ]:
# Evaluate the tuned model with the trainer attached to the unpickled model.
# NOTE(review): `datamodule` here is the module-level one (the classification
# DataModule with test data, assuming the cells ran in order) - confirm.
best_model.trainer.test(best_model, datamodule = datamodule)
# Metrics recorded by that trainer; as the last expression it is the cell output
best_model.trainer.callback_metrics
[ ]:
# Metrics recorded by the trainer of the fixed-hyperparameter model, for comparison
smodel.trainer.callback_metrics

Let’s summarize the results:

[ ]:
# Save the study to disk, next to the per-trial model pickles
with open(os.path.join(tempdir, "study.pkl"), "wb") as f:
    pickle.dump(study, f)

print(f"Number of finished trials: {len(study.trials)}")
print("Best trial:", study.best_params)

# Reload the model pickled for the best trial
with open(os.path.join(tempdir, f"{study.best_trial.number}.pkl"), "rb") as f:
    best_model = pickle.load(f)
[ ]:
# One row per trial, ordered by objective value (lowest first); trials whose
# value is None are sorted to the end.
trials_summary = pd.DataFrame(
    [
        dict(trial_number=trial.number, loss=trial.value, **trial.params)
        for trial in sorted(
            study.trials,
            key=lambda t: t.value if t.value is not None else np.inf,
        )
    ]
)
trials_summary.iloc[:200]

8.4. Regression

[ ]:
# Same pipeline as the classification example, but on the continuous targets:
# n_classification_labels=0 selects the regression branch of both classes.
datamodule = DataModule(train_inputv, train_target,
                        test_inputv, test_target,
                        n_classification_labels=0)
smodel = LitNN(nfeatures=train_inputv.shape[1], n_classification_labels=0)

# Stop training when the validation RMSE stops improving
early_stop_callback = EarlyStopping(
   monitor='val_loss_rmse',
   min_delta=0.00,
   patience=30,
   verbose=False,
   mode='min'
)

# Prefer MLflow when it can be imported; otherwise use Lightning's default logger
try:
    from pytorch_lightning.loggers import MLFlowLogger
    logger = MLFlowLogger(
        experiment_name="Default",
        tracking_uri="file:./mlruns"
    )
except ImportError:
    logger = True

trainer = pl.Trainer(
                     precision=32,
                     gpus=torch.cuda.device_count(),
                     tpu_cores=None,
                     logger=logger,
                     val_check_interval=0.25, # do validation check 4 times for each epoch
                     #auto_scale_batch_size=True,
                     #auto_lr_find=True,
                     callbacks=early_stop_callback,
                     max_epochs = 100,
                    )

trainer.fit(smodel, datamodule = datamodule)
trainer.test(smodel, datamodule = datamodule)
# Batch the inputs instead of relying on DataLoader's default batch_size=1,
# which would run one forward pass per test row
test_pred = np.vstack(deepcopy(trainer).predict(
    deepcopy(smodel), DataLoader(test_inputv, batch_size=datamodule.batch_size)))
# Check that the trained model is picklable
_ = pickle.dumps(smodel)
smodel.trainer.callback_metrics