Coverage for src/gridtk/tools.py: 22%

121 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-16 09:20 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4"""Functions that replace shell-based utilities for grid submission and 

5probing.""" 

6 

7from __future__ import annotations 

8 

9import logging 

10import math 

11import os 

12import re 

13import shlex 

14 

logger = logging.getLogger(__name__)

# Compiled once at import time: splits a qstat "key:   value" line into the
# field name and its value.
QSTAT_FIELD_SEPARATOR = re.compile(r":\s+")

# File name of the gridtk user configuration, found under $XDG_CONFIG_HOME.
USER_CONFIGURATION = "gridtk.toml"

22 

23 

def str_(v: str | bytes) -> str:
    """Always returns the string representation of the given value ``v``.

    If it is a bytes object, it is decoded (with the default UTF-8 codec)
    into a str.

    If it is a str object, it is simply returned.


    Parameters:

        v: a bytes or str object


    Returns:

        A str object
    """
    return v.decode() if isinstance(v, bytes) else v

42 

43 

def _make_output_directory(path, cwd):
    """Creates the log directory ``path`` used by :py:func:`qsub` if missing.

    When the job does not run in the current working directory (``cwd`` is
    false), a relative ``path`` is resolved against ``$HOME`` instead of the
    current directory.  No process-wide ``chdir`` is performed.
    """
    base = os.curdir if cwd else os.environ["HOME"]
    # os.path.join() returns ``path`` unchanged when it is already absolute
    target = os.path.join(base, path)
    # an existing path (directory or file) is left untouched
    if not os.path.exists(target):
        os.makedirs(target, exist_ok=True)


def _array_range(array):
    """Converts the ``array`` argument of :py:func:`qsub` to a ``-t`` range.

    Raises:

        RuntimeError: if a tuple/list does not have between 1 and 3 elements
        TypeError: if ``array`` is of an unsupported type
    """
    if isinstance(array, (str, bytes)):
        spec = str_(array)
        try:
            # a plain number "k" means tasks 1 to k
            return "1-%d:1" % int(spec)
        except ValueError:
            # otherwise it must already be a complete m[-n[:s]] range
            return spec
    if isinstance(array, int):
        return "1-%d:1" % array
    if isinstance(array, (tuple, list)):
        if not 1 <= len(array) <= 3:
            raise RuntimeError(
                "Array tuple should have length between 1 and 3"
            )
        if len(array) == 1:
            return f"{array[0]}"
        if len(array) == 2:
            return f"{array[0]}-{array[1]}"
        return f"{array[0]}-{array[1]}:{array[2]}"
    raise TypeError(
        f"Unsupported type for 'array': {type(array).__name__}"
    )


def qsub(
    command,
    queue=None,
    cwd=True,
    name=None,
    deps=(),
    stdout="",
    stderr="",
    env=(),
    array=None,
    context="grid",
    hostname=None,
    memfree=None,
    hvmem=None,
    gpumem=None,
    pe_opt=None,
    io_big=False,
    sge_extra_args="",
):
    """Submits a shell job to a given grid queue.

    Keyword parameters:

    command
      The command to be submitted to the grid: either a single string or a
      list/tuple of command-line tokens.

    queue
      A valid queue name or None, to use the default queue.  The names
      ``all.q`` and ``default`` also select the default queue (no ``-l``
      option is passed for them).

    cwd
      If the job should change to the current working directory before
      starting (``qsub -cwd``).  If false, relative ``stdout``/``stderr``
      directories are created relative to ``$HOME`` instead.

    name
      An optional name to set for the job.  If not given, defaults to the
      script name being launched.

    deps
      Job ids (integers) on which this job will depend (``qsub -hold_jid``).

    stdout
      The standard output directory.  If not given, defaults to what qsub has
      as a default.  It is created if it does not exist yet.

    stderr
      The standard error directory (if not given, defaults to the stdout
      directory).  It is created if it does not exist yet.

    env
      A list of extra variables that will be set on the environment running
      the command of your choice (each item is passed as ``qsub -v <item>``).

    array
      If set should be either:

      1. a string in the form m[-n[:s]] which indicates the starting range
         'm', the closing range 'n' and the step 's'.
      2. an integer value indicating the total number of jobs to be
         submitted.  This is equivalent to setting the parameter to a string
         "1-k:1" where "k" is the passed integer value (a string holding a
         single integer is treated the same way).
      3. a tuple (or list) that contains either 1, 2 or 3 elements indicating
         the start, end and step arguments ("m", "n", "s").

      The minimum value for "m" is 1.  Giving "0" is an error.

      If submitted with this option, the job to be created will be an SGE
      parametric job.  In this mode SGE does not allow individual control of
      each job.  The environment variable SGE_TASK_ID will be set on the
      executing process automatically by SGE and indicates the unique
      identifier in the range for which the current job instance is for.

    context
      The setshell context in which we should try a 'qsub'.  Normally you
      don't need to change the default.  This variable can also be set to a
      context dictionary in which case we just setup using that context
      instead of probing for a new one, which can be faster.

    memfree
      If set, it asks the queue for a node with a minimum amount of free
      memory (cf. qsub -l mem_free=<...>)

    hvmem
      If set, passes ``-l h_vmem=<...>`` to qsub (in SGE, the hard virtual
      memory limit of the job).

    gpumem
      Applicable only for GPU-based queues.  If set, it asks for the GPU queue
      with a minimum amount of memory.  The amount should not be more than 24.
      (cf. qsub -l gpumem=<...>)

    hostname
      If set, it asks the queue to use only a subset of the available nodes.
      Symbols: | for OR, & for AND, ! for NOT, etc.
      (cf. qsub -l hostname=<...>)

    pe_opt
      If set, add a -pe option when launching a job (for instance
      ``pe_exclusive* 1-``).

    io_big
      If set to true, the io_big flag will be set.  Use this flag if your
      process will need a lot of Input/Output operations.

    sge_extra_args
      Extra arguments sent verbatim to SGE, split with shell-like syntax.
      For example, ``jman submit -e "-P project_name -l pytorch=true" -- ...``
      is translated to ``qsub -P project_name -l pytorch=true -- ...``.  The
      value of the ``sge-extra-args-prepend`` key of the user configuration
      (``gridtk.toml``) is always placed in front of these.


    Returns the job id assigned to this job (integer).

    Raises ``RuntimeError`` if ``array`` is a tuple/list with a length outside
    1..3, ``TypeError`` if ``array`` has an unsupported type and
    ``ValueError`` if no job id can be parsed from the qsub output.
    """
    from clapper.rc import UserDefaults

    scmd = ["qsub"]

    # user-configured extra arguments come first, then the caller's ones
    defaults = UserDefaults(USER_CONFIGURATION)
    prepend = defaults.get("sge-extra-args-prepend", "")
    scmd += shlex.split(f"{prepend} {sge_extra_args or ''}")

    if isinstance(queue, (str, bytes)):
        # normalize bytes so the command line only ever holds str tokens
        queue = str_(queue)
        if queue not in ("all.q", "default"):
            scmd += ["-l", queue]

    if memfree:
        scmd += ["-l", f"mem_free={memfree}"]
    if hvmem:
        scmd += ["-l", f"h_vmem={hvmem}"]
    if gpumem:
        scmd += ["-l", f"gpumem={gpumem}"]
    if io_big:
        scmd += ["-l", "io_big"]
    if hostname:
        scmd += ["-l", f"hostname={hostname}"]

    if pe_opt:
        scmd += ["-pe", *pe_opt.split()]

    if cwd:
        scmd.append("-cwd")

    if name:
        scmd += ["-N", name]

    if deps:
        scmd += ["-hold_jid", ",".join("%d" % k for k in deps)]

    if stdout:
        _make_output_directory(stdout, cwd)
        scmd += ["-o", stdout]

    if stderr:
        _make_output_directory(stderr, cwd)
        scmd += ["-e", stderr]
    elif stdout:  # just re-use the stdout settings
        scmd += ["-e", stdout]

    # simplified job identifiers returned by the command line
    scmd.append("-terse")

    for variable in env or ():
        scmd += ["-v", variable]

    if array is not None:
        scmd += ["-t", _array_range(array)]

    if not isinstance(command, (list, tuple)):
        command = [command]
    scmd += command

    logger.debug("Qsub command '%s'", " ".join(scmd))

    from .setshell import sexec

    output = str_(sexec(context, scmd))
    # the job id is on the last non-empty line; anything after the first "."
    # (array jobs append their task range there) is dropped
    last_line = output.strip().split("\n")[-1]
    return int(last_line.split(".", 1)[0])

258 

259 

def make_shell(shell, command):
    """Returns the qsub() command parameters that run ``command`` in ``shell``.

    Keyword parameters:

    shell
      The path to the shell to use when submitting the job (``qsub -S``).

    command
      The script to be submitted: either a single path (str or bytes) or a
      sequence of command-line tokens (script path followed by its
      arguments).

    Returns a tuple of command parameters to be supplied to qsub()
    """
    if isinstance(command, (str, bytes)):
        # a single path: tuple() would otherwise split it into characters
        command = (command,)
    return ("-S", shell, *command)

274 

275 

def qstat(jobid, context="grid"):
    """Queries the status of a given job.

    Keyword parameters:

    jobid
      The job identifier as returned by qsub()

    context
      The setshell context in which we should run ``qstat``.  Normally you
      don't need to change the default.  This variable can also be set to a
      context dictionary in which case we just setup using that context
      instead of probing for a new one, which can be faster.

    Returns a dictionary mapping each field name printed by
    ``qstat -j <jobid> -f`` to its (string) value, or an empty dictionary if
    qstat reports that the job does not exist.
    """
    scmd = ["qstat", "-j", "%d" % jobid, "-f"]

    logger.debug("Qstat command '%s'", " ".join(scmd))

    from .setshell import sexec

    output = str_(sexec(context, scmd, error_on_nonzero=False))

    properties = {}
    for raw_line in output.split("\n"):
        line = raw_line.strip()
        # an unknown job is reported with a "... do not exist ..." message
        if "do not exist" in line.lower():
            return {}
        # skip blank lines and "==========" separator rows
        if not line or "=" * 10 in line:
            continue
        parts = QSTAT_FIELD_SEPARATOR.split(line, 1)
        if len(parts) == 2:
            field, value = parts
            properties[field] = value

    return properties

315 

316 

def qdel(jobid: int, context: str | dict = "grid") -> None:
    """Halts (deletes) a given job.

    Parameters:

        jobid: The job identifier as returned by :py:func:`qsub`

        context: The setshell context in which we should run ``qdel``.
            Normally you do not need to change the default.  This variable
            can also be set to a context dictionary in which case we just
            setup using that context instead of probing for a new one, which
            can be faster.
    """
    scmd = ["qdel", f"{jobid}"]

    # lazy %-style arguments, consistent with qsub()/qstat(): the message is
    # only interpolated if the debug level is enabled
    logger.debug("qdel command '%s'", " ".join(scmd))

    from .setshell import sexec

    # a non-zero exit status of qdel is deliberately not treated as an error
    sexec(context, scmd, error_on_nonzero=False)

336 

337 

def get_array_job_slice(total_length: int) -> slice:
    """A helper function that lets you chunk a list in an SGE array job.

    Use this function like ``a = a[get_array_job_slice(len(a))]`` to only
    process a chunk of ``a``.  The list is split into consecutive chunks of
    ``ceil(total_length / SGE_TASK_LAST)`` elements and task ``i`` (the
    1-based ``SGE_TASK_ID``) receives the ``i``-th chunk; the last tasks may
    receive an empty chunk.

    Outside of an array job (``SGE_TASK_ID`` unset or not an integer) the
    whole list is selected.


    Parameters:

        total_length: The length of the list that you are trying to slice


    Returns:

        A slice to be used.


    Raises:

        NotImplementedError: If "SGE_TASK_FIRST" and "SGE_TASK_STEPSIZE" are
            not 1.

        KeyError: If "SGE_TASK_ID" is an integer but "SGE_TASK_FIRST",
            "SGE_TASK_STEPSIZE" or "SGE_TASK_LAST" is not set.
    """
    sge_task_id = os.environ.get("SGE_TASK_ID")

    if sge_task_id is None:
        return slice(None)

    try:
        sge_task_int = int(sge_task_id)
    except ValueError:
        # not an array job (the variable holds a non-numeric placeholder)
        return slice(None)

    if (
        os.environ["SGE_TASK_FIRST"] != "1"
        or os.environ["SGE_TASK_STEPSIZE"] != "1"
    ):
        raise NotImplementedError(
            "Values other than 1 for SGE_TASK_FIRST and SGE_TASK_STEPSIZE is not supported!"
        )

    job_id = sge_task_int - 1

    number_of_parallel_jobs = int(os.environ["SGE_TASK_LAST"])
    # exact integer ceiling division: float division loses precision for
    # very large lengths
    number_of_objects_per_job = -(-total_length // number_of_parallel_jobs)

    start = min(job_id * number_of_objects_per_job, total_length)
    end = min((job_id + 1) * number_of_objects_per_job, total_length)

    return slice(start, end)