Source code for deephyper.evaluator._queued
import collections
[docs]
def queued(evaluator_class): # noqa: D417
"""Decorator transforming an Evaluator into a ``Queued{Evaluator}``.
The ``run_function`` used with a ``Queued{Evaluator}`` needs to have a
``dequed`` keyword-argument where the dequed resources from the queue
will be passed.
Args:
queue (list):
A list of queued resources.
queue_pop_per_task (int, optional):
The number of resources popped out of the queue each time a task
is submitted. Defaults to ``1``.
"""
def __init__(self, *args, queue: list = None, queue_pop_per_task: int = 1, **kwargs):
evaluator_class.__init__(self, *args, **kwargs)
self.queue = collections.deque(queue[:])
self.queue_pop_per_task = queue_pop_per_task
async def execute(self, job):
dequed = [self.queue.popleft() for _ in range(self.queue_pop_per_task)]
self.run_function_kwargs["dequed"] = dequed
job = await evaluator_class.execute(self, job)
job.metadata["dequed"] = ",".join((str(item) for item in dequed))
self.queue.extend(dequed)
return job
cls_attrs = {"__init__": __init__, "execute": execute}
queued_evaluator_class = type(
f"Queued{evaluator_class.__name__}", (evaluator_class,), cls_attrs
)
return queued_evaluator_class