Coverage for src/gridtk/local.py: 73%

175 statements  

coverage.py v7.4.3, created at 2024-04-22 14:25 +0200

# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Defines the job manager which helps you manage submitted grid jobs."""

import copy
import logging
import os
import subprocess
import sys
import time

from .manager import JobManager
from .models import add_job

logger = logging.getLogger(__name__)

class JobManagerLocal(JobManager):
    """Manages jobs run in parallel on the local machine."""

    def __init__(self, **kwargs):
        """Initializes this object with a state file and a method for
        submitting jobs.

        Keyword parameters:

        statefile
          The file containing a valid status database for the manager. If the
          file does not exist, it is initialized; if it exists, it is loaded.
        """
        JobManager.__init__(self, **kwargs)
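
    # A minimal construction sketch (illustrative): all keyword arguments are
    # forwarded to the parent JobManager; only the ``statefile`` keyword
    # documented above is assumed here, and the file name is a placeholder.
    #
    #   manager = JobManagerLocal(statefile="submitted.sql3")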

    def submit(
        self,
        command_line,
        name=None,
        array=None,
        dependencies=[],
        exec_dir=None,
        log_dir=None,
        dry_run=False,
        stop_on_failure=False,
        **kwargs,
    ):
        """Submits a job that will be executed on the local machine during a
        call to "run".

        All kwargs will simply be ignored.
        """
        # remove duplicate dependencies
        dependencies = sorted(list(set(dependencies)))

        # add job to database
        self.lock()
        job = add_job(
            self.session,
            command_line=command_line,
            name=name,
            dependencies=dependencies,
            array=array,
            exec_dir=exec_dir,
            log_dir=log_dir,
            stop_on_failure=stop_on_failure,
        )
        logger.info("Added job '%s' to the database", job)

        if dry_run:
            print(
                "Would have added the Job",
                job,
                "to the database to be executed locally.",
            )
            self.session.delete(job)
            logger.info("Deleted job '%s' from the database due to dry-run option", job)
            job_id = None
        else:
            job_id = job.unique

        # return the new job id
        self.unlock()
        return job_id
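
    # Sketch of a typical call (illustrative): the command line is assumed to
    # be a list of strings, matching the ``" ".join(job.get_command_line())``
    # usage in ``_run_parallel_job`` below; the array specification format is
    # whatever ``models.add_job`` accepts.
    #
    #   job_id = manager.submit(["echo", "hello"], name="hello", log_dir="logs")
    #   manager.submit(["./step2.sh"], dependencies=[job_id], log_dir="logs")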

    def resubmit(
        self,
        job_ids=None,
        also_success=False,
        running_jobs=False,
        new_command=None,
        keep_logs=False,
        **kwargs,
    ):
        """Re-submits jobs automatically."""
        self.lock()
        # iterate over all jobs
        jobs = self.get_jobs(job_ids)
        if new_command is not None:
            if len(jobs) == 1:
                jobs[0].set_command_line(new_command)
            else:
                logger.warning(
                    "Ignoring the new command since exactly one job id must be specified"
                )
        accepted_old_status = (
            ("submitted", "success", "failure")
            if also_success
            else ("submitted", "failure")
        )
        for job in jobs:
            # check if this job needs re-submission
            if running_jobs or job.status in accepted_old_status:
                if job.queue_name != "local" and job.status == "executing":
                    logger.error(
                        "Cannot re-submit job '%s' locally since it is still running in the grid. Use 'jman stop' to stop its execution!",
                        job,
                    )
                else:
                    # re-submit job to the grid
                    logger.info("Re-submitted job '%s' to the database", job)
                    if not keep_logs:
                        self.delete_logs(job)
                    job.submit("local")

        self.session.commit()
        self.unlock()
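
    # Sketch (illustrative): re-submitting jobs that did not succeed; a
    # replacement command is only honored when exactly one job id is given.
    #
    #   manager.resubmit()                   # failed and merely submitted jobs
    #   manager.resubmit(also_success=True)  # additionally re-run successful jobs
    #   manager.resubmit(job_ids=[3], new_command=["./step2.sh", "--retry"])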

    def stop_jobs(self, job_ids=None):
        """Resets the status of jobs to 'submitted' when they are labeled as
        'executing', 'queued' or 'waiting'."""
        self.lock()

        jobs = self.get_jobs(job_ids)
        for job in jobs:
            if (
                job.status in ("executing", "queued", "waiting")
                and job.queue_name == "local"
            ):
                logger.info(
                    "Reset job '%s' (%s) in the database",
                    job.name,
                    self._format_log(job.id),
                )
                job.submit()

        self.session.commit()
        self.unlock()

    def stop_job(self, job_id, array_id=None):
        """Resets the status of the given job to 'submitted' when it is labeled
        as 'executing', 'queued' or 'waiting'."""
        self.lock()

        job, array_job = self._job_and_array(job_id, array_id)
        if job is not None:
            if job.status in ("executing", "queued", "waiting"):
                logger.info(
                    "Reset job '%s' (%s) in the database",
                    job.name,
                    self._format_log(job.id),
                )
                job.status = "submitted"

            if array_job is not None and array_job.status in (
                "executing",
                "queued",
                "waiting",
            ):
                logger.debug("Reset array job '%s' in the database", array_job)
                array_job.status = "submitted"
            if array_job is None:
                for array_job in job.array:
                    if array_job.status in ("executing", "queued", "waiting"):
                        logger.debug("Reset array job '%s' in the database", array_job)
                        array_job.status = "submitted"

        self.session.commit()
        self.unlock()
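
    # Sketch (illustrative): stopping only resets the database status; the
    # operating-system processes are killed by run_scheduler on interrupt.
    #
    #   manager.stop_job(3)              # the whole job, including array tasks
    #   manager.stop_job(3, array_id=2)  # a single array task
    #   manager.stop_jobs([3, 4])        # several jobs at once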

    ############################################################
    # Methods to run the jobs in parallel on the local machine #
    ############################################################

    def _run_parallel_job(
        self, job_id, array_id=None, no_log=False, nice=None, verbosity=0
    ):
        """Executes the code for this job on the local machine."""
        environ = copy.deepcopy(os.environ)
        environ["JOB_ID"] = str(job_id)
        if array_id:
            environ["SGE_TASK_ID"] = str(array_id)
        else:
            environ["SGE_TASK_ID"] = "undefined"

        # generate call to the wrapper script
        command = [
            self.wrapper_script,
            "-l%sd" % ("v" * verbosity),
            self._database,
            "run-job",
        ]

        if nice is not None:
            command = ["nice", "-n%d" % nice] + command

        job, array_job = self._job_and_array(job_id, array_id)
        if job is None:
            # rare case: job was deleted before starting
            return None

        logger.info(
            "Starting execution of Job '%s' (%s)",
            job.name,
            self._format_log(job_id, array_id, len(job.array)),
        )
        # create log files
        if no_log or job.log_dir is None:
            out, err = sys.stdout, sys.stderr
        else:
            os.makedirs(job.log_dir, exist_ok=True)
            # create line-buffered files for writing output and error status
            if array_job is not None:
                out, err = open(array_job.std_out_file(), "w", 1), open(
                    array_job.std_err_file(), "w", 1
                )
            else:
                out, err = open(job.std_out_file(), "w", 1), open(
                    job.std_err_file(), "w", 1
                )

        # return the subprocess pipe to the process
        try:
            return subprocess.Popen(
                command, env=environ, stdout=out, stderr=err, bufsize=1
            )
        except OSError as e:
            logger.error(
                "Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- directory:\t%s\n- command:\t%s",
                job.name,
                self._format_log(job_id, array_id, len(job.array)),
                e,
                " ".join(job.get_command_line()),
                "." if job.exec_dir is None else job.exec_dir,
                " ".join(command),
            )
            job.finish(117, array_id)  # exit code 117 ('u' in ASCII)
            return None
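
    # For illustration: with nice=10 and verbosity=2, the command composed above
    # amounts to
    #   nice -n10 <wrapper_script> -lvvd <database> run-job
    # and it runs with JOB_ID (plus SGE_TASK_ID for array jobs) exported in the
    # environment, so the wrapper script can locate the job to execute.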

    def _format_log(self, job_id, array_id=None, array_count=0):
        return (
            ("%d (%d/%d)" % (job_id, array_id, array_count))
            if array_id is not None and array_count
            else ("%d (%d)" % (job_id, array_id))
            if array_id is not None
            else ("%d" % job_id)
        )
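
    # Resulting log labels, for example:
    #   _format_log(3)        -> "3"
    #   _format_log(3, 2)     -> "3 (2)"
    #   _format_log(3, 2, 10) -> "3 (2/10)"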

    def run_scheduler(
        self,
        parallel_jobs=1,
        job_ids=None,
        sleep_time=0.1,
        die_when_finished=False,
        no_log=False,
        nice=None,
        verbosity=0,
    ):
        """Starts the scheduler, which constantly checks for jobs that should
        be run."""
        running_tasks = []
        finished_tasks = set()
        try:
            # keep the scheduler alive until every job is finished or a KeyboardInterrupt is caught
            while True:
                # Flag that might be set in some rare cases and that prevents the scheduler from dying
                repeat_execution = False
                # FIRST, check whether any of the running processes have finished
                for task_index in range(len(running_tasks) - 1, -1, -1):
                    task = running_tasks[task_index]
                    process = task[0]

                    if process.poll() is not None:
                        # process ended
                        job_id = task[1]
                        array_id = task[2] if len(task) > 2 else None
                        self.lock()
                        job, array_job = self._job_and_array(job_id, array_id)
                        if job is not None:
                            jj = array_job if array_job is not None else job
                            result = (
                                "%s (%d)" % (jj.status, jj.result)
                                if jj.result is not None
                                else "%s (?)" % jj.status
                            )
                            if jj.status not in ("success", "failure"):
                                logger.error(
                                    "Job '%s' (%s) finished with status '%s' instead of 'success' or 'failure'. Usually this means an internal error. Check your wrapper_script parameter!",
                                    job.name,
                                    self._format_log(job_id, array_id),
                                    jj.status,
                                )
                                raise StopIteration("Job did not finish correctly.")
                            logger.info(
                                "Job '%s' (%s) finished execution with result '%s'",
                                job.name,
                                self._format_log(job_id, array_id),
                                result,
                            )
                        self.unlock()
                        finished_tasks.add(job_id)
                        # in any case, remove the job from the list
                        del running_tasks[task_index]

                # SECOND, check if new jobs can be submitted; THIS NEEDS TO LOCK THE DATABASE
                if len(running_tasks) < parallel_jobs:
                    # get all unfinished jobs:
                    self.lock()
                    jobs = self.get_jobs(job_ids)
                    # put all new jobs into the queue
                    for job in jobs:
                        if job.status == "submitted" and job.queue_name == "local":
                            job.queue()

                    # get all unfinished jobs that are submitted to the local queue
                    unfinished_jobs = [
                        job
                        for job in jobs
                        if job.status in ("queued", "executing")
                        and job.queue_name == "local"
                    ]
                    for job in unfinished_jobs:
                        if job.array:
                            # find array jobs that can run
                            queued_array_jobs = [
                                array_job
                                for array_job in job.array
                                if array_job.status == "queued"
                            ]
                            if not len(queued_array_jobs):
                                job.finish(0, -1)
                                repeat_execution = True
                            else:
                                # there are new array jobs to run
                                for i in range(
                                    min(
                                        parallel_jobs - len(running_tasks),
                                        len(queued_array_jobs),
                                    )
                                ):
                                    array_job = queued_array_jobs[i]
                                    # start a new job from the array
                                    process = self._run_parallel_job(
                                        job.unique,
                                        array_job.id,
                                        no_log=no_log,
                                        nice=nice,
                                        verbosity=verbosity,
                                    )
                                    if process is None:
                                        continue
                                    running_tasks.append(
                                        (process, job.unique, array_job.id)
                                    )
                                    # set the status to executing manually here to avoid
                                    # running jobs twice, e.g., when the loop runs again
                                    # before the asynchronous job has started
                                    array_job.status = "executing"
                                    job.status = "executing"
                                    if len(running_tasks) == parallel_jobs:
                                        break
                        else:
                            if job.status == "queued":
                                # start a new job
                                process = self._run_parallel_job(
                                    job.unique,
                                    no_log=no_log,
                                    nice=nice,
                                    verbosity=verbosity,
                                )
                                if process is None:
                                    continue
                                running_tasks.append((process, job.unique))
                                # set the status to executing manually here to avoid
                                # running jobs twice, e.g., when the loop runs again
                                # before the asynchronous job has started
                                job.status = "executing"
                                if len(running_tasks) == parallel_jobs:
                                    break

                    self.session.commit()
                    self.unlock()

                # if no jobs are running after the submission step, the whole queue should be finished
                if (
                    die_when_finished
                    and not repeat_execution
                    and len(running_tasks) == 0
                ):
                    logger.info(
                        "Stopping task scheduler since there are no more jobs running."
                    )
                    break

                # THIRD: sleep the desired amount of time before re-checking
                time.sleep(sleep_time)

        # This is the only way to stop: you have to interrupt the scheduler
        except (KeyboardInterrupt, StopIteration):
            if hasattr(self, "session"):
                self.unlock()
            logger.info("Stopping task scheduler due to user interrupt.")
            for task in running_tasks:
                logger.warning(
                    "Killing job '%s' that was still running.",
                    self._format_log(task[1], task[2] if len(task) > 2 else None),
                )
                try:
                    task[0].kill()
                except OSError as e:
                    logger.error(
                        "Killing job '%s' was not successful: '%s'",
                        self._format_log(task[1], task[2] if len(task) > 2 else None),
                        e,
                    )
                self.stop_job(task[1])
            # stop all jobs that are currently running or queued
            self.stop_jobs(job_ids)

        # check the result of the jobs that we have run, and return the list of failed jobs
        self.lock()
        jobs = self.get_jobs(finished_tasks)
        failures = [job.unique for job in jobs if job.status != "success"]
        self.unlock()
        return sorted(failures)
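

# A minimal end-to-end sketch (assumptions: the parent JobManager accepts the
# ``statefile`` keyword documented in ``__init__``, command lines are lists of
# strings, submitted jobs land on the "local" queue by default, and the file
# names below are placeholders).  Because of the relative imports above, run
# it as a module, e.g. ``python -m gridtk.local``.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    manager = JobManagerLocal(statefile="submitted.sql3")
    # queue two jobs; the second one depends on the first
    first = manager.submit(["echo", "step one"], name="one", log_dir="logs")
    manager.submit(
        ["echo", "step two"], name="two", dependencies=[first], log_dir="logs"
    )
    # run at most two jobs in parallel and return once everything has finished
    failed = manager.run_scheduler(parallel_jobs=2, die_when_finished=True)
    print("failed jobs:", failed)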