# Source code for gridtk.manager

from __future__ import print_function

import os
import socket  # to get the host name
import subprocess

from distutils.version import LooseVersion
from shutil import which

import sqlalchemy

from .models import ArrayJob, Base, Job, Status, times
from .tools import logger


class JobManager:
    """This job manager defines the basic interface for handling jobs in the SQL database.

    Jobs are stored in a local sqlite3 file; sessions to that database are
    opened with :py:meth:`lock` and closed with :py:meth:`unlock`.
    """

    def __init__(
        self, database="submitted.sql3", wrapper_script=None, debug=False
    ):
        # Absolute path of the sqlite3 file that backs this manager.
        self._database = os.path.realpath(database)
        # Generous 600 s busy timeout: several jman processes may hit the
        # same database file concurrently.  echo=debug makes sqlalchemy log
        # every SQL statement when debugging is enabled.
        self._engine = sqlalchemy.create_engine(
            "sqlite:///" + self._database,
            connect_args={"timeout": 600},
            echo=debug,
        )
        self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine)

        # store the command that this job manager was called with
        if wrapper_script is None:
            wrapper_script = "jman"
        if not os.path.exists(wrapper_script):
            # look the script up on the PATH, preferring a local ./bin dir
            bindir = os.path.join(os.path.realpath(os.curdir), "bin")
            wrapper_script = which(
                wrapper_script,
                path=os.pathsep.join((bindir, os.environ["PATH"])),
            )
        if wrapper_script is None:
            raise IOError(
                "Could not find the installation path of gridtk. Please specify it in the wrapper_script parameter of the JobManager."
            )
        if not os.path.exists(wrapper_script):
            raise IOError(
                "Your wrapper_script cannot be found. Jobs will not be executable."
            )
        self.wrapper_script = wrapper_script

    def __del__(self):
        # remove the database if it is empty
        if os.path.isfile(self._database):
            # in erroneous cases, the session might still be active, so don't create a deadlock here!
            if not hasattr(self, "session"):
                self.lock()
                job_count = len(self.get_jobs())
                self.unlock()
                if not job_count:
                    # NOTE: the message is logged before the actual removal
                    logger.debug(
                        "Removed database file '%s' since database is empty"
                        % self._database
                    )
                    os.remove(self._database)
[docs] def lock(self): """Generates (and returns) a blocking session object to the database.""" if hasattr(self, "session"): raise RuntimeError( "Dead lock detected. Please do not try to lock the session when it is already locked!" ) if LooseVersion(sqlalchemy.__version__) < LooseVersion("0.7.8"): # for old sqlalchemy versions, in some cases it is required to re-generate the engine for each session self._engine = sqlalchemy.create_engine( "sqlite:///" + self._database ) self._session_maker = sqlalchemy.orm.sessionmaker(bind=self._engine) # create the database if it does not exist yet if not os.path.exists(self._database): self._create() # now, create a session self.session = self._session_maker() logger.debug("Created new database session to '%s'" % self._database) return self.session
[docs] def unlock(self): """Closes the session to the database.""" if not hasattr(self, "session"): raise RuntimeError( "Error detected! The session that you want to close does not exist any more!" ) logger.debug("Closed database session of '%s'" % self._database) self.session.close() del self.session
def _create(self): """Creates a new and empty database.""" from .tools import makedirs_safe # create directory for sql database makedirs_safe(os.path.dirname(self._database)) # create all the tables Base.metadata.create_all(self._engine) logger.debug("Created new empty database '%s'" % self._database)
[docs] def get_jobs(self, job_ids=None): """Returns a list of jobs that are stored in the database.""" if job_ids is not None and len(job_ids) == 0: return [] q = self.session.query(Job) if job_ids is not None: q = q.filter(Job.unique.in_(job_ids)) return sorted(list(q), key=lambda job: job.unique)
def _job_and_array(self, job_id, array_id=None): # get the job (and the array job) with the given id(s) job = self.get_jobs((job_id,)) if len(job) > 1: logger.error( "%d jobs with the same ID '%d' were detected in the database" % (len(job), job_id) ) elif not len(job): logger.error( "Job with ID '%d' was not found in the database." % job_id ) return (None, None) job = job[0] unique_id = job.unique if array_id is not None: array_job = list( self.session.query(ArrayJob) .filter(ArrayJob.job_id == unique_id) .filter(ArrayJob.id == array_id) ) assert len(array_job) == 1 return (job, array_job[0]) else: return (job, None)
[docs] def run_job(self, job_id, array_id=None): """This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable.""" # set the job's status in the database try: # get the job from the database self.lock() jobs = self.get_jobs((job_id,)) if not len(jobs): # it seems that the job has been deleted in the meanwhile return job = jobs[0] # get the machine name we are executing on; this might only work at idiap machine_name = socket.gethostname() # set the 'executing' status to the job job.execute(array_id, machine_name) self.session.commit() except Exception as e: logger.error("Caught exception '%s'", e) pass finally: self.unlock() # get the command line of the job from the database; does not need write access self.lock() job = self.get_jobs((job_id,))[0] command_line = job.get_command_line() exec_dir = job.get_exec_dir() self.unlock() logger.info("Starting job %d: %s", job_id, " ".join(command_line)) # execute the command line of the job, and wait until it has finished try: result = subprocess.call(command_line, cwd=exec_dir) logger.info("Job %d finished with result %s", job_id, str(result)) except Exception as e: logger.error( "The job with id '%d' could not be executed: %s", job_id, e ) result = 69 # ASCII: 'E' # set a new status and the results of the job try: self.lock() jobs = self.get_jobs((job_id,)) if not len(jobs): # it seems that the job has been deleted in the meanwhile logger.error( "The job with id '%d' could not be found in the database!", job_id, ) self.unlock() return job = jobs[0] job.finish(result, array_id) self.session.commit() # This might not be working properly, so use with care! 
if job.stop_on_failure and job.status == "failure": # the job has failed # stop this and all dependent jobs from execution dependent_jobs = job.get_jobs_waiting_for_us() dependent_job_ids = set( [dep.unique for dep in dependent_jobs] + [job.unique] ) while len(dependent_jobs): dep = dependent_jobs.pop(0) new = dep.get_jobs_waiting_for_us() dependent_jobs += new dependent_job_ids.update([dep.unique for dep in new]) self.unlock() deps = sorted(list(dependent_job_ids)) self.stop_jobs(deps) logger.warn( "Stopped dependent jobs '%s' since this job failed.", str(deps), ) except Exception as e: logger.error("Caught exception '%s'", e) pass finally: if hasattr(self, "session"): self.unlock()
[docs] def list( self, job_ids, print_array_jobs=False, print_dependencies=False, long=False, print_times=False, status=Status, names=None, ids_only=False, ): """Lists the jobs currently added to the database.""" # configuration for jobs fields = ("job-id", "grid-id", "queue", "status", "job-name") lengths = (6, 17, 11, 12, 16) dependency_length = 0 if print_dependencies: fields += ("dependencies",) lengths += (25,) dependency_length = lengths[-1] if long: fields += ("submitted command",) lengths += (43,) format = "{:^%d} " * len(lengths) format = format % lengths # if ids_only: # self.lock() # for job in self.get_jobs(): # print(job.unique, end=" ") # self.unlock() # return array_format = "{0:^%d} {1:>%d} {2:^%d} {3:^%d}" % lengths[:4] delimiter = format.format(*["=" * k for k in lengths]) array_delimiter = array_format.format(*["-" * k for k in lengths[:4]]) header = [fields[k].center(lengths[k]) for k in range(len(lengths))] # print header if not ids_only: print(" ".join(header)) print(delimiter) self.lock() for job in self.get_jobs(job_ids): job.refresh() if job.status in status and (names is None or job.name in names): if ids_only: print(job.unique, end=" ") else: print(job.format(format, dependency_length)) if print_times: print(times(job)) if (not ids_only) and print_array_jobs and job.array: print(array_delimiter) for array_job in job.array: if array_job.status in status: print(array_job.format(array_format)) if print_times: print(times(array_job)) print(array_delimiter) self.unlock()
[docs] def report( self, job_ids=None, array_ids=None, output=True, error=True, status=Status, name=None, ): """Iterates through the output and error files and write the results to command line.""" def _write_contents(job): # Writes the contents of the output and error files to command line out_file, err_file = job.std_out_file(), job.std_err_file() logger.info("Contents of output file: '%s'" % out_file) if ( output and out_file is not None and os.path.exists(out_file) and os.stat(out_file).st_size > 0 ): print(open(out_file).read().rstrip()) print("-" * 20) if ( error and err_file is not None and os.path.exists(err_file) and os.stat(err_file).st_size > 0 ): logger.info("Contents of error file: '%s'" % err_file) print(open(err_file).read().rstrip()) print("-" * 40) def _write_array_jobs(array_jobs): for array_job in array_jobs: print( "Array Job", str(array_job.id), ( "(%s) :" % array_job.machine_name if array_job.machine_name is not None else ":" ), ) _write_contents(array_job) self.lock() # check if an array job should be reported if array_ids: if len(job_ids) != 1: logger.error( "If array ids are specified exactly one job id must be given." ) array_jobs = list( self.session.query(ArrayJob) .join(Job) .filter(Job.unique.in_(job_ids)) .filter(Job.unique == ArrayJob.job_id) .filter(ArrayJob.id.in_(array_ids)) ) if array_jobs: print(array_jobs[0].job) _write_array_jobs(array_jobs) else: # iterate over all jobs jobs = self.get_jobs(job_ids) for job in jobs: if name is not None and job.name != name: continue if job.status not in status: continue if job.array: print(job) _write_array_jobs(job.array) else: print(job) _write_contents(job) if job.log_dir is not None: print("-" * 60) self.unlock()
[docs] def delete_logs(self, job): out_file, err_file = job.std_out_file(), job.std_err_file() if out_file and os.path.exists(out_file): os.remove(out_file) logger.debug("Removed output log file '%s'" % out_file) if err_file and os.path.exists(err_file): os.remove(err_file) logger.debug("Removed error log file '%s'" % err_file)
    def delete(
        self,
        job_ids,
        array_ids=None,
        delete_logs=True,
        delete_log_dir=False,
        status=Status,
        delete_jobs=True,
    ):
        """Deletes the jobs with the given ids from the database.

        When ``array_ids`` is given, exactly one job id must be given and
        only the listed array jobs of that job are deleted.  ``delete_logs``
        also removes the jobs' log files, ``delete_log_dir`` removes the
        (then empty) log directory, and ``delete_jobs=False`` limits the
        operation to log cleanup without touching the database rows.
        Only jobs whose state is contained in ``status`` are affected.
        """

        def _delete_dir_if_empty(log_dir):
            # remove the log directory only when requested and empty
            if (
                log_dir
                and delete_log_dir
                and os.path.isdir(log_dir)
                and not os.listdir(log_dir)
            ):
                os.rmdir(log_dir)
                logger.info("Removed empty log directory '%s'" % log_dir)

        def _delete(job, try_to_delete_dir=False):
            # delete the job from the database (and its logs, when requested)
            if delete_logs:
                self.delete_logs(job)
            if try_to_delete_dir:
                _delete_dir_if_empty(job.log_dir)
            if delete_jobs:
                self.session.delete(job)

        self.lock()
        # check if array ids are specified
        if array_ids:
            if len(job_ids) != 1:
                # NOTE: only logged; the query below still runs with all ids
                logger.error(
                    "If array ids are specified exactly one job id must be given."
                )
            array_jobs = list(
                self.session.query(ArrayJob)
                .join(Job)
                .filter(Job.unique.in_(job_ids))
                .filter(Job.unique == ArrayJob.job_id)
                .filter(ArrayJob.id.in_(array_ids))
            )
            if array_jobs:
                job = array_jobs[0].job
                for array_job in array_jobs:
                    if array_job.status in status:
                        if delete_jobs:
                            logger.debug(
                                "Deleting array job '%d' of job '%d' from the database."
                                % (array_job.id, job.unique)
                            )
                        # logs are cleaned even when delete_jobs is False
                        _delete(array_job)
                # once the job has no array jobs left, delete the job itself
                if not job.array:
                    if job.status in status:
                        if delete_jobs:
                            logger.info(
                                "Deleting job '%d' from the database."
                                % job.unique
                            )
                        # NOTE(review): delete_jobs is passed here as the
                        # try_to_delete_dir argument, so the log directory is
                        # only considered for removal when rows are actually
                        # deleted — presumably intentional, but verify.
                        _delete(job, delete_jobs)
        else:
            # iterate over all jobs
            jobs = self.get_jobs(job_ids)
            for job in jobs:
                # delete all array jobs
                if job.array:
                    for array_job in job.array:
                        if array_job.status in status:
                            if delete_jobs:
                                logger.debug(
                                    "Deleting array job '%d' of job '%d' from the database."
                                    % (array_job.id, job.unique)
                                )
                            _delete(array_job)
                # delete this job
                if job.status in status:
                    if delete_jobs:
                        logger.info(
                            "Deleting job '%d' from the database." % job.unique
                        )
                    # NOTE(review): same delete_jobs-as-try_to_delete_dir
                    # coupling as in the array branch above — verify.
                    _delete(job, delete_jobs)

        self.session.commit()
        self.unlock()