Coverage for src/gridtk/ 73%

175 statements  

« prev     ^ index     » next v7.4.3, created at 2024-04-22 14:25 +0200

1# Copyright © 2022 Idiap Research Institute <> 


3# SPDX-License-Identifier: GPL-3.0-or-later 

4"""Defines the job manager which can help you managing submitted grid jobs.""" 


6import copy 

7import logging 

8import os 

9import subprocess 

10import sys 

11import time 


13from .manager import JobManager 

14from .models import add_job 


# Logger shared by everything in this module, named after the module itself.
logger: logging.Logger = logging.getLogger(name=__name__)



class JobManagerLocal(JobManager):
    """Manages jobs run in parallel on the local machine."""

    def __init__(self, **kwargs):
        """Initializes this object with a state file and a method for
        qsub'bing.

        Keyword parameters:

        statefile
          The file containing a valid status database for the manager. If
          the file does not exist it is initialized. If it exists, it is
          loaded.

        All keyword arguments are forwarded unchanged to
        ``JobManager.__init__``.
        """
        JobManager.__init__(self, **kwargs)

    def submit(
        self,
        command_line,
        name=None,
        array=None,
        dependencies=None,
        exec_dir=None,
        log_dir=None,
        dry_run=False,
        stop_on_failure=False,
        **kwargs,
    ):
        """Submits a job that will be executed on the local machine during a
        call to "run".

        Parameters:
          command_line: the command line of the job, forwarded to ``add_job``.
          name: optional job name.
          array: optional array specification, forwarded to ``add_job``.
          dependencies: iterable of job ids this job depends on. Duplicates
            are removed and the ids are passed on sorted. ``None`` (the
            default) means "no dependencies".
          exec_dir, log_dir, stop_on_failure: forwarded to ``add_job``.
          dry_run: if true, the job is added to the session, reported, and
            deleted again; ``None`` is returned instead of an id.

        All other kwargs will simply be ignored.

        Returns:
          The ``unique`` id of the new job, or ``None`` on a dry run.
        """
        # remove duplicate dependencies; ``None`` replaces the former shared
        # mutable ``[]`` default
        dependencies = sorted(set(() if dependencies is None else dependencies))

        # add job to database
        self.lock()
        job = add_job(
            self.session,
            command_line=command_line,
            name=name,
            dependencies=dependencies,
            array=array,
            exec_dir=exec_dir,
            log_dir=log_dir,
            stop_on_failure=stop_on_failure,
        )
        logger.info("Added job '%s' to the database", job)

        if dry_run:
            print(
                "Would have added the Job",
                job,
                "to the database to be executed locally.",
            )
            self.session.delete(job)
            logger.info(
                "Deleted job '%s' from the database due to dry-run option", job
            )
            job_id = None
        else:
            job_id = job.unique

        # return the new job id
        self.unlock()
        return job_id

    def resubmit(
        self,
        job_ids=None,
        also_success=False,
        running_jobs=False,
        new_command=None,
        keep_logs=False,
        **kwargs,
    ):
        """Re-submit jobs automatically.

        The jobs selected by ``job_ids`` are re-submitted to the "local" queue
        when their status is 'submitted' or 'failure' (also 'success' if
        ``also_success`` is true), or regardless of status if
        ``running_jobs`` is true. A job that is still 'executing' in a
        non-local queue is never re-submitted; an error is logged instead.

        Parameters:
          job_ids: selection passed to ``get_jobs``.
          also_success: also re-submit jobs that finished successfully.
          running_jobs: re-submit jobs in any status.
          new_command: replacement command line; only applied when exactly
            one job is selected, otherwise a warning is logged.
          keep_logs: if false, the logs of each re-submitted job are deleted.

        All other kwargs are ignored.
        """
        self.lock()
        # iterate over all jobs
        jobs = self.get_jobs(job_ids)
        if new_command is not None:
            if len(jobs) == 1:
                jobs[0].set_command_line(new_command)
            else:
                logger.warning(
                    "Ignoring new command since no single job id was specified"
                )
        accepted_old_status = (
            ("submitted", "success", "failure")
            if also_success
            else ("submitted", "failure")
        )
        for job in jobs:
            # check if this job needs re-submission
            if running_jobs or job.status in accepted_old_status:
                if job.queue_name != "local" and job.status == "executing":
                    logger.error(
                        "Cannot re-submit job '%s' locally since it is still running in the grid. Use 'jman stop' to stop it's execution!",
                        job,
                    )
                else:
                    # re-submit job to the local queue
                    logger.info("Re-submitted job '%s' to the database", job)
                    if not keep_logs:
                        self.delete_logs(job)
                    job.submit("local")

        self.session.commit()
        self.unlock()

    def stop_jobs(self, job_ids=None):
        """Resets the status of the job to 'submitted' when they are labeled as
        'executing'.

        Every job selected by ``job_ids`` that sits in the "local" queue with
        status 'executing', 'queued' or 'waiting' is reset via
        ``job.submit()``. The changes are committed before unlocking.
        """
        self.lock()

        jobs = self.get_jobs(job_ids)
        for job in jobs:
            if (
                job.status in ("executing", "queued", "waiting")
                and job.queue_name == "local"
            ):
                logger.info(
                    "Reset job '%s' (%s) in the database",
                    job.name,
                    self._format_log(job.id),
                )
                job.submit()

        self.session.commit()
        self.unlock()

    def stop_job(self, job_id, array_id=None):
        """Resets the status of the given to 'submitted' when they are labeled
        as 'executing'.

        The job itself is reset if its status is 'executing', 'queued' or
        'waiting'. If ``array_id`` identifies an array job, only that array
        job is reset (under the same status condition); otherwise every
        array job of the job in one of those statuses is reset. Nothing
        happens if the job does not exist.
        """
        self.lock()

        job, array_job = self._job_and_array(job_id, array_id)
        if job is not None:
            if job.status in ("executing", "queued", "waiting"):
                logger.info(
                    "Reset job '%s' (%s) in the database",
                    job.name,
                    self._format_log(job.id),
                )
                job.status = "submitted"

            if array_job is not None and array_job.status in (
                "executing",
                "queued",
                "waiting",
            ):
                logger.debug("Reset array job '%s' in the database", array_job)
                array_job.status = "submitted"
            if array_job is None:
                for array_job in job.array:
                    if array_job.status in ("executing", "queued", "waiting"):
                        logger.debug(
                            "Reset array job '%s' in the database", array_job
                        )
                        array_job.status = "submitted"

        self.session.commit()
        self.unlock()

    ############################################################
    # Methods to run the jobs in parallel on the local machine #
    ############################################################

    def _run_parallel_job(
        self, job_id, array_id=None, no_log=False, nice=None, verbosity=0
    ):
        """Executes the code for this job on the local machine.

        Starts ``self.wrapper_script -l<v * verbosity>d <database> run-job``
        (prefixed by ``nice -n<nice>`` when ``nice`` is given) as a child
        process with ``JOB_ID`` and ``SGE_TASK_ID`` set in its environment.
        Output goes to the job's (or array job's) std-out/std-err files, or
        to this process's stdout/stderr when ``no_log`` is set or the job
        has no log directory.

        Returns:
          The ``subprocess.Popen`` of the started process, or ``None`` when
          the job no longer exists or the process could not be started (the
          job is then finished with result 117).
        """
        # A plain dict copy: ``copy.deepcopy(os.environ)`` yields another
        # ``os._Environ`` whose item assignment still calls ``os.putenv``,
        # which leaked JOB_ID / SGE_TASK_ID into this process's environment.
        environ = dict(os.environ)
        environ["JOB_ID"] = str(job_id)
        environ["SGE_TASK_ID"] = str(array_id) if array_id else "undefined"

        # generate call to the wrapper script
        command = [
            self.wrapper_script,
            "-l%sd" % ("v" * verbosity),
            self._database,
            "run-job",
        ]

        if nice is not None:
            command = ["nice", "-n%d" % nice] + command

        job, array_job = self._job_and_array(job_id, array_id)
        if job is None:
            # rare case: job was deleted before starting
            return None

        logger.info(
            "Starting execution of Job '%s' (%s)",
            job.name,
            self._format_log(job_id, array_id, len(job.array)),
        )
        # create log files
        if no_log or job.log_dir is None:
            out, err = sys.stdout, sys.stderr
            owns_log_files = False
        else:
            os.makedirs(job.log_dir, exist_ok=True)
            # create line-buffered files for writing output and error status
            log_source = array_job if array_job is not None else job
            out = open(log_source.std_out_file(), "w", 1)
            err = open(log_source.std_err_file(), "w", 1)
            owns_log_files = True

        # return the subprocess pipe to the process
        try:
            return subprocess.Popen(
                command, env=environ, stdout=out, stderr=err, bufsize=1
            )
        except OSError as e:
            logger.error(
                "Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- directory:\t%s\n- command:\t%s",
                job.name,
                self._format_log(job_id, array_id, len(job.array)),
                e,
                " ".join(job.get_command_line()),
                "." if job.exec_dir is None else job.exec_dir,
                " ".join(command),
            )
            # result code 117 marks "could not be started" (117 == ord('u'))
            job.finish(117, array_id)
            return None
        finally:
            # The child (if started) holds its own duplicates of these
            # descriptors; the parent's copies are no longer needed. Never
            # close this process's own stdout/stderr.
            if owns_log_files:
                out.close()
                err.close()

    def _format_log(self, job_id, array_id=None, array_count=0):
        """Formats a job id (and optional array id) for log messages.

        Returns ``"<job_id> (<array_id>/<array_count>)"`` when an array id
        and a non-zero ``array_count`` are given, ``"<job_id> (<array_id>)"``
        when only the array id is given, and ``"<job_id>"`` otherwise. All
        numbers are formatted with ``%d``.
        """
        if array_id is None:
            return "%d" % job_id
        if array_count:
            return "%d (%d/%d)" % (job_id, array_id, array_count)
        return "%d (%d)" % (job_id, array_id)

    def run_scheduler(
        self,
        parallel_jobs=1,
        job_ids=None,
        sleep_time=0.1,
        die_when_finished=False,
        no_log=False,
        nice=None,
        verbosity=0,
    ):
        """Starts the scheduler, which is constantly checking for jobs that
        should be ran.

        Each iteration (1) reaps finished processes, (2) queues submitted
        local jobs and starts new ones while fewer than ``parallel_jobs``
        are running, and (3) sleeps ``sleep_time`` seconds. The loop ends on
        KeyboardInterrupt, when a job finishes with a status other than
        'success'/'failure' (raised internally as StopIteration), or -- with
        ``die_when_finished`` -- once nothing is running any more. On
        interruption, still-running processes are killed and their jobs are
        reset to 'submitted'.

        Returns:
          Sorted list of the ``unique`` ids of the jobs finished by this
          scheduler whose status is not 'success'.
        """
        # entries: (process, job_unique) or (process, job_unique, array_id)
        running_tasks = []
        finished_tasks = set()
        try:
            # keep the scheduler alive until every job is finished or the KeyboardInterrupt is caught
            while True:
                # Flag that might be set in some rare cases, and that prevents the scheduler to die
                repeat_execution = False
                # FIRST, try if there are finished processes; iterate backwards
                # so that deleting an entry keeps the remaining indices valid
                for task_index in range(len(running_tasks) - 1, -1, -1):
                    task = running_tasks[task_index]
                    process = task[0]

                    if process.poll() is not None:
                        # process ended
                        job_id = task[1]
                        array_id = task[2] if len(task) > 2 else None
                        self.lock()
                        job, array_job = self._job_and_array(job_id, array_id)
                        if job is not None:
                            jj = array_job if array_job is not None else job
                            result = (
                                "%s (%d)" % (jj.status, jj.result)
                                if jj.result is not None
                                else "%s (?)" % jj.status
                            )
                            if jj.status not in ("success", "failure"):
                                logger.error(
                                    "Job '%s' (%s) finished with status '%s' instead of 'success' or 'failure'. Usually this means an internal error. Check your wrapper_script parameter!",
                                    job.name,
                                    self._format_log(job_id, array_id),
                                    jj.status,
                                )
                                # the lock is released in the handler below
                                raise StopIteration("Job did not finish correctly.")
                            logger.info(
                                "Job '%s' (%s) finished execution with result '%s'",
                                job.name,
                                self._format_log(job_id, array_id),
                                result,
                            )
                        self.unlock()
                        finished_tasks.add(job_id)
                        # in any case, remove the job from the list
                        del running_tasks[task_index]

                # SECOND, check if new jobs can be submitted; THIS NEEDS TO LOCK THE DATABASE
                if len(running_tasks) < parallel_jobs:
                    # get all unfinished jobs:
                    self.lock()
                    jobs = self.get_jobs(job_ids)
                    # put all new jobs into the queue
                    for job in jobs:
                        if job.status == "submitted" and job.queue_name == "local":
                            job.queue()

                    # get all unfinished jobs that are submitted to the local queue
                    unfinished_jobs = [
                        job
                        for job in jobs
                        if job.status in ("queued", "executing")
                        and job.queue_name == "local"
                    ]
                    for job in unfinished_jobs:
                        if job.array:
                            # find array jobs that can run
                            queued_array_jobs = [
                                array_job
                                for array_job in job.array
                                if array_job.status == "queued"
                            ]
                            if not queued_array_jobs:
                                job.finish(0, -1)
                                repeat_execution = True
                            else:
                                # there are new array jobs to run; never start
                                # more than the free slots allow
                                free_slots = parallel_jobs - len(running_tasks)
                                for array_job in queued_array_jobs[:free_slots]:
                                    # start a new job from the array
                                    process = self._run_parallel_job(
                                        job.unique,
                                        array_job.id,
                                        no_log=no_log,
                                        nice=nice,
                                        verbosity=verbosity,
                                    )
                                    if process is None:
                                        continue
                                    running_tasks.append(
                                        (process, job.unique, array_job.id)
                                    )
                                    # we here set the status to executing manually to avoid jobs to be run twice
                                    # e.g., if the loop is executed while the asynchronous job did not start yet
                                    array_job.status = "executing"
                                    job.status = "executing"
                        elif job.status == "queued":
                            # start a new job
                            process = self._run_parallel_job(
                                job.unique,
                                no_log=no_log,
                                nice=nice,
                                verbosity=verbosity,
                            )
                            if process is not None:
                                running_tasks.append((process, job.unique))
                                # we here set the status to executing manually to avoid jobs to be run twice
                                # e.g., if the loop is executed while the asynchronous job did not start yet
                                job.status = "executing"
                        # enforce the limit after both array and single jobs, so
                        # that no further job is started once all slots are taken
                        if len(running_tasks) >= parallel_jobs:
                            break

                    self.session.commit()
                    self.unlock()

                # if after the submission of jobs there are no jobs running, we should have finished all the queue.
                if (
                    die_when_finished
                    and not repeat_execution
                    and len(running_tasks) == 0
                ):
                    logger.info(
                        "Stopping task scheduler since there are no more jobs running."
                    )
                    break

                # THIRD: sleep the desired amount of time before re-checking
                time.sleep(sleep_time)

        # This is the only way to stop: you have to interrupt the scheduler
        except (KeyboardInterrupt, StopIteration):
            if hasattr(self, "session"):
                self.unlock()
            logger.info("Stopping task scheduler due to user interrupt.")
            for task in running_tasks:
                task_label = self._format_log(
                    task[1], task[2] if len(task) > 2 else None
                )
                logger.warning(
                    "Killing job '%s' that was still running.", task_label
                )
                try:
                    task[0].kill()
                except OSError as e:
                    logger.error(
                        "Killing job '%s' was not successful: '%s'", task_label, e
                    )
                self.stop_job(task[1])
            # stop all jobs that are currently running or queued
            self.stop_jobs(job_ids)

        # check the result of the jobs that we have run, and return the list of failed jobs
        self.lock()
        jobs = self.get_jobs(finished_tasks)
        failures = [job.unique for job in jobs if job.status != "success"]
        self.unlock()
        return sorted(failures)

233 try: 

234 return subprocess.Popen( 

235 command, env=environ, stdout=out, stderr=err, bufsize=1 

236 ) 

237 except OSError as e: 

238 logger.error( 

239 "Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- directory:\t%s\n- command:\t%s", 


241 self._format_log(job_id, array_id, len(job.array)), 

242 e, 

243 " ".join(job.get_command_line()), 

244 "." if job.exec_dir is None else job.exec_dir, 

245 " ".join(command), 

246 ) 

247 job.finish(117, array_id) # ASCII 'O' 

248 return None 


250 def _format_log(self, job_id, array_id=None, array_count=0): 

251 return ( 

252 ("%d (%d/%d)" % (job_id, array_id, array_count)) 

253 if array_id is not None and array_count 

254 else ("%d (%d)" % (job_id, array_id)) 

255 if array_id is not None 

256 else ("%d" % job_id) 

257 ) 


259 def run_scheduler( 

260 self, 

261 parallel_jobs=1, 

262 job_ids=None, 

263 sleep_time=0.1, 

264 die_when_finished=False, 

265 no_log=False, 

266 nice=None, 

267 verbosity=0, 

268 ): 

269 """Starts the scheduler, which is constantly checking for jobs that 

270 should be ran.""" 

271 running_tasks = [] 

272 finished_tasks = set() 

273 try: 

274 # keep the scheduler alive until every job is finished or the KeyboardInterrupt is caught 

275 while True: 

276 # Flag that might be set in some rare cases, and that prevents the scheduler to die 

277 repeat_execution = False 

278 # FIRST, try if there are finished processes 

279 for task_index in range(len(running_tasks) - 1, -1, -1): 

280 task = running_tasks[task_index] 

281 process = task[0] 


283 if process.poll() is not None: 

284 # process ended 

285 job_id = task[1] 

286 array_id = task[2] if len(task) > 2 else None 

287 self.lock() 

288 job, array_job = self._job_and_array(job_id, array_id) 

289 if job is not None: 

290 jj = array_job if array_job is not None else job 

291 result = ( 

292 "%s (%d)" % (jj.status, jj.result) 

293 if jj.result is not None 

294 else "%s (?)" % jj.status 

295 ) 

296 if jj.status not in ("success", "failure"): 

297 logger.error( 

298 "Job '%s' (%s) finished with status '%s' instead of 'success' or 'failure'. Usually this means an internal error. Check your wrapper_script parameter!", 


300 self._format_log(job_id, array_id), 

301 jj.status, 

302 ) 

303 raise StopIteration("Job did not finish correctly.") 


305 "Job '%s' (%s) finished execution with result '%s'", 


307 self._format_log(job_id, array_id), 

308 result, 

309 ) 

310 self.unlock() 

311 finished_tasks.add(job_id) 

312 # in any case, remove the job from the list 

313 del running_tasks[task_index] 


315 # SECOND, check if new jobs can be submitted; THIS NEEDS TO LOCK THE DATABASE 

316 if len(running_tasks) < parallel_jobs: 

317 # get all unfinished jobs: 

318 self.lock() 

319 jobs = self.get_jobs(job_ids) 

320 # put all new jobs into the queue 

321 for job in jobs: 

322 if job.status == "submitted" and job.queue_name == "local": 

323 job.queue() 


325 # get all unfinished jobs that are submitted to the local queue 

326 unfinished_jobs = [ 

327 job 

328 for job in jobs 

329 if job.status in ("queued", "executing") 

330 and job.queue_name == "local" 

331 ] 

332 for job in unfinished_jobs: 

333 if job.array: 

334 # find array jobs that can run 

335 queued_array_jobs = [ 

336 array_job 

337 for array_job in job.array 

338 if array_job.status == "queued" 

339 ] 

340 if not len(queued_array_jobs): 

341 job.finish(0, -1) 

342 repeat_execution = True 

343 else: 

344 # there are new array jobs to run 

345 for i in range( 

346 min( 

347 parallel_jobs - len(running_tasks), 

348 len(queued_array_jobs), 

349 ) 

350 ): 

351 array_job = queued_array_jobs[i] 

352 # start a new job from the array 

353 process = self._run_parallel_job( 

354 job.unique, 


356 no_log=no_log, 

357 nice=nice, 

358 verbosity=verbosity, 

359 ) 

360 if process is None: 

361 continue 

362 running_tasks.append( 

363 (process, job.unique, 

364 ) 

365 # we here set the status to executing manually to avoid jobs to be run twice 

366 # e.g., if the loop is executed while the asynchronous job did not start yet 

367 array_job.status = "executing" 

368 job.status = "executing" 

369 if len(running_tasks) == parallel_jobs: 

370 break 

371 else: 

372 if job.status == "queued": 

373 # start a new job 

374 process = self._run_parallel_job( 

375 job.unique, 

376 no_log=no_log, 

377 nice=nice, 

378 verbosity=verbosity, 

379 ) 

380 if process is None: 

381 continue 

382 running_tasks.append((process, job.unique)) 

383 # we here set the status to executing manually to avoid jobs to be run twice 

384 # e.g., if the loop is executed while the asynchronous job did not start yet 

385 job.status = "executing" 

386 if len(running_tasks) == parallel_jobs: 

387 break 


389 self.session.commit() 

390 self.unlock() 


392 # if after the submission of jobs there are no jobs running, we should have finished all the queue. 

393 if ( 

394 die_when_finished 

395 and not repeat_execution 

396 and len(running_tasks) == 0 

397 ): 


399 "Stopping task scheduler since there are no more jobs running." 

400 ) 

401 break 


403 # THIRD: sleep the desired amount of time before re-checking 

404 time.sleep(sleep_time) 


406 # This is the only way to stop: you have to interrupt the scheduler 

407 except (KeyboardInterrupt, StopIteration): 

408 if hasattr(self, "session"): 

409 self.unlock() 

410"Stopping task scheduler due to user interrupt.") 

411 for task in running_tasks: 

412 logger.warn( 

413 "Killing job '%s' that was still running.", 

414 self._format_log(task[1], task[2] if len(task) > 2 else None), 

415 ) 

416 try: 

417 task[0].kill() 

418 except OSError as e: 

419 logger.error( 

420 "Killing job '%s' was not successful: '%s'", 

421 self._format_log(task[1], task[2] if len(task) > 2 else None), 

422 e, 

423 ) 

424 self.stop_job(task[1]) 

425 # stop all jobs that are currently running or queued 

426 self.stop_jobs(job_ids) 


428 # check the result of the jobs that we have run, and return the list of failed jobs 

429 self.lock() 

430 jobs = self.get_jobs(finished_tasks) 

431 failures = [job.unique for job in jobs if job.status != "success"] 

432 self.unlock() 

433 return sorted(failures)