Source code for enstaller.config

# Copyright by Enthought, Inc.
# Author: Ilan Schnell <ischnell@enthought.com>
from __future__ import absolute_import, print_function

import logging
import re
import os
import sys
import platform
import tempfile
import warnings

from os.path import isfile, join

from egginst.utils import parse_assignments
from egginst.vendor.six import string_types
from egginst.vendor.six.moves import urllib

from enstaller.vendor import keyring
from enstaller.vendor.keyring.backends.file import PlaintextKeyring

from enstaller import __version__
from enstaller.auth import (DUMMY_USER,
                            subscription_message, APITokenAuth, UserInfo,
                            UserPasswordAuth)
from enstaller.config_templates import RC_DEFAULT_TEMPLATE, RC_TEMPLATE
from enstaller.errors import (EnstallerException, InvalidConfiguration,
                              InvalidFormat)
from enstaller.proxy_info import ProxyInfo
from enstaller.repository_info import (BroodRepositoryInfo,
                                       CanopyRepositoryInfo,
                                       FSRepositoryInfo,
                                       OldstyleRepositoryInfo)
from enstaller.utils import real_prefix, under_venv
from enstaller.vendor import requests
from enstaller.cli.utils import humanize_ssl_error_and_die
from enstaller import plat
from .utils import PY_VER, abs_expanduser, fill_url
from ._yaml_config import load_configuration_from_yaml

logger = logging.getLogger(__name__)

KEYRING_SERVICE_NAME = 'Enthought.com'

ENSTALLER4RC_FILENAME = ".enstaller4rc"
SYS_PREFIX_ENSTALLER4RC = os.path.join(sys.prefix, ENSTALLER4RC_FILENAME)
HOME_ENSTALLER4RC = os.path.join(abs_expanduser("~"), ENSTALLER4RC_FILENAME)

STORE_KIND_LEGACY = "legacy"
STORE_KIND_BROOD = "brood"
_BROOD_PREFIX = "brood+"


def _setup_keyring():
    backend = PlaintextKeyring()

    try:
        if sys.platform == "win32":
            from enstaller.vendor.keyring.backends.Windows import \
                WinVaultKeyring
            tentative_backend = WinVaultKeyring()
        elif sys.platform == "darwin":
            from enstaller.vendor.keyring.backends.OS_X import Keyring
            tentative_backend = Keyring()
        else:
            tentative_backend = backend
        if tentative_backend.priority >= 0:
            backend = tentative_backend
    except ImportError:
        pass

    keyring.set_keyring(backend)

_setup_keyring()


def _keyring_backend_name():
    return str(type(keyring.get_keyring()))


def legacy_configuration_read_search_order():
    """
    Return a list of directories where to look for the configuration file.

    Legacy order, don't use outside canopy.
    """
    paths = [
        abs_expanduser("~"),
        real_prefix(),
    ]

    return [os.path.normpath(p) for p in paths]


def configuration_read_search_order():
    """
    Return a list of directories where to look for the configuration file.
    """
    paths = [sys.prefix]
    paths.append(abs_expanduser("~"))
    if under_venv():
        paths.append(real_prefix())

    return [os.path.normpath(p) for p in paths]


def add_url(filename, config, url):
    url = fill_url(url)
    if url in config.indexed_repositories:
        print("Already configured:", url)
        return
    prepend_url(filename, url)


def _get_writable_local_dir(local_dir):
    if not os.access(local_dir, os.F_OK):
        try:
            os.makedirs(local_dir)
            return local_dir
        except (OSError, IOError):
            pass
    elif os.access(local_dir, os.W_OK):
        return local_dir

    logger.warn('Warning: the following directory is not writeable '
                'with current permissions:\n'
                '    {0!r}\n'
                'Using a temporary cache for index and eggs.\n'.
                format(local_dir))
    return tempfile.mkdtemp()


[docs]def write_default_config(filename): """ Write a default configuration file at the given location. Will raise an exception if a file already exists. Parameters ---------- filename : str The location to write to. """ if os.path.isfile(filename): msg = "File '{0}' already exists, not overwriting." raise EnstallerException(msg.format(filename)) else: config = Configuration() if config.proxy: proxy_line = 'proxy = %r' % str(config.proxy) else: proxy_line = ('#proxy = <proxy string> ' '# e.g. "http://<user>:<passwd>@123.0.1.2:8080"') variables = {"py_ver": PY_VER, "sys_prefix": sys.prefix, "version": __version__, "proxy_line": proxy_line, "auth_section": ""} with open(filename, "w") as fo: fo.write(RC_DEFAULT_TEMPLATE % variables)
def _is_using_epd_username(filename_or_fp): """ Returns True if the given configuration file uses EPD_username. """ data = parse_assignments(filename_or_fp) return "EPD_username" in data and "EPD_auth" not in data def convert_auth_if_required(filename): """ This function will convert configuration using EPD_username + keyring to using EPD_auth. Returns True if the file has been modified, False otherwise. """ did_convert = False if _is_using_epd_username(filename): msg = "Cannot convert password: no password found in keyring" config = Configuration.from_file(filename) if config.auth is None: raise EnstallerException(msg) username = config.auth.username password = _get_keyring_password(username) if password is None: raise EnstallerException(msg) else: config.update(auth=(username, password)) config._change_auth(filename) did_convert = True return did_convert def _get_keyring_password(username): return keyring.get_password(KEYRING_SERVICE_NAME, username) def _set_keyring_password(username, password): return keyring.set_password(KEYRING_SERVICE_NAME, username, password) def _create_error_message(fp, exc): pos = fp.tell() try: fp.seek(0) lines = fp.readlines() line = lines[exc.lineno-1].rstrip() if isinstance(exc, SyntaxError): msg = "Could not parse configuration file " \ "(invalid python syntax at line {0!r}: expression {1!r})".\ format(exc.lineno, line) else: msg = "Could not parse configuration file " \ "(error at line {0!r}: expression {1!r} not " \ "supported)".format(exc.lineno, line) return msg finally: fp.seek(pos)
[docs]class Configuration(object): @classmethod def _from_legacy_locations(cls): warnings.warn("Using legacy location is deprecated, use " "Configuration.from_filename with an explicit " "filename instead", DeprecationWarning) config_path = None for p in legacy_configuration_read_search_order(): candidate = os.path.join(p, ENSTALLER4RC_FILENAME) if isfile(candidate): config_path = candidate break if config_path is None: raise InvalidConfiguration("No default configuration found.") else: return cls.from_file(config_path) @classmethod def from_yaml_filename(cls, filename): return load_configuration_from_yaml(cls, filename) @classmethod
[docs] def from_file(cls, filename): """ Create a new Configuration instance from the given file. Parameters ---------- filename: str or file-like object If a string, is understood as a filename to open. Understood as a file-like object otherwise. """ def _create(fp): ret = cls() def api_token_to_auth(api_token): ret.update(auth=APITokenAuth(api_token)) def epd_auth_to_auth(epd_auth): ret.update(auth=UserPasswordAuth.from_encoded_auth(epd_auth)) def epd_username_to_auth(username): if keyring is not None: password = _get_keyring_password(username) if password is not None: ret.update(auth=(username, password)) try: parsed = parse_assignments(fp) except (InvalidFormat, SyntaxError) as e: msg = _create_error_message(fp, e) raise InvalidConfiguration(msg) # We need a custom translator to manage attributes in the # configuration file without polluting the config class translator = ret._name_to_setter.copy() translator.update({ "EPD_auth": epd_auth_to_auth, "EPD_username": epd_username_to_auth, "IndexedRepos": translator["indexed_repositories"], "api_token": api_token_to_auth, }) if "EPD_auth" in parsed and "api_token" in parsed: msg = "Both 'EPD_auth' and 'api_token' set in configuration." \ "\nYou should remove one of those for consistent " \ "behaviour." warnings.warn(msg) for name, value in parsed.items(): if name in translator: translator[name](value) else: warnings.warn("Unsupported configuration setting {0}, " "ignored".format(name)) return ret if isinstance(filename, string_types): with open(filename, "r") as fp: ret = _create(fp) ret._filename = filename return ret else: return _create(filename)
def __init__(self, **kw): """ Create a new configuration instance. Any argument passed to the constructor need to be a keyword argument, and is understood as an argument to the update method, e.g.:: config = Configuration(store_url="http://acme.com") # equivalent to config = Configuration() config.update(store_url="http://acme.com") """ self._auth = None self._autoupdate = True self._noapp = False self._proxy = None self._use_pypi = True self._use_webservice = True self._prefix = os.path.normpath(sys.prefix) self._indexed_repositories = [] self._repositories = [] self._store_url = "https://api.enthought.com" self._store_kind = STORE_KIND_LEGACY self._repository_cache = None self._filename = None self._platform = plat.custom_plat self._max_retries = 0 self._verify_ssl = True self._name_to_setter = {} simple_attributes = [ ("autoupdate", "_autoupdate"), ("noapp", "_noapp"), ("verify_ssl", "_verify_ssl"), ("use_pypi", "_use_pypi"), ("use_webservice", "_use_webservice"), ] for name, private_attribute in simple_attributes: self._name_to_setter[name] = \ self._simple_attribute_set_factory(private_attribute) self._name_to_setter.update({ "auth": self._set_auth, "indexed_repositories": self._set_indexed_repositories, "max_retries": self._set_max_retries, "prefix": self._set_prefix, "proxy": self._set_proxy, "repository_cache": self._set_repository_cache, "store_url": self._set_store_url, }) self.update(**kw) # ---------- # Properties # ---------- @property def api_url(self): """ Url to hit to get user information on api.e.com. """ return fill_url("{0}/accounts/user/info/".format(self.store_url)) @property def auth(self): """ The auth object that may be passed to Session.authenticate :return: the auth instance, or None is configured. :rtype: IAuth or None """ return self._auth @property def autoupdate(self): """ Whether enpkg should attempt updating itself. """ return self._autoupdate @property def filename(self): """ The filename this configuration was created from. May be None if the configuration was not created from a file. """ return self._filename @property def indexed_repositories(self): """ List of (old-style) repositories. Only actually used when use_webservice is False. """ return self._indexed_repositories @property def indices(self): """ Returns a list of pair (index_url, store_location) for this given configuration. Takes into account webservice/no webservice and pypi True/False .. deprecated:: 4.9.0 Use the repositories property instead """ msg = ("the indices property is deprecated: use the repositories " "property instead") warnings.warn(msg, DeprecationWarning) return tuple((repository_info.index_url, repository_info._base_url) for repository_info in self.repositories) @property def max_retries(self): """ Max attempts to retry an http connection or re-fetching data whose checksum failed. """ return self._max_retries @property def noapp(self): """ Ignore appinst entries. """ return self._noapp @property def prefix(self): """ Prefix in which enpkg operates. """ return self._prefix @property def proxy(self): """ A ProxyInfo instance or None if no proxy is configured. """ return self._proxy @property def proxy_dict(self): """ A dictionary <scheme>:<proxy_string> that can be used as the proxies argument for requests. """ if self._proxy: return {self._proxy.scheme: str(self._proxy)} else: return {} @property def repositories(self): if self.use_webservice: return (CanopyRepositoryInfo(self.store_url, self.use_pypi, self._platform),) else: return self._repositories @property def repository_cache(self): """ Absolute path where eggs will be cached. """ if self._repository_cache is None: return join(self.prefix, "LOCAL-REPO") else: return self._repository_cache @property def store_kind(self): """ Store kind (brood, legacy canopy, old-repo style). """ return self._store_kind @property def store_url(self): """ The store url to hit for indices and eggs. """ return self._store_url @property def verify_ssl(self): """ Whether to verify SSL CA or not. """ return self._verify_ssl @property def use_pypi(self): """ Whether to load pypi repositories (in `webservice` mode). """ return self._use_pypi @property def use_webservice(self): """ Whether to use canopy legacy or not. """ return self._use_webservice @property def webservice_entry_point(self): """ Whether to fetch indices and data (in `webservice` mode). """ return fill_url("{0}/eggs/{1}/". format(self.store_url, self._platform)) # -------------- # Public methods # -------------- def reset_auth(self): self._auth = None def set_auth_from_encoded(self, value): try: auth = UserPasswordAuth.from_encoded_auth(value) except Exception: raise InvalidConfiguration("Invalid EPD_auth value") else: self.update(auth=auth)
[docs] def set_repositories_from_names(self, names): """ Set repositories from their names alone, e.g. 'enthought/free'. Parameters ---------- names : list List of repository names """ repositories = [] for name in names: p = urllib.parse.urlparse(name) if p.scheme == "": repositories.append(BroodRepositoryInfo(self.store_url, name, self._platform)) elif p.scheme == "file": repositories.append(FSRepositoryInfo(name)) else: msg = "Unsupported syntax: {0!r}".format(name) raise InvalidConfiguration(msg) self._repositories = tuple(repositories)
[docs] def update(self, **kw): """ Set configuration attributes given as keyword arguments.""" for name, value in kw.items(): setter = self._name_to_setter.get(name, None) if name is None: raise ValueError("Invalid setting name: {0!r}".format(name)) else: setter(value)
[docs] def write(self, filename): """ Write this configuration to the given filename. Parameters ---------- filename : str The path of the written file. """ if self.auth is None: auth_section = "" else: auth_section = self.auth.config_string if self.proxy: proxy_line = 'proxy = %r' % str(self.proxy) else: proxy_line = ('#proxy = <proxy string> ' '# e.g. "http://<user>:<passwd>@123.0.1.2:8080"') # XXX: because setting indexed repositories use string replacement # (sigh...), we need to write the IndexedRepos section in a very specific # way: # # IndexedRepos = [ # url1, # url2 # ] indexed_repos_string = "\n".join(" %r," % (repo,) for repo in self.indexed_repositories) if self.store_kind == STORE_KIND_BROOD: store_url = _BROOD_PREFIX + self.store_url else: store_url = self.store_url variables = {"py_ver": PY_VER, "sys_prefix": sys.prefix, "version": __version__, "proxy_line": proxy_line, "auth_section": auth_section, "store_url": store_url, "indexed_repositories": indexed_repos_string, "use_pypi": self.use_pypi, "use_webservice": self.use_webservice, } with open(filename, "w") as fo: fo.write(RC_TEMPLATE % variables) # --------------- # Private methods # ---------------
def _change_auth(self, filename): if self.auth is None: pat = re.compile(r'^(EPD_auth|EPD_username)\s*=.*$', re.M) with open(filename, 'r') as fi: data = fi.read() if pat.search(data): data = pat.sub("", data) with open(filename, 'w') as fo: fo.write(data) else: self.auth.change_auth(filename) def _checked_change_auth(self, auth, session, filename): user = {} session.authenticate(auth) self.update(auth=auth) self._change_auth(filename) user = UserInfo.from_session(session) print(subscription_message(self, user)) return user def _set_auth(self, auth): """ Set the internal authentication information. Parameters ---------- auth : Auth-like The authentication information. May be a (username, password) tuple, or an *Auth subclass. """ if isinstance(auth, tuple) and len(auth) == 2: username, password = auth if username is None: raise InvalidConfiguration( "invalid authentication arguments: " "{0}:{1}".format(username, password)) self._auth = UserPasswordAuth(username, password) else: self._auth = auth def _set_indexed_repositories(self, urls): self._indexed_repositories = tuple(fill_url(url) for url in urls) self._repositories = tuple(OldstyleRepositoryInfo(url) for url in self._indexed_repositories) def _set_max_retries(self, raw_max_retries): try: max_retries = int(raw_max_retries) except ValueError: msg = "Invalid type for 'max_retries': {0!r}" raise InvalidConfiguration(msg.format(raw_max_retries)) else: self._max_retries = max_retries def _set_prefix(self, prefix): self._prefix = os.path.normpath(abs_expanduser(prefix)) def _set_proxy(self, proxy_string): self._proxy = ProxyInfo.from_string(proxy_string) def _set_store_url(self, url): p = urllib.parse.urlparse(url) if p.scheme.startswith(_BROOD_PREFIX): url = url[len(_BROOD_PREFIX):] self._store_kind = STORE_KIND_BROOD self._store_url = url def _set_repository_cache(self, value): normalized = os.path.normpath(abs_expanduser(value)) self._repository_cache = _get_writable_local_dir(normalized) def _simple_attribute_set_factory(self, attribute_name): return lambda value: setattr(self, attribute_name, value)
def prepend_url(filename, url): with open(filename, 'r+') as fp: data = fp.read() pat = re.compile(r'^IndexedRepos\s*=\s*\[\s*$', re.M) if not pat.search(data): sys.exit("Error: IndexedRepos section not found") data = pat.sub(r"IndexedRepos = [\n '%s'," % url, data) fp.seek(0) fp.write(data) def print_config(config, session): print("Python version:", PY_VER) print("enstaller version:", __version__) print("sys.prefix:", os.path.normpath(sys.prefix)) print("platform:", platform.platform()) print("architecture:", platform.architecture()[0]) print("use_webservice:", config.use_webservice) if config.filename is not None: print("config file:", config.filename) print("keyring backend: %s" % (_keyring_backend_name(), )) print("settings:") print(" prefix = %s" % config.prefix) print(" %s = %s" % ("repository_cache", config.repository_cache)) print(" %s = %r" % ("noapp", config.noapp)) print(" %s = %r" % ("proxy", config.proxy)) if not config.use_webservice: print(" IndexedRepos:") for repo in config.indexed_repositories: print(' %r' % repo) user = DUMMY_USER if config.auth is None: print("No valid auth information in configuration, cannot " "authenticate.") else: try: session.authenticate(config.auth) user = UserInfo.from_session(session) except requests.exceptions.SSLError as e: humanize_ssl_error_and_die(e, config.store_url) except Exception as e: print(e) print(subscription_message(config, user))