Coverage for src/gridtk/manager.py: 82%
250 statements
« prev ^ index » next coverage.py v7.4.3, created at 2024-04-22 14:25 +0200
1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
2#
3# SPDX-License-Identifier: GPL-3.0-or-later
5import logging
6import os
7import socket # to get the host name
8import subprocess
10from shutil import which
12import sqlalchemy
14from .models import ArrayJob, Base, Job, Status, times
# Module-level logger, named after this module.
logger = logging.getLogger(name=__name__)
19class JobManager:
20 """This job manager defines the basic interface for handling jobs in the
21 SQL database."""
23 def __init__(self, database="submitted.sql3", wrapper_script=None, debug=False):
24 self._database = os.path.realpath(database)
25 self._engine = sqlalchemy.create_engine(
26 "sqlite:///" + self._database,
27 connect_args={"timeout": 600},
28 echo=debug,
29 )
30 self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)
32 # store the command that this job manager was called with
33 if wrapper_script is None:
34 wrapper_script = "jman"
35 if not os.path.exists(wrapper_script):
36 bindir = os.path.join(os.path.realpath(os.curdir), "bin")
37 wrapper_script = which(
38 wrapper_script,
39 path=os.pathsep.join((bindir, os.environ["PATH"])),
40 )
42 if wrapper_script is None:
43 raise OSError(
44 "Could not find the installation path of gridtk. Please specify it in the wrapper_script parameter of the JobManager."
45 )
46 if not os.path.exists(wrapper_script):
47 raise OSError(
48 "Your wrapper_script cannot be found. Jobs will not be executable."
49 )
50 self.wrapper_script = wrapper_script
52 def __del__(self):
53 # remove the database if it is empty
54 if os.path.isfile(self._database):
55 # in errornous cases, the session might still be active, so don't create a deadlock here!
56 if not hasattr(self, "session"):
57 self.lock()
58 job_count = len(self.get_jobs())
59 self.unlock()
60 if not job_count:
61 logger.debug(
62 "Removed database file '%s' since database is empty"
63 % self._database
64 )
65 os.remove(self._database)
67 def lock(self):
68 """Generates (and returns) a blocking session object to the
69 database."""
70 if hasattr(self, "session"):
71 raise RuntimeError(
72 "Dead lock detected. Please do not try to lock the session when it is already locked!"
73 )
75 # create the database if it does not exist yet
76 if not os.path.exists(self._database):
77 self._create()
79 # now, create a session
80 self.session = self._session_maker()
81 logger.debug("Created new database session to '%s'" % self._database)
82 return self.session
84 def unlock(self):
85 """Closes the session to the database."""
86 if not hasattr(self, "session"):
87 raise RuntimeError(
88 "Error detected! The session that you want to close does not exist any more!"
89 )
90 logger.debug("Closed database session of '%s'" % self._database)
91 self.session.close()
92 del self.session
94 def _create(self):
95 """Creates a new and empty database."""
96 # create directory for sql database
97 os.makedirs(os.path.dirname(self._database), exist_ok=True)
99 # create all the tables
100 Base.metadata.create_all(self._engine)
101 logger.debug("Created new empty database '%s'" % self._database)
103 def get_jobs(self, job_ids=None):
104 """Returns a list of jobs that are stored in the database."""
105 if job_ids is not None and len(job_ids) == 0:
106 return []
107 q = self.session.query(Job)
108 if job_ids is not None:
109 q = q.filter(Job.unique.in_(job_ids))
110 return sorted(list(q), key=lambda job: job.unique)
112 def _job_and_array(self, job_id, array_id=None):
113 # get the job (and the array job) with the given id(s)
114 job = self.get_jobs((job_id,))
115 if len(job) > 1:
116 logger.error(
117 "%d jobs with the same ID '%d' were detected in the database"
118 % (len(job), job_id)
119 )
120 elif not len(job):
121 logger.error("Job with ID '%d' was not found in the database." % job_id)
122 return (None, None)
124 job = job[0]
125 unique_id = job.unique
127 if array_id is not None:
128 array_job = list(
129 self.session.query(ArrayJob)
130 .filter(ArrayJob.job_id == unique_id)
131 .filter(ArrayJob.id == array_id)
132 )
133 assert len(array_job) == 1
134 return (job, array_job[0])
135 else:
136 return (job, None)
138 def run_job(self, job_id, array_id=None):
139 """This function is called to run a job (e.g. in the grid) with the
140 given id and the given array index if applicable."""
141 # set the job's status in the database
142 try:
143 # get the job from the database
144 self.lock()
145 jobs = self.get_jobs((job_id,))
146 if not len(jobs):
147 # it seems that the job has been deleted in the meanwhile
148 return
149 job = jobs[0]
151 # get the machine name we are executing on; this might only work at idiap
152 machine_name = socket.gethostname()
154 # set the 'executing' status to the job
155 job.execute(array_id, machine_name)
157 self.session.commit()
158 except Exception as e:
159 logger.error("Caught exception '%s'", e)
160 pass
161 finally:
162 self.unlock()
164 # get the command line of the job from the database; does not need write access
165 self.lock()
166 job = self.get_jobs((job_id,))[0]
167 command_line = job.get_command_line()
168 exec_dir = job.get_exec_dir()
169 self.unlock()
171 logger.info("Starting job %d: %s", job_id, " ".join(command_line))
173 # execute the command line of the job, and wait until it has finished
174 try:
175 result = subprocess.call(command_line, cwd=exec_dir)
176 logger.info("Job %d finished with result %s", job_id, str(result))
177 except Exception as e:
178 logger.error("The job with id '%d' could not be executed: %s", job_id, e)
179 result = 69 # ASCII: 'E'
181 # set a new status and the results of the job
182 try:
183 self.lock()
184 jobs = self.get_jobs((job_id,))
185 if not len(jobs):
186 # it seems that the job has been deleted in the meanwhile
187 logger.error(
188 "The job with id '%d' could not be found in the database!",
189 job_id,
190 )
191 self.unlock()
192 return
194 job = jobs[0]
195 job.finish(result, array_id)
197 self.session.commit()
199 # This might not be working properly, so use with care!
200 if job.stop_on_failure and job.status == "failure":
201 # the job has failed
202 # stop this and all dependent jobs from execution
203 dependent_jobs = job.get_jobs_waiting_for_us()
204 dependent_job_ids = set(
205 [dep.unique for dep in dependent_jobs] + [job.unique]
206 )
207 while len(dependent_jobs):
208 dep = dependent_jobs.pop(0)
209 new = dep.get_jobs_waiting_for_us()
210 dependent_jobs += new
211 dependent_job_ids.update([dep.unique for dep in new])
213 self.unlock()
214 deps = sorted(list(dependent_job_ids))
215 self.stop_jobs(deps)
216 logger.warn(
217 "Stopped dependent jobs '%s' since this job failed.",
218 str(deps),
219 )
221 except Exception as e:
222 logger.error("Caught exception '%s'", e)
223 pass
224 finally:
225 if hasattr(self, "session"):
226 self.unlock()
228 def list(
229 self,
230 job_ids,
231 print_array_jobs=False,
232 print_dependencies=False,
233 long=False,
234 print_times=False,
235 status=Status,
236 names=None,
237 ids_only=False,
238 ):
239 """Lists the jobs currently added to the database."""
240 # configuration for jobs
241 fields = ("job-id", "grid-id", "queue", "status", "job-name")
242 lengths = (6, 17, 11, 12, 16)
243 dependency_length = 0
245 if print_dependencies:
246 fields += ("dependencies",)
247 lengths += (25,)
248 dependency_length = lengths[-1]
250 if long:
251 fields += ("submitted command",)
252 lengths += (43,)
254 format = "{:^%d} " * len(lengths)
255 format = format % lengths
257 # if ids_only:
258 # self.lock()
259 # for job in self.get_jobs():
260 # print(job.unique, end=" ")
261 # self.unlock()
262 # return
264 array_format = "{0:^%d} {1:>%d} {2:^%d} {3:^%d}" % lengths[:4]
265 delimiter = format.format(*["=" * k for k in lengths])
266 array_delimiter = array_format.format(*["-" * k for k in lengths[:4]])
267 header = [fields[k].center(lengths[k]) for k in range(len(lengths))]
269 # print header
270 if not ids_only:
271 print(" ".join(header))
272 print(delimiter)
274 self.lock()
275 for job in self.get_jobs(job_ids):
276 job.refresh()
277 if job.status in status and (names is None or job.name in names):
278 if ids_only:
279 print(job.unique, end=" ")
280 else:
281 print(job.format(format, dependency_length))
282 if print_times:
283 print(times(job))
285 if (not ids_only) and print_array_jobs and job.array:
286 print(array_delimiter)
287 for array_job in job.array:
288 if array_job.status in status:
289 print(array_job.format(array_format))
290 if print_times:
291 print(times(array_job))
292 print(array_delimiter)
294 self.unlock()
296 def report(
297 self,
298 job_ids=None,
299 array_ids=None,
300 output=True,
301 error=True,
302 status=Status,
303 name=None,
304 ):
305 """Iterates through the output and error files and write the results to
306 command line."""
308 def _write_contents(job):
309 # Writes the contents of the output and error files to command line
310 out_file, err_file = job.std_out_file(), job.std_err_file()
311 logger.info("Contents of output file: '%s'" % out_file)
312 if (
313 output
314 and out_file is not None
315 and os.path.exists(out_file)
316 and os.stat(out_file).st_size > 0
317 ):
318 print(open(out_file).read().rstrip())
319 print("-" * 20)
320 if (
321 error
322 and err_file is not None
323 and os.path.exists(err_file)
324 and os.stat(err_file).st_size > 0
325 ):
326 logger.info("Contents of error file: '%s'" % err_file)
327 print(open(err_file).read().rstrip())
328 print("-" * 40)
330 def _write_array_jobs(array_jobs):
331 for array_job in array_jobs:
332 print(
333 "Array Job",
334 str(array_job.id),
335 (
336 "(%s) :" % array_job.machine_name
337 if array_job.machine_name is not None
338 else ":"
339 ),
340 )
341 _write_contents(array_job)
343 self.lock()
345 # check if an array job should be reported
346 if array_ids:
347 if len(job_ids) != 1:
348 logger.error(
349 "If array ids are specified exactly one job id must be given."
350 )
351 array_jobs = list(
352 self.session.query(ArrayJob)
353 .join(Job)
354 .filter(Job.unique.in_(job_ids))
355 .filter(Job.unique == ArrayJob.job_id)
356 .filter(ArrayJob.id.in_(array_ids))
357 )
358 if array_jobs:
359 print(array_jobs[0].job)
360 _write_array_jobs(array_jobs)
362 else:
363 # iterate over all jobs
364 jobs = self.get_jobs(job_ids)
365 for job in jobs:
366 if name is not None and job.name != name:
367 continue
368 if job.status not in status:
369 continue
370 if job.array:
371 print(job)
372 _write_array_jobs(job.array)
373 else:
374 print(job)
375 _write_contents(job)
376 if job.log_dir is not None:
377 print("-" * 60)
379 self.unlock()
381 def delete_logs(self, job):
382 out_file, err_file = job.std_out_file(), job.std_err_file()
383 if out_file and os.path.exists(out_file):
384 os.remove(out_file)
385 logger.debug("Removed output log file '%s'" % out_file)
386 if err_file and os.path.exists(err_file):
387 os.remove(err_file)
388 logger.debug("Removed error log file '%s'" % err_file)
390 def delete(
391 self,
392 job_ids,
393 array_ids=None,
394 delete_logs=True,
395 delete_log_dir=False,
396 status=Status,
397 delete_jobs=True,
398 ):
399 """Deletes the jobs with the given ids from the database."""
401 def _delete_dir_if_empty(log_dir):
402 if (
403 log_dir
404 and delete_log_dir
405 and os.path.isdir(log_dir)
406 and not os.listdir(log_dir)
407 ):
408 os.rmdir(log_dir)
409 logger.info("Removed empty log directory '%s'" % log_dir)
411 def _delete(job, try_to_delete_dir=False):
412 # delete the job from the database
413 if delete_logs:
414 self.delete_logs(job)
415 if try_to_delete_dir:
416 _delete_dir_if_empty(job.log_dir)
417 if delete_jobs:
418 self.session.delete(job)
420 self.lock()
422 # check if array ids are specified
423 if array_ids:
424 if len(job_ids) != 1:
425 logger.error(
426 "If array ids are specified exactly one job id must be given."
427 )
428 array_jobs = list(
429 self.session.query(ArrayJob)
430 .join(Job)
431 .filter(Job.unique.in_(job_ids))
432 .filter(Job.unique == ArrayJob.job_id)
433 .filter(ArrayJob.id.in_(array_ids))
434 )
435 if array_jobs:
436 job = array_jobs[0].job
437 for array_job in array_jobs:
438 if array_job.status in status:
439 if delete_jobs:
440 logger.debug(
441 "Deleting array job '%d' of job '%d' from the database."
442 % (array_job.id, job.unique)
443 )
444 _delete(array_job)
445 if not job.array:
446 if job.status in status:
447 if delete_jobs:
448 logger.info(
449 "Deleting job '%d' from the database." % job.unique
450 )
451 _delete(job, delete_jobs)
453 else:
454 # iterate over all jobs
455 jobs = self.get_jobs(job_ids)
456 for job in jobs:
457 # delete all array jobs
458 if job.array:
459 for array_job in job.array:
460 if array_job.status in status:
461 if delete_jobs:
462 logger.debug(
463 "Deleting array job '%d' of job '%d' from the database."
464 % (array_job.id, job.unique)
465 )
466 _delete(array_job)
467 # delete this job
468 if job.status in status:
469 if delete_jobs:
470 logger.info("Deleting job '%d' from the database." % job.unique)
471 _delete(job, delete_jobs)
473 self.session.commit()
474 self.unlock()