Coverage for src/gridtk/sge.py: 23% (126 statements)
# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Defines the job manager which helps you manage submitted grid jobs."""

from __future__ import annotations

import logging
import os
import re
import sys

from .manager import JobManager
from .models import Job, add_job
from .setshell import environ
from .tools import make_shell, qdel, qstat, qsub

logger = logging.getLogger(__name__)


class JobManagerSGE(JobManager):
    """The JobManager will submit and control the status of submitted jobs."""

    def __init__(self, context="grid", **kwargs):
        """Initializes this object with a state file and a method for qsub'ing.

        Keyword parameters:

        statefile
            The file containing a valid status database for the manager. If
            the file does not exist, it is initialized; if it exists, it is
            loaded.

        context
            The context to provide when setting up the environment to call
            the SGE utilities such as qsub, qstat and qdel (normally 'grid',
            which also happens to be the default).
        """
        self.context = environ(context)
        JobManager.__init__(self, **kwargs)

    def _queue(self, kwargs):
        """Extracts the queue name from the 'hard resource_list' entry.

        The hard resource_list comes like this: '<qname>=TRUE,mem=128M'. To
        process it, we split it twice (on ',' and then on '='), create a
        dictionary, and extract just the queue name.
        """
        if "hard resource_list" not in kwargs:
            return "all.q"
        d = dict(k.split("=") for k in kwargs["hard resource_list"].split(","))
        for k in d:
            if k.startswith("q") and d[k] == "TRUE":
                return k
        return "all.q"
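
    # Illustration of the parsing above, using a made-up resource list value:
    #
    #     dict(k.split("=") for k in "q1d=TRUE,mem=4G".split(","))
    #     # -> {"q1d": "TRUE", "mem": "4G"}; the key "q1d" starts with 'q'
    #     #    and maps to "TRUE", so _queue() would return "q1d".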

    def _submit_to_grid(
        self, job, name, array, dependencies, log_dir, verbosity, **kwargs
    ):
        # What we will actually submit to the grid is a wrapper script that
        # calls the desired command; get the name of the script that was
        # called originally.
        jman = self.wrapper_script
        python = sys.executable

        # get the grid ids of the dependencies and remove duplicates
        dependent_jobs = self.get_jobs(dependencies)
        deps = sorted({j.id for j in dependent_jobs})

        # make sure the log directory is created and is a directory
        os.makedirs(job.log_dir, exist_ok=True)
        assert os.path.isdir(job.log_dir), (
            "Please make sure --log-dir `{}' either does not exist "
            "or is a directory.".format(job.log_dir)
        )

        # generate the call to the wrapper script
        command = make_shell(
            python,
            [jman, "-%sd" % ("v" * verbosity), self._database, "run-job"],
        )
        q_array = "%d-%d:%d" % array if array else None
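        # e.g., an array tuple of (1, 10, 2) renders as the SGE task
        # specification "1-10:2": run tasks 1 through 10 in steps of 2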
        grid_id = qsub(
            command,
            context=self.context,
            name=name,
            deps=deps,
            array=q_array,
            stdout=log_dir,
            stderr=log_dir,
            **kwargs,
        )

        # get the result of qstat
        status = qstat(grid_id, context=self.context)

        # set the grid id of the job
        job.queue(
            new_job_id=int(status["job_number"]),
            new_job_name=status["job_name"],
            queue_name=self._queue(status),
        )

        logger.info(
            "Submitted job '%s' with dependencies '%s' to the SGE grid.",
            job,
            deps,
        )

        if (
            "io_big" in kwargs
            and kwargs["io_big"]
            and ("queue" not in kwargs or kwargs["queue"] == "all.q")
        ):
            logger.warning(
                "This job will never be executed since the 'io_big' flag "
                "is not available for the 'all.q'."
            )
        if "pe_opt" in kwargs and (
            "queue" not in kwargs
            or kwargs["queue"] not in ("q1dm", "q_1day_mth", "q1wm", "q_1week_mth")
        ):
            logger.warning(
                "This job will never be executed since the queue '%s' does "
                "not support multi-threading (pe_mth) -- use 'q1dm' or "
                "'q1wm' instead.",
                kwargs.get("queue", "all.q"),
            )
        if (
            "gpumem" in kwargs
            and "queue" in kwargs
            and kwargs["queue"] in ("gpu", "lgpu", "sgpu", "vsgpu")
            and int(re.sub(r"\D", "", kwargs["gpumem"])) > 24
        ):
            logger.warning(
                "This job will never be executed since the GPU queue '%s' "
                "cannot have more than 24GB of memory.",
                kwargs["queue"],
            )

        assert job.id == grid_id
        return job.unique

    def submit(
        self,
        command_line,
        name=None,
        array=None,
        dependencies=[],
        exec_dir=None,
        log_dir="logs",
        dry_run=False,
        verbosity=0,
        stop_on_failure=False,
        **kwargs,
    ):
        """Submits a job that will be executed on the grid."""
        # add the job to the database
        self.lock()
        job = add_job(
            self.session,
            command_line,
            name,
            dependencies,
            array,
            exec_dir=exec_dir,
            log_dir=log_dir,
            stop_on_failure=stop_on_failure,
            context=self.context,
            **kwargs,
        )
        logger.info("Added job '%s' to the database.", job)
        if dry_run:
            print("Would have added the Job")
            print(job)
            print(
                "to the database to be executed in the grid with options:",
                str(kwargs),
            )
            self.session.delete(job)
            logger.info(
                "Deleted job '%s' from the database due to the dry-run option.",
                job,
            )
            job_id = None
        else:
            job_id = self._submit_to_grid(
                job, name, array, dependencies, log_dir, verbosity, **kwargs
            )

        self.session.commit()
        self.unlock()

        return job_id
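
    # Example call (illustrative values; assumes an initialized manager):
    #
    #     manager = JobManagerSGE(context="grid")
    #     manager.submit(
    #         ["echo", "hello"],   # command line to run on the grid
    #         name="demo",         # SGE job name
    #         array=(1, 10, 1),    # task ids 1..10, rendered as "1-10:1"
    #         log_dir="logs",      # where stdout/stderr files are written
    #     )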

    def communicate(self, job_ids=None):
        """Communicates with the SGE grid (using qstat) to check whether jobs
        are still running."""
        self.lock()
        # iterate over all jobs
        jobs = self.get_jobs(job_ids)
        for job in jobs:
            job.refresh()
            if (
                job.status in ("queued", "executing", "waiting")
                and job.queue_name != "local"
            ):
                status = qstat(job.id, context=self.context)
                if len(status) == 0:
                    job.status = "failure"
                    job.result = 70  # ASCII: 'F'
                    logger.warning(
                        "The job '%s' was not executed successfully (maybe a "
                        "time-out happened). Please check the log files.",
                        job,
                    )
                    for array_job in job.array:
                        if array_job.status in ("queued", "executing"):
                            array_job.status = "failure"
                            array_job.result = 70  # ASCII: 'F'

        self.session.commit()
        self.unlock()

    def resubmit(
        self,
        job_ids=None,
        also_success=False,
        running_jobs=False,
        new_command=None,
        verbosity=0,
        keep_logs=False,
        **kwargs,
    ):
        """Re-submits jobs automatically."""
        self.lock()
        # iterate over all jobs
        jobs = self.get_jobs(job_ids)
        if new_command is not None:
            if len(jobs) == 1:
                jobs[0].set_command_line(new_command)
            else:
                logger.warning(
                    "Ignoring the new command since no single job id was specified."
                )
        accepted_old_status = (
            ("submitted", "success", "failure")
            if also_success
            else ("submitted", "failure")
        )
        for job in jobs:
            # check if this job needs re-submission
            if running_jobs or job.status in accepted_old_status:
                grid_status = qstat(job.id, context=self.context)
                if len(grid_status) != 0:
                    logger.warning(
                        "Deleting job '%d' since it was still running in the grid.",
                        job.unique,
                    )
                    qdel(job.id, context=self.context)
                # re-submit the job to the grid
                arguments = job.get_arguments()
                arguments.update(**kwargs)
                if "queue" not in arguments or arguments["queue"] == "all.q":
                    for arg in ("hvmem", "pe_opt", "io_big"):
                        if arg in arguments:
                            del arguments[arg]
                job.set_arguments(kwargs=arguments)
                # delete the old status and result of the job
                if not keep_logs:
                    self.delete_logs(job)
                job.submit()
                if job.queue_name == "local" and "queue" not in arguments:
                    logger.warning(
                        "Re-submitting job '%s' locally (since no queue name "
                        "is specified).",
                        job,
                    )
                else:
                    deps = [dep.unique for dep in job.get_jobs_we_wait_for()]
                    logger.debug(
                        "Re-submitting job '%s' with dependencies '%s' to the grid.",
                        job,
                        deps,
                    )
                    self._submit_to_grid(
                        job,
                        job.name,
                        job.get_array(),
                        deps,
                        job.log_dir,
                        verbosity,
                        **arguments,
                    )

            # commit after each job to avoid failures of not finding the job
            # during execution in the grid
            self.session.commit()
        self.unlock()
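
    # Example call (illustrative values; 'q1d' is a made-up queue name):
    # re-submit failed jobs 1 and 2 on a different queue, keeping their
    # previous log files:
    #
    #     manager.resubmit(job_ids=[1, 2], keep_logs=True, queue="q1d")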

    def run_job(self, job_id, array_id=None):
        """Overwrites the run-job command from the manager to extract the
        correct job id before calling the base class implementation."""
        # get the unique job id from the given grid id
        self.lock()
        jobs = list(self.session.query(Job).filter(Job.id == job_id))
        if len(jobs) != 1:
            self.unlock()
            raise ValueError("Could not find job id '%d' in the database." % job_id)
        job_id = jobs[0].unique
        self.unlock()
        # call the base class implementation with the corrected job id
        return JobManager.run_job(self, job_id, array_id)

    def stop_jobs(self, job_ids):
        """Stops the given jobs in the grid."""
        self.lock()

        jobs = self.get_jobs(job_ids)
        for job in jobs:
            if job.status in ("executing", "queued", "waiting"):
                qdel(job.id, context=self.context)
                logger.info("Stopped job '%s' in the SGE grid.", job)
                # mark the job as submitted again so it can be re-run later
                job.submit()

        self.session.commit()
        self.unlock()
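

if __name__ == "__main__":
    # Minimal smoke-test sketch: assumes a working SGE installation and the
    # 'grid' SETSHELL context, and that the base JobManager's defaults (e.g.
    # the database location) are usable as-is. None of this is verified here.
    logging.basicConfig(level=logging.INFO)
    manager = JobManagerSGE(context="grid")
    # submit a trivial job; returns the unique database id (None on dry runs)
    unique_id = manager.submit(["echo", "hello"], name="demo", log_dir="logs")
    # poll qstat once to update the job's status in the local database
    manager.communicate([unique_id] if unique_id is not None else None)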