#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.anjos@idiap.ch>
# Wed 24 Aug 2011 13:06:25 CEST
"""Defines the job manager which can help you managing submitted grid jobs.
"""
from __future__ import print_function
import copy
import os
import subprocess
import sys
import time
from .manager import JobManager
from .models import add_job
from .tools import logger, makedirs_safe
class JobManagerLocal(JobManager):
    """Manages jobs run in parallel on the local machine.

    Jobs are registered in the status database via :py:meth:`submit` and are
    executed by :py:meth:`run_scheduler`, which spawns one subprocess per job
    (or per array job) through ``self.wrapper_script``.

    NOTE(review): ``lock``, ``unlock``, ``get_jobs``, ``delete_logs``,
    ``_job_and_array``, ``wrapper_script`` and ``_database`` are not defined in
    this class; they are presumably provided by ``JobManager`` -- confirm there.
    """

    # Job states that are reset to 'submitted' when a job is stopped.
    _ACTIVE_STATUSES = ("executing", "queued", "waiting")

    def __init__(self, **kwargs):
        """Initializes the local job manager.

        All keyword arguments are forwarded unchanged to
        ``JobManager.__init__``.

        Keyword parameters:

        statefile
          The file containing a valid status database for the manager. If the
          file does not exist it is initialized. If it exists, it is loaded.
          (Handled by ``JobManager`` -- TODO confirm the exact keyword there.)
        """
        JobManager.__init__(self, **kwargs)

    def submit(
        self,
        command_line,
        name=None,
        array=None,
        dependencies=None,
        exec_dir=None,
        log_dir=None,
        dry_run=False,
        stop_on_failure=False,
        **kwargs
    ):
        """Submits a job that will be executed on the local machine during a call to "run".

        The job is only registered in the database here; nothing is executed.

        Parameters:

        command_line, name, array, exec_dir, log_dir, stop_on_failure
          Passed on unchanged to ``add_job``.

        dependencies
          Iterable of job ids this job depends on; duplicates are removed and
          the ids are sorted. ``None`` (the default) means no dependencies.

        dry_run
          If set, the job is added, reported on the console and deleted again.

        kwargs
          All further keyword arguments are simply ignored.

        Returns the unique id of the new job, or ``None`` for a dry run.
        """
        # remove duplicate dependencies
        dependencies = sorted(set(dependencies or ()))

        # add job to database
        self.lock()
        try:
            job = add_job(
                self.session,
                command_line=command_line,
                name=name,
                dependencies=dependencies,
                array=array,
                exec_dir=exec_dir,
                log_dir=log_dir,
                stop_on_failure=stop_on_failure,
            )
            logger.info("Added job '%s' to the database", job)

            if not dry_run:
                # read the id while the database is still locked
                return job.unique

            print(
                "Would have added the Job",
                job,
                "to the database to be executed locally.",
            )
            self.session.delete(job)
            logger.info(
                "Deleted job '%s' from the database due to dry-run option", job
            )
            # Every other mutating method of this class commits explicitly
            # before unlocking; do the same here so that the deletion is not
            # lost. The commit comes after the log call above, which still
            # needs to format the job object.
            self.session.commit()
            return None
        finally:
            # always release the database, also when adding the job failed
            self.unlock()

    def resubmit(
        self,
        job_ids=None,
        also_success=False,
        running_jobs=False,
        new_command=None,
        keep_logs=False,
        **kwargs
    ):
        """Re-submit jobs automatically.

        Parameters:

        job_ids
          Ids of the jobs to consider; passed on to ``get_jobs``.

        also_success
          Also re-submit jobs whose status is 'success'. By default only jobs
          with status 'submitted' or 'failure' are re-submitted.

        running_jobs
          Re-submit the selected jobs regardless of their status. Jobs of a
          non-local queue that are still 'executing' are never re-submitted.

        new_command
          Replacement command line; only applied when exactly one job is
          selected, otherwise a warning is logged and it is ignored.

        keep_logs
          If not set, the logs of each re-submitted job are deleted first.

        kwargs
          Ignored.
        """
        if also_success:
            accepted_old_status = ("submitted", "success", "failure")
        else:
            accepted_old_status = ("submitted", "failure")

        self.lock()
        try:
            jobs = self.get_jobs(job_ids)

            if new_command is not None:
                if len(jobs) == 1:
                    jobs[0].set_command_line(new_command)
                else:
                    logger.warning(
                        "Ignoring new command since no single job id was specified"
                    )

            for job in jobs:
                # check if this job needs re-submission
                if not (running_jobs or job.status in accepted_old_status):
                    continue
                if job.queue_name != "local" and job.status == "executing":
                    logger.error(
                        "Cannot re-submit job '%s' locally since it is still running in the grid. Use 'jman stop' to stop it's execution!",
                        job,
                    )
                    continue
                # re-submit job to the local queue
                logger.info("Re-submitted job '%s' to the database", job)
                if not keep_logs:
                    self.delete_logs(job)
                job.submit("local")

            self.session.commit()
        finally:
            self.unlock()

    def stop_jobs(self, job_ids=None):
        """Resets the status of local jobs to 'submitted' when they are labeled as 'executing', 'queued' or 'waiting'.

        Only jobs whose ``queue_name`` is 'local' are touched. ``job_ids`` is
        passed on to ``get_jobs`` to select the jobs to inspect.
        """
        self.lock()
        try:
            for job in self.get_jobs(job_ids):
                if (
                    job.status in self._ACTIVE_STATUSES
                    and job.queue_name == "local"
                ):
                    logger.info(
                        "Reset job '%s' (%s) in the database",
                        job.name,
                        self._format_log(job.id),
                    )
                    job.submit()
            self.session.commit()
        finally:
            self.unlock()

    def stop_job(self, job_id, array_id=None):
        """Resets the status of the given job to 'submitted' when it is labeled as 'executing', 'queued' or 'waiting'.

        If ``_job_and_array`` yields an array job for ``array_id``, only that
        array job is reset (besides the job itself); otherwise every array job
        of the job is checked and reset.
        """
        self.lock()
        try:
            job, array_job = self._job_and_array(job_id, array_id)
            if job is not None:
                if job.status in self._ACTIVE_STATUSES:
                    logger.info(
                        "Reset job '%s' (%s) in the database",
                        job.name,
                        self._format_log(job.id),
                    )
                    job.status = "submitted"

                # either the single requested array job, or all of them
                candidates = [array_job] if array_job is not None else job.array
                for candidate in candidates:
                    if candidate.status in self._ACTIVE_STATUSES:
                        logger.debug(
                            "Reset array job '%s' in the database", candidate
                        )
                        candidate.status = "submitted"
            self.session.commit()
        finally:
            self.unlock()

    ############################################################
    # Methods to run the jobs in parallel on the local machine #
    ############################################################

    def _run_parallel_job(
        self, job_id, array_id=None, no_log=False, nice=None, verbosity=0
    ):
        """Executes the code for this job on the local machine.

        Spawns ``self.wrapper_script ... run-job`` as a subprocess, with
        ``JOB_ID`` and ``SGE_TASK_ID`` set in the environment of the child.

        Parameters:

        job_id, array_id
          Identify the job (and optionally the array job) to run.

        no_log
          Write to the console instead of the log files of the job. The
          console is also used when the job has no ``log_dir``.

        nice
          If not ``None``, the command is prefixed with ``nice -n<nice>``.

        verbosity
          Number of ``v`` characters put into the ``-l...d`` option of the
          wrapper script.

        Returns the ``subprocess.Popen`` object of the started process, or
        ``None`` when the job does not exist (any more) or the process could
        not be started. In the latter case the job is finished with result 117.

        NOTE(review): ``_job_and_array`` accesses the database, so the caller
        presumably has to hold the lock -- ``run_scheduler`` does.
        """
        # os.environ.copy() returns a plain dict; assigning into a deep copy
        # of os.environ would call putenv() and leak the variables below into
        # the environment of the scheduler process itself
        environ = os.environ.copy()
        environ["JOB_ID"] = str(job_id)
        environ["SGE_TASK_ID"] = str(array_id) if array_id else "undefined"

        # generate call to the wrapper script
        command = [
            self.wrapper_script,
            "-l%sd" % ("v" * verbosity),
            self._database,
            "run-job",
        ]
        if nice is not None:
            command = ["nice", "-n%d" % nice] + command

        job, array_job = self._job_and_array(job_id, array_id)
        if job is None:
            # rare case: job was deleted before starting
            return None

        logger.info(
            "Starting execution of Job '%s' (%s)",
            job.name,
            self._format_log(job_id, array_id, len(job.array)),
        )

        # log files opened here; they are closed again once the child owns
        # its own copies of the descriptors (or could not be started)
        opened_logs = []
        try:
            if no_log or job.log_dir is None:
                out, err = sys.stdout, sys.stderr
            else:
                makedirs_safe(job.log_dir)
                log_source = array_job if array_job is not None else job
                # create line-buffered files for writing output and error status
                out = open(log_source.std_out_file(), "w", 1)
                opened_logs.append(out)
                err = open(log_source.std_err_file(), "w", 1)
                opened_logs.append(err)

            # return the subprocess pipe to the process
            try:
                return subprocess.Popen(
                    command, env=environ, stdout=out, stderr=err, bufsize=1
                )
            except OSError as e:
                logger.error(
                    "Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- directory:\t%s\n- command:\t%s",
                    job.name,
                    self._format_log(job_id, array_id, len(job.array)),
                    e,
                    " ".join(job.get_command_line()),
                    "." if job.exec_dir is None else job.exec_dir,
                    " ".join(command),
                )
                # result code 117 marks a job whose process could not be started
                job.finish(117, array_id)
                return None
        finally:
            # never closes sys.stdout / sys.stderr: only files opened above
            for log_file in opened_logs:
                log_file.close()

    def _format_log(self, job_id, array_id=None, array_count=0):
        """Formats job (and array) ids for log messages.

        Returns ``"<job>"`` without array id, ``"<job> (<array>/<count>)"``
        when both array id and a non-zero array count are given, and
        ``"<job> (<array>)"`` otherwise.
        """
        if array_id is None:
            return "%d" % job_id
        if array_count:
            return "%d (%d/%d)" % (job_id, array_id, array_count)
        return "%d (%d)" % (job_id, array_id)

    def _collect_finished_tasks(self, running_tasks, finished_tasks):
        """Removes ended processes from ``running_tasks`` and records their job ids in ``finished_tasks``.

        ``running_tasks`` holds tuples ``(process, job_id[, array_id])``.
        Raises ``StopIteration`` when an ended job is neither in status
        'success' nor 'failure'; that task is then left in ``running_tasks``.
        """
        # walk backwards so that deleting an entry does not shift the
        # indices that still have to be visited
        for task_index in range(len(running_tasks) - 1, -1, -1):
            task = running_tasks[task_index]
            if task[0].poll() is None:
                # process is still running
                continue

            # process ended
            job_id = task[1]
            array_id = task[2] if len(task) > 2 else None
            self.lock()
            try:
                job, array_job = self._job_and_array(job_id, array_id)
                if job is not None:
                    jj = array_job if array_job is not None else job
                    result = (
                        "%s (%d)" % (jj.status, jj.result)
                        if jj.result is not None
                        else "%s (?)" % jj.status
                    )
                    if jj.status not in ("success", "failure"):
                        logger.error(
                            "Job '%s' (%s) finished with status '%s' instead of 'success' or 'failure'. Usually this means an internal error. Check your wrapper_script parameter!",
                            job.name,
                            self._format_log(job_id, array_id),
                            jj.status,
                        )
                        raise StopIteration("Job did not finish correctly.")
                    logger.info(
                        "Job '%s' (%s) finished execution with result '%s'",
                        job.name,
                        self._format_log(job_id, array_id),
                        result,
                    )
            finally:
                self.unlock()

            finished_tasks.add(job_id)
            # in any case, remove the job from the list
            del running_tasks[task_index]

    def _start_queued_jobs(
        self, running_tasks, parallel_jobs, job_ids, no_log, nice, verbosity
    ):
        """Queues submitted local jobs and starts as many as there are free slots.

        Started processes are appended to ``running_tasks`` as
        ``(process, job_id[, array_id])``. THIS NEEDS TO LOCK THE DATABASE.

        Returns ``True`` when a job with array jobs but without any queued
        array job was finished here; the scheduler then has to run another
        round before it may decide that nothing is left to do.
        """
        repeat_execution = False
        self.lock()
        try:
            jobs = self.get_jobs(job_ids)

            # put all new jobs into the queue
            for job in jobs:
                if job.status == "submitted" and job.queue_name == "local":
                    job.queue()

            # get all unfinished jobs that are submitted to the local queue
            unfinished_jobs = [
                job
                for job in jobs
                if job.status in ("queued", "executing")
                and job.queue_name == "local"
            ]

            for job in unfinished_jobs:
                if job.array:
                    # find array jobs that can run
                    queued_array_jobs = [
                        array_job
                        for array_job in job.array
                        if array_job.status == "queued"
                    ]
                    if not queued_array_jobs:
                        job.finish(0, -1)
                        repeat_execution = True
                    else:
                        # there are new array jobs to run; never start more
                        # than there are free slots
                        free_slots = max(parallel_jobs - len(running_tasks), 0)
                        for array_job in queued_array_jobs[:free_slots]:
                            # start a new job from the array
                            process = self._run_parallel_job(
                                job.unique,
                                array_job.id,
                                no_log=no_log,
                                nice=nice,
                                verbosity=verbosity,
                            )
                            if process is None:
                                continue
                            running_tasks.append(
                                (process, job.unique, array_job.id)
                            )
                            # we here set the status to executing manually to avoid jobs to be run twice
                            # e.g., if the loop is executed while the asynchronous job did not start yet
                            array_job.status = "executing"
                            job.status = "executing"
                elif job.status == "queued":
                    # start a new job
                    process = self._run_parallel_job(
                        job.unique,
                        no_log=no_log,
                        nice=nice,
                        verbosity=verbosity,
                    )
                    if process is not None:
                        running_tasks.append((process, job.unique))
                        # see above: avoid that the job is started twice
                        job.status = "executing"

                if len(running_tasks) == parallel_jobs:
                    # all slots are taken
                    break

            self.session.commit()
        finally:
            self.unlock()
        return repeat_execution

    def run_scheduler(
        self,
        parallel_jobs=1,
        job_ids=None,
        sleep_time=0.1,
        die_when_finished=False,
        no_log=False,
        nice=None,
        verbosity=0,
    ):
        """Starts the scheduler, which is constantly checking for jobs that should be ran.

        Parameters:

        parallel_jobs
          Maximum number of processes that run at the same time.

        job_ids
          Restricts the scheduler to these jobs; passed on to ``get_jobs``.

        sleep_time
          Seconds to sleep between two rounds of checking.

        die_when_finished
          Stop as soon as no process is running any more. Otherwise the
          scheduler only stops on ``KeyboardInterrupt`` or an internal error.

        no_log, nice, verbosity
          Passed on to ``_run_parallel_job``.

        Returns the sorted list of unique ids of those jobs that ended during
        this call and whose status is not 'success'.
        """
        running_tasks = []
        finished_tasks = set()
        try:
            # keep the scheduler alive until every job is finished or the KeyboardInterrupt is caught
            while True:
                # FIRST, try if there are finished processes
                self._collect_finished_tasks(running_tasks, finished_tasks)

                # Flag that might be set in some rare cases, and that prevents the scheduler to die
                repeat_execution = False
                # SECOND, check if new jobs can be submitted
                if len(running_tasks) < parallel_jobs:
                    repeat_execution = self._start_queued_jobs(
                        running_tasks,
                        parallel_jobs,
                        job_ids,
                        no_log,
                        nice,
                        verbosity,
                    )

                # if after the submission of jobs there are no jobs running, we should have finished all the queue.
                if (
                    die_when_finished
                    and not repeat_execution
                    and len(running_tasks) == 0
                ):
                    logger.info(
                        "Stopping task scheduler since there are no more jobs running."
                    )
                    break

                # THIRD: sleep the desired amount of time before re-checking
                time.sleep(sleep_time)

        # This is the only way to stop: you have to interrupt the scheduler
        except (KeyboardInterrupt, StopIteration):
            # The interrupt may have arrived right after lock() was called.
            # NOTE(review): relies on 'session' only existing while the
            # database is locked -- confirm in JobManager.lock/unlock.
            if hasattr(self, "session"):
                self.unlock()
            logger.info("Stopping task scheduler due to user interrupt.")
            for task in running_tasks:
                task_label = self._format_log(
                    task[1], task[2] if len(task) > 2 else None
                )
                logger.warning(
                    "Killing job '%s' that was still running.", task_label
                )
                try:
                    task[0].kill()
                except OSError as e:
                    logger.error(
                        "Killing job '%s' was not successful: '%s'",
                        task_label,
                        e,
                    )
                self.stop_job(task[1])
            # stop all jobs that are currently running or queued
            self.stop_jobs(job_ids)

        # check the result of the jobs that we have run, and return the list of failed jobs
        self.lock()
        try:
            jobs = self.get_jobs(finished_tasks)
            failures = [job.unique for job in jobs if job.status != "success"]
        finally:
            self.unlock()
        return sorted(failures)