deephyper.evaluator.ProcessPoolEvaluator

class deephyper.evaluator.ProcessPoolEvaluator(run_function: Callable, num_workers: int = 1, callbacks: list = None, run_function_kwargs: dict = None, storage: Storage = None, search_id: Hashable = None)

Bases: Evaluator

This evaluator uses concurrent.futures.ProcessPoolExecutor as its backend.

Parameters:
  • run_function (callable) – Function to be executed by the Evaluator.

  • num_workers (int, optional) – Number of parallel processes used to compute the run_function. Defaults to 1.

  • callbacks (list, optional) – A list of callbacks to trigger custom actions at the creation or completion of jobs. Defaults to None.

  • run_function_kwargs (dict, optional) – Static keyword arguments to pass to the run_function when executed. Defaults to None.

  • storage (Storage, optional) – Storage used by the evaluator. Defaults to SharedMemoryStorage.

  • search_id (Hashable, optional) – The id of the search to use in the corresponding storage. If None, a new search identifier is created when the search is initialized.

Methods

  • close – Closes the Evaluator.

  • create – Create evaluator with a specific backend and configuration.

  • execute – Execute the received job.

  • gather – Collect the completed tasks from the evaluator in batches of one or more.

  • gather_other_jobs_done – Access storage to return results from other processes.

  • set_event_loop

  • set_maximum_num_jobs_submitted

  • submit – Send configurations to be evaluated by available workers.

  • to_json – Returns a JSON version of the evaluator.

Attributes

  • FAIL_RETURN_VALUE

  • NEST_ASYNCIO_PATCHED

  • PYTHON_EXE

  • is_master – Indicates if the current Evaluator object is a "master".

  • num_jobs_gathered – The number of jobs gathered.

  • num_jobs_submitted – The number of jobs submitted.

  • time_left – The time remaining according to a previously set timeout.

  • timeout – The timeout value set.

close() → List[Job]

Closes the Evaluator.

This will:

  1. check whether tasks are still running in the asyncio loop.

  2. check whether any task results have not been collected yet.

  3. cancel running tasks.

  4. wait for running tasks to complete.

  5. close the asyncio loop.
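The shutdown order listed above can be illustrated with plain asyncio. This is a minimal sketch of the same sequence, not DeepHyper's implementation: the names `_slow_job` and `shutdown` are hypothetical, and the jobs are simulated with `asyncio.sleep`.

```python
import asyncio


async def _slow_job(delay):
    """Hypothetical stand-in for an evaluation that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return delay


def shutdown(loop, tasks):
    """Mirror the five steps: inspect, cancel, wait, then close the loop."""
    # 1. tasks still running in the loop
    pending = [t for t in tasks if not t.done()]
    # 2. tasks that finished but whose results were never collected
    uncollected = [t for t in tasks if t.done() and not t.cancelled()]
    # 3. cancel what is still running
    for task in pending:
        task.cancel()
    # 4. wait until the cancelled tasks have actually stopped
    if pending:
        loop.run_until_complete(asyncio.wait(pending))
    # 5. close the asyncio loop
    loop.close()
    return pending, uncollected


loop = asyncio.new_event_loop()
tasks = [loop.create_task(_slow_job(d)) for d in (0.0, 10.0)]
loop.run_until_complete(asyncio.sleep(0.05))  # give the fast job time to finish
cancelled, finished = shutdown(loop, tasks)
```

Waiting after cancellation matters because `cancel()` only requests cancellation; the task stops the next time the loop runs it.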

static create(run_function, method='serial', method_kwargs={})

Create evaluator with a specific backend and configuration.

Parameters:
  • run_function (callable) – The function to execute in parallel.

  • method (str, optional) –

    The backend to use in ["serial", "thread", "process", "loky", "ray", "mpicomm"]. Defaults to "serial".

    • "serial": uses Python’s asyncio base module for concurrency. It is efficient for Python functions that are I/O bound and implemented with the async def and await primitives. The code runs in the local memory context of the current process.

    • "thread": uses Python’s threading base module for concurrency. It is efficient for Python functions that are defined synchronously (def foo(…)) but use the threading module internally. The code runs in the local memory context of the current process.

    • "process": uses Python’s concurrent.futures base module for concurrency. It is efficient for Python functions that are compute bound and should be scheduled on different CPU cores of the local node. This method uses serialization by reference through the pickle base module, so it only works with functions that are “importable”. The code runs in a different memory context from the current process.

    • "loky": uses the loky Python package for concurrency. It is efficient for Python functions that are compute bound and should be scheduled on different CPU cores of the local node. This method uses serialization by value through the cloudpickle Python package, so it is useful for scheduling locally defined functions (i.e., not at the module level, for example inside another function) that are not importable, as well as lambda functions. The code runs in a different memory context from the current process.

    • "ray": uses the ray Python package. It is efficient for Python functions that are compute bound and should be scheduled on different compute resources, not necessarily on the local node. A multi-node setting requires a Ray cluster to be started before creating the evaluator. This method uses serialization by value through the cloudpickle Python package, so it also works with local function definitions. It also makes some I/O optimizations easy, for example pre-loading data to remote processes (e.g., using the ray.put and ray.get primitives). The code runs in a different memory context from the current process. However, a global “Object Storage” is accessible to all executed code.

    • "mpicomm": uses the mpi4py Python package. It is efficient for Python functions that are compute bound and should be scheduled on different compute resources, not necessarily on the local node. It schedules tasks on the available MPI ranks. This method uses serialization by reference through the pickle base module, so it only works with functions that are “importable”. The code runs in a different memory context from the current process.

  • method_kwargs (dict, optional) – Configuration dictionary of the corresponding backend. Keys correspond to the keyword arguments of the constructor of the corresponding evaluator class. Defaults to "{}".

Raises:

ValueError – if the method is not acceptable.

Returns:

The instantiated Evaluator with the corresponding backend and configuration.

Return type:

Evaluator
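The “importable” requirement of the pickle-based backends described above can be checked with the standard library alone. The sketch below uses a hypothetical helper, `can_pickle_by_reference`, to test whether a function survives `pickle.dumps`. It does not call DeepHyper, and it assumes only that those backends rely on the pickle module as stated.

```python
import math
import pickle


def can_pickle_by_reference(func):
    """Return True if `func` can be serialized by the pickle module.

    pickle stores functions by reference (module name + qualified name),
    so the function must be reachable through an import.
    """
    try:
        pickle.dumps(func)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


def make_local():
    """Build a function that only exists inside this call."""
    def local_run(x):
        return x + 1
    return local_run


importable_ok = can_pickle_by_reference(math.sqrt)       # module-level function
lambda_ok = can_pickle_by_reference(lambda x: x + 1)     # anonymous function
local_ok = can_pickle_by_reference(make_local())         # nested function
```

Functions that fail this check are the ones for which the backends that serialize by value through cloudpickle are described as useful.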

async execute(job: Job) → Job

Execute the received job. To be implemented with a specific backend.

Parameters:

job (Job) – the Job to be executed.

Returns:

The updated Job.

Return type:

Job

gather(type, size: int = 1) → list[Job] | tuple[list[Job], list[Job]]

Collect the completed tasks from the evaluator in batches of one or more.

Parameters:
  • type (str) –

    • "ALL": Block until all jobs submitted to the evaluator are completed.

    • "BATCH": Specify a minimum batch size of jobs to collect from the evaluator. The method blocks until at least size evaluations are completed.

  • size (int) – The minimum batch size that we want to collect from the evaluator. Defaults to 1.

Raises:

Exception – Raised when a gather operation other than “ALL” or “BATCH” is provided.

Returns:

A batch of completed jobs that is at minimum the given size.

Return type:

list[Job] | tuple[list[Job], list[Job]]
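The difference between the "ALL" and "BATCH" modes can be sketched with asyncio tasks. This is an analogy built on `asyncio.wait`, not DeepHyper's code: `gather_sketch`, `_job`, and `demo` are hypothetical names, and the jobs are simulated with short sleeps.

```python
import asyncio


async def _job(delay, value):
    """Stand-in for one evaluation: sleep, then return a value."""
    await asyncio.sleep(delay)
    return value


async def gather_sketch(pending, mode, size=1):
    """Return (done, still_pending) following the "ALL"/"BATCH" semantics."""
    done = set()
    if mode == "ALL":
        if pending:  # asyncio.wait rejects an empty collection
            done, pending = await asyncio.wait(pending)
    elif mode == "BATCH":
        # Keep waiting until at least `size` tasks have completed.
        while pending and len(done) < size:
            finished, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            done |= finished
    else:
        raise Exception(f"Unknown gather mode: {mode!r}")
    return done, pending


async def demo():
    tasks = {
        asyncio.create_task(_job(delay, value))
        for delay, value in [(0.01, "a"), (0.02, "b"), (0.3, "c")]
    }
    batch, rest = await gather_sketch(tasks, "BATCH", size=2)
    remaining, rest = await gather_sketch(rest, "ALL")
    return (
        sorted(t.result() for t in batch),
        sorted(t.result() for t in remaining),
        len(rest),
    )


batch_values, remaining_values, num_left = asyncio.run(demo())
```

A "BATCH" call may return more than size results when several tasks finish at the same moment, which is why size is described as a minimum.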

gather_other_jobs_done() → list[Job]

Access storage to return results from other processes.

Returns:

A batch of completed jobs.

Return type:

list[Job]

property is_master: bool

Indicates if the current Evaluator object is a “master”.

property num_jobs_gathered: int

The number of jobs gathered.

property num_jobs_submitted: int

The number of jobs submitted.

submit(args_list: List[Dict])

Send configurations to be evaluated by available workers.

Parameters:

args_list (List[Dict]) – A list of dictionaries, each of which is passed to the run function for execution.
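As a rough illustration of the data shape, the sketch below applies a run function to each dictionary of an args_list, with static keyword arguments bound once in the spirit of run_function_kwargs. The function `run` and its `scale` argument are hypothetical. The loop runs sequentially in the current process, whereas the evaluator dispatches the configurations to its workers, and how DeepHyper wraps each configuration before calling the run function is not shown here.

```python
from functools import partial


def run(config, scale=1.0):
    """Hypothetical run function: receives one configuration dictionary."""
    return scale * config["x"] ** 2


# Static keyword arguments, fixed once for every evaluation (assumption:
# this mimics the role of run_function_kwargs).
run_with_kwargs = partial(run, scale=2.0)

# One dictionary per configuration to evaluate.
args_list = [{"x": 1}, {"x": 2}, {"x": 3}]

# Sequential stand-in for what the workers would compute.
results = [run_with_kwargs(config) for config in args_list]
```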

property time_left: float | None

The time remaining according to a previously set timeout.

property timeout: float | None

The timeout value set.

to_json()

Returns a JSON version of the evaluator.