"""A scheduler for running jobs locally in a parallel manner using joblib as
import multiprocessing as mp
from typing import List
from hypertunity import utils
from .jobs import Job, Result
__all__ = [
"""A manager for parallel execution of jobs.
A job must be of type :class:`Job` which produces a :class:`Result`
object upon successful completion. The scheduler maintains a job and
This class should be used as a context manager.
[docs] def __init__(self, n_parallel: int = None):
"""Setup the job and results queues.
n_parallel: (optional) :obj:`int`. The number of jobs that can be
run in parallel. Defaults to `None` in which case all but one
available CPUs will be used.
self._job_queue = mp.Queue()
self._result_queue = mp.Queue()
self._is_queue_closed = False
if n_parallel is None:
self.n_parallel = -2 # using all CPUs but 1
self.n_parallel = max(n_parallel, 1)
self._servant = mp.Process(target=self._run_servant)
self._interrupt_event = mp.Event()
[docs] def __del__(self):
"""Clean up subprocesses on object deletion.
Close the queues and join all subprocesses before the object is deleted.
if not self._is_queue_closed:
[docs] def __enter__(self):
"""Enter the context manager."""
[docs] def __exit__(self, exc_type, exc_val, exc_tb):
"""Exit the context manager."""
"""Run the pool of workers on the dispatched jobs, fetched from the job
queue and collect the results into the result queue.
The runner will take as long as all jobs from the job queue finish
before any results are written to the result queue.
# TODO: Switch backend back to default "loky", after the leakage
# of semaphores is fixed
backend="multiprocessing") as parallel:
while not self._interrupt_event.is_set():
current_jobs = utils.drain_queue(self._job_queue)
if not current_jobs:
# the order of the results corresponds to the that of the jobs
# and the IDs don't need to be shuffled.
ids = [job.id for job in current_jobs]
# TODO: in a future version of joblib, this could be a generator
# and then the inputs would be stored immediately in the results
# queue. Be ready to update whenever this PR gets merged:
results = parallel(joblib.delayed(job)() for job in current_jobs)
assert len(ids) == len(results)
for res in results:
def dispatch(self, jobs: "List[Job]"):
    """Dispatch the jobs for parallel execution.

    This method is non-blocking.

    Args:
        jobs: :obj:`List[Job]`. A list of jobs to run whenever resources
            are available.

    Notes:
        Although the jobs are scheduled to run immediately, the actual
        execution may take place after indefinite delay if the job runner
        is occupied with older jobs.
    """
    for job in jobs:
        # enqueue for the servant process to pick up
        self._job_queue.put(job)
def collect(self, n_results: int, timeout: float = None) -> "List[Result]":
    """Collect all the available results or wait until they become available.

    Args:
        n_results: :obj:`int`, number of results to wait for.
            If `n_results` <= 0 then all currently available results are
            returned without blocking.
        timeout: (optional) :obj:`float`, number of seconds to wait for
            results to appear. If None (default) then it will wait until
            all `n_results` are collected.

    Returns:
        A list of :class:`Result` objects.

    Raises:
        :obj:`queue.Empty`: if more than `timeout` seconds elapse before a
            result is collected.
            NOTE(review): ``mp.Queue.get`` raises ``queue.Empty`` on
            timeout; the recovered docstring promised ``TimeoutError`` —
            confirm whether the original converted the exception.

    Notes:
        If `n_results` is overestimated and timeout is None, then this
        method will hang forever. Therefore it is recommended that a
        timeout is set.
    """
    if n_results > 0:
        results = []
        for _ in range(n_results):
            # blocks until a result arrives or `timeout` expires
            results.append(self._result_queue.get(timeout=timeout))
    else:
        # non-blocking: drain whatever is currently enqueued
        results = utils.drain_queue(self._result_queue)
    return results
def interrupt(self):
    """Interrupt the scheduler and all running jobs.

    Sets the shared interrupt event, which makes the servant's main loop
    exit after the current batch of jobs finishes.
    """
    self._interrupt_event.set()
def exit(self):
    """Exit the scheduler by closing the queues and terminating the
    servant subprocess.
    """
    # NOTE(review): the interrupt call, the queue ``close()`` calls, the
    # ``join`` in the retry loop and the final ``terminate`` are
    # reconstructed from the surviving fragments — confirm upstream.
    # stop the servant's main loop before tearing the queues down
    self.interrupt()
    if not self._is_queue_closed:
        self._job_queue.close()
        self._result_queue.close()
        self._is_queue_closed = True
    # wait a bit for the subprocess to exit gracefully
    n_retries = 3
    while self._servant.is_alive() and n_retries > 0:
        self._servant.join(timeout=1.0)
        n_retries -= 1
    if self._servant.is_alive():
        # the servant did not exit within the retry budget
        self._servant.terminate()