Coverage for src/gridtk/script/jman.py: 82%

272 statements  

coverage.py v7.4.3, created at 2024-04-22 14:25 +0200

1# Copyright © 2022 Idiap Research Institute <contact@idiap.ch> 

2# 

3# SPDX-License-Identifier: GPL-3.0-or-later 

4"""A logging Idiap/SGE job manager.""" 

5 

6from __future__ import annotations 

7 

8__epilog__ = """ For a list of available commands: 

9 >>> %(prog)s --help 

10 

11 For a list of options for a particular command: 

12 >>> %(prog)s <command> --help 

13""" 

14 

15import argparse 

16import logging 

17import os 

18import string 

19import sys 

20 

21from .. import local, sge 

22from ..models import Status 

23 

24logger = logging.getLogger("gridtk") 

25 

26GPU_QUEUES = ["gpu", "lgpu", "sgpu", "gpum", "vsgpu"] 

27QUEUES = ["all.q", "q1d", "q1w", "q1m", "q1dm", "q1wm"] + GPU_QUEUES 

28 

29 

30def appropriate_for_gpu(args, kwargs): 

31 # don't set these for GPU processing, otherwise the maximum virtual memory 

32 # would be enforced via ulimit 

33 kwargs.pop("memfree", None) 

34 kwargs.pop("hvmem", None) 

35 

36 if args.memory is None: 

37 return 

38 

39 # if this is a GPU queue and args.memory was provided, set the gpumem flag, 

40 # stripping the trailing 'G' from the args.memory string if present 

41 if args.memory.isdigit(): 

42 kwargs["gpumem"] = args.memory # assign directly 

43 elif args.memory.endswith("G"): 

44 kwargs["gpumem"] = args.memory[:-1] # remove G at the end 

45 

46 

47def setup(args): 

48 """Returns the JobManager and sets up the basic infrastructure.""" 

49 kwargs = { 

50 "wrapper_script": args.wrapper_script, 

51 "debug": args.verbose == 3, 

52 "database": args.database, 

53 } 

54 

55 if args.local: 

56 jm = local.JobManagerLocal(**kwargs) 

57 else: 

58 jm = sge.JobManagerSGE(**kwargs) 

59 

60 # set-up logging 

61 if args.verbose not in range(0, 4): 

62 raise ValueError( 

63 "The verbosity level %d does not exist. Please reduce the number of '--verbose' parameters in your call to maximum 3" 

64 % args.verbose 

65 ) 

66 

67 # set up the verbosity level of the logging system 

68 log_level = { 

69 0: logging.ERROR, 

70 1: logging.WARNING, 

71 2: logging.INFO, 

72 3: logging.DEBUG, 

73 }[args.verbose] 

74 

75 handler = logging.StreamHandler() 

76 handler.setFormatter( 

77 logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s") 

78 ) 

79 logger.addHandler(handler) 

80 logger.setLevel(log_level) 

81 

82 return jm 
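# Worked example (derived from the mapping above): passing `-vv` gives
# args.verbose == 2, which selects logging.INFO; the job manager's debug flag
# is only enabled at the maximum level, `-vvv` (args.verbose == 3).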

83 

84 

85def get_array(array): 

86 if array is None: 

87 return None 

88 start = array.find("-") 

89 if start == -1: 

90 a = 1 

91 b = int(array) 

92 c = 1 

93 else: 

94 a = int(array[0:start]) 

95 step = array.find(":") 

96 if step == -1: 

97 b = int(array[start + 1 :]) 

98 c = 1 

99 else: 

100 b = int(array[start + 1 : step]) 

101 c = int(array[step + 1 :]) 

102 

103 return (a, b, c) 
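# Example values for the SGE-style array syntax handled above (a sketch):
#   get_array("10")      -> (1, 10, 1)   # only 'last' given
#   get_array("2-10")    -> (2, 10, 1)   # 'first-last'
#   get_array("2-10:2")  -> (2, 10, 2)   # 'first-last:step'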

104 

105 

106def get_ids(jobs): 

107 if jobs is None: 

108 return None 

109 indexes = [] 

110 for job in jobs: 

111 if "-" not in job and "+" not in job: 

112 index = int(job) 

113 indexes.append(index) 

114 # check if a range is specified 

115 elif "-" in job and "+" not in job: 

116 first, last = job.split("-", 1) 

117 indexes.extend(range(int(first), int(last) + 1)) 

118 # check if a plus sign is specified 

119 elif "+" in job and "-" not in job: 

120 first, add = job.split("+", 1) 

121 first, add = int(first), int(add) 

122 indexes.extend(range(first, first + add + 1)) 

123 return indexes 
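# Example of the job-id syntax handled above (a sketch):
#   get_ids(["5"])     -> [5]
#   get_ids(["7-9"])   -> [7, 8, 9]       # inclusive range
#   get_ids(["12+2"])  -> [12, 13, 14]    # start plus a count of extra ids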

124 

125 

126def get_memfree(memory, parallel): 

127 """Computes the memory required for the memfree field.""" 

128 number = int(memory.rstrip(string.ascii_letters)) 

129 memtype = memory.lstrip(string.digits) 

130 if not memtype: 

131 memtype = "G" 

132 return "%d%s" % (number * parallel, memtype) 

133 

134 

135def submit(args): 

136 """Submission command.""" 

137 # set full path to command 

138 if args.job[0] == "--": 

139 del args.job[0] 

140 if not os.path.isabs(args.job[0]): 

141 args.job[0] = os.path.abspath(args.job[0]) 

142 

143 jm = setup(args) 

144 kwargs = { 

145 "queue": args.qname, 

146 "cwd": True, 

147 "verbosity": args.verbose, 

148 "name": args.name, 

149 "env": args.env, 

150 "memfree": args.memory, 

151 "io_big": args.io_big, 

152 "sge_extra_args": args.sge_extra_args, 

153 } 

154 

155 if args.array is not None: 

156 kwargs["array"] = get_array(args.array) 

157 if args.exec_dir is not None: 

158 kwargs["exec_dir"] = args.exec_dir 

159 if args.log_dir is not None: 

160 kwargs["log_dir"] = args.log_dir 

161 if args.dependencies is not None: 

162 kwargs["dependencies"] = args.dependencies 

163 if args.qname != "all.q": 

164 kwargs["hvmem"] = args.memory 

165 if args.qname in GPU_QUEUES: 

166 appropriate_for_gpu(args, kwargs) 

167 if args.parallel is not None: 

168 kwargs["pe_opt"] = "pe_mth %d" % args.parallel 

169 if args.memory is not None: 

170 kwargs["memfree"] = get_memfree(args.memory, args.parallel) 

171 kwargs["dry_run"] = args.dry_run 

172 kwargs["stop_on_failure"] = args.stop_on_failure 

173 

174 # submit the job(s) 

175 for _ in range(args.repeat): 

176 job_id = jm.submit(args.job, **kwargs) 

177 dependencies = kwargs.get("dependencies", []) 

178 dependencies.append(job_id) 

179 kwargs["dependencies"] = dependencies 

180 

181 if args.print_id: 

182 print(job_id, end="") 
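# Hypothetical invocation (assuming the console script is installed as `jman`;
# the option names come from the parser built in main() below):
#   jman submit -q q1d -m 8G -n myjob -- ./run_experiment.sh --seed 1
# With --repeat N, every submission appends its job id to "dependencies", so
# each repetition waits for all previously submitted repetitions in addition
# to any ids given via --dependencies.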

183 

184 

185def resubmit(args): 

186 """Re-submits the jobs with the given ids.""" 

187 jm = setup(args) 

188 

189 kwargs = {"cwd": True, "verbosity": args.verbose} 

190 if args.qname is not None: 

191 kwargs["queue"] = args.qname 

192 if args.memory is not None: 

193 kwargs["memfree"] = args.memory 

194 if args.qname not in (None, "all.q"): 

195 kwargs["hvmem"] = args.memory 

196 if args.qname in GPU_QUEUES: 

197 appropriate_for_gpu(args, kwargs) 

198 if args.parallel is not None: 

199 kwargs["pe_opt"] = "pe_mth %d" % args.parallel 

200 kwargs["memfree"] = get_memfree(args.memory, args.parallel) 

201 if args.io_big: 

202 kwargs["io_big"] = True 

203 if args.no_io_big: 

204 kwargs["io_big"] = False 

205 

206 jm.resubmit( 

207 get_ids(args.job_ids), 

208 args.also_success, 

209 args.running_jobs, 

210 args.overwrite_command, 

211 keep_logs=args.keep_logs, 

212 **kwargs, 

213 ) 
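# Hypothetical invocation (assuming the `jman` console script):
#   jman resubmit -j 3-5 -q q1w -m 8G
# Note that, unlike submit(), the --parallel branch above calls
# get_memfree(args.memory, ...) without checking that --memory was given, so
# -p should be combined with -m when re-submitting.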

214 

215 

216def run_scheduler(args): 

217 """Runs the scheduler on the local machine. 

218 

219 To stop it, please use Ctrl-C. 

220 """ 

221 if not args.local: 

222 raise ValueError( 

223 "The execute command can only be used with the '--local' command line option" 

224 ) 

225 jm = setup(args) 

226 jm.run_scheduler( 

227 parallel_jobs=args.parallel, 

228 job_ids=get_ids(args.job_ids), 

229 sleep_time=args.sleep_time, 

230 die_when_finished=args.die_when_finished, 

231 no_log=args.no_log_files, 

232 nice=args.nice, 

233 verbosity=args.verbose, 

234 ) 
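# Hypothetical invocation (local mode only; `jman` assumed as the installed
# script name):
#   jman --local run-scheduler -p 4 -x
# runs up to four jobs in parallel and lets the scheduler exit once all jobs
# in the database have finished (-x / --die-when-finished).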

235 

236 

237def list(args): 

238 """Lists the jobs in the given database.""" 

239 jm = setup(args) 

240 

241 if not args.local: 

242 # update the status of jobs from SGE before listing them. 

243 jm.communicate(job_ids=get_ids(args.job_ids)) 

244 

245 jm.list( 

246 job_ids=get_ids(args.job_ids), 

247 print_array_jobs=args.print_array_jobs, 

248 print_dependencies=args.print_dependencies, 

249 status=args.status, 

250 long=args.long, 

251 print_times=args.print_times, 

252 ids_only=args.ids_only, 

253 names=args.names, 

254 ) 

255 

256 

257def communicate(args): 

258 """Uses qstat to get the status of the requested jobs.""" 

259 if args.local: 

260 raise ValueError( 

261 "The communicate command can only be used without the '--local' command line option" 

262 ) 

263 jm = setup(args) 

264 jm.communicate(job_ids=get_ids(args.job_ids)) 

265 

266 

267def report(args): 

268 """Reports the results of the finished (and unfinished) jobs.""" 

269 jm = setup(args) 

270 jm.report( 

271 job_ids=get_ids(args.job_ids), 

272 array_ids=get_ids(args.array_ids), 

273 output=not args.errors_only, 

274 error=not args.output_only, 

275 status=args.status, 

276 name=args.name, 

277 ) 

278 

279 

280def stop(args): 

281 """Stops (qdel's) the jobs with the given ids.""" 

282 if args.local: 

283 raise ValueError( 

284 "Stopping commands locally is not supported (please kill them yourself)" 

285 ) 

286 jm = setup(args) 

287 jm.stop_jobs(get_ids(args.job_ids)) 

288 

289 

290def delete(args): 

291 """Deletes the jobs from the job manager. 

292 

293 If the jobs are still running in the grid, they are stopped. 

294 """ 

295 jm = setup(args) 

296 

297 # first, stop the jobs if they are running in the grid 

298 if not args.local and "executing" in args.status: 

299 stop(args) 

300 

301 # then, delete them from the database 

302 jm.delete( 

303 job_ids=get_ids(args.job_ids), 

304 array_ids=get_ids(args.array_ids), 

305 delete_logs=not args.keep_logs, 

306 delete_log_dir=not args.keep_log_dir, 

307 status=args.status, 

308 ) 
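# Hypothetical invocation (assuming the `jman` console script): deleting a few
# jobs while keeping their log files could look like
#   jman delete -j 10-12 -r
# Restricting the deletion with -s takes values of the Status enumeration
# imported above (the exact names are defined in ..models and not listed here).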

309 

310 

311def run_job(args): 

312 """Starts the wrapper script to execute a job, interpreting the JOB_ID and 

313 SGE_TASK_ID keywords that are set by the grid or by us.""" 

314 jm = setup(args) 

315 job_id = int(os.environ["JOB_ID"]) 

316 array_id = ( 

317 int(os.environ["SGE_TASK_ID"]) 

318 if os.environ["SGE_TASK_ID"] != "undefined" 

319 else None 

320 ) 

321 jm.run_job(job_id, array_id) 
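# Added note: this subcommand is not meant to be called by users directly; the
# wrapper script recorded at submission time is expected to re-invoke it on
# the execution node, where SGE exports JOB_ID and SGE_TASK_ID (the latter is
# the literal string "undefined" for non-array jobs, hence the check above).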

322 

323 

324class AliasedSubParsersAction(argparse._SubParsersAction): 

325 """Hack taken from https://gist.github.com/471779 to allow aliases in 

326 argparse for python 2.x (this has been implemented on python 3.2)""" 

327 

328 class _AliasedPseudoAction(argparse.Action): 

329 def __init__(self, name, aliases, help): 

330 dest = name 

331 if aliases: 

332 dest += " (%s)" % ",".join(aliases) 

333 sup = super() 

334 sup.__init__(option_strings=[], dest=dest, help=help) 

335 

336 def add_parser(self, name, **kwargs): 

337 if "aliases" in kwargs: 

338 aliases = kwargs["aliases"] 

339 del kwargs["aliases"] 

340 else: 

341 aliases = [] 

342 

343 parser = super().add_parser(name, **kwargs) 

344 

345 # Make the aliases work. 

346 for alias in aliases: 

347 self._name_parser_map[alias] = parser 

348 # Make the help text reflect them, first removing old help entry. 

349 if "help" in kwargs: 

350 help = kwargs.pop("help") 

351 self._choices_actions.pop() 

352 pseudo_action = self._AliasedPseudoAction(name, aliases, help) 

353 self._choices_actions.append(pseudo_action) 

354 

355 return parser 

356 

357 

358def main(command_line_options=None): 

359 from importlib.metadata import version 

360 

361 from clapper.rc import UserDefaults 

362 

363 from ..tools import USER_CONFIGURATION 

364 

365 defaults = UserDefaults(USER_CONFIGURATION) 

366 

367 formatter = argparse.ArgumentDefaultsHelpFormatter 

368 parser = argparse.ArgumentParser( 

369 description=__doc__, epilog=__epilog__, formatter_class=formatter 

370 ) 

371 # part of the hack to support aliases in subparsers 

372 parser.register("action", "parsers", AliasedSubParsersAction) 

373 

374 # general options 

375 parser.add_argument( 

376 "-v", 

377 "--verbose", 

378 action="count", 

379 default=0, 

380 help="Increase the verbosity level from 0 (only error messages) to 1 (warnings), 2 (log messages), 3 (debug information) by adding the --verbose option as often as desired (e.g. '-vvv' for debug).", 

381 ) 

382 parser.add_argument( 

383 "-V", 

384 "--version", 

385 action="version", 

386 version="gridtk version %s" % version(__name__.split(".", 1)[0]), 

387 ) 

388 parser.add_argument( 

389 "-d", 

390 "--database", 

391 "--db", 

392 metavar="DATABASE", 

393 default="submitted.sql3", 

394 help='Replace the default database "submitted.sql3" with one provided by you.', 

395 ) 

396 

397 parser.add_argument( 

398 "-l", 

399 "--local", 

400 action="store_true", 

401 help="Uses the local job manager instead of the SGE one.", 

402 ) 

403 cmdparser = parser.add_subparsers( 

404 title="commands", help="commands accepted by %(prog)s" 

405 ) 

406 

407 # subcommand 'submit' 

408 submit_parser = cmdparser.add_parser( 

409 "submit", 

410 aliases=["sub"], 

411 formatter_class=formatter, 

412 help="Submits jobs to the SGE/Local job scheduler and logs them in a database.", 

413 ) 

414 submit_parser.add_argument( 

415 "-q", 

416 "--queue", 

417 metavar="QNAME", 

418 dest="qname", 

419 default="all.q", 

420 choices=QUEUES, 

421 help="the name of the SGE queue to submit the job to", 

422 ) 

423 submit_parser.add_argument( 

424 "-e", 

425 "--sge-extra-args", 

426 default=defaults.get("sge-extra-args-default", ""), 

427 type=str, 

428 help="Passes extra arguments to qsub. See the documentation of the package for usage and ways of overriding default behavior.", 

429 ) 

430 submit_parser.add_argument( 

431 "-m", 

432 "--memory", 

433 help="Sets both the h_vmem and the mem_free parameters when submitting " 

434 "the job to a non-GPU queue, e.g., 8G to set the memory " 

435 "requirements to 8 gigabytes. Sets gpumem parameter when " 

436 "submitting the job to a GPU-based queue.", 

437 ) 

438 submit_parser.add_argument( 

439 "-p", 

440 "--parallel", 

441 "--pe_mth", 

442 type=int, 

443 help="Sets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.", 

444 ) 

445 submit_parser.add_argument("-n", "--name", dest="name", help="Gives the job a name") 

446 submit_parser.add_argument( 

447 "-x", 

448 "--dependencies", 

449 type=int, 

450 default=[], 

451 metavar="ID", 

452 nargs="*", 

453 help="Set job dependencies to the list of job identifiers separated by spaces", 

454 ) 

455 submit_parser.add_argument( 

456 "-k", 

457 "--stop-on-failure", 

458 action="store_true", 

459 help="Stop depending jobs when this job finished with an error.", 

460 ) 

461 submit_parser.add_argument( 

462 "-d", 

463 "--exec-dir", 

464 metavar="DIR", 

465 help="Sets the executing directory, where the script should be executed. If not given, jobs will be executed in the current directory", 

466 ) 

467 submit_parser.add_argument( 

468 "-l", 

469 "--log-dir", 

470 default="logs", 

471 metavar="DIR", 

472 help="Sets the log directory.", 

473 ) 

474 submit_parser.add_argument( 

475 "-s", 

476 "--environment", 

477 metavar="KEY=VALUE", 

478 dest="env", 

479 nargs="*", 

480 default=[], 

481 help="Passes specific environment variables to the job.", 

482 ) 

483 submit_parser.add_argument( 

484 "-t", 

485 "--array", 

486 "--parametric", 

487 metavar="(first-)last(:step)", 

488 help="Creates a parametric (array) job. You must specify the 'last' value, but 'first' (default=1) and 'step' (default=1) can be specified as well (when specifying 'step', 'first' has to be given, too).", 

489 ) 

490 submit_parser.add_argument( 

491 "-z", 

492 "--dry-run", 

493 action="store_true", 

494 help="Do not really submit anything, just print out what would submit in this case", 

495 ) 

496 submit_parser.add_argument( 

497 "-i", 

498 "--io-big", 

499 action="store_true", 

500 help='Sets "io_big" on the submitted jobs so it limits the machines in which the job is submitted to those that can do high-throughput.', 

501 ) 

502 submit_parser.add_argument( 

503 "-r", 

504 "--repeat", 

505 type=int, 

506 metavar="N", 

507 default=1, 

508 help="Submits the job N times. Each job will depend on the job before.", 

509 ) 

510 submit_parser.add_argument( 

511 "-o", 

512 "--print-id", 

513 action="store_true", 

514 help="Prints the new job id (so that they can be parsed by automatic scripts).", 

515 ) 

516 submit_parser.add_argument( 

517 "job", 

518 metavar="command", 

519 nargs=argparse.REMAINDER, 

520 help="The job that should be executed. Sometimes a -- is required to separate the job from other command line options.", 

521 ) 

522 submit_parser.set_defaults(func=submit) 
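# Example submission using the options above (a sketch, assuming the `jman`
# console script): an array job with tasks 1 to 50 in steps of 5, repeated
# twice with chained dependencies:
#   jman submit -q q1d -t 1-50:5 -r 2 -- ./process.sh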

523 

524 # subcommand 're-submit' 

525 resubmit_parser = cmdparser.add_parser( 

526 "resubmit", 

527 aliases=["reset", "requeue", "re"], 

528 formatter_class=formatter, 

529 help="Re-submits a list of jobs.", 

530 ) 

531 resubmit_parser.add_argument( 

532 "-j", 

533 "--job-ids", 

534 metavar="ID", 

535 nargs="+", 

536 help="Re-submit only the jobs with the given ids (by default, all finished jobs are re-submitted).", 

537 ) 

538 resubmit_parser.add_argument( 

539 "-q", 

540 "--queue", 

541 metavar="QNAME", 

542 dest="qname", 

543 choices=QUEUES, 

544 help="Reset the SGE queue to submit the job to", 

545 ) 

546 resubmit_parser.add_argument( 

547 "-m", 

548 "--memory", 

549 help="Resets both the h_vmem and the mem_free parameters when " 

550 "submitting the job to a non-GPU queue, e.g., 8G " 

551 "to set the memory requirements to 8 gigabytes. Resets gpumem " 

552 "parameter when submitting the job to a GPU-based queue.", 

553 ) 

554 resubmit_parser.add_argument( 

555 "-p", 

556 "--parallel", 

557 "--pe_mth", 

558 type=int, 

559 help="Resets the number of slots per job (-pe pe_mth) and multiplies the mem_free parameter. E.g. to get 16 G of memory, use -m 8G -p 2.", 

560 ) 

561 resubmit_parser.add_argument( 

562 "-i", 

563 "--io-big", 

564 action="store_true", 

565 help='Resubmits the job to the "io_big" queue.', 

566 ) 

567 resubmit_parser.add_argument( 

568 "-I", 

569 "--no-io-big", 

570 action="store_true", 

571 help='Resubmits the job NOT to the "io_big" queue.', 

572 ) 

573 resubmit_parser.add_argument( 

574 "-k", 

575 "--keep-logs", 

576 action="store_true", 

577 help="Do not clean the log files of the old job before re-submitting.", 

578 ) 

579 resubmit_parser.add_argument( 

580 "-s", 

581 "--also-success", 

582 action="store_true", 

583 help="Re-submit also jobs that have finished successfully.", 

584 ) 

585 resubmit_parser.add_argument( 

586 "-a", 

587 "--running-jobs", 

588 action="store_true", 

589 help="Re-submit even jobs that are running or waiting (use this flag with care).", 

590 ) 

591 resubmit_parser.add_argument( 

592 "-o", 

593 "--overwrite-command", 

594 nargs=argparse.REMAINDER, 

595 help="Overwrite the command line (of a single job) that should be executed (useful to keep job dependencies).", 

596 ) 

597 resubmit_parser.set_defaults(func=resubmit) 

598 

599 # subcommand 'stop' 

600 stop_parser = cmdparser.add_parser( 

601 "stop", 

602 formatter_class=formatter, 

603 help="Stops the execution of jobs in the grid.", 

604 ) 

605 stop_parser.add_argument( 

606 "-j", 

607 "--job-ids", 

608 metavar="ID", 

609 nargs="+", 

610 help="Stop only the jobs with the given ids (by default, all jobs are stopped).", 

611 ) 

612 stop_parser.set_defaults(func=stop) 

613 

614 # subcommand 'list' 

615 list_parser = cmdparser.add_parser( 

616 "list", 

617 aliases=["ls"], 

618 formatter_class=formatter, 

619 help="Lists jobs stored in the database. Use the -vv option to get a long listing.", 

620 ) 

621 list_parser.add_argument( 

622 "-j", 

623 "--job-ids", 

624 metavar="ID", 

625 nargs="+", 

626 help="List only the jobs with the given ids (by default, all jobs are listed)", 

627 ) 

628 list_parser.add_argument( 

629 "-n", 

630 "--names", 

631 metavar="NAME", 

632 nargs="+", 

633 help="List only the jobs with the given names (by default, all jobs are listed)", 

634 ) 

635 list_parser.add_argument( 

636 "-a", 

637 "--print-array-jobs", 

638 action="store_true", 

639 help="Also list the array ids.", 

640 ) 

641 list_parser.add_argument( 

642 "-l", 

643 "--long", 

644 action="store_true", 

645 help="Prints additional information about the submitted job.", 

646 ) 

647 list_parser.add_argument( 

648 "-t", 

649 "--print-times", 

650 action="store_true", 

651 help="Prints timing information on when jobs were submited, executed and finished", 

652 ) 

653 list_parser.add_argument( 

654 "-x", 

655 "--print-dependencies", 

656 action="store_true", 

657 help="Print the dependencies of the jobs as well.", 

658 ) 

659 list_parser.add_argument( 

660 "-o", 

661 "--ids-only", 

662 action="store_true", 

663 help="Prints ONLY the job ids (so that they can be parsed by automatic scripts).", 

664 ) 

665 list_parser.add_argument( 

666 "-s", 

667 "--status", 

668 nargs="+", 

669 choices=Status, 

670 default=Status, 

671 help="Delete only jobs that have the given statuses; by default all jobs are deleted.", 

672 ) 

673 list_parser.set_defaults(func=list) 

674 

675 # subcommand 'communicate' 

676 stop_parser = cmdparser.add_parser( 

677 "communicate", 

678 aliases=["com"], 

679 formatter_class=formatter, 

680 help="Communicates with the grid to see if there were unexpected errors (e.g. a timeout) during the job execution.", 

681 ) 

682 stop_parser.add_argument( 

683 "-j", 

684 "--job-ids", 

685 metavar="ID", 

686 nargs="+", 

687 help="Check only the jobs with the given ids (by default, all jobs are checked)", 

688 ) 

689 stop_parser.set_defaults(func=communicate) 

690 

691 # subcommand 'report' 

692 report_parser = cmdparser.add_parser( 

693 "report", 

694 aliases=["rep", "r", "explain", "why"], 

695 formatter_class=formatter, 

696 help="Iterates through the result and error log files and prints out the logs.", 

697 ) 

698 report_parser.add_argument( 

699 "-e", 

700 "--errors-only", 

701 action="store_true", 

702 help="Only report the error logs (by default, both logs are reported).", 

703 ) 

704 report_parser.add_argument( 

705 "-o", 

706 "--output-only", 

707 action="store_true", 

708 help="Only report the output logs (by default, both logs are reported).", 

709 ) 

710 report_parser.add_argument( 

711 "-j", 

712 "--job-ids", 

713 metavar="ID", 

714 nargs="+", 

715 help="Report only the jobs with the given ids (by default, all finished jobs are reported)", 

716 ) 

717 report_parser.add_argument( 

718 "-a", 

719 "--array-ids", 

720 metavar="ID", 

721 nargs="+", 

722 help="Report only the jobs with the given array ids. If specified, a single job-id must be given as well.", 

723 ) 

724 report_parser.add_argument( 

725 "-n", 

726 "--name", 

727 help="Report only the jobs with the given name; by default all jobs are reported.", 

728 ) 

729 report_parser.add_argument( 

730 "-s", 

731 "--status", 

732 nargs="+", 

733 choices=Status, 

734 default=Status, 

735 help="Report only jobs that have the given statuses; by default all jobs are reported.", 

736 ) 

737 report_parser.set_defaults(func=report) 

738 

739 # subcommand 'delete' 

740 delete_parser = cmdparser.add_parser( 

741 "delete", 

742 aliases=["del", "rm", "remove"], 

743 formatter_class=formatter, 

744 help="Removes jobs from the database; if jobs are running or are still scheduled in SGE, the jobs are also removed from the SGE queue.", 

745 ) 

746 delete_parser.add_argument( 

747 "-j", 

748 "--job-ids", 

749 metavar="ID", 

750 nargs="+", 

751 help="Delete only the jobs with the given ids (by default, all jobs are deleted).", 

752 ) 

753 delete_parser.add_argument( 

754 "-a", 

755 "--array-ids", 

756 metavar="ID", 

757 nargs="+", 

758 help="Delete only the jobs with the given array ids. If specified, a single job-id must be given as well. Note that the whole job including all array jobs will be removed from the SGE queue.", 

759 ) 

760 delete_parser.add_argument( 

761 "-r", 

762 "--keep-logs", 

763 action="store_true", 

764 help="If set, the log files will NOT be removed.", 

765 ) 

766 delete_parser.add_argument( 

767 "-R", 

768 "--keep-log-dir", 

769 action="store_true", 

770 help="When removing the logs, keep the log directory.", 

771 ) 

772 delete_parser.add_argument( 

773 "-s", 

774 "--status", 

775 nargs="+", 

776 choices=Status, 

777 default=Status, 

778 help="Delete only jobs that have the given statuses; by default all jobs are deleted.", 

779 ) 

780 delete_parser.set_defaults(func=delete) 

781 

782 # subcommand 'run_scheduler' 

783 scheduler_parser = cmdparser.add_parser( 

784 "run-scheduler", 

785 aliases=["sched", "x"], 

786 formatter_class=formatter, 

787 help="Runs the scheduler on the local machine. To stop the scheduler safely, please use Ctrl-C; only valid in combination with the '--local' option.", 

788 ) 

789 scheduler_parser.add_argument( 

790 "-p", 

791 "--parallel", 

792 type=int, 

793 default=1, 

794 help="Select the number of parallel jobs that you want to execute locally", 

795 ) 

796 scheduler_parser.add_argument( 

797 "-j", 

798 "--job-ids", 

799 metavar="ID", 

800 nargs="+", 

801 help="Select the job ids that should be run (be default, all submitted and queued jobs are run).", 

802 ) 

803 scheduler_parser.add_argument( 

804 "-s", 

805 "--sleep-time", 

806 type=float, 

807 default=0.1, 

808 help="Set the sleep time between for the scheduler in seconds.", 

809 ) 

810 scheduler_parser.add_argument( 

811 "-x", 

812 "--die-when-finished", 

813 action="store_true", 

814 help="Let the job manager die when it has finished all jobs of the database.", 

815 ) 

816 scheduler_parser.add_argument( 

817 "-l", 

818 "--no-log-files", 

819 action="store_true", 

820 help="Overwrites the log file setup to print the results to the console.", 

821 ) 

822 scheduler_parser.add_argument( 

823 "-n", 

824 "--nice", 

825 type=int, 

826 help="Jobs will be run with the given priority (can only be positive, i.e., to have lower priority", 

827 ) 

828 scheduler_parser.set_defaults(func=run_scheduler) 

829 

830 # subcommand 'run-job'; this should not be seen on the command line since it is actually a wrapper script 

831 run_parser = cmdparser.add_parser("run-job", help=argparse.SUPPRESS) 

832 run_parser.set_defaults(func=run_job) 

833 

834 if command_line_options: 

835 args = parser.parse_args(command_line_options[1:]) 

836 args.wrapper_script = command_line_options[0] 

837 else: 

838 args = parser.parse_args() 

839 args.wrapper_script = sys.argv[0] 

840 

841 if not hasattr(args, "func"): 

842 return parser.print_help(sys.stderr) 

843 

844 args.func(args) 

845 

846 return 0
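# A minimal entry-point sketch (an assumption: the module is normally exposed
# through a console-script entry point, which is not defined in this file):
#
#   if __name__ == "__main__":
#       sys.exit(main())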