Coverage for src/gridtk/tools.py: 22%

121 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-22 14:25 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4"""Functions that replace shell-based utilities for grid submission and 

5probing.""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import math 

11import os 

12import re 

13import shlex 

14 

# Module logger; the grid helpers below emit the commands they run at debug level.
logger = logging.getLogger(__name__)

# Splits a "key:   value" line of ``qstat -j <id> -f`` output at the first
# colon that is followed by whitespace.
QSTAT_FIELD_SEPARATOR = re.compile(r":\s+")

# File name of the per-user gridtk configuration (under $XDG_CONFIG_HOME).
USER_CONFIGURATION = "gridtk.toml"

22 

23 

def str_(v: str | bytes) -> str:
    """Return ``v`` as a ``str``.

    A ``bytes`` object is decoded with the default codec (UTF-8); any other
    value (normally already a ``str``) is handed back untouched.


    Parameters:

        v: a bytes or str object


    Returns:

        A str object
    """
    if isinstance(v, bytes):
        return v.decode()
    return v

42 

43 

def _qsub_output_dir(path, cwd):
    """Create the job log directory ``path`` if it does not exist yet.

    When ``cwd`` is false, relative paths are created from ``$HOME``
    (presumably because SGE resolves them there for jobs submitted without
    ``-cwd``). The caller's working directory is always restored, even if
    directory creation fails.
    """
    if cwd:
        os.makedirs(path, exist_ok=True)
        return

    previous = os.path.realpath(os.curdir)
    os.chdir(os.environ["HOME"])
    try:
        os.makedirs(path, exist_ok=True)
    finally:
        os.chdir(previous)


def _qsub_array_spec(array):
    """Translate the ``array`` argument of ``qsub()`` into a ``-t`` value.

    Raises RuntimeError for a tuple/list whose length is not 1, 2 or 3, and
    TypeError for any other unsupported type (instead of emitting a bare
    ``-t`` that would swallow the next command-line token).
    """
    if isinstance(array, bytes):
        # Decode so the value is not rendered as "b'...'" on the command line.
        array = array.decode()

    if isinstance(array, str):
        try:
            return "1-%d:1" % int(array)
        except ValueError:
            # Not a plain integer: assume a complete m[-n[:s]] range.
            return array

    if isinstance(array, int):
        return "1-%d:1" % array

    if isinstance(array, (tuple, list)):
        if not 1 <= len(array) <= 3:
            raise RuntimeError("Array tuple should have length between 1 and 3")
        if len(array) == 1:
            return f"{array[0]}"
        if len(array) == 2:
            return f"{array[0]}-{array[1]}"
        return f"{array[0]}-{array[1]}:{array[2]}"

    raise TypeError(
        f"Unsupported type for 'array': {type(array).__name__} "
        "(expected str, bytes, int, tuple or list)"
    )


def qsub(
    command,
    queue=None,
    cwd=True,
    name=None,
    deps=None,
    stdout="",
    stderr="",
    env=None,
    array=None,
    context="grid",
    hostname=None,
    memfree=None,
    hvmem=None,
    gpumem=None,
    pe_opt=None,
    io_big=False,
    sge_extra_args="",
):
    """Submits a shell job to a given grid queue.

    Keyword parameters:

    command
      The command to be submitted to the grid: a single string, or a list or
      tuple of arguments appended as-is to the ``qsub`` command line.

    queue
      A valid queue name or None, to use the default queue. The names
      ``"all.q"`` and ``"default"`` are treated like None (no ``-l <queue>``).

    cwd
      If the job should change to the current working directory before
      starting (``qsub -cwd``).

    name
      An optional name to set for the job (``qsub -N``). If not given,
      defaults to the script name being launched.

    deps
      Job ids (integers) on which this job will depend (``qsub -hold_jid``).
      None or empty means no dependencies.

    stdout
      The standard output directory (``qsub -o``), created if it does not
      exist. If not given, defaults to what qsub has as a default.

    stderr
      The standard error directory (``qsub -e``), created if it does not
      exist. If not given, defaults to the stdout directory.

    env
      A list of extra variables that will be set on the environment running
      the command of your choice (one ``qsub -v`` per entry). None or empty
      means no extra variables.

    array
      If set should be either:

      1. a string in the form m[-n[:s]] which indicates the starting range 'm',
         the closing range 'n' and the step 's'. A string (or bytes) holding
         a plain integer is handled like case 2.
      2. an integer value indicating the total number of jobs to be submitted.
         This is equivalent to setting the parameter to a string "1-k:1" where
         "k" is the passed integer value
      3. a tuple (or list) that contains either 1, 2 or 3 elements indicating
         the start, end and step arguments ("m", "n", "s").

      The minimum value for "m" is 1. Giving "0" is an error.

      If submitted with this option, the job to be created will be an SGE
      parametric job. In this mode SGE does not allow individual control of
      each job. The environment variable SGE_TASK_ID will be set on the
      executing process automatically by SGE and indicates the unique
      identifier in the range for which the current job instance is for.

    context
      The setshell context in which we should try a 'qsub'. Normally you don't
      need to change the default. This variable can also be set to a context
      dictionary in which case we just setup using that context instead of
      probing for a new one, which is faster.

    memfree
      If set, passed as the ``mem_free`` resource request, asking for a node
      with a minimum amount of free memory (cf. qsub -l mem_free=<...>)

    hvmem
      If set, passed as the ``h_vmem`` resource request
      (cf. qsub -l h_vmem=<...>)

    gpumem
      Applicable only for GPU-based queues. If set, it asks for the GPU queue
      with a minimum amount of memory. The amount should not be more than 24.
      (cf. qsub -l gpumem=<...>)

    hostname
      If set, it asks the queue to use only a subset of the available nodes
      Symbols: | for OR, & for AND, ! for NOT, etc.
      (cf. qsub -l hostname=<...>)

    pe_opt
      If set, add a -pe option when launching a job (for instance
      pe_exclusive* 1-); the string is split on whitespace.

    io_big
      If set to true, the io_big flag will be set.
      Use this flag if your process will need a lot of Input/Output operations.

    sge_extra_args
      This is used to send extra arguments to SGE. Note that all its arguments
      are directly used in the `qsub` command. For example,
      `jman submit -e "-P project_name -l pytorch=true" -- ...` will be
      translated to `qsub -P project_name -l pytorch=true -- ...`. The value
      of ``sge-extra-args-prepend`` from the user configuration file is placed
      before these arguments.


    Raises RuntimeError if ``array`` is a tuple/list with a length other than
    1, 2 or 3, TypeError if ``array`` has an unsupported type, and ValueError
    if the job id cannot be parsed from the qsub output.

    Returns the job id assigned to this job (integer)
    """
    from clapper.rc import UserDefaults

    scmd = ["qsub"]

    # User-configured extra arguments go first, then the per-call ones.
    defaults = UserDefaults(USER_CONFIGURATION)
    prepend = defaults.get("sge-extra-args-prepend", "")
    scmd += shlex.split(f"{prepend} {sge_extra_args or ''}")

    if isinstance(queue, bytes):
        queue = queue.decode()
    if isinstance(queue, str) and queue not in ("all.q", "default"):
        scmd += ["-l", queue]

    if memfree:
        scmd += ["-l", f"mem_free={memfree}"]
    if hvmem:
        scmd += ["-l", f"h_vmem={hvmem}"]
    if gpumem:
        scmd += ["-l", f"gpumem={gpumem}"]
    if io_big:
        scmd += ["-l", "io_big"]
    if hostname:
        scmd += ["-l", f"hostname={hostname}"]

    if pe_opt:
        scmd += ["-pe", *pe_opt.split()]

    if cwd:
        scmd.append("-cwd")

    if name:
        scmd += ["-N", name]

    if deps:
        scmd += ["-hold_jid", ",".join("%d" % k for k in deps)]

    if stdout:
        _qsub_output_dir(stdout, cwd)
        scmd += ["-o", stdout]

    if stderr:
        _qsub_output_dir(stderr, cwd)
        scmd += ["-e", stderr]
    elif stdout:  # just re-use the stdout settings
        scmd += ["-e", stdout]

    scmd.append("-terse")  # simplified job identifiers returned by the command line

    for variable in env or ():
        scmd += ["-v", variable]

    if array is not None:
        scmd += ["-t", _qsub_array_spec(array)]

    if not isinstance(command, (list, tuple)):
        command = [command]
    scmd += command

    logger.debug("Qsub command '%s'", " ".join(scmd))

    from .setshell import sexec

    output = str_(sexec(context, scmd))
    # The job id is read from the last output line, dropping any ``.suffix``
    # (presumably the task range appended for array jobs). Surrounding
    # whitespace is stripped so a trailing newline does not yield int("").
    last_line = output.strip().split("\n")[-1].strip()
    return int(last_line.split(".", 1)[0])

254 

255 

def make_shell(shell, command):
    """Returns a single command given a shell and a command to be qsub'ed.

    Keyword parameters:

    shell
      The path to the shell to use when submitting the job.

    command
      The script path to be submitted: either a single string (or bytes),
      or a sequence of arguments (script path followed by its options).

    Returns the command parameters to be supplied to qsub(), i.e. the tuple
    ``("-S", shell, *command)``.
    """
    if isinstance(command, (str, bytes)):
        # A lone path is one argument; ``tuple()`` would otherwise split a
        # string into characters (and bytes into integers).
        command = (command,)
    return ("-S", shell, *command)

270 

271 

def qstat(jobid, context="grid"):
    """Queries status of a given job.

    Keyword parameters:

    jobid
      The job identifier as returned by qsub(). Anything ``int()`` accepts
      (an integer, or a string holding one) is allowed.

    context
      The setshell context in which we should try a 'qstat'. Normally you
      don't need to change the default. This variable can also be set to a
      context dictionary in which case we just setup using that context
      instead of probing for a new one, which is faster.

    Returns a dictionary mapping each ``key: value`` line printed by
    ``qstat -j <jobid> -f`` to its value (both strings). Blank lines and lines
    containing ten or more consecutive ``=`` are skipped. An empty dictionary
    is returned as soon as a line contains "do not exist" (case-insensitive).
    """
    # ``str(int(...))`` yields the same text as the former ``"%d"`` formatting
    # while also accepting job ids that were passed around as strings.
    scmd = ["qstat", "-j", str(int(jobid)), "-f"]

    logger.debug("Qstat command '%s'", " ".join(scmd))

    from .setshell import sexec

    # Non-zero exit codes are tolerated (presumably qstat's answer for an
    # unknown job); that case is recognized from the output text below.
    output = str_(sexec(context, scmd, error_on_nonzero=False))

    fields = {}
    for raw_line in output.split("\n"):
        line = raw_line.strip()
        if "do not exist" in line.lower():
            return {}
        if not line or "=" * 10 in line:
            continue
        key_value = QSTAT_FIELD_SEPARATOR.split(line, 1)
        if len(key_value) == 2:
            key, value = key_value
            fields[key] = value

    return fields

311 

312 

def qdel(jobid: int, context: str | dict = "grid") -> None:
    """Halts a given job.

    Parameters:

      jobid: The job identifier as returned by :py:func:`qsub`

      context: The setshell context in which we should try a 'qdel'.
        Normally you do not need to change the default. This variable can
        also be set to a context dictionary in which case we just setup using
        that context instead of probing for a new one, which is faster.

    The command is run with ``error_on_nonzero=False``, so a non-zero exit
    status of ``qdel`` (e.g. presumably for a job that already finished) is
    presumably not turned into an exception here.
    """
    scmd = ["qdel", f"{jobid}"]

    # Lazy %-style arguments: the message is only formatted when debug
    # logging is actually enabled.
    logger.debug("qdel command '%s'", " ".join(scmd))

    from .setshell import sexec

    sexec(context, scmd, error_on_nonzero=False)

332 

333 

334def get_array_job_slice(total_length: int) -> slice: 

335 """A helper function that let's you chunk a list in an SGE array job. 

336 

337 Use this function like ``a = a[get_array_job_slice(len(a))]`` to only 

338 process a chunk of ``a``. 

339 

340 

341 Parameters: 

342 

343 total_length: The length of the list that you are trying to slice 

344 

345 

346 Returns: 

347 

348 A slice to be used. 

349 

350 

351 Raises: 

352 

353 NotImplementedError: If "SGE_TASK_FIRST" and "SGE_TASK_STEPSIZE" are 

354 not 1. 

355 """ 

356 sge_task_id = os.environ.get("SGE_TASK_ID") 

357 

358 if sge_task_id is None: 

359 return slice(None) 

360 

361 try: 

362 sge_task_int = int(sge_task_id) 

363 except Exception: 

364 return slice(None) 

365 

366 if os.environ["SGE_TASK_FIRST"] != "1" or os.environ["SGE_TASK_STEPSIZE"] != "1": 

367 raise NotImplementedError( 

368 "Values other than 1 for SGE_TASK_FIRST and SGE_TASK_STEPSIZE is not supported!" 

369 ) 

370 

371 job_id = sge_task_int - 1 

372 

373 number_of_parallel_jobs = int(os.environ["SGE_TASK_LAST"]) 

374 number_of_objects_per_job = int(math.ceil(total_length / number_of_parallel_jobs)) 

375 

376 start = min(job_id * number_of_objects_per_job, total_length) 

377 end = min((job_id + 1) * number_of_objects_per_job, total_length) 

378 

379 return slice(start, end)