Source code for deephyper.evaluator._ray

import logging

import ray

from typing import Callable, Hashable

from deephyper.evaluator._evaluator import Evaluator
from deephyper.evaluator._job import Job
from deephyper.evaluator.storage import Storage, RayStorage

ray_initializer = None

logger = logging.getLogger(__name__)


[docs]class RayEvaluator(Evaluator): """This evaluator uses the ``ray`` library as backend. Args: run_function (callable): functions to be executed by the ``Evaluator``. callbacks (list, optional): A list of callbacks to trigger custom actions at the creation or completion of jobs. Defaults to None. run_function_kwargs (dict, optional): Static keyword arguments to pass to the ``run_function`` when executed. storage (Storage, optional): Storage used by the evaluator. Defaults to ``RayStorage``. search_id (Hashable, optional): The id of the search to use in the corresponding storage. If ``None`` it will create a new search identifier when initializing the search. address (str, optional): address of the Ray-head. Defaults to None, if no Ray-head was started. password (str, optional): password to connect ot the Ray-head. Defaults to None, if the default Ray-password is used. num_cpus (int, optional): number of CPUs available in the Ray-cluster. Defaults to None, if the Ray-cluster was already started it will be automatically computed. num_gpus (int, optional): number of GPUs available in the Ray-cluster. Defaults to None, if the Ray-cluster was already started it will be automatically computed. num_cpus_per_task (float, optional): number of CPUs used per remote task. Defaults to 1. num_gpus_per_task (float, optional): number of GPUs used per remote task. Defaults to None. ray_kwargs (dict, optional): other ray keyword arguments passed to ``ray.init(...)``. Defaults to {}. num_workers (int, optional): number of workers available to compute remote-tasks in parallel. Defaults to ``None``, or if it is ``-1`` it is automatically computed based with ``num_workers = int(num_cpus // num_cpus_per_task)``. """ def __init__( self, run_function: Callable, callbacks: list = None, run_function_kwargs: dict = None, storage: Storage = None, search_id: Hashable = None, address: str = None, password: str = None, num_cpus: int = None, num_gpus: int = None, include_dashboard: bool = False, num_cpus_per_task: float = 1, num_gpus_per_task: float = None, ray_kwargs: dict = None, num_workers: int = None, ): # get the __init__ parameters self._init_params = locals() # ray_kwargs = {} if ray_kwargs is None else ray_kwargs if address is not None: ray_kwargs["address"] = address if password is not None: ray_kwargs["_redis_password"] = password if num_cpus is not None: ray_kwargs["num_cpus"] = num_cpus if num_gpus is not None: ray_kwargs["num_gpus"] = num_gpus if include_dashboard is not None: ray_kwargs["include_dashboard"] = include_dashboard if not (ray.is_initialized()): ray.init(**ray_kwargs) super().__init__( run_function=run_function, num_workers=num_workers, callbacks=callbacks, run_function_kwargs=run_function_kwargs, storage=storage if storage is not None else RayStorage(), search_id=search_id, ) self.num_cpus_per_task = num_cpus_per_task self.num_gpus_per_task = num_gpus_per_task if num_cpus is None: self.num_cpus = int( sum([node["Resources"].get("CPU", 0) for node in ray.nodes()]) ) else: self.num_cpus = num_cpus if num_gpus is None: self.num_gpus = int( sum([node["Resources"].get("GPU", 0) for node in ray.nodes()]) ) else: self.num_gpus = num_gpus if self.num_workers is None or self.num_workers == -1: self.num_workers = int(self.num_cpus // self.num_cpus_per_task) if hasattr(run_function, "__name__") and hasattr(run_function, "__module__"): logger.info( f"Ray Evaluator will execute {self.run_function.__name__}() from module {self.run_function.__module__}" ) else: logger.info(f"Ray Evaluator will execute {self.run_function}") self._remote_run_function = ray.remote( num_cpus=self.num_cpus_per_task, num_gpus=self.num_gpus_per_task, # max_calls=1, )(self.run_function)
[docs] async def execute(self, job: Job) -> Job: running_job = job.create_running_job(self._storage, self._stopper) output = await self._remote_run_function.remote( running_job, **self.run_function_kwargs ) job.set_output(output) return job