Source code for deephyper.evaluator.callback

"""The callback module contains sub-classes of the ``Callback`` class.

The ``Callback`` class is used to trigger custom actions on the start and
completion of jobs by the ``Evaluator``. Callbacks can be used with any
``Evaluator`` implementation.
"""

import abc

import numpy as np

from deephyper.evaluator._evaluator import _test_ipython_interpretor
from deephyper.skopt.moo import hypervolume

if _test_ipython_interpretor():
    from tqdm.notebook import tqdm
else:
    from tqdm import tqdm


[docs] class Callback(abc.ABC): """Callback interface."""
[docs] def on_launch(self, job): """Called each time a ``Job`` is created by the ``Evaluator``. Args: job (Job): The created job. """
[docs] def on_done(self, job): """Called each time a local ``Job`` has been gathered by the Evaluator. Args: job (Job): The completed job. """
[docs] def on_done_other(self, job): """Called after local ``Job`` have been gathered for each remote ``Job`` that is done. Args: job (Job): The completed Job. """
[docs] class ObjectiveRecorder: """Records the objective values of the jobs. :meta: private """ def __init__(self): self._objectives = [] self.is_multi_objective = False self._last_return = -float("inf")
[docs] def __call__(self, job): """Called when a local job has been gathered.""" # Only add the objective if it is not a string (i.e., failure...) if not isinstance(job.objective, str): self._objectives.append(job.objective) # Then check if the objective is multi-objective if np.ndim(job.objective) > 0: self.is_multi_objective = True # If no objectives are received but only failures then return -inf if len(self._objectives) == 0: return self._last_return # If single objective then returns the maximum if not self.is_multi_objective: self._last_return = max(self._objectives[-1], self._last_return) return self._last_return else: objectives = -np.asarray(self._objectives) ref = np.max(objectives, axis=0) # reference point return hypervolume(objectives, ref)
[docs] class LoggerCallback(Callback): """Print information when jobs are completed by the ``Evaluator``. An example usage can be: >>> evaluator.create(method="ray", method_kwargs={..., "callbacks": [LoggerCallback()]}) """ def __init__(self): self._best_objective = None self._n_done = 0 self._objective_func = ObjectiveRecorder()
[docs] def on_done_other(self, job): """Called after gathering local jobs on available remote jobs that are done.""" self.on_done(job)
[docs] def on_done(self, job): """Called when a local job has been gathered.""" self._n_done += 1 # Test if multi objectives are received if np.ndim(job.objective) > 0: if np.isreal(job.objective).all(): self._best_objective = self._objective_func(job) tmp = tuple(round(o, 5) if not isinstance(o, str) else o for o in job.objective) print( f"[{self._n_done:05d}] -- HVI Objective: {self._best_objective:.5f} -- " f"Last Objective: {tmp}" ) elif np.any(type(res) is str and "F" == res[0] for res in job.objective): print(f"[{self._n_done:05d}] -- Last Failure: {job.objective}") elif np.isreal(job.objective): self._best_objective = self._objective_func(job) print( f"[{self._n_done:05d}] -- Maximum Objective: {self._best_objective:.5f} -- " f"Last Objective: {job.objective:.5f}" ) elif type(job.objective) is str and "F" == job.objective[0]: print(f"[{self._n_done:05d}] -- Last Failure: {job.objective}")
[docs] class TqdmCallback(Callback): """Print information when jobs are completed by the ``Evaluator``. An example usage can be: >>> evaluator.create(method="ray", method_kwargs={..., "callbacks": [TqdmCallback()]}) """ def __init__(self): self._best_objective = None self._n_done = 0 self._n_failures = 0 self._max_evals = None self._tqdm = None self._objective_func = ObjectiveRecorder()
[docs] def set_max_evals(self, max_evals): """Setter for the maximum number of evaluations. It is used to initialize the tqdm progressbar. """ self._max_evals = max_evals self._tqdm = None
[docs] def on_done_other(self, job): """Called after gathering local jobs on available remote jobs that are done.""" self.on_done(job)
[docs] def on_done(self, job): """Called when a local job has been gathered.""" if self._tqdm is None: if self._max_evals: self._tqdm = tqdm(total=self._max_evals) else: self._tqdm = tqdm() self._n_done += 1 self._tqdm.update(1) # Test if multi objectives are received if np.ndim(job.objective) > 0: if not (any(not (np.isreal(objective_i)) for objective_i in job.objective)): self._best_objective = self._objective_func(job) else: self._n_failures += 1 self._tqdm.set_postfix({"failures": self._n_failures, "hvi": self._best_objective}) else: if np.isreal(job.objective): self._best_objective = self._objective_func(job) else: self._n_failures += 1 self._tqdm.set_postfix(objective=self._best_objective, failures=self._n_failures)
[docs] class SearchEarlyStopping(Callback): """Stop the search gracefully when it does not improve for a given number of evaluations. Args: patience (int, optional): The number of not improving evaluations to wait for before stopping the search. Defaults to ``10``. objective_func (callable, optional): A function that takes a ``Job`` has input and returns the maximized scalar value monitored by this callback. Defaults to computes the maximum for single-objective optimization and the hypervolume for multi-objective optimization. threshold (float, optional): The threshold to reach before activating the patience to stop the search. Defaults to ``None``, patience is reinitialized after each improving observation. verbose (bool, optional): Activation or deactivate the verbose mode. Defaults to ``True``. """ def __init__( self, patience: int = 10, objective_func=None, threshold: float = None, verbose: bool = 1, ): self._best_objective = None self._n_lower = 0 self._patience = patience self._objective_func = ObjectiveRecorder() if objective_func is None else objective_func self._threshold = threshold self._verbose = verbose self.search_stopped = False
[docs] def on_done_other(self, job): """Called after gathering local jobs on available remote jobs that are done.""" self.on_done(job)
[docs] def on_done(self, job): """Called when a local job has been gathered.""" job_objective = self._objective_func(job) if self._best_objective is None: self._best_objective = job_objective else: if job_objective > self._best_objective: if self._verbose: print( "Objective has improved from " f"{self._best_objective:.5f} -> {job_objective:.5f}" ) self._best_objective = job_objective self._n_lower = 0 else: self._n_lower += 1 if self._n_lower >= self._patience: if self._threshold is None: if self._verbose: print( "Stopping the search because it did not improve for the last " f"{self._patience} evaluations!" ) self.search_stopped = True else: if self._best_objective > self._threshold: if self._verbose: print( "Stopping the search because it did not improve for the last " f"{self._patience} evaluations!" ) self.search_stopped = True