# Source code for deephyper.evaluator._loky
import asyncio
import functools
from typing import Callable, Hashable, Optional

from loky import get_reusable_executor

from deephyper.evaluator import Evaluator, Job, JobStatus
from deephyper.evaluator.storage import SharedMemoryStorage, Storage
class LokyEvaluator(Evaluator):
    """This evaluator uses the ``ProcessPoolExecutor`` from ``loky`` as backend.

    The ``loky`` backend uses ``cloudpickle`` to serialize the called function
    and its arguments by value. This allows passing lambdas or local functions
    to the ``Evaluator`` as ``run_function``.

    Args:
        run_function (callable):
            Function to be executed by the ``Evaluator``.
        num_workers (int, optional):
            Number of parallel processes used to compute the ``run_function``.
            Defaults to 1.
        callbacks (list, optional):
            A list of callbacks to trigger custom actions at the creation or
            completion of jobs. Defaults to ``None``.
        run_function_kwargs (dict, optional):
            Static keyword arguments to pass to the ``run_function`` when
            executed. Defaults to ``None``.
        storage (Storage, optional):
            Storage used by the evaluator. Defaults to ``SharedMemoryStorage``.
        search_id (Hashable, optional):
            The id of the search to use in the corresponding storage. If
            ``None`` it will create a new search identifier when initializing
            the search.
    """

    def __init__(
        self,
        run_function: Callable,
        num_workers: int = 1,
        callbacks: Optional[list] = None,
        run_function_kwargs: Optional[dict] = None,
        storage: Optional[Storage] = None,
        search_id: Optional[Hashable] = None,
    ):
        # Fall back to a ``SharedMemoryStorage`` when no storage is given.
        if storage is None:
            storage = SharedMemoryStorage()
        super().__init__(
            run_function=run_function,
            num_workers=num_workers,
            callbacks=callbacks,
            run_function_kwargs=run_function_kwargs,
            storage=storage,
            search_id=search_id,
        )
        # Creating the executor once here is crucial to avoid repetitive
        # start-up overheads for every evaluated job.
        # context="spawn" is important to avoid deadlocks in multi-threaded
        # environments, which is the case here because we are using asyncio.
        self.executor = get_reusable_executor(
            max_workers=self.num_workers, context="spawn"
        )
        # Semaphore limiting the number of concurrently executing jobs to
        # ``num_workers``; it is created in ``set_event_loop`` (see below).
        self.sem = None

    def set_event_loop(self):
        """Run the base-class event-loop setup, then create the job semaphore."""
        super().set_event_loop()
        # The semaphore should be created after getting the event loop to avoid
        # binding it to a different event loop.
        self.sem = asyncio.Semaphore(self.num_workers)

    async def execute(self, job: Job) -> Job:
        """Run ``job`` in the loky process pool and return the updated job.

        At most ``num_workers`` jobs run concurrently (bounded by ``self.sem``,
        which ``set_event_loop`` must have created beforehand).

        If ``self.timeout`` is set and ``self.time_left`` elapses before the
        function returns, the job is marked ``CANCELLING``. The evaluator still
        waits for the function to return, then marks the job ``CANCELLED``.
        NOTE(review): presumably the run function observes the stopper passed
        via ``create_running_job`` to exit early; confirm in ``Job``.

        Args:
            job (Job): the job to execute.

        Returns:
            Job: the result of ``self._update_job_when_done(job, output)``.
        """
        async with self.sem:
            job.status = JobStatus.RUNNING
            running_job = job.create_running_job(self._stopper)
            run_function = functools.partial(
                job.run_function, running_job, **self.run_function_kwargs
            )
            run_function_future = self.loop.run_in_executor(
                self.executor, run_function
            )
            if self.timeout is not None:
                try:
                    # ``shield`` keeps ``wait_for``'s timeout from cancelling
                    # the executor future, so its result can still be awaited
                    # below once the timeout has fired.
                    output = await asyncio.wait_for(
                        asyncio.shield(run_function_future), timeout=self.time_left
                    )
                except asyncio.TimeoutError:
                    job.status = JobStatus.CANCELLING
                    output = await run_function_future
                    job.status = JobStatus.CANCELLED
            else:
                output = await run_function_future
            return self._update_job_when_done(job, output)