Hyperparameter Search with Data-Parallel Training#

Horovod + Balsam (ALCF’s Theta)#

This tutorial shows how to run hyperparameter search with data-parallel training using Horovod and Balsam on the Theta supercomputer at the ALCF. It follows one of the examples provided in the Horovod documentation.

Let’s start by creating a new DeepHyper project workspace. This is a directory where you will create search problem instances that are automatically installed and importable across your Python environment.

bash#
deephyper start-project dh_project

A new dh_project directory is created, containing the following files:

dh_project/
    dh_project/
        __init__.py
    setup.py

We can now define DeepHyper search problems inside this directory, using deephyper new-problem hps {name} for HPS. Let’s set up an HPS problem called mnist as follows:

bash#
cd dh_project/dh_project/
deephyper new-problem hps mnist
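
This creates a mnist subpackage populated with template scripts that we will edit throughout the rest of this tutorial (the exact set of files may vary with your DeepHyper version), typically:

mnist/
    __init__.py
    load_data.py
    model_run.py
    problem.py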

Start by editing the load_data.py file. We will download the MNIST dataset and create two functions. One returns the training and test datasets, which are useful to estimate the generalization performance of your model. The other returns a random split of the full training data into what we will call the training and validation datasets. The validation dataset is used for the HPS optimization. The script looks like the following:

mnist/load_data.py#
import os

import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split

HERE = os.path.dirname(os.path.abspath(__file__))


def load_data_train_test():
    """Loads the MNIST dataset Training and Test sets with normalized pixels.

    Returns:
        tuple: (train_X, train_y), (test_X, test_y)
    """
    data_path = os.path.join(HERE, "mnist.npz")

    (train_X, train_y), (test_X, test_y) = tf.keras.datasets.mnist.load_data(
        path=data_path
    )

    train_X = train_X / 255
    test_X = test_X / 255

    return (train_X, train_y), (test_X, test_y)


def load_data_train_valid(prop=0.33, verbose=0):
    """Loads the MNIST dataset Training and Validation sets with normalized pixels.

    Returns:
        tuple: (train_X, train_y), (valid_X, valid_y)
    """

    (X, y), _ = load_data_train_test()

    train_X, valid_X, train_y, valid_y = train_test_split(
        X, y, test_size=prop, random_state=42
    )

    if verbose:
        print(f"train_X shape: {np.shape(train_X)}")
        print(f"train_y shape: {np.shape(train_y)}")
        print(f"valid_X shape: {np.shape(valid_X)}")
        print(f"valid_y shape: {np.shape(valid_y)}")

    return (train_X, train_y), (valid_X, valid_y)


if __name__ == "__main__":
    load_data_train_valid(verbose=1)

Then, check that the data are loaded properly by executing the script:

bash#
python load_data.py

which should return something like:

bash#
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11493376/11490434 [==============================] - 12s 1us/step
train_X shape: (40200, 28, 28)
train_y shape: (40200,)
valid_X shape: (19800, 28, 28)
valid_y shape: (19800,)

With the data in place, we can write the code that trains the model whose hyperparameters we want to optimize. Let's develop a function named run in the model_run.py script; DeepHyper maximizes the scalar this function returns, here the final validation accuracy:

mnist/model_run.py#
import os
import tensorflow as tf
import horovod.tensorflow.keras as hvd
from dh_project.mnist.load_data import load_data_train_valid

HERE = os.path.dirname(os.path.abspath(__file__))


def run(config: dict) -> float:
    verbose = 1

    # Horovod: initialize Horovod.
    hvd.init()

    # Horovod: write logs on worker 0.
    verbose = verbose if hvd.rank() == 0 else 0

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    gpus = tf.config.experimental.list_physical_devices("GPU")
    for gpu in gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    if gpus:
        tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], "GPU")

    (train_X, train_y), (valid_X, valid_y) = load_data_train_valid()

    def to_dataset(X, y):
        dataset = tf.data.Dataset.from_tensor_slices(
            (
                tf.cast(X[..., tf.newaxis], tf.float32),
                tf.cast(y, tf.int64),
            )
        )
        return dataset

    train_dataset = to_dataset(train_X, train_y).repeat().shuffle(10000).batch(128)
    valid_dataset = to_dataset(valid_X, valid_y).batch(128).repeat()
    valid_steps_per_epoch = len(valid_X) // 128

    mnist_model = tf.keras.Sequential(
        [
            tf.keras.layers.Conv2D(32, [3, 3], activation="relu"),
            tf.keras.layers.Conv2D(64, [3, 3], activation="relu"),
            tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
            tf.keras.layers.Dropout(0.25),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dropout(0.5),
            tf.keras.layers.Dense(10, activation="softmax"),
        ]
    )

    # Horovod: adjust learning rate based on number of GPUs.
    scaled_lr = config["lr"] * hvd.size()
    opt = tf.optimizers.Adam(scaled_lr)

    # Horovod: add Horovod DistributedOptimizer.
    opt = hvd.DistributedOptimizer(
        opt, backward_passes_per_step=1, average_aggregated_gradients=True
    )

    # Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
    # uses hvd.DistributedOptimizer() to compute gradients.
    mnist_model.compile(
        loss=tf.losses.SparseCategoricalCrossentropy(),
        optimizer=opt,
        metrics=["accuracy"],
        experimental_run_tf_function=False,
    )

    callbacks = [
        # Horovod: broadcast initial variable states from rank 0 to all other processes.
        # This is necessary to ensure consistent initialization of all workers when
        # training is started with random weights or restored from a checkpoint.
        hvd.callbacks.BroadcastGlobalVariablesCallback(0),
        # Horovod: average metrics among workers at the end of every epoch.
        #
        # Note: This callback must be in the list before the ReduceLROnPlateau,
        # TensorBoard or other metrics-based callbacks.
        hvd.callbacks.MetricAverageCallback(),
        # Horovod: using `lr = 1.0 * hvd.size()` from the very beginning leads to worse final
        # accuracy. Scale the learning rate `lr = 1.0` ---> `lr = 1.0 * hvd.size()` during
        # the first three epochs. See https://arxiv.org/abs/1706.02677 for details.
        hvd.callbacks.LearningRateWarmupCallback(
            initial_lr=scaled_lr, warmup_epochs=3, verbose=verbose
        ),
    ]

    # Train the model.
    # Horovod: adjust number of steps based on number of GPUs.
    history = mnist_model.fit(
        train_dataset,
        steps_per_epoch=500 // hvd.size(),
        callbacks=callbacks,
        epochs=3,
        validation_data=valid_dataset,
        validation_steps=valid_steps_per_epoch,
        verbose=verbose,
    )

    return history.history["val_accuracy"][-1]

Then, use a script named test_horovod.py to check that the run function executes correctly with Horovod:

mnist/test_horovod.py#
import horovod.tensorflow.keras as hvd

from dh_project.mnist.model_run import run


def test():
    hvd.init()

    config = {
        "lr": 1e-3
    }

    score = run(config)
    if hvd.rank() == 0:
        print(f"Score: {score:.3f}")


if __name__ == "__main__":
    test()

Execute the test script with an MPI launcher such as horovodrun, mpirun, or aprun. If you choose aprun, remember to run it from a Theta MOM node:

bash#
aprun -n 2 -N 2 python test_horovod.py

Note

Here we assume that you requested a job allocation with qsub in interactive mode, for example with qsub -n 2 -q debug-cache-quad -A $PROJECTNAME -t 30 -I.

If you want to test with MPI:

mpirun -np 2 python test_horovod.py

Now that we have a function that loads the data and a model that learns from it, we can create the hyperparameter search problem defining the hyperparameters we want to optimize. Create a problem.py script:

mnist/problem.py#
from deephyper.problem import HpProblem

Problem = HpProblem()

Problem.add_hyperparameter((1e-4, 1e-1, "log-uniform"), "lr")


if __name__ == "__main__":
    print(Problem)

Test the problem script in a standalone fashion to make sure there are no errors:

python problem.py
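
This should print a description of the search space. With a single log-uniform hyperparameter lr, the output looks something like the following (the exact formatting depends on the installed DeepHyper and ConfigSpace versions):

bash#
Configuration space object:
  Hyperparameters:
    lr, Type: UniformFloat, Range: [0.0001, 0.1], Default: ~0.00316, on log-scale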

The next step is to link our current work to a Balsam database and submit a job for execution on Theta. Create a Balsam database named expdb:

balsam init expdb

Then start and connect to the expdb database:

source balsamactivate expdb

Finally, we can submit a search job to Balsam and the COBALT scheduler:

deephyper balsam-submit hps ambs -w mnist_hvd --problem dh_project.mnist.problem.Problem --run dh_project.mnist.model_run.run -t 30 -q debug-cache-quad -n 4 -A datascience -j mpi --num-nodes-per-eval 2 --num-ranks-per-node 2 --num-threads-per-rank 32