Coverage for src/gridtk/sge.py: 23%

126 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-22 14:25 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4"""Defines the job manager which can help you managing submitted grid jobs.""" 

5 

6from __future__ import annotations 

7 

8import logging 

9import os 

10import re 

11import sys 

12 

13from .manager import JobManager 

14from .models import Job, add_job 

15from .setshell import environ 

16from .tools import make_shell, qdel, qstat, qsub 

17 

18logger = logging.getLogger(__name__) 

19 

20 

class JobManagerSGE(JobManager):
    """The JobManager will submit and control the status of submitted jobs."""

    def __init__(self, context="grid", **kwargs):
        """Initialize the SGE job manager.

        Keyword parameters:

        statefile
            The file containing a valid status database for the manager.
            If the file does not exist it is initialized; if it exists, it
            is loaded.

        context
            The context to provide when setting up the environment to call
            the SGE utilities such as qsub, qstat and qdel (normally
            'grid', which also happens to be the default).
        """
        # resolve the SETSHELL context once; every qsub/qstat/qdel call reuses it
        self.context = environ(context)
        # the base class opens/initializes the status database
        JobManager.__init__(self, **kwargs)

44 

45 def _queue(self, kwargs): 

46 """The hard resource_list comes like this: '<qname>=TRUE,mem=128M'. 

47 

48 To process it we have to split it twice (',' and then on '='), 

49 create a dictionary and extract just the qname 

50 """ 

51 if "hard resource_list" not in kwargs: 

52 return "all.q" 

53 d = dict([k.split("=") for k in kwargs["hard resource_list"].split(",")]) 

54 for k in d: 

55 if k[0] == "q" and d[k] == "TRUE": 

56 return k 

57 return "all.q" 

58 

59 def _submit_to_grid( 

60 self, job, name, array, dependencies, log_dir, verbosity, **kwargs 

61 ): 

62 # ... what we will actually submit to the grid is a wrapper script that will call the desired command... 

63 # get the name of the file that was called originally 

64 jman = self.wrapper_script 

65 python = sys.executable 

66 

67 # get the grid id's for the dependencies and remove duplicates 

68 dependent_jobs = self.get_jobs(dependencies) 

69 deps = sorted(list({j.id for j in dependent_jobs})) 

70 

71 # make sure log directory is created and is a directory 

72 os.makedirs(job.log_dir, exist_ok=True) 

73 assert os.path.isdir( 

74 job.log_dir 

75 ), "Please make sure --log-dir `{}' either does not exist or is a directory.".format( 

76 job.log_dir 

77 ) 

78 

79 # generate call to the wrapper script 

80 command = make_shell( 

81 python, 

82 [jman, "-%sd" % ("v" * verbosity), self._database, "run-job"], 

83 ) 

84 q_array = "%d-%d:%d" % array if array else None 

85 grid_id = qsub( 

86 command, 

87 context=self.context, 

88 name=name, 

89 deps=deps, 

90 array=q_array, 

91 stdout=log_dir, 

92 stderr=log_dir, 

93 **kwargs, 

94 ) 

95 

96 # get the result of qstat 

97 status = qstat(grid_id, context=self.context) 

98 

99 # set the grid id of the job 

100 job.queue( 

101 new_job_id=int(status["job_number"]), 

102 new_job_name=status["job_name"], 

103 queue_name=self._queue(status), 

104 ) 

105 

106 logger.info( 

107 "Submitted job '%s' with dependencies '%s' to the SGE grid." 

108 % (job, str(deps)) 

109 ) 

110 

111 if ( 

112 "io_big" in kwargs 

113 and kwargs["io_big"] 

114 and ("queue" not in kwargs or kwargs["queue"] == "all.q") 

115 ): 

116 logger.warn( 

117 "This job will never be executed since the 'io_big' flag is not available for the 'all.q'." 

118 ) 

119 if "pe_opt" in kwargs and ( 

120 "queue" not in kwargs 

121 or kwargs["queue"] not in ("q1dm", "q_1day_mth", "q1wm", "q_1week_mth") 

122 ): 

123 logger.warn( 

124 "This job will never be executed since the queue '%s' does not support multi-threading (pe_mth) -- use 'q1dm' or 'q1wm' instead." 

125 % kwargs["queue"] 

126 if "queue" in kwargs 

127 else "all.q" 

128 ) 

129 if ( 

130 "gpumem" in kwargs 

131 and "queue" in kwargs 

132 and kwargs["queue"] in ("gpu", "lgpu", "sgpu", "vsgpu") 

133 and int(re.sub("\\D", "", kwargs["gpumem"])) > 24 

134 ): 

135 logger.warn( 

136 "This job will never be executed since the GPU queue '%s' cannot have more than 24GB of memory." 

137 % kwargs["queue"] 

138 ) 

139 

140 assert job.id == grid_id 

141 return job.unique 

142 

143 def submit( 

144 self, 

145 command_line, 

146 name=None, 

147 array=None, 

148 dependencies=[], 

149 exec_dir=None, 

150 log_dir="logs", 

151 dry_run=False, 

152 verbosity=0, 

153 stop_on_failure=False, 

154 **kwargs, 

155 ): 

156 """Submits a job that will be executed in the grid.""" 

157 # add job to database 

158 self.lock() 

159 job = add_job( 

160 self.session, 

161 command_line, 

162 name, 

163 dependencies, 

164 array, 

165 exec_dir=exec_dir, 

166 log_dir=log_dir, 

167 stop_on_failure=stop_on_failure, 

168 context=self.context, 

169 **kwargs, 

170 ) 

171 logger.info("Added job '%s' to the database." % job) 

172 if dry_run: 

173 print("Would have added the Job") 

174 print(job) 

175 print( 

176 "to the database to be executed in the grid with options:", 

177 str(kwargs), 

178 ) 

179 self.session.delete(job) 

180 logger.info( 

181 "Deleted job '%s' from the database due to dry-run option" % job 

182 ) 

183 job_id = None 

184 

185 else: 

186 job_id = self._submit_to_grid( 

187 job, name, array, dependencies, log_dir, verbosity, **kwargs 

188 ) 

189 

190 self.session.commit() 

191 self.unlock() 

192 

193 return job_id 

194 

195 def communicate(self, job_ids=None): 

196 """Communicates with the SGE grid (using qstat) to see if jobs are 

197 still running.""" 

198 self.lock() 

199 # iterate over all jobs 

200 jobs = self.get_jobs(job_ids) 

201 for job in jobs: 

202 job.refresh() 

203 if ( 

204 job.status in ("queued", "executing", "waiting") 

205 and job.queue_name != "local" 

206 ): 

207 status = qstat(job.id, context=self.context) 

208 if len(status) == 0: 

209 job.status = "failure" 

210 job.result = 70 # ASCII: 'F' 

211 logger.warn( 

212 "The job '%s' was not executed successfully (maybe a time-out happened). Please check the log files." 

213 % job 

214 ) 

215 for array_job in job.array: 

216 if array_job.status in ("queued", "executing"): 

217 array_job.status = "failure" 

218 array_job.result = 70 # ASCII: 'F' 

219 

220 self.session.commit() 

221 self.unlock() 

222 

223 def resubmit( 

224 self, 

225 job_ids=None, 

226 also_success=False, 

227 running_jobs=False, 

228 new_command=None, 

229 verbosity=0, 

230 keep_logs=False, 

231 **kwargs, 

232 ): 

233 """Re-submit jobs automatically.""" 

234 self.lock() 

235 # iterate over all jobs 

236 jobs = self.get_jobs(job_ids) 

237 if new_command is not None: 

238 if len(jobs) == 1: 

239 jobs[0].set_command_line(new_command) 

240 else: 

241 logger.warn("Ignoring new command since no single job id was specified") 

242 accepted_old_status = ( 

243 ("submitted", "success", "failure") 

244 if also_success 

245 else ( 

246 "submitted", 

247 "failure", 

248 ) 

249 ) 

250 for job in jobs: 

251 # check if this job needs re-submission 

252 if running_jobs or job.status in accepted_old_status: 

253 grid_status = qstat(job.id, context=self.context) 

254 if len(grid_status) != 0: 

255 logger.warn( 

256 "Deleting job '%d' since it was still running in the grid." 

257 % job.unique 

258 ) 

259 qdel(job.id, context=self.context) 

260 # re-submit job to the grid 

261 arguments = job.get_arguments() 

262 arguments.update(**kwargs) 

263 if "queue" not in arguments or arguments["queue"] == "all.q": 

264 for arg in ("hvmem", "pe_opt", "io_big"): 

265 if arg in arguments: 

266 del arguments[arg] 

267 job.set_arguments(kwargs=arguments) 

268 # delete old status and result of the job 

269 if not keep_logs: 

270 self.delete_logs(job) 

271 job.submit() 

272 if job.queue_name == "local" and "queue" not in arguments: 

273 logger.warn( 

274 "Re-submitting job '%s' locally (since no queue name is specified)." 

275 % job 

276 ) 

277 else: 

278 deps = [dep.unique for dep in job.get_jobs_we_wait_for()] 

279 logger.debug( 

280 "Re-submitting job '%s' with dependencies '%s' to the grid." 

281 % (job, deps) 

282 ) 

283 self._submit_to_grid( 

284 job, 

285 job.name, 

286 job.get_array(), 

287 deps, 

288 job.log_dir, 

289 verbosity, 

290 **arguments, 

291 ) 

292 

293 # commit after each job to avoid failures of not finding the job during execution in the grid 

294 self.session.commit() 

295 self.unlock() 

296 

297 def run_job(self, job_id, array_id=None): 

298 """Overwrites the run-job command from the manager to extract the 

299 correct job id before calling base class implementation.""" 

300 # get the unique job id from the given grid id 

301 self.lock() 

302 jobs = list(self.session.query(Job).filter(Job.id == job_id)) 

303 if len(jobs) != 1: 

304 self.unlock() 

305 raise ValueError("Could not find job id '%d' in the database'" % job_id) 

306 job_id = jobs[0].unique 

307 self.unlock() 

308 # call base class implementation with the corrected job id 

309 return JobManager.run_job(self, job_id, array_id) 

310 

311 def stop_jobs(self, job_ids): 

312 """Stops the jobs in the grid.""" 

313 self.lock() 

314 

315 jobs = self.get_jobs(job_ids) 

316 for job in jobs: 

317 if job.status in ("executing", "queued", "waiting"): 

318 qdel(job.id, context=self.context) 

319 logger.info("Stopped job '%s' in the SGE grid." % job) 

320 job.submit() 

321 

322 self.session.commit() 

323 self.unlock()