Coverage for src/gridtk/local.py: 73%
175 statements

# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Defines the job manager, which can help you manage submitted grid jobs."""

import copy
import logging
import os
import subprocess
import sys
import time

from .manager import JobManager
from .models import add_job

logger = logging.getLogger(__name__)


class JobManagerLocal(JobManager):
    """Manages jobs run in parallel on the local machine."""

    def __init__(self, **kwargs):
        """Initializes this object with a state file and a method for
        qsub'bing.

        Keyword parameters:

        statefile
            The file containing a valid status database for the manager.
            If the file does not exist, it is initialized; if it exists,
            it is loaded.
        """
        JobManager.__init__(self, **kwargs)

    def submit(
        self,
        command_line,
        name=None,
        array=None,
        dependencies=(),
        exec_dir=None,
        log_dir=None,
        dry_run=False,
        stop_on_failure=False,
        **kwargs,
    ):
        """Submits a job that will be executed on the local machine during a
        call to ``run_scheduler``.

        All other keyword arguments are simply ignored.
        """
        # remove duplicate dependencies
        dependencies = sorted(set(dependencies))

        # add the job to the database
        self.lock()
        job = add_job(
            self.session,
            command_line=command_line,
            name=name,
            dependencies=dependencies,
            array=array,
            exec_dir=exec_dir,
            log_dir=log_dir,
            stop_on_failure=stop_on_failure,
        )
        logger.info("Added job '%s' to the database", job)

        if dry_run:
            print(
                "Would have added the Job",
                job,
                "to the database to be executed locally.",
            )
            self.session.delete(job)
            logger.info(
                "Deleted job '%s' from the database due to dry-run option", job
            )
            job_id = None
        else:
            job_id = job.unique

        # return the new job id
        self.unlock()
        return job_id
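
    # A minimal usage sketch for ``submit`` (illustrative only: the manager
    # construction is elided since the accepted keyword arguments come from
    # the base JobManager, and the command below is made up):
    #
    #   manager = JobManagerLocal(...)
    #   job_id = manager.submit(["python", "train.py"], name="train", log_dir="logs")
    #   # the job is only recorded here; it executes once run_scheduler() runs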

    def resubmit(
        self,
        job_ids=None,
        also_success=False,
        running_jobs=False,
        new_command=None,
        keep_logs=False,
        **kwargs,
    ):
        """Re-submits jobs automatically."""
        self.lock()
        # iterate over all jobs
        jobs = self.get_jobs(job_ids)
        if new_command is not None:
            if len(jobs) == 1:
                jobs[0].set_command_line(new_command)
            else:
                logger.warning(
                    "Ignoring the new command since no single job id was specified"
                )
        accepted_old_status = (
            ("submitted", "success", "failure")
            if also_success
            else ("submitted", "failure")
        )
        for job in jobs:
            # check if this job needs re-submission
            if running_jobs or job.status in accepted_old_status:
                if job.queue_name != "local" and job.status == "executing":
                    logger.error(
                        "Cannot re-submit job '%s' locally since it is still running in the grid. Use 'jman stop' to stop its execution!",
                        job,
                    )
                else:
                    # re-submit the job to the local queue
                    logger.info("Re-submitted job '%s' to the database", job)
                    if not keep_logs:
                        self.delete_logs(job)
                    job.submit("local")

        self.session.commit()
        self.unlock()
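
    # A hypothetical re-submission sketch (job ids are made up):
    #
    #   manager.resubmit(job_ids=[1, 2])                  # re-run submitted/failed jobs
    #   manager.resubmit(job_ids=[3], also_success=True)  # also re-run a successful job
    #   manager.resubmit(job_ids=[4], new_command=["python", "eval.py"])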

    def stop_jobs(self, job_ids=None):
        """Resets the status of the given jobs to 'submitted' when they are
        labeled as 'executing', 'queued' or 'waiting'."""
        self.lock()

        jobs = self.get_jobs(job_ids)
        for job in jobs:
            if (
                job.status in ("executing", "queued", "waiting")
                and job.queue_name == "local"
            ):
                logger.info(
                    "Reset job '%s' (%s) in the database",
                    job.name,
                    self._format_log(job.id),
                )
                job.submit()

        self.session.commit()
        self.unlock()

    def stop_job(self, job_id, array_id=None):
        """Resets the status of the given job to 'submitted' when it is
        labeled as 'executing', 'queued' or 'waiting'."""
        self.lock()

        job, array_job = self._job_and_array(job_id, array_id)
        if job is not None:
            if job.status in ("executing", "queued", "waiting"):
                logger.info(
                    "Reset job '%s' (%s) in the database",
                    job.name,
                    self._format_log(job.id),
                )
                job.status = "submitted"

            if array_job is not None and array_job.status in (
                "executing",
                "queued",
                "waiting",
            ):
                logger.debug("Reset array job '%s' in the database", array_job)
                array_job.status = "submitted"
            if array_job is None:
                for array_job in job.array:
                    if array_job.status in ("executing", "queued", "waiting"):
                        logger.debug("Reset array job '%s' in the database", array_job)
                        array_job.status = "submitted"

        self.session.commit()
        self.unlock()
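
    # Sketch: stopping here is a database-status reset, not a process kill
    # (ids are made up):
    #
    #   manager.stop_jobs([5, 6])        # resets all matching local jobs
    #   manager.stop_job(7, array_id=2)  # resets one job or one of its array tasks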

    ############################################################
    # Methods to run the jobs in parallel on the local machine #
    ############################################################

    def _run_parallel_job(
        self, job_id, array_id=None, no_log=False, nice=None, verbosity=0
    ):
        """Executes the code for this job on the local machine."""
        environ = copy.deepcopy(os.environ)
        environ["JOB_ID"] = str(job_id)
        if array_id:
            environ["SGE_TASK_ID"] = str(array_id)
        else:
            environ["SGE_TASK_ID"] = "undefined"

        # generate the call to the wrapper script
        command = [
            self.wrapper_script,
            "-l%sd" % ("v" * verbosity),
            self._database,
            "run-job",
        ]
        if nice is not None:
            command = ["nice", "-n%d" % nice] + command

        job, array_job = self._job_and_array(job_id, array_id)
        if job is None:
            # rare case: the job was deleted before it could start
            return None

        logger.info(
            "Starting execution of Job '%s' (%s)",
            job.name,
            self._format_log(job_id, array_id, len(job.array)),
        )
        # create log files
        if no_log or job.log_dir is None:
            out, err = sys.stdout, sys.stderr
        else:
            os.makedirs(job.log_dir, exist_ok=True)
            # create line-buffered files for writing output and error status
            target = array_job if array_job is not None else job
            out = open(target.std_out_file(), "w", 1)
            err = open(target.std_err_file(), "w", 1)

        # return the subprocess pipe to the process
        try:
            return subprocess.Popen(
                command, env=environ, stdout=out, stderr=err, bufsize=1
            )
        except OSError as e:
            logger.error(
                "Could not execute job '%s' (%s) locally\n- reason:\t%s\n- command line:\t%s\n- directory:\t%s\n- command:\t%s",
                job.name,
                self._format_log(job_id, array_id, len(job.array)),
                e,
                " ".join(job.get_command_line()),
                "." if job.exec_dir is None else job.exec_dir,
                " ".join(command),
            )
            job.finish(117, array_id)  # exit code 117 ('u' in ASCII)
            return None
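
    # For reference, the command assembled above expands as follows for
    # verbosity=2 and nice=10 (wrapper script and database depend on the
    # manager's configuration):
    #
    #   nice -n10 <wrapper_script> -lvvd <database> run-job
    #
    # with JOB_ID (and SGE_TASK_ID for array jobs) passed via the environment.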

    def _format_log(self, job_id, array_id=None, array_count=0):
        if array_id is not None and array_count:
            return "%d (%d/%d)" % (job_id, array_id, array_count)
        if array_id is not None:
            return "%d (%d)" % (job_id, array_id)
        return "%d" % job_id
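
    # Examples of the formats produced above:
    #
    #   _format_log(7, 3, 10)  ->  "7 (3/10)"
    #   _format_log(7, 3)      ->  "7 (3)"
    #   _format_log(7)         ->  "7"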

    def run_scheduler(
        self,
        parallel_jobs=1,
        job_ids=None,
        sleep_time=0.1,
        die_when_finished=False,
        no_log=False,
        nice=None,
        verbosity=0,
    ):
        """Starts the scheduler, which constantly checks for jobs that
        should be run."""
        running_tasks = []
        finished_tasks = set()

        try:
            # keep the scheduler alive until every job is finished or a KeyboardInterrupt is caught
            while True:
                # flag that might be set in some rare cases and that prevents the scheduler from dying
                repeat_execution = False
                # FIRST, check whether any of the running processes have finished
                for task_index in range(len(running_tasks) - 1, -1, -1):
                    task = running_tasks[task_index]
                    process = task[0]

                    if process.poll() is not None:
                        # the process ended
                        job_id = task[1]
                        array_id = task[2] if len(task) > 2 else None
                        self.lock()
                        job, array_job = self._job_and_array(job_id, array_id)
                        if job is not None:
                            jj = array_job if array_job is not None else job
                            result = (
                                "%s (%d)" % (jj.status, jj.result)
                                if jj.result is not None
                                else "%s (?)" % jj.status
                            )
                            if jj.status not in ("success", "failure"):
                                logger.error(
                                    "Job '%s' (%s) finished with status '%s' instead of 'success' or 'failure'. Usually this means an internal error. Check your wrapper_script parameter!",
                                    job.name,
                                    self._format_log(job_id, array_id),
                                    jj.status,
                                )
                                raise StopIteration("Job did not finish correctly.")
                            logger.info(
                                "Job '%s' (%s) finished execution with result '%s'",
                                job.name,
                                self._format_log(job_id, array_id),
                                result,
                            )
                        self.unlock()
                        finished_tasks.add(job_id)
                        # in any case, remove the job from the list
                        del running_tasks[task_index]
                # SECOND, check if new jobs can be submitted; THIS NEEDS TO LOCK THE DATABASE
                if len(running_tasks) < parallel_jobs:
                    # get all unfinished jobs:
                    self.lock()
                    jobs = self.get_jobs(job_ids)
                    # put all new jobs into the queue
                    for job in jobs:
                        if job.status == "submitted" and job.queue_name == "local":
                            job.queue()

                    # get all unfinished jobs that are submitted to the local queue
                    unfinished_jobs = [
                        job
                        for job in jobs
                        if job.status in ("queued", "executing")
                        and job.queue_name == "local"
                    ]
                    for job in unfinished_jobs:
                        if job.array:
                            # find array jobs that can run
                            queued_array_jobs = [
                                array_job
                                for array_job in job.array
                                if array_job.status == "queued"
                            ]
                            if not len(queued_array_jobs):
                                job.finish(0, -1)
                                repeat_execution = True
                            else:
                                # there are new array jobs to run
                                for i in range(
                                    min(
                                        parallel_jobs - len(running_tasks),
                                        len(queued_array_jobs),
                                    )
                                ):
                                    array_job = queued_array_jobs[i]
                                    # start a new job from the array
                                    process = self._run_parallel_job(
                                        job.unique,
                                        array_job.id,
                                        no_log=no_log,
                                        nice=nice,
                                        verbosity=verbosity,
                                    )
                                    if process is None:
                                        continue
                                    running_tasks.append(
                                        (process, job.unique, array_job.id)
                                    )
                                    # set the status to executing manually, to avoid jobs being run twice,
                                    # e.g., when the loop runs again before the asynchronous job has started
                                    array_job.status = "executing"
                                    job.status = "executing"
                                    if len(running_tasks) == parallel_jobs:
                                        break
                        else:
                            if job.status == "queued":
                                # start a new job
                                process = self._run_parallel_job(
                                    job.unique,
                                    no_log=no_log,
                                    nice=nice,
                                    verbosity=verbosity,
                                )
                                if process is None:
                                    continue
                                running_tasks.append((process, job.unique))
                                # set the status to executing manually, to avoid jobs being run twice,
                                # e.g., when the loop runs again before the asynchronous job has started
                                job.status = "executing"
                                if len(running_tasks) == parallel_jobs:
                                    break

                    self.session.commit()
                    self.unlock()
                # if no jobs are running after the submission step, the whole queue has been processed
                if (
                    die_when_finished
                    and not repeat_execution
                    and len(running_tasks) == 0
                ):
                    logger.info(
                        "Stopping task scheduler since there are no more jobs running."
                    )
                    break

                # THIRD: sleep the desired amount of time before re-checking
                time.sleep(sleep_time)
        # this is the only way to stop: you have to interrupt the scheduler
        except (KeyboardInterrupt, StopIteration):
            if hasattr(self, "session"):
                self.unlock()
            logger.info("Stopping task scheduler due to user interrupt.")
            for task in running_tasks:
                logger.warning(
                    "Killing job '%s' that was still running.",
                    self._format_log(task[1], task[2] if len(task) > 2 else None),
                )
                try:
                    task[0].kill()
                except OSError as e:
                    logger.error(
                        "Killing job '%s' was not successful: '%s'",
                        self._format_log(task[1], task[2] if len(task) > 2 else None),
                        e,
                    )
                self.stop_job(task[1])
            # stop all jobs that are currently running or queued
            self.stop_jobs(job_ids)
        # check the result of the jobs that we have run, and return the list of failed jobs
        self.lock()
        jobs = self.get_jobs(finished_tasks)
        failures = [job.unique for job in jobs if job.status != "success"]
        self.unlock()
        return sorted(failures)
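

# A minimal end-to-end sketch (assumptions: the manager is constructed with a
# valid state database and wrapper script; the command below is made up):
#
#   manager = JobManagerLocal(...)
#   manager.submit(["echo", "hello"], name="hello")
#   failures = manager.run_scheduler(parallel_jobs=2, die_when_finished=True)
#   # ``failures`` is the sorted list of unique ids of jobs that did not succeed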