Source code for bob.extension.scripts.click_helper

from ..log import set_verbosity_level
from ..config import load, mod_to_context, resource_keys
import time
import click
import logging
import traceback

# This needs to be bob so that logger is configured for all bob packages.
logger = logging.getLogger('bob')
try:
  basestring
except NameError:
  basestring = str


[docs]def bool_option(name, short_name, desc, dflt=False, **kwargs): '''Generic provider for boolean options Parameters ---------- name : str name of the option short_name : str short name for the option desc : str short description for the option dflt : bool or None Default value **kwargs All kwargs are passed to click.option. Returns ------- callable A decorator to be used for adding this option. ''' def custom_bool_option(func): def callback(ctx, param, value): ctx.meta[name.replace('-', '_')] = value return value return click.option( '-%s/-n%s' % (short_name, short_name), '--%s/--no-%s' % (name, name), default=dflt, help=desc, show_default=True, callback=callback, is_eager=True, **kwargs)(func) return custom_bool_option
[docs]def list_float_option(name, short_name, desc, nitems=None, dflt=None, **kwargs): '''Get option to get a list of float f Parameters ---------- name : str name of the option short_name : str short name for the option desc : str short description for the option nitems : obj:`int`, optional If given, the parsed list must contains this number of items. dflt : :any:`list`, optional List of default values for axes. **kwargs All kwargs are passed to click.option. Returns ------- callable A decorator to be used for adding this option. ''' def custom_list_float_option(func): def callback(ctx, param, value): if value is None or not value.replace(' ', ''): value = None elif value is not None: tmp = value.split(',') if nitems is not None and len(tmp) != nitems: raise click.BadParameter( '%s Must provide %d axis limits' % (name, nitems) ) try: value = [float(i) for i in tmp] except Exception: raise click.BadParameter('Inputs of %s be floats' % name) ctx.meta[name.replace('-', '_')] = value return value return click.option( '-' + short_name, '--' + name, default=dflt, show_default=True, help=desc + ' Provide just a space (\' \') to cancel default values.', callback=callback, **kwargs)(func) return custom_list_float_option
[docs]def open_file_mode_option(**kwargs): '''Get open mode file option Parameters ---------- **kwargs All kwargs are passed to click.option. Returns ------- callable A decorator to be used for adding this option. ''' def custom_open_file_mode_option(func): def callback(ctx, param, value): if value not in ['w', 'a', 'w+', 'a+']: raise click.BadParameter('Incorrect open file mode') ctx.meta['open_mode'] = value return value return click.option( '-om', '--open-mode', default='w', help='File open mode', callback=callback, **kwargs)(func) return custom_open_file_mode_option
[docs]def verbosity_option(**kwargs): """Adds a -v/--verbose option to a click command. Parameters ---------- **kwargs All kwargs are passed to click.option. Returns ------- callable A decorator to be used for adding this option. """ def custom_verbosity_option(f): def callback(ctx, param, value): ctx.meta['verbosity'] = value set_verbosity_level(logger, value) logger.debug("Logging of the `bob' logger was set to %d", value) return value return click.option( '-v', '--verbose', count=True, help="Increase the verbosity level from 0 (only error messages) to 1 " "(warnings), 2 (log messages), 3 (debug information) by adding the " "--verbose option as often as desired (e.g. '-vvv' for debug).", callback=callback, **kwargs)(f) return custom_verbosity_option
[docs]class ConfigCommand(click.Command): """A click.Command that can take options both form command line options and configuration files. In order to use this class, you have to use the :any:`ResourceOption` class also. Attributes ---------- config_argument_name : str The name of the config argument. entry_point_group : str The name of entry point that will be used to load the config files. """ def __init__(self, name, context_settings=None, callback=None, params=None, help=None, epilog=None, short_help=None, options_metavar='[OPTIONS]', add_help_option=True, entry_point_group=None, config_argument_name='CONFIG', **kwargs): self.config_argument_name = config_argument_name self.entry_point_group = entry_point_group # Augment help for the config file argument self.extra_help = '''\n\nIt is possible to pass one or several Python files (or names of ``{entry_point_group}`` entry points or module names) as {CONFIG} arguments to the command line which contain the parameters listed below as Python variables. The options through the command-line (see below) will override the values of configuration files. You can run this command with ``<COMMAND> -H example_config.py`` to create a template config file.'''.format(CONFIG=config_argument_name, entry_point_group=entry_point_group) help = (help or '').rstrip() + self.extra_help # kwargs['help'] = help click.Command.__init__( self, name, context_settings=context_settings, callback=callback, params=params, help=help, epilog=epilog, short_help=short_help, options_metavar=options_metavar, add_help_option=add_help_option, **kwargs) # Add the config argument to the command click.argument(config_argument_name, nargs=-1)(self) # Option for config file generation click.option('-H', '--dump-config', type=click.File(mode='wt'), help="Name of the config file to be generated")(self)
[docs] def is_resource(self, param, ctx): """Checks if the param is an option and is also in the current context.""" return (param.name in ctx.params and param.name != 'dump_config' and isinstance(param, click.Option))
[docs] def invoke(self, ctx): dump_file = ctx.params.get('dump_config') if dump_file is not None: click.echo("Configuration file '{}' was written; exiting".format( dump_file.name)) return self.dump_config(ctx) config_files = ctx.params[self.config_argument_name.lower()] # load and normalize context from config files config_context = load( config_files, entry_point_group=self.entry_point_group) config_context = mod_to_context(config_context) for param in self.params: if not self.is_resource(param, ctx): continue value = ctx.params[param.name] if not hasattr(param, 'user_provided'): if value == param.default: param.user_provided = False else: param.user_provided = True if not param.user_provided and param.name in config_context: ctx.params[param.name] = param.full_process_value( ctx, config_context[param.name]) # raise exceptions if the value is required. if hasattr(param, 'real_required'): param.required = param.real_required try: ctx.params[param.name] = param.full_process_value( ctx, ctx.params[param.name]) finally: # make sure to set this back to False for future invocations param.required = False return super(ConfigCommand, self).invoke(ctx)
[docs] def dump_config(self, ctx): """Generate configuration file from parameters and context Parameters ---------- ctx : object Click context """ config_file = ctx.params['dump_config'] logger.debug("Generating configuration file `%s'...", config_file) config_file.write("'''") config_file.write('Configuration file automatically generated at ' '%s\n%s\n' % (time.strftime("%d/%m/%Y"), ctx.command_path)) if self.help: h = self.help.replace(self.extra_help, '').replace('\b\n', '') config_file.write('\n{}'.format(h.rstrip())) if self.epilog: config_file.write('\n\n{}'.format(self.epilog.replace('\b\n', ''))) config_file.write("'''\n") for param in self.params: if not self.is_resource(param, ctx): continue config_file.write('\n# %s = %s\n' % (param.name, str(ctx.params[param.name]))) config_file.write("'''") if param.required or (isinstance(param, ResourceOption) and param.real_required): begin, dflt = 'Required parameter', '' else: begin, dflt = 'Optional parameter', ' [default: {}]'.format( param.default) config_file.write( "%s: %s (%s)%s" % ( begin, param.name, ', '.join(param.opts), dflt)) if param.help is not None: config_file.write("\n%s" % param.help) if isinstance(param, ResourceOption) and \ param.entry_point_group is not None: config_file.write("\nRegistered entries are: {}".format( resource_keys(param.entry_point_group))) config_file.write("'''\n")
[docs]class ResourceOption(click.Option): """A click.Option that is aware if the user actually provided this option through command-line or it holds a default value. The option can also be a resource that will be automatically loaded. Attributes ---------- entry_point_group : str or None If provided, the strings values to this option are assumed to be entry points from ``entry_point_group`` that need to be loaded. real_required : bool Holds the real value of ``required`` here. The ``required`` value is hidden from click since the option may be loaded later through the configuration files. user_provided : bool True if the user actually provided this option through command-line or using environment variables. """ def __init__(self, param_decls=None, show_default=False, prompt=False, confirmation_prompt=False, hide_input=False, is_flag=None, flag_value=None, multiple=False, count=False, allow_from_autoenv=True, type=None, help=None, entry_point_group=None, required=False, **kwargs): self.entry_point_group = entry_point_group self.real_required = required kwargs['required'] = False if entry_point_group is not None: name, _, _ = self._parse_decls(param_decls, kwargs.get('expose_value')) help = help or '' help += ( ' Can be a ``{entry_point_group}`` entry point, a module name, or ' 'a path to a Python file which contains a variable named `{name}`.') help = help.format(entry_point_group=entry_point_group, name=name) click.Option.__init__( self, param_decls=param_decls, show_default=show_default, prompt=prompt, confirmation_prompt=confirmation_prompt, hide_input=hide_input, is_flag=is_flag, flag_value=flag_value, multiple=multiple, count=count, allow_from_autoenv=allow_from_autoenv, type=type, help=help, **kwargs)
[docs] def consume_value(self, ctx, opts): value = opts.get(self.name) self.user_provided = True if value is None: value = ctx.lookup_default(self.name) self.user_provided = False if value is None: value = self.value_from_envvar(ctx) if value is not None: self.user_provided = True return value
[docs] def full_process_value(self, ctx, value): value = super(ResourceOption, self).full_process_value(ctx, value) if self.entry_point_group is not None: attribute_name = self.entry_point_group.split('.')[-1] while isinstance(value, basestring): value = load([value], entry_point_group=self.entry_point_group, attribute_name=attribute_name) return value
[docs]class AliasedGroup(click.Group): ''' Class that handles prefix aliasing for commands Basically just implements get_command that is used by click to choose the comamnd based on the name. Example ------- To enable prefix aliasing of commands for a given group, just set ``cls=AliasedGroup`` parameter in click.group decorator. '''
[docs] def get_command(self, ctx, cmd_name): rv = click.Group.get_command(self, ctx, cmd_name) if rv is not None: return rv matches = [x for x in self.list_commands(ctx) if x.startswith(cmd_name)] if not matches: return None elif len(matches) == 1: return click.Group.get_command(self, ctx, matches[0]) ctx.fail('Too many matches: %s' % ', '.join(sorted(matches)))
[docs]def log_parameters(logger_handle, ignore=tuple()): """Logs the click parameters with the logging module. Parameters ---------- logger_handle : object The logger handle to write debug information into. ignore : tuple The keys in ignore will not be logged. """ ctx = click.get_current_context() # do not sort the ctx.params dict. The insertion order is kept in Python 3 # and is useful (but not necessary so works on Python 2 too). for k, v in ctx.params.items(): if k in ignore: continue logger_handle.debug('%s: %s', k, v)
[docs]def assert_click_runner_result(result, exit_code=0): """Helper for asserting click runner results""" m = ("Click command exited with code `{}' and exception:\n{}" "\nThe output was:\n{}") exception = 'None' if result.exc_info is None else \ ''.join(traceback.format_exception(*result.exc_info)) m = m.format(result.exit_code, exception, result.output) assert result.exit_code == exit_code, m if exit_code == 0: assert not result.exception, m