Source code for hop.auth

from adc import auth
from . import configure
import os
import getpass
import logging
import csv
import errno
import stat
import toml
from collections.abc import Mapping

# Re-export adc's SASLMethod so callers can refer to it as hop.auth.SASLMethod.
SASLMethod = auth.SASLMethod


# Logger shared by this module, registered under the name "hop".
logger = logging.getLogger("hop")


# thin wrapper over adc's SASLAuth to define
# different defaults used within Hopskotch
class Auth(auth.SASLAuth):
    """Attach SASL-based authentication to a client.

    Returns client-based auth options when called.

    Parameters
    ----------
    user : `str`
        Username to authenticate with.
    password : `str`
        Password to authenticate with.
    host : `str`, optional
        The name of the host for which this authentication is valid.
    ssl : `bool`, optional
        Whether to enable SSL (enabled by default).
    method : `SASLMethod`, optional
        The SASL method to authenticate, default = SASLMethod.SCRAM_SHA_512.
        See valid SASL methods in SASLMethod.
    ssl_ca_location : `str`, optional
        If using SSL via a self-signed cert, a path/location
        to the certificate.

    """

    def __init__(self, user, password, host="", ssl=True, method=SASLMethod.SCRAM_SHA_512, **kwargs):
        super().__init__(user, password, ssl=ssl, method=method, **kwargs)
        # Kept separately from the base class so they can be exposed as read-only properties.
        self._username = user
        self._hostname = host

    @property
    def username(self):
        """The username this credential authenticates as."""
        return self._username

    @property
    def hostname(self):
        """The host this credential is valid for; empty if not tied to a host."""
        return self._hostname

    def __eq__(self, other):
        """Compare username, password, hostname, security protocol and SASL mechanism."""
        if not isinstance(other, Auth):
            # Let Python try the reflected comparison (and ultimately answer False)
            # rather than failing with AttributeError on unrelated objects.
            return NotImplemented
        return (self._username == other._username
                and self._config["sasl.password"] == other._config["sasl.password"]
                and self.hostname == other.hostname
                and self._config["security.protocol"] == other._config["security.protocol"]
                and self._config["sasl.mechanism"] == other._config["sasl.mechanism"])

    # Defining __eq__ already makes instances unhashable; state it explicitly.
    __hash__ = None
def load_auth(config_file=None):
    """Configures Auth instances given a configuration file.

    Args:
        config_file: Path to a configuration file, loading from
            the default location if not given.

    Returns:
        A list of configured Auth instances.

    Raises:
        RuntimeError: The config file exists, but has unsafe permissions
            and will not be read until they are corrected; or it could not
            be parsed, has no ``auth`` section, or a credential entry lacks
            a required property.
        KeyError: A credential entry names a mechanism for which the
            ``SASLMethod[...]`` lookup fails.
        FileNotFoundError: The configuration file, either as specified
            explicitly or found automatically, does not exist.

    """
    if config_file is None:
        config_file = configure.get_config_path("auth")
        # If finding data automatically, as a fallback, look to see if the auth data is in the
        # main config in the old style.
        if not os.path.exists(config_file):
            main_config_file = configure.get_config_path("general")
            if os.path.exists(main_config_file):
                try:
                    return load_auth(main_config_file)
                except RuntimeError as err:
                    # if the main config file does not contain auth data, rewrite the error
                    # into a complaint that the auth config file does not exist, since that
                    # is what should be fixed
                    raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT),
                                            config_file) from err
            # if config.toml does not exist, continue and complain about lack of auth.toml,
            # which is the real issue

    if not os.path.exists(config_file):
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), config_file)

    # check that the config file has suitable permissions: any set-id/sticky bit or any
    # group/other access bit is treated as unsafe
    config_props = os.stat(config_file)
    dangerous_perm_mask = 0o7077
    if (config_props.st_mode & dangerous_perm_mask) != 0:
        raise RuntimeError(f"{config_file} has unsafe permissions: {config_props.st_mode:o}\n"
                           "Please correct it to 0600")

    # load config
    with open(config_file, "r") as f:
        try:
            auth_data = toml.loads(f.read())["auth"]
        except KeyError as err:
            raise RuntimeError("configuration file has no auth section") from err
        except Exception as ex:
            raise RuntimeError(f"configuration file is not configured correctly: {ex}") from ex

    return _interpret_auth_data(auth_data)
def prune_outdated_auth(config_file=None):
    """Strip the legacy ``auth`` section out of a general configuration file.

    Credentials used to live in the general config and are still read from
    there for backwards compatibility. Once they have been written to the
    dedicated auth config, the old copy would otherwise linger untouched in
    the general config; this function rewrites that file without it.

    Args:
        config_file: Path to a configuration file, rewriting
            the default location if not given.

    Raises:
        RuntimeError: The config file is malformed.

    """
    target = configure.get_config_path("general") if config_file is None else config_file

    if not os.path.exists(target):
        return  # nothing to do!

    with open(target, "r") as reader:
        try:
            parsed = toml.loads(reader.read())
        except Exception as ex:
            raise RuntimeError(f"configuration file {target} is malformed: {ex}")

    if "auth" not in parsed:
        # leave the file alone when there is nothing to remove
        return

    del parsed["auth"]
    with open(target, "w") as writer:
        toml.dump(parsed, writer)
def _interpret_auth_data(auth_data):
    """Configures Auth instances given data read from a configuration file.

    Args:
        auth_data: The data obtained from the top-level 'auth' TOML key;
            either a single mapping (old style) or a list of mappings.

    Returns:
        A list of configured Auth instances.

    Raises:
        RuntimeError: A credential entry is missing its ``username`` or
            ``password`` property.
        KeyError: The ``SASLMethod[...]`` lookup fails for an entry's
            ``mechanism`` value.

    """
    if isinstance(auth_data, Mapping):
        # upgrade old-style single dict configs to new-style list-of-dicts (with one item)
        auth_data = [auth_data]

    # Named `creds` rather than `auth` so the imported `auth` module is not shadowed.
    creds = []
    for config in auth_data:
        extra_kwargs = {}

        # SSL options: SSL stays on unless the entry explicitly asks for plaintext, and a CA
        # location is only honoured when SSL is in use.
        ssl = config.get("protocol") != "SASL_PLAINTEXT"
        if ssl and "ssl_ca_location" in config:
            extra_kwargs["ssl_ca_location"] = config["ssl_ca_location"]

        # SASL options: username and password are the only mandatory properties
        try:
            user = config["username"]
            password = config["password"]
        except KeyError as ke:
            raise RuntimeError("configuration file is not configured correctly: "
                               f"missing auth property {ke}") from ke

        host = config.get("hostname", "")
        # Dashes are rewritten to underscores before the enum lookup by name.
        mechanism = config.get("mechanism", "SCRAM_SHA_512").replace("-", "_")

        creds.append(Auth(user, password, host=host, ssl=ssl,
                          method=SASLMethod[mechanism], **extra_kwargs))
    return creds
def select_matching_auth(creds, hostname, username=None):
    """Selects the most appropriate credential to use when attempting to contact the given host.

    A credential with a hostname is only considered if that hostname equals
    ``hostname``; a credential without one is considered for any host. When
    both kinds match, the host-specific credentials take precedence.

    Args:
        creds: A list of configured Auth objects. These can be obtained from :meth:`load_auth`.
        hostname: The name of the host for which to select suitable credentials.
        username: `str`, optional
            The name of the credential to use.

    Returns:
        A single Auth object which should be used to authenticate.

    Raises:
        RuntimeError: Too many or too few credentials matched.

    """
    # If the credential has a hostname, we require it to match.
    # If it doesn't, we have to assume it might suit the user's purposes.
    # If the user specified a specific username, we require that to be matched.
    candidates = [
        cred for cred in creds
        if (len(cred.hostname) == 0 or cred.hostname == hostname)
        and (username is None or cred.username == username)
    ]

    # if there were both exact and inexact matches, cull the inexact matches
    exact = [cred for cred in candidates if len(cred.hostname) > 0]
    matches = exact if exact else candidates

    if len(matches) == 0:
        err = f"No matching credential found for hostname '{hostname}'"
        if username is not None:
            err += f" with username '{username}'"
        raise RuntimeError(err)

    if len(matches) > 1:
        err = f"Ambiguous credentials found for hostname '{hostname}'"
        err += f" with username '{username}'" if username is not None \
            else " with no username specified"
        # Start the listing on its own line so it does not run into the previous sentence.
        err += "\nMatched credentials:"
        for match in matches:
            err += f"\n {match.username}"
            err += " which has no associated hostname" if len(match.hostname) == 0 \
                else f" for {match.hostname}"
        raise RuntimeError(err)

    # At this point we should have exactly one match
    return matches[0]
def read_new_credential(csv_file=None):
    """Import a credential from a CSV file or obtain it interactively from the user.

    Args:
        csv_file: Path to a file from which to read credential data in CSV format.
                  If unspecified, the user will be prompted to enter data instead.
                  Only the first data row of the file is used.

    Returns:
        A configured `Auth` object containing the new credential.

    Raises:
        FileNotFoundError: If csv_file is not None and refers to a nonexistent path.
        KeyError: If csv_file is not None and the specified file does not contain either a
                  username or password field.
        RuntimeError: If csv_file is None and the interactively entered username or password is
                      empty, or if csv_file contains no data rows.

    """
    if csv_file is None:
        logger.info("Generating configuration with user-specified username + password")
        username = input("Username: ")
        if len(username) == 0:
            raise RuntimeError("Username may not be empty")
        password = getpass.getpass()
        if len(password) == 0:
            raise RuntimeError("Password may not be empty")
        hostname = input("Hostname (may be empty): ")
        return Auth(username, password, hostname)

    if not os.path.exists(csv_file):
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), csv_file)

    # newline="" is what the csv module asks for so that embedded newlines parse correctly
    with open(csv_file, "r", newline="") as f:
        # a file with only a header (or nothing at all) yields no row; do not leak StopIteration
        cred = next(csv.DictReader(f), None)
    if cred is None:
        raise RuntimeError(f"{csv_file} contains no credential data")

    username = cred["username"]
    password = cred["password"]
    # DictReader fills a missing trailing column with None; normalise to "no hostname"
    hostname = cred.get("hostname") or ""
    return Auth(username, password, hostname)
def write_auth_data(config_file, credentials):
    """Write configuration file for the set of credentials.

    Creates containing directories as needed. Only the username, password and
    (when non-empty) hostname of each credential are written.

    Args:
        config_file: configuration file path
        credentials: list of `Auth` objects representing credentials to be stored

    """
    cred_list = []
    for cred in credentials:
        cred_dict = {"username": cred.username,
                     "password": cred._config["sasl.password"]}
        if len(cred.hostname) > 0:
            cred_dict["hostname"] = cred.hostname
        cred_list.append(cred_dict)

    # A bare filename has an empty dirname, which os.makedirs rejects; in that case the file
    # goes into the current directory and there is nothing to create.
    parent_dir = os.path.dirname(config_file)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Create the file owner-read/write only; the mode applies only when the file is created.
    fd = os.open(config_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, stat.S_IRUSR | stat.S_IWUSR)
    with open(fd, "w") as f:
        toml.dump({"auth": cred_list}, f)
        logger.info(f"Wrote configuration to: {config_file}")
def list_credentials():
    """Print every stored credential, one per line.

    Credentials tied to a host are shown as ``<username> for <hostname>``
    with the usernames padded to a common width; the rest are shown as the
    bare username. When nothing is stored, a notice is printed only if
    standard output is a terminal.
    """
    stored = load_auth()

    if len(stored) == 0:
        # stay silent when output is being piped somewhere
        if os.isatty(1):
            print("No credentials configured")
        return

    width = max(len(entry.username) for entry in stored)
    template = f"{{:{width}}} for {{}}"
    for entry in stored:
        if len(entry.hostname) == 0:
            print(entry.username)
        else:
            print(template.format(entry.username, entry.hostname))
def add_credential(args):
    """Load a new credential and store it to the persistent configuration.

    Args:
        args: Command line options/arguments object. args.cred_file is taken as the path to a
              CSV file to import, or if None the user is prompted to enter a credential
              directly. args.force controls whether an existing credential with an identical
              name will be overwritten; it is treated as False when absent.

    """
    # first load any existing credentials
    try:
        creds = load_auth()
    except FileNotFoundError:
        # if no auth file exists we can just treat that as there being no credentials
        creds = []

    # next, load the new credential
    new_cred = read_new_credential(args.cred_file)

    # check for any conflicts between the new credential and an existing one: same username
    # and same hostname (including both having none)
    conflicting_cred_idx = None
    for idx, cred in enumerate(creds):
        if cred.username == new_cred.username and cred.hostname == new_cred.hostname:
            conflicting_cred_idx = idx
            break

    if conflicting_cred_idx is None:
        creds.append(new_cred)
    elif getattr(args, "force", False):
        # replace the entry that actually conflicts, not whichever one the loop ended on
        creds[conflicting_cred_idx] = new_cred
    else:
        logger.error("Credential already exists; overwrite with --force")
        return

    write_auth_data(configure.get_config_path("auth"), creds)
    # drop any stale copy of the credentials left in the general config
    prune_outdated_auth()
def _construct_ambiguous_deletion_message(username, hostname, matches):
    """Build the error text for a deletion request that matches several credentials.

    This function should only be used by `delete_credential`.

    Args:
        username: The username for the credential targeted for deletion.
        hostname: The hostname for the credential targeted for deletion, if any.
        matches: the collection of possibly matching credentials found by delete_credential.

    Returns:
        An error message string.

    """
    if hostname is None:
        qualifier = " with no hostname specified\n"
    else:
        qualifier = f" with hostname '{hostname}'\n"

    pieces = [
        f"Ambiguous credentials found for username '{username}'",
        qualifier,
        "Matched credentials:",
    ]
    # one line per candidate, saying which host (if any) it belongs to
    for candidate in matches:
        if len(candidate.hostname) == 0:
            where = " which has no associated hostname"
        else:
            where = f" for {candidate.hostname}"
        pieces.append(f"\n {candidate.username}{where}")
    return "".join(pieces)
def delete_credential(name: str):
    """Delete a credential from the persistent configuration.

    Args:
        name: The username, or username and hostname separated by an '@' character of the
              credential to delete. If several '@' characters are present, the last one is
              taken as the separator.

    Raises:
        RuntimeError: If no credentials or more than one credential matches the specified name,
                      making the operation impossible or ambiguous.

    """
    # first load any existing credentials
    try:
        creds = load_auth()
    except FileNotFoundError:
        # if no auth file exists we can just treat that as there being no credentials
        creds = []

    # Split on the last '@' only, so a name with several '@' characters no longer fails with
    # an unpacking ValueError.
    username, separator, hostname = name.rpartition("@")
    if not separator:
        username = name
        hostname = None

    # next, try to figure out which one we're supposed to remove: the username must match
    # and, if specified, the hostname must match
    match_indices = [
        idx for idx, cred in enumerate(creds)
        if cred.username == username and (hostname is None or cred.hostname == hostname)
    ]

    if len(match_indices) == 0:
        err = f"No matching credential found with username '{username}'"
        if hostname is not None:
            err += f" with hostname '{hostname}'"
        raise RuntimeError(err)
    if len(match_indices) > 1:
        matches = [creds[idx] for idx in match_indices]
        raise RuntimeError(_construct_ambiguous_deletion_message(username, hostname, matches))

    # At this point we should have exactly one match, which we can delete
    del creds[match_indices[0]]
    write_auth_data(configure.get_config_path("auth"), creds)
    # drop any stale copy of the credentials left in the general config
    prune_outdated_auth()
def _add_parser_args(parser):
    """Register the ``locate``, ``list``, ``add`` and ``remove`` subcommands on ``parser``.

    The chosen subcommand is stored in ``args.command`` and is mandatory.

    Args:
        parser: The argparse parser to which the subcommands are attached.

    """
    subparser = parser.add_subparsers(title="commands", metavar="<command>", dest="command")
    subparser.required = True

    subparser.add_parser("locate", help="display configuration path")

    subparser.add_parser("list", help="Display all stored credentials")

    add_cred_parser = subparser.add_parser("add", help="Load a credential, "
                                           "specified either via a CSV file or interactively")
    add_cred_parser.add_argument("cred_file", type=str, default=None, nargs='?',
                                 help="Import credentials from CSV file")
    # add_credential reads args.force and tells users to pass --force, so it must exist here
    add_cred_parser.add_argument("--force", "-f", action="store_true", default=False,
                                 help="Overwrite an existing credential with the same name")

    del_cred_parser = subparser.add_parser("remove", help="Delete a stored credential")
    del_cred_parser.add_argument("name", type=str,
                                 help="The name of the credential to delete. This will be treated "
                                 "as a username, unless it contains a @ character, in which case it"
                                 " will be split into a username and hostname.")


def _main(args):
    """Authentication configuration utilities.

    Configures INFO-level logging and runs the subcommand selected in
    ``args.command``.

    Args:
        args: Parsed command line arguments, as produced by a parser set up
            with `_add_parser_args`.

    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(name)s : %(levelname)s : %(message)s",
    )

    if args.command == "locate":
        print(configure.get_config_path("auth"))
    elif args.command == "list":
        list_credentials()
    elif args.command == "add":
        add_credential(args)
    elif args.command == "remove":
        delete_credential(args.name)