Coverage for src/gridtk/tools.py: 22%
121 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-04-22 14:25 +0200
« prev ^ index » next coverage.py v7.4.3, created at 2024-04-22 14:25 +0200
1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
2#
3# SPDX-License-Identifier: GPL-3.0-or-later
4"""Functions that replace shell-based utilities for grid submission and
5probing."""
7from __future__ import annotations
9import logging
10import math
11import os
12import re
13import shlex
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Constant regular expressions
# Matches a colon followed by one or more whitespace characters; qstat() uses
# it to split each "key: value" line of the command output in two.
QSTAT_FIELD_SEPARATOR = re.compile(":\\s+")

# Name of the user configuration file at $XDG_CONFIG_HOME
# (qsub() passes it to clapper's UserDefaults to look up default settings)
USER_CONFIGURATION = "gridtk.toml"
def str_(v: str | bytes) -> str:
    """Returns ``v`` as a ``str`` object.

    A ``bytes`` input is decoded with the default codec of
    :py:meth:`bytes.decode`; any other input is handed back unchanged.


    Parameters:

        v: a bytes or str object


    Returns:

        A str object
    """
    if isinstance(v, bytes):
        return v.decode()
    return v
def _ensure_log_dir(path, cwd):
    """Creates the log directory ``path`` unless something already exists there.

    When the job is not submitted with ``-cwd``, a relative ``path`` is
    resolved against ``$HOME`` instead of the current working directory.  The
    path is computed rather than reached through ``os.chdir`` so the working
    directory of the calling process is never modified, even on error.
    """
    # os.path.join() returns ``path`` untouched when it is already absolute
    target = path if cwd else os.path.join(os.environ["HOME"], path)
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)


def _array_task_range(array):
    """Converts the ``array`` argument of :py:func:`qsub` into a ``-t`` value."""
    if isinstance(array, (str, bytes)):
        spec = str_(array)  # never leak a "b'...'" representation to qsub
        try:
            return "1-%d:1" % int(spec)
        except ValueError:
            # not a plain count: must already be a complete "m[-n[:s]]" range
            return spec

    if isinstance(array, int):
        return "1-%d:1" % array

    if isinstance(array, (tuple, list)):
        if not 1 <= len(array) <= 3:
            raise RuntimeError("Array tuple should have length between 1 and 3")
        if len(array) == 1:
            return f"{array[0]}"
        if len(array) == 2:
            return f"{array[0]}-{array[1]}"
        return f"{array[0]}-{array[1]}:{array[2]}"

    # Anything else would leave "-t" without a value, which would then
    # swallow the next command-line argument.
    raise TypeError(
        "array must be a str, bytes, int, tuple or list, not %s"
        % type(array).__name__
    )


def qsub(
    command,
    queue=None,
    cwd=True,
    name=None,
    deps=None,
    stdout="",
    stderr="",
    env=None,
    array=None,
    context="grid",
    hostname=None,
    memfree=None,
    hvmem=None,
    gpumem=None,
    pe_opt=None,
    io_big=False,
    sge_extra_args="",
):
    """Submits a shell job to a given grid queue.

    The ``qsub`` command line is assembled from the parameters below and
    executed through :py:func:`.setshell.sexec` using ``context``.


    Parameters:

        command: The command to be submitted to the grid.  Either a list or
            tuple of arguments, or a single value that is used as the only
            argument.

        queue: A valid queue name or ``None``, to use the default queue.  The
            names ``all.q`` and ``default`` do not add any queue request.

        cwd: If the job should change to the current working directory
            before starting (``-cwd``).

        name: An optional name to set for the job (``-N``).  If not given,
            defaults to the script name being launched.

        deps: Job ids (integers) to which this job will be dependent on
            (``-hold_jid``).

        stdout: The standard output directory (``-o``).  If not given,
            defaults to what qsub has as a default.  The directory is created
            if nothing exists at that path; when ``cwd`` is false, a relative
            path is created relative to ``$HOME``.

        stderr: The standard error directory (``-e``).  If not given,
            defaults to the stdout directory.  It is created following the
            same rules as ``stdout``.

        env: A list of extra variables that will be set on the environment
            running the command of your choice (one ``-v`` per entry).

        array: If set, should be either:

            1. a string in the form ``m[-n[:s]]`` which indicates the
               starting range 'm', the closing range 'n' and the step 's';
            2. an integer value (or a string holding one) indicating the
               total number of jobs to be submitted.  This is equivalent to
               setting the parameter to the string ``"1-k:1"`` where "k" is
               the passed integer value;
            3. a tuple or list that contains either 1, 2 or 3 elements
               indicating the start, end and step arguments ("m", "n", "s").

            The minimum value for "m" is 1.  Giving "0" is an error.

            If submitted with this option, the job to be created will be an
            SGE parametric job.  In this mode SGE does not allow individual
            control of each job.  The environment variable SGE_TASK_ID will
            be set on the executing process automatically by SGE and
            indicates the unique identifier in the range for which the
            current job instance is for.

        context: The setshell context in which we should try a 'qsub'.
            Normally you do not need to change the default.  This variable
            can also be set to a context dictionary, in which case that
            context is used instead of probing for a new one, which can be
            faster.

        hostname: If set, it asks the queue to use only a subset of the
            available nodes.  Symbols: | for OR, & for AND, ! for NOT, etc.
            (cf. ``qsub -l hostname=<...>``)

        memfree: If set, it asks the queue for a node with a minimum amount
            of memory (cf. ``qsub -l mem_free=<...>``)

        hvmem: If set, it asks the queue for a node with a minimum amount of
            memory (cf. ``qsub -l h_vmem=<...>``)

        gpumem: Applicable only for GPU-based queues.  If set, it asks for
            the GPU queue with a minimum amount of memory.  The amount should
            not be more than 24.  (cf. ``qsub -l gpumem=<...>``)

        pe_opt: If set, add a ``-pe`` option when launching a job (for
            instance ``pe_exclusive* 1-``).  The value is split on
            whitespace.

        io_big: If set to true, the ``io_big`` flag will be set.  Use this
            flag if your process will need a lot of Input/Output operations.

        sge_extra_args: Extra arguments for SGE, inserted right after
            ``qsub``.  For example,
            ``jman submit -e "-P project_name -l pytorch=true" -- ...`` will
            be translated to ``qsub -P project_name -l pytorch=true -- ...``.
            The value of the user default ``sge-extra-args-prepend``, if any,
            is placed in front of these arguments.


    Returns:

        The job id assigned to this job (integer)


    Raises:

        RuntimeError: If ``array`` is a tuple or list whose length is not
            between 1 and 3.

        TypeError: If ``array`` is of an unsupported type.
    """
    from clapper.rc import UserDefaults

    scmd = ["qsub"]

    defaults = UserDefaults(USER_CONFIGURATION)

    prepend = defaults.get("sge-extra-args-prepend", "")
    sge_extra_args = f"{prepend} {sge_extra_args or ''}"
    scmd += shlex.split(sge_extra_args)

    if isinstance(queue, (str, bytes)):
        # normalise to str so the command line only ever contains str items
        queue = str_(queue)
        if queue not in ("all.q", "default"):
            scmd += ["-l", queue]

    if memfree:
        scmd += ["-l", f"mem_free={memfree}"]
    if hvmem:
        scmd += ["-l", f"h_vmem={hvmem}"]

    if gpumem:
        scmd += ["-l", f"gpumem={gpumem}"]

    if io_big:
        scmd += ["-l", "io_big"]

    if hostname:
        scmd += ["-l", f"hostname={hostname}"]

    if pe_opt:
        scmd += ["-pe", *pe_opt.split()]

    if cwd:
        scmd += ["-cwd"]

    if name:
        scmd += ["-N", name]

    if deps:
        scmd += ["-hold_jid", ",".join("%d" % k for k in deps)]

    if stdout:
        _ensure_log_dir(stdout, cwd)
        scmd += ["-o", stdout]

    if stderr:
        _ensure_log_dir(stderr, cwd)
        scmd += ["-e", stderr]
    elif stdout:  # just re-use the stdout settings
        scmd += ["-e", stdout]

    scmd += ["-terse"]  # simplified job identifiers returned by the command line

    for k in env or ():
        scmd += ["-v", k]

    if array is not None:
        scmd += ["-t", _array_task_range(array)]

    if not isinstance(command, (list, tuple)):
        command = [command]
    scmd += command

    logger.debug("Qsub command '%s'", " ".join(scmd))

    from .setshell import sexec

    # Strip defensively so a trailing newline cannot leave an empty last line.
    # The last line looks like "<jobid>" or "<jobid>.<task range>"; only the
    # part before the first dot is the job identifier.
    jobid = str_(sexec(context, scmd)).strip()
    return int(jobid.split("\n")[-1].split(".", 1)[0])
def make_shell(shell, command):
    """Returns a single command given a shell and a command to be qsub'ed.

    Keyword parameters:

    shell
      The path to the shell to use when submitting the job.

    command
      The script to be submitted: either an iterable of arguments (script path
      followed by its options) or a single ``str``/``bytes`` script path.

    Returns the command parameters to be supplied to qsub(), as a tuple
    starting with ``"-S", shell``.
    """
    # A bare string is itself iterable: without this guard it would be
    # exploded into one argument per character.
    if isinstance(command, (str, bytes)):
        command = (command,)
    return ("-S", shell, *command)
def qstat(jobid, context="grid"):
    """Queries status of a given job.

    Keyword parameters:

    jobid
      The job identifier as returned by qsub()

    context
      The setshell context in which we should try a 'qstat'. Normally you
      don't need to change the default. This variable can also be set to a
      context dictionary, in which case that context is used instead of
      probing for a new one.

    Returns a dictionary with the specific job properties, built from the
    "key: value" lines of the command output. The dictionary is empty if the
    output reports that the job does "not exist".
    """
    query = ["qstat", "-j", "%d" % jobid, "-f"]

    logger.debug("Qstat command '%s'", " ".join(query))

    from .setshell import sexec

    # a non-zero exit status is tolerated: it is handled through the output
    output = str_(sexec(context, query, error_on_nonzero=False))

    properties = {}
    for raw_line in output.split("\n"):
        entry = raw_line.strip()

        if "do not exist" in entry.lower():
            # unknown job: discard anything collected so far
            return {}

        # skip blank lines and "=====" separator rows
        if entry and "=" * 10 not in entry:
            fields = QSTAT_FIELD_SEPARATOR.split(entry, 1)
            if len(fields) == 2:
                key, value = fields
                properties[key] = value

    return properties
def qdel(jobid: int, context: str | dict = "grid") -> None:
    """Halts a given job.

    The ``qdel`` command is run with ``error_on_nonzero=False``, so a failing
    command is not turned into an exception by this function.


    Parameters:

        jobid: The job identifier as returned by :py:func:`qsub`

        context: The setshell context in which we should try a 'qdel'.
            Normally you do not need to change the default.  This variable
            can also be set to a context dictionary, in which case that
            context is used instead of probing for a new one.
    """
    scmd = ["qdel", f"{jobid}"]

    # lazy %-formatting, like the other commands in this module
    logger.debug("qdel command '%s'", " ".join(scmd))

    from .setshell import sexec

    sexec(context, scmd, error_on_nonzero=False)
def get_array_job_slice(total_length: int) -> slice:
    """A helper function that lets you chunk a list in an SGE array job.

    Use this function like ``a = a[get_array_job_slice(len(a))]`` to only
    process a chunk of ``a``.

    The list is divided in ``SGE_TASK_LAST`` contiguous chunks of (at most)
    ``ceil(total_length / SGE_TASK_LAST)`` elements, and the chunk matching
    ``SGE_TASK_ID`` is selected.  Tasks beyond the end of the list get an
    empty slice.


    Parameters:

        total_length: The length of the list that you are trying to slice


    Returns:

        A slice to be used.  If ``SGE_TASK_ID`` is not set, or is not an
        integer, ``slice(None)`` is returned (that is, the whole list).


    Raises:

        NotImplementedError: If "SGE_TASK_FIRST" or "SGE_TASK_STEPSIZE" is
            not 1.

        KeyError: If ``SGE_TASK_ID`` is an integer but "SGE_TASK_FIRST",
            "SGE_TASK_STEPSIZE" or "SGE_TASK_LAST" is missing from the
            environment.
    """
    sge_task_id = os.environ.get("SGE_TASK_ID")

    if sge_task_id is None:
        return slice(None)

    try:
        job_index = int(sge_task_id) - 1  # task ids start at 1
    except ValueError:
        # non-numeric task id: treat the process as not part of an array job
        return slice(None)

    if os.environ["SGE_TASK_FIRST"] != "1" or os.environ["SGE_TASK_STEPSIZE"] != "1":
        raise NotImplementedError(
            "Values other than 1 for SGE_TASK_FIRST and SGE_TASK_STEPSIZE is not supported!"
        )

    number_of_parallel_jobs = int(os.environ["SGE_TASK_LAST"])

    # Integer ceiling division: exact for any size, unlike going through a
    # float with math.ceil(a / b).
    number_of_objects_per_job = -(-total_length // number_of_parallel_jobs)

    start = min(job_index * number_of_objects_per_job, total_length)
    end = min((job_index + 1) * number_of_objects_per_job, total_length)

    return slice(start, end)