Coverage for src/gridtk/models.py: 87%

275 statements  

« prev     ^ index     » next       coverage.py v7.4.3, created at 2024-04-22 14:25 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4 

import logging
import os

from datetime import datetime
from pickle import dumps, loads

from sqlalchemy import Boolean, Column, DateTime, Enum, ForeignKey, Integer, String
from sqlalchemy.orm import backref, declarative_base, relationship

13 

# Module-level logger used by the helpers of this module.
logger = logging.getLogger(__name__)


class _UnmappedAttributesMixin:
    """Mixin handed to ``declarative_base`` as the base of every model.

    ``__allow_unmapped__`` is set so that SQLAlchemy tolerates attributes
    declared in the legacy (non-``Mapped[]``) style.
    """

    __allow_unmapped__ = True


# Declarative base class shared by all tables defined in this module.
Base = declarative_base(cls=_UnmappedAttributesMixin)

# All states a Job / ArrayJob can be in; the first entry is the status a
# freshly created ArrayJob starts with.
Status = (
    "submitted",
    "queued",
    "waiting",
    "executing",
    "success",
    "failure",
)

24 

25 

class ArrayJob(Base):
    """One element of an array job.

    Every row belongs to a :py:class:`Job` (through ``job_id``) and keeps its
    own status, result, machine name and timestamps.
    """

    __tablename__ = "ArrayJob"

    unique = Column(Integer, primary_key=True)  # database primary key
    id = Column(Integer)  # index of this element inside the array
    job_id = Column(Integer, ForeignKey("Job.unique"))  # owning Job.unique
    status = Column(Enum(*Status))
    result = Column(Integer)  # exit code; None until the element finished
    machine_name = Column(String(10))

    submit_time = Column(DateTime)
    start_time = Column(DateTime)
    finish_time = Column(DateTime)

    # The ordering belongs on the backref: it is the one-to-many
    # ``Job.array`` collection that is iterated (and whose order affects
    # how Job aggregates element results).  On this many-to-one side an
    # ``order_by`` has no effect.
    job = relationship("Job", backref=backref("array", order_by=id))

    def __init__(self, id, job_id):
        """Creates a new array element in the initial status.

        Args:
            id: the array index of this element.
            job_id: the ``unique`` key of the owning :py:class:`Job`.
        """
        self.id = id
        self.job_id = job_id
        self.status = Status[0]
        self.result = None
        self.machine_name = None  # will be set later, by the Job class

        self.submit_time = datetime.now()
        self.start_time = None
        self.finish_time = None

    def std_out_file(self):
        """Returns the stdout log path of this element, or None when the
        owning job has no log directory."""
        if not self.job.log_dir:
            return None
        return self.job.std_out_file() + "." + str(self.id)

    def std_err_file(self):
        """Returns the stderr log path of this element, or None when the
        owning job has no log directory."""
        if not self.job.log_dir:
            return None
        return self.job.std_err_file() + "." + str(self.id)

    def __str__(self):
        n = "<ArrayJob %d> of <Job %d>" % (self.id, self.job.id)
        if self.result is not None:
            r = "%s (%d)" % (self.status, self.result)
        else:
            r = "%s" % self.status
        return f"{n} : {r}"

    def format(self, format):
        """Formats the current job into a nicer string to fit into a table.

        Args:
            format: a ``str.format`` template taking four positional fields
                (an empty first column, job id, queue/machine, status).
        """
        job_id = "%d - %d" % (self.job.id, self.id)
        # show the machine once known, otherwise the queue of the owning job
        queue = self.job.queue_name if self.machine_name is None else self.machine_name
        status = "%s" % self.status + (
            " (%d)" % self.result if self.result is not None else ""
        )

        return format.format("", job_id, queue, status)

82 

83 

class Job(Base):
    """This class defines one Job that was submitted to the Job Manager.

    The status is driven by :py:meth:`submit`, :py:meth:`queue`,
    :py:meth:`execute` and :py:meth:`finish`.  Array elements are reachable
    through the ``array`` backref declared by :py:class:`ArrayJob`, and
    dependencies through the ``jobs_we_have_to_wait_for`` /
    ``jobs_that_wait_for_us`` backrefs declared by :py:class:`JobDependence`.
    """

    __tablename__ = "Job"

    # The unique ID of the job (not corresponding to the grid ID)
    unique = Column(Integer, primary_key=True)
    # The command line to execute, stored pickled (see get_command_line)
    command_line = Column(String(255))
    # A hand-chosen name for the task
    name = Column(String(20))
    # The name of the queue
    queue_name = Column(String(20))
    # The name of the machine in which the job is run
    machine_name = Column(String(10))
    # The kwargs arguments for the job submission (e.g. in the grid), pickled
    grid_arguments = Column(String(255))
    # The ID of the job as given from the grid
    id = Column(Integer)
    # The directory in which the command should be executed
    exec_dir = Column(String(255))
    # The directory where the log files will be put to
    log_dir = Column(String(255))
    # The pickled array specification (only needed for re-submission)
    array_string = Column(String(255))
    # Whether to stop depending jobs when this job finishes with an error
    stop_on_failure = Column(Boolean)

    submit_time = Column(DateTime)
    start_time = Column(DateTime)
    finish_time = Column(DateTime)

    status = Column(Enum(*Status))
    result = Column(Integer)

    def __init__(
        self,
        command_line,
        name=None,
        exec_dir=None,
        log_dir=None,
        array_string=None,
        queue_name="local",
        machine_name=None,
        stop_on_failure=False,
        **kwargs,
    ):
        """Constructs a Job object without an ID (needs to be set later).

        ``command_line``, ``array_string`` and the grid options in
        ``kwargs`` are stored pickled.  The job starts in the 'submitted'
        status.
        """
        self.command_line = dumps(command_line)
        self.name = name
        self.queue_name = queue_name  # will be set during the queue command later
        self.machine_name = machine_name  # will be set during the execute command later
        self.grid_arguments = dumps(kwargs)
        self.exec_dir = exec_dir
        self.log_dir = log_dir
        self.stop_on_failure = stop_on_failure
        self.array_string = dumps(array_string)
        self.submit()

    @staticmethod
    def _unpickle(value):
        """Unpickles a stored column value.

        The stored payload may be ``bytes`` or ``str`` (older databases, see
        the Python 2 notes this code used to carry); ``str`` is encoded
        before unpickling.  NOTE: ``pickle.loads`` can execute arbitrary
        code, so the job database must be trusted.
        """
        return loads(value if isinstance(value, bytes) else value.encode())

    def submit(self, new_queue=None):
        """Sets the status of this job (and of all its array elements) to
        'submitted' and resets results, machine names and timestamps.

        Args:
            new_queue: if not None, the queue the job is moved to.
        """
        now = datetime.now()
        self.status = "submitted"
        self.result = None
        self.machine_name = None
        if new_queue is not None:
            self.queue_name = new_queue
        for array_job in self.array:
            array_job.status = "submitted"
            array_job.result = None
            array_job.machine_name = None
            # Reset the element timestamps like the job's own ones below;
            # otherwise a resubmitted element reports times of its last run.
            array_job.submit_time = now
            array_job.start_time = None
            array_job.finish_time = None
        self.submit_time = now
        self.start_time = None
        self.finish_time = None

    def queue(self, new_job_id=None, new_job_name=None, queue_name=None):
        """Sets the status of this job to 'queued' or 'waiting'.

        The status becomes 'waiting' while a dependency is unfinished, and
        'failure' when ``stop_on_failure`` is set and a dependency failed.
        Unfinished array elements take over the new status.
        """
        # update the job id (i.e., when the job is executed in the grid)
        if new_job_id is not None:
            self.id = new_job_id

        if new_job_name is not None:
            self.name = new_job_name

        if queue_name is not None:
            self.queue_name = queue_name

        new_status = "queued"
        self.result = None
        # check if we have to wait for another job to finish
        for job in self.get_jobs_we_wait_for():
            if job.status not in ("success", "failure"):
                new_status = "waiting"
            elif self.stop_on_failure and job.status == "failure":
                new_status = "failure"

        # reset the queued jobs that depend on us to waiting status
        for job in self.get_jobs_waiting_for_us():
            if job.status == "queued":
                job.status = "failure" if new_status == "failure" else "waiting"

        self.status = new_status
        for array_job in self.array:
            if array_job.status not in ("success", "failure"):
                array_job.status = new_status

    def execute(self, array_id=None, machine_name=None):
        """Sets the status of this job to 'executing'.

        Args:
            array_id: if given, the array element that started executing.
            machine_name: the machine the job (or element) runs on.
        """
        self.status = "executing"
        if array_id is not None:
            for array_job in self.array:
                if array_job.id == array_id:
                    array_job.status = "executing"
                    if machine_name is not None:
                        array_job.machine_name = machine_name
                    array_job.start_time = datetime.now()
        elif machine_name is not None:
            self.machine_name = machine_name
        if self.start_time is None:
            self.start_time = datetime.now()

        # sometimes, the 'finish' command did not work for array jobs,
        # so check if any old job still has the 'executing' flag set;
        # finish(0, -1) only re-checks whether all elements are done
        # (assuming no element uses the index -1)
        for job in self.get_jobs_we_wait_for():
            if job.array and job.status == "executing":
                job.finish(0, -1)

    def finish(self, result, array_id=None):
        """Sets the status of this job to 'success' or 'failure'.

        For array jobs only element ``array_id`` is marked finished; the job
        itself finishes once all elements are done, taking a non-zero
        element result (if any) as its overall result.  Once finished, jobs
        waiting for this one are re-queued.
        """
        # check if there is any array job still running
        new_status = "success" if result == 0 else "failure"
        new_result = result
        finished = True
        if array_id is not None:
            for array_job in self.array:
                if array_job.id == array_id:
                    array_job.status = new_status
                    array_job.result = result
                    array_job.finish_time = datetime.now()
                if array_job.status not in ("success", "failure"):
                    finished = False
                elif new_result == 0:
                    new_result = array_job.result

        if finished:
            # There was no array job, or all array jobs finished
            self.status = "success" if new_result == 0 else "failure"
            self.result = new_result
            self.finish_time = datetime.now()

            # update all waiting jobs
            for job in self.get_jobs_waiting_for_us():
                if job.status == "waiting":
                    job.queue()

    def refresh(self):
        """Refreshes the status information of an executing array job from
        the status of its elements."""
        if self.status == "executing" and self.array:
            new_result = 0
            for array_job in self.array:
                if array_job.status == "failure" and new_result is not None:
                    new_result = array_job.result
                elif array_job.status not in ("success", "failure"):
                    new_result = None
            if new_result is not None:
                self.status = "success" if new_result == 0 else "failure"
                self.result = new_result

    def get_command_line(self):
        """Returns the (unpickled) command line for the job."""
        return self._unpickle(self.command_line)

    def set_command_line(self, command_line):
        """Sets / overwrites the command line for the job."""
        self.command_line = dumps(command_line)

    def get_exec_dir(self):
        """Returns the absolute, symlink-resolved execution directory of the
        job, or None when no execution directory was given."""
        return (
            str(os.path.realpath(self.exec_dir)) if self.exec_dir is not None else None
        )

    def get_array(self):
        """Returns the stored array specification of the job (as given to
        ``add_job``, e.g. a ``(start, stop, step)`` triple), or None."""
        return self._unpickle(self.array_string)

    def get_arguments(self):
        """Returns the additional options for the grid (such as the queue,
        memory requirements, ...).

        Only options that are set are returned; the queue name is added
        under the ``"queue"`` key when known.
        """
        # add_job stores the options as Job(..., kwargs=kwargs), i.e. nested
        # under a "kwargs" key; jobs created without it simply have none.
        args = self._unpickle(self.grid_arguments).get("kwargs") or {}
        retval = {}
        if "pe_opt" in args:
            retval["pe_opt"] = args["pe_opt"]
        for key in ("memfree", "hvmem", "gpumem"):
            if args.get(key) is not None:
                retval[key] = args[key]
        if args.get("env"):
            retval["env"] = args["env"]
        if args.get("io_big"):
            retval["io_big"] = True
        if "sge_extra_args" in args:
            retval["sge_extra_args"] = args["sge_extra_args"]

        # also add the queue
        if self.queue_name is not None:
            retval["queue"] = str(self.queue_name)

        return retval

    def set_arguments(self, **kwargs):
        """Overwrites the stored grid options.

        ``get_arguments`` reads the options from the ``kwargs`` keyword, so
        call this as ``set_arguments(kwargs={...})``.
        """
        self.grid_arguments = dumps(kwargs)

    def get_jobs_we_wait_for(self):
        """Returns the jobs this job depends on."""
        return [
            j.waited_for_job
            for j in self.jobs_we_have_to_wait_for
            if j.waited_for_job is not None
        ]

    def get_jobs_waiting_for_us(self):
        """Returns the jobs that depend on this job."""
        return [
            j.waiting_job
            for j in self.jobs_that_wait_for_us
            if j.waiting_job is not None
        ]

    def _log_file(self, stream):
        """Returns ``<log_dir>/<name or 'job'>.<stream><id>``, or None when
        the job has no log directory."""
        if not self.log_dir:
            return None
        return os.path.join(
            self.log_dir, (self.name if self.name else "job") + "." + stream + str(self.id)
        )

    def std_out_file(self, array_id=None):
        """Returns the stdout log path (``array_id`` is currently unused)."""
        return self._log_file("o")

    def std_err_file(self, array_id=None):
        """Returns the stderr log path (``array_id`` is currently unused)."""
        return self._log_file("e")

    def _cmdline(self):
        """Renders the command line as a shell-like string.

        Options (arguments starting with "-") are printed as-is, all other
        arguments single-quoted; every argument is followed by a space.
        """
        parts = []
        for cmd in self.get_command_line():
            text = str(cmd)
            # startswith() instead of text[0]: empty arguments are valid
            parts.append(f"{text} " if text.startswith("-") else f"'{text}' ")
        return "".join(parts)

    def __str__(self):
        ident = "%d (%d)" % (self.unique, self.id)
        if self.machine_name:
            m = f"{self.queue_name} - {self.machine_name}"
        else:
            m = self.queue_name
        # tuple(): the stored spec may be any 3-element sequence
        a = "[%d-%d:%d]" % tuple(self.get_array()) if self.array else ""
        if self.name is not None:
            n = f"<Job: {ident} {a} - '{self.name}'>"
        else:
            n = "<Job: %s>" % ident
        if self.result is not None:
            r = "%s (%d)" % (self.status, self.result)
        else:
            r = "%s" % self.status
        return f"{n} | {m} : {r} -- {self._cmdline()}"

    def format(self, format, dependencies=0, limit_command_line=None):
        """Formats the current job into a nicer string to fit into a table.

        Args:
            format: a ``str.format`` template; it receives unique id, job id,
                queue/machine, status, name, [dependencies,] command line.
            dependencies: if non-zero, a dependency column truncated to this
                many characters is included.
            limit_command_line: if given, the command line is truncated to
                this many characters; otherwise grid options and the
                execution directory are appended to it.
        """
        command_line = self._cmdline()
        if limit_command_line is not None and len(command_line) > limit_command_line:
            # max(): limits below 3 must not turn into negative slice ends
            command_line = command_line[: max(limit_command_line - 3, 0)] + "..."

        job_id = "%d" % self.id + (
            " [%d-%d:%d]" % tuple(self.get_array()) if self.array else ""
        )
        status = "%s" % self.status + (
            " (%d)" % self.result if self.result is not None else ""
        )
        queue = self.queue_name if self.machine_name is None else self.machine_name
        if limit_command_line is None:
            grid_opt = self.get_arguments()
            if grid_opt:
                # add additional information about the job at the end
                options = ",".join(f"{key}={value}" for key, value in grid_opt.items())
                command_line = "<" + options + ">: " + command_line
            if self.exec_dir is not None:
                command_line += "; [Executed in directory: '%s']" % self.exec_dir

        columns = [self.unique, job_id, queue[:12], status, str(self.name)]
        if dependencies:
            deps = str(sorted({dep.unique for dep in self.get_jobs_we_wait_for()}))
            if dependencies < len(deps):
                deps = deps[: max(dependencies - 3, 0)] + "..."
            columns.append(deps)
        columns.append(command_line)
        return format.format(*columns)

435 

class JobDependence(Base):
    """This table defines a many-to-many relationship between Jobs.

    Each row states that the job ``waiting_job_id`` has to wait for the job
    ``waited_for_job_id``.  Both foreign keys point to ``Job.unique``, so
    every relationship spells out its join condition explicitly.
    """

    __tablename__ = "JobDependence"
    id = Column(Integer, primary_key=True)
    # The ID of the waiting job
    waiting_job_id = Column(Integer, ForeignKey("Job.unique"))
    # The ID of the job to wait for
    waited_for_job_id = Column(Integer, ForeignKey("Job.unique"))

    # The job that waits.  Its backref ``Job.jobs_we_have_to_wait_for``
    # holds the rows in which that job is the waiting side, hence
    # ``row.waited_for_job`` of those rows are the jobs it waits for.
    # The ordering is put on the (one-to-many) backref collection, where it
    # takes effect; on this many-to-one side it would be ignored.
    waiting_job = relationship(
        "Job",
        backref=backref("jobs_we_have_to_wait_for", order_by=id),
        primaryjoin=(Job.unique == waiting_job_id),
    )
    # The job that is waited for.  Its backref ``Job.jobs_that_wait_for_us``
    # holds the rows in which that job is waited for, hence
    # ``row.waiting_job`` of those rows are the jobs waiting for it.
    waited_for_job = relationship(
        "Job",
        backref=backref("jobs_that_wait_for_us", order_by=id),
        primaryjoin=(Job.unique == waited_for_job_id),
    )

    def __init__(self, waiting_job_id, waited_for_job_id):
        """Records that job ``waiting_job_id`` waits for job
        ``waited_for_job_id`` (both are ``Job.unique`` values)."""
        self.waiting_job_id = waiting_job_id
        self.waited_for_job_id = waited_for_job_id

466 

467 

def add_job(
    session,
    command_line,
    name="job",
    dependencies=None,
    array=None,
    exec_dir=None,
    log_dir=None,
    stop_on_failure=False,
    **kwargs,
):
    """Helper function to create a job, add the dependencies and the array
    jobs.

    Args:
        session: the SQLAlchemy session the job is stored in; it is
            committed before returning.
        command_line: the command to execute (stored pickled by Job).
        name: a name for the job.
        dependencies: ``unique`` ids of the jobs this job has to wait for.
            The job's own id and ids not found in the database are skipped
            with a warning.  Defaults to no dependencies.
        array: optional ``(start, stop, step)`` triple; one ArrayJob is added
            for every index in ``range(start, stop + 1, step)``.
        exec_dir, log_dir, stop_on_failure: forwarded to :py:class:`Job`.
        **kwargs: grid options, stored as ``Job(..., kwargs=kwargs)``.

    Returns:
        The newly created :py:class:`Job`.
    """
    job = Job(
        command_line=command_line,
        name=name,
        exec_dir=exec_dir,
        log_dir=log_dir,
        array_string=array,
        stop_on_failure=stop_on_failure,
        kwargs=kwargs,
    )

    # flush + refresh so the database assigns job.unique
    session.add(job)
    session.flush()
    session.refresh(job)

    # by default id and unique id are identical, but the id might be overwritten later on
    job.id = job.unique

    for d in dependencies or ():
        if d == job.unique:
            logger.warning("Adding self-dependency of job %d is not allowed", d)
            continue
        depending = session.query(Job).filter(Job.unique == d).first()
        if depending is not None:
            session.add(JobDependence(job.unique, depending.unique))
        else:
            logger.warning("Could not find dependent job with id %d in database", d)

    if array:
        start, stop, step = array
        # add array jobs; the stop index is inclusive
        for i in range(start, stop + 1, step):
            session.add(ArrayJob(i, job.unique))

    session.commit()

    return job

517 

518 

def times(job):
    """Returns a string containing timing information for the given job, which
    might be a :py:class:`Job` or an :py:class:`ArrayJob`.

    The submission time is always reported.  Start and finish lines, with
    the waiting and execution durations, are appended once the
    corresponding timestamps are set.  The execution duration is reported
    as ``unknown`` when a finish time exists without a start time.
    """
    timing = "Submitted: %s" % job.submit_time.ctime()
    if job.start_time is not None:
        timing += "\nStarted : {} \t Job waited : {}".format(
            job.start_time.ctime(),
            job.start_time - job.submit_time,
        )
    if job.finish_time is not None:
        # Job.finish() sets finish_time whether or not execute() ever set a
        # start time, so don't subtract None.
        executed = (
            job.finish_time - job.start_time
            if job.start_time is not None
            else "unknown"
        )
        timing += "\nFinished : {} \t Job executed: {}".format(
            job.finish_time.ctime(),
            executed,
        )
    return timing