Source code for gwf.backends.base
import json
import logging
import os.path
from enum import Enum
import attrs
from ..utils import entry_points
from .exceptions import TargetError
logger = logging.getLogger(__name__)
[docs]
class BackendStatus(Enum):
"""BackendStatus of a target.
A target is unknown to the backend if it has not been submitted or the
target has completed and thus isn't being tracked anymore by the backend.
A target is submitted if it has been successfully submitted to the backend
and is pending execution.
A target is running if it is currently being executed by the backend.
"""
UNKNOWN = 0
SUBMITTED = 1
RUNNING = 2
COMPLETED = 3
FAILED = 4
CANCELLED = 5
[docs]
def guess_backend():
max_score = -1000
chosen_backend = None
for backend_name, (_, score) in discover_backends().items():
if score > max_score:
max_score = score
chosen_backend = backend_name
return max_score, chosen_backend
[docs]
def discover_backends():
return {ep.name: ep.load() for ep in entry_points(group="gwf.backends")}
[docs]
def list_backends():
"""Return the names of all registered backends."""
return set(discover_backends().keys())
[docs]
def create_backend(name, working_dir, config):
"""Return backend class for the backend given by `name`.
Returns the backend class registered with `name`. Note that the *class*
is returned, not the instance, since not all uses requires
initialization of the backend (e.g. accessing the backends' log
manager), and initialization of the backend may be expensive.
:arg str name: Path to a workflow file, optionally specifying a
workflow object in that file.
"""
backend_args = config.get_namespace(f"backend.{name}")
backend_cls, _ = discover_backends()[name]
return backend_cls(working_dir=working_dir, **backend_args)
@attrs.define()
class TrackingBackend:
working_dir: str = attrs.field()
name: str = attrs.field()
ops: object = attrs.field()
_tracked_jobs: dict = attrs.field(init=False, repr=False)
_job_states: dict = attrs.field(init=False, repr=False)
@_tracked_jobs.default
def _init_tracked(self):
try:
with open(self._get_state_path()) as state_file:
return json.load(state_file)
except FileNotFoundError:
return {}
@_job_states.default
def _init_status(self):
return self.ops.get_job_states(list(self._tracked_jobs.values()))
def _get_state_path(self):
return os.path.join(
self.working_dir, ".gwf", f"{self.name}-backend-tracked.json"
)
def status(self, target):
job_id = self._tracked_jobs.get(target.name)
return self._job_states.get(job_id, BackendStatus.UNKNOWN)
def submit(self, target, dependencies):
dependency_ids = [self._tracked_jobs[dep.name] for dep in dependencies]
job_id = self.ops.submit_target(target, dependency_ids)
self._tracked_jobs[target.name] = job_id
self._job_states[job_id] = BackendStatus.SUBMITTED
def get_tracked_id(self, target):
return self._tracked_jobs.get(target.name)
def cancel(self, target):
try:
self.ops.cancel_job(self._tracked_jobs[target.name])
except KeyError as exc:
raise TargetError(target.name) from exc
def close(self):
self.ops.close()
with open(self._get_state_path(), "w") as state_file:
json.dump(self._tracked_jobs, state_file)
@property
def target_defaults(self):
return self.ops.target_defaults
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()