Hyperparameter Search with Data-Parallel Training

Horovod + Balsam (ALCF’s Theta)

This tutorial shows how to run hyperparameter search with data-parallel training using Horovod and Balsam on the Theta supercomputer at the ALCF. It follows one of the examples provided in the Horovod documentation.

Let’s start by creating a new DeepHyper project workspace. This is a directory where you will create search problem instances that are automatically installed and importable across your Python environment.

bash
deephyper start-project dh_project

A new dh_project directory is created, containing the following files:

dh_project/
    dh_project/
        __init__.py
    setup.py

We can now define DeepHyper search problems inside this directory, using deephyper new-problem hps {name} for HPS. Let’s set up an HPS problem called mnist as follows:

bash
cd dh_project/dh_project/
deephyper new-problem hps mnist

Start by editing the load_data.py file. We will download the MNIST dataset and create two functions. The first returns the training and test datasets, which are useful to estimate the generalization performance of the final model. The second returns a random split of the full training data into what we will call the training and validation datasets. The validation dataset is used for the HPS optimization. The script looks like the following:

mnist/load_data.py
import os

import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

HERE = os.path.dirname(os.path.abspath(__file__))


def load_data_train_test():
    """Loads the MNIST dataset Training and Test sets with normalized pixels.

    Returns:
        tuple: (train_X, train_y), (test_X, test_y)
    """
    data_path = os.path.join(HERE, "mnist.npz")

    (train_X, train_y), (test_X, test_y) = tf.keras.datasets.mnist.load_data(
        path=data_path
    )

    train_X = train_X / 255
    test_X = test_X / 255

    return (train_X, train_y), (test_X, test_y)


def load_data_train_valid(prop=0.33, verbose=0):
    """Loads the MNIST dataset Training and Validation sets with normalized pixels.

    Returns:
        tuple: (train_X, train_y), (valid_X, valid_y)
    """

    (X, y), _ = load_data_train_test()

    train_X, valid_X, train_y, valid_y = train_test_split(
        X, y, test_size=prop, random_state=42
    )

    if verbose:
        print(f"train_X shape: {np.shape(train_X)}")
        print(f"train_y shape: {np.shape(train_y)}")
        print(f"valid_X shape: {np.shape(valid_X)}")
        print(f"valid_y shape: {np.shape(valid_y)}")

    return (train_X, train_y), (valid_X, valid_y)


if __name__ == "__main__":
    load_data_train_valid(verbose=1)

Then, check that the data are loaded properly by executing the script:

bash
python load_data.py

which should return something like:

bash
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 12s 1us/step
train_X shape: (40200, 28, 28)
train_y shape: (40200,)
valid_X shape: (19800, 28, 28)
valid_y shape: (19800,)

With the data-loading functions in place, we can now write the code that trains the model whose hyperparameters we want to optimize. Let's develop a function named run in the model_run.py script:

mnist/model_run.py
import os
import tensorflow as tf
import horovod.tensorflow.keras as hvd
from dh_project.mnist.load_data import load_data_train_valid

HERE = os.path.dirname(os.path.abspath(__file__))


def run(config: dict) -> float:
    verbose = 1

    # Horovod: initialize Horovod.
    hvd.init()

    # Horovod: write logs on worker 0.
    verbose = verbose if hvd.rank() == 0 else 0

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    gpus = tf.config.experimental.list_physical_devices("GPU")
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU")

    (train_X, train_y), (valid_X, valid_y) = load_data_train_valid()

    def to_dataset(X, y):
        dataset = tf.data.Dataset.from_tensor_slices(
            (
                tf.cast(X[..., tf.newaxis], tf.float32),
                tf.cast(y, tf.int64),
            )
        )
        return dataset

    train_dataset = to_dataset(train_X, train_y).repeat().shuffle(10000).batch(128)
    valid_dataset = to_dataset(valid_X, valid_y).batch(128).repeat()
    valid_steps_per_epoch = len(valid_X) // 128

    mnist_model = tf.keras.Sequential(
        [
            tf.keras.layers.Conv2D(32, [3, 3], activation="relu"),
            tf.keras.layers.Conv2D(64, [3, 3], activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Dropout(0.25),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(10, activation="softmax"),
        ]
    )

    # Horovod: adjust learning rate based on number of GPUs.
    scaled_lr = config["lr"] * hvd.size()
    opt = tf.optimizers.Adam(scaled_lr)

    # Horovod: add Horovod DistributedOptimizer.
    opt = hvd.DistributedOptimizer(
        opt, backward_passes_per_step=1, average_aggregated_gradients=True
    )

    # Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
    # uses hvd.DistributedOptimizer() to compute gradients.
    mnist_model.compile(
        loss=tf.losses.SparseCategoricalCrossentropy(),
        optimizer=opt,
        metrics=["accuracy"],
        experimental_run_tf_function=False,
    )

    callbacks = [
        # Horovod: broadcast initial variable states from rank 0 to all other processes.
        # This is necessary to ensure consistent initialization of all workers when
        # training is started with random weights or restored from a checkpoint.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
        # Horovod: average metrics among workers at the end of every epoch.
        #
        # Note: This callback must be in the list before the ReduceLROnPlateau,
        # TensorBoard or other metrics-based callbacks.
        hvd.callbacks.MetricAverageCallback(),
        # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
        # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
        # the first three epochs. See https://arxiv.org/abs/1706.02677 for details.
        hvd.callbacks.LearningRateWarmupCallback(
            initial_lr=scaled_lr, warmup_epochs=3, verbose=verbose
        ),
    ]

    # Train the model.
    # Horovod: adjust number of steps based on number of GPUs.
    history = mnist_model.fit(
        train_dataset,
        steps_per_epoch=500 // hvd.size(),
        callbacks=callbacks,
        epochs=3,
        validation_data=valid_dataset,
        validation_steps=valid_steps_per_epoch,
        verbose=verbose,
    )

    return history.history["val_accuracy"][-1]

Then, use a script named test_horovod.py to test the behaviour of Horovod:

mnist/test_horovod.py
import horovod.tensorflow.keras as hvd

from dh_project.mnist.model_run import run

def test():
    hvd.init()

    config = {
        "lr": 1e-3
    }

    score = run(config)
    if hvd.rank() == 0:
        print(f"Score: {score:.3f}")

if __name__ == "__main__":
    test()

Execute the test script with an MPI launcher such as horovodrun, mpirun, or aprun. If you choose aprun, remember to run it from a Theta MOM node:

bash
aprun -n 2 -N 2 python test_horovod.py

Note

Here we assume that you requested a job allocation with qsub in interactive mode, for example with qsub -n 2 -q debug-cache-quad -A $PROJECTNAME -t 30 -I.

If you want to test with MPI:

mpirun -np 2 python test_horovod.py

Now that we have a function that loads the data and a model that learns from these data, we can create the hyperparameter search problem to define the hyperparameters we want to optimize. Create a problem.py script:

mnist/problem.py
from deephyper.problem import HpProblem

Problem = HpProblem()

Problem.add_hyperparameter((1e-4, 1e-1, "log-uniform"), "lr")


if __name__ == "__main__":
    print(Problem)

Test the problem script in a standalone fashion to make sure there is no error:

python problem.py
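
The problem above tunes only the learning rate, but the same HpProblem interface accepts additional dimensions. As an illustrative sketch (the batch_size and dropout names are assumptions of this example, not part of the tutorial), the problem could be extended as follows, provided run() also reads the corresponding keys from its config argument:

from deephyper.problem import HpProblem

Problem = HpProblem()

# Learning rate sampled on a log scale, as in the tutorial.
Problem.add_hyperparameter((1e-4, 1e-1, "log-uniform"), "lr")

# Hypothetical extra hyperparameters: a categorical batch size and a
# uniformly sampled dropout rate. For these to have any effect, run()
# must use config["batch_size"] and config["dropout"].
Problem.add_hyperparameter([32, 64, 128, 256], "batch_size")
Problem.add_hyperparameter((0.1, 0.6), "dropout")

if __name__ == "__main__":
    print(Problem)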

The next step is to link our current work to a Balsam database and submit a job to be executed on Theta. Create a Balsam database named expdb:

balsam init expdb

Then start and connect to the expdb database:

source balsamactivate expdb
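
To check that the database is indeed active, you can list its job table (empty at this point); in Balsam 0.x this is done with:

balsam ls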

Finally, we can submit a search job to Balsam and the COBALT scheduler. The command below requests 4 nodes for 30 minutes in the debug-cache-quad queue and runs each evaluation on 2 nodes with 2 MPI ranks per node, matching the data-parallel setup tested above:

deephyper balsam-submit hps ambs -w mnist_hvd --problem dh_project.mnist.problem.Problem --run dh_project.mnist.model_run.run -t 30 -q debug-cache-quad -n 4 -A datascience -j mpi --num-nodes-per-eval 2 --num-ranks-per-node 2 --num-threads-per-rank 32
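
Once the search has run, DeepHyper collects every evaluated configuration together with its objective, i.e. the validation accuracy returned by run(). Assuming the search writes its results to a results.csv file inside the workflow's data directory and stores the returned score in an objective column (the DeepHyper convention at the time of writing; the path below is illustrative and depends on where your Balsam database lives), a quick way to inspect the best configurations is:

import pandas as pd

# Illustrative path: adjust it to the data directory of the "mnist_hvd"
# workflow inside your "expdb" Balsam database.
df = pd.read_csv("expdb/data/mnist_hvd/results.csv")

# Sort by the objective (validation accuracy) and show the best evaluations.
print(df.sort_values("objective", ascending=False).head())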