# Source code for gwf.core

import logging
import os
import os.path
from collections import defaultdict
from enum import Enum

from .backends import Status
from .exceptions import WorkflowError
from .utils import cache, timer
from .workflow import Workflow

# Module-level logger; Graph and Scheduler route their diagnostic messages through it.
logger = logging.getLogger(__name__)

class TargetStatus(Enum):
    """Scheduler-level state of a single target.

    Members:

    * ``SHOULDRUN`` -- the target is out of date and needs to run.
    * ``SUBMITTED`` -- the target was handed to the backend but is not running yet.
    * ``RUNNING`` -- the target is currently executing.
    * ``COMPLETED`` -- the target is up to date and will not run.
    """

    SHOULDRUN = 0  #: Out of date; needs to run.
    SUBMITTED = 1  #: Submitted to the backend, not yet running.
    RUNNING = 2  #: Currently executing.
    COMPLETED = 3  #: Up to date; will not run.

class Graph:
    """Represents a dependency graph for a set of targets.

    The graph represents the targets present in a workflow, but also their
    dependencies and the files they provide.

    During construction of the graph the dependencies between targets are
    determined by looking at target inputs and outputs. If a target specifies a
    file as input, the file must either be provided by another target or
    already exist on disk. In case that the file is provided by another target,
    a dependency to that target will be added:

    :ivar dict dependencies:
        A dictionary mapping a target to a set of its dependencies.

    If the file is not provided by another target, the file is *unresolved*:

    :ivar set unresolved:
        A set containing file paths of all unresolved files.

    If the graph is constructed successfully, the following instance variables
    will be available:

    :ivar dict targets:
        A dictionary mapping target names to instances of :class:`gwf.Target`.
    :ivar dict provides:
        A dictionary mapping a file path to the target that provides that path.
    :ivar dict dependents:
        A dictionary mapping a target to a set of all targets which depend on
        the target.

    The graph can be manipulated in arbitrary, diabolic ways after it has been
    constructed. Checks are only performed at construction-time, thus
    introducing e.g. a circular dependency by manipulating *dependencies* will
    not raise an exception.

    :raises gwf.exceptions.WorkflowError:
        Raised if the workflow contains a circular dependency.
    """

    def __init__(self, targets, provides, dependencies, dependents, unresolved):
        self.targets = targets
        self.provides = provides
        self.dependencies = dependencies
        self.dependents = dependents
        self.unresolved = unresolved
        self._check_for_circular_dependencies()

    @classmethod
    def from_targets(cls, targets):
        """Construct a dependency graph from a set of targets.

        When a graph is initialized it computes all dependency relations
        between targets, ensuring that the graph is semantically sane.
        Therefore, construction of the graph is an expensive operation which
        may raise a number of exceptions:

        :param dict targets:
            A dictionary mapping target names to targets.
        :raises gwf.exceptions.WorkflowError:
            Raised if the same file is provided by multiple targets.

        Since this method initializes the graph, it may also raise:

        :raises gwf.exceptions.WorkflowError:
            Raised if the workflow contains a circular dependency.
        """
        provides = {}
        unresolved = set()
        dependencies = defaultdict(set)
        dependents = defaultdict(set)

        logger.debug("Building dependency graph from %d targets", len(targets))
        with timer("Built dependency graph in %.3fms", logger=logger):
            # Pass 1: record which target produces each output path.
            for target in targets.values():
                for path in target.flattened_outputs():
                    if path in provides:
                        # Report both targets by name, consistently.
                        msg = 'File "{}" provided by targets "{}" and "{}".'.format(
                            path, provides[path].name, target.name
                        )
                        raise WorkflowError(msg)
                    provides[path] = target

            # Pass 2: link each input to its producer, or mark it unresolved
            # (it must then exist on disk when the target is scheduled).
            for target in targets.values():
                for path in target.flattened_inputs():
                    if path in provides:
                        dependencies[target].add(provides[path])
                    else:
                        unresolved.add(path)

            # Pass 3: invert the dependency relation.
            for target, deps in dependencies.items():
                for dep in deps:
                    dependents[dep].add(target)

        return cls(
            targets=targets,
            provides=provides,
            dependencies=dependencies,
            dependents=dependents,
            unresolved=unresolved,
        )

    @classmethod
    def from_path(cls, path):
        """Return graph for the workflow given by `path`.

        Returns a graph object (an instance of *cls*) containing the workflow
        graph of the workflow given by `path`. Note that calling this function
        computes the complete dependency graph which may take some time for
        large workflows.

        :arg str path:
            Path to a workflow file, optionally specifying a workflow object in
            that file.
        """
        workflow = Workflow.from_path(path)
        # Use cls so that subclasses of Graph construct instances of themselves.
        return cls.from_targets(workflow.targets)

    @classmethod
    def from_config(cls, config):
        """Return graph for the workflow specified by `config`.

        Reads the workflow path from ``config["file"]``. See :meth:`from_path`
        for further information.
        """
        return cls.from_path(config["file"])

    @timer("Checked for circular dependencies in %.3fms", logger=logger)
    def _check_for_circular_dependencies(self):
        """Check for circular dependencies in the graph.

        Raises :class:`WorkflowError` if a circular dependency is found.
        """
        logger.debug("Checking for circular dependencies")

        fresh, started, done = 0, 1, 2
        nodes = self.targets.values()
        state = {node: fresh for node in nodes}

        for root in nodes:
            if state[root] != fresh:
                continue
            # Iterative DFS with an explicit stack of (node, dependency
            # iterator) pairs, so long dependency chains cannot exceed
            # Python's recursion limit. `.get` avoids inserting empty entries
            # into a defaultdict (and KeyError on a plain dict).
            state[root] = started
            stack = [(root, iter(self.dependencies.get(root, ())))]
            while stack:
                node, deps = stack[-1]
                for dep in deps:
                    dep_state = state.get(dep, fresh)
                    if dep_state == started:
                        # dep is on the current DFS path: we found a cycle.
                        raise WorkflowError("Target {} depends on itself.".format(node))
                    if dep_state == fresh:
                        state[dep] = started
                        stack.append((dep, iter(self.dependencies.get(dep, ()))))
                        break
                else:
                    # All dependencies of `node` have been fully explored.
                    state[node] = done
                    stack.pop()

    def endpoints(self):
        """Return a set of all targets that are not depended on by other targets."""
        # Check for an empty set rather than key presence: `dependents` is
        # typically a defaultdict, and a bare lookup elsewhere leaves an empty
        # entry behind that would otherwise hide a real endpoint.
        return {
            target
            for target in self.targets.values()
            if not self.dependents.get(target)
        }

    @cache
    def dfs(self, root):
        """Return the depth-first traversal path through a graph from `root`.

        Targets appear in post-order: every dependency precedes the targets
        that depend on it, and `root` is last. Each target appears once.

        The returned list comes from a ``@cache``-decorated method and may be
        shared between callers; do not mutate it.
        """
        visited = {root}
        path = []
        # Explicit stack instead of recursion; visits dependencies in the same
        # order as a recursive post-order traversal would.
        stack = [(root, iter(self.dependencies.get(root, ())))]
        while stack:
            node, deps = stack[-1]
            for dep in deps:
                if dep not in visited:
                    visited.add(dep)
                    stack.append((dep, iter(self.dependencies.get(dep, ()))))
                    break
            else:
                stack.pop()
                path.append(node)
        return path

    def __iter__(self):
        """Iterate over all targets in the graph."""
        return iter(self.targets.values())

    def __len__(self):
        """Return the number of targets in the graph."""
        return len(self.targets)

    def __getitem__(self, target_name):
        """Return the target named `target_name`."""
        return self.targets[target_name]

    def __contains__(self, target_name):
        """Return whether a target named `target_name` is in the graph."""
        return target_name in self.targets
class CachedFilesystem:
    """Memoize file existence checks and modification times.

    Each path is stat'ed at most once per instance. Later calls reuse the
    cached answer, so changes to the file system after the first lookup of a
    path are not observed by that instance.
    """

    def __init__(self):
        # Maps path -> st_mtime, or None when the path does not exist.
        self._cache = {}

    def _lookup_file(self, path):
        """Return the (cached) mtime of *path*, or ``None`` if it does not exist."""
        if path not in self._cache:
            try:
                st = os.stat(path)
            except (FileNotFoundError, NotADirectoryError):
                # NotADirectoryError: some parent component is a regular file,
                # so the path cannot exist either.
                self._cache[path] = None
            else:
                self._cache[path] = st.st_mtime
        return self._cache[path]

    def exists(self, path):
        """Return ``True`` if *path* existed when it was first looked up."""
        return self._lookup_file(path) is not None

    def changed_at(self, path):
        """Return the modification time (``st_mtime``) of *path*.

        :raises FileNotFoundError: if *path* does not exist.
        """
        mtime = self._lookup_file(path)
        if mtime is None:
            raise FileNotFoundError(path)
        return mtime
class Scheduler:
    """Schedule one or more targets and submit to a backend.

    Scheduling a target will determine whether the target needs to run based
    on whether it already has been submitted and whether any of its
    dependencies have been submitted.

    Targets that should run will be submitted to *backend*, unless *dry_run* is
    set to ``True``.

    When scheduling a target, the scheduler checks whether any of its inputs
    are unresolved, meaning that during construction of the graph, no other
    target providing the file was found. This means that the file should then
    exist on disk. If it doesn't the following exception is raised:

    :raises gwf.exceptions.WorkflowError:
        Raised if a target has an input file that does not exist on the file
        system and that is not provided by another target.
    """

    def __init__(self, graph, backend, dry_run=False, filesystem=None):
        """
        :param gwf.Graph graph:
            Graph of the workflow.
        :param gwf.backends.Backend backend:
            An instance of :class:`gwf.backends.Backend` to which targets will
            be submitted.
        :param bool dry_run:
            If ``True``, targets will not be submitted to the backend. Defaults
            to ``False``.
        :param filesystem:
            Object used to check file existence and modification times through
            ``exists(path)`` and ``changed_at(path)``. Defaults to a new
            :class:`CachedFilesystem` for this scheduler.
        """
        self.graph = graph
        self.backend = backend
        self.dry_run = dry_run
        # Create the cache per scheduler. A default instance evaluated once at
        # definition time would be shared by every Scheduler and keep serving
        # stale existence/mtime results.
        self._filesystem = CachedFilesystem() if filesystem is None else filesystem
        # Targets "submitted" during a dry run, so they count as known.
        self._pretend_known = set()

    def prepare_target_options(self, target):
        """Apply backend-specific option defaults to a target.

        Injects backend target defaults into the target options and checks
        whether the options in the given target are supported by the backend.
        Warns the user and removes the option if this is not the case. Options
        whose value is ``None`` are dropped as well. Replaces
        ``target.options`` with the resulting dictionary.
        """
        supported = self.backend.option_defaults
        new_options = dict(supported)
        new_options.update(target.options)
        # Iterate over a snapshot because entries are deleted in the loop.
        for option_name, option_value in list(new_options.items()):
            if option_name not in supported:
                logger.warning(
                    'Option "%s" used in "%s" is not supported by backend. Ignored.',
                    option_name,
                    target.name,
                )
                del new_options[option_name]
            elif option_value is None:
                del new_options[option_name]
        target.options = new_options

    def schedule(self, target):
        """Schedule a target and its dependencies.

        Dependencies are scheduled first. The target itself is submitted if any
        dependency was submitted (now or earlier), or if :meth:`should_run`
        returns ``True`` for it.

        Returns ``True`` if *target* was submitted to the backend (even when
        *dry_run* is ``True``) or had already been submitted, and ``False`` if
        it does not need to run.

        :param gwf.Target target:
            Target to be scheduled.
        """
        logger.debug("Scheduling target %s", target)

        if (
            self.backend.status(target) != Status.UNKNOWN
            or target in self._pretend_known
        ):
            logger.debug("Target %s has already been submitted", target)
            return True

        submitted_deps = set()
        # Sort by name so the scheduling order does not depend on set
        # iteration order.
        dependencies = sorted(self.graph.dependencies[target], key=lambda t: t.name)
        for dependency in dependencies:
            was_submitted = self.schedule(dependency)
            if was_submitted:
                submitted_deps.add(dependency)

        if submitted_deps or self.should_run(target):
            self.prepare_target_options(target)
            if self.dry_run:
                logger.info("Would submit target %s", target)
                self._pretend_known.add(target)
            else:
                logger.info("Submitting target %s", target)
                self.backend.submit(target, dependencies=submitted_deps)
            return True
        else:
            logger.debug("Target %s should not run", target)
            return False

    def schedule_many(self, targets):
        """Schedule multiple targets and their dependencies.

        This is a convenience method for scheduling multiple targets. See
        :func:`schedule` for a detailed description of the arguments and
        behavior.

        :param list targets:
            A list of targets to be scheduled.
        :return:
            A list with the :meth:`schedule` result for each target, in order.
        """
        logger.debug("Scheduling %d targets", len(targets))

        schedules = []
        submitted_targets = 0
        with timer("Scheduled targets in %.3fms", logger=logger):
            for target in targets:
                was_submitted = self.schedule(target)
                if was_submitted:
                    submitted_targets += 1
                schedules.append(was_submitted)
        logger.debug("Submitted %d targets", submitted_targets)
        return schedules

    @cache
    def should_run(self, target):
        """Return whether a target should be run or not.

        A target should run if any of its dependencies should run, if it is a
        sink, if any of its output files is missing, or if its youngest input
        file is newer than its oldest output file. Sources whose outputs all
        exist do not run. Decorated with ``cache`` from ``.utils``; presumably
        the result is memoized per target.

        :raises gwf.exceptions.WorkflowError:
            If an input file is neither provided by another target nor present
            on the file system.
        """
        for dep in self.graph.dependencies[target]:
            if self.should_run(dep):
                logger.debug(
                    "%s should run because its dependency %s should run", target, dep
                )
                return True

        # Inputs not provided by any target (unresolved) must already exist on
        # disk; otherwise the workflow cannot be satisfied.
        for path in target.flattened_inputs():
            if path in self.graph.unresolved and not self._filesystem.exists(path):
                msg = (
                    'File "{}" is required by "{}", but does not exist and is not '
                    "provided by any target in the workflow."
                ).format(path, target)
                raise WorkflowError(msg)

        if target.is_sink:
            logger.debug("%s should run because it is a sink", target)
            return True

        for path in target.flattened_outputs():
            if not self._filesystem.exists(path):
                logger.debug(
                    "%s should run because its output file %s does not exist",
                    target,
                    path,
                )
                return True

        if target.is_source:
            logger.debug("%s should not run because it is a source", target)
            return False

        youngest_in_ts, youngest_in_path = max(
            (self._filesystem.changed_at(path), path)
            for path in target.flattened_inputs()
        )
        logger.debug(
            "%s is the youngest input file of %s with timestamp %s",
            youngest_in_path,
            target,
            youngest_in_ts,
        )

        oldest_out_ts, oldest_out_path = min(
            (self._filesystem.changed_at(path), path)
            for path in target.flattened_outputs()
        )
        logger.debug(
            "%s is the oldest output file of %s with timestamp %s",
            oldest_out_path,
            target,
            oldest_out_ts,
        )

        if youngest_in_ts > oldest_out_ts:
            logger.debug(
                "%s should run because input file %s is newer than output file %s",
                target,
                youngest_in_path,
                oldest_out_path,
            )
            return True
        return False

    def status(self, target):
        """Return the status of a target.

        Returns the status of a target where it is taken into account whether
        the target should run or not.

        :param Target target:
            The target to return status for.
        """
        status = self.backend.status(target)
        if status == Status.UNKNOWN:
            if self.should_run(target):
                return TargetStatus.SHOULDRUN
            return TargetStatus.COMPLETED
        # NOTE(review): maps by member name, so this assumes every non-UNKNOWN
        # backend Status has a same-named TargetStatus member (e.g. SUBMITTED,
        # RUNNING); an unmatched name raises KeyError.
        return TargetStatus[status.name]