Coverage for src/gridtk/local.py: 73%

175 statements  


# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Defines the job manager which can help you manage submitted grid jobs."""

import copy
import logging
import os
import subprocess
import sys
import time

from .manager import JobManager
from .models import add_job

logger = logging.getLogger(__name__)


class JobManagerLocal(JobManager):
    """Manages jobs run in parallel on the local machine."""

    def __init__(self, **kwargs):
        """Initializes this object with a state file and a method for
        submitting jobs.

        Keyword parameters:

        statefile
          The file containing a valid status database for the manager. If the
          file does not exist, it is initialized; if it exists, it is loaded.
        """
        JobManager.__init__(self, **kwargs)

    def submit(
        self,
        command_line,
        name=None,
        array=None,
        dependencies=[],
        exec_dir=None,
        log_dir=None,
        dry_run=False,
        stop_on_failure=False,
        **kwargs,
    ):
        """Submits a job that will be executed on the local machine during a
        call to "run".

        All additional kwargs are simply ignored.
        """
        # remove duplicate dependencies
        dependencies = sorted(list(set(dependencies)))

        # add job to database
        self.lock()
        job = add_job(
            self.session,
            command_line=command_line,
            name=name,
            dependencies=dependencies,
            array=array,
            exec_dir=exec_dir,
            log_dir=log_dir,
            stop_on_failure=stop_on_failure,
        )
        logger.info("Added job '%s' to the database", job)

        if dry_run:
            print(
                "Would have added the Job",
                job,
                "to the database to be executed locally.",
            )
            self.session.delete(job)
            logger.info(
                "Deleted job '%s' from the database due to dry-run option", job
            )
            job_id = None
        else:
            job_id = job.unique

        # return the new job id
        self.unlock()
        return job_id
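
    # Illustrative sketch (not from the original module): a minimal way this
    # method might be called, assuming the manager was constructed with the
    # ``statefile`` keyword documented in ``__init__`` above.
    #
    #   manager = JobManagerLocal(statefile="submitted.sql3")
    #   job_id = manager.submit(
    #       ["echo", "hello"], name="hello", log_dir="./logs"
    #   )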

    def resubmit(
        self,
        job_ids=None,
        also_success=False,
        running_jobs=False,
        new_command=None,
        keep_logs=False,
        **kwargs,
    ):
        """Re-submits jobs automatically."""
        self.lock()
        # iterate over all jobs
        jobs = self.get_jobs(job_ids)
        if new_command is not None:
            if len(jobs) == 1:
                jobs[0].set_command_line(new_command)
            else:
                logger.warning(
                    "Ignoring new command since no single job id was specified"
                )
        accepted_old_status = (
            ("submitted", "success", "failure")
            if also_success
            else (
                "submitted",
                "failure",
            )
        )
        for job in jobs:
            # check if this job needs re-submission
            if running_jobs or job.status in accepted_old_status:
                if job.queue_name != "local" and job.status == "executing":
                    logger.error(
                        "Cannot re-submit job '%s' locally since it is still running in the grid. Use 'jman stop' to stop its execution!",
                        job,
                    )
                else:
                    # re-submit job to the local queue
                    logger.info("Re-submitted job '%s' to the database", job)
                    if not keep_logs:
                        self.delete_logs(job)
                    job.submit("local")

        self.session.commit()
        self.unlock()
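
    # Illustrative sketch (assumed calls, not from the original module):
    # replace the command line of a single job, or re-submit failed and
    # successful jobs with their old logs removed.
    #
    #   manager.resubmit(job_ids=[3], new_command=["echo", "retry"])
    #   manager.resubmit(also_success=True, keep_logs=False)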

    def stop_jobs(self, job_ids=None):
        """Resets the status of jobs in the local queue to 'submitted' when
        they are labeled as 'executing', 'queued' or 'waiting'."""
        self.lock()

        jobs = self.get_jobs(job_ids)
        for job in jobs:
            if (
                job.status in ("executing", "queued", "waiting")
                and job.queue_name == "local"
            ):
                logger.info(
                    "Reset job '%s' (%s) in the database",
                    job.name,
                    self._format_log(job.id),
                )
                job.submit()

        self.session.commit()
        self.unlock()

    def stop_job(self, job_id, array_id=None):
        """Resets the status of the given job (and its array jobs) to
        'submitted' when they are labeled as 'executing', 'queued' or
        'waiting'."""
        self.lock()

        job, array_job = self._job_and_array(job_id, array_id)
        if job is not None:
            if job.status in ("executing", "queued", "waiting"):
                logger.info(
                    "Reset job '%s' (%s) in the database",
                    job.name,
                    self._format_log(job.id),
                )
                job.status = "submitted"

            if array_job is not None and array_job.status in (
                "executing",
                "queued",
                "waiting",
            ):
                logger.debug("Reset array job '%s' in the database", array_job)
                array_job.status = "submitted"
            if array_job is None:
                for array_job in job.array:
                    if array_job.status in ("executing", "queued", "waiting"):
                        logger.debug(
                            "Reset array job '%s' in the database", array_job
                        )
                        array_job.status = "submitted"

        self.session.commit()
        self.unlock()

    ############################################################
    # Methods to run the jobs in parallel on the local machine #
    ############################################################

    def _run_parallel_job(
        self, job_id, array_id=None, no_log=False, nice=None, verbosity=0
    ):
        """Executes the code for this job on the local machine."""
        environ = copy.deepcopy(os.environ)
        environ["JOB_ID"] = str(job_id)
        if array_id:
            environ["SGE_TASK_ID"] = str(array_id)
        else:
            environ["SGE_TASK_ID"] = "undefined"

        # generate call to the wrapper script
        command = [
            self.wrapper_script,
            "-l%sd" % ("v" * verbosity),
            self._database,
            "run-job",
        ]

        if nice is not None:
            command = ["nice", "-n%d" % nice] + command

        job, array_job = self._job_and_array(job_id, array_id)
        if job is None:
            # rare case: job was deleted before starting
            return None

        logger.info(
            "Starting execution of Job '%s' (%s)",
            job.name,
            self._format_log(job_id, array_id, len(job.array)),
        )
        # create log files
        if no_log or job.log_dir is None:
            out, err = sys.stdout, sys.stderr
        else:
            os.makedirs(job.log_dir, exist_ok=True)
            # create line-buffered files for writing output and error status
            if array_job is not None:
                out, err = open(array_job.std_out_file(), "w", 1), open(
                    array_job.std_err_file(), "w", 1
                )
            else:
                out, err = open(job.std_out_file(), "w", 1), open(
                    job.std_err_file(), "w", 1
                )

        # return the subprocess pipe to the process
        try:
            return subprocess.Popen(
                command, env=environ, stdout=out, stderr=err, bufsize=1
            )
        except OSError as e:
            logger.error(
                "Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- directory:\t%s\n- command:\t%s",
                job.name,
                self._format_log(job_id, array_id, len(job.array)),
                e,
                " ".join(job.get_command_line()),
                "." if job.exec_dir is None else job.exec_dir,
                " ".join(command),
            )
            job.finish(117, array_id)  # ASCII 'u'
            return None
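
    # For illustration (values assumed): with ``nice=10`` and ``verbosity=2``,
    # the subprocess started above is roughly
    #   nice -n10 <wrapper_script> -lvvd <database> run-job
    # executed with JOB_ID (and, for array jobs, SGE_TASK_ID) set in its
    # environment, presumably so the wrapper script can look the job up in
    # the state database.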

    def _format_log(self, job_id, array_id=None, array_count=0):
        return (
            ("%d (%d/%d)" % (job_id, array_id, array_count))
            if array_id is not None and array_count
            else ("%d (%d)" % (job_id, array_id))
            if array_id is not None
            else ("%d" % job_id)
        )
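
    # Examples of the identifiers produced by the helper above:
    #   _format_log(42)         -> "42"
    #   _format_log(42, 3)      -> "42 (3)"
    #   _format_log(42, 3, 10)  -> "42 (3/10)"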

    def run_scheduler(
        self,
        parallel_jobs=1,
        job_ids=None,
        sleep_time=0.1,
        die_when_finished=False,
        no_log=False,
        nice=None,
        verbosity=0,
    ):
        """Starts the scheduler, which constantly checks for jobs that
        should be run."""
        running_tasks = []
        finished_tasks = set()
        try:
            # keep the scheduler alive until every job is finished or a KeyboardInterrupt is caught
            while True:
                # Flag that might be set in some rare cases and that prevents the scheduler from dying
                repeat_execution = False
                # FIRST, check whether there are finished processes
                for task_index in range(len(running_tasks) - 1, -1, -1):
                    task = running_tasks[task_index]
                    process = task[0]

                    if process.poll() is not None:
                        # process ended
                        job_id = task[1]
                        array_id = task[2] if len(task) > 2 else None
                        self.lock()
                        job, array_job = self._job_and_array(job_id, array_id)
                        if job is not None:
                            jj = array_job if array_job is not None else job
                            result = (
                                "%s (%d)" % (jj.status, jj.result)
                                if jj.result is not None
                                else "%s (?)" % jj.status
                            )
                            if jj.status not in ("success", "failure"):
                                logger.error(
                                    "Job '%s' (%s) finished with status '%s' instead of 'success' or 'failure'. Usually this means an internal error. Check your wrapper_script parameter!",
                                    job.name,
                                    self._format_log(job_id, array_id),
                                    jj.status,
                                )
                                raise StopIteration(
                                    "Job did not finish correctly."
                                )
                            logger.info(
                                "Job '%s' (%s) finished execution with result '%s'",
                                job.name,
                                self._format_log(job_id, array_id),
                                result,
                            )
                        self.unlock()
                        finished_tasks.add(job_id)
                        # in any case, remove the job from the list
                        del running_tasks[task_index]

                # SECOND, check if new jobs can be submitted; THIS NEEDS TO LOCK THE DATABASE
                if len(running_tasks) < parallel_jobs:
                    # get all unfinished jobs:
                    self.lock()
                    jobs = self.get_jobs(job_ids)
                    # put all new jobs into the queue
                    for job in jobs:
                        if (
                            job.status == "submitted"
                            and job.queue_name == "local"
                        ):
                            job.queue()

                    # get all unfinished jobs that are submitted to the local queue
                    unfinished_jobs = [
                        job
                        for job in jobs
                        if job.status in ("queued", "executing")
                        and job.queue_name == "local"
                    ]
                    for job in unfinished_jobs:
                        if job.array:
                            # find array jobs that can run
                            queued_array_jobs = [
                                array_job
                                for array_job in job.array
                                if array_job.status == "queued"
                            ]
                            if not len(queued_array_jobs):
                                job.finish(0, -1)
                                repeat_execution = True
                            else:
                                # there are new array jobs to run
                                for i in range(
                                    min(
                                        parallel_jobs - len(running_tasks),
                                        len(queued_array_jobs),
                                    )
                                ):
                                    array_job = queued_array_jobs[i]
                                    # start a new job from the array
                                    process = self._run_parallel_job(
                                        job.unique,
                                        array_job.id,
                                        no_log=no_log,
                                        nice=nice,
                                        verbosity=verbosity,
                                    )
                                    if process is None:
                                        continue
                                    running_tasks.append(
                                        (process, job.unique, array_job.id)
                                    )
                                    # set the status to executing manually here to avoid jobs being run twice,
                                    # e.g., if the loop is executed again before the asynchronous job has started
                                    array_job.status = "executing"
                                    job.status = "executing"
                                    if len(running_tasks) == parallel_jobs:
                                        break
                        else:
                            if job.status == "queued":
                                # start a new job
                                process = self._run_parallel_job(
                                    job.unique,
                                    no_log=no_log,
                                    nice=nice,
                                    verbosity=verbosity,
                                )
                                if process is None:
                                    continue
                                running_tasks.append((process, job.unique))
                                # set the status to executing manually here to avoid jobs being run twice,
                                # e.g., if the loop is executed again before the asynchronous job has started
                                job.status = "executing"
                                if len(running_tasks) == parallel_jobs:
                                    break

                    self.session.commit()
                    self.unlock()

                # if, after the submission of jobs, there are no jobs running, we should have finished the whole queue
                if (
                    die_when_finished
                    and not repeat_execution
                    and len(running_tasks) == 0
                ):
                    logger.info(
                        "Stopping task scheduler since there are no more jobs running."
                    )
                    break

                # THIRD: sleep the desired amount of time before re-checking
                time.sleep(sleep_time)

        # This is the only way to stop: you have to interrupt the scheduler
        except (KeyboardInterrupt, StopIteration):
            if hasattr(self, "session"):
                self.unlock()
            logger.info("Stopping task scheduler due to user interrupt.")
            for task in running_tasks:
                logger.warning(
                    "Killing job '%s' that was still running.",
                    self._format_log(
                        task[1], task[2] if len(task) > 2 else None
                    ),
                )
                try:
                    task[0].kill()
                except OSError as e:
                    logger.error(
                        "Killing job '%s' was not successful: '%s'",
                        self._format_log(
                            task[1], task[2] if len(task) > 2 else None
                        ),
                        e,
                    )
                self.stop_job(task[1])
            # stop all jobs that are currently running or queued
            self.stop_jobs(job_ids)

        # check the result of the jobs that we have run, and return the list of failed jobs
        self.lock()
        jobs = self.get_jobs(finished_tasks)
        failures = [job.unique for job in jobs if job.status != "success"]
        self.unlock()
        return sorted(failures)
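
# Illustrative end-to-end sketch (not part of the original module; the
# ``statefile`` keyword and the file names are assumptions based on the
# docstrings above):
#
#   manager = JobManagerLocal(statefile="submitted.sql3")
#   manager.submit(["echo", "hello"], name="hello", log_dir="./logs")
#   failures = manager.run_scheduler(parallel_jobs=2, die_when_finished=True)
#   # ``failures`` lists the unique ids of jobs that did not end in 'success'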