"""The callback module contains sub-classes of the ``Callback`` class.
The ``Callback`` class is used to trigger custom actions on the start and
completion of jobs by the ``Evaluator``. Callbacks can be used with any
``Evaluator`` implementation.
"""
import abc
import csv
import logging
import os
from typing import List
import numpy as np
from deephyper.evaluator import HPOJob, Job
from deephyper.evaluator.utils import test_ipython_interpretor
from deephyper.skopt.moo import hypervolume
if test_ipython_interpretor():
from tqdm.notebook import tqdm
else:
from tqdm import tqdm
__all__ = ["Callback", "LoggerCallback", "TqdmCallback", "SearchEarlyStopping"]
logger = logging.getLogger(__name__)
[docs]
class Callback(abc.ABC):
    """Base interface for hooks triggered by an ``Evaluator``.

    Every hook below has an empty body; sub-classes override only the hooks
    they are interested in.
    """

    def on_launch(self, job: Job):
        """Hook run every time the ``Evaluator`` creates a ``Job``.

        Args:
            job (Job): The job that was just created.
        """

    def on_done(self, job: Job):
        """Hook run every time the Evaluator has gathered a local ``Job``.

        Args:
            job (Job): The job that completed.
        """

    def on_done_other(self, job: Job):
        """Hook run for each finished remote ``Job``, once local jobs were gathered.

        Args:
            job (Job): The remote job that completed.
        """

    def on_gather(self, local_jobs: List[Job], other_jobs: List[Job]):
        """Hook run once jobs have been gathered.

        Args:
            local_jobs (List[Job]):
                Jobs gathered from the local evaluator instance.
            other_jobs (List[Job]):
                Jobs gathered from other evaluators sharing the same storage.
        """

    def on_close(self):
        """Hook run while the evaluator is being closed."""
[docs]
class ObjectiveRecorder:
    """Record the objective values of jobs and reduce them to one scalar.

    While only scalar objectives have been recorded the running maximum is
    returned. As soon as one array-like objective has been recorded the
    hypervolume of all recorded objectives is returned instead.

    :meta: private
    """

    def __init__(self):
        self._objectives = []
        self.is_multi_objective = False
        self._last_return = -float("inf")

    @staticmethod
    def _is_failure(objective) -> bool:
        """Return ``True`` when ``objective`` is a string or a sequence containing a string.

        Failures are reported as strings: either one string for the whole job
        or a sequence holding strings for multi-objective jobs.
        """
        if isinstance(objective, str):
            return True
        if np.ndim(objective) > 0:
            return any(isinstance(value, str) for value in objective)
        return False

    def __call__(self, job):
        """Record ``job.objective`` (unless it is a failure) and return the monitored value.

        Args:
            job: Any object exposing an ``objective`` attribute.

        Returns:
            ``-inf`` while no successful objective has been recorded, the
            running maximum for single-objective values, or the hypervolume
            of the recorded objectives once a multi-objective value was seen.
        """
        objective = job.objective

        # Failures must not be recorded: a string inside the recorded values
        # would make the array negation below raise.
        if not self._is_failure(objective):
            self._objectives.append(objective)

            # Any array-like objective switches the recorder to multi-objective mode.
            if np.ndim(objective) > 0:
                self.is_multi_objective = True

        # Only failures were received so far: return the initial value (-inf).
        if not self._objectives:
            return self._last_return

        # Single objective: running maximum.
        if not self.is_multi_objective:
            self._last_return = max(self._objectives[-1], self._last_return)
            return self._last_return

        # NOTE(review): the sign is flipped presumably because ``hypervolume``
        # assumes minimization -- confirm against its documentation.
        objectives = -np.asarray(self._objectives)
        ref = np.max(objectives, axis=0)  # reference point
        return hypervolume(objectives, ref)
[docs]
class LoggerCallback(Callback):
    """Print information when jobs are completed by the ``Evaluator``.

    An example usage can be:

    >>> evaluator.create(method="ray", method_kwargs={..., "callbacks": [LoggerCallback()]})
    """

    def __init__(self):
        self._best_objective = None
        self._n_done = 0
        self._objective_func = ObjectiveRecorder()

    @staticmethod
    def _is_failure(value) -> bool:
        """Return ``True`` for a failure marker, i.e., a string starting with ``"F"``."""
        return isinstance(value, str) and value.startswith("F")

    def on_done_other(self, job):
        """Called after gathering local jobs on available remote jobs that are done."""
        self.on_done(job)

    def on_done(self, job):
        """Called when a local job has been gathered.

        Prints one line per job: the hypervolume indicator for multi-objective
        jobs, the maximum objective for single-objective jobs, or the failure
        marker when the job failed. Nothing is printed for an objective that is
        neither real-valued nor a failure marker.
        """
        self._n_done += 1
        objective = job.objective

        # Multi-objective: the objective is array-like.
        if np.ndim(objective) > 0:
            # Strings are tested first so that ``np.isreal`` only sees numbers.
            if all(not isinstance(o, str) and np.isreal(o) for o in objective):
                self._best_objective = self._objective_func(job)
                tmp = tuple(round(o, 5) for o in objective)
                print(
                    f"[{self._n_done:05d}] -- HVI Objective: {self._best_objective:.5f} -- "
                    f"Last Objective: {tmp}"
                )
            # Builtin ``any`` is required here: ``np.any`` does not iterate a generator.
            elif any(self._is_failure(o) for o in objective):
                print(f"[{self._n_done:05d}] -- Last Failure: {job.objective}")
        # Single objective reported as a string: only failure markers are printed.
        elif isinstance(objective, str):
            if self._is_failure(objective):
                print(f"[{self._n_done:05d}] -- Last Failure: {job.objective}")
        elif np.isreal(objective):
            self._best_objective = self._objective_func(job)
            print(
                f"[{self._n_done:05d}] -- Maximum Objective: {self._best_objective:.5f} -- "
                f"Last Objective: {job.objective:.5f}"
            )
[docs]
class TqdmCallback(Callback):
    """Display a progress bar updated each time the ``Evaluator`` completes a job.

    Args:
        description (str, optional): an optional description to add to the progressbar.

    An example usage can be:

    >>> evaluator.create(method="ray", method_kwargs={..., "callbacks": [TqdmCallback()]})
    """

    def __init__(self, description: str = None):
        self._description = description
        self._max_evals = None
        self._tqdm = None
        self._n_done = 0
        self._n_failures = 0
        self._best_objective = None
        self._objective_func = ObjectiveRecorder()

    def set_max_evals(self, max_evals):
        """Set the maximum number of evaluations.

        The value is used as the total of the tqdm progressbar, which is
        re-created on the next completed job.
        """
        self._max_evals = max_evals
        self._tqdm = None

    def on_done_other(self, job):
        """Called after gathering local jobs on available remote jobs that are done."""
        self.on_done(job)

    def _record(self, job, succeeded):
        """Update the best objective on success, or count one more failure."""
        if succeeded:
            self._best_objective = self._objective_func(job)
        else:
            self._n_failures += 1

    def on_done(self, job):
        """Called when a local job has been gathered."""
        # The progressbar is created lazily on the first completed job.
        if self._tqdm is None:
            self._tqdm = tqdm(total=self._max_evals) if self._max_evals else tqdm()
            if self._description:
                self._tqdm.set_description(self._description)

        self._n_done += 1
        self._tqdm.update(1)

        if isinstance(job, HPOJob):
            if np.ndim(job.objective) > 0:
                # Multi-objective: success only when every component is real.
                self._record(job, all(np.isreal(value) for value in job.objective))
                self._tqdm.set_postfix({"failures": self._n_failures, "hvi": self._best_objective})
            else:
                self._record(job, np.isreal(job.objective))
                self._tqdm.set_postfix(objective=self._best_objective, failures=self._n_failures)

        if self._n_done == self._max_evals:
            self._tqdm.close()
[docs]
class SearchEarlyStopping(Callback):
    """Stop the search gracefully when it does not improve for a given number of evaluations.

    Args:
        patience (int, optional):
            The number of not improving evaluations to wait for before
            stopping the search. Defaults to ``10``.
        objective_func (callable, optional):
            A function that takes a ``Job`` as input and returns the maximized scalar value
            monitored by this callback. Defaults to ``None``, in which case the maximum is
            computed for single-objective optimization and the hypervolume for
            multi-objective optimization.
        threshold (float, optional):
            The threshold to reach before activating the patience to stop the
            search. Defaults to ``None``, patience is reinitialized after
            each improving observation.
        verbose (bool, optional): Activate or deactivate the verbose mode. Defaults to ``True``.

    Attributes:
        search_stopped (bool): Set to ``True`` once the stopping condition is met.
    """

    def __init__(
        self,
        patience: int = 10,
        objective_func=None,
        threshold: float = None,
        verbose: bool = True,
    ):
        self._best_objective = None
        self._n_lower = 0
        self._patience = patience
        self._objective_func = ObjectiveRecorder() if objective_func is None else objective_func
        self._threshold = threshold
        self._verbose = verbose
        self.search_stopped = False

    def on_done_other(self, job):
        """Called after gathering local jobs on available remote jobs that are done."""
        self.on_done(job)

    def on_done(self, job):
        """Called when a local job has been gathered.

        Updates the best monitored value and the count of non-improving
        evaluations, then sets ``search_stopped`` when the patience is exhausted
        (and, if a threshold is set, the best value exceeds it).
        """
        job_objective = self._objective_func(job)

        if self._best_objective is None:
            # First observation: nothing to compare against yet.
            self._best_objective = job_objective
        elif job_objective > self._best_objective:
            if self._verbose:
                print(
                    "Objective has improved from "
                    f"{self._best_objective:.5f} -> {job_objective:.5f}"
                )
            self._best_objective = job_objective
            self._n_lower = 0
        else:
            self._n_lower += 1

        if self._n_lower < self._patience:
            return

        # With a threshold, stopping is only allowed once the best value exceeds it.
        if self._threshold is None or self._best_objective > self._threshold:
            if self._verbose:
                print(
                    "Stopping the search because it did not improve for the last "
                    f"{self._patience} evaluations!"
                )
            self.search_stopped = True
# TODO: Add unit tests
# This class is meant for people who want to log results from the
# evaluator without using it within the Search.
[docs]
class CSVLoggerCallback(Callback):
    """Dump jobs done to a CSV file.

    Args:
        path (str): The path where the CSV is being dumped. Defaults to ``"results.csv"``.

    Raises:
        ValueError: If the parent directory of ``path`` does not exist.
    """

    def __init__(self, path: str = "results.csv"):
        self.path = os.path.abspath(path)
        # The directory is taken from the absolute path: ``os.path.dirname`` of a
        # bare file name such as the default "results.csv" is "" which never exists.
        directory = os.path.dirname(self.path)
        if not os.path.exists(directory):
            raise ValueError(f"Directory not found {directory}")
        self.jobs_done = []  # jobs gathered but not yet written to the CSV
        self.num_objective = None  # number of objective columns, set from the first job seen
        self._start_dumping = False  # True once the header has been written
        self._columns_dumped = None  # column names used for every row
        self._job_class = None  # type of the first job seen, selects the CSV format

    def on_gather(self, local_jobs: List[Job], other_jobs: List[Job]):
        """Called after gathering jobs.

        Args:
            local_jobs (List[Job]):
                gathered jobs from local evaluator instance.
            other_jobs (List[Job]):
                gathered jobs from other evaluators using the same storage.
        """
        self.jobs_done.extend(local_jobs)
        self.jobs_done.extend(other_jobs)
        self.dump_jobs_done_to_csv(self.path)

    def on_close(self):
        """Called when the evaluator is being closed; forces a final dump of pending jobs."""
        self.dump_jobs_done_to_csv(self.path, flush=True)

    def dump_jobs_done_to_csv(self, path: str, flush: bool = False):
        """Dump completed jobs to a CSV file.

        When rows are written this resets the ``jobs_done`` attribute of this
        callback to an empty list. Nothing happens when no job is pending.

        Args:
            path (str):
                The path of the file where the CSV is being dumped.
            flush (bool):
                A boolean indicating if the results should be flushed (i.e., forcing the dumping).
        """
        if not self.jobs_done:
            return

        # The format is decided once, from the type of the first job ever seen.
        if self._job_class is None:
            self._job_class = type(self.jobs_done[0])

        logger.info("Dumping completed jobs to CSV...")
        if self._job_class is HPOJob:
            self._dump_jobs_done_to_csv_as_hpo_format(path, flush)
        else:
            self._dump_jobs_done_to_csv_as_regular_format(path)
        logger.info("Dumping done")

    def _write_rows(self, fp, rows):
        """Write ``rows`` with the recorded columns (header on first write) and clear ``jobs_done``."""
        writer = csv.DictWriter(fp, self._columns_dumped, extrasaction="ignore")
        if not self._start_dumping:
            writer.writeheader()
            self._start_dumping = True
        writer.writerows(rows)
        self.jobs_done = []

    def _dump_jobs_done_to_csv_as_regular_format(self, path: str):
        """Dump completed jobs to a CSV file for regular job format.

        Columns are the job id and status, then the arguments (``p:`` prefix),
        the output (``o:`` prefix) and the metadata (``m:`` prefix).

        Args:
            path (str):
                The path of the file where the CSV is being dumped.
        """
        records = []
        for job in self.jobs_done:
            record = {
                "job_id": int(job.id.split(".")[1]),
                "job_status": job.status.name,
            }

            # input arguments: add prefix for all keys found in "args"
            record.update({f"p:{k}": v for k, v in job.args.items()})

            # output: a dict is flattened, anything else goes in a single "o:" column
            if isinstance(job.output, dict):
                record.update({f"o:{k}": v for k, v in job.output.items()})
            else:
                record["o:"] = job.output

            # metadata keys starting with "_" are not saved (considered as internal)
            record.update({f"m:{k}": v for k, v in job.metadata.items() if k[0] != "_"})
            records.append(record)

        if not records:
            return

        mode = "a" if self._start_dumping else "w"
        # newline="" is required by the csv module to avoid altered line endings.
        with open(path, mode, newline="") as fp:
            if not self._start_dumping:
                # The first record fixes the columns for the whole file.
                self._columns_dumped = list(records[0])

            if self._columns_dumped is not None:
                self._write_rows(fp, records)

    def _dump_jobs_done_to_csv_as_hpo_format(self, path: str, flush: bool = False):
        """Dump completed jobs to a CSV file for the hyperparameter optimization format.

        When rows are written this resets the ``jobs_done`` attribute of this
        callback to an empty list. Until a first non-failed job is received (or
        ``flush`` is set) no row is written and the jobs stay pending.

        Args:
            path (str):
                The path of the file where the CSV is being dumped.
            flush (bool):
                A boolean indicating if the results should be flushed (i.e., forcing the dumping).
        """
        results = []
        for job in self.jobs_done:
            # add prefix for all keys found in "args"
            result = {f"p:{k}": v for k, v in job.args.items()}

            objective = job.objective
            if isinstance(objective, (tuple, list)):
                # multi-objective: one column per element
                if self.num_objective is None:
                    self.num_objective = len(objective)
                for i, value in enumerate(objective):
                    result[f"objective_{i}"] = value
            else:
                # NOTE(review): when the first job seen has a non-sequence objective
                # (e.g., a failure string) ``num_objective`` is fixed to 1 even if
                # later jobs are multi-objective -- confirm this is intended.
                if self.num_objective is None:
                    self.num_objective = 1
                if self.num_objective > 1:
                    # a single value (e.g., a failure) is repeated in every objective column
                    for i in range(self.num_objective):
                        result[f"objective_{i}"] = objective
                else:
                    result["objective"] = objective

            result["job_id"] = int(job.id.split(".")[1])
            result["job_status"] = job.status.name

            # metadata keys starting with "_" are not saved (considered as internal)
            result.update({f"m:{k}": v for k, v in job.metadata.items() if k[0] != "_"})
            results.append(result)

        if not results:
            return

        mode = "a" if self._start_dumping else "w"
        # newline="" is required by the csv module to avoid altered line endings.
        with open(path, mode, newline="") as fp:
            if not self._start_dumping:
                # Wait for a non-failed job before fixing the columns, unless flushing.
                for result in results:
                    has_success = any(
                        key in result and not isinstance(result[key], str)
                        for key in ("objective", "objective_0")
                    )
                    if has_success or flush:
                        self._columns_dumped = list(result)
                        break

            if self._columns_dumped is not None:
                self._write_rows(fp, results)