Coverage for src/gridtk/manager.py: 82%

250 statements  

coverage.py v7.4.3, created at 2024-04-22 14:25 +0200

# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later

import logging
import os
import socket  # to get the host name
import subprocess

from shutil import which

import sqlalchemy

from .models import ArrayJob, Base, Job, Status, times

logger = logging.getLogger(__name__)


class JobManager:
    """This job manager defines the basic interface for handling jobs in the
    SQL database."""
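    # A minimal usage sketch (illustrative only; the workflow below is an
    # example, not a requirement of the class):
    #
    #   manager = JobManager(database="submitted.sql3")
    #   session = manager.lock()   # open a blocking session to the database
    #   jobs = manager.get_jobs()  # query jobs while the session is held
    #   manager.unlock()           # always release the session again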

    def __init__(self, database="submitted.sql3", wrapper_script=None, debug=False):
        self._database = os.path.realpath(database)
        self._engine = sqlalchemy.create_engine(
            "sqlite:///" + self._database,
            connect_args={"timeout": 600},
            echo=debug,
        )
        self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)

        # resolve the wrapper script that will be used to execute the jobs:
        # use the given path if it exists, otherwise search ./bin and $PATH
        if wrapper_script is None:
            wrapper_script = "jman"
        if not os.path.exists(wrapper_script):
            bindir = os.path.join(os.path.realpath(os.curdir), "bin")
            wrapper_script = which(
                wrapper_script,
                path=os.pathsep.join((bindir, os.environ["PATH"])),
            )

        if wrapper_script is None:
            raise OSError(
                "Could not find the installation path of gridtk. "
                "Please specify it in the wrapper_script parameter of the JobManager."
            )
        if not os.path.exists(wrapper_script):
            raise OSError(
                "Your wrapper_script cannot be found. Jobs will not be executable."
            )
        self.wrapper_script = wrapper_script

    def __del__(self):
        # remove the database if it is empty
        if os.path.isfile(self._database):
            # in erroneous cases, the session might still be active, so don't create a deadlock here!
            if not hasattr(self, "session"):
                self.lock()
                job_count = len(self.get_jobs())
                self.unlock()
                if not job_count:
                    logger.debug(
                        "Removed database file '%s' since database is empty",
                        self._database,
                    )
                    os.remove(self._database)

    def lock(self):
        """Generates (and returns) a blocking session object to the database."""
        if hasattr(self, "session"):
            raise RuntimeError(
                "Deadlock detected. Please do not try to lock the session when it is already locked!"
            )

        # create the database if it does not exist yet
        if not os.path.exists(self._database):
            self._create()

        # now, create a session
        self.session = self._session_maker()
        logger.debug("Created new database session to '%s'", self._database)
        return self.session

    def unlock(self):
        """Closes the session to the database."""
        if not hasattr(self, "session"):
            raise RuntimeError(
                "The session that you are trying to close does not exist anymore!"
            )
        logger.debug("Closed database session of '%s'", self._database)
        self.session.close()
        del self.session

    def _create(self):
        """Creates a new and empty database."""
        # create directory for sql database
        os.makedirs(os.path.dirname(self._database), exist_ok=True)

        # create all the tables
        Base.metadata.create_all(self._engine)
        logger.debug("Created new empty database '%s'", self._database)

    def get_jobs(self, job_ids=None):
        """Returns a list of jobs that are stored in the database."""
        if job_ids is not None and len(job_ids) == 0:
            return []
        q = self.session.query(Job)
        if job_ids is not None:
            q = q.filter(Job.unique.in_(job_ids))
        return sorted(q, key=lambda job: job.unique)

    def _job_and_array(self, job_id, array_id=None):
        # get the job (and the array job) with the given id(s)
        job = self.get_jobs((job_id,))
        if len(job) > 1:
            logger.error(
                "%d jobs with the same ID '%d' were detected in the database",
                len(job),
                job_id,
            )
        elif not len(job):
            logger.error("Job with ID '%d' was not found in the database.", job_id)
            return (None, None)

        job = job[0]
        unique_id = job.unique

        if array_id is not None:
            array_job = list(
                self.session.query(ArrayJob)
                .filter(ArrayJob.job_id == unique_id)
                .filter(ArrayJob.id == array_id)
            )
            assert len(array_job) == 1
            return (job, array_job[0])
        else:
            return (job, None)

    def run_job(self, job_id, array_id=None):
        """Runs the job (e.g., in the grid) with the given id and the given
        array index, if applicable."""
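        # The method proceeds in three steps: (1) mark the job as executing in
        # the database, (2) run its command line in a subprocess, and (3) write
        # the exit code back to the database (stopping dependent jobs on
        # failure, if requested).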

        # set the job's status in the database
        try:
            # get the job from the database
            self.lock()
            jobs = self.get_jobs((job_id,))
            if not len(jobs):
                # it seems that the job has been deleted in the meanwhile
                return
            job = jobs[0]

            # get the machine name we are executing on; this might only work at Idiap
            machine_name = socket.gethostname()

            # set the 'executing' status of the job
            job.execute(array_id, machine_name)

            self.session.commit()
        except Exception as e:
            logger.error("Caught exception '%s'", e)
        finally:
            self.unlock()

        # get the command line of the job from the database; does not need write access
        self.lock()
        job = self.get_jobs((job_id,))[0]
        command_line = job.get_command_line()
        exec_dir = job.get_exec_dir()
        self.unlock()

        logger.info("Starting job %d: %s", job_id, " ".join(command_line))

        # execute the command line of the job, and wait until it has finished
        try:
            result = subprocess.call(command_line, cwd=exec_dir)
            logger.info("Job %d finished with result %s", job_id, str(result))
        except Exception as e:
            logger.error("The job with id '%d' could not be executed: %s", job_id, e)
            result = 69  # ASCII: 'E'

        # set a new status and the results of the job
        try:
            self.lock()
            jobs = self.get_jobs((job_id,))
            if not len(jobs):
                # it seems that the job has been deleted in the meanwhile
                logger.error(
                    "The job with id '%d' could not be found in the database!",
                    job_id,
                )
                self.unlock()
                return

            job = jobs[0]
            job.finish(result, array_id)

            self.session.commit()

            # This might not be working properly, so use with care!
            if job.stop_on_failure and job.status == "failure":
                # the job has failed; stop this and all dependent jobs from execution
                dependent_jobs = job.get_jobs_waiting_for_us()
                dependent_job_ids = set(
                    [dep.unique for dep in dependent_jobs] + [job.unique]
                )
                # collect the transitive closure of all jobs that (directly or
                # indirectly) depend on this one
                while len(dependent_jobs):
                    dep = dependent_jobs.pop(0)
                    new = dep.get_jobs_waiting_for_us()
                    dependent_jobs += new
                    dependent_job_ids.update([dep.unique for dep in new])

                self.unlock()
                deps = sorted(dependent_job_ids)
                self.stop_jobs(deps)
                logger.warning(
                    "Stopped dependent jobs '%s' since this job failed.",
                    str(deps),
                )

        except Exception as e:
            logger.error("Caught exception '%s'", e)
        finally:
            if hasattr(self, "session"):
                self.unlock()

    def list(
        self,
        job_ids,
        print_array_jobs=False,
        print_dependencies=False,
        long=False,
        print_times=False,
        status=Status,
        names=None,
        ids_only=False,
    ):
        """Lists the jobs currently added to the database."""
        # configuration for jobs
        fields = ("job-id", "grid-id", "queue", "status", "job-name")
        lengths = (6, 17, 11, 12, 16)
        dependency_length = 0

        if print_dependencies:
            fields += ("dependencies",)
            lengths += (25,)
            dependency_length = lengths[-1]

        if long:
            fields += ("submitted command",)
            lengths += (43,)

        format = "{:^%d} " * len(lengths)
        format = format % lengths
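        # e.g., with the default lengths this builds the column format string
        # "{:^6} {:^17} {:^11} {:^12} {:^16} " (each field centered in its column)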

        # if ids_only:
        #     self.lock()
        #     for job in self.get_jobs():
        #         print(job.unique, end=" ")
        #     self.unlock()
        #     return

        array_format = "{0:^%d} {1:>%d} {2:^%d} {3:^%d}" % lengths[:4]
        delimiter = format.format(*["=" * k for k in lengths])
        array_delimiter = array_format.format(*["-" * k for k in lengths[:4]])
        header = [fields[k].center(lengths[k]) for k in range(len(lengths))]

        # print header
        if not ids_only:
            print(" ".join(header))
            print(delimiter)

        self.lock()
        for job in self.get_jobs(job_ids):
            job.refresh()
            if job.status in status and (names is None or job.name in names):
                if ids_only:
                    print(job.unique, end=" ")
                else:
                    print(job.format(format, dependency_length))
                    if print_times:
                        print(times(job))

            if (not ids_only) and print_array_jobs and job.array:
                print(array_delimiter)
                for array_job in job.array:
                    if array_job.status in status:
                        print(array_job.format(array_format))
                        if print_times:
                            print(times(array_job))
                print(array_delimiter)

        self.unlock()

    def report(
        self,
        job_ids=None,
        array_ids=None,
        output=True,
        error=True,
        status=Status,
        name=None,
    ):
        """Iterates through the output and error files and writes the results
        to the command line."""
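        # Jobs are reported one after another; for an array job, the output and
        # error files of each of its array elements are written separately.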

        def _write_contents(job):
            # writes the contents of the output and error files to the command line
            out_file, err_file = job.std_out_file(), job.std_err_file()
            if (
                output
                and out_file is not None
                and os.path.exists(out_file)
                and os.stat(out_file).st_size > 0
            ):
                logger.info("Contents of output file: '%s'", out_file)
                with open(out_file) as f:
                    print(f.read().rstrip())
                print("-" * 20)
            if (
                error
                and err_file is not None
                and os.path.exists(err_file)
                and os.stat(err_file).st_size > 0
            ):
                logger.info("Contents of error file: '%s'", err_file)
                with open(err_file) as f:
                    print(f.read().rstrip())
                print("-" * 40)

        def _write_array_jobs(array_jobs):
            for array_job in array_jobs:
                print(
                    "Array Job",
                    str(array_job.id),
                    (
                        "(%s) :" % array_job.machine_name
                        if array_job.machine_name is not None
                        else ":"
                    ),
                )
                _write_contents(array_job)

        self.lock()

        # check if an array job should be reported
        if array_ids:
            if len(job_ids) != 1:
                logger.error(
                    "If array ids are specified, exactly one job id must be given."
                )
            array_jobs = list(
                self.session.query(ArrayJob)
                .join(Job)
                .filter(Job.unique.in_(job_ids))
                .filter(Job.unique == ArrayJob.job_id)
                .filter(ArrayJob.id.in_(array_ids))
            )
            if array_jobs:
                print(array_jobs[0].job)
                _write_array_jobs(array_jobs)

        else:
            # iterate over all jobs
            jobs = self.get_jobs(job_ids)
            for job in jobs:
                if name is not None and job.name != name:
                    continue
                if job.status not in status:
                    continue
                if job.array:
                    print(job)
                    _write_array_jobs(job.array)
                else:
                    print(job)
                    _write_contents(job)
                if job.log_dir is not None:
                    print("-" * 60)

        self.unlock()

    def delete_logs(self, job):
        out_file, err_file = job.std_out_file(), job.std_err_file()
        if out_file and os.path.exists(out_file):
            os.remove(out_file)
            logger.debug("Removed output log file '%s'", out_file)
        if err_file and os.path.exists(err_file):
            os.remove(err_file)
            logger.debug("Removed error log file '%s'", err_file)

    def delete(
        self,
        job_ids,
        array_ids=None,
        delete_logs=True,
        delete_log_dir=False,
        status=Status,
        delete_jobs=True,
    ):
        """Deletes the jobs with the given ids from the database."""
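        # Depending on the given flags, this removes the jobs' log files
        # (delete_logs), their log directories when empty (delete_log_dir) and
        # the database entries themselves (delete_jobs), restricted to jobs
        # whose status is contained in `status`.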

        def _delete_dir_if_empty(log_dir):
            if (
                log_dir
                and delete_log_dir
                and os.path.isdir(log_dir)
                and not os.listdir(log_dir)
            ):
                os.rmdir(log_dir)
                logger.info("Removed empty log directory '%s'", log_dir)

        def _delete(job, try_to_delete_dir=False):
            # delete the job from the database
            if delete_logs:
                self.delete_logs(job)
            if try_to_delete_dir:
                _delete_dir_if_empty(job.log_dir)
            if delete_jobs:
                self.session.delete(job)

        self.lock()

        # check if array ids are specified
        if array_ids:
            if len(job_ids) != 1:
                logger.error(
                    "If array ids are specified, exactly one job id must be given."
                )
            array_jobs = list(
                self.session.query(ArrayJob)
                .join(Job)
                .filter(Job.unique.in_(job_ids))
                .filter(Job.unique == ArrayJob.job_id)
                .filter(ArrayJob.id.in_(array_ids))
            )
            if array_jobs:
                job = array_jobs[0].job
                for array_job in array_jobs:
                    if array_job.status in status:
                        if delete_jobs:
                            logger.debug(
                                "Deleting array job '%d' of job '%d' from the database.",
                                array_job.id,
                                job.unique,
                            )
                        _delete(array_job)
                if not job.array:
                    if job.status in status:
                        if delete_jobs:
                            logger.info(
                                "Deleting job '%d' from the database.", job.unique
                            )
                        _delete(job, delete_jobs)

        else:
            # iterate over all jobs
            jobs = self.get_jobs(job_ids)
            for job in jobs:
                # delete all array jobs
                if job.array:
                    for array_job in job.array:
                        if array_job.status in status:
                            if delete_jobs:
                                logger.debug(
                                    "Deleting array job '%d' of job '%d' from the database.",
                                    array_job.id,
                                    job.unique,
                                )
                            _delete(array_job)
                # delete this job
                if job.status in status:
                    if delete_jobs:
                        logger.info("Deleting job '%d' from the database.", job.unique)
                    _delete(job, delete_jobs)

        self.session.commit()
        self.unlock()