import collections
import collections.abc
import inspect
import logging
import os
import os.path
import subprocess
import sys
from glob import glob as _glob
from glob import iglob as _iglob
from pathlib import Path
import attrs
from .core import Target
from .exceptions import WorkflowError
from .utils import chain, find_workflow, load_workflow
logger = logging.getLogger(__name__)
def select(lst, fields):
"""Select fields from an iterable of dictionaries.
Given an iterable of dictionaries and an iterable of key names, selects the
given key names from the dictionaries and returns an iterable of
dictionaries containing only those keys.
For example, given the list::
>>> lst = [
... {'A': 'a1', 'B': 'b1'},
... {'A': 'a2', 'B': 'b2'},
... {'A': 'a3', 'B': 'b3'},
... ]
We can select only the `A` keys from the dictionaries with::
>>> list(select(lst, ['A']))
[{'A': 'a1'}, {'A': 'a2'}, {'A': 'a3'}]
:param iterable lst:
An iterable of dictionaries.
:param iterable fields:
An iterable of key names.
"""
fields = tuple(fields)
for item in lst:
dct = {}
for name in fields:
dct[name] = item[name]
yield dct
def collect(lst, fields, rename=None):
"""Collect values from an iterable of dictionaries into a dictionary.
Given an iterable of dictionaries and an iterable of key names, collect the
value of each field name in `fields` and return a dictionary of lists for
each field.
For example, given the list:
>>> lst = [
... {'A': 'a1', 'B': 'b1'},
... {'A': 'a2', 'B': 'b2'},
... {'A': 'a3', 'B': 'b3'},
... ]
We can collect the values into a dictionary of lists with::
>>> collect(lst, ['A'])
{'As': ['a1', 'a2', 'a3']}
:param iterable lst:
An iterable of dictionaries.
:param iterable fields:
An iterable of key names.
"""
fields = tuple(fields)
if rename is None:
rename = {}
selected = collections.defaultdict(list)
for item in lst:
for name in fields:
selected_name = rename.get(name, name + "s")
selected[selected_name].append(item[name])
return dict(selected)
[docs]
@attrs.define
class TargetList(list):
"""A list of target objects with access to all inputs and outputs.
This is a thin wrapper around a normal list and thus provides all normal
``list`` methods. However, it provides access to the collective
inputs and outputs of the targets contained in the list.
"""
@property
def outputs(self):
"""Return a list of the outputs of all targets.
The returned list may be a list of strings, lists or dictionaries
depending on the form of the outputs of the contained targets.
"""
return [target.outputs for target in self]
@property
def inputs(self):
"""Return a list of the inputs of all targets.
The returned list may be a list of strings, lists or dictionaries
depending on the form of the inputs of the contained targets.
"""
return [target.inputs for target in self]
[docs]
@attrs.define
class Workflow:
"""Represents a workflow.
This is the most central user-facing abstraction in *gwf*.
A workflow consists of a collection of targets and has methods for adding
targets to the workflow in two different ways. A workflow can be
initialized with the following arguments:
:ivar str working_dir:
The directory containing the file where the workflow was initialized.
All file paths used in targets added to this workflow are relative to
the working directory.
:ivar dict defaults:
A dictionary with defaults for target options.
By default, *working_dir* is set to the directory of the workflow file
which initialized the workflow. However, advanced users may wish to set it
manually. Targets added to the workflow will inherit the workflow working
directory.
The *defaults* argument is a dictionary of option defaults for targets and
overrides defaults provided by the backend. Targets can override the
defaults individually. For example::
gwf = Workflow(defaults={
'cores': 12,
'memory': '16g',
})
gwf.target('Foo', inputs=[], outputs=[]) << \"\"\"echo hello\"\"\"
gwf.target('Bar', inputs=[], outputs=[], cores=2) << \"\"\"echo world\"\"\"
In this case `Foo` and `Bar` inherit the `cores` and `memory` options set
in `defaults`, but `Bar` overrides the `cores` option.
"""
working_dir: str = attrs.field()
defaults: dict = attrs.field(factory=dict)
targets: dict = attrs.field(factory=dict, init=False, repr=False)
@working_dir.default # type: ignore
def _get_working_dir(self):
# Get the frame object of whatever called the Workflow.__init__
# and extract the path of the file which is was defined in. Then
# normalize the path and get the directory of the file.
filename = inspect.getfile(sys._getframe(2))
return os.path.dirname(os.path.realpath(filename))
@classmethod
def from_parsed_path(cls, path: Path, obj="gwf"):
return load_workflow(path, obj)
[docs]
@classmethod
def from_path(cls, path):
"""Return workflow object for the workflow given by `path`.
Returns a :class:`~gwf.Workflow` object containing the workflow object
of the workflow given by `path`.
:arg str path: Path to a workflow file, optionally specifying a
workflow object in that file.
"""
path, obj = find_workflow(path)
return cls.from_parsed_path(path, obj)
[docs]
@classmethod
def from_context(cls, ctx):
"""Return workflow object for the workflow specified by `ctx`.
See :func:`Workflow.from_path` for further information.
"""
return cls.from_parsed_path(ctx.workflow_file, ctx.workflow_obj)
def _add_target(self, target):
if target.name in self.targets:
raise WorkflowError(f"Target {target} already exists in workflow.")
self.targets[target.name] = target
[docs]
def target(self, name, inputs, outputs, protect=None, group=None, **options):
"""Create a target and add it to the :class:`gwf.Workflow`.
This is syntactic sugar for creating a new :class:`~gwf.Target` and
adding it to the workflow. The target is also returned from the method
so that the user can directly manipulate it, if necessary. For example,
this allows assigning a spec to a target directly after defining it::
workflow = Workflow()
workflow.target('NewTarget', inputs=['test.txt', 'out.txt']) <<< '''
cat test.txt > out.txt
echo hello world >> out.txt
'''
This will create a new target named `NewTarget`, add it to the workflow
and assign a spec to the target.
:param str name: Name of the target.
:param iterable inputs: List of files that this target depends on.
:param iterable outputs: List of files that this target produces.
Any further keyword arguments are passed to the backend.
"""
new_target = Target(
name=name,
inputs=inputs,
outputs=outputs,
protect=protect if protect else set(),
group=group,
options=chain(self.defaults, options),
working_dir=self.working_dir,
)
self._add_target(new_target)
return new_target
[docs]
def target_from_template(self, name, template, **options):
"""Create a target from a template and add it to the :class:`gwf.Workflow`.
This is syntactic sugar for creating a new :class:`~gwf.Target` and
adding it to the workflow. The target is also returned from the method
so that the user can directly manipulate it, if necessary.
.. code-block:: python
workflow = Workflow()
workflow.target_from_template('NewTarget', my_template())
This will create a new target named `NewTarget`, configure it based
on the specification in the template `my_template`, and
add it to the workflow.
:param str name:
Name of the target.
:param AnonymousTarget template:
The anonymous target which describes the template.
Any further keyword arguments are passed to the backend and will
override any options provided by the template.
"""
new_target = Target(
name=name,
group=template.group,
inputs=template.inputs,
outputs=template.outputs,
protect=template.protect,
options=chain(self.defaults, template.options, options),
working_dir=template.working_dir or self.working_dir,
spec=template.spec,
)
self._add_target(new_target)
return new_target
[docs]
def map(self, template_func, inputs, extra=None, name=None, **kwargs):
"""Add targets to the workflow given a template and a list of inputs.
This method accepts a template function and an iterable of inputs. For
each item in `inputs` it produces a target using the template function
and adds the target to this workflow.
For example, given this template:
.. code-block::
def copy_file(from_file):
inputs = {'from_file': from_file}
outputs = {'to_file': to_file + '.copy'}
options = {}
spec = f"cp {inputs[from_file]} {outputs[to_file]}"
return AnonymousTarget(
inputs=inputs,
outputs=outputs,
options=options,
spec=spec
)
and this list of files:
.. code-block::
files = ['file1', 'file2', 'file3']
we can generate targets to copy all three files:
.. code-block::
gwf = Workflow()
res = gwf.map(copy_file, files)
The :func:`map` method returns a :class:`TargetList` which contains the
generated targets.
:param template_func:
A function or callable class instance that returns an
:class:`AnonymousTarget`. Essentially a *template function*.
:param iterable inputs:
An iterable of inputs for the generated targets. This can be an
iterable of strings, tuples or dictionaries.
:param mapping extra:
A mapping of extra keyword arguments to be passed to the template.
:param name: Must be either `None`, a string or a function.
If `None` is given, the name of each target will be generated from
the name of the template and an index.
If a string is given, e.g. `foo`, the generated names will be
`foo_0`, `foo_1`, etc.
If a function is given, it must have the signature
`f(idx, target)` where `idx` is the index and `target` is the
:class:`AnonymousTarget` returned by the template. The function
must return the name to assign to the target as a string.
Any remaining keyword arguments will be passed directly to
:func:`target_from_template` and thus override template-specified
target options.
"""
if extra is None:
extra = {}
if not (
callable(template_func)
and (
hasattr(template_func, "__name__")
or hasattr(template_func, "__class__")
)
):
raise ValueError(
"Argument `template_func` must be a function or a callable "
"class instance."
)
def template_namer(idx, target):
if hasattr(template_func, "__name__"):
name = template_func.__name__
else:
name = template_func.__class__.__name__
return "{name}_{idx}".format(name=name, idx=idx)
def string_namer(idx, target):
return "{name}_{idx}".format(name=name, idx=idx)
if name is None:
name_func = template_namer
elif isinstance(name, str):
name_func = string_namer
else:
name_func = name
targets = TargetList()
for idx, args in enumerate(inputs):
if isinstance(args, collections.abc.Mapping):
template = template_func(**args, **extra)
elif isinstance(args, collections.abc.Iterable) and not isinstance(
args, str
):
template = template_func(*args, **extra)
else:
template = template_func(args, **extra)
target_name = name_func(idx, template)
target = self.target_from_template(
name=target_name, template=template, **kwargs
)
targets.append(target)
return targets
[docs]
def glob(self, pathname, *args, **kwargs):
"""Return a list of paths matching `pathname`.
This method is equivalent to :func:`python:glob.glob`, but searches with
relative paths will be performed relative to the working directory
of the workflow.
"""
if not os.path.isabs(pathname):
pathname = os.path.join(self.working_dir, pathname)
return _glob(pathname, *args, **kwargs)
[docs]
def iglob(self, pathname, *args, **kwargs):
"""Return an iterator which yields paths matching `pathname`.
This method is equivalent to :func:`python:glob.iglob`, but searches with
relative paths will be performed relative to the working directory
of the workflow.
"""
if not os.path.isabs(pathname):
pathname = os.path.join(self.working_dir, pathname)
return _iglob(pathname, *args, **kwargs)
[docs]
def shell(self, *args, **kwargs):
"""Return the output of a shell command.
This method is equivalent to :func:`python:subprocess.check_output`, but
automatically runs the command in a shell with the current working
directory set to the working directory of the workflow.
.. versionchanged:: 1.0
This function no longer return a list of lines in the output, but a
byte array with the output, exactly like
:func:`python:subprocess.check_output`.
You may specifically set *universal_newlines* to `True` to get a
string with the output instead.
"""
return subprocess.check_output(
*args, shell=True, cwd=self.working_dir, **kwargs
)