Source code for gridtk.tools

#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 09:26:46 CEST

"""Functions that replace the shell based utilities for the grid submission and
probing.
"""

# initialize the logging system
import logging
import math
import os
import re
import shlex

logger = logging.getLogger("gridtk")

# Constant regular expressions
QSTAT_FIELD_SEPARATOR = re.compile(":\\s+")


[docs]def makedirs_safe(fulldir): """Creates a directory if it does not exists. Takes into consideration concurrent access support. Works like the shell's 'mkdir -p'. """ os.makedirs(fulldir, exist_ok=True)
[docs]def str_(name): """Return the string representation of the given 'name'. If it is a bytes object, it will be converted into str. If it is a str object, it will simply be resurned.""" if isinstance(name, bytes) and not isinstance(name, str): return name.decode("utf8") else: return name
[docs]def qsub( command, queue=None, cwd=True, name=None, deps=[], stdout="", stderr="", env=[], array=None, context="grid", hostname=None, memfree=None, hvmem=None, gpumem=None, pe_opt=None, io_big=False, sge_extra_args="", ): """Submits a shell job to a given grid queue Keyword parameters: command The command to be submitted to the grid queue A valid queue name or None, to use the default queue cwd If the job should change to the current working directory before starting name An optional name to set for the job. If not given, defaults to the script name being launched. deps Job ids to which this job will be dependent on stdout The standard output directory. If not given, defaults to what qsub has as a default. stderr The standard error directory (if not given, defaults to the stdout directory). env This is a list of extra variables that will be set on the environment running the command of your choice. array If set should be either: 1. a string in the form m[-n[:s]] which indicates the starting range 'm', the closing range 'n' and the step 's'. 2. an integer value indicating the total number of jobs to be submitted. This is equivalent ot set the parameter to a string "1-k:1" where "k" is the passed integer value 3. a tuple that contains either 1, 2 or 3 elements indicating the start, end and step arguments ("m", "n", "s"). The minimum value for "m" is 1. Giving "0" is an error. If submitted with this option, the job to be created will be an SGE parametric job. In this mode SGE does not allow individual control of each job. The environment variable SGE_TASK_ID will be set on the executing process automatically by SGE and indicates the unique identifier in the range for which the current job instance is for. context The setshell context in which we should try a 'qsub'. Normally you don't need to change the default. This variable can also be set to a context dictionary in which case we just setup using that context instead of probing for a new one, what can be fast. memfree If set, it asks the queue for a node with a minimum amount of memory Used only if mem is not set (cf. qsub -l mem_free=<...>) hvmem If set, it asks the queue for a node with a minimum amount of memory Used only if mem is not set (cf. qsub -l h_vmem=<...>) gpumem Applicable only for GPU-based queues. If set, it asks for the GPU queue with a minimum amount of memory. The amount should not be more than 24. (cf. qsub -l gpumem=<...>) hostname If set, it asks the queue to use only a subset of the available nodes Symbols: | for OR, & for AND, ! for NOT, etc. (cf. qsub -l hostname=<...>) pe_opt If set, add a -pe option when launching a job (for instance pe_exclusive* 1-) io_big If set to true, the io_big flag will be set. Use this flag if your process will need a lot of Input/Output operations. sge_extra_args This is used to send extra argument to SGE. Note that all its arguments are directly used in `qsub` command. For example, `jman submit -e "-P project_name -l pytorch=true" -- ...` will be translated to `qsub -P project_name -l pytorch=true -- ...` Returns the job id assigned to this job (integer) """ import six from bob.extension import rc scmd = ["qsub"] prepend = rc.get("gridtk.sge.extra.args.prepend") or "" sge_extra_args = f"{prepend} {sge_extra_args or ''}" scmd += shlex.split(sge_extra_args) if isinstance(queue, six.string_types) and queue not in ( "all.q", "default", ): scmd += ["-l", queue] if memfree: scmd += ["-l", "mem_free=%s" % memfree] if hvmem: scmd += ["-l", "h_vmem=%s" % hvmem] if gpumem: scmd += ["-l", "gpumem=%s" % gpumem] if io_big: scmd += ["-l", "io_big"] if hostname: scmd += ["-l", "hostname=%s" % hostname] if pe_opt: scmd += ["-pe"] + pe_opt.split() if cwd: scmd += ["-cwd"] if name: scmd += ["-N", name] if deps: scmd += ["-hold_jid", ",".join(["%d" % k for k in deps])] if stdout: if not cwd: # pivot, temporarily, to home directory curdir = os.path.realpath(os.curdir) os.chdir(os.environ["HOME"]) if not os.path.exists(stdout): makedirs_safe(stdout) if not cwd: # go back os.chdir(os.path.realpath(curdir)) scmd += ["-o", stdout] if stderr: if not os.path.exists(stderr): makedirs_safe(stderr) scmd += ["-e", stderr] elif stdout: # just re-use the stdout settings scmd += ["-e", stdout] scmd += [ "-terse" ] # simplified job identifiers returned by the command line for k in env: scmd += ["-v", k] if array is not None: scmd.append("-t") if isinstance(array, six.string_types): try: i = int(array) scmd.append("1-%d:1" % i) except ValueError: # must be complete... scmd.append("%s" % array) if isinstance(array, six.integer_types): scmd.append("1-%d:1" % array) if isinstance(array, (tuple, list)): if len(array) < 1 or len(array) > 3: raise RuntimeError( "Array tuple should have length between 1 and 3" ) elif len(array) == 1: scmd.append("%s" % array[0]) elif len(array) == 2: scmd.append("%s-%s" % (array[0], array[1])) elif len(array) == 3: scmd.append("%s-%s:%s" % (array[0], array[1], array[2])) if not isinstance(command, (list, tuple)): command = [command] scmd += command logger.debug("Qsub command '%s'", " ".join(scmd)) from .setshell import sexec jobid = str_(sexec(context, scmd)) return int(jobid.split("\n")[-1].split(".", 1)[0])
[docs]def make_shell(shell, command): """Returns a single command given a shell and a command to be qsub'ed Keyword parameters: shell The path to the shell to use when submitting the job. command The script path to be submitted Returns the command parameters to be supplied to qsub() """ return ("-S", shell) + tuple(command)
[docs]def qstat(jobid, context="grid"): """Queries status of a given job. Keyword parameters: jobid The job identifier as returned by qsub() context The setshell context in which we should try a 'qsub'. Normally you don't need to change the default. This variable can also be set to a context dictionary in which case we just setup using that context instead of probing for a new one, what can be fast. Returns a dictionary with the specific job properties """ scmd = ["qstat", "-j", "%d" % jobid, "-f"] logger.debug("Qstat command '%s'", " ".join(scmd)) from .setshell import sexec data = str_(sexec(context, scmd, error_on_nonzero=False)) # some parsing: retval = {} for line in data.split("\n"): s = line.strip() if s.lower().find("do not exist") != -1: return {} if not s or s.find(10 * "=") != -1: continue kv = QSTAT_FIELD_SEPARATOR.split(s, 1) if len(kv) == 2: retval[kv[0]] = kv[1] return retval
[docs]def qdel(jobid, context="grid"): """Halts a given job. Keyword parameters: jobid The job identifier as returned by qsub() context The setshell context in which we should try a 'qsub'. Normally you don't need to change the default. This variable can also be set to a context dictionary in which case we just setup using that context instead of probing for a new one, what can be fast. """ scmd = ["qdel", "%d" % jobid] logger.debug("Qdel command '%s'", " ".join(scmd)) from .setshell import sexec sexec(context, scmd, error_on_nonzero=False)
[docs]def get_array_job_slice(total_length): """A helper function that let's you chunk a list in an SGE array job. Use this function like ``a = a[get_array_job_slice(len(a))]`` to only process a chunk of ``a``. Parameters ---------- total_length : int The length of the list that you are trying to slice Returns ------- slice A slice to be used. Raises ------ NotImplementedError If "SGE_TASK_FIRST" and "SGE_TASK_STEPSIZE" are not 1. """ sge_task_id = os.environ.get("SGE_TASK_ID") try: sge_task_id = int(sge_task_id) except Exception: return slice(None) if ( os.environ["SGE_TASK_FIRST"] != "1" or os.environ["SGE_TASK_STEPSIZE"] != "1" ): raise NotImplementedError( "Values other than 1 for SGE_TASK_FIRST and SGE_TASK_STEPSIZE is not supported!" ) job_id = sge_task_id - 1 number_of_parallel_jobs = int(os.environ["SGE_TASK_LAST"]) number_of_objects_per_job = int( math.ceil(total_length / number_of_parallel_jobs) ) start = min(job_id * number_of_objects_per_job, total_length) end = min((job_id + 1) * number_of_objects_per_job, total_length) return slice(start, end)