The GridTk User Guide

The gridtk framework is a python library to help submitting, tracking and querying SGE. Here is quick example on how to use the gridtk framework to submit a python script:

import sys
from gridtk.sge import JobManagerSGE
from gridtk.tools import make_shell

manager = JobManagerSGE()
command = make_shell(sys.executable, ['myscript.py', '--help'])
job = manager.submit(command)

You can do, programatically, everything you can do with the job manager - just browse the help messages and the jman script for more information.

API to the Job Managers

class gridtk.manager.JobManager(database='submitted.sql3', wrapper_script=None, debug=False)[source]

Bases: object

This job manager defines the basic interface for handling jobs in the SQL database.

lock()[source]

Generates (and returns) a blocking session object to the database.

unlock()[source]

Closes the session to the database.

get_jobs(job_ids=None)[source]

Returns a list of jobs that are stored in the database.

run_job(job_id, array_id=None)[source]

This function is called to run a job (e.g. in the grid) with the given id and the given array index if applicable.

list(job_ids, print_array_jobs=False, print_dependencies=False, long=False, print_times=False, status=('submitted', 'queued', 'waiting', 'executing', 'success', 'failure'), names=None, ids_only=False)[source]

Lists the jobs currently added to the database.

report(job_ids=None, array_ids=None, output=True, error=True, status=('submitted', 'queued', 'waiting', 'executing', 'success', 'failure'), name=None)[source]

Iterates through the output and error files and write the results to command line.

delete_logs(job)[source]
delete(job_ids, array_ids=None, delete_logs=True, delete_log_dir=False, status=('submitted', 'queued', 'waiting', 'executing', 'success', 'failure'), delete_jobs=True)[source]

Deletes the jobs with the given ids from the database.

Defines the job manager which can help you managing submitted grid jobs.

class gridtk.sge.JobManagerSGE(context='grid', **kwargs)[source]

Bases: gridtk.manager.JobManager

The JobManager will submit and control the status of submitted jobs

submit(command_line, name=None, array=None, dependencies=[], exec_dir=None, log_dir='logs', dry_run=False, verbosity=0, stop_on_failure=False, **kwargs)[source]

Submits a job that will be executed in the grid.

communicate(job_ids=None)[source]

Communicates with the SGE grid (using qstat) to see if jobs are still running.

resubmit(job_ids=None, also_success=False, running_jobs=False, new_command=None, verbosity=0, keep_logs=False, **kwargs)[source]

Re-submit jobs automatically

run_job(job_id, array_id=None)[source]

Overwrites the run-job command from the manager to extract the correct job id before calling base class implementation.

stop_jobs(job_ids)[source]

Stops the jobs in the grid.

Defines the job manager which can help you managing submitted grid jobs.

class gridtk.local.JobManagerLocal(**kwargs)[source]

Bases: gridtk.manager.JobManager

Manages jobs run in parallel on the local machine.

submit(command_line, name=None, array=None, dependencies=[], exec_dir=None, log_dir=None, dry_run=False, stop_on_failure=False, **kwargs)[source]

Submits a job that will be executed on the local machine during a call to “run”. All kwargs will simply be ignored.

resubmit(job_ids=None, also_success=False, running_jobs=False, new_command=None, keep_logs=False, **kwargs)[source]

Re-submit jobs automatically

stop_jobs(job_ids=None)[source]

Resets the status of the job to ‘submitted’ when they are labeled as ‘executing’.

stop_job(job_id, array_id=None)[source]

Resets the status of the given to ‘submitted’ when they are labeled as ‘executing’.

run_scheduler(parallel_jobs=1, job_ids=None, sleep_time=0.1, die_when_finished=False, no_log=False, nice=None, verbosity=0)[source]

Starts the scheduler, which is constantly checking for jobs that should be ran.

The Models of the SQL3 Databases

class gridtk.models.ArrayJob(id, job_id)[source]

Bases: sqlalchemy.orm.decl_api.Base

This class defines one element of an array job.

unique
job
id
job_id
status
result
machine_name
submit_time
start_time
finish_time
std_out_file()[source]
std_err_file()[source]
format(format)[source]

Formats the current job into a nicer string to fit into a table.

class gridtk.models.Job(command_line, name=None, exec_dir=None, log_dir=None, array_string=None, queue_name='local', machine_name=None, stop_on_failure=False, **kwargs)[source]

Bases: sqlalchemy.orm.decl_api.Base

This class defines one Job that was submitted to the Job Manager.

unique
id
submit_time
start_time
finish_time
status
result
command_line
name
queue_name
machine_name
grid_arguments
exec_dir
log_dir
stop_on_failure
array_string
submit(new_queue=None)[source]

Sets the status of this job to ‘submitted’.

queue(new_job_id=None, new_job_name=None, queue_name=None)[source]

Sets the status of this job to ‘queued’ or ‘waiting’.

execute(array_id=None, machine_name=None)[source]

Sets the status of this job to ‘executing’.

finish(result, array_id=None)[source]

Sets the status of this job to ‘success’ or ‘failure’.

refresh()[source]

Refreshes the status information.

get_command_line()[source]

Returns the command line for the job.

set_command_line(command_line)[source]

Sets / overwrites the command line for the job.

get_exec_dir()[source]

Returns the command line for the job.

get_array()[source]

Returns the array arguments for the job; usually a string.

get_arguments()[source]

Returns the additional options for the grid (such as the queue, memory requirements, …).

set_arguments(**kwargs)[source]
get_jobs_we_wait_for()[source]
get_jobs_waiting_for_us()[source]
std_out_file(array_id=None)[source]
std_err_file(array_id=None)[source]
format(format, dependencies=0, limit_command_line=None)[source]

Formats the current job into a nicer string to fit into a table.

class gridtk.models.JobDependence(waiting_job_id, waited_for_job_id)[source]

Bases: sqlalchemy.orm.decl_api.Base

This table defines a many-to-many relationship between Jobs.

id
waiting_job
waited_for_job
waiting_job_id
waited_for_job_id
gridtk.models.add_job(session, command_line, name='job', dependencies=[], array=None, exec_dir=None, log_dir=None, stop_on_failure=False, **kwargs)[source]

Helper function to create a job, add the dependencies and the array jobs.

gridtk.models.times(job)[source]

Returns a string containing timing information for teh given job, which might be a Job or an ArrayJob.

Middleware

Functions that replace the shell based utilities for the grid submission and probing.

gridtk.tools.makedirs_safe(fulldir)[source]

Creates a directory if it does not exists. Takes into consideration concurrent access support. Works like the shell’s ‘mkdir -p’.

gridtk.tools.str_(name)[source]

Return the string representation of the given ‘name’. If it is a bytes object, it will be converted into str. If it is a str object, it will simply be resurned.

gridtk.tools.qsub(command, queue=None, cwd=True, name=None, deps=[], stdout='', stderr='', env=[], array=None, context='grid', hostname=None, memfree=None, hvmem=None, gpumem=None, pe_opt=None, io_big=False, sge_extra_args='')[source]

Submits a shell job to a given grid queue

Keyword parameters:

command

The command to be submitted to the grid

queue

A valid queue name or None, to use the default queue

cwd

If the job should change to the current working directory before starting

name

An optional name to set for the job. If not given, defaults to the script name being launched.

deps

Job ids to which this job will be dependent on

stdout

The standard output directory. If not given, defaults to what qsub has as a default.

stderr

The standard error directory (if not given, defaults to the stdout directory).

env

This is a list of extra variables that will be set on the environment running the command of your choice.

array

If set should be either:

  1. a string in the form m[-n[:s]] which indicates the starting range ‘m’, the closing range ‘n’ and the step ‘s’.

  2. an integer value indicating the total number of jobs to be submitted. This is equivalent ot set the parameter to a string “1-k:1” where “k” is the passed integer value

  3. a tuple that contains either 1, 2 or 3 elements indicating the start, end and step arguments (“m”, “n”, “s”).

The minimum value for “m” is 1. Giving “0” is an error.

If submitted with this option, the job to be created will be an SGE parametric job. In this mode SGE does not allow individual control of each job. The environment variable SGE_TASK_ID will be set on the executing process automatically by SGE and indicates the unique identifier in the range for which the current job instance is for.

context

The setshell context in which we should try a ‘qsub’. Normally you don’t need to change the default. This variable can also be set to a context dictionary in which case we just setup using that context instead of probing for a new one, what can be fast.

memfree

If set, it asks the queue for a node with a minimum amount of memory Used only if mem is not set (cf. qsub -l mem_free=<…>)

hvmem

If set, it asks the queue for a node with a minimum amount of memory Used only if mem is not set (cf. qsub -l h_vmem=<…>)

gpumem

Applicable only for GPU-based queues. If set, it asks for the GPU queue with a minimum amount of memory. The amount should not be more than 24. (cf. qsub -l gpumem=<…>)

hostname

If set, it asks the queue to use only a subset of the available nodes Symbols: | for OR, & for AND, ! for NOT, etc. (cf. qsub -l hostname=<…>)

pe_opt

If set, add a -pe option when launching a job (for instance pe_exclusive* 1-)

io_big

If set to true, the io_big flag will be set. Use this flag if your process will need a lot of Input/Output operations.

sge_extra_args

This is used to send extra argument to SGE. Note that all its arguments are directly used in qsub command. For example, jman submit -e “-P project_name -l pytorch=true” – … will be translated to qsub -P project_name -l pytorch=true – …

Returns the job id assigned to this job (integer)

gridtk.tools.make_shell(shell, command)[source]

Returns a single command given a shell and a command to be qsub’ed

Keyword parameters:

shell

The path to the shell to use when submitting the job.

command

The script path to be submitted

Returns the command parameters to be supplied to qsub()

gridtk.tools.qstat(jobid, context='grid')[source]

Queries status of a given job.

Keyword parameters:

jobid

The job identifier as returned by qsub()

context

The setshell context in which we should try a ‘qsub’. Normally you don’t need to change the default. This variable can also be set to a context dictionary in which case we just setup using that context instead of probing for a new one, what can be fast.

Returns a dictionary with the specific job properties

gridtk.tools.qdel(jobid, context='grid')[source]

Halts a given job.

Keyword parameters:

jobid

The job identifier as returned by qsub()

context

The setshell context in which we should try a ‘qsub’. Normally you don’t need to change the default. This variable can also be set to a context dictionary in which case we just setup using that context instead of probing for a new one, what can be fast.

gridtk.tools.get_array_job_slice(total_length)[source]

A helper function that let’s you chunk a list in an SGE array job. Use this function like a = a[get_array_job_slice(len(a))] to only process a chunk of a.

Parameters

total_length (int) – The length of the list that you are trying to slice

Returns

A slice to be used.

Return type

slice

Raises

NotImplementedError – If “SGE_TASK_FIRST” and “SGE_TASK_STEPSIZE” are not 1.

Low-level Utilities

Wrappers for Idiap’s SETSHELL functionality

gridtk.setshell.environ(context)[source]

Retrieves the environment for a particular SETSHELL context

gridtk.setshell.sexec(context, command, error_on_nonzero=True)[source]

Executes a command within a particular Idiap SETSHELL context

gridtk.setshell.replace(context, command)[source]