Source code for enstaller.session

from __future__ import absolute_import

import collections
import contextlib
import os.path
import threading

from egginst.utils import atomic_file, ensure_dir
from egginst.vendor.six.moves import urllib

from enstaller import __version__
from enstaller.auth.auth_managers import (BroodAuthenticator,
                                          LegacyCanopyAuthManager,
                                          OldRepoAuthManager)
from enstaller.config import STORE_KIND_BROOD
from enstaller.errors import EnstallerException
from enstaller.vendor import requests
from enstaller.vendor.cachecontrol.adapter import CacheControlAdapter

from enstaller.requests_utils import (DBCache, LocalFileAdapter,
                                      QueryPathOnlyCacheController)

from enstaller.auth import UserPasswordAuth


class _PatchedRawSession(requests.Session):
    """ Like requests.Session, but supporting (nested) unmounting of
    adapters.

    Every adapter mounted on a prefix is remembered on a per-prefix stack,
    so that `umount` can drop the most recent adapter and put the one that
    was mounted before it back in place.
    """
    def __init__(self, *a, **kw):
        # The stack is created before calling the parent constructor so that
        # it already exists should the parent call self.mount while
        # initializing.
        self._adapters_stack = collections.defaultdict(list)
        super(_PatchedRawSession, self).__init__(*a, **kw)

    def mount(self, prefix, adapter):
        """ Mount the given adapter on the given prefix, remembering it so
        that it can later be removed with `umount`.
        """
        self._adapters_stack[prefix].append(adapter)
        super(_PatchedRawSession, self).mount(prefix, adapter)

    def umount(self, prefix):
        """ Remove the adapter most recently mounted on the given prefix.

        If another adapter was mounted on that prefix before it, that
        adapter becomes the active one again.

        Parameters
        ----------
        prefix : str
            The prefix to unmount, e.g. "http://".

        Returns
        -------
        adapter
            The adapter that was just removed.

        Raises
        ------
        ValueError
            If no adapter is currently registered for the prefix.
        """
        # .get() does not create an empty entry in the defaultdict.
        stack = self._adapters_stack.get(prefix)
        if not stack:
            msg = "no adapter registered for prefix {0!r}".format(prefix)
            raise ValueError(msg)

        self.adapters.pop(prefix)
        current_adapter = stack.pop()

        if stack:
            # Restore through the parent's mount (not self.mount, which would
            # push the adapter on the stack a second time) rather than by
            # assigning into self.adapters directly, so that whatever
            # bookkeeping the parent does on mount (such as keeping adapters
            # ordered by prefix length) also applies to the restored adapter.
            super(_PatchedRawSession, self).mount(prefix, stack[-1])

        return current_adapter


class Session(object):
    """ Simple class to handle http session management.

    It also ensures that connection settings such as proxy, SSL CA
    certification, etc... are handled consistently.

    Parameters
    ----------
    authenticator : IAuthManager
        An authenticator instance
    cache_directory : str
        A writeable directory to cache data and eggs.
    proxies : dict
        A proxy dict as expected by requests (and provided by
        Configuration.proxy_dict).
    verify : bool
        If True, SSL CA are verified (default).
    max_retries : int
        Value forwarded as `max_retries` to the adapters mounted on the
        "http://" and "https://" prefixes.
    """
    def __init__(self, authenticator, cache_directory, proxies=None,
                 verify=True, max_retries=0):
        self.proxies = proxies
        self.verify = verify
        self.cache_directory = cache_directory
        self.max_retries = max_retries

        self._authenticator = authenticator

        raw = _PatchedRawSession()
        self._raw = raw
        if proxies is not None:
            raw.proxies = proxies
        raw.verify = verify

        raw.mount("file://", LocalFileAdapter())
        # A single adapter instance is shared by both prefixes.
        http_adapter = requests.adapters.HTTPAdapter(
            max_retries=self.max_retries)
        for scheme in ("http://", "https://"):
            raw.mount(scheme, http_adapter)

        # Prepend our own identifier to the user agent already set on the
        # underlying session.
        raw.headers["user-agent"] = "enstaller/{0} {1}".format(
            __version__, raw.headers["user-agent"])

        # Nesting depth of active etag() contexts, guarded by the lock below.
        self._in_etag_context = 0
        self._etag_context_lock = threading.RLock()

    @classmethod
    def authenticated_from_configuration(cls, configuration):
        """ Create a new authenticated session from a configuration.

        Parameters
        ----------
        configuration : Configuration
            The configuration to use.

        Raises
        ------
        EnstallerException
            If the configuration has no auth set up.
        """
        if configuration.auth is None:
            msg = "No auth configured for the given configuration."
            raise EnstallerException(msg)

        new_session = cls.from_configuration(configuration)
        new_session.authenticate(configuration.auth)
        return new_session

    @classmethod
    def from_configuration(cls, configuration):
        """ Create a new session from a configuration.

        The authenticator class is picked from the configuration's store
        kind and `use_webservice` setting.

        Parameters
        ----------
        configuration : Configuration
            The configuration to use.
        """
        if configuration.store_kind == STORE_KIND_BROOD:
            authenticator_class = BroodAuthenticator
        elif configuration.use_webservice:
            authenticator_class = LegacyCanopyAuthManager
        else:
            authenticator_class = OldRepoAuthManager

        authenticator = authenticator_class.from_configuration(configuration)
        return cls(
            authenticator,
            configuration.repository_cache,
            configuration.proxy_dict,
            verify=configuration.verify_ssl,
            max_retries=configuration.max_retries,
        )

    def close(self):
        """ Close the underlying raw session."""
        self._raw.close()

    def __enter__(self):
        return self

    def __exit__(self, *a, **kw):
        self.close()

    @contextlib.contextmanager
    def etag(self):
        """ Context manager inside which "http://" and "https://" requests
        go through a caching adapter.

        Contexts may be nested: the caching adapters are mounted when the
        outermost context is entered, and removed when it is exited.
        """
        lock = self._etag_context_lock
        with lock:
            self._etag_setup()
        try:
            yield
        finally:
            with lock:
                self._etag_tear()

    def _etag_setup(self):
        """ Enter one level of etag context, mounting the caching adapters
        if this is the outermost level.
        """
        if self._in_etag_context == 0:
            db_path = os.path.join(self.cache_directory, "index_cache",
                                   "index.db")
            ensure_dir(db_path)
            index_cache = DBCache(db_path)

            # One adapter per prefix, all sharing the same cache object.
            for scheme in ("http://", "https://"):
                caching_adapter = CacheControlAdapter(
                    index_cache,
                    controller_class=QueryPathOnlyCacheController,
                    max_retries=self.max_retries)
                self._raw.mount(scheme, caching_adapter)

        self._in_etag_context += 1

    def _etag_tear(self):
        """ Leave one level of etag context, removing the caching adapters
        if this was the outermost level.
        """
        if self._in_etag_context == 1:
            for scheme in ("https://", "http://"):
                # XXX: This close is ugly, but I am not sure how one can link
                # a cache controller to a http adapter in cachecontrol. See
                # issue #42 on ionrock/cachecontrol @ github.
                removed_adapter = self._raw.umount(scheme)
                removed_adapter.cache.close()

        self._in_etag_context -= 1

    def authenticate(self, auth):
        """ Authenticate this session with the given auth.

        Parameters
        ----------
        auth : tuple or auth object
            Either a (username, password) pair, which is converted to a
            UserPasswordAuth, or an object exposing `request_adapter`.
        """
        if isinstance(auth, tuple) and len(auth) == 2:
            username, password = auth
            auth = UserPasswordAuth(username, password)

        self._authenticator.authenticate(self, auth.request_adapter)
        # Subsequent requests on the raw session reuse the authenticator's
        # auth object.
        self._raw.auth = self._authenticator._auth

    def download(self, url, target=None):
        """ Safely download the content at the given url.

        Safely here is understood as not leaving stale content if the
        download fails or is canceled. It uses streaming so as not to use
        too much memory.

        Parameters
        ----------
        url : str
            The url to fetch.
        target : str
            Path to write the content to. If None, the basename of the url
            path is used.

        Returns
        -------
        target : str
            The path the content was written to.
        """
        response = self._raw.get(url, stream=True)
        response.raise_for_status()

        if target is None:
            url_path = urllib.parse.urlparse(url).path
            target = os.path.basename(url_path)

        with atomic_file(target) as out:
            for block in response.iter_content(1024):
                out.write(block)

        return target

    def fetch(self, url):
        """ Return the streamed response for the given url, raising if the
        response status indicates an error.
        """
        response = self._raw.get(url, stream=True)
        response.raise_for_status()
        return response

    def get(self, url, stream=False):
        """ Issue a GET request on the underlying raw session."""
        return self._raw.get(url, stream=stream)

    def head(self, url):
        """ Issue a HEAD request on the underlying raw session."""
        return self._raw.head(url)

    # The _raw_* variants let the caller pass an explicit auth to the
    # underlying session for a single request.
    def _raw_get(self, url, auth=None):
        return self._raw.get(url, auth=auth)

    def _raw_head(self, url, auth=None):
        return self._raw.head(url, auth=auth)

    def _raw_post(self, url, auth=None):
        return self._raw.post(url, auth=auth)