Source code for deephyper.evaluator._decorator

import asyncio
import functools
import pickle
import psutil
import os
import sys
import time

from concurrent.futures import ProcessPoolExecutor


def register_inner_function_for_pickle(func):
    """Register former decorated function under a new name.

    This is to be called in subprocess within the decorator.

    See: https://stackoverflow.com/questions/73146709/python-process-inside-decorator
    """
    prefix = "profiled_"
    func_name = func.__qualname__
    saved_name = prefix + func_name
    module_name = pickle.whichmodule(func, func_name)
    module = sys.modules[module_name]
    setattr(module, saved_name, func)
    func.__qualname__ = saved_name
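
# Example sketch of the effect of ``register_inner_function_for_pickle`` (the
# ``run`` function below is illustrative, not part of this module):
#
#     def run(config):
#         return config["x"]
#
#     register_inner_function_for_pickle(run)
#     # The defining module now exposes ``profiled_run`` and ``run.__qualname__``
#     # is now "profiled_run", so pickling the function by qualified name still
#     # resolves correctly in a subprocess even if the original name is rebound
#     # to a decorator wrapper.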


# Example from
# https://github.com/dabeaz/python-cookbook/blob/master/src/9/defining_a_decorator_that_takes_an_optional_argument/example.py


def asyncio_run(func, *args, **kwargs):
    """Useful to run async function from subprocess."""
    if asyncio.iscoroutinefunction(func):
        return asyncio.run(func(*args, **kwargs))
    else:
        return func(*args, **kwargs)
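
# Example sketch of ``asyncio_run`` (the ``run_async``/``run_sync`` names below
# are illustrative):
#
#     async def run_async(config):
#         return config["x"]
#
#     def run_sync(config):
#         return config["x"]
#
#     asyncio_run(run_async, {"x": 1})  # awaited through asyncio.run -> 1
#     asyncio_run(run_sync, {"x": 1})   # called directly -> 1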


def profile(  # noqa: D417
    _func=None,
    *,
    memory: bool = False,
    memory_limit: int = -1,
    memory_tracing_interval: float = 0.1,
    raise_exception: bool = False,
    register=True,
):
    """Decorator to use on a ``run_function`` to profile its execution-time and peak memory usage.

    By default, only the run-time is measured, for example by using the decorator as follows:

    .. code-block::

        @profile
        def run(config):
            ...
            return y

    If the ``memory`` argument is set to ``True``, the memory usage is also measured, for example
    by using the decorator as follows:

    .. code-block::

        @profile(memory=True)
        def run(config):
            ...
            return y

    If the ``memory_limit`` is used then the call will be cancelled (when possible) if the memory
    usage exceeds the limit, for example by using the decorator as follows:

    .. code-block::

        @profile(memory=True, memory_limit=0.1 * 1024**3, memory_tracing_interval=0.01)
        def run(config):
            ...
            return y

    Args:
        memory (bool): If ``True``, the memory usage is measured. The measured memory, in bytes,
            accounts for the whole process. Defaults to ``False``.

        memory_limit (int): In bytes, if set to a positive integer, the memory usage is measured
            at regular intervals and the function is interrupted if the memory usage exceeds the
            limit. If set to ``-1``, only the peak memory is measured. If the executed function is
            busy outside of the Python interpreter, this mechanism will not work properly.
            Defaults to ``-1``.

        memory_tracing_interval (float): In seconds, the interval at which the memory usage is
            measured. Defaults to ``0.1``.

        raise_exception (bool): If ``True``, a ``MemoryError`` is raised when the memory limit is
            exceeded. Defaults to ``False``.

        register (bool): Register the decorated function so that it is picklable and can be
            executed in a subprocess when we use the decorator as ``@profile``.

    Returns:
        function: a decorated function.
    """

    def decorator_profile(func):
        if register and memory:
            register_inner_function_for_pickle(func)

        @functools.wraps(func)
        async def async_wrapper_profile(*args, **kwargs):
            timestamp_start = time.time()

            if memory:
                p = psutil.Process()
                output = None
                with ProcessPoolExecutor(max_workers=1) as executor:
                    future = executor.submit(os.getpid)
                    pid = future.result()
                    p = psutil.Process(pid)

                    asyncio_run_func = functools.partial(asyncio_run, func)
                    future = executor.submit(asyncio_run_func, *args, **kwargs)
                    memory_peak = p.memory_info().rss
                    while not future.done():
                        memory_peak = max(p.memory_info().rss, memory_peak)

                        if memory_limit > 0 and memory_peak > memory_limit:
                            p.kill()
                            future.cancel()
                            output = "F_memory_limit_exceeded"

                            if raise_exception:
                                raise MemoryError(
                                    f"Memory limit exceeded: {memory_peak} > {memory_limit}"
                                )

                            break

                        await asyncio.sleep(memory_tracing_interval)

                    if output is None:
                        output = future.result()
            else:
                output = await func(*args, **kwargs)

            timestamp_end = time.time()

            new_metadata = {
                "timestamp_start": timestamp_start,
                "timestamp_end": timestamp_end,
            }
            if memory:
                new_metadata["memory"] = memory_peak

            # Format correctly the output to return metadata
            if isinstance(output, dict):
                if "output" in output:
                    if "metadata" not in output:
                        output["metadata"] = {}
                else:
                    output = {"output": output, "metadata": {}}
            else:
                output = {"output": output, "metadata": {}}

            output["metadata"].update(new_metadata)

            return output

        @functools.wraps(func)
        def sync_wrapper_profile(*args, **kwargs):
            timestamp_start = time.time()

            if memory:
                p = psutil.Process()
                output = None
                with ProcessPoolExecutor(max_workers=1) as executor:
                    future = executor.submit(os.getpid)
                    pid = future.result()
                    p = psutil.Process(pid)

                    future = executor.submit(func, *args, **kwargs)
                    memory_peak = p.memory_info().rss
                    while not future.done():
                        memory_peak = max(p.memory_info().rss, memory_peak)

                        if memory_limit > 0 and memory_peak > memory_limit:
                            p.kill()
                            future.cancel()
                            output = "F_memory_limit_exceeded"

                            if raise_exception:
                                raise MemoryError(
                                    f"Memory limit exceeded: {memory_peak} > {memory_limit}"
                                )

                            break

                        time.sleep(memory_tracing_interval)

                    if output is None:
                        output = future.result()
            else:
                output = func(*args, **kwargs)

            timestamp_end = time.time()

            new_metadata = {
                "timestamp_start": timestamp_start,
                "timestamp_end": timestamp_end,
            }
            if memory:
                new_metadata["memory"] = memory_peak

            # Format correctly the output to return metadata
            if isinstance(output, dict):
                if "output" in output:
                    if "metadata" not in output:
                        output["metadata"] = {}
                else:
                    output = {"output": output, "metadata": {}}
            else:
                output = {"output": output, "metadata": {}}

            output["metadata"].update(new_metadata)

            return output

        if asyncio.iscoroutinefunction(func):
            return async_wrapper_profile
        else:
            return sync_wrapper_profile

    if _func is None:
        return decorator_profile
    else:
        return decorator_profile(_func)

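
# Example sketch of the dictionary returned by a ``@profile(memory=True)``-decorated
# run function (the ``run`` function and values below are illustrative):
#
#     @profile(memory=True)
#     def run(config):
#         return config["x"]
#
#     result = run({"x": 1})
#     # result == {
#     #     "output": 1,
#     #     "metadata": {
#     #         "timestamp_start": ...,  # float, seconds since the epoch
#     #         "timestamp_end": ...,    # float, seconds since the epoch
#     #         "memory": ...,           # int, peak RSS of the worker process, in bytes
#     #     },
#     # }
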
def slow_down(_func=None, *, rate=1):
    """Sleep a given number of seconds before calling the function."""

    def decorator_slow_down(func):
        @functools.wraps(func)
        def wrapper_slow_down(*args, **kwargs):
            time.sleep(rate)
            return func(*args, **kwargs)

        return wrapper_slow_down

    if _func is None:
        return decorator_slow_down
    else:
        return decorator_slow_down(_func)
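

# Minimal usage sketch of the two decorators (the ``run_*`` functions below are
# illustrative, not part of the DeepHyper API):
if __name__ == "__main__":

    @slow_down(rate=0.5)
    def run_slow(config):
        return config["x"]

    @profile
    def run_profiled(config):
        return config["x"]

    print(run_slow({"x": 1}))  # sleeps 0.5s, then prints 1
    print(run_profiled({"x": 2}))  # prints {'output': 2, 'metadata': {...timestamps...}}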