Coverage for src/gridtk/models.py: 87%

275 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-16 09:20 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4 

5import logging 

6import os 

7 

8from datetime import datetime 

9from pickle import dumps, loads 

10 

11from sqlalchemy import ( 

12 Boolean, 

13 Column, 

14 DateTime, 

15 Enum, 

16 ForeignKey, 

17 Integer, 

18 String, 

19) 

20from sqlalchemy.orm import declarative_base, relationship 

21 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Plain class handed to ``declarative_base`` below as the ``cls`` argument, so
# that every model class inherits the attribute it defines.
class Base:
    # NOTE(review): presumably tells SQLAlchemy 2.x to accept annotations that
    # are not wrapped in ``Mapped[...]`` -- confirm against the SQLAlchemy docs.
    __allow_unmapped__ = True


# Rebind the name ``Base`` to the declarative base class built from the plain
# class above; all model classes in this module derive from this one.
Base = declarative_base(cls=Base)  # type: ignore

# The states a job (or array job element) can be in.  The tuple is used as the
# value list of the ``status`` Enum columns, and index 0 is the initial state.
Status = ("submitted", "queued", "waiting", "executing", "success", "failure")

32 

33 

class ArrayJob(Base):
    """One element of an array job, stored in the ``ArrayJob`` table.

    Each element belongs to exactly one parent ``Job`` (reachable through
    ``self.job``); the parent sees all of its elements as ``job.array``.
    """

    __tablename__ = "ArrayJob"

    # Primary key of this row.
    unique = Column(Integer, primary_key=True)
    # Index of this element inside its array job.
    id = Column(Integer)
    # ``Job.unique`` of the parent job.
    job_id = Column(Integer, ForeignKey("Job.unique"))
    # Current state; one of the values in ``Status``.
    status = Column(Enum(*Status))
    # Result code of this element; ``None`` until one is recorded.
    result = Column(Integer)
    # Name of the machine this element runs on (set by the Job class).
    machine_name = Column(String(10))

    submit_time = Column(DateTime)
    start_time = Column(DateTime)
    finish_time = Column(DateTime)

    # Parent job; the backref exposes the elements as ``Job.array``, ordered
    # by the ``id`` column defined above.
    job = relationship("Job", backref="array", order_by=id)

    def __init__(self, id, job_id):
        """Create element ``id`` of the job whose ``unique`` is ``job_id``.

        The element starts in the first state of ``Status`` with no result,
        no machine and no start/finish time; the submit time is "now".
        """
        self.id = id
        self.job_id = job_id
        self.status = Status[0]
        self.result = None
        self.machine_name = None  # will be set later, by the Job class

        self.submit_time = datetime.now()
        self.start_time = None
        self.finish_time = None

    def std_out_file(self):
        """Return the parent's stdout log path suffixed with ``.<id>``.

        Returns ``None`` when the parent job has no log directory.
        """
        if not self.job.log_dir:
            return None
        return self.job.std_out_file() + "." + str(self.id)

    def std_err_file(self):
        """Return the parent's stderr log path suffixed with ``.<id>``.

        Returns ``None`` when the parent job has no log directory.
        """
        if not self.job.log_dir:
            return None
        return self.job.std_err_file() + "." + str(self.id)

    def __str__(self):
        header = "<ArrayJob %d> of <Job %d>" % (self.id, self.job.id)
        if self.result is None:
            state = "%s" % self.status
        else:
            state = "%s (%d)" % (self.status, self.result)
        return f"{header} : {state}"

    def format(self, format):
        """Render this element as one table row.

        ``format`` is a ``str.format`` template with four positional fields;
        it receives an empty string, ``"<job id> - <element id>"``, the
        machine name (or the parent's queue name when no machine is known)
        and the status (followed by the result code, if any).
        """
        label = "%d - %d" % (self.job.id, self.id)
        if self.machine_name is None:
            where = self.job.queue_name
        else:
            where = self.machine_name
        state = "%s" % self.status
        if self.result is not None:
            state += " (%d)" % self.result

        return format.format("", label, where, state)

98 

99 

class Job(Base):
    """One job that was submitted to the job manager (``Job`` table).

    The command line, the array specification and the grid arguments are
    stored pickled; use the ``get_*`` / ``set_*`` accessors instead of reading
    the raw columns.

    Instances rely on three attributes that are created as backrefs by other
    model classes: ``array`` (from ``ArrayJob.job``) and
    ``jobs_we_have_to_wait_for`` / ``jobs_that_wait_for_us`` (from the
    relationships of ``JobDependence``).
    """

    __tablename__ = "Job"

    # The unique ID of the job (not corresponding to the grid ID)
    unique = Column(Integer, primary_key=True)
    # The command line to execute (stored pickled, see ``set_command_line``)
    command_line = Column(String(255))
    # A hand-chosen name for the task
    name = Column(String(20))
    # The name of the queue
    queue_name = Column(String(20))
    # The name of the machine in which the job is run
    machine_name = Column(String(10))
    # The kwargs arguments for the job submission (stored pickled)
    grid_arguments = Column(String(255))
    # The ID of the job as given from the grid
    id = Column(Integer)
    # The directory in which the command should be executed
    exec_dir = Column(String(255))
    # The directory where the log files will be put to
    log_dir = Column(String(255))
    # The array string (stored pickled; only needed for re-submission)
    array_string = Column(String(255))
    # Whether to stop depending jobs when this job finishes with an error
    stop_on_failure = Column(Boolean)

    submit_time = Column(DateTime)
    start_time = Column(DateTime)
    finish_time = Column(DateTime)

    status = Column(Enum(*Status))
    result = Column(Integer)

    def __init__(
        self,
        command_line,
        name=None,
        exec_dir=None,
        log_dir=None,
        array_string=None,
        queue_name="local",
        machine_name=None,
        stop_on_failure=False,
        **kwargs,
    ):
        """Construct a Job object without an ID (needs to be set later).

        ``command_line``, ``array_string`` and the extra keyword arguments
        are pickled before being stored.  The job ends up in the
        ``"submitted"`` state because ``submit()`` is called last; note that
        ``submit()`` also resets ``machine_name`` to ``None``.
        """
        self.command_line = dumps(command_line)
        self.name = name
        self.queue_name = queue_name  # will be set during the queue command later
        self.machine_name = machine_name  # will be set during the execute command later
        self.grid_arguments = dumps(kwargs)
        self.exec_dir = exec_dir
        self.log_dir = log_dir
        self.stop_on_failure = stop_on_failure
        self.array_string = dumps(array_string)
        self.submit()

    @staticmethod
    def _unpickle(blob):
        """Unpickle a stored column value that may come back as bytes or str."""
        # NOTE(security): ``pickle.loads`` can execute arbitrary code.  This
        # is only safe as long as the job database is written exclusively by
        # trusted users.
        return loads(blob if isinstance(blob, bytes) else blob.encode())

    def _log_file(self, marker):
        """Build ``<log_dir>/<name or 'job'><marker><id>``; None without log_dir."""
        if not self.log_dir:
            return None
        base_name = self.name if self.name else "job"
        return os.path.join(self.log_dir, base_name + marker + str(self.id))

    def submit(self, new_queue=None):
        """Set the status of this job (and its array jobs) to 'submitted'.

        Clears result and machine name, optionally switches to ``new_queue``
        and restarts the job's submit/start/finish timestamps.
        """
        self.status = "submitted"
        self.result = None
        self.machine_name = None
        if new_queue is not None:
            self.queue_name = new_queue
        for array_job in self.array:
            array_job.status = "submitted"
            array_job.result = None
            array_job.machine_name = None
        self.submit_time = datetime.now()
        self.start_time = None
        self.finish_time = None

    def queue(self, new_job_id=None, new_job_name=None, queue_name=None):
        """Set the status of this job to 'queued', 'waiting' or 'failure'.

        The job becomes 'waiting' while any job it depends on is unfinished,
        'failure' when ``stop_on_failure`` is set and a dependency failed
        (checked in dependency order, the last matching dependency wins), and
        'queued' otherwise.  Jobs that depend on this one and are currently
        'queued' are pushed back to 'waiting' (or 'failure').
        """
        # update the job id (i.e., when the job is executed in the grid)
        if new_job_id is not None:
            self.id = new_job_id

        if new_job_name is not None:
            self.name = new_job_name

        if queue_name is not None:
            self.queue_name = queue_name

        new_status = "queued"
        self.result = None
        # check if we have to wait for another job to finish
        for job in self.get_jobs_we_wait_for():
            if job.status not in ("success", "failure"):
                new_status = "waiting"
            elif self.stop_on_failure and job.status == "failure":
                new_status = "failure"

        # reset the queued jobs that depend on us to waiting status
        for job in self.get_jobs_waiting_for_us():
            if job.status == "queued":
                job.status = "failure" if new_status == "failure" else "waiting"

        self.status = new_status
        # array jobs that already finished keep their final state
        for array_job in self.array:
            if array_job.status not in ("success", "failure"):
                array_job.status = new_status

    def execute(self, array_id=None, machine_name=None):
        """Set the status of this job to 'executing'.

        With ``array_id`` the matching array job is marked as executing as
        well; otherwise ``machine_name`` (if given) is stored on the job
        itself.  The job's start time is only set the first time.
        """
        self.status = "executing"
        if array_id is not None:
            for array_job in self.array:
                if array_job.id == array_id:
                    array_job.status = "executing"
                    if machine_name is not None:
                        array_job.machine_name = machine_name
                    # Record the start even when no machine name is known:
                    # ``finish`` always sets ``finish_time``, and a finish
                    # time without a start time cannot be turned into a
                    # duration.
                    array_job.start_time = datetime.now()
        elif machine_name is not None:
            self.machine_name = machine_name
        if self.start_time is None:
            self.start_time = datetime.now()

        # sometimes, the 'finish' command did not work for array jobs,
        # so check if any old job still has the 'executing' flag set
        for job in self.get_jobs_we_wait_for():
            if job.array and job.status == "executing":
                # -1 is compared against the array job ids only to trigger a
                # re-check of all array jobs of that dependency
                job.finish(0, -1)

    def finish(self, result, array_id=None):
        """Set the status of this job to 'success' or 'failure'.

        With ``array_id`` only the matching array job is finished; the job
        itself is finished once no array job is left unfinished, and it then
        takes over the first non-zero result found.  When the job is
        finished, all jobs waiting for it are re-queued.
        """
        new_status = "success" if result == 0 else "failure"
        new_result = result
        finished = True
        if array_id is not None:
            for array_job in self.array:
                if array_job.id == array_id:
                    array_job.status = new_status
                    array_job.result = result
                    array_job.finish_time = datetime.now()
                # check if there is any array job still running
                if array_job.status not in ("success", "failure"):
                    finished = False
                elif new_result == 0:
                    new_result = array_job.result

        if finished:
            # There was no array job, or all array jobs finished
            self.status = "success" if new_result == 0 else "failure"
            self.result = new_result
            self.finish_time = datetime.now()

            # update all waiting jobs
            for job in self.get_jobs_waiting_for_us():
                if job.status == "waiting":
                    job.queue()

    def refresh(self):
        """Refresh the status of an executing job from its array jobs.

        Does nothing unless the job is 'executing' and has array jobs.  When
        every array job has finished, the job becomes 'success' if none of
        them failed, else 'failure' with the result of a failed array job.
        """
        if self.status == "executing" and self.array:
            new_result = 0
            for array_job in self.array:
                if array_job.status == "failure" and new_result is not None:
                    new_result = array_job.result
                elif array_job.status not in ("success", "failure"):
                    # at least one array job is unfinished: no final result yet
                    new_result = None
            if new_result is not None:
                self.status = "success" if new_result == 0 else "failure"
                self.result = new_result

    def get_command_line(self):
        """Return the (unpickled) command line of the job."""
        return self._unpickle(self.command_line)

    def set_command_line(self, command_line):
        """Set / overwrite the command line of the job."""
        self.command_line = dumps(command_line)

    def get_exec_dir(self):
        """Return the real path of the execution directory, or None if unset."""
        if self.exec_dir is None:
            return None
        return str(os.path.realpath(self.exec_dir))

    def get_array(self):
        """Return the (unpickled) array arguments of the job."""
        return self._unpickle(self.array_string)

    def get_arguments(self):
        """Return the additional options for the grid (such as the queue,
        memory requirements, ...).

        Only the known option keys are copied, options without a useful
        value are dropped, and the queue name is added as ``"queue"``.
        """
        stored = self._unpickle(self.grid_arguments)
        # ``add_job`` stores the options wrapped as {"kwargs": {...}}; a job
        # built directly or updated through ``set_arguments(option=...)``
        # stores them flat.  Accept both instead of raising KeyError.
        args = stored.get("kwargs", stored)

        retval = {}
        if "pe_opt" in args:
            retval["pe_opt"] = args["pe_opt"]
        for key in ("memfree", "hvmem", "gpumem"):
            if args.get(key) is not None:
                retval[key] = args[key]
        if args.get("env"):
            retval["env"] = args["env"]
        if args.get("io_big"):
            retval["io_big"] = True
        if "sge_extra_args" in args:
            retval["sge_extra_args"] = args["sge_extra_args"]

        # also add the queue
        if self.queue_name is not None:
            retval["queue"] = str(self.queue_name)

        return retval

    def set_arguments(self, **kwargs):
        """Pickle the given keyword arguments and store them as grid arguments."""
        self.grid_arguments = dumps(kwargs)

    def get_jobs_we_wait_for(self):
        """Return the jobs this job depends on (dangling links are skipped)."""
        return [
            j.waited_for_job
            for j in self.jobs_we_have_to_wait_for
            if j.waited_for_job is not None
        ]

    def get_jobs_waiting_for_us(self):
        """Return the jobs that depend on this job (dangling links are skipped)."""
        return [
            j.waiting_job
            for j in self.jobs_that_wait_for_us
            if j.waiting_job is not None
        ]

    def std_out_file(self, array_id=None):
        """Return the path of the stdout log file, or None without ``log_dir``.

        ``array_id`` is accepted but not used.
        """
        return self._log_file(".o")

    def std_err_file(self, array_id=None):
        """Return the path of the stderr log file, or None without ``log_dir``.

        ``array_id`` is accepted but not used.
        """
        return self._log_file(".e")

    def _cmdline(self):
        """Return the command line as one string for display.

        Arguments starting with ``-`` are written as they are, all others are
        wrapped in single quotes; every argument is followed by a space.
        """
        parts = []
        for cmd in self.get_command_line():
            # ``startswith`` instead of ``cmd[0]``: an empty argument must
            # not raise IndexError
            if str(cmd).startswith("-"):
                parts.append("%s " % cmd)
            else:
                parts.append("'%s' " % cmd)
        return "".join(parts)

    def __str__(self):
        ident = "%d (%d)" % (self.unique, self.id)
        if self.machine_name:
            where = f"{self.queue_name} - {self.machine_name}"
        else:
            where = self.queue_name
        if self.array:
            span = "[%d-%d:%d]" % self.get_array()
        else:
            span = ""
        if self.name is not None:
            label = f"<Job: {ident} {span} - '{self.name}'>"
        else:
            label = "<Job: %s>" % ident
        if self.result is not None:
            state = "%s (%d)" % (self.status, self.result)
        else:
            state = "%s" % self.status
        return f"{label} | {where} : {state} -- {self._cmdline()}"

    def format(self, format, dependencies=0, limit_command_line=None):
        """Render the job as one table row.

        Args:
            format: ``str.format`` template.  It receives, in this order, the
                unique id, the job id (plus array range), the queue or
                machine name cut to 12 characters, the status, the name, the
                dependencies (only if ``dependencies`` is non-zero) and the
                command line.
            dependencies: Maximum width of the dependency column; 0 leaves
                the column out.
            limit_command_line: Maximum width of the command line.  When
                ``None``, the command line is not cut and the grid options
                and the execution directory are added to it.
        """
        command_line = self._cmdline()
        if (
            limit_command_line is not None
            and len(command_line) > limit_command_line
        ):
            command_line = command_line[: limit_command_line - 3] + "..."

        job_id = "%d" % self.id + (
            " [%d-%d:%d]" % self.get_array() if self.array else ""
        )
        status = "%s" % self.status + (
            " (%d)" % self.result if self.result is not None else ""
        )
        queue = (
            self.queue_name if self.machine_name is None else self.machine_name
        )
        if limit_command_line is None:
            grid_opt = self.get_arguments()
            if grid_opt:
                # add additional information about the job in front
                options = ",".join(
                    f"{key}={value}" for key, value in grid_opt.items()
                )
                command_line = "<" + options + ">: " + command_line
            if self.exec_dir is not None:
                command_line += (
                    "; [Executed in directory: '%s']" % self.exec_dir
                )

        fields = [self.unique, job_id, queue[:12], status, str(self.name)]
        if dependencies:
            deps = str(
                sorted({dep.unique for dep in self.get_jobs_we_wait_for()})
            )
            if dependencies < len(deps):
                deps = deps[: dependencies - 3] + "..."
            fields.append(deps)
        fields.append(command_line)
        return format.format(*fields)

471 

472 

class JobDependence(Base):
    """Association table for the many-to-many "waits for" relation of Jobs.

    One row states that the job ``waiting_job_id`` has to wait for the job
    ``waited_for_job_id``.
    """

    __tablename__ = "JobDependence"

    id = Column(Integer, primary_key=True)
    # ``Job.unique`` of the job that has to wait
    waiting_job_id = Column(Integer, ForeignKey("Job.unique"))
    # ``Job.unique`` of the job that is waited for
    waited_for_job_id = Column(Integer, ForeignKey("Job.unique"))

    # The job that waits (joined on ``waiting_job_id``).  Its backref gives
    # every Job the list of dependence rows in which that Job is the waiting
    # one, i.e. ``job.jobs_we_have_to_wait_for[i].waited_for_job`` is a job
    # that ``job`` depends on.
    waiting_job = relationship(
        "Job",
        backref="jobs_we_have_to_wait_for",
        primaryjoin=(Job.unique == waiting_job_id),
        order_by=id,
    )
    # The job that is waited for (joined on ``waited_for_job_id``).  Its
    # backref gives every Job the list of dependence rows in which that Job
    # is the one others wait for.
    waited_for_job = relationship(
        "Job",
        backref="jobs_that_wait_for_us",
        primaryjoin=(Job.unique == waited_for_job_id),
        order_by=id,
    )

    def __init__(self, waiting_job_id, waited_for_job_id):
        """Record that job ``waiting_job_id`` waits for ``waited_for_job_id``."""
        self.waiting_job_id, self.waited_for_job_id = (
            waiting_job_id,
            waited_for_job_id,
        )

503 

504 

def add_job(
    session,
    command_line,
    name="job",
    dependencies=None,
    array=None,
    exec_dir=None,
    log_dir=None,
    stop_on_failure=False,
    **kwargs,
):
    """Create a job, add its dependencies and its array jobs, and commit.

    Args:
        session: The database session the new objects are added to.
        command_line: The command line of the job.
        name: The name of the job.
        dependencies: Iterable of ``Job.unique`` ids the new job has to wait
            for; ``None`` (the default) means no dependencies.  Unknown ids
            and a dependency of the job on itself are skipped with a warning.
        array: ``None`` or a ``(start, stop, step)`` triple; one array job is
            created for every index from ``start`` to ``stop`` (inclusive).
        exec_dir: The directory in which the command should be executed.
        log_dir: The directory where the log files will be put to.
        stop_on_failure: Forwarded to ``Job``.
        **kwargs: Additional grid options, stored with the job.

    Returns:
        The newly created ``Job``.
    """
    job = Job(
        command_line=command_line,
        name=name,
        exec_dir=exec_dir,
        log_dir=log_dir,
        array_string=array,
        stop_on_failure=stop_on_failure,
        # Deliberately passed as ONE keyword: Job pickles its own **kwargs,
        # so the options end up stored as {"kwargs": {...}}.
        kwargs=kwargs,
    )

    # flush + refresh so that the database assigns ``job.unique``
    session.add(job)
    session.flush()
    session.refresh(job)

    # by default id and unique id are identical, but the id might be overwritten later on
    job.id = job.unique

    for d in dependencies or ():
        if d == job.unique:
            logger.warning("Adding self-dependency of job %d is not allowed", d)
            continue
        depending = list(session.query(Job).filter(Job.unique == d))
        if depending:
            session.add(JobDependence(job.unique, depending[0].unique))
        else:
            logger.warning(
                "Could not find dependent job with id %d in database", d
            )

    if array:
        (start, stop, step) = array
        # add array jobs; ``stop`` is inclusive
        for i in range(start, stop + 1, step):
            session.add(ArrayJob(i, job.unique))

    session.commit()

    return job

556 

557 

def times(job):
    """Return a string containing timing information for the given job, which
    might be a :py:class:`Job` or an :py:class:`ArrayJob`.

    The string always contains the submit time.  The start time (with the
    waiting duration) and the finish time (with the execution duration) are
    appended when they are set.  If a finish time is recorded without a start
    time, the finish time is reported without a duration.
    """
    timing = "Submitted: %s" % job.submit_time.ctime()
    if job.start_time is not None:
        timing += "\nStarted : {} \t Job waited : {}".format(
            job.start_time.ctime(),
            job.start_time - job.submit_time,
        )
    if job.finish_time is not None:
        if job.start_time is not None:
            timing += "\nFinished : {} \t Job executed: {}".format(
                job.finish_time.ctime(),
                job.finish_time - job.start_time,
            )
        else:
            # No start time was recorded, so there is no duration to compute
            # (subtracting None would raise TypeError).
            timing += "\nFinished : {}".format(job.finish_time.ctime())
    return timing