import asyncio
import copy
import csv
import importlib
import json
import logging
import os
import sys
import time
import warnings
from typing import Dict, List
import numpy as np
from deephyper.evaluator._job import Job
from deephyper.skopt.optimizer import OBJECTIVE_VALUE_FAILURE
EVALUATORS = {
"mpipool": "_mpi_pool.MPIPoolEvaluator",
"mpicomm": "_mpi_comm.MPICommEvaluator",
"process": "_process_pool.ProcessPoolEvaluator",
"ray": "_ray.RayEvaluator",
"serial": "_serial.SerialEvaluator",
"subprocess": "_subprocess.SubprocessEvaluator",
"thread": "_thread_pool.ThreadPoolEvaluator",
}
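# For illustration: Evaluator.create(..., method="process") resolves the entry
# "_process_pool.ProcessPoolEvaluator" by importing the module
# deephyper.evaluator._process_pool and fetching its ProcessPoolEvaluator class.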
def _test_ipython_interpretor() -> bool:
"""Test if the current Python interpretor is IPython or not.
Suggested by: https://stackoverflow.com/questions/15411967/how-can-i-check-if-code-is-executed-in-the-ipython-notebook
"""
try:
shell = get_ipython().__class__.__name__
        if shell == "ZMQInteractiveShell":
            return True  # Jupyter notebook or qtconsole
        elif shell == "TerminalInteractiveShell":
            return False  # Terminal running IPython
        else:
            return False  # Other shell type
except NameError:
return False # Probably standard Python interpreter
class Evaluator:
"""This ``Evaluator`` class asynchronously manages a series of Job objects to help execute given HPS or NAS tasks on various environments with differing system settings and properties.
Args:
        run_function (callable): the function to be executed by the ``Evaluator``.
num_workers (int, optional): Number of parallel workers available for the ``Evaluator``. Defaults to 1.
callbacks (list, optional): A list of callbacks to trigger custom actions at the creation or completion of jobs. Defaults to None.
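
    Example:
        A minimal end-to-end sketch; the ``run`` function below is illustrative::

            def run(config):
                return -config["x"] ** 2

            evaluator = Evaluator.create(run, method="serial")
            evaluator.submit([{"x": 1.0}, {"x": 2.0}])
            results = evaluator.gather("ALL")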
"""
FAIL_RETURN_VALUE = OBJECTIVE_VALUE_FAILURE
PYTHON_EXE = os.environ.get("DEEPHYPER_PYTHON_BACKEND", sys.executable)
assert os.path.isfile(PYTHON_EXE)
def __init__(
self,
run_function,
num_workers: int = 1,
callbacks: list = None,
run_function_kwargs: dict = None,
):
self.run_function = run_function # User-defined run function.
self.run_function_kwargs = (
{} if run_function_kwargs is None else run_function_kwargs
)
# Number of parallel workers available
self.num_workers = num_workers
self.jobs = [] # Job objects currently submitted.
        self.n_jobs = 1  # Incrementing counter used to assign unique job ids.
self._tasks_running = [] # List of AsyncIO Task objects currently running.
self._tasks_done = [] # Temp list to hold completed tasks from asyncio.
self._tasks_pending = [] # Temp list to hold pending tasks from asyncio.
self.jobs_done = [] # List used to store all jobs completed by the evaluator.
self.timestamp = (
time.time()
) # Recorded time of when this evaluator interface was created.
        self.loop = None  # Event loop for asyncio, set in ``submit``.
self._start_dumping = False
self._callbacks = [] if callbacks is None else callbacks
self._lock = asyncio.Lock()
# to avoid "RuntimeError: This event loop is already running"
if _test_ipython_interpretor():
warnings.warn("Applying nest-asyncio patch for IPython Shell!", category=UserWarning)
import deephyper.evaluator._nest_asyncio as nest_asyncio
nest_asyncio.apply()
    @staticmethod
    def create(run_function, method="subprocess", method_kwargs=None):
"""Create evaluator with a specific backend and configuration.
Args:
run_function (function): the function to execute in parallel.
            method (str, optional): the backend to use, among ``"serial"``, ``"thread"``, ``"process"``, ``"subprocess"``, ``"ray"``, ``"mpicomm"`` and ``"mpipool"``. Defaults to ``"subprocess"``.
            method_kwargs (dict, optional): configuration dictionary of the corresponding backend. Keys correspond to the keyword arguments of the corresponding implementation. Defaults to ``None`` (treated as ``{}``).
Raises:
            ValueError: if the ``method`` is not acceptable.
Returns:
Evaluator: the ``Evaluator`` with the corresponding backend and configuration.
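
        Example:
            A sketch; ``num_workers`` is assumed here to be a valid keyword argument of the chosen backend::

                evaluator = Evaluator.create(run, method="process", method_kwargs={"num_workers": 2})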
"""
        method_kwargs = {} if method_kwargs is None else method_kwargs
        logging.info(
            f"Creating Evaluator({run_function}, method={method}, "
            f"method_kwargs={method_kwargs})..."
        )
        if method not in EVALUATORS:
            val = ", ".join(EVALUATORS)
            raise ValueError(
                f'The method "{method}" is not a valid method for an Evaluator!'
                f" Choose among the following evaluator types: "
                f"{val}."
            )
# create the evaluator
mod_name, attr_name = EVALUATORS[method].split(".")
mod = importlib.import_module(f"deephyper.evaluator.{mod_name}")
eval_cls = getattr(mod, attr_name)
evaluator = eval_cls(run_function, **method_kwargs)
logging.info(f"Creation done")
return evaluator
async def _get_at_least_n_tasks(self, n):
# If a user requests a batch size larger than the number of currently-running tasks, set n to the number of tasks running.
if n > len(self._tasks_running):
warnings.warn(
f"Requested a batch size ({n}) larger than currently running tasks ({len(self._tasks_running)}). Batch size has been set to the count of currently running tasks."
)
n = len(self._tasks_running)
# wait for all running tasks (sync.)
if n == len(self._tasks_running):
            self._tasks_done, self._tasks_pending = await asyncio.wait(
                self._tasks_running, return_when=asyncio.ALL_COMPLETED
            )
else:
while len(self._tasks_done) < n:
                self._tasks_done, self._tasks_pending = await asyncio.wait(
                    self._tasks_running, return_when=asyncio.FIRST_COMPLETED
                )
async def _run_jobs(self, configs):
for config in configs:
# Create a Job object from the input configuration
new_job = Job(self.n_jobs, config, self.run_function)
self.n_jobs += 1
self.jobs.append(new_job)
self._on_launch(new_job)
task = self.loop.create_task(self._execute(new_job))
self._tasks_running.append(task)
def _on_launch(self, job):
"""Called after a job is started."""
job.status = job.RUNNING
job.timestamp_submit = time.time() - self.timestamp
# call callbacks
for cb in self._callbacks:
cb.on_launch(job)
def _on_done(self, job):
"""Called after a job has completed."""
job.status = job.DONE
job.timestamp_gather = time.time() - self.timestamp
        # Replace failed objectives (non-finite scalars such as nan or inf)
        # with the failure sentinel value.
        if np.isscalar(job.result):
            if np.isreal(job.result) and not np.isfinite(job.result):
                job.result = Evaluator.FAIL_RETURN_VALUE
# call callbacks
for cb in self._callbacks:
cb.on_done(job)
async def _execute(self, job):
job = await self.execute(job)
        # Unpack the dict produced when the run function is profiled.
profile_keys = ["objective", "timestamp_start", "timestamp_end"]
if isinstance(job.result, dict) and all(k in job.result for k in profile_keys):
profile = job.result
job.result = profile["objective"]
job.timestamp_start = profile["timestamp_start"] - self.timestamp
job.timestamp_end = profile["timestamp_end"] - self.timestamp
return job
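    # Note (sketch): a run function wrapped with a profiling decorator is
    # expected to return a dict such as
    #   {"objective": 0.9, "timestamp_start": 12.3, "timestamp_end": 45.6}
    # which _execute() above unpacks back into job.result and the job's
    # relative start/end timestamps.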
    async def execute(self, job) -> Job:
"""Execute the received job. To be implemented with a specific backend.
Args:
job (Job): the ``Job`` to be executed.
"""
raise NotImplementedError
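    # A minimal subclass sketch (hypothetical; it relies on ``job.config`` and
    # ``self.run_function``, both visible elsewhere in this class). A real
    # backend would offload the call instead of blocking the event loop:
    #
    #   class InlineEvaluator(Evaluator):
    #       async def execute(self, job) -> Job:
    #           job.result = self.run_function(job.config, **self.run_function_kwargs)
    #           return job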
    def submit(self, configs: List[Dict]):
"""Send configurations to be evaluated by available workers.
Args:
configs (List[Dict]): A list of dict which will be passed to the run function to be executed.
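
        Example:
            A sketch, assuming the run function reads a key ``"x"``::

                evaluator.submit([{"x": 0.1}, {"x": 0.2}])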
"""
logging.info(f"submit {len(configs)} job(s) starts...")
self.loop = asyncio.get_event_loop()
self.loop.run_until_complete(self._run_jobs(configs))
logging.info("submit done")
    def gather(self, type, size=1):
"""Collect the completed tasks from the evaluator in batches of one or more.
Args:
type (str):
Options:
``"ALL"``
Block until all jobs submitted to the evaluator are completed.
``"BATCH"``
Specify a minimum batch size of jobs to collect from the evaluator. The method will block until at least ``size`` evaluations are completed.
size (int, optional): The minimum batch size that we want to collect from the evaluator. Defaults to 1.
Raises:
Exception: Raised when a gather operation other than "ALL" or "BATCH" is provided.
Returns:
List[Job]: A batch of completed jobs that is at minimum the given size.
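
        Example:
            A usage sketch, assuming jobs were submitted beforehand::

                batch = evaluator.gather("BATCH", size=4)  # at least 4 completed jobs
                rest = evaluator.gather("ALL")  # block until the remainder finish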
"""
logging.info(f"gather({type}, size={size}) starts...")
assert type in ["ALL", "BATCH"], f"Unsupported gather operation: {type}."
results = []
if type == "ALL":
size = len(self._tasks_running) # Get all tasks.
self.loop.run_until_complete(self._get_at_least_n_tasks(size))
for task in self._tasks_done:
job = task.result()
self._on_done(job)
results.append(job)
self.jobs_done.append(job)
self._tasks_running.remove(task)
self._tasks_done = []
self._tasks_pending = []
logging.info("gather done")
return results
    def decode(self, key):
"""Decode the key following a JSON format to return a dict."""
x = json.loads(key)
if not isinstance(x, dict):
raise ValueError(f"Expected dict, but got {type(x)}")
return x
    def convert_for_csv(self, val):
"""Convert an input value to an accepted format to be saved as a value of a CSV file (e.g., a list becomes it's str representation).
Args:
val (Any): The input value to convert.
Returns:
Any: The converted value.
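
        Example:
            A sketch of the conversion::

                >>> evaluator.convert_for_csv([1, 2])
                '[1, 2]'
                >>> evaluator.convert_for_csv(3.14)
                3.14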
"""
        if isinstance(val, list):
            return str(val)
        else:
            return val
    def dump_evals(self, saved_keys=None, log_dir: str = "."):
"""Dump evaluations to a CSV file name ``"results.csv"``
Args:
            saved_keys (list|callable): If ``None`` the whole ``job.config`` is added as a row of the CSV file. If a ``list``, only the listed keys are added as a row. If a ``callable``, its output dictionary is added as a row of the CSV file.
log_dir (str): directory where to dump the CSV file.
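
        Example:
            Usage sketches; the key ``"x"`` below is illustrative::

                evaluator.dump_evals()  # one column per config key
                evaluator.dump_evals(saved_keys=["x"])  # keep only the "x" column
                evaluator.dump_evals(saved_keys=lambda job: {"x": job.config["x"]})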
"""
logging.info("dump_evals starts...")
        results_list = []
for job in self.jobs_done:
if saved_keys is None:
result = copy.deepcopy(job.config)
elif type(saved_keys) is list:
decoded_key = copy.deepcopy(job.config)
result = {k: self.convert_for_csv(decoded_key[k]) for k in saved_keys}
elif callable(saved_keys):
result = copy.deepcopy(saved_keys(job))
result["job_id"] = job.id
            # When the run function returns a dict, flatten it into the CSV row.
if isinstance(job.result, dict):
result.update(job.result)
else:
result["objective"] = job.result
result["timestamp_submit"] = job.timestamp_submit
result["timestamp_gather"] = job.timestamp_gather
if job.timestamp_start is not None and job.timestamp_end is not None:
result["timestamp_start"] = job.timestamp_start
result["timestamp_end"] = job.timestamp_end
if hasattr(job, 'dequed'):
result["dequed"] = ','.join(job.dequed)
if "optuna_trial" in result:
result.pop("optuna_trial")
            results_list.append(result)
        self.jobs_done = []
        if len(results_list) != 0:
            mode = "a" if self._start_dumping else "w"
            with open(os.path.join(log_dir, "results.csv"), mode) as fp:
                columns = results_list[0].keys()
                writer = csv.DictWriter(fp, columns)
                if not self._start_dumping:
                    writer.writeheader()
                    self._start_dumping = True
                writer.writerows(results_list)
logging.info("dump_evals done")