import logging
from collections import defaultdict
from ..conf import config
from ..utils import ensure_trailing_newline, retry
from .base import PbsLikeBackendBase, Status
from .exceptions import BackendError
from .utils import call
logger = logging.getLogger(__name__)
# Maps the compact job state codes reported by ``squeue`` to gwf statuses.
# Any code that is not listed below resolves to ``Status.UNKNOWN`` (and, as
# with every defaultdict, is stored in the mapping on first lookup).
SLURM_JOB_STATES = defaultdict(lambda: Status.UNKNOWN)

# States in which the job occupies (or is about to release) resources.
SLURM_JOB_STATES.update(
    dict.fromkeys(
        (
            "CF",  # CONFIGURING
            "CG",  # COMPLETING
            "R",  # RUNNING
            "S",  # SUSPENDED
        ),
        Status.RUNNING,
    )
)

# States in which the job is still waiting in the queue.
SLURM_JOB_STATES.update(
    dict.fromkeys(
        (
            "PD",  # PENDING
            "SE",  # SPECIAL_EXIT
        ),
        Status.SUBMITTED,
    )
)
class SlurmBackend(PbsLikeBackendBase):
    """Backend for the Slurm workload manager.

    To use this backend you must activate the `slurm` backend.

    **Backend options:**

    * **backend.slurm.log_mode (str):** Must be either `full`, `merged` or
      `none`. If `full`, two log files will be stored for each target, one for
      standard output and one for standard error. If `merged`, only one log
      file will be written containing the combined streams. If `none`, no logs
      will be stored. (default: `full`).

    **Target options:**

    * **cores (int):**
      Number of cores allocated to this target (default: 1).
    * **memory (str):**
      Memory allocated to this target (default: 1g).
    * **walltime (str):**
      Time limit for this target (default: 01:00:00).
    * **nodes:**
      Equivalent to setting the `-N` flag on `sbatch`. Unset by default.
    * **queue (str):**
      Queue to submit the target to. To specify multiple queues, specify a
      comma-separated list of queue names. A queue is equivalent to a Slurm
      partition.
    * **account (str):**
      Account to be used when running the target.
    * **constraint (str):**
      Constraint string. Equivalent to setting the `--constraint` flag on
      `sbatch`.
    * **qos (str):**
      Quality-of-service string. Equivalent to setting the `--qos` flag
      on `sbatch`.
    * **mail_type (str):**
      Equivalent to the `--mail-type` flag on `sbatch`.
    * **mail_user (str):**
      Equivalent to the `--mail-user` flag on `sbatch`.
    * **gres (str):**
      Equivalent to the `--gres` flag on `sbatch`. Usually used to
      request access to GPUs.
    """

    # Default value for every supported target option.
    option_defaults = {
        "cores": 1,
        "memory": "1g",
        "walltime": "01:00:00",
        "nodes": None,
        "queue": None,
        "account": None,
        "constraint": None,
        "mail_type": None,
        "mail_user": None,
        "qos": None,
        "gres": None,
    }

    # `sbatch` flag prefix for each target option. Short flags end with a
    # space and long flags with "=", so the value can be appended directly.
    option_flags = {
        "nodes": "-N ",
        "cores": "-c ",
        "memory": "--mem=",
        "walltime": "-t ",
        "queue": "-p ",
        "account": "-A ",
        "constraint": "-C ",
        "mail_type": "--mail-type=",
        "mail_user": "--mail-user=",
        "qos": "--qos=",
        "gres": "--gres=",
    }

    # Template for one directive line: {0} is the flag prefix, {1} the value.
    option_str = "#SBATCH {0}{1}"

    @retry(on_exc=BackendError)
    def call_queue_command(self):
        """Run `squeue` and return the result of `call()`.

        Each output line has the form ``<job id>;<state code>``, which is the
        format `parse_queue_output` expects.
        """
        return call("squeue", "--noheader", "--format=%i;%t", "--all")

    @retry(on_exc=BackendError)
    def call_cancel_command(self, job_id):
        """Run `scancel` for *job_id* and return the result of `call()`."""
        # The --verbose flag here is necessary, otherwise we're not able to tell
        # whether the command failed. See the comment in call() if you
        # want to know more.
        return call("scancel", "--verbose", job_id)

    @retry(on_exc=BackendError)
    def call_submit_command(self, script, dependencies):
        """Submit *script* with `sbatch` and return the result of `call()`.

        :param script: The batch script text, passed to `sbatch` as input.
        :param dependencies: Job ids that must finish successfully
            (`afterok`) before this job may start. May be empty.
        """
        args = ["--parsable"]
        if dependencies:
            args.append("--dependency=afterok:{}".format(":".join(dependencies)))
        return call("sbatch", *args, input=script)

    def parse_queue_output(self, stdout):
        """Turn the output of `call_queue_command` into a status mapping.

        :param stdout: Text with one ``<job id>;<state code>`` pair per line.
        :return: A dict mapping each job id to its `Status`, looked up in
            `SLURM_JOB_STATES`.
        """
        job_states = {}
        for line in stdout.splitlines():
            if not line.strip():
                # A stray blank line carries no job and would otherwise make
                # the unpacking below fail.
                continue
            job_id, state = line.split(";")
            job_states[job_id] = SLURM_JOB_STATES[state]
        return job_states

    def compile_script(self, target):
        """Build the batch script that is handed to `sbatch` for *target*.

        The script consists of a shebang, one ``#SBATCH`` directive for the
        job name, one per target option, the log directives selected by the
        `backend.slurm.log_mode` setting, a short prologue (change to the
        working directory, export `GWF_JOBID` and `GWF_TARGET_NAME`,
        ``set -e``) and finally the target's spec.

        :param target: The target to compile. Its `name`, `options`,
            `working_dir` and `spec` attributes are read.
        :return: The script as a single string.
        """
        directive = self.option_str.format

        out = [
            "#!/bin/bash",
            "# Generated by: gwf",
            directive("--job-name=", target.name),
        ]

        for option_name, option_value in target.options.items():
            if option_value is None:
                # An unset option must not end up in the script as a literal
                # "None" (e.g. "#SBATCH -N None").
                continue
            out.append(directive(self.option_flags[option_name], option_value))

        log_mode = config.get("backend.slurm.log_mode", "full")
        if log_mode in ("full", "merged"):
            # With only --output given, Slurm writes both streams to that
            # file, which is what `merged` relies on.
            out.append(directive("--output=", self.log_manager.stdout_path(target)))
            if log_mode == "full":
                out.append(
                    directive("--error=", self.log_manager.stderr_path(target))
                )
        elif log_mode == "none":
            out.append(directive("--output=", "/dev/null"))
        # Any other value adds no log directive at all.

        out.extend(
            [
                "",
                "cd {}".format(target.working_dir),
                "export GWF_JOBID=$SLURM_JOBID",
                'export GWF_TARGET_NAME="{}"'.format(target.name),
                "set -e",
                "",
                ensure_trailing_newline(target.spec),
            ]
        )
        return "\n".join(out)