Source code for bob.extension.scripts.click_helper

import logging
import textwrap
import time
import traceback

import click

from click.core import ParameterSource

from ..config import load, mod_to_context, resource_keys
from ..log import set_verbosity_level

logger = logging.getLogger(__name__)


[docs]def bool_option(name, short_name, desc, dflt=False, **kwargs): """Generic provider for boolean options Parameters ---------- name : str name of the option short_name : str short name for the option desc : str short description for the option dflt : bool or None Default value **kwargs All kwargs are passed to click.option. Returns ------- ``callable`` A decorator to be used for adding this option. """ def custom_bool_option(func): def callback(ctx, param, value): ctx.meta[name.replace("-", "_")] = value return value return click.option( "-%s/-n%s" % (short_name, short_name), "--%s/--no-%s" % (name, name), default=dflt, help=desc, show_default=True, callback=callback, is_eager=True, **kwargs, )(func) return custom_bool_option
[docs]def list_float_option(name, short_name, desc, nitems=None, dflt=None, **kwargs): """Get option to get a list of float f Parameters ---------- name : str name of the option short_name : str short name for the option desc : str short description for the option nitems : obj:`int`, optional If given, the parsed list must contains this number of items. dflt : :any:`list`, optional List of default values for axes. **kwargs All kwargs are passed to click.option. Returns ------- ``callable`` A decorator to be used for adding this option. """ def custom_list_float_option(func): def callback(ctx, param, value): if value is None or not value.replace(" ", ""): value = None elif value is not None: tmp = value.split(",") if nitems is not None and len(tmp) != nitems: raise click.BadParameter( "%s Must provide %d axis limits" % (name, nitems) ) try: value = [float(i) for i in tmp] except Exception: raise click.BadParameter("Inputs of %s be floats" % name) ctx.meta[name.replace("-", "_")] = value return value return click.option( "-" + short_name, "--" + name, default=dflt, show_default=True, help=desc + " Provide just a space (' ') to cancel default values.", callback=callback, **kwargs, )(func) return custom_list_float_option
[docs]def open_file_mode_option(**kwargs): """Get open mode file option Parameters ---------- **kwargs All kwargs are passed to click.option. Returns ------- ``callable`` A decorator to be used for adding this option. """ def custom_open_file_mode_option(func): def callback(ctx, param, value): if value not in ["w", "a", "w+", "a+"]: raise click.BadParameter("Incorrect open file mode") ctx.meta["open_mode"] = value return value return click.option( "-om", "--open-mode", default="w", help="File open mode", callback=callback, **kwargs, )(func) return custom_open_file_mode_option
[docs]def verbosity_option(**kwargs): """Adds a -v/--verbose option to a click command. Parameters ---------- **kwargs All kwargs are passed to click.option. Returns ------- ``callable`` A decorator to be used for adding this option. """ def custom_verbosity_option(f): def callback(ctx, param, value): ctx.meta["verbosity"] = value # This needs to be bob so that logger is configured for all bob packages. logger = logging.getLogger("bob") set_verbosity_level(logger, value) logger.debug("Logging of the `bob' logger was set to %d", value) return value return click.option( "-v", "--verbose", count=True, help="Increase the verbosity level from 0 (only error messages) to 1 " "(warnings), 2 (log messages), 3 (debug information) by adding the " "--verbose option as often as desired (e.g. '-vvv' for debug).", callback=callback, **kwargs, )(f) return custom_verbosity_option
def _prepare_entry_points(entry_point_group): if not entry_point_group: return "" ret = "" for prj_name, prj_entry_points in resource_keys( entry_point_group, with_project_names=True ).items(): ret += f"\n\n**{prj_name}** entry points are: " ret += ", ".join(prj_entry_points) # wrap ret to 80 chars ret = "\n".join( textwrap.wrap(ret, 80, break_on_hyphens=False, replace_whitespace=False) ) return ret
[docs]class ConfigCommand(click.Command): """A click.Command that can take options both form command line options and configuration files. In order to use this class, you **have to** use the :any:`ResourceOption` class also. Attributes ---------- config_argument_name : str The name of the config argument. entry_point_group : str The name of entry point that will be used to load the config files. """ def __init__( self, name, *args, help=None, entry_point_group=None, **kwargs ): self.entry_point_group = entry_point_group configs_argument_name = "CONFIG" # Augment help for the config file argument self.extra_help = """\n\nIt is possible to pass one or several Python files (or names of ``{entry_point_group}`` entry points or module names i.e. import paths) as {CONFIG} arguments to this command line which contain the parameters listed below as Python variables. Available entry points are: {entry_points} \nThe options through the command-line (see below) will override the values of argument provided configuration files. You can run this command with ``<COMMAND> -H example_config.py`` to create a template config file.""".format( CONFIG=configs_argument_name, entry_point_group=entry_point_group, entry_points=_prepare_entry_points(entry_point_group), ) help = (help or "").rstrip() + self.extra_help super().__init__(name, *args, help=help, **kwargs) # Add the config argument to the command def configs_argument_callback(ctx, param, value): config_context = load( value, entry_point_group=self.entry_point_group ) config_context = mod_to_context(config_context) ctx.config_context = config_context logger.debug("Augmenting context with config context") return value click.argument( configs_argument_name, nargs=-1, callback=configs_argument_callback, is_eager=True, )(self) # Option for config file generation click.option( "-H", "--dump-config", type=click.File(mode="wt"), help="Name of the config file to be generated", is_eager=True, callback=self.dump_config, )(self)
[docs] def dump_config(self, ctx, param, value): """Generate configuration file from parameters and context Parameters ---------- ctx : object Click context """ config_file = value if config_file is None: return logger.debug("Generating configuration file `%s'...", config_file) config_file.write("'''") config_file.write( "Configuration file automatically generated at " "%s\n%s\n" % (time.strftime("%d/%m/%Y"), ctx.command_path) ) if self.help: h = self.help.replace("\b\n", "") config_file.write("\n{}".format(h.rstrip())) if self.epilog: config_file.write("\n\n{}".format(self.epilog.replace("\b\n", ""))) config_file.write("'''\n") for param in self.params: if not isinstance(param, ResourceOption): continue config_file.write( "\n# %s = %s\n" % (param.name, str(param.default)) ) config_file.write("'''") if param.required: begin, dflt = "Required parameter", "" else: begin, dflt = ( "Optional parameter", " [default: {}]".format(param.default), ) config_file.write( "%s: %s (%s)%s" % (begin, param.name, ", ".join(param.opts), dflt) ) if param.help is not None: config_file.write( "\n%s" % "\n".join( textwrap.wrap( param.help, 80, break_on_hyphens=False, replace_whitespace=False, ) ) ) config_file.write("'''\n") click.echo( "Configuration file '{}' was written; exiting".format( config_file.name ) ) config_file.close() ctx.exit()
[docs]class CustomParamType(click.ParamType): name = "custom"
[docs]class ResourceOption(click.Option): """An extended click.Option that automatically loads resources from config files. This class comes with two different functionalities that are independent and could be combined: 1. If used in commands that are inherited from :any:`ConfigCommand`, it will lookup inside the config files (that are provided as argument to the command) to resolve its value. Values given explicitly in the command line take precedence. 2. If `entry_point_group` is provided, it will treat values given to it (by any means) as resources to be loaded. Loading is done using :any:`load`. See :ref:`bob.extension.config.resource` for more information. The final value cannot be a string. You may use this class in three ways: 1. Using this class (without using :any:`ConfigCommand`) AND (providing `entry_point_group`). 2. Using this class (with :any:`ConfigCommand`) AND (providing `entry_point_group`). 3. Using this class (with :any:`ConfigCommand`) AND (without providing `entry_point_group`). Using this class without :any:`ConfigCommand` and without providing `entry_point_group` does nothing and is not allowed. Attributes ---------- entry_point_group : str or None If provided, the strings values to this option are assumed to be entry points from ``entry_point_group`` that need to be loaded. string_exceptions : tuple or None If provided and ``entry_point_group`` is provided, the code will not treat strings in ``string_exceptions`` as entry points and does not try to load them. """ def __init__( self, param_decls=None, show_default=False, prompt=False, confirmation_prompt=False, hide_input=False, is_flag=None, flag_value=None, multiple=False, count=False, allow_from_autoenv=True, type=None, help=None, entry_point_group=None, required=False, string_exceptions=None, **kwargs, ): # if no type, default, count, or is_flag is given, do not convert values to strings if ( (type is None) and (kwargs.get("default") is None) and (count is False) and (is_flag is None) ): type = CustomParamType() self.entry_point_group = entry_point_group if entry_point_group is not None: name, _, _ = self._parse_decls( param_decls, kwargs.get("expose_value") ) help = help or "" help += ( " Can be a ``{entry_point_group}`` entry point, a module name, or " "a path to a Python file which contains a variable named `{name}`. " "Available entry points are: {entry_points}" ) help = help.format( entry_point_group=entry_point_group, entry_points=_prepare_entry_points(entry_point_group), name=name, ) super().__init__( param_decls=param_decls, show_default=show_default, prompt=prompt, confirmation_prompt=confirmation_prompt, hide_input=hide_input, is_flag=is_flag, flag_value=flag_value, multiple=multiple, count=count, allow_from_autoenv=allow_from_autoenv, type=type, help=help, required=required, **kwargs, ) self.string_exceptions = string_exceptions or []
[docs] def consume_value(self, ctx, opts): if ( not hasattr(ctx, "config_context") ) and self.entry_point_group is None: raise TypeError( "The ResourceOption class is not meant to be used this way. " "Please see the docs of the class." ) logger.debug("consuming resource option for %s", self.name) value = opts.get(self.name) source = ParameterSource.COMMANDLINE # if value is not given from command line, lookup the config files given as # arguments (not options). if value is None: # if this class is used with the ConfigCommand class. This is not always # true. if hasattr(ctx, "config_context"): value = ctx.config_context.get(self.name) # if not from config files, lookup the environment variables if value is None: value = self.value_from_envvar(ctx) source = ParameterSource.ENVIRONMENT # if not from environment variables, lookup the default value if value is None: value = ctx.lookup_default(self.name) source = ParameterSource.DEFAULT_MAP if value is None: value = self.get_default(ctx) source = ParameterSource.DEFAULT return value, source
[docs] def type_cast_value(self, ctx, value): """Convert and validate a value against the option's ``type``, ``multiple``, and ``nargs``. Furthermore, if the an entry_point_group is provided, it will load it. """ value = super().type_cast_value(ctx, value) # if the value is a string and an entry_point_group is provided, load it if self.entry_point_group is not None: while ( isinstance(value, str) and value not in self.string_exceptions ): value = load( [value], entry_point_group=self.entry_point_group, attribute_name=self.name, ) return value
[docs]class AliasedGroup(click.Group): """Class that handles prefix aliasing for commands Basically just implements get_command that is used by click to choose the command based on the name. Example ------- To enable prefix aliasing of commands for a given group, just set ``cls=AliasedGroup`` parameter in click.group decorator. """
[docs] def get_command(self, ctx, cmd_name): """get_command with prefix aliasing""" rv = click.Group.get_command(self, ctx, cmd_name) if rv is not None: return rv matches = [x for x in self.list_commands(ctx) if x.startswith(cmd_name)] if not matches: return None elif len(matches) == 1: return click.Group.get_command(self, ctx, matches[0]) ctx.fail("Too many matches: %s" % ", ".join(sorted(matches)))
[docs]def log_parameters(logger_handle, ignore=tuple()): """Logs the click parameters with the logging module. Parameters ---------- logger_handle : object The logger handle to write debug information into. ignore : tuple The keys in ignore will not be logged. """ ctx = click.get_current_context() # do not sort the ctx.params dict. The insertion order is kept in Python 3 # and is useful (but not necessary so works on Python 2 too). for k, v in ctx.params.items(): if k in ignore: continue logger_handle.debug("%s: %s", k, v)
[docs]def assert_click_runner_result(result, exit_code=0, exception_type=None): """Helper for asserting click runner results""" m = ( "Click command exited with code `{}' and exception:\n{}" "\nThe output was:\n{}" ) exception = ( "None" if result.exc_info is None else "".join(traceback.format_exception(*result.exc_info)) ) m = m.format(result.exit_code, exception, result.output) assert result.exit_code == exit_code, m if exit_code == 0: assert not result.exception, m if exception_type is not None: assert isinstance(result.exception, exception_type), m