Coverage for src/gridtk/manager.py: 82%

250 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-16 09:20 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4 

5import logging 

6import os 

7import socket # to get the host name 

8import subprocess 

9 

10from shutil import which 

11 

12import sqlalchemy 

13 

14from .models import ArrayJob, Base, Job, Status, times 

15 

16logger = logging.getLogger(__name__) 

17 

18 

19class JobManager: 

20 """This job manager defines the basic interface for handling jobs in the 

21 SQL database.""" 

22 

23 def __init__( 

24 self, database="submitted.sql3", wrapper_script=None, debug=False 

25 ): 

26 self._database = os.path.realpath(database) 

27 self._engine = sqlalchemy.create_engine( 

28 "sqlite:///" + self._database, 

29 connect_args={"timeout": 600}, 

30 echo=debug, 

31 ) 

32 self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine) 

33 

34 # store the command that this job manager was called with 

35 if wrapper_script is None: 

36 wrapper_script = "jman" 

37 if not os.path.exists(wrapper_script): 

38 bindir = os.path.join(os.path.realpath(os.curdir), "bin") 

39 wrapper_script = which( 

40 wrapper_script, 

41 path=os.pathsep.join((bindir, os.environ["PATH"])), 

42 ) 

43 

44 if wrapper_script is None: 

45 raise OSError( 

46 "Could not find the installation path of gridtk. Please specify it in the wrapper_script parameter of the JobManager." 

47 ) 

48 if not os.path.exists(wrapper_script): 

49 raise OSError( 

50 "Your wrapper_script cannot be found. Jobs will not be executable." 

51 ) 

52 self.wrapper_script = wrapper_script 

53 

54 def __del__(self): 

55 # remove the database if it is empty 

56 if os.path.isfile(self._database): 

57 # in errornous cases, the session might still be active, so don't create a deadlock here! 

58 if not hasattr(self, "session"): 

59 self.lock() 

60 job_count = len(self.get_jobs()) 

61 self.unlock() 

62 if not job_count: 

63 logger.debug( 

64 "Removed database file '%s' since database is empty" 

65 % self._database 

66 ) 

67 os.remove(self._database) 

68 

69 def lock(self): 

70 """Generates (and returns) a blocking session object to the 

71 database.""" 

72 if hasattr(self, "session"): 

73 raise RuntimeError( 

74 "Dead lock detected. Please do not try to lock the session when it is already locked!" 

75 ) 

76 

77 # create the database if it does not exist yet 

78 if not os.path.exists(self._database): 

79 self._create() 

80 

81 # now, create a session 

82 self.session = self._session_maker() 

83 logger.debug("Created new database session to '%s'" % self._database) 

84 return self.session 

85 

86 def unlock(self): 

87 """Closes the session to the database.""" 

88 if not hasattr(self, "session"): 

89 raise RuntimeError( 

90 "Error detected! The session that you want to close does not exist any more!" 

91 ) 

92 logger.debug("Closed database session of '%s'" % self._database) 

93 self.session.close() 

94 del self.session 

95 

96 def _create(self): 

97 """Creates a new and empty database.""" 

98 # create directory for sql database 

99 os.makedirs(os.path.dirname(self._database), exist_ok=True) 

100 

101 # create all the tables 

102 Base.metadata.create_all(self._engine) 

103 logger.debug("Created new empty database '%s'" % self._database) 

104 

105 def get_jobs(self, job_ids=None): 

106 """Returns a list of jobs that are stored in the database.""" 

107 if job_ids is not None and len(job_ids) == 0: 

108 return [] 

109 q = self.session.query(Job) 

110 if job_ids is not None: 

111 q = q.filter(Job.unique.in_(job_ids)) 

112 return sorted(list(q), key=lambda job: job.unique) 

113 

114 def _job_and_array(self, job_id, array_id=None): 

115 # get the job (and the array job) with the given id(s) 

116 job = self.get_jobs((job_id,)) 

117 if len(job) > 1: 

118 logger.error( 

119 "%d jobs with the same ID '%d' were detected in the database" 

120 % (len(job), job_id) 

121 ) 

122 elif not len(job): 

123 logger.error( 

124 "Job with ID '%d' was not found in the database." % job_id 

125 ) 

126 return (None, None) 

127 

128 job = job[0] 

129 unique_id = job.unique 

130 

131 if array_id is not None: 

132 array_job = list( 

133 self.session.query(ArrayJob) 

134 .filter(ArrayJob.job_id == unique_id) 

135 .filter(ArrayJob.id == array_id) 

136 ) 

137 assert len(array_job) == 1 

138 return (job, array_job[0]) 

139 else: 

140 return (job, None) 

141 

142 def run_job(self, job_id, array_id=None): 

143 """This function is called to run a job (e.g. in the grid) with the 

144 given id and the given array index if applicable.""" 

145 # set the job's status in the database 

146 try: 

147 # get the job from the database 

148 self.lock() 

149 jobs = self.get_jobs((job_id,)) 

150 if not len(jobs): 

151 # it seems that the job has been deleted in the meanwhile 

152 return 

153 job = jobs[0] 

154 

155 # get the machine name we are executing on; this might only work at idiap 

156 machine_name = socket.gethostname() 

157 

158 # set the 'executing' status to the job 

159 job.execute(array_id, machine_name) 

160 

161 self.session.commit() 

162 except Exception as e: 

163 logger.error("Caught exception '%s'", e) 

164 pass 

165 finally: 

166 self.unlock() 

167 

168 # get the command line of the job from the database; does not need write access 

169 self.lock() 

170 job = self.get_jobs((job_id,))[0] 

171 command_line = job.get_command_line() 

172 exec_dir = job.get_exec_dir() 

173 self.unlock() 

174 

175 logger.info("Starting job %d: %s", job_id, " ".join(command_line)) 

176 

177 # execute the command line of the job, and wait until it has finished 

178 try: 

179 result = subprocess.call(command_line, cwd=exec_dir) 

180 logger.info("Job %d finished with result %s", job_id, str(result)) 

181 except Exception as e: 

182 logger.error( 

183 "The job with id '%d' could not be executed: %s", job_id, e 

184 ) 

185 result = 69 # ASCII: 'E' 

186 

187 # set a new status and the results of the job 

188 try: 

189 self.lock() 

190 jobs = self.get_jobs((job_id,)) 

191 if not len(jobs): 

192 # it seems that the job has been deleted in the meanwhile 

193 logger.error( 

194 "The job with id '%d' could not be found in the database!", 

195 job_id, 

196 ) 

197 self.unlock() 

198 return 

199 

200 job = jobs[0] 

201 job.finish(result, array_id) 

202 

203 self.session.commit() 

204 

205 # This might not be working properly, so use with care! 

206 if job.stop_on_failure and job.status == "failure": 

207 # the job has failed 

208 # stop this and all dependent jobs from execution 

209 dependent_jobs = job.get_jobs_waiting_for_us() 

210 dependent_job_ids = set( 

211 [dep.unique for dep in dependent_jobs] + [job.unique] 

212 ) 

213 while len(dependent_jobs): 

214 dep = dependent_jobs.pop(0) 

215 new = dep.get_jobs_waiting_for_us() 

216 dependent_jobs += new 

217 dependent_job_ids.update([dep.unique for dep in new]) 

218 

219 self.unlock() 

220 deps = sorted(list(dependent_job_ids)) 

221 self.stop_jobs(deps) 

222 logger.warn( 

223 "Stopped dependent jobs '%s' since this job failed.", 

224 str(deps), 

225 ) 

226 

227 except Exception as e: 

228 logger.error("Caught exception '%s'", e) 

229 pass 

230 finally: 

231 if hasattr(self, "session"): 

232 self.unlock() 

233 

234 def list( 

235 self, 

236 job_ids, 

237 print_array_jobs=False, 

238 print_dependencies=False, 

239 long=False, 

240 print_times=False, 

241 status=Status, 

242 names=None, 

243 ids_only=False, 

244 ): 

245 """Lists the jobs currently added to the database.""" 

246 # configuration for jobs 

247 fields = ("job-id", "grid-id", "queue", "status", "job-name") 

248 lengths = (6, 17, 11, 12, 16) 

249 dependency_length = 0 

250 

251 if print_dependencies: 

252 fields += ("dependencies",) 

253 lengths += (25,) 

254 dependency_length = lengths[-1] 

255 

256 if long: 

257 fields += ("submitted command",) 

258 lengths += (43,) 

259 

260 format = "{:^%d} " * len(lengths) 

261 format = format % lengths 

262 

263 # if ids_only: 

264 # self.lock() 

265 # for job in self.get_jobs(): 

266 # print(job.unique, end=" ") 

267 # self.unlock() 

268 # return 

269 

270 array_format = "{0:^%d} {1:>%d} {2:^%d} {3:^%d}" % lengths[:4] 

271 delimiter = format.format(*["=" * k for k in lengths]) 

272 array_delimiter = array_format.format(*["-" * k for k in lengths[:4]]) 

273 header = [fields[k].center(lengths[k]) for k in range(len(lengths))] 

274 

275 # print header 

276 if not ids_only: 

277 print(" ".join(header)) 

278 print(delimiter) 

279 

280 self.lock() 

281 for job in self.get_jobs(job_ids): 

282 job.refresh() 

283 if job.status in status and (names is None or job.name in names): 

284 if ids_only: 

285 print(job.unique, end=" ") 

286 else: 

287 print(job.format(format, dependency_length)) 

288 if print_times: 

289 print(times(job)) 

290 

291 if (not ids_only) and print_array_jobs and job.array: 

292 print(array_delimiter) 

293 for array_job in job.array: 

294 if array_job.status in status: 

295 print(array_job.format(array_format)) 

296 if print_times: 

297 print(times(array_job)) 

298 print(array_delimiter) 

299 

300 self.unlock() 

301 

302 def report( 

303 self, 

304 job_ids=None, 

305 array_ids=None, 

306 output=True, 

307 error=True, 

308 status=Status, 

309 name=None, 

310 ): 

311 """Iterates through the output and error files and write the results to 

312 command line.""" 

313 

314 def _write_contents(job): 

315 # Writes the contents of the output and error files to command line 

316 out_file, err_file = job.std_out_file(), job.std_err_file() 

317 logger.info("Contents of output file: '%s'" % out_file) 

318 if ( 

319 output 

320 and out_file is not None 

321 and os.path.exists(out_file) 

322 and os.stat(out_file).st_size > 0 

323 ): 

324 print(open(out_file).read().rstrip()) 

325 print("-" * 20) 

326 if ( 

327 error 

328 and err_file is not None 

329 and os.path.exists(err_file) 

330 and os.stat(err_file).st_size > 0 

331 ): 

332 logger.info("Contents of error file: '%s'" % err_file) 

333 print(open(err_file).read().rstrip()) 

334 print("-" * 40) 

335 

336 def _write_array_jobs(array_jobs): 

337 for array_job in array_jobs: 

338 print( 

339 "Array Job", 

340 str(array_job.id), 

341 ( 

342 "(%s) :" % array_job.machine_name 

343 if array_job.machine_name is not None 

344 else ":" 

345 ), 

346 ) 

347 _write_contents(array_job) 

348 

349 self.lock() 

350 

351 # check if an array job should be reported 

352 if array_ids: 

353 if len(job_ids) != 1: 

354 logger.error( 

355 "If array ids are specified exactly one job id must be given." 

356 ) 

357 array_jobs = list( 

358 self.session.query(ArrayJob) 

359 .join(Job) 

360 .filter(Job.unique.in_(job_ids)) 

361 .filter(Job.unique == ArrayJob.job_id) 

362 .filter(ArrayJob.id.in_(array_ids)) 

363 ) 

364 if array_jobs: 

365 print(array_jobs[0].job) 

366 _write_array_jobs(array_jobs) 

367 

368 else: 

369 # iterate over all jobs 

370 jobs = self.get_jobs(job_ids) 

371 for job in jobs: 

372 if name is not None and job.name != name: 

373 continue 

374 if job.status not in status: 

375 continue 

376 if job.array: 

377 print(job) 

378 _write_array_jobs(job.array) 

379 else: 

380 print(job) 

381 _write_contents(job) 

382 if job.log_dir is not None: 

383 print("-" * 60) 

384 

385 self.unlock() 

386 

387 def delete_logs(self, job): 

388 out_file, err_file = job.std_out_file(), job.std_err_file() 

389 if out_file and os.path.exists(out_file): 

390 os.remove(out_file) 

391 logger.debug("Removed output log file '%s'" % out_file) 

392 if err_file and os.path.exists(err_file): 

393 os.remove(err_file) 

394 logger.debug("Removed error log file '%s'" % err_file) 

395 

396 def delete( 

397 self, 

398 job_ids, 

399 array_ids=None, 

400 delete_logs=True, 

401 delete_log_dir=False, 

402 status=Status, 

403 delete_jobs=True, 

404 ): 

405 """Deletes the jobs with the given ids from the database.""" 

406 

407 def _delete_dir_if_empty(log_dir): 

408 if ( 

409 log_dir 

410 and delete_log_dir 

411 and os.path.isdir(log_dir) 

412 and not os.listdir(log_dir) 

413 ): 

414 os.rmdir(log_dir) 

415 logger.info("Removed empty log directory '%s'" % log_dir) 

416 

417 def _delete(job, try_to_delete_dir=False): 

418 # delete the job from the database 

419 if delete_logs: 

420 self.delete_logs(job) 

421 if try_to_delete_dir: 

422 _delete_dir_if_empty(job.log_dir) 

423 if delete_jobs: 

424 self.session.delete(job) 

425 

426 self.lock() 

427 

428 # check if array ids are specified 

429 if array_ids: 

430 if len(job_ids) != 1: 

431 logger.error( 

432 "If array ids are specified exactly one job id must be given." 

433 ) 

434 array_jobs = list( 

435 self.session.query(ArrayJob) 

436 .join(Job) 

437 .filter(Job.unique.in_(job_ids)) 

438 .filter(Job.unique == ArrayJob.job_id) 

439 .filter(ArrayJob.id.in_(array_ids)) 

440 ) 

441 if array_jobs: 

442 job = array_jobs[0].job 

443 for array_job in array_jobs: 

444 if array_job.status in status: 

445 if delete_jobs: 

446 logger.debug( 

447 "Deleting array job '%d' of job '%d' from the database." 

448 % (array_job.id, job.unique) 

449 ) 

450 _delete(array_job) 

451 if not job.array: 

452 if job.status in status: 

453 if delete_jobs: 

454 logger.info( 

455 "Deleting job '%d' from the database." 

456 % job.unique 

457 ) 

458 _delete(job, delete_jobs) 

459 

460 else: 

461 # iterate over all jobs 

462 jobs = self.get_jobs(job_ids) 

463 for job in jobs: 

464 # delete all array jobs 

465 if job.array: 

466 for array_job in job.array: 

467 if array_job.status in status: 

468 if delete_jobs: 

469 logger.debug( 

470 "Deleting array job '%d' of job '%d' from the database." 

471 % (array_job.id, job.unique) 

472 ) 

473 _delete(array_job) 

474 # delete this job 

475 if job.status in status: 

476 if delete_jobs: 

477 logger.info( 

478 "Deleting job '%d' from the database." % job.unique 

479 ) 

480 _delete(job, delete_jobs) 

481 

482 self.session.commit() 

483 self.unlock()