API#

The implementation of gwf consists of a few main abstractions. Units of work are defined by creating Target instances, which also define the files used and produced by the target. A Workflow ties targets together and makes them easy to create.

When all targets have been defined on a workflow, the workflow is turned into a Graph which will compute the entire dependency graph of the workflow, checking the workflow for inconsistencies and circular dependencies.

A target in a Graph can be scheduled on a Backend using gwf.scheduling.submit_workflow().

Workflow#

gwf - a pragmatic workflow tool

class gwf.AnonymousTarget(inputs: list, outputs: list, options: dict, working_dir: str = '.', protect=_Nothing.NOTHING, spec: str = '')[source]#

Represents an unnamed target.

An anonymous target is an unnamed, abstract target much like the tuple returned by function templates. Thus, AnonymousTarget can also be used as the return value of a template function.

Variables:
  • inputs (list) – A string, list or dictionary containing inputs to the target.

  • outputs (list) – A string, list or dictionary containing outputs of the target.

  • options (dict) – Options such as number of cores, memory requirements etc. Options are backend-dependent. Backends will ignore unsupported options.

  • working_dir (str) – Working directory of this target.

  • spec (str) – The specification of the target.

  • protect (set) – An iterable of protected files which will not be removed during cleaning, even if this target is not an endpoint.

class gwf.Target(name: str, inputs: list, outputs: list, options: dict, working_dir: str = '.', protect=_Nothing.NOTHING, spec: str = '')[source]#

Represents a target.

This class inherits from AnonymousTarget.

A target is a named unit of work that declares its file inputs and outputs. Target names must be valid Python identifiers.

A script (or spec) is associated with the target. The script must be a valid Bash script and should produce the files declared as outputs and consume the files declared as inputs. Both inputs and outputs must be provided explicitly, even if the target has none. In that case, provide an empty list:

Target('Foo', inputs=[], outputs=[], options={}, working_dir='/tmp')

The inputs and outputs arguments can either be a string, a list or a dictionary. If a dictionary is given, the keys act as names for the files. The values may be either strings or a list of strings:

foo = Target(
    name='foo',
    inputs={'A': ['a1', 'a2'], 'B': 'b'},
    outputs={'C': ['a1b', 'a2b'], 'D': 'd'},
    options={},
)

This is useful for referring to the outputs of a target:

bar = Target(
    name='bar',
    inputs=foo.outputs['C'],
    outputs='result',
    options={},
)

The target can also specify an options dictionary specifying the resources needed to run the target. The options are consumed by the backend and may be ignored if the backend doesn’t support a given option. For example, we can set the cores option to set the number of cores that the target uses:

Target('Foo', inputs=[], outputs=[], options={'cores': 16}, working_dir='/tmp')

To see which options are supported by your backend of choice, see the documentation for the backend.

Variables:

name (str) – Name of the target.

Changed in version 1.6.0: Named inputs and outputs were added. Prior versions require inputs and outputs to be lists.

class gwf.TargetList[source]#

A list of target objects with access to all inputs and outputs.

This is a thin wrapper around a normal list and thus provides all normal list methods. However, it provides access to the collective inputs and outputs of the targets contained in the list.

property inputs#

Return a list of the inputs of all targets.

The returned list may be a list of strings, lists or dictionaries depending on the form of the inputs of the contained targets.

property outputs#

Return a list of the outputs of all targets.

The returned list may be a list of strings, lists or dictionaries depending on the form of the outputs of the contained targets.
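The behaviour described for TargetList can be pictured with a small stand-in. The sketch below is not gwf's implementation: SketchTarget and SketchTargetList are hypothetical names, and the stand-in target carries only the two attributes needed here. It shows a list subclass whose inputs and outputs properties collect the corresponding attribute of every contained item, in whatever form (string, list or dictionary) each item uses.

```python
from collections import namedtuple

# Hypothetical stand-in for a target; only the attributes used in this sketch.
SketchTarget = namedtuple("SketchTarget", ["inputs", "outputs"])


class SketchTargetList(list):
    """A plain list that also exposes the collective inputs and outputs."""

    @property
    def inputs(self):
        # One entry per target, kept in the form the target declared it.
        return [target.inputs for target in self]

    @property
    def outputs(self):
        return [target.outputs for target in self]


targets = SketchTargetList([
    SketchTarget(inputs="a.txt", outputs={"copy": "a.txt.copy"}),
    SketchTarget(inputs=["b1.txt", "b2.txt"], outputs=["b.merged"]),
])
collected_inputs = targets.inputs
collected_outputs = targets.outputs
```

Because the wrapper is still a list, indexing, slicing and iteration work as usual.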

class gwf.Workflow(working_dir: str = _Nothing.NOTHING, defaults: dict = _Nothing.NOTHING)[source]#

Represents a workflow.

This is the most central user-facing abstraction in gwf.

A workflow consists of a collection of targets and has methods for adding targets to the workflow in two different ways. A workflow can be initialized with the following arguments:

Variables:
  • working_dir (str) – The directory containing the file where the workflow was initialized. All file paths used in targets added to this workflow are relative to the working directory.

  • defaults (dict) – A dictionary with defaults for target options.

By default, working_dir is set to the directory of the workflow file which initialized the workflow. However, advanced users may wish to set it manually. Targets added to the workflow will inherit the workflow working directory.

The defaults argument is a dictionary of option defaults for targets and overrides defaults provided by the backend. Targets can override the defaults individually. For example:

gwf = Workflow(defaults={
    'cores': 12,
    'memory': '16g',
})

gwf.target('Foo', inputs=[], outputs=[]) << """echo hello"""
gwf.target('Bar', inputs=[], outputs=[], cores=2) << """echo world"""

In this case Foo and Bar inherit the cores and memory options set in defaults, but Bar overrides the cores option.
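The precedence described above (backend defaults, then workflow defaults, then per-target options) can be sketched with plain dictionaries. This is only an illustration of the merge order, not gwf's code: resolve_options is a hypothetical helper and the backend default values are made up for the example.

```python
def resolve_options(backend_defaults, workflow_defaults, target_options):
    """Merge option dictionaries; later sources take precedence."""
    resolved = dict(backend_defaults)
    resolved.update(workflow_defaults)
    resolved.update(target_options)
    return resolved


# Assumed backend defaults, for illustration only.
backend_defaults = {"cores": 1, "memory": "1g", "walltime": "01:00:00"}
workflow_defaults = {"cores": 12, "memory": "16g"}

# Foo sets no options of its own; Bar overrides cores.
foo_options = resolve_options(backend_defaults, workflow_defaults, {})
bar_options = resolve_options(backend_defaults, workflow_defaults, {"cores": 2})
```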

classmethod from_context(ctx)[source]#

Return workflow object for the workflow specified by ctx.

See Workflow.from_path() for further information.

classmethod from_path(path)[source]#

Return workflow object for the workflow given by path.

The workflow file given by path is loaded and the Workflow object defined in it is returned.

Parameters:

path (str) – Path to a workflow file, optionally specifying a workflow object in that file.

glob(pathname, *args, **kwargs)[source]#

Return a list of paths matching pathname.

This method is equivalent to glob.glob(), but searches with relative paths will be performed relative to the working directory of the workflow.

iglob(pathname, *args, **kwargs)[source]#

Return an iterator which yields paths matching pathname.

This method is equivalent to glob.iglob(), but searches with relative paths will be performed relative to the working directory of the workflow.
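To make "relative to the working directory of the workflow" concrete, here is a minimal sketch using only the standard library. glob_in is a hypothetical helper, not part of gwf, and whether gwf returns the joined paths in exactly this form is an assumption.

```python
import glob
import os
import tempfile


def glob_in(working_dir, pathname):
    """Resolve a relative pattern against working_dir before globbing."""
    if not os.path.isabs(pathname):
        pathname = os.path.join(working_dir, pathname)
    return sorted(glob.glob(pathname))


with tempfile.TemporaryDirectory() as working_dir:
    # Create a few empty files to match against.
    for name in ("a.fastq", "b.fastq", "notes.txt"):
        open(os.path.join(working_dir, name), "w").close()
    matches = [os.path.basename(p) for p in glob_in(working_dir, "*.fastq")]
```

The point is that the pattern is interpreted against the workflow's directory rather than the directory the process happens to be running in.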

map(template_func, inputs, extra=None, name=None, **kwargs)[source]#

Add targets to the workflow given a template and a list of inputs.

This method accepts a template function and an iterable of inputs. For each item in inputs it produces a target using the template function and adds the target to this workflow.

For example, given this template:

def copy_file(from_file):
    inputs = {'from_file': from_file}
    outputs = {'to_file': from_file + '.copy'}
    options = {}
    spec = f"cp {inputs['from_file']} {outputs['to_file']}"
    return AnonymousTarget(
        inputs=inputs,
        outputs=outputs,
        options=options,
        spec=spec
    )

and this list of files:

files = ['file1', 'file2', 'file3']

we can generate targets to copy all three files:

gwf = Workflow()
res = gwf.map(copy_file, files)

The map() method returns a TargetList which contains the generated targets.

Parameters:
  • template_func – A function or callable class instance that returns an AnonymousTarget. Essentially a template function.

  • inputs (iterable) – An iterable of inputs for the generated targets. This can be an iterable of strings, tuples or dictionaries.

  • extra (mapping) – A mapping of extra keyword arguments to be passed to the template.

  • name

    Must be either None, a string or a function.

    If None is given, the name of each target will be generated from the name of the template and an index.

    If a string is given, e.g. foo, the generated names will be foo_0, foo_1, etc.

    If a function is given, it must have the signature f(idx, target) where idx is the index and target is the AnonymousTarget returned by the template. The function must return the name to assign to the target as a string.

Any remaining keyword arguments will be passed directly to target_from_template() and thus override template-specified target options.
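The three accepted forms of name can be summarised in a short sketch. make_name is a hypothetical helper, not gwf's code, and the exact format used when name is None (template name, underscore, index) is an assumption. Plain dictionaries stand in for the AnonymousTarget objects.

```python
def make_name(name, template_name, idx, target):
    """Return the name for the idx-th generated target."""
    if name is None:
        # Assumed format: template name plus index.
        return f"{template_name}_{idx}"
    if isinstance(name, str):
        return f"{name}_{idx}"
    if callable(name):
        return name(idx, target)
    raise TypeError("name must be None, a string or a function")


def by_input(idx, target):
    # 'target' is a plain dict standing in for the AnonymousTarget.
    return "copy_" + target["inputs"]["from_file"]


fake_targets = [{"inputs": {"from_file": f}} for f in ("file1", "file2")]
default_names = [make_name(None, "copy_file", i, t) for i, t in enumerate(fake_targets)]
string_names = [make_name("foo", "copy_file", i, t) for i, t in enumerate(fake_targets)]
custom_names = [make_name(by_input, "copy_file", i, t) for i, t in enumerate(fake_targets)]
```

A naming function is handy when the generated names should reflect the input rather than its position in the list. Whatever it returns must still be a valid Python identifier.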

shell(*args, **kwargs)[source]#

Return the output of a shell command.

This method is equivalent to subprocess.check_output(), but automatically runs the command in a shell with the current working directory set to the working directory of the workflow.

Changed in version 1.0: This function no longer returns a list of lines in the output, but a bytes object with the output, exactly like subprocess.check_output(). Set universal_newlines to True to get a string with the output instead.
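The equivalence with subprocess.check_output() can be sketched as follows. shell_in is a hypothetical helper, not gwf's implementation, and the example assumes a shell in which echo is available.

```python
import subprocess
import tempfile


def shell_in(working_dir, command, **kwargs):
    """Run command in a shell with cwd set to working_dir and return its output."""
    return subprocess.check_output(command, shell=True, cwd=working_dir, **kwargs)


with tempfile.TemporaryDirectory() as working_dir:
    # Without universal_newlines the result is bytes.
    raw = shell_in(working_dir, "echo hello")
    # With universal_newlines=True the result is a str.
    text = shell_in(working_dir, "echo hello", universal_newlines=True)
```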

target(name, inputs, outputs, protect=None, **options)[source]#

Create a target and add it to the gwf.Workflow.

This is syntactic sugar for creating a new Target and adding it to the workflow. The target is also returned from the method so that the user can directly manipulate it, if necessary. For example, this allows assigning a spec to a target directly after defining it:

workflow = Workflow()
workflow.target('NewTarget', inputs=['test.txt'], outputs=['out.txt']) << '''
cat test.txt > out.txt
echo hello world >> out.txt
'''

This will create a new target named NewTarget, add it to the workflow and assign a spec to the target.

Parameters:
  • name (str) – Name of the target.

  • inputs (iterable) – List of files that this target depends on.

  • outputs (iterable) – List of files that this target produces.

Any further keyword arguments are passed to the backend.

target_from_template(name, template, **options)[source]#

Create a target from a template and add it to the gwf.Workflow.

This is syntactic sugar for creating a new Target and adding it to the workflow. The target is also returned from the method so that the user can directly manipulate it, if necessary.

workflow = Workflow()
workflow.target_from_template('NewTarget', my_template())

This will create a new target named NewTarget, configure it based on the specification in the template my_template, and add it to the workflow.

Parameters:
  • name (str) – Name of the target.

  • template (AnonymousTarget) – The anonymous target which describes the template.

Any further keyword arguments are passed to the backend and will override any options provided by the template.

Core#

class gwf.core.Graph(targets: dict, provides: dict, dependencies: defaultdict, dependents: defaultdict, unresolved: set)[source]#

Represents a dependency graph for a set of targets.

The graph represents the targets present in a workflow, but also their dependencies and the files they provide.

During construction of the graph the dependencies between targets are determined by looking at target inputs and outputs. If a target specifies a file as input, the file must either be provided by another target or already exist on disk. In case that the file is provided by another target, a dependency to that target will be added:

Variables:

dependencies (dict) – A dictionary mapping a target to a set of its dependencies.

If the file is not provided by another target, the file is unresolved:

Variables:

unresolved (set) – A set containing file paths of all unresolved files.

If the graph is constructed successfully, the following instance variables will be available:

Variables:
  • targets (dict) – A dictionary mapping target names to instances of gwf.Target.

  • provides (dict) – A dictionary mapping a file path to the target that provides that path.

  • dependents (dict) – A dictionary mapping a target to a set of all targets which depend on the target.

The graph can be manipulated in arbitrary, diabolic ways after it has been constructed. Checks are only performed at construction-time, thus introducing e.g. a circular dependency by manipulating dependencies will not raise an exception.
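How the instance variables relate to target inputs and outputs can be illustrated with a small sketch. It is not gwf's implementation: resolve is a hypothetical function, targets are plain dictionaries, and the mappings are keyed by target name rather than by target object to keep the example short.

```python
from collections import defaultdict


def resolve(targets):
    """Derive provides, dependencies, dependents and unresolved.

    targets maps a target name to a dict with 'inputs' and 'outputs' lists.
    """
    # Which target produces each file?
    provides = {}
    for name, target in targets.items():
        for path in target["outputs"]:
            provides[path] = name

    dependencies = defaultdict(set)
    dependents = defaultdict(set)
    unresolved = set()
    for name, target in targets.items():
        for path in target["inputs"]:
            if path in provides:
                # The input is produced by another target: add an edge.
                dependencies[name].add(provides[path])
                dependents[provides[path]].add(name)
            else:
                # No target provides the file.
                unresolved.add(path)
    return provides, dependencies, dependents, unresolved


targets = {
    "align": {"inputs": ["reads.fq", "ref.fa"], "outputs": ["aligned.bam"]},
    "sort": {"inputs": ["aligned.bam"], "outputs": ["sorted.bam"]},
}
provides, dependencies, dependents, unresolved = resolve(targets)
```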

dfs(root)[source]#

Return the depth-first traversal path through a graph from root.
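As an illustration of a depth-first traversal over a dependency mapping, consider the sketch below. The visiting order is an assumption: this version lists dependencies before the targets that need them, and gwf's own ordering may differ. The function takes the mapping explicitly instead of being a method.

```python
def dfs(dependencies, root):
    """Return the targets reachable from root, dependencies first."""
    visited = set()
    path = []

    def visit(node):
        if node in visited:
            return
        visited.add(node)
        # Sorted only to make the order deterministic in this sketch.
        for dep in sorted(dependencies.get(node, ())):
            visit(dep)
        path.append(node)

    visit(root)
    return path


dependencies = {
    "report": {"plot", "stats"},
    "plot": {"clean"},
    "stats": {"clean"},
    "clean": set(),
}
order = dfs(dependencies, "report")
```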

endpoints()[source]#

Return a set of all targets that are not depended on by other targets.
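In terms of a dependents mapping, an endpoint is simply a target with no dependents. A minimal sketch, using target names and a hypothetical standalone function in place of the method:

```python
def endpoints(targets, dependents):
    """Return the targets that no other target depends on."""
    return {name for name in targets if not dependents.get(name)}


targets = {"clean", "plot", "stats", "report"}
dependents = {
    "clean": {"plot", "stats"},
    "plot": {"report"},
    "stats": {"report"},
}
final_targets = endpoints(targets, dependents)
```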

classmethod from_targets(targets, fs)[source]#

Construct a dependency graph from a set of targets.

When a graph is initialized it computes all dependency relations between targets, ensuring that the graph is semantically sane. Therefore, construction of the graph is an expensive operation which may raise a number of exceptions:

Raises:
  • gwf.exceptions.FileProvidedByMultipleTargetsError – Raised if the same file is provided by multiple targets.

  • gwf.exceptions.CircularDependencyError – Raised if the graph contains a circular dependency.
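The two checks named above can be sketched independently of gwf. Everything in this block is hypothetical: the exception classes merely stand in for the gwf exceptions, and the cycle check is a standard three-state depth-first search, which may not be the algorithm gwf uses.

```python
class DuplicateProviderError(Exception):
    """Stand-in for the 'file provided by multiple targets' error."""


class CycleError(Exception):
    """Stand-in for the circular dependency error."""


def build_provides(targets):
    """Map each output file to its target, rejecting duplicate providers."""
    provides = {}
    for name, target in targets.items():
        for path in target["outputs"]:
            if path in provides:
                raise DuplicateProviderError(
                    f"{path} is provided by {provides[path]} and {name}"
                )
            provides[path] = name
    return provides


def check_acyclic(dependencies):
    """Raise CycleError if the dependency mapping contains a cycle."""
    in_progress, done = set(), set()

    def visit(node):
        in_progress.add(node)
        for dep in dependencies.get(node, ()):
            if dep in in_progress:
                raise CycleError(f"cycle through {dep}")
            if dep not in done:
                visit(dep)
        in_progress.discard(node)
        done.add(node)

    for node in list(dependencies):
        if node not in done:
            visit(node)


def raises(exc_type, func, *args):
    """Return True if func(*args) raises exc_type."""
    try:
        func(*args)
    except exc_type:
        return True
    return False


duplicate_detected = raises(
    DuplicateProviderError,
    build_provides,
    {"a": {"outputs": ["x"]}, "b": {"outputs": ["x"]}},
)
cycle_detected = raises(CycleError, check_acyclic, {"a": {"b"}, "b": {"a"}})
acyclic_ok = not raises(CycleError, check_acyclic, {"a": {"b"}, "b": set()})
```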

Scheduling#

gwf.scheduling.get_status_map(graph, fs, spec_hashes, backend, endpoints=None)[source]#

Get the status of each target in the graph.

gwf.scheduling.submit_workflow(endpoints, graph, fs, spec_hashes, backend, dry_run=False)[source]#

Submit a workflow to a backend.

Backends#

class gwf.backends.BackendStatus(value)[source]#

Status of a target as reported by the backend.

A target is unknown to the backend if it has not been submitted or the target has completed and thus isn’t being tracked anymore by the backend.

A target is submitted if it has been successfully submitted to the backend and is pending execution.

A target is running if it is currently being executed by the backend.

CANCELLED = 5#
COMPLETED = 3#
FAILED = 4#
RUNNING = 2#
SUBMITTED = 1#
UNKNOWN = 0#

gwf.backends.create_backend(name, working_dir, config)[source]#

Return backend class for the backend given by name.

Returns the backend class registered with name. Note that the class is returned, not an instance, since not all uses require initialization of the backend (e.g. accessing the backend’s log manager), and initialization of the backend may be expensive.

Parameters:

name (str) – Name of a registered backend.

gwf.backends.discover_backends()[source]#
gwf.backends.guess_backend()[source]#
gwf.backends.list_backends()[source]#

Return the names of all registered backends.

Filtering#

gwf.filtering.filter_generic(targets, filters)[source]#

Filter targets given a list of filters.

Return all targets from targets passing all filters. For example:

matched_targets = filter_generic(
    targets=graph.targets.values(),
    filters=[
        NameFilter(patterns=['Foo*']),
        StatusFilter(scheduler=scheduler, status='running'),
    ]
)

returns a generator yielding all targets with a name matching Foo* which are currently running.

Parameters:
  • targets – A list of targets to be filtered.

  • filters – A list of Filter instances.
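Applying several filters in sequence can be sketched as feeding the result of one filter into the next. The code below is an illustration under assumptions: apply_filters, PrefixFilter and MaxLengthFilter are hypothetical names, targets are plain strings, and each filter is assumed to expose an apply() method that returns an iterable.

```python
def apply_filters(targets, filters):
    """Pass targets through every filter in turn; only survivors remain."""
    for filter_ in filters:
        targets = filter_.apply(targets)
    return targets


class PrefixFilter:
    """Hypothetical filter keeping names that start with a prefix."""

    def __init__(self, prefix):
        self.prefix = prefix

    def apply(self, targets):
        return (t for t in targets if t.startswith(self.prefix))


class MaxLengthFilter:
    """Hypothetical filter keeping names up to a maximum length."""

    def __init__(self, max_length):
        self.max_length = max_length

    def apply(self, targets):
        return (t for t in targets if len(t) <= self.max_length)


matched = list(
    apply_filters(
        ["Foo1", "FooLong", "Bar1"],
        [PrefixFilter("Foo"), MaxLengthFilter(4)],
    )
)
```

Because every stage is a generator, nothing is evaluated until the result is iterated.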

gwf.filtering.filter_names(targets, patterns)[source]#

Filter targets with a list of patterns.

Return all targets in targets where the target name matches one or more of the patterns in patterns. For example:

matched_targets = filter_names(graph.targets.values(), ['Foo*'])

returns a generator yielding all targets with a name matching the pattern Foo*. Multiple patterns can be provided:

matched_targets = filter_names(graph.targets.values(), ['Foo*', 'Bar*'])

returns all targets with a name matching either Foo* or Bar*.

This function is a simple wrapper around NameFilter.
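The matching rule can be sketched on plain names. The example assumes shell-style wildcards as interpreted by the standard fnmatch module. match_names is a hypothetical helper that operates on strings instead of target objects.

```python
import fnmatch


def match_names(names, patterns):
    """Yield every name matching at least one of the patterns."""
    for name in names:
        if any(fnmatch.fnmatchcase(name, pattern) for pattern in patterns):
            yield name


names = ["FooAlign", "FooSort", "BarPlot", "Baz"]
foo_only = list(match_names(names, ["Foo*"]))
foo_or_bar = list(match_names(names, ["Foo*", "Bar*"]))
```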

Helpers for filtering:

class gwf.filtering.ApplyMixin[source]#

A mixin for predicate-based filters providing the apply method.

Most filters are predicate-based in the sense that they simply filter targets one by one based on a predicate function that decides whether to include the target or not. Such filters can inherit this mixin and then only need to declare a predicate() method which returns True if the target should be included and False otherwise.

For examples of using this mixin, see the StatusFilter and EndpointFilter filters.

apply(targets)[source]#

Apply the filter to all targets.

This method returns a generator yielding all targets in targets for which predicate() returns True.

predicate(target)[source]#

Return True if target should be included, False otherwise.

This method must be overridden by subclasses.
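The division of labour between apply() and predicate() can be sketched as below. This is a stand-in written for illustration, not the gwf class: ApplyMixinSketch and HasOutputsFilter are hypothetical names, and targets are plain dictionaries.

```python
class ApplyMixinSketch:
    """Provides apply() in terms of a predicate() defined by the subclass."""

    def apply(self, targets):
        # Lazily yield the targets accepted by the predicate.
        return (target for target in targets if self.predicate(target))

    def predicate(self, target):
        raise NotImplementedError("subclasses must override predicate()")


class HasOutputsFilter(ApplyMixinSketch):
    """Hypothetical filter keeping targets that declare at least one output."""

    def predicate(self, target):
        return bool(target["outputs"])


targets = [
    {"name": "Align", "outputs": ["aligned.bam"]},
    {"name": "Notify", "outputs": []},
]
kept = [t["name"] for t in HasOutputsFilter().apply(targets)]
```

A subclass only has to express the per-target decision; the iteration logic stays in one place.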