import hashlib
import inspect
import json
import logging
import os
import os.path
import unicodedata
from collections import defaultdict
from collections.abc import Mapping
from enum import Enum
from os import fspath
import attrs
import click
from .exceptions import GWFError
from .utils import is_valid_name, timer
logger = logging.getLogger(__name__)
def _flatten(t):
res = []
def flatten_rec(g):
if isinstance(g, str) or hasattr(g, "__fspath__"):
res.append(g)
elif isinstance(g, Mapping):
for k, v in g.items():
flatten_rec(v)
else:
for v in g:
flatten_rec(v)
flatten_rec(t)
return res
def _has_nonprintable_char(s):
chars = enumerate((unicodedata.category(char) == "Cc", char) for char in s)
for pos, (unprintable, char) in chars:
if unprintable:
return (
s.encode("unicode_escape").decode("utf-8"),
char.encode("unicode_escape").decode("utf-8"),
pos,
)
return None
class InvalidPathError(GWFError):
    """Raised when a path is empty or contains a non-printable character."""

    pass
def _check_path(path):
if not path:
raise InvalidPathError("Path is empty")
result = _has_nonprintable_char(path)
if result is not None:
clean_path, char, pos = result
raise InvalidPathError(
f"Path {clean_path} contains a non-printable character '{char}' at {pos}. "
f"This is always unintentional and can cause strange behaviour."
)
def _norm_path(working_dir, path):
path = fspath(path)
if os.path.isabs(path):
return path
return os.path.abspath(os.path.join(working_dir, path))
def _norm_paths(working_dir, paths):
    """Normalize every path in *paths* relative to *working_dir*."""
    return [_norm_path(working_dir, each) for each in paths]
def hash_spec(spec):
    """Return the hex SHA-1 digest of a target's spec string."""
    digest = hashlib.sha1(spec.encode("utf-8"))
    return digest.hexdigest()
@attrs.define
class NoopSpecHashes:
    """Null-object spec-hash store used when spec hashing is disabled.

    Implements the same interface as :class:`FileSpecHashes`, but never
    reports a change and never persists anything.
    """

    def has_changed(self, target):  # pragma: no cover
        # A target's spec is never considered changed.
        return None

    def update(self, target):  # pragma: no cover
        # Nothing to record.
        pass

    def invalidate(self, target):  # pragma: no cover
        # Nothing to forget.
        pass

    def close(self):  # pragma: no cover
        # Nothing to flush.
        pass

    def __enter__(self):  # pragma: no cover
        return self

    def __exit__(self, *exc):  # pragma: no cover
        self.close()
@attrs.define
class FileSpecHashes:
    """Spec-hash store backed by a JSON file.

    Hashes are loaded from ``path`` on construction and written back on
    :meth:`close` (also via the context-manager protocol).
    """

    # Location of the JSON file holding the target-name -> spec-hash mapping.
    path: str = attrs.field()
    hashes: dict = attrs.field(factory=dict, init=False)

    def __attrs_post_init__(self):
        # Load previously saved hashes; a missing file simply means this is
        # the first run with spec hashes enabled.
        try:
            with open(self.path) as fileobj:
                self.hashes = json.load(fileobj)
        except FileNotFoundError:
            logger.debug("First run with spec hashes enabled")

    def has_changed(self, target):
        """Return the new spec hash if *target*'s spec changed, else None."""
        current = hash_spec(target.spec)
        previous = self.hashes.get(target.name)
        if previous is None:
            logger.debug("No spec hash for %s exists", target)
            return current
        return current if current != previous else None

    def update(self, target):
        """Record the current spec hash for *target*."""
        self.hashes[target.name] = hash_spec(target.spec)

    def invalidate(self, target):
        """Forget the stored hash for *target*, if any."""
        self.hashes.pop(target.name, None)

    def close(self):
        """Persist all hashes to the JSON file."""
        with open(self.path, "w") as fileobj:
            json.dump(self.hashes, fileobj)

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()
def get_spec_hashes(*, working_dir, config):
    """Return the spec-hash store selected by ``config["use_spec_hashes"]``.

    Returns a :class:`FileSpecHashes` stored under ``.gwf/spec-hashes.json``
    inside *working_dir* when enabled, otherwise a :class:`NoopSpecHashes`.
    """
    if not config.get("use_spec_hashes"):
        return NoopSpecHashes()
    hashes_path = os.path.join(working_dir, ".gwf", "spec-hashes.json")
    return FileSpecHashes(hashes_path)
class Status(Enum):
    """Status of a target as computed by the scheduler."""

    SHOULDRUN = 0  #: The target should run.
    SUBMITTED = 1  #: The target has been submitted, but is not currently running.
    RUNNING = 2  #: The target is currently running.
    COMPLETED = 3  #: The target has completed and should not run.
    FAILED = 4  #: The target failed and should run again.
    CANCELLED = 5  #: The target or one of its dependencies was cancelled.
@attrs.define(eq=False)
class AnonymousTarget:
    """Represents an unnamed target.

    An anonymous target is an unnamed, abstract target much like the tuple
    returned by function templates. Thus, `AnonymousTarget` can also be used
    as the return value of a template function.

    :ivar list inputs:
        A string, list or dictionary containing inputs to the target.
    :ivar list outputs:
        A string, list or dictionary containing outputs to the target.
    :ivar dict options:
        Options such as number of cores, memory requirements etc. Options are
        backend-dependent. Backends will ignore unsupported options.
    :ivar str group:
        The group to which the target is assigned.
    :ivar str working_dir:
        Working directory of this target.
    :ivar str spec:
        The specification of the target.
    :ivar set protect:
        An iterable of protected files which will not be removed during
        cleaning, even if this target is not an endpoint.
    """

    inputs: list = attrs.field()
    outputs: list = attrs.field()
    options: dict = attrs.field()
    group: str = attrs.field(default=None)
    working_dir: str = attrs.field(default=".")
    protect: set = attrs.field(factory=set, converter=set)
    spec: str = attrs.field(default="")

    def __attrs_post_init__(self):
        # When no group is given, derive one from the calling function's name
        # by inspecting the stack. Frame 2 is presumably the template function
        # that constructed this target (frame 1 being the attrs-generated
        # __init__) — NOTE(review): confirm against actual call sites.
        if self.group is None:
            self.group = (
                inspect.stack()[2].function
                or inspect.currentframe().f_back.f_code.co_name
            )
def _validate_path(instance, attribute, value):
    """attrs validator: check every path nested inside *value*."""
    for entry in _flatten(value):
        _check_path(entry)
@attrs.define(eq=False)
class Target:
    """Represents a target.

    This class inherits from :class:`AnonymousTarget`.

    A target is a named unit of work that declare their file *inputs* and
    *outputs*. Target names must be valid Python identifiers.

    A script (or spec) is associated with the target. The script must be a
    valid Bash script and should produce the files declared as *outputs* and
    consume the files declared as *inputs*. Both parameters must be provided
    explicitly, even if no inputs or outputs are needed. In that case, provide
    the empty list::

        Target('Foo', inputs=[], outputs=[], options={}, working_dir='/tmp')

    The *inputs* and *outputs* arguments can either be a string, a list or
    a dictionary. If a dictionary is given, the keys act as names for the
    files. The values may be either strings or a list of strings::

        foo = Target(
            name='foo',
            inputs={'A': ['a1', 'a2'], 'B': 'b'},
            outputs={'C': ['a1b', 'a2b'], 'D': 'd'},
        )

    This is useful for referring the outputs of a target::

        bar = Target(
            name='bar',
            inputs=foo.outputs['C'],
            outputs='result',
        )

    The target can also specify an *options* dictionary specifying the
    resources needed to run the target. The options are consumed by the
    backend and may be ignored if the backend doesn't support a given option.
    For example, we can set the *cores* option to set the number of cores
    that the target uses::

        Target('Foo', inputs=[], outputs=[], options={'cores': 16}, working_dir='/tmp')

    To see which options are supported by your backend of choice, see the
    documentation for the backend.

    :ivar str name:
        Name of the target.
    :ivar str group:
        The group to which the target is assigned.

    .. versionchanged:: 1.6.0
        Named inputs and outputs were added. Prior versions require *inputs*
        and *outputs* to be lists.
    """

    name: str = attrs.field()
    inputs: list = attrs.field(validator=_validate_path)
    outputs: list = attrs.field(validator=_validate_path)
    options: dict = attrs.field()
    group: str = attrs.field(default=None)
    working_dir: str = attrs.field(default=".")
    protect: set = attrs.field(factory=set, converter=set)
    spec: str = attrs.field(default="")
    # Monotonically increasing index recording the order in which targets
    # were defined (shared class-level counter).
    order: int = attrs.field(init=False)

    _creation_order = 0

    @order.default  # type: ignore
    def _set_target_order(self):
        # Assign the next creation index and advance the shared counter.
        order = Target._creation_order
        Target._creation_order += 1
        return order

    @name.validator  # type: ignore
    def _validate_name(self, attribute, value):
        # Target names must be valid identifiers (checked by is_valid_name).
        if not is_valid_name(self.name):
            raise GWFError(f"Target defined with invalid name: {value}")

    @working_dir.validator
    def _validate_working_dir(self, attribute, value):
        _check_path(value)

    def __lshift__(self, spec):
        # Enables the fluent `target << "spec"` syntax for attaching a spec.
        self.spec = spec
        return self

    def __str__(self):
        return self.name

    def flattened_inputs(self):
        """Return all declared inputs as a flat list of absolute paths."""
        return _norm_paths(self.working_dir, _flatten(self.inputs))

    def flattened_outputs(self):
        """Return all declared outputs as a flat list of absolute paths."""
        return _norm_paths(self.working_dir, _flatten(self.outputs))

    def protected(self):
        """Return the set of protected files as absolute paths."""
        return set(_norm_paths(self.working_dir, _flatten(self.protect)))
class CircularDependencyError(GWFError):
    """Raised when the dependency graph contains a cycle."""

    pass
class FileProvidedByMultipleTargetsError(GWFError):
    """Raised when the same output file is declared by more than one target."""

    pass
class UnresolvedInputError(GWFError):
    """Raised when an input file neither exists nor is provided by a target."""

    pass
@timer("Checked for circular dependencies in %.3fms", logger=logger)
def check_for_circular_dependencies(targets, dependencies):
    """Raise :class:`CircularDependencyError` if the graph has a cycle.

    Runs a recursive depth-first search with three-color marking: a node that
    is reached again while still "in progress" closes a cycle.

    :param targets: mapping of target names to targets.
    :param dependencies: mapping of each target to its set of dependencies.
    """
    logger.debug("Checking for circular dependencies")

    FRESH, IN_PROGRESS, DONE = 0, 1, 2
    color = {target: FRESH for target in targets.values()}

    def visit(node):
        color[node] = IN_PROGRESS
        for dep in dependencies[node]:
            if color[dep] == IN_PROGRESS:
                # Reaching an in-progress node means we followed a cycle.
                raise CircularDependencyError(
                    "Target {} depends on itself.".format(node)
                )
            if color[dep] == FRESH:
                visit(dep)
        color[node] = DONE

    for target in targets.values():
        if color[target] == FRESH:
            visit(target)
@attrs.define
class Graph:
    """Represents a dependency graph for a set of targets.

    The graph represents the targets present in a workflow, but also their
    dependencies and the files they provide.

    During construction of the graph the dependencies between targets are
    determined by looking at target inputs and outputs. If a target specifies
    a file as input, the file must either be provided by another target or
    already exist on disk. In case that the file is provided by another
    target, a dependency to that target will be added:

    :ivar dict dependencies:
        A dictionary mapping a target to a set of its dependencies.

    If the file is not provided by another target, the file is *unresolved*:

    :ivar set unresolved:
        A set containing file paths of all unresolved files.

    If the graph is constructed successfully, the following instance variables
    will be available:

    :ivar dict targets:
        A dictionary mapping target names to instances of :class:`gwf.Target`.
    :ivar dict provides:
        A dictionary mapping a file path to the target that provides that
        path.
    :ivar dict dependents:
        A dictionary mapping a target to a set of all targets which depend on
        the target.

    The graph can be manipulated in arbitrary, diabolic ways after it has
    been constructed. Checks are only performed at construction-time, thus
    introducing e.g. a circular dependency by manipulating *dependencies*
    will not raise an exception.
    """

    targets: dict = attrs.field()
    provides: dict = attrs.field()
    dependencies: defaultdict = attrs.field()
    dependents: defaultdict = attrs.field()
    unresolved: set = attrs.field()

    @classmethod
    def from_targets(cls, targets, fs):
        """Construct a dependency graph from a set of targets.

        When a graph is initialized it computes all dependency relations
        between targets, ensuring that the graph is semantically sane.
        Therefore, construction of the graph is an expensive operation which
        may raise a number of exceptions:

        :raises gwf.exceptions.FileProvidedByMultipleTargetsError:
            Raised if the same file is provided by multiple targets.
        :raises gwf.exceptions.CircularDependencyError:
            Raised if the graph contains a circular dependency.
        """
        provides = {}
        unresolved = set()
        dependencies = defaultdict(set)
        dependents = defaultdict(set)

        logger.debug("Building dependency graph from %d targets", len(targets))
        if isinstance(targets, dict):
            targets = targets.values()

        with timer("Built dependency graph in %.3fms", logger=logger):
            # Map each output path to the single target producing it; a path
            # produced by two targets is an error.
            for target in targets:
                for path in target.flattened_outputs():
                    if path in provides:
                        msg = 'File "{}" provided by targets "{}" and "{}".'.format(
                            path, provides[path].name, target
                        )
                        raise FileProvidedByMultipleTargetsError(msg)
                    provides[path] = target

            # An input produced by another target becomes a dependency edge;
            # anything else is unresolved and must already exist on disk.
            for target in targets:
                for path in target.flattened_inputs():
                    if path in provides:
                        dependencies[target].add(provides[path])
                    else:
                        unresolved.add(path)

            # Invert the dependency edges to get the dependents mapping.
            for target, deps in dependencies.items():
                for dep in deps:
                    dependents[dep].add(target)

            # Check whether all input files actually exist or are being provided
            # by another target. If not, it's an error.
            for target in targets:
                for path in target.flattened_inputs():
                    if path in unresolved and not fs.exists(path):
                        msg = (
                            'File "{}" is required by "{}", but does not exist and is not '
                            "provided by any target in the workflow."
                        ).format(path, target)
                        raise UnresolvedInputError(msg)

        targets = {target.name: target for target in targets}
        check_for_circular_dependencies(targets, dependencies)

        return cls(
            targets=targets,
            provides=provides,
            dependencies=dependencies,
            dependents=dependents,
            unresolved=unresolved,
        )

    def endpoints(self):
        """Return a set of all targets that are not depended on by other targets."""
        return set(self.targets.values()) - set(self.dependents.keys())

    def dfs(self, root):
        """Return the depth-first traversal path through a graph from `root`."""
        visited = set()
        path = []

        def dfs_inner(node):
            if node in visited:
                return
            visited.add(node)
            for dep in self.dependencies[node]:
                dfs_inner(dep)
            # Post-order append: dependencies come before their dependents.
            path.append(node)

        dfs_inner(root)
        return path

    def __iter__(self):
        # Iterating the graph yields its targets.
        return iter(self.targets.values())

    def __len__(self):
        return len(self.targets)

    def __getitem__(self, target_name):
        return self.targets[target_name]

    def __contains__(self, target_name):
        return target_name in self.targets
@attrs.define
class CachedFilesystem:
    """A cached file system abstraction.

    Each path is stat'ed at most once; the cache maps a path to its
    modification time, or to ``None`` when the path does not exist.
    """

    _cache: dict = attrs.field(factory=dict)

    def _lookup_file(self, path):
        # Consult the cache first; only hit the filesystem on a miss.
        if path in self._cache:
            return self._cache[path]
        try:
            mtime = os.stat(path).st_mtime
        except FileNotFoundError:
            mtime = None
        self._cache[path] = mtime
        return mtime

    def exists(self, path):
        """Return True if *path* existed when first checked."""
        return self._lookup_file(path) is not None

    def changed_at(self, path):
        """Return the cached modification time of *path*.

        :raises FileNotFoundError: if the path did not exist when checked.
        """
        mtime = self._lookup_file(path)
        if mtime is None:
            raise FileNotFoundError(path)
        return mtime
@attrs.frozen
class Context:
    """Immutable bundle of per-invocation state.

    Holds the working directory, the loaded configuration, the backend, and
    the workflow file/object so that CLI commands can access them.
    """

    # Root directory of the workflow project; config_dir/logs_dir live here.
    working_dir = attrs.field()
    # Loaded configuration mapping — presumably supports .get(); verify caller.
    config = attrs.field()
    backend = attrs.field()
    workflow_file = attrs.field()
    workflow_obj = attrs.field()

    @property
    def config_dir(self):
        """Path of the ``.gwf`` directory inside the working directory."""
        return os.path.join(self.working_dir, ".gwf")

    @property
    def logs_dir(self):
        """Path of the logs directory inside the ``.gwf`` directory."""
        return os.path.join(self.config_dir, "logs")
# Click decorator that injects the nearest Context instance into a command.
pass_context = click.make_pass_decorator(Context)