1. Tuning of MPI Programs#

Author(s): Denis Boyda.

This tutorial demonstrates DeepHyper's ability to optimize hyperparameters of MPI programs. As a demonstration example, we write a simple MPI C program and compile it to obtain a binary. When executed, the binary initializes MPI, prints some information, and computes a polynomial with a parameter obtained through the command line. In DeepHyper we will optimize this binary as a black-box function which receives some parameters through the command line (or an input file) and produces a result of execution.

This demonstration emulates a situation in which a user has a binary that does some computations, and the hyperparameters of these computations have to be optimized. In general, one can split binary execution into three stages. In the first, initialization stage, all necessary input files are prepared and a logging directory is created. In the second stage, the MPI program is submitted for execution. In the third, finalization stage, output files or artifacts are saved and analyzed, and the target value obtained from them is returned to DeepHyper.
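A minimal sketch of such a three-stage wrapper, assuming the f_exe binary compiled in the next section (the string-splitting parser here is only illustrative; the later sections use the parse package instead):

import subprocess

def run(config):
    # Stage 1: initialization -- prepare input files and a logging directory here.
    args = [str(config["arg"])]

    # Stage 2: submit the MPI program for execution.
    proc = subprocess.run(
        ["mpirun", "-n", "2", "./f_exe", *args],
        capture_output=True, text=True,
    )

    # Stage 3: finalization -- analyze the output and return the target value.
    return float(proc.stdout.split("f(x) = ")[1].split()[0])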

The tutorial requires the installation of the parse package:

$ pip install parse
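The parse package is, roughly, the inverse of str.format; we will use it below to extract the value of f(x) from the captured stdout. A quick sanity check:

import parse

res = parse.search("f(x) = {:f}", "f(x) = -14.000000\n")
print(res[0])  # prints -14.0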

1.1. Example MPI binary compiled from C code#

In the C source code posted below, we initialize MPI and print information from every worker. The master worker, with rank 0, additionally evaluates the function f using the input parameter obtained through a command-line argument. The function f computes the polynomial f(x) = -x^2 - 4x - 2. It is the target function in this demonstration, and DeepHyper will maximize it. The solution of this optimization problem is known analytically: f'(x) = -2x - 4 vanishes at x = -2, where the maximum value f(-2) = 2 is attained.

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

float f(float x) {
	// maximum value is 2 at x = -2
	return -1. * x * x - 4. * x - 2.;
}

int main(int argc, char** argv) {
    // Initialize the MPI environment
    MPI_Init(NULL, NULL);

    // Get the number of processes
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // Get the rank of the process
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // Get the name of the processor
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);

    // Print off a hello world message
    printf("Hello world from processor %s, rank %d out of %d processors\n",
           processor_name, world_rank, world_size);

    // Read the input parameter from the command line
    if (argc < 2) {
        if (world_rank == 0)
            fprintf(stderr, "Usage: %s <x>\n", argv[0]);
        MPI_Finalize();
        return 1;
    }

    float x = atof(argv[1]);
    if (world_rank == 0)
        printf("f(x) = %f\n", f(x));

    // Finalize the MPI environment.
    MPI_Finalize();
}

This code can be compiled with mpicc mpi_f.c -o f_exe. Execution of f_exe with two MPI workers gives

$ mpirun -n 2 ./f_exe 2
Hello world from processor thetagpu05, rank 0 out of 2 processors
f(x) = -14.000000
Hello world from processor thetagpu05, rank 1 out of 2 processors
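Indeed, f(2) = -4 - 8 - 2 = -14, and only the master worker with rank 0 prints the value.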

1.2. Python wrapper for MPI binary#

As DeepHyper accepts a Python function as the target function for optimization, one needs to prepare a Python wrapper for the MPI binary. Python used as a scripting language opens large possibilities for such wrappers. In the example below, the run_mpi() function is such a wrapper.

import os
import parse
import subprocess


def _parse_results(stdout):
    res = parse.search('f(x) = {:f}', stdout)
    return res[0]


def run_mpi(config):
    exe = os.path.abspath("./f_exe")

    # Each argument must be a separate element of the list
    # passed to subprocess.run.
    result = subprocess.run(
        [
            'mpirun',
            '-x', 'LD_LIBRARY_PATH', '-x', 'PATH', '-x', 'PYTHONPATH',
            '--host', 'localhost:1',
            exe,
            f'{config["arg"]}',
        ],
        stdout=subprocess.PIPE,
    )

    res = _parse_results(result.stdout.decode("utf-8"))

    return res


def test_run_mpi():
    config = {"nodes_per_task": 2, "arg": -2}
    result = run_mpi(config)
    print("result: ", result)


if __name__ == "__main__":
    test_run_mpi()

In run_mpi() we obtain the absolute path of the MPI binary, execute it with the subprocess module, and parse the captured output. When submitting for execution with subprocess.run, we use mpirun as the launcher, specify the necessary environment variables and hosts with slots, and append the binary exe with an argument obtained from the config dictionary. The result of the exe binary execution, obtained through parsing, is returned from run_mpi().
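As a quick check of the parsing step (assuming the script above was saved as search0.py, as the import in the next section suggests):

from search0 import _parse_results

stdout = "Hello world from processor thetagpu05, rank 0 out of 2 processors\nf(x) = -14.000000\n"
print(_parse_results(stdout))  # prints -14.0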

1.3. More powerful wrapper with initialization and finalization#

In spite of being able to run a simple MPI binary, this wrapper has several limitations. The simple MPI binary we compiled above obtains hyperparameters through a command-line interface, but in general a binary may require input files. Therefore it is important to do some initialization before submitting the binary for execution. Another drawback is the absence of finalization. In this demonstration, we create a context manager Experiment for these purposes. In the initialization phase, Experiment creates a directory for the run and changes the current working directory to it; during finalization it changes the working directory back. In the created folder we write two files for the stdout and stderr produced by the exe binary. The run command is also saved.

Another important change we made is the additional dequed argument of the run_mpi function. This argument should contain a list of available hosts with slots for MPI execution, for example ['thetagpu05:8', 'thetagpu06:8'] for two hosts with eight slots each. It allows using different evaluators with an arbitrary number of workers that manage the available resources.

import os
import pathlib
import subprocess

from search0 import _parse_results


class Experiment:
    """A simple context manager to do some initialization and finalization before run."""

    def __init__(self, config):
        self.config = config
        self.prev_dir = None
        self.exp_dir = None

    # Any initialization, data preparation, configuration can be done on enter
    def __enter__(self):
        self.prev_dir = os.getcwd()

        self.exp_dir = os.path.abspath(f"exp-{self.config['job_id']}")
        pathlib.Path(self.exp_dir).mkdir(parents=False, exist_ok=False)
        os.chdir(self.exp_dir)

        return self.exp_dir

    def __exit__(self, type, value, traceback):
        os.chdir(self.prev_dir)


def run_mpi(config, dequed=['localhost:1']):
    exe = os.path.dirname(os.path.abspath(__file__)) + "/f_exe"
    nodes = dequed
    # Each argument must be a separate element of the list
    # passed to subprocess.run.
    runner = [
        'mpirun',
        '-x', 'LD_LIBRARY_PATH', '-x', 'PATH', '-x', 'PYTHONPATH',
        '--host', ','.join(nodes),
        exe,
        f'{config["arg"]}',
    ]

    # The context manager creates a directory for the current run
    # and moves the current working directory there;
    # on exit it changes the current directory back.
    with Experiment(config) as exp_dir:
        with open('stdout.txt', 'wb') as out, open('stderr.txt', 'wb') as err:
            subprocess.run(
                runner,
                stdout=out,
                stderr=err,
            )
        with open('runcommand.txt', 'w') as f:
            f.write(' '.join(runner))

        # We can parse any results here
        with open(os.path.join(exp_dir, 'stdout.txt'), 'r') as f:
            res = _parse_results(f.read())

    return res


def test_run_mpi():
    config = {"job_id": 0, "arg": 2}
    result = run_mpi(config, dequed=['localhost:1'])
    print("result: ", result)


if __name__ == "__main__":
    test_run_mpi()

When executed, this script creates a directory for the evaluation, calls the binary, and saves the output together with the run command.

$ python search1.py
result:  -14.0
$ ls
exp-0  f_exe  mpi_f.c  mpi_search.py  __pycache__  search0.py  search1.py  search2.py  search3.py  search.py  tutorial_mpi.ipynb
$ ls exp-0/
runcommand.txt      stderr.txt  stdout.txt

1.4. Optimization with a single node#

Once the infrastructure for the MPI binary is created, one can define a DeepHyper problem and run the optimization.

from deephyper.problem import HpProblem
from deephyper.search.hps import CBO
from deephyper.evaluator import SubprocessEvaluator

from search1 import run_mpi


def execute_deephyper():
    import logging
    logging.basicConfig(level=10)
    problem = HpProblem()
    problem.add_hyperparameter((-40., 40.), "arg")

    evaluator = SubprocessEvaluator(
        run_mpi, num_workers=1,
    )

    search = CBO(problem, evaluator)
    results = search.search(max_evals=10)
    print(results)


if __name__ == "__main__":
    execute_deephyper()

In this example, the evaluator does not provide the dequed argument to the run function; therefore, the default value is used. Since the default specifies only one available host with one slot (localhost:1), each evaluation will be done with only one rank.
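If more local ranks are desired, one could also call the wrapper directly with a different host list; a hypothetical call with two slots on the local host (with Open MPI, mpirun without -n typically launches one rank per available slot) would look like:

from search1 import run_mpi

# Hypothetical direct call; 'localhost:2' advertises two slots, i.e. two ranks.
result = run_mpi({"job_id": 1, "arg": -2.0}, dequed=["localhost:2"])
print(result)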

1.5. Optimization with multiple nodes#

Finally, we demonstrate the execution of the binary with several ranks. For ThetaGPU we obtain a list of nodes in get_thetagpu_nodelist() and specify that every node has 8 slots. The evaluator is decorated with the queued decorator, which manages a queue of resources (ThetaGPU nodes in this example) and provides queue_pop_per_task nodes for one evaluation. In this example queue_pop_per_task=2, so every evaluation obtains two nodes with eight slots each, resulting in 16 ranks. The number of workers may be computed by dividing the total number of nodes by the number of nodes per evaluation.

import os
import logging

from deephyper.evaluator.callback import LoggerCallback
from deephyper.problem import HpProblem
from deephyper.search.hps import CBO
from deephyper.evaluator import SubprocessEvaluator, queued


from search1 import run_mpi


def get_thetagpu_nodelist():
    nodefile = os.environ['COBALT_NODEFILE']
    with open(nodefile) as f:
        nodelist = [line.rstrip() for line in f.readlines()]
    return nodelist


def execute_deephyper():
    logging.basicConfig(level=10)

    problem = HpProblem()
    problem.add_hyperparameter((-40., 40.), "arg")

    nodes_per_task = 2
    n_ranks_per_node = 8
    nodes = [f'{n}:{n_ranks_per_node}' for n in get_thetagpu_nodelist()]

    evaluator = queued(SubprocessEvaluator)(
        run_mpi,
        num_workers=len(nodes) // nodes_per_task,
        queue=nodes,
        queue_pop_per_task=nodes_per_task,
        callbacks=[LoggerCallback()]
    )

    search = CBO(problem, evaluator)
    results = search.search(max_evals=40)
    print(results)


if __name__ == "__main__":
    execute_deephyper()
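For illustration, with a hypothetical allocation of eight ThetaGPU nodes the queue and worker count would come out as:

# Hypothetical values for an 8-node allocation; node names are illustrative.
nodes = ['thetagpu05:8', 'thetagpu06:8', 'thetagpu07:8', 'thetagpu08:8',
         'thetagpu09:8', 'thetagpu10:8', 'thetagpu11:8', 'thetagpu12:8']
num_workers = len(nodes) // 2  # 4 concurrent evaluations, each using 2 nodes (16 ranks)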