Source code for gwf.backends.base

import logging
from enum import Enum
from pkg_resources import iter_entry_points

from ..utils import PersistableDict, retry
from .exceptions import BackendError, DependencyError, TargetError
from .logmanager import FileLogManager

logger = logging.getLogger(__name__)


__all__ = (
    "list_backends",
    "backend_from_name",
    "backend_from_config",
    "Backend",
    "Status",
)


def _load_backends():
    return {ep.name: ep.load() for ep in iter_entry_points("gwf.backends")}


[docs]def list_backends(): """Return the names of all registered backends.""" return set(_load_backends().keys())
[docs]def backend_from_name(name): """Return backend class for the backend given by `name`. Returns the backend class registered with `name`. Note that the *class* is returned, not the instance, since not all uses requires initialization of the backend (e.g. accessing the backends' log manager), and initialization of the backend may be expensive. :arg str name: Path to a workflow file, optionally specifying a workflow object in that file. """ return _load_backends()[name]
[docs]def backend_from_config(config): """Return backend class for the backend specified by `config`. See :func:`backend_from_name` for further information.""" return backend_from_name(config["backend"])
[docs]class Status(Enum): """Status of a target. A target is unknown to the backend if it has not been submitted or the target has completed and thus isn't being tracked anymore by the backend. A target is submitted if it has been successfully submitted to the backend and is pending execution. A target is running if it is currently being executed by the backend. """ UNKNOWN = ( 0 ) #: The backend is not aware of the status of this target (it may be completed or failed). SUBMITTED = 1 #: The target has been submitted, but is not currently running. RUNNING = 2 #: The target is currently running.
[docs]class Backend: """Base class for backends.""" option_defaults = {} log_manager = FileLogManager() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close()
[docs] def status(self, target): """Return the status of `target`. :param gwf.Target target: The target to return the status of. :return gwf.backends.Status: Status of `target`. """
[docs] def submit(self, target, dependencies): """Submit `target` with `dependencies`. This method must submit the `target` and return immediately. That is, the method must not block while waiting for the target to complete. :param gwf.Target target: The target to submit. :param dependencies: An iterable of :class:`gwf.Target` objects that `target` depends on and that have already been submitted to the backend. """
[docs] def cancel(self, target): """Cancel `target`. :param gwf.Target target: The target to cancel. :raises gwf.exception.TargetError: If the target does not exist in the workflow. """
[docs] @classmethod def logs(cls, target, stderr=False): """Return log files for a target. If the backend cannot return logs a :class:`~gwf.exceptions.NoLogFoundError` is raised. By default standard output (stdout) is returned. If `stderr=True` standard error will be returned instead. :param gwf.Target target: Target to return logs for. :param bool stderr: default: False. If true, return standard error. :return: A file-like object. The user is responsible for closing the returned file(s) after use. :raises gwf.exceptions.NoLogFoundError: if the backend could not find a log for the given target. """ if stderr: return cls.log_manager.open_stderr(target) return cls.log_manager.open_stdout(target)
[docs] def close(self): """Close the backend. Called when the backend is no longer needed and should close all resources (open files, connections) used by the backend. """
class PbsLikeBackendBase(Backend): """PBS-like backend base class.""" option_flags = {} option_defaults = {} log_manager = FileLogManager() def __init__(self): try: self._status = self.parse_queue_output(self.call_queue_command()) except retry.RetryError as exc: raise BackendError("Could not get queue state") from exc class_name = self.__class__.__name__ backend_name = class_name.strip("Backend").lower() path = ".gwf/{name}-backend-tracked.json".format(name=backend_name) self._tracked = PersistableDict(path=path) def parse_queue_output(self): raise NotImplementedError("parse_queue_output") def call_queue_command(self): raise NotImplementedError("call_queue_command") def call_submit_command(self): raise NotImplementedError("call_submit_command") def call_cancel_command(self): raise NotImplementedError("call_cancel_command") def compile_script(self, target): raise NotImplementedError("compile_script") def status(self, target): try: return self._get_status(target) except KeyError: return Status.UNKNOWN def submit(self, target, dependencies): script = self.compile_script(target) dependency_ids = self._collect_dependency_ids(dependencies) try: stdout = self.call_submit_command(script, dependency_ids) except retry.RetryError as exc: raise BackendError("Could not submit target") from exc else: job_id = stdout.strip() self._add_job(target, job_id) def cancel(self, target): try: job_id = self.get_job_id(target) self.call_cancel_command(job_id) except KeyError as exc: raise TargetError(target.name) from exc except retry.RetryError as exc: raise BackendError("Could not cancel target") from exc else: self.forget_job(target) def close(self): self._tracked.persist() def forget_job(self, target): """Force the backend to forget the job associated with `target`.""" job_id = self.get_job_id(target) del self._status[job_id] del self._tracked[target.name] def get_job_id(self, target): """Get the Slurm job id for a target. :raises KeyError: if the target is not tracked by the backend. """ return self._tracked[target.name] def _add_job(self, target, job_id, initial_status=Status.SUBMITTED): self._set_job_id(target, job_id) self._set_status(target, initial_status) def _set_job_id(self, target, job_id): self._tracked[target.name] = job_id def _get_status(self, target): job_id = self.get_job_id(target) return self._status[job_id] def _set_status(self, target, status): job_id = self.get_job_id(target) self._status[job_id] = status def _collect_dependency_ids(self, dependencies): try: return [self._tracked[dep.name] for dep in dependencies] except KeyError as exc: raise DependencyError(exc.args[0])