Coverage for src/gridtk/models.py: 87%
275 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-04-22 14:25 +0200
« prev ^ index » next coverage.py v7.4.3, created at 2024-04-22 14:25 +0200
1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
2#
3# SPDX-License-Identifier: GPL-3.0-or-later
5import logging
6import os
8from datetime import datetime
9from pickle import dumps, loads
11from sqlalchemy import Boolean, Column, DateTime, Enum, ForeignKey, Integer, String
12from sqlalchemy.orm import declarative_base, relationship
logger = logging.getLogger(__name__)

# Every state a Job / ArrayJob can be in; these are the allowed values of the
# ``status`` Enum columns. Newly created array elements start as Status[0].
Status = (
    "submitted",
    "queued",
    "waiting",
    "executing",
    "success",
    "failure",
)


class Base:
    # NOTE(review): presumably allows SQLAlchemy 2.x declarative mapping of
    # attributes that are not annotated with ``Mapped[]`` -- confirm against
    # the SQLAlchemy 2.0 migration guide.
    __allow_unmapped__ = True


# The declarative base shared by all models in this module, built on top of
# the mixin class defined just above (which it replaces under the same name).
Base = declarative_base(cls=Base)  # type: ignore
class ArrayJob(Base):
    """One element of an array job.

    Every element references its parent :py:class:`Job` through ``job_id``
    (a foreign key to ``Job.unique``) and keeps its own status, result,
    machine and timing information. The parent job sees its elements through
    the ``array`` backref.
    """

    __tablename__ = "ArrayJob"

    unique = Column(Integer, primary_key=True)
    id = Column(Integer)  # the index of this element inside the array
    job_id = Column(Integer, ForeignKey("Job.unique"))
    status = Column(Enum(*Status))
    result = Column(Integer)
    machine_name = Column(String(10))

    submit_time = Column(DateTime)
    start_time = Column(DateTime)
    finish_time = Column(DateTime)

    job = relationship("Job", backref="array", order_by=id)

    def __init__(self, id, job_id):
        # identification of the element and of its parent job
        self.id = id
        self.job_id = job_id
        # a fresh element is submitted and has neither a result nor a machine
        # yet; the machine is filled in later by the Job class
        self.status = Status[0]
        self.result = None
        self.machine_name = None
        # only the submission time is known at construction
        self.submit_time = datetime.now()
        self.start_time = None
        self.finish_time = None

    def std_out_file(self):
        """Returns the stdout log file of this element.

        This is the parent's stdout file with ``.<id>`` appended, or None
        when the parent job has no log directory.
        """
        if not self.job.log_dir:
            return None
        return "%s.%s" % (self.job.std_out_file(), self.id)

    def std_err_file(self):
        """Returns the stderr log file of this element.

        This is the parent's stderr file with ``.<id>`` appended, or None
        when the parent job has no log directory.
        """
        if not self.job.log_dir:
            return None
        return "%s.%s" % (self.job.std_err_file(), self.id)

    def __str__(self):
        header = "<ArrayJob %d> of <Job %d>" % (self.id, self.job.id)
        if self.result is None:
            outcome = "%s" % self.status
        else:
            outcome = "%s (%d)" % (self.status, self.result)
        return f"{header} : {outcome}"

    def format(self, format):
        """Formats the current job into a nicer string to fit into a table.

        ``format`` is a :py:meth:`str.format` template that receives four
        positional fields: an empty string, ``"<job id> - <element id>"``,
        the machine (or the parent's queue if no machine is known yet) and
        the status, including the result when one is available.
        """
        job_id = "%d - %d" % (self.job.id, self.id)
        if self.machine_name is not None:
            queue = self.machine_name
        else:
            queue = self.job.queue_name
        status = "%s" % self.status
        if self.result is not None:
            status += " (%d)" % self.result
        return format.format("", job_id, queue, status)
class Job(Base):
    """This class defines one Job that was submitted to the Job Manager.

    Besides the columns below, a job gets list attributes from the backrefs
    declared elsewhere in this module: ``array`` (its :py:class:`ArrayJob`
    elements), ``jobs_we_have_to_wait_for`` and ``jobs_that_wait_for_us``
    (its :py:class:`JobDependence` rows).

    The command line, the array specification and the grid arguments are
    stored pickled; see :py:meth:`_unpickle`.
    """

    __tablename__ = "Job"

    # The unique ID of the job (not corresponding to the grid ID)
    unique = Column(Integer, primary_key=True)
    # The command line to execute, stored pickled
    command_line = Column(String(255))
    # A hand-chosen name for the task
    name = Column(String(20))
    # The name of the queue
    queue_name = Column(String(20))
    # The name of the machine in which the job is run
    machine_name = Column(String(10))
    # The pickled keyword arguments for the job submission (e.g. in the grid)
    grid_arguments = Column(String(255))
    # The ID of the job as given from the grid
    id = Column(Integer)
    # The directory in which the command should be executed
    exec_dir = Column(String(255))
    # The directory where the log files will be put
    log_dir = Column(String(255))
    # The pickled array specification (only needed for re-submission)
    array_string = Column(String(255))
    # Whether to stop depending jobs when this job finishes with an error
    stop_on_failure = Column(Boolean)

    submit_time = Column(DateTime)
    start_time = Column(DateTime)
    finish_time = Column(DateTime)

    status = Column(Enum(*Status))
    result = Column(Integer)

    def __init__(
        self,
        command_line,
        name=None,
        exec_dir=None,
        log_dir=None,
        array_string=None,
        queue_name="local",
        machine_name=None,
        stop_on_failure=False,
        **kwargs,
    ):
        """Constructs a Job object without an ID (needs to be set later).

        ``command_line``, ``array_string`` and all remaining keyword arguments
        (the grid arguments) are stored pickled. :py:meth:`get_arguments`
        reads the ``"kwargs"`` entry of the grid arguments, so the grid
        options are expected to be passed as ``kwargs={...}``, as
        :py:func:`add_job` does. The job starts in the 'submitted' state.
        """
        self.command_line = dumps(command_line)
        self.name = name
        self.queue_name = queue_name  # will be set during the queue command later
        self.machine_name = machine_name  # will be set during the execute command later
        self.grid_arguments = dumps(kwargs)
        self.exec_dir = exec_dir
        self.log_dir = log_dir
        self.stop_on_failure = stop_on_failure
        self.array_string = dumps(array_string)
        # NOTE: submit() resets machine_name to None again
        self.submit()

    def submit(self, new_queue=None):
        """Sets the status of this job to 'submitted'.

        Clears the result and machine of the job and of all its array
        elements, optionally moves the job to ``new_queue``, records the
        submission time and clears the start and finish times of the job.
        """
        self.status = "submitted"
        self.result = None
        self.machine_name = None
        if new_queue is not None:
            self.queue_name = new_queue
        for array_job in self.array:
            array_job.status = "submitted"
            array_job.result = None
            array_job.machine_name = None
        self.submit_time = datetime.now()
        self.start_time = None
        self.finish_time = None

    def queue(self, new_job_id=None, new_job_name=None, queue_name=None):
        """Sets the status of this job to 'queued' or 'waiting'.

        Optionally updates the grid job id, the name and the queue first.
        Each job we depend on is inspected in turn: an unfinished one sets
        the new status to 'waiting', a failed one sets it to 'failure' when
        ``stop_on_failure`` is set. Later dependencies override earlier ones.
        Without either case, the status becomes 'queued'. Dependent jobs that
        are currently 'queued' are moved back to 'waiting', or to 'failure'
        if this job failed. Unfinished array elements take over the new
        status.
        """
        # update the job id (i.e., when the job is executed in the grid)
        if new_job_id is not None:
            self.id = new_job_id

        if new_job_name is not None:
            self.name = new_job_name

        if queue_name is not None:
            self.queue_name = queue_name

        new_status = "queued"
        self.result = None
        # check if we have to wait for another job to finish
        for job in self.get_jobs_we_wait_for():
            if job.status not in ("success", "failure"):
                new_status = "waiting"
            elif self.stop_on_failure and job.status == "failure":
                new_status = "failure"

        # reset the queued jobs that depend on us to waiting status
        for job in self.get_jobs_waiting_for_us():
            if job.status == "queued":
                job.status = "failure" if new_status == "failure" else "waiting"

        self.status = new_status
        for array_job in self.array:
            if array_job.status not in ("success", "failure"):
                array_job.status = new_status

    def execute(self, array_id=None, machine_name=None):
        """Sets the status of this job to 'executing'.

        With ``array_id``, the matching array element is marked as executing
        and gets its start time. It also gets ``machine_name`` when one is
        given. Without ``array_id``, ``machine_name`` (if given) is stored on
        the job itself. The job's own start time is only set on the first
        call.
        """
        self.status = "executing"
        if array_id is not None:
            for array_job in self.array:
                if array_job.id == array_id:
                    array_job.status = "executing"
                    if machine_name is not None:
                        array_job.machine_name = machine_name
                    array_job.start_time = datetime.now()
        elif machine_name is not None:
            self.machine_name = machine_name
        if self.start_time is None:
            self.start_time = datetime.now()

        # sometimes, the 'finish' command did not work for array jobs,
        # so check if any old job still has the 'executing' flag set
        for job in self.get_jobs_we_wait_for():
            if job.array and job.status == "executing":
                job.finish(0, -1)

    def finish(self, result, array_id=None):
        """Sets the status of this job to 'success' or 'failure'.

        With ``array_id``, only the matching array element is finished with
        ``result``. The job itself is finished once no element is left
        unfinished, and its result is then ``result`` if non-zero, else the
        first non-zero result among the elements, else 0. After finishing,
        all dependent jobs in 'waiting' state are re-queued.
        """
        # check if there is any array job still running
        new_status = "success" if result == 0 else "failure"
        new_result = result
        finished = True
        if array_id is not None:
            for array_job in self.array:
                if array_job.id == array_id:
                    array_job.status = new_status
                    array_job.result = result
                    array_job.finish_time = datetime.now()
                if array_job.status not in ("success", "failure"):
                    finished = False
                elif new_result == 0:
                    new_result = array_job.result

        if finished:
            # There was no array job, or all array jobs finished
            self.status = "success" if new_result == 0 else "failure"
            self.result = new_result
            self.finish_time = datetime.now()

            # update all waiting jobs
            for job in self.get_jobs_waiting_for_us():
                if job.status == "waiting":
                    job.queue()

    def refresh(self):
        """Refreshes the status information.

        Only affects executing array jobs. Once all elements have finished,
        the job becomes 'success' or 'failure'. Its result is the result of
        the last failed element, or 0 when no element failed.
        """
        if self.status == "executing" and self.array:
            new_result = 0
            for array_job in self.array:
                if array_job.status == "failure" and new_result is not None:
                    new_result = array_job.result
                elif array_job.status not in ("success", "failure"):
                    new_result = None
            if new_result is not None:
                self.status = "success" if new_result == 0 else "failure"
                self.result = new_result

    @staticmethod
    def _unpickle(data):
        """Unpickles a value that this class stored with :py:func:`pickle.dumps`.

        Depending on the database backend the stored value may come back as
        ``bytes`` or as ``str``; the latter is encoded before unpickling.
        """
        # NOTE(review): pickle.loads can execute arbitrary code on crafted
        # input -- only open job databases that come from a trusted source.
        return loads(data if isinstance(data, bytes) else data.encode())

    def get_command_line(self):
        """Returns the command line for the job, as it was passed in."""
        return self._unpickle(self.command_line)

    def set_command_line(self, command_line):
        """Sets / overwrites the command line for the job."""
        self.command_line = dumps(command_line)

    def get_exec_dir(self):
        """Returns the canonical (symlink-resolved) execution directory.

        Returns None when no execution directory was set.
        """
        return (
            str(os.path.realpath(self.exec_dir)) if self.exec_dir is not None else None
        )

    def get_array(self):
        """Returns the array specification of the job.

        This is presumably a ``(start, stop, step)`` tuple, as unpacked by
        :py:func:`add_job` and formatted by :py:meth:`format`, or None for
        non-array jobs.
        """
        return self._unpickle(self.array_string)

    def get_arguments(self):
        """Returns the additional options for the grid (such as the queue,
        memory requirements, ...).

        The options are read from the ``"kwargs"`` entry of the stored grid
        arguments. Only options that are set are returned, in a fixed order,
        followed by ``queue`` when a queue name is known.
        """
        args = self._unpickle(self.grid_arguments)["kwargs"]
        retval = {}
        if "pe_opt" in args:
            retval["pe_opt"] = args["pe_opt"]
        for key in ("memfree", "hvmem", "gpumem"):
            if args.get(key) is not None:
                retval[key] = args[key]
        # truthiness also tolerates env=None, where len() used to raise
        if args.get("env"):
            retval["env"] = args["env"]
        if args.get("io_big"):
            retval["io_big"] = True
        if "sge_extra_args" in args:
            retval["sge_extra_args"] = args["sge_extra_args"]

        # also add the queue
        if self.queue_name is not None:
            retval["queue"] = str(self.queue_name)

        return retval

    def set_arguments(self, **kwargs):
        """Stores ``kwargs`` (pickled) as the grid arguments of the job.

        NOTE(review): :py:meth:`get_arguments` reads the ``"kwargs"`` entry,
        so callers presumably pass ``kwargs={...}`` here -- confirm.
        """
        self.grid_arguments = dumps(kwargs)

    def get_jobs_we_wait_for(self):
        """Returns the jobs this job depends on (skipping dangling rows)."""
        return [
            j.waited_for_job
            for j in self.jobs_we_have_to_wait_for
            if j.waited_for_job is not None
        ]

    def get_jobs_waiting_for_us(self):
        """Returns the jobs that depend on this job (skipping dangling rows)."""
        return [
            j.waiting_job
            for j in self.jobs_that_wait_for_us
            if j.waiting_job is not None
        ]

    def _log_file(self, kind):
        """Returns ``<log_dir>/<name or 'job'><kind><id>``, or None without log dir."""
        if not self.log_dir:
            return None
        return os.path.join(self.log_dir, (self.name or "job") + kind + str(self.id))

    def std_out_file(self, array_id=None):
        """Returns the stdout log file of the job (``array_id`` is ignored)."""
        return self._log_file(".o")

    def std_err_file(self, array_id=None):
        """Returns the stderr log file of the job (``array_id`` is ignored)."""
        return self._log_file(".e")

    def _cmdline(self):
        """Returns the command line as a single display string.

        Options (arguments starting with ``-``) are emitted as is, all other
        arguments are wrapped in single quotes, and each argument is followed
        by a space. The quoting is for display only; embedded quotes are not
        escaped.
        """
        # startswith() -- unlike cmd[0] -- does not fail on empty arguments
        return "".join(
            ("%s " if cmd.startswith("-") else "'%s' ") % cmd
            for cmd in self.get_command_line()
        )

    def __str__(self):
        ident = "%d (%d)" % (self.unique, self.id)
        if self.machine_name:
            location = f"{self.queue_name} - {self.machine_name}"
        else:
            location = self.queue_name
        array = "[%d-%d:%d]" % self.get_array() if self.array else ""
        if self.name is not None:
            header = f"<Job: {ident} {array} - '{self.name}'>"
        else:
            header = "<Job: %s>" % ident
        if self.result is not None:
            outcome = "%s (%d)" % (self.status, self.result)
        else:
            outcome = "%s" % self.status
        return f"{header} | {location} : {outcome} -- {self._cmdline()}"

    def format(self, format, dependencies=0, limit_command_line=None):
        """Formats the current job into a nicer string to fit into a table.

        Args:
          format: A :py:meth:`str.format` template. It receives six positional
            fields: unique id, job id, queue, status, name and command line.
            When ``dependencies`` is non-zero it receives seven, with the
            dependency list inserted before the command line.
          dependencies: If non-zero, the maximum width of the printed list of
            dependency ids.
          limit_command_line: If given, the command line is truncated to this
            many characters. Otherwise the grid options are prepended to it
            and the execution directory is appended.
        """
        command_line = self._cmdline()
        if limit_command_line is not None and len(command_line) > limit_command_line:
            command_line = command_line[: limit_command_line - 3] + "..."

        job_id = "%d" % self.id
        if self.array:
            job_id += " [%d-%d:%d]" % self.get_array()
        status = "%s" % self.status
        if self.result is not None:
            status += " (%d)" % self.result
        queue = self.queue_name if self.machine_name is None else self.machine_name

        if limit_command_line is None:
            grid_opt = self.get_arguments()
            if grid_opt:
                # prepend additional information about the job's grid options
                command_line = (
                    "<"
                    + ",".join(f"{key}={value}" for key, value in grid_opt.items())
                    + ">: "
                    + command_line
                )
            if self.exec_dir is not None:
                command_line += "; [Executed in directory: '%s']" % self.exec_dir

        fields = [self.unique, job_id, queue[:12], status, str(self.name)]
        if dependencies:
            deps = str(sorted({dep.unique for dep in self.get_jobs_we_wait_for()}))
            if dependencies < len(deps):
                deps = deps[: dependencies - 3] + "..."
            fields.append(deps)
        fields.append(command_line)
        return format.format(*fields)
class JobDependence(Base):
    """This table defines a many-to-many relationship between Jobs.

    Each row states that the job with unique id ``waiting_job_id`` has to
    wait for the job with unique id ``waited_for_job_id``.
    """

    __tablename__ = "JobDependence"
    id = Column(Integer, primary_key=True)
    # The unique ID of the waiting job
    waiting_job_id = Column(Integer, ForeignKey("Job.unique"))
    # The unique ID of the job to wait for
    waited_for_job_id = Column(Integer, ForeignKey("Job.unique"))

    # Both foreign keys point at ``Job.unique``, so each relationship spells
    # out its ``primaryjoin`` to tell SQLAlchemy which column to join on.
    # Consequently the backref ``Job.jobs_we_have_to_wait_for`` holds the rows
    # in which that job is the *waiting* one (joined on ``waiting_job_id``);
    # for each of those rows, ``waited_for_job`` is a job it depends on.
    # Symmetrically, ``Job.jobs_that_wait_for_us`` holds the rows in which the
    # job is waited for, and their ``waiting_job`` is a dependent job.
    waiting_job = relationship(
        "Job",
        backref="jobs_we_have_to_wait_for",
        primaryjoin=(Job.unique == waiting_job_id),
        order_by=id,
    )  # The job that waits
    waited_for_job = relationship(
        "Job",
        backref="jobs_that_wait_for_us",
        primaryjoin=(Job.unique == waited_for_job_id),
        order_by=id,
    )  # The job that is waited for

    def __init__(self, waiting_job_id, waited_for_job_id):
        self.waiting_job_id = waiting_job_id
        self.waited_for_job_id = waited_for_job_id
def add_job(
    session,
    command_line,
    name="job",
    dependencies=None,
    array=None,
    exec_dir=None,
    log_dir=None,
    stop_on_failure=False,
    **kwargs,
):
    """Helper function to create a job, add the dependencies and the array
    jobs.

    Args:
      session: The database session. The job is flushed to obtain its unique
        id, and everything is committed at the end.
      command_line: The command line to run (stored pickled in the job).
      name: The name of the job.
      dependencies: Unique ids of jobs the new job has to wait for.
        Self-dependencies and unknown ids are skipped with a warning.
      array: Optional ``(start, stop, step)`` tuple. One :py:class:`ArrayJob`
        is created for every index in ``range(start, stop + 1, step)``, so
        ``stop`` is inclusive.
      exec_dir, log_dir, stop_on_failure: Passed on to :py:class:`Job`.
      **kwargs: Grid arguments, stored in the job as ``kwargs={...}``.

    Returns:
      The newly created and committed :py:class:`Job`.
    """
    job = Job(
        command_line=command_line,
        name=name,
        exec_dir=exec_dir,
        log_dir=log_dir,
        array_string=array,
        stop_on_failure=stop_on_failure,
        kwargs=kwargs,
    )

    # flush + refresh so that the database-assigned unique id is available
    session.add(job)
    session.flush()
    session.refresh(job)

    # by default id and unique id are identical, but the id might be overwritten later on
    job.id = job.unique

    for d in dependencies or ():
        if d == job.unique:
            logger.warning("Adding self-dependency of job %d is not allowed", d)
            continue
        depending = session.query(Job).filter(Job.unique == d).first()
        if depending is not None:
            session.add(JobDependence(job.unique, depending.unique))
        else:
            logger.warning("Could not find dependent job with id %d in database", d)

    if array:
        start, stop, step = array
        # add array jobs; ``stop`` is inclusive
        for i in range(start, stop + 1, step):
            session.add(ArrayJob(i, job.unique))

    session.commit()

    return job
def times(job):
    """Returns a string containing timing information for the given job, which
    might be a :py:class:`Job` or an :py:class:`ArrayJob`.

    The string always contains the submission time. The start time (with the
    time spent waiting) and the finish time (with the time spent executing)
    are appended when they are set. If a finish time exists without a start
    time, the execution duration is reported as ``unknown``.
    """
    timing = "Submitted: %s" % job.submit_time.ctime()
    if job.start_time is not None:
        timing += "\nStarted : {} \t Job waited : {}".format(
            job.start_time.ctime(),
            job.start_time - job.submit_time,
        )
    if job.finish_time is not None:
        # Job.finish() records a finish time without checking for a start
        # time, so the subtraction must not assume one exists.
        if job.start_time is not None:
            executed = job.finish_time - job.start_time
        else:
            executed = "unknown"
        timing += "\nFinished : {} \t Job executed: {}".format(
            job.finish_time.ctime(),
            executed,
        )
    return timing