Coverage for src/gridtk/sge.py: 23%

126 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-16 09:20 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4"""Defines the job manager which can help you managing submitted grid jobs.""" 

5 

6from __future__ import annotations 

7 

8import logging 

9import os 

10import re 

11import sys 

12 

13from .manager import JobManager 

14from .models import Job, add_job 

15from .setshell import environ 

16from .tools import make_shell, qdel, qstat, qsub 

17 

18logger = logging.getLogger(__name__) 

19 

20 

class JobManagerSGE(JobManager):
    """The JobManager will submit and control the status of submitted jobs."""

    def __init__(self, context="grid", **kwargs):
        """Initializes this object with a state file and a method for
        qsub'bing.

        Keyword parameters:

        statefile
          The file containing a valid status database for the manager. If
          the file does not exist it is initialized. If it exists, it is
          loaded.

        context
          The context to provide when setting up the environment to call
          the SGE utilities such as qsub, qstat and qdel (normally 'grid',
          which also happens to be default)
        """
        # resolve the SGE environment once; it is reused by every
        # qsub/qstat/qdel call issued by this manager
        self.context = environ(context)
        JobManager.__init__(self, **kwargs)

    def _queue(self, kwargs):
        """The hard resource_list comes like this: '<qname>=TRUE,mem=128M'.

        To process it we have to split it twice (',' and then on '='),
        create a dictionary and extract just the qname
        """
        if "hard resource_list" not in kwargs:
            return "all.q"
        d = dict(
            k.split("=") for k in kwargs["hard resource_list"].split(",")
        )
        for k in d:
            # queue requests are encoded as '<qname>=TRUE' and queue names
            # start with 'q'; return the first one that is switched on
            if k[0] == "q" and d[k] == "TRUE":
                return k
        return "all.q"

    def _submit_to_grid(
        self, job, name, array, dependencies, log_dir, verbosity, **kwargs
    ):
        """Submits the given database job to the SGE grid via qsub.

        Builds a wrapper-script command line, submits it with the resolved
        dependencies, records the grid id/queue back onto ``job`` and
        returns ``job.unique``. Warns about queue/option combinations that
        SGE would silently never schedule.
        """
        # ... what we will actually submit to the grid is a wrapper script
        # that will call the desired command...
        # get the name of the file that was called originally
        jman = self.wrapper_script
        python = sys.executable

        # get the grid id's for the dependencies and remove duplicates
        dependent_jobs = self.get_jobs(dependencies)
        deps = sorted({j.id for j in dependent_jobs})

        # make sure log directory is created and is a directory; raise
        # instead of assert so the check survives `python -O`
        os.makedirs(job.log_dir, exist_ok=True)
        if not os.path.isdir(job.log_dir):
            raise RuntimeError(
                "Please make sure --log-dir `{}' either does not exist or is a directory.".format(
                    job.log_dir
                )
            )

        # generate call to the wrapper script
        command = make_shell(
            python,
            [jman, "-%sd" % ("v" * verbosity), self._database, "run-job"],
        )
        q_array = "%d-%d:%d" % array if array else None
        grid_id = qsub(
            command,
            context=self.context,
            name=name,
            deps=deps,
            array=q_array,
            stdout=log_dir,
            stderr=log_dir,
            **kwargs,
        )

        # get the result of qstat
        status = qstat(grid_id, context=self.context)

        # set the grid id of the job
        job.queue(
            new_job_id=int(status["job_number"]),
            new_job_name=status["job_name"],
            queue_name=self._queue(status),
        )

        logger.info(
            "Submitted job '%s' with dependencies '%s' to the SGE grid.",
            job,
            str(deps),
        )

        # sanity warnings: these submissions are accepted by SGE but will
        # never actually be scheduled
        queue = kwargs.get("queue", "all.q")
        if kwargs.get("io_big") and queue == "all.q":
            logger.warning(
                "This job will never be executed since the 'io_big' flag is not available for the 'all.q'."
            )
        if "pe_opt" in kwargs and queue not in (
            "q1dm",
            "q_1day_mth",
            "q1wm",
            "q_1week_mth",
        ):
            # NB: the conditional default must apply to the queue name only;
            # the original expression's precedence made the whole log message
            # collapse to "all.q" whenever no queue was specified
            logger.warning(
                "This job will never be executed since the queue '%s' does not support multi-threading (pe_mth) -- use 'q1dm' or 'q1wm' instead."
                % queue
            )
        if (
            "gpumem" in kwargs
            and kwargs.get("queue") in ("gpu", "lgpu", "sgpu", "vsgpu")
            and int(re.sub(r"\D", "", kwargs["gpumem"])) > 24
        ):
            logger.warning(
                "This job will never be executed since the GPU queue '%s' cannot have more than 24GB of memory."
                % kwargs["queue"]
            )

        assert job.id == grid_id
        return job.unique

    def submit(
        self,
        command_line,
        name=None,
        array=None,
        dependencies=None,
        exec_dir=None,
        log_dir="logs",
        dry_run=False,
        verbosity=0,
        stop_on_failure=False,
        **kwargs,
    ):
        """Submits a job that will be executed in the grid.

        Returns the unique job id, or ``None`` when ``dry_run`` is set.
        """
        # avoid the shared mutable-default pitfall of `dependencies=[]`;
        # passing an explicit list still works exactly as before
        dependencies = [] if dependencies is None else dependencies
        # add job to database
        self.lock()
        job = add_job(
            self.session,
            command_line,
            name,
            dependencies,
            array,
            exec_dir=exec_dir,
            log_dir=log_dir,
            stop_on_failure=stop_on_failure,
            context=self.context,
            **kwargs,
        )
        logger.info("Added job '%s' to the database.", job)
        if dry_run:
            print("Would have added the Job")
            print(job)
            print(
                "to the database to be executed in the grid with options:",
                str(kwargs),
            )
            self.session.delete(job)
            logger.info(
                "Deleted job '%s' from the database due to dry-run option",
                job,
            )
            job_id = None
        else:
            job_id = self._submit_to_grid(
                job, name, array, dependencies, log_dir, verbosity, **kwargs
            )

        self.session.commit()
        self.unlock()

        return job_id

    def communicate(self, job_ids=None):
        """Communicates with the SGE grid (using qstat) to see if jobs are
        still running."""
        self.lock()
        # iterate over all jobs
        for job in self.get_jobs(job_ids):
            job.refresh()
            if (
                job.status in ("queued", "executing", "waiting")
                and job.queue_name != "local"
            ):
                status = qstat(job.id, context=self.context)
                # an empty qstat answer means the grid no longer knows the job
                if len(status) == 0:
                    job.status = "failure"
                    job.result = 70  # ASCII: 'F'
                    logger.warning(
                        "The job '%s' was not executed successfully (maybe a time-out happened). Please check the log files."
                        % job
                    )
                    for array_job in job.array:
                        if array_job.status in ("queued", "executing"):
                            array_job.status = "failure"
                            array_job.result = 70  # ASCII: 'F'

        self.session.commit()
        self.unlock()

    def resubmit(
        self,
        job_ids=None,
        also_success=False,
        running_jobs=False,
        new_command=None,
        verbosity=0,
        keep_logs=False,
        **kwargs,
    ):
        """Re-submit jobs automatically."""
        self.lock()
        # iterate over all jobs
        jobs = self.get_jobs(job_ids)
        if new_command is not None:
            # a replacement command only makes sense for exactly one job
            if len(jobs) == 1:
                jobs[0].set_command_line(new_command)
            else:
                logger.warning(
                    "Ignoring new command since no single job id was specified"
                )
        accepted_old_status = (
            ("submitted", "success", "failure")
            if also_success
            else (
                "submitted",
                "failure",
            )
        )
        for job in jobs:
            # check if this job needs re-submission
            if running_jobs or job.status in accepted_old_status:
                grid_status = qstat(job.id, context=self.context)
                if len(grid_status) != 0:
                    logger.warning(
                        "Deleting job '%d' since it was still running in the grid."
                        % job.unique
                    )
                    qdel(job.id, context=self.context)
                # re-submit job to the grid
                arguments = job.get_arguments()
                arguments.update(**kwargs)
                # these options are meaningless on the default queue; drop
                # them so the re-submission is not rejected
                if "queue" not in arguments or arguments["queue"] == "all.q":
                    for arg in ("hvmem", "pe_opt", "io_big"):
                        if arg in arguments:
                            del arguments[arg]
                job.set_arguments(kwargs=arguments)
                # delete old status and result of the job
                if not keep_logs:
                    self.delete_logs(job)
                job.submit()
                if job.queue_name == "local" and "queue" not in arguments:
                    logger.warning(
                        "Re-submitting job '%s' locally (since no queue name is specified)."
                        % job
                    )
                else:
                    deps = [dep.unique for dep in job.get_jobs_we_wait_for()]
                    logger.debug(
                        "Re-submitting job '%s' with dependencies '%s' to the grid."
                        % (job, deps)
                    )
                    self._submit_to_grid(
                        job,
                        job.name,
                        job.get_array(),
                        deps,
                        job.log_dir,
                        verbosity,
                        **arguments,
                    )

                # commit after each job to avoid failures of not finding the
                # job during execution in the grid
                self.session.commit()
        self.unlock()

    def run_job(self, job_id, array_id=None):
        """Overwrites the run-job command from the manager to extract the
        correct job id before calling base class implementation."""
        # get the unique job id from the given grid id
        self.lock()
        jobs = list(self.session.query(Job).filter(Job.id == job_id))
        if len(jobs) != 1:
            self.unlock()
            raise ValueError(
                "Could not find job id '%d' in the database" % job_id
            )
        job_id = jobs[0].unique
        self.unlock()
        # call base class implementation with the corrected job id
        return JobManager.run_job(self, job_id, array_id)

    def stop_jobs(self, job_ids):
        """Stops the jobs in the grid."""
        self.lock()

        for job in self.get_jobs(job_ids):
            if job.status in ("executing", "queued", "waiting"):
                qdel(job.id, context=self.context)
                logger.info("Stopped job '%s' in the SGE grid." % job)
                # put the job back into 'submitted' state so it can be
                # re-queued later
                job.submit()

        self.session.commit()
        self.unlock()