import collections
import collections.abc
import inspect
import os.path
import sys
import unicodedata
from .compat import fspath
from .exceptions import NameError, TypeError, WorkflowError
from .utils import is_valid_name
class PathError(WorkflowError):
def __init__(self, path, char, position):
self.path = path
self.char = char
self.position = position
class EmptyPathError(WorkflowError):
pass
def _check_path(path):
if not path:
raise EmptyPathError("Empty paths are not allowed")
for pos, char in enumerate(path):
if unicodedata.category(char) == "Cc":
raise PathError(
path.encode("unicode_escape").decode("utf-8"),
char.encode("unicode_escape").decode("utf-8"),
pos,
)
def _norm_path(working_dir, path):
path = fspath(path)
if os.path.isabs(path):
return path
return os.path.abspath(os.path.join(working_dir, path))
def _norm_paths(working_dir, paths):
return [_norm_path(working_dir, path) for path in paths]
def _flatten(t):
res = []
def flatten_rec(g):
if isinstance(g, str) or hasattr(g, "__fspath__"):
res.append(g)
elif isinstance(g, collections.abc.Mapping):
for k, v in g.items():
flatten_rec(v)
else:
for v in g:
flatten_rec(v)
flatten_rec(t)
return res
def select(lst, fields):
"""Select fields from an iterable of dictionaries.
Given an iterable of dictionaries and an iterable of key names, selects the
given key names from the dictionaries and returns an iterable of
dictionaries containing only those keys.
For example, given the list::
>>> lst = [
... {'A': 'a1', 'B': 'b1'},
... {'A': 'a2', 'B': 'b2'},
... {'A': 'a3', 'B': 'b3'},
... ]
We can select only the `A` keys from the dictionaries with::
>>> list(select(lst, ['A']))
[{'A': 'a1'}, {'A': 'a2'}, {'A': 'a3'}]
:param iterable lst:
An iterable of dictionaries.
:param iterable fields:
An iterable of key names.
"""
fields = tuple(fields)
for item in lst:
dct = {}
for name in fields:
dct[name] = item[name]
yield dct
def collect(lst, fields, rename=None):
"""Collect values from an iterable of dictionaries into a dictionary.
Given an iterable of dictionaries and an iterable of key names, collect the
value of each field name in `fields` and return a dictionary of lists for
each field.
For example, given the list:
>>> lst = [
... {'A': 'a1', 'B': 'b1'},
... {'A': 'a2', 'B': 'b2'},
... {'A': 'a3', 'B': 'b3'},
... ]
We can collect the values into a dictionary of lists with::
>>> collect(lst, ['A'])
{'As': ['a1', 'a2', 'a3']}
:param iterable lst:
An iterable of dictionaries.
:param iterable fields:
An iterable of key names.
"""
fields = tuple(fields)
if rename is None:
rename = {}
selected = collections.defaultdict(list)
for item in lst:
for name in fields:
selected_name = rename.get(name, name + "s")
selected[selected_name].append(item[name])
return dict(selected)
[docs]class TargetList(list):
"""A list of target objects with access to all inputs and outputs.
This is a thin wrapper around a normal list and thus provides all normal
``list`` methods. However, it provides access to the collective
inputs and outputs of the targets contained in the list.
"""
@property
def outputs(self):
"""Return a list of the outputs of all targets.
The returned list may be a list of strings, lists or dictionaries
depending on the form of the outputs of the contained targets.
"""
return [target.outputs for target in self]
@property
def inputs(self):
"""Return a list of the inputs of all targets.
The returned list may be a list of strings, lists or dictionaries
depending on the form of the inputs of the contained targets.
"""
return [target.inputs for target in self]
def __str__(self):
class_name = self.__class__.__name__
if not self:
return "{}(targets=[])".format(class_name)
return "{}(targets=[{!r}, ...])".format(class_name, self[0])
def __repr__(self):
return str(self)
[docs]class AnonymousTarget:
"""Represents an unnamed target.
An anonymous target is an unnamed, abstract target much like the tuple
returned by function templates. Thus, `AnonymousTarget` can also be used as
the return value of a template function.
:ivar list inputs:
A string, list or dictionary containing inputs to the target.
:ivar list outputs:
A string, list or dictionary containing outputs to the target.
:ivar dict options:
Options such as number of cores, memory requirements etc. Options are
backend-dependent. Backends will ignore unsupported options.
:ivar str working_dir:
Working directory of this target.
:ivar str spec:
The specification of the target.
:ivar set protect:
An iterable of protected files which will not be removed during
cleaning, even if this target is not an endpoint.
"""
_creation_order = 0
def __init__(
self, inputs, outputs, options, working_dir=None, spec="", protect=None
):
self.options = options
self.working_dir = working_dir
self.inputs = inputs
self.outputs = outputs
self._spec = spec
self.order = AnonymousTarget._creation_order
AnonymousTarget._creation_order += 1
if protect is None:
self.protected = set()
else:
self.protected = set(protect)
@property
def spec(self):
return self._spec
@spec.setter
def spec(self, value):
if not isinstance(value, str):
msg = (
"Target spec must be a string, not {}. Did you attempt to "
"assign a template to this target? This is no is not allowed "
"since version 1.0. Use the Workflow.target_from_template() "
"method instead. See the tutorial for more details."
)
raise TypeError(msg.format(type(value)))
self._spec = value
@property
def is_source(self):
"""Return whether this target is a source.
A target is a source if it does not depend on any files.
"""
return not self.inputs
@property
def is_sink(self):
"""Return whether this target is a sink.
A target is a sink if it does not output any files.
"""
return not self.outputs
def inherit_options(self, super_options):
options = super_options.copy()
options.update(self.options)
self.options = options
def __lshift__(self, spec):
self.spec = spec
return self
def __repr__(self):
return "{}(inputs={!r}, outputs={!r}, options={!r}, working_dir={!r}, spec={!r})".format(
self.__class__.__name__,
self.inputs,
self.outputs,
self.options,
self.working_dir,
self.spec,
)
def __str__(self):
return "{}_{}".format(self.__class__.__name__, id(self))
[docs]class Target(AnonymousTarget):
"""Represents a target.
This class inherits from :class:`AnonymousTarget`.
A target is a named unit of work that declare their file *inputs* and
*outputs*. Target names must be valid Python identifiers.
A script (or spec) is associated with the target. The script must be a
valid Bash script and should produce the files declared as *outputs* and
consume the files declared as *inputs*. Both parameters must be provided
explicitly, even if no inputs or outputs are needed. In that case, provide
the empty list::
Target('Foo', inputs=[], outputs=[], options={}, working_dir='/tmp')
The *inputs* and *outputs* arguments can either be a string, a list or
a dictionary. If a dictionary is given, the keys act as names for the
files. The values may be either strings or a list of strings::
foo = Target(
name='foo',
inputs={'A': ['a1', 'a2'], 'B': 'b'},
outputs={'C': ['a1b', 'a2b], 'D': 'd},
)
This is useful for referring the outputs of a target::
bar = Target(
name='bar',
inputs=foo.outputs['C'],
outputs='result',
)
The target can also specify an *options* dictionary specifying the
resources needed to run the target. The options are consumed by the backend
and may be ignored if the backend doesn't support a given option. For
example, we can set the *cores* option to set the number of cores that the
target uses::
Target('Foo', inputs=[], outputs=[], options={'cores': 16}, working_dir='/tmp')
To see which options are supported by your backend of choice, see the
documentation for the backend.
:ivar str name:
Name of the target.
.. versionchanged:: 1.6.0
Named inputs and outputs were added. Prior versions require *inputs*
and *outputs* to be lists.
"""
def __init__(
self, name, inputs, outputs, options, working_dir=None, spec="", protect=None
):
self.name = name
if not is_valid_name(self.name):
raise NameError('Target defined with invalid name: "{}".'.format(self.name))
for path in inputs:
_check_path(path)
for path in outputs:
_check_path(path)
super().__init__(
inputs=inputs,
outputs=outputs,
options=options,
working_dir=working_dir,
spec=spec,
protect=protect,
)
def flattened_inputs(self):
return _norm_paths(self.working_dir, _flatten(self.inputs))
def flattened_outputs(self):
return _norm_paths(self.working_dir, _flatten(self.outputs))
[docs] @classmethod
def empty(cls, name):
"""Return a target with no inputs, outputs and options.
This is mostly useful for testing.
"""
return cls(
name=name, inputs=[], outputs=[], options={}, working_dir=os.getcwd()
)
def __repr__(self):
return "{}(name={!r}, ...)".format(self.__class__.__name__, self.name)
def __str__(self):
return self.name
[docs]class Workflow(object):
"""Represents a workflow.
This is the most central user-facing abstraction in *gwf*.
A workflow consists of a collection of targets and has methods for adding
targets to the workflow in two different ways. A workflow can be initialized
with the following arguments:
:ivar str name: initial value: None
The name is used for namespacing when including workflows. See
:func:`~include` for more details on namespacing.
:ivar str working_dir:
The directory containing the file where the workflow was initialized.
All file paths used in targets added to this workflow are relative to
the working directory.
:ivar dict defaults:
A dictionary with defaults for target options.
By default, *working_dir* is set to the directory of the workflow file which
initialized the workflow. However, advanced users may wish to set it manually.
Targets added to the workflow will inherit the workflow working directory.
The *defaults* argument is a dictionary of option defaults for targets and
overrides defaults provided by the backend. Targets can override the
defaults individually. For example::
gwf = Workflow(defaults={
'cores': 12,
'memory': '16g',
})
gwf.target('Foo', inputs=[], outputs=[]) << \"\"\"echo hello\"\"\"
gwf.target('Bar', inputs=[], outputs=[], cores=2) << \"\"\"echo world\"\"\"
In this case `Foo` and `Bar` inherit the `cores` and `memory` options set in
`defaults`, but `Bar` overrides the `cores` option.
See :func:`~include` for a description of the use of the `name` argument.
"""
def __init__(self, name=None, working_dir=None, defaults=None):
self.name = name
if self.name is not None and not is_valid_name(self.name):
raise NameError(
'Workflow defined with invalid name: "{}".'.format(self.name)
)
self.targets = {}
self.defaults = defaults or {}
self.working_dir = working_dir
if self.working_dir is None:
# Get the frame object of whatever called the Workflow.__init__
# and extract the path of the file which is was defined in. Then
# normalize the path and get the directory of the file.
filename = inspect.getfile(sys._getframe(1))
self.working_dir = os.path.dirname(os.path.realpath(filename))
def _add_target(self, target):
if target.name in self.targets:
raise WorkflowError(
'Target "{}" already exists in workflow.'.format(target.name)
)
self.targets[target.name] = target
[docs] def target(self, name, inputs, outputs, **options):
"""Create a target and add it to the :class:`gwf.Workflow`.
This is syntactic sugar for creating a new :class:`~gwf.Target` and
adding it to the workflow. The target is also returned from the method
so that the user can directly manipulate it, if necessary. For example,
this allows assigning a spec to a target directly after defining it::
workflow = Workflow()
workflow.target('NewTarget', inputs=['test.txt', 'out.txt']) <<< '''
cat test.txt > out.txt
echo hello world >> out.txt
'''
This will create a new target named `NewTarget`, add it to the workflow
and assign a spec to the target.
:param str name: Name of the target.
:param iterable inputs: List of files that this target depends on.
:param iterable outputs: List of files that this target produces.
Any further keyword arguments are passed to the backend.
"""
new_target = Target(
name=name,
inputs=inputs,
outputs=outputs,
options=options,
working_dir=self.working_dir,
)
new_target.inherit_options(self.defaults)
self._add_target(new_target)
return new_target
[docs] def target_from_template(self, name, template, **options):
"""Create a target from a template and add it to the :class:`gwf.Workflow`.
This is syntactic sugar for creating a new :class:`~gwf.Target` and
adding it to the workflow. The target is also returned from the method
so that the user can directly manipulate it, if necessary.
.. code-block:: python
workflow = Workflow()
workflow.target_from_template('NewTarget', my_template())
This will create a new target named `NewTarget`, configure it based
on the specification in the template `my_template`, and
add it to the workflow.
:param str name:
Name of the target.
:param AnonymousTarget template:
The anonymous target which describes the template.
Any further keyword arguments are passed to the backend and will
override any options provided by the template.
"""
new_target = Target(
name=name,
inputs=template.inputs,
outputs=template.outputs,
options=options,
working_dir=template.working_dir or self.working_dir,
spec=template.spec,
)
new_target.inherit_options(template.options)
new_target.inherit_options(self.defaults)
self._add_target(new_target)
return new_target
[docs] def map(self, template_func, inputs, extra=None, name=None, **kwargs):
"""Add targets to the workflow given a template and a list of inputs.
This method accepts a template function and an iterable of inputs. For
each item in `inputs` it produces a target using the template function
and adds the target to this workflow.
For example, given this template:
.. code-block::
def copy_file(from_file):
inputs = {'from_file': from_file}
outputs = {'to_file': to_file + '.copy'}
options = {}
spec = "cp {from_file} {to_file}".format(from_file, to_file)
return AnonymousTarget(
inputs=inputs,
outputs=outputs,
options=options,
spec=spec
)
and this list of files:
.. code-block::
files = ['file1', 'file2', 'file3']
we can generate targets to copy all three files:
.. code-block::
gwf = Workflow()
res = gwf.map(copy_file, files)
The :func:`map` method returns a :class:`TargetList` which contains the
generated targets.
:param template_func:
A function or callable class instance that returns an
:class:`AnonymousTarget`. Essentially a *template function*.
:param iterable inputs:
An iterable of inputs for the generated targets. This can be an
iterable of strings, tuples or dictionaries.
:param mapping extra:
A mapping of extra keyword arguments to be passed to the template.
:param name: Must be either `None`, a string or a function.
If `None` is given, the name of each target will be generated from
the name of the template and an index.
If a string is given, e.g. `foo`, the generated names will be
`foo_0`, `foo_1`, etc.
If a function is given, it must have the signature
`f(idx, target)` where `idx` is the index and `target` is the
:class:`AnonymousTarget` returned by the template. The function
must return the name to assign to the target as a string.
Any remaining keyword arguments will be passed directly to
:func:`target_from_template` and thus override template-specified
target options.
"""
if extra is None:
extra = {}
if not (
callable(template_func)
and (
hasattr(template_func, "__name__")
or hasattr(template_func, "__class__")
)
):
raise ValueError(
"Argument `template_func` must be a function or a callable class instance."
)
def template_namer(idx, target):
if hasattr(template_func, "__name__"):
name = template_func.__name__
else:
name = template_func.__class__.__name__
return "{name}_{idx}".format(name=name, idx=idx)
def string_namer(idx, target):
return "{name}_{idx}".format(name=name, idx=idx)
if name is None:
name_func = template_namer
elif isinstance(name, str):
name_func = string_namer
else:
name_func = name
targets = TargetList()
for idx, args in enumerate(inputs):
if isinstance(args, collections.abc.Mapping):
template = template_func(**args, **extra)
elif isinstance(args, collections.abc.Iterable) and not isinstance(
args, str
):
template = template_func(*args, **extra)
else:
template = template_func(args, **extra)
target_name = name_func(idx, template)
target = self.target_from_template(
name=target_name, template=template, **kwargs
)
targets.append(target)
return targets
def __repr__(self):
return "{}(name={!r}, working_dir={!r})".format(
self.__class__.__name__, self.name, self.working_dir
)