"""A scheduler for running jobs locally in a parallel manner using joblib as
a backend.
"""
import multiprocessing as mp
import queue
import time
from typing import List

import joblib

from hypertunity import utils
from .jobs import Job, Result
# Names exported by a star-import of this module.
__all__ = [
"Scheduler"
]
class _ResultTimeoutError(TimeoutError, queue.Empty):
    """Raised by :meth:`Scheduler.collect` when waiting for a result times out.

    It derives from :obj:`TimeoutError`, which is the documented contract of
    :meth:`Scheduler.collect`, and from :obj:`queue.Empty`, which is what the
    underlying queue raises, so callers catching either one keep working.
    """


class Scheduler:
    """A manager for parallel execution of jobs.

    A job must be of type :class:`Job` which produces a :class:`Result`
    object upon successful completion. The scheduler maintains a job queue
    and a result queue, and runs the jobs in a separate servant process.

    Notes:
        This class should be used as a context manager.
    """

    # Seconds the servant idles between two polls of an empty job queue.
    _IDLE_POLL_INTERVAL = 0.01
    # Seconds `exit` lets the servant stop on its own before terminating it.
    _GRACE_PERIOD = 0.15
    # Upper bound in seconds on waiting for a terminated servant to be reaped.
    _REAP_TIMEOUT = 1.0

    def __init__(self, n_parallel: int = None):
        """Setup the job and results queues and start the servant process.

        Args:
            n_parallel: (optional) :obj:`int`. The number of jobs that can be
                run in parallel. Defaults to `None` in which case all but one
                available CPUs will be used. Values below 1 are clipped to 1.
        """
        self._job_queue = mp.Queue()
        self._result_queue = mp.Queue()
        self._is_queue_closed = False
        if n_parallel is None:
            self.n_parallel = -2  # using all CPUs but 1
        else:
            self.n_parallel = max(n_parallel, 1)
        self._interrupt_event = mp.Event()
        self._servant = mp.Process(target=self._run_servant)
        self._servant.start()

    def __del__(self):
        """Clean up subprocesses on object deletion.

        Close the queues and stop the servant process before the object is
        deleted.
        """
        servant = getattr(self, "_servant", None)
        if servant is None or servant.pid is None:
            # `__init__` failed before the servant was created or started,
            # so there is no subprocess to stop.
            return
        if not self._is_queue_closed:
            self.exit()
        # last resort in case the servant outlived `exit`
        if servant.is_alive():
            servant.terminate()

    def __enter__(self):
        """Enter the context manager and return the scheduler itself."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit the context manager by shutting the scheduler down.

        Nothing is returned, hence an exception raised inside the ``with``
        block is propagated to the caller.
        """
        self.exit()

    def _run_servant(self):
        """Run the pool of workers on the dispatched jobs, fetched from the job
        queue and collect the results into the result queue.

        This is the target of the servant process and loops until the
        interrupt event is set.

        Notes:
            The results of a batch of jobs are written to the result queue
            only after all jobs of that batch have finished.
        """
        # TODO: Switch backend back to default "loky", after the leakage
        #  of semaphores is fixed
        with joblib.Parallel(n_jobs=self.n_parallel,
                             backend="multiprocessing") as parallel:
            while not self._interrupt_event.is_set():
                current_jobs = utils.drain_queue(self._job_queue)
                if not current_jobs:
                    # Idle instead of spinning at full speed on an empty
                    # queue. Waiting on the event (rather than sleeping)
                    # lets an interrupt end the pause immediately.
                    # NOTE(review): assumes `utils.drain_queue` returns
                    # without blocking when the queue is empty -- confirm.
                    self._interrupt_event.wait(self._IDLE_POLL_INTERVAL)
                    continue
                # TODO: in a future version of joblib, this could be a
                #  generator and then the inputs would be stored immediately
                #  in the results queue. Be ready to update whenever this PR
                #  gets merged: https://github.com/joblib/joblib/pull/588
                results = parallel(
                    joblib.delayed(job)() for job in current_jobs)
                # the order of the results corresponds to that of the jobs,
                # hence one result per job is expected.
                assert len(results) == len(current_jobs)
                for res in results:
                    self._result_queue.put_nowait(res)

    def dispatch(self, jobs: List[Job]):
        """Dispatch the jobs for parallel execution.

        This method is non-blocking.

        Args:
            jobs: :obj:`List[Job]`. A list of jobs to run whenever resources
                are available.

        Notes:
            Although the jobs are scheduled to run immediately, the actual
            execution may take place after indefinite delay if the job runner
            is occupied with older jobs.
        """
        for job in jobs:
            self._job_queue.put_nowait(job)

    def collect(self, n_results: int, timeout: float = None) -> List[Result]:
        """Collect all the available results or wait until they become available.

        Args:
            n_results: :obj:`int`, number of results to wait for.
                If `n_results` ≤ 0 then all available results will be returned.
            timeout: (optional) :obj:`float`, number of seconds to wait for
                each result to appear. If None (default) then it will wait
                until all `n_results` are collected.

        Returns:
            A list of :class:`Result` objects. It has exactly `n_results`
            elements if `n_results` > 0, otherwise it contains whatever was
            available in the result queue.

        Notes:
            If `n_results` is overestimated and timeout is None, then this
            method will hang forever. Therefore it is recommended that a timeout
            is set. Results that were already fetched when a timeout occurs
            are discarded together with the raised exception.

        Raises:
            :obj:`TimeoutError`: if more than `timeout` seconds elapse before a
                :class:`Result` is collected. The raised exception is also an
                instance of :obj:`queue.Empty`.
        """
        if n_results <= 0:
            return utils.drain_queue(self._result_queue)
        results = []
        for _ in range(n_results):
            try:
                res = self._result_queue.get(block=True, timeout=timeout)
            except queue.Empty as err:
                # translate the queue's exception into the documented one
                raise _ResultTimeoutError(
                    "no result became available within {} seconds "
                    "({} of {} collected)".format(
                        timeout, len(results), n_results)) from err
            results.append(res)
        return results

    def interrupt(self):
        """Signal the servant process to leave its job-processing loop.

        The signal is only checked between two batches of jobs.
        """
        self._interrupt_event.set()

    def exit(self):
        """Exit the scheduler by closing the queues and terminating the
        servant process.

        The queues are drained and closed only once; subsequent calls skip
        that step.
        """
        if not self._is_queue_closed:
            utils.drain_queue(self._job_queue, close_queue=True)
            self._job_queue.join_thread()
            utils.drain_queue(self._result_queue, close_queue=True)
            self._result_queue.join_thread()
            self._is_queue_closed = True
        self.interrupt()
        # wait a bit for the subprocess to exit gracefully
        self._servant.join(timeout=self._GRACE_PERIOD)
        if self._servant.is_alive():
            self._servant.terminate()
        # Join the servant so that it is reaped and does not linger as a
        # zombie process. The wait is bounded so that `exit` cannot hang.
        self._servant.join(timeout=self._REAP_TIMEOUT)