#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 13:06:25 CEST
"""Defines the job manager which can help you managing submitted grid jobs.
"""
from __future__ import print_function
import os
import re
import sys
from .manager import JobManager
from .models import Job, add_job
from .setshell import environ
from .tools import logger, make_shell, makedirs_safe, qdel, qstat, qsub
class JobManagerSGE(JobManager):
"""The JobManager will submit and control the status of submitted jobs"""
def __init__(self, context="grid", **kwargs):
"""Initializes this object with a state file and a method for qsub'bing.
Keyword parameters:
statefile
The file containing a valid status database for the manager. If the file
does not exist it is initialized. If it exists, it is loaded.
context
The context to provide when setting up the environment to call the SGE
utilities such as qsub, qstat and qdel (normally 'grid', which also
happens to be default)
"""
self.context = environ(context)
JobManager.__init__(self, **kwargs)
def _queue(self, kwargs):
"""The hard resource_list comes like this: '<qname>=TRUE,mem=128M'. To
process it we have to split it twice (',' and then on '='), create a
dictionary and extract just the qname"""
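        # Illustrative example (the values are hypothetical, not defined here):
        # for {"hard resource_list": "q1d=TRUE,mem=4G"} the dictionary below
        # becomes {"q1d": "TRUE", "mem": "4G"} and 'q1d' is returned; if no
        # key starting with 'q' maps to "TRUE", the default "all.q" is used.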
if "hard resource_list" not in kwargs:
return "all.q"
d = dict(
[k.split("=") for k in kwargs["hard resource_list"].split(",")]
)
for k in d:
if k[0] == "q" and d[k] == "TRUE":
return k
return "all.q"
def _submit_to_grid(
self, job, name, array, dependencies, log_dir, verbosity, **kwargs
):
# ... what we will actually submit to the grid is a wrapper script that will call the desired command...
# get the name of the file that was called originally
jman = self.wrapper_script
python = sys.executable
# get the grid id's for the dependencies and remove duplicates
dependent_jobs = self.get_jobs(dependencies)
deps = sorted(list(set([j.id for j in dependent_jobs])))
# make sure log directory is created and is a directory
makedirs_safe(job.log_dir)
assert os.path.isdir(
job.log_dir
), "Please make sure --log-dir `{}' either does not exist or is a directory.".format(
job.log_dir
)
# generate call to the wrapper script
command = make_shell(
python,
[jman, "-%sd" % ("v" * verbosity), self._database, "run-job"],
)
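        # For illustration only (the paths are placeholders, not defined here):
        # with verbosity=2 the generated command looks roughly like
        # ['/usr/bin/python', '/path/to/jman', '-vvd', '<database>', 'run-job'],
        # i.e. the wrapper script re-reads the database and runs the job itself.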
q_array = "%d-%d:%d" % array if array else None
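        # e.g. an array spec of (1, 10, 2) is rendered as the SGE task range
        # "1-10:2" (first task, last task, step); the numbers are only examples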
grid_id = qsub(
command,
context=self.context,
name=name,
deps=deps,
array=q_array,
stdout=log_dir,
stderr=log_dir,
**kwargs
)
# get the result of qstat
status = qstat(grid_id, context=self.context)
# set the grid id of the job
job.queue(
new_job_id=int(status["job_number"]),
new_job_name=status["job_name"],
queue_name=self._queue(status),
)
logger.info(
"Submitted job '%s' with dependencies '%s' to the SGE grid."
% (job, str(deps))
)
if (
"io_big" in kwargs
and kwargs["io_big"]
and ("queue" not in kwargs or kwargs["queue"] == "all.q")
):
            logger.warn(
                "This job will never be executed since the 'io_big' flag is not available for the 'all.q' queue."
            )
if "pe_opt" in kwargs and (
"queue" not in kwargs
or kwargs["queue"]
not in ("q1dm", "q_1day_mth", "q1wm", "q_1week_mth")
):
            logger.warn(
                "This job will never be executed since the queue '%s' does not support multi-threading (pe_mth) -- use 'q1dm' or 'q1wm' instead."
                % (kwargs["queue"] if "queue" in kwargs else "all.q")
            )
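        # sanity check for GPU memory requests: re.sub() strips all non-digit
        # characters, so e.g. a (hypothetical) request of "30G" becomes 30,
        # which exceeds the 24GB limit mentioned in the warning below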
if (
"gpumem" in kwargs
and "queue" in kwargs
and kwargs["queue"] in ("gpu", "lgpu", "sgpu")
and int(re.sub("\\D", "", kwargs["gpumem"])) > 24
):
logger.warn(
"This job will never be executed since the GPU queue '%s' cannot have more than 24GB of memory."
% kwargs["queue"]
)
assert job.id == grid_id
return job.unique
    def submit(
self,
command_line,
name=None,
array=None,
dependencies=[],
exec_dir=None,
log_dir="logs",
dry_run=False,
verbosity=0,
stop_on_failure=False,
**kwargs
):
"""Submits a job that will be executed in the grid."""
# add job to database
self.lock()
job = add_job(
self.session,
command_line,
name,
dependencies,
array,
exec_dir=exec_dir,
log_dir=log_dir,
stop_on_failure=stop_on_failure,
context=self.context,
**kwargs
)
logger.info("Added job '%s' to the database." % job)
if dry_run:
print("Would have added the Job")
print(job)
print(
"to the database to be executed in the grid with options:",
str(kwargs),
)
self.session.delete(job)
logger.info(
"Deleted job '%s' from the database due to dry-run option" % job
)
job_id = None
else:
job_id = self._submit_to_grid(
job, name, array, dependencies, log_dir, verbosity, **kwargs
)
self.session.commit()
self.unlock()
return job_id
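    # Usage sketch for submit() above (comments only; the constructor keyword
    # names and all example values are assumptions, not defined in this file):
    #
    #   manager = JobManagerSGE(database="submitted.sql3", wrapper_script="jman")
    #   job_id = manager.submit(
    #       ["python", "myscript.py"],
    #       name="analysis",
    #       array=(1, 10, 1),
    #       queue="q1d",
    #       log_dir="logs",
    #   )
    #
    # The command line is stored in the database and a call to the wrapper
    # script is submitted to SGE through qsub.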
    def communicate(self, job_ids=None):
"""Communicates with the SGE grid (using qstat) to see if jobs are still running."""
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
for job in jobs:
job.refresh()
if (
job.status in ("queued", "executing", "waiting")
and job.queue_name != "local"
):
status = qstat(job.id, context=self.context)
if len(status) == 0:
job.status = "failure"
job.result = 70 # ASCII: 'F'
logger.warn(
"The job '%s' was not executed successfully (maybe a time-out happened). Please check the log files."
% job
)
for array_job in job.array:
if array_job.status in ("queued", "executing"):
array_job.status = "failure"
array_job.result = 70 # ASCII: 'F'
self.session.commit()
self.unlock()
    def resubmit(
self,
job_ids=None,
also_success=False,
running_jobs=False,
new_command=None,
verbosity=0,
keep_logs=False,
**kwargs
):
"""Re-submit jobs automatically"""
self.lock()
# iterate over all jobs
jobs = self.get_jobs(job_ids)
if new_command is not None:
if len(jobs) == 1:
jobs[0].set_command_line(new_command)
else:
logger.warn(
"Ignoring new command since no single job id was specified"
)
accepted_old_status = (
("submitted", "success", "failure")
if also_success
else (
"submitted",
"failure",
)
)
for job in jobs:
# check if this job needs re-submission
if running_jobs or job.status in accepted_old_status:
grid_status = qstat(job.id, context=self.context)
if len(grid_status) != 0:
logger.warn(
"Deleting job '%d' since it was still running in the grid."
% job.unique
)
qdel(job.id, context=self.context)
# re-submit job to the grid
arguments = job.get_arguments()
arguments.update(**kwargs)
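                # grid-only resources such as extra memory (hvmem), the
                # parallel environment (pe_opt) or the io_big flag make no
                # sense on the default 'all.q' queue, so they are dropped
                # before re-submission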
if "queue" not in arguments or arguments["queue"] == "all.q":
for arg in ("hvmem", "pe_opt", "io_big"):
if arg in arguments:
del arguments[arg]
job.set_arguments(kwargs=arguments)
# delete old status and result of the job
if not keep_logs:
self.delete_logs(job)
job.submit()
if job.queue_name == "local" and "queue" not in arguments:
logger.warn(
"Re-submitting job '%s' locally (since no queue name is specified)."
% job
)
else:
deps = [dep.unique for dep in job.get_jobs_we_wait_for()]
logger.debug(
"Re-submitting job '%s' with dependencies '%s' to the grid."
% (job, deps)
)
self._submit_to_grid(
job,
job.name,
job.get_array(),
deps,
job.log_dir,
verbosity,
**arguments
)
# commit after each job to avoid failures of not finding the job during execution in the grid
self.session.commit()
self.unlock()
    def run_job(self, job_id, array_id=None):
"""Overwrites the run-job command from the manager to extract the correct job id before calling base class implementation."""
# get the unique job id from the given grid id
self.lock()
jobs = list(self.session.query(Job).filter(Job.id == job_id))
if len(jobs) != 1:
self.unlock()
            raise ValueError(
                "Could not find job id '%d' in the database!" % job_id
            )
job_id = jobs[0].unique
self.unlock()
# call base class implementation with the corrected job id
return JobManager.run_job(self, job_id, array_id)
    def stop_jobs(self, job_ids):
"""Stops the jobs in the grid."""
self.lock()
jobs = self.get_jobs(job_ids)
for job in jobs:
if job.status in ("executing", "queued", "waiting"):
qdel(job.id, context=self.context)
logger.info("Stopped job '%s' in the SGE grid." % job)
job.submit()
self.session.commit()
self.unlock()
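# Continuing the usage sketch from above (same assumed 'manager' object; the
# job id 1 is only an example): after submission, the remaining methods drive
# the job life-cycle:
#
#   manager.communicate()            # mark jobs that vanished from qstat as failed
#   manager.resubmit(job_ids=[1])    # re-submit a submitted or failed job
#   manager.stop_jobs([1])           # qdel the job if it is still running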