Coverage for src/gridtk/script/jman.py: 82%

272 statements  

coverage.py v7.4.3, created at 2024-04-16 09:20 +0200

# Copyright © 2022 Idiap Research Institute <contact@idiap.ch>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""A logging Idiap/SGE job manager."""

from __future__ import annotations

__epilog__ = """ For a list of available commands:
  >>> %(prog)s --help

  For a list of options for a particular command:
  >>> %(prog)s <command> --help
"""

import argparse
import logging
import os
import string
import sys

from .. import local, sge
from ..models import Status

logger = logging.getLogger("gridtk")

GPU_QUEUES = ["gpu", "lgpu", "sgpu", "gpum", "vsgpu"]
QUEUES = ["all.q", "q1d", "q1w", "q1m", "q1dm", "q1wm"] + GPU_QUEUES

def appropriate_for_gpu(args, kwargs):
    # don't set these for GPU processing or the maximum virtual memory will be
    # set on ulimit
    kwargs.pop("memfree", None)
    kwargs.pop("hvmem", None)

    if args.memory is None:
        return

    # if this is a GPU queue and args.memory is provided, we set the gpumem flag
    # remove the last character 'G' from the args.memory string
    if args.memory.isdigit():
        kwargs["gpumem"] = args.memory  # assign directly
    elif args.memory.endswith("G"):
        kwargs["gpumem"] = args.memory[:-1]  # remove G at the end
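# For example, for a GPU queue with args.memory == "8G", a kwargs dict such as
# {"memfree": "8G", "hvmem": "8G"} is rewritten in place to {"gpumem": "8"};
# with args.memory == "8" (no unit), gpumem is set to "8" directly.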

def setup(args):
    """Returns the JobManager and sets up the basic infrastructure."""
    kwargs = {
        "wrapper_script": args.wrapper_script,
        "debug": args.verbose == 3,
        "database": args.database,
    }

    if args.local:
        jm = local.JobManagerLocal(**kwargs)
    else:
        jm = sge.JobManagerSGE(**kwargs)

    # set-up logging
    if args.verbose not in range(0, 4):
        raise ValueError(
            "The verbosity level %d does not exist. Please reduce the number of '--verbose' parameters in your call to a maximum of 3"
            % args.verbose
        )

    # set up the verbosity level of the logging system
    log_level = {
        0: logging.ERROR,
        1: logging.WARNING,
        2: logging.INFO,
        3: logging.DEBUG,
    }[args.verbose]

    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    logger.addHandler(handler)
    logger.setLevel(log_level)

    return jm

def get_array(array):
    if array is None:
        return None
    start = array.find("-")
    if start == -1:
        a = 1
        b = int(array)
        c = 1
    else:
        a = int(array[0:start])
        step = array.find(":")
        if step == -1:
            b = int(array[start + 1 :])
            c = 1
        else:
            b = int(array[start + 1 : step])
            c = int(array[step + 1 :])

    return (a, b, c)
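# For example, given the parsing above:
#   get_array("10")      -> (1, 10, 1)
#   get_array("2-10")    -> (2, 10, 1)
#   get_array("2-10:2")  -> (2, 10, 2)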

def get_ids(jobs):
    if jobs is None:
        return None
    indexes = []
    for job in jobs:
        if "-" not in job and "+" not in job:
            index = int(job)
            indexes.append(index)
        # check if a range is specified
        elif "-" in job and "+" not in job:
            first, last = job.split("-", 1)
            indexes.extend(range(int(first), int(last) + 1))
        # check if a plus sign is specified
        elif "+" in job and "-" not in job:
            first, add = job.split("+", 1)
            first, add = int(first), int(add)
            indexes.extend(range(first, first + add + 1))
    return indexes
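# For example, job id arguments may mix plain ids, ranges and "+N" offsets:
#   get_ids(["7"])    -> [7]
#   get_ids(["3-5"])  -> [3, 4, 5]
#   get_ids(["3+2"])  -> [3, 4, 5]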

def get_memfree(memory, parallel):
    """Computes the memory required for the memfree field."""
    number = int(memory.rstrip(string.ascii_letters))
    memtype = memory.lstrip(string.digits)
    if not memtype:
        memtype = "G"
    return "%d%s" % (number * parallel, memtype)
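# For example, the requested memory is multiplied by the number of parallel
# slots, and a missing unit defaults to gigabytes:
#   get_memfree("8G", 2)    -> "16G"
#   get_memfree("2048M", 4) -> "8192M"
#   get_memfree("8", 2)     -> "16G"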

def submit(args):
    """Submission command."""
    # set full path to command
    if args.job[0] == "--":
        del args.job[0]
    if not os.path.isabs(args.job[0]):
        args.job[0] = os.path.abspath(args.job[0])

    jm = setup(args)
    kwargs = {
        "queue": args.qname,
        "cwd": True,
        "verbosity": args.verbose,
        "name": args.name,
        "env": args.env,
        "memfree": args.memory,
        "io_big": args.io_big,
        "sge_extra_args": args.sge_extra_args,
    }

    if args.array is not None:
        kwargs["array"] = get_array(args.array)
    if args.exec_dir is not None:
        kwargs["exec_dir"] = args.exec_dir
    if args.log_dir is not None:
        kwargs["log_dir"] = args.log_dir
    if args.dependencies is not None:
        kwargs["dependencies"] = args.dependencies
    if args.qname != "all.q":
        kwargs["hvmem"] = args.memory
    if args.qname in GPU_QUEUES:
        appropriate_for_gpu(args, kwargs)
    if args.parallel is not None:
        kwargs["pe_opt"] = "pe_mth %d" % args.parallel
        if args.memory is not None:
            kwargs["memfree"] = get_memfree(args.memory, args.parallel)
    kwargs["dry_run"] = args.dry_run
    kwargs["stop_on_failure"] = args.stop_on_failure

    # submit the job(s)
    for _ in range(args.repeat):
        job_id = jm.submit(args.job, **kwargs)
        dependencies = kwargs.get("dependencies", [])
        dependencies.append(job_id)
        kwargs["dependencies"] = dependencies

    if args.print_id:
        print(job_id, end="")
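# Note on --repeat: each iteration appends the id of the job it just submitted
# to kwargs["dependencies"], so the second job waits for the first, the third
# for the first two, and so on.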

def resubmit(args):
    """Re-submits the jobs with the given ids."""
    jm = setup(args)

    kwargs = {"cwd": True, "verbosity": args.verbose}
    if args.qname is not None:
        kwargs["queue"] = args.qname
    if args.memory is not None:
        kwargs["memfree"] = args.memory
        if args.qname not in (None, "all.q"):
            kwargs["hvmem"] = args.memory
        if args.qname in GPU_QUEUES:
            appropriate_for_gpu(args, kwargs)
    if args.parallel is not None:
        kwargs["pe_opt"] = "pe_mth %d" % args.parallel
        kwargs["memfree"] = get_memfree(args.memory, args.parallel)
    if args.io_big:
        kwargs["io_big"] = True
    if args.no_io_big:
        kwargs["io_big"] = False

    jm.resubmit(
        get_ids(args.job_ids),
        args.also_success,
        args.running_jobs,
        args.overwrite_command,
        keep_logs=args.keep_logs,
        **kwargs,
    )

def run_scheduler(args):
    """Runs the scheduler on the local machine.

    To stop it, please use Ctrl-C.
    """
    if not args.local:
        raise ValueError(
            "The execute command can only be used with the '--local' command line option"
        )
    jm = setup(args)
    jm.run_scheduler(
        parallel_jobs=args.parallel,
        job_ids=get_ids(args.job_ids),
        sleep_time=args.sleep_time,
        die_when_finished=args.die_when_finished,
        no_log=args.no_log_files,
        nice=args.nice,
        verbosity=args.verbose,
    )

def list(args):
    """Lists the jobs in the given database."""
    jm = setup(args)

    if not args.local:
        # update the status of jobs from SGE before listing them.
        jm.communicate(job_ids=get_ids(args.job_ids))

    jm.list(
        job_ids=get_ids(args.job_ids),
        print_array_jobs=args.print_array_jobs,
        print_dependencies=args.print_dependencies,
        status=args.status,
        long=args.long,
        print_times=args.print_times,
        ids_only=args.ids_only,
        names=args.names,
    )

def communicate(args):
    """Uses qstat to get the status of the requested jobs."""
    if args.local:
        raise ValueError(
            "The communicate command can only be used without the '--local' command line option"
        )
    jm = setup(args)
    jm.communicate(job_ids=get_ids(args.job_ids))

def report(args):
    """Reports the results of the finished (and unfinished) jobs."""
    jm = setup(args)
    jm.report(
        job_ids=get_ids(args.job_ids),
        array_ids=get_ids(args.array_ids),
        output=not args.errors_only,
        error=not args.output_only,
        status=args.status,
        name=args.name,
    )

def stop(args):
    """Stops (qdel's) the jobs with the given ids."""
    if args.local:
        raise ValueError(
            "Stopping commands locally is not supported (please kill them yourself)"
        )
    jm = setup(args)
    jm.stop_jobs(get_ids(args.job_ids))

def delete(args):
    """Deletes the jobs from the job manager.

    If the jobs are still running in the grid, they are stopped.
    """
    jm = setup(args)

    # first, stop the jobs if they are running in the grid
    if not args.local and "executing" in args.status:
        stop(args)

    # then, delete them from the database
    jm.delete(
        job_ids=get_ids(args.job_ids),
        array_ids=get_ids(args.array_ids),
        delete_logs=not args.keep_logs,
        delete_log_dir=not args.keep_log_dir,
        status=args.status,
    )

def run_job(args):
    """Starts the wrapper script to execute a job, interpreting the JOB_ID and
    SGE_TASK_ID keywords that are set by the grid or by us."""
    jm = setup(args)
    job_id = int(os.environ["JOB_ID"])
    array_id = (
        int(os.environ["SGE_TASK_ID"])
        if os.environ["SGE_TASK_ID"] != "undefined"
        else None
    )
    jm.run_job(job_id, array_id)
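# This entry point is meant to be driven by the wrapper script with the grid
# environment in place, e.g. (illustrative values only, assuming the console
# script is installed as "jman"):
#   JOB_ID=42 SGE_TASK_ID=undefined jman run-job
# in which case array_id resolves to None.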

class AliasedSubParsersAction(argparse._SubParsersAction):
    """Hack taken from https://gist.github.com/471779 to allow aliases in
    argparse for Python 2.x (this has been implemented in Python 3.2)."""

    class _AliasedPseudoAction(argparse.Action):
        def __init__(self, name, aliases, help):
            dest = name
            if aliases:
                dest += " (%s)" % ",".join(aliases)
            sup = super()
            sup.__init__(option_strings=[], dest=dest, help=help)

    def add_parser(self, name, **kwargs):
        if "aliases" in kwargs:
            aliases = kwargs["aliases"]
            del kwargs["aliases"]
        else:
            aliases = []

        parser = super().add_parser(name, **kwargs)

        # Make the aliases work.
        for alias in aliases:
            self._name_parser_map[alias] = parser
        # Make the help text reflect them, first removing the old help entry.
        if "help" in kwargs:
            help = kwargs.pop("help")
            self._choices_actions.pop()
            pseudo_action = self._AliasedPseudoAction(name, aliases, help)
            self._choices_actions.append(pseudo_action)

        return parser

def main(command_line_options=None):
    from importlib.metadata import version

    from clapper.rc import UserDefaults

    from ..tools import USER_CONFIGURATION

    defaults = UserDefaults(USER_CONFIGURATION)

    formatter = argparse.ArgumentDefaultsHelpFormatter
    parser = argparse.ArgumentParser(
        description=__doc__, epilog=__epilog__, formatter_class=formatter
    )
    # part of the hack to support aliases in subparsers
    parser.register("action", "parsers", AliasedSubParsersAction)

    # general options
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="Increase the verbosity level from 0 (only error messages) to 1 (warnings), 2 (log messages), 3 (debug information) by adding the --verbose option as often as desired (e.g. '-vvv' for debug).",
    )
    parser.add_argument(
        "-V",
        "--version",
        action="version",
        version="gridtk version %s" % version(__name__.split(".", 1)[0]),
    )
    parser.add_argument(
        "-d",
        "--database",
        "--db",
        metavar="DATABASE",
        default="submitted.sql3",
        help='replace the default database "submitted.sql3" by one provided by you.',
    )

    parser.add_argument(
        "-l",
        "--local",
        action="store_true",
        help="Uses the local job manager instead of the SGE one.",
    )
    cmdparser = parser.add_subparsers(
        title="commands", help="commands accepted by %(prog)s"
    )

    # subcommand 'submit'
    submit_parser = cmdparser.add_parser(
        "submit",
        aliases=["sub"],
        formatter_class=formatter,
        help="Submits jobs to the SGE/Local job scheduler and logs them in a database.",
    )
    submit_parser.add_argument(
        "-q",
        "--queue",
        metavar="QNAME",
        dest="qname",
        default="all.q",
        choices=QUEUES,
        help="the name of the SGE queue to submit the job to",
    )
    submit_parser.add_argument(
        "-e",
        "--sge-extra-args",
        default=defaults.get("sge-extra-args-default", ""),
        type=str,
        help="Passes extra arguments to qsub. See the documentation of the package for usage and ways of overriding default behavior.",
    )
    submit_parser.add_argument(
        "-m",
        "--memory",
        help="Sets both the h_vmem and the mem_free parameters when submitting "
        "the job to a non-GPU queue, e.g., 8G to set the memory "
        "requirements to 8 gigabytes. Sets gpumem parameter when "
        "submitting the job to a GPU-based queue.",
    )
    submit_parser.add_argument(
        "-p",
        "--parallel",
        "--pe_mth",
        type=int,
        help="Sets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.",
    )
    submit_parser.add_argument(
        "-n", "--name", dest="name", help="Gives the job a name"
    )
    submit_parser.add_argument(
        "-x",
        "--dependencies",
        type=int,
        default=[],
        metavar="ID",
        nargs="*",
        help="Set job dependencies to the list of job identifiers separated by spaces",
    )
    submit_parser.add_argument(
        "-k",
        "--stop-on-failure",
        action="store_true",
        help="Stop dependent jobs when this job finishes with an error.",
    )
    submit_parser.add_argument(
        "-d",
        "--exec-dir",
        metavar="DIR",
        help="Sets the executing directory, where the script should be executed. If not given, jobs will be executed in the current directory",
    )
    submit_parser.add_argument(
        "-l",
        "--log-dir",
        default="logs",
        metavar="DIR",
        help="Sets the log directory.",
    )
    submit_parser.add_argument(
        "-s",
        "--environment",
        metavar="KEY=VALUE",
        dest="env",
        nargs="*",
        default=[],
        help="Passes specific environment variables to the job.",
    )
    submit_parser.add_argument(
        "-t",
        "--array",
        "--parametric",
        metavar="(first-)last(:step)",
        help="Creates a parametric (array) job. You must specify the 'last' value, but 'first' (default=1) and 'step' (default=1) can be specified as well (when specifying 'step', 'first' has to be given, too).",
    )
    submit_parser.add_argument(
        "-z",
        "--dry-run",
        action="store_true",
        help="Do not really submit anything; just print out what would be submitted in this case.",
    )
    submit_parser.add_argument(
        "-i",
        "--io-big",
        action="store_true",
        help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput.',
    )
    submit_parser.add_argument(
        "-r",
        "--repeat",
        type=int,
        metavar="N",
        default=1,
        help="Submits the job N times. Each job will depend on the job before.",
    )
    submit_parser.add_argument(
        "-o",
        "--print-id",
        action="store_true",
        help="Prints the new job id (so that they can be parsed by automatic scripts).",
    )
    submit_parser.add_argument(
        "job",
        metavar="command",
        nargs=argparse.REMAINDER,
        help="The job that should be executed. Sometimes a -- is required to separate the job from other command line options.",
    )
    submit_parser.set_defaults(func=submit)
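    # Example invocation (illustrative; assumes the console script is installed
    # as "jman"; the script name and its option are placeholders):
    #   jman -vv submit -q q1d -m 8G -t 1-10 -- ./my-script.sh --some-option
    # submits an array job with tasks 1..10 to queue "q1d", requesting 8G of
    # memory per job.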

    # subcommand 're-submit'
    resubmit_parser = cmdparser.add_parser(
        "resubmit",
        aliases=["reset", "requeue", "re"],
        formatter_class=formatter,
        help="Re-submits a list of jobs.",
    )
    resubmit_parser.add_argument(
        "-j",
        "--job-ids",
        metavar="ID",
        nargs="+",
        help="Re-submit only the jobs with the given ids (by default, all finished jobs are re-submitted).",
    )
    resubmit_parser.add_argument(
        "-q",
        "--queue",
        metavar="QNAME",
        dest="qname",
        choices=QUEUES,
        help="Reset the SGE queue to submit the job to",
    )
    resubmit_parser.add_argument(
        "-m",
        "--memory",
        help="Resets both the h_vmem and the mem_free parameters when "
        "submitting the job to a non-GPU queue, e.g., 8G "
        "to set the memory requirements to 8 gigabytes. Resets gpumem "
        "parameter when submitting the job to a GPU-based queue.",
    )
    resubmit_parser.add_argument(
        "-p",
        "--parallel",
        "--pe_mth",
        type=int,
        help="Resets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.",
    )
    resubmit_parser.add_argument(
        "-i",
        "--io-big",
        action="store_true",
        help='Resubmits the job to the "io_big" queue.',
    )
    resubmit_parser.add_argument(
        "-I",
        "--no-io-big",
        action="store_true",
        help='Resubmits the job NOT to the "io_big" queue.',
    )
    resubmit_parser.add_argument(
        "-k",
        "--keep-logs",
        action="store_true",
        help="Do not clean the log files of the old job before re-submitting.",
    )
    resubmit_parser.add_argument(
        "-s",
        "--also-success",
        action="store_true",
        help="Re-submit also jobs that have finished successfully.",
    )
    resubmit_parser.add_argument(
        "-a",
        "--running-jobs",
        action="store_true",
        help="Re-submit even jobs that are running or waiting (use this flag with care).",
    )
    resubmit_parser.add_argument(
        "-o",
        "--overwrite-command",
        nargs=argparse.REMAINDER,
        help="Overwrite the command line (of a single job) that should be executed (useful to keep job dependencies).",
    )
    resubmit_parser.set_defaults(func=resubmit)

    # subcommand 'stop'
    stop_parser = cmdparser.add_parser(
        "stop",
        formatter_class=formatter,
        help="Stops the execution of jobs in the grid.",
    )
    stop_parser.add_argument(
        "-j",
        "--job-ids",
        metavar="ID",
        nargs="+",
        help="Stop only the jobs with the given ids (by default, all jobs are stopped).",
    )
    stop_parser.set_defaults(func=stop)

    # subcommand 'list'
    list_parser = cmdparser.add_parser(
        "list",
        aliases=["ls"],
        formatter_class=formatter,
        help="Lists jobs stored in the database. Use the -vv option to get a long listing.",
    )
    list_parser.add_argument(
        "-j",
        "--job-ids",
        metavar="ID",
        nargs="+",
        help="List only the jobs with the given ids (by default, all jobs are listed)",
    )
    list_parser.add_argument(
        "-n",
        "--names",
        metavar="NAME",
        nargs="+",
        help="List only the jobs with the given names (by default, all jobs are listed)",
    )
    list_parser.add_argument(
        "-a",
        "--print-array-jobs",
        action="store_true",
        help="Also list the array ids.",
    )
    list_parser.add_argument(
        "-l",
        "--long",
        action="store_true",
        help="Prints additional information about the submitted job.",
    )
    list_parser.add_argument(
        "-t",
        "--print-times",
        action="store_true",
        help="Prints timing information on when jobs were submitted, executed and finished",
    )
    list_parser.add_argument(
        "-x",
        "--print-dependencies",
        action="store_true",
        help="Print the dependencies of the jobs as well.",
    )
    list_parser.add_argument(
        "-o",
        "--ids-only",
        action="store_true",
        help="Prints ONLY the job ids (so that they can be parsed by automatic scripts).",
    )
    list_parser.add_argument(
        "-s",
        "--status",
        nargs="+",
        choices=Status,
        default=Status,
        help="List only jobs that have the given statuses; by default all jobs are listed.",
    )
    list_parser.set_defaults(func=list)
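    # Example invocation (illustrative; assumes the console script is installed
    # as "jman"):
    #   jman list -l -t -j 1 2 3
    # prints a long listing, including timing information, for jobs 1, 2 and 3.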

    # subcommand 'communicate'
    stop_parser = cmdparser.add_parser(
        "communicate",
        aliases=["com"],
        formatter_class=formatter,
        help="Communicates with the grid to see if there were unexpected errors (e.g. a timeout) during the job execution.",
    )
    stop_parser.add_argument(
        "-j",
        "--job-ids",
        metavar="ID",
        nargs="+",
        help="Check only the jobs with the given ids (by default, all jobs are checked)",
    )
    stop_parser.set_defaults(func=communicate)

    # subcommand 'report'
    report_parser = cmdparser.add_parser(
        "report",
        aliases=["rep", "r", "explain", "why"],
        formatter_class=formatter,
        help="Iterates through the result and error log files and prints out the logs.",
    )
    report_parser.add_argument(
        "-e",
        "--errors-only",
        action="store_true",
        help="Only report the error logs (by default, both logs are reported).",
    )
    report_parser.add_argument(
        "-o",
        "--output-only",
        action="store_true",
        help="Only report the output logs (by default, both logs are reported).",
    )
    report_parser.add_argument(
        "-j",
        "--job-ids",
        metavar="ID",
        nargs="+",
        help="Report only the jobs with the given ids (by default, all finished jobs are reported)",
    )
    report_parser.add_argument(
        "-a",
        "--array-ids",
        metavar="ID",
        nargs="+",
        help="Report only the jobs with the given array ids. If specified, a single job-id must be given as well.",
    )
    report_parser.add_argument(
        "-n",
        "--name",
        help="Report only the jobs with the given name; by default all jobs are reported.",
    )
    report_parser.add_argument(
        "-s",
        "--status",
        nargs="+",
        choices=Status,
        default=Status,
        help="Report only jobs that have the given statuses; by default all jobs are reported.",
    )
    report_parser.set_defaults(func=report)

    # subcommand 'delete'
    delete_parser = cmdparser.add_parser(
        "delete",
        aliases=["del", "rm", "remove"],
        formatter_class=formatter,
        help="Removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue.",
    )
    delete_parser.add_argument(
        "-j",
        "--job-ids",
        metavar="ID",
        nargs="+",
        help="Delete only the jobs with the given ids (by default, all jobs are deleted).",
    )
    delete_parser.add_argument(
        "-a",
        "--array-ids",
        metavar="ID",
        nargs="+",
        help="Delete only the jobs with the given array ids. If specified, a single job-id must be given as well. Note that the whole job including all array jobs will be removed from the SGE queue.",
    )
    delete_parser.add_argument(
        "-r",
        "--keep-logs",
        action="store_true",
        help="If set, the log files will NOT be removed.",
    )
    delete_parser.add_argument(
        "-R",
        "--keep-log-dir",
        action="store_true",
        help="When removing the logs, keep the log directory.",
    )
    delete_parser.add_argument(
        "-s",
        "--status",
        nargs="+",
        choices=Status,
        default=Status,
        help="Delete only jobs that have the given statuses; by default all jobs are deleted.",
    )
    delete_parser.set_defaults(func=delete)

    # subcommand 'run_scheduler'
    scheduler_parser = cmdparser.add_parser(
        "run-scheduler",
        aliases=["sched", "x"],
        formatter_class=formatter,
        help="Runs the scheduler on the local machine. To stop the scheduler safely, please use Ctrl-C; only valid in combination with the '--local' option.",
    )
    scheduler_parser.add_argument(
        "-p",
        "--parallel",
        type=int,
        default=1,
        help="Select the number of parallel jobs that you want to execute locally",
    )
    scheduler_parser.add_argument(
        "-j",
        "--job-ids",
        metavar="ID",
        nargs="+",
        help="Select the job ids that should be run (by default, all submitted and queued jobs are run).",
    )
    scheduler_parser.add_argument(
        "-s",
        "--sleep-time",
        type=float,
        default=0.1,
        help="Set the time (in seconds) that the scheduler sleeps between iterations.",
    )
    scheduler_parser.add_argument(
        "-x",
        "--die-when-finished",
        action="store_true",
        help="Let the job manager die when it has finished all jobs of the database.",
    )
    scheduler_parser.add_argument(
        "-l",
        "--no-log-files",
        action="store_true",
        help="Overwrites the log file setup to print the results to the console.",
    )
    scheduler_parser.add_argument(
        "-n",
        "--nice",
        type=int,
        help="Jobs will be run with the given priority (can only be positive, i.e., to have a lower priority).",
    )
    scheduler_parser.set_defaults(func=run_scheduler)
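    # Example invocation (illustrative; assumes the console script is installed
    # as "jman"; the scheduler requires the global --local flag):
    #   jman --local run-scheduler --parallel 4 --die-when-finished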

    # subcommand 'run-job'; this should not be seen on the command line since it is actually a wrapper script
    run_parser = cmdparser.add_parser("run-job", help=argparse.SUPPRESS)
    run_parser.set_defaults(func=run_job)

    if command_line_options:
        args = parser.parse_args(command_line_options[1:])
        args.wrapper_script = command_line_options[0]
    else:
        args = parser.parse_args()
        args.wrapper_script = sys.argv[0]

    if not hasattr(args, "func"):
        return parser.print_help(sys.stderr)

    args.func(args)

    return 0