Source code for enstaller.repository

import collections
import operator
import os
import os.path
import sys
import time

from egginst.eggmeta import info_from_z
from egginst.vendor.six.moves import urllib
from egginst._zipfile import ZipFile

from enstaller.errors import EnstallerException, NoSuchPackage
from enstaller.eggcollect import info_from_metadir
from enstaller.utils import compute_md5, path_to_uri, PY_VER
from enstaller.versions.pep386_workaround import PEP386WorkaroundVersion
from enstaller.versions.enpkg import EnpkgVersion


class PackageVersionInfo(object):
    def __init__(self, name, version):
        self.name = name
        self.version = version


class PackageMetadata(object):
    """
    PackageMetadataBase encompasses the metadata required to resolve
    dependencies.

    They are not attached to a repository.
    """
    @classmethod
    def from_egg(cls, path):
        """
        Create an instance from an egg filename.
        """
        with ZipFile(path) as zp:
            metadata = info_from_z(zp)
        metadata["packages"] = metadata.get("packages", [])
        return cls.from_json_dict(os.path.basename(path), metadata)

    @classmethod
    def from_json_dict(cls, key, json_dict):
        """
        Create an instance from a key (the egg filename) and metadata passed as
        a dictionary
        """
        version = EnpkgVersion.from_upstream_and_build(json_dict["version"],
                                                       json_dict["build"])
        return cls(key, json_dict["name"], version, json_dict["packages"],
                   json_dict["python"])

    def __init__(self, key, name, version, packages, python):
        self.key = key

        self.name = name
        self.version = version

        self.packages = packages
        self.python = python

    def __repr__(self):
        return "PackageMetadata('{0}-{1}', key={2!r})".format(
            self.name, self.version, self.key)

    @property
    def dependencies(self):
        # FIXME: we keep packages for backward compatibility (called as is in
        # the index).
        return self.packages

    @property
    def full_version(self):
        """
        The full version as a string (e.g. '1.8.0-1' for the numpy-1.8.0-1.egg)
        """
        return str(self.version)


class RepositoryPackageMetadata(PackageMetadata):
    """
    RepositoryPackageMetadata encompasses the full set of package metadata
    available from a repository.

    In particular, RepositoryPackageMetadata's instances know about which
    repository they are coming from through the store_location attribute.
    """
    @classmethod
    def from_egg(cls, path, store_location=""):
        """
        Create an instance from an egg filename.
        """
        with ZipFile(path) as zp:
            metadata = info_from_z(zp)

        if len(store_location) == 0:
            store_location = path_to_uri(os.path.dirname(path)) + "/"

        if not store_location.endswith("/"):
            msg = "Invalid uri for store location: {0!r} (expected an uri " \
                  "ending with '/')".format(store_location)
            raise ValueError(msg)

        metadata["packages"] = metadata.get("packages", [])
        st = os.stat(path)
        metadata["size"] = st.st_size
        metadata["md5"] = compute_md5(path)
        metadata["mtime"] = st.st_mtime
        metadata["store_location"] = store_location
        return cls.from_json_dict(os.path.basename(path), metadata)

    @classmethod
    def from_json_dict(cls, key, json_dict):
        version = EnpkgVersion.from_upstream_and_build(json_dict["version"],
                                                       json_dict["build"])
        return cls(key, json_dict["name"], version, json_dict["packages"],
                   json_dict["python"], json_dict["size"], json_dict["md5"],
                   json_dict.get("mtime", 0.0), json_dict.get("product", None),
                   json_dict.get("available", True),
                   json_dict["store_location"])

    def __init__(self, key, name, version, packages, python, size, md5,
                 mtime, product, available, store_location):
        super(RepositoryPackageMetadata, self).__init__(key, name, version,
                                                        packages, python)

        self.size = size
        self.md5 = md5

        self.mtime = mtime
        self.product = product
        self.available = available
        self.store_location = store_location

        self.type = "egg"

    @property
    def s3index_data(self):
        """
        Returns a dict that may be converted to json to re-create our legacy S3
        index content
        """
        keys = ("available", "md5", "name", "packages", "product",
                "python", "mtime", "size", "type")
        ret = dict((k, getattr(self, k)) for k in keys)
        ret["version"] = str(self.version.upstream)
        ret["build"] = self.version.build
        return ret

    @property
    def source_url(self):
        return urllib.parse.urljoin(self.store_location, self.key)

    def __repr__(self):
        template = "RepositoryPackageMetadata(" \
            "'{self.name}-{self.version}', key={self.key!r}, " \
            "available={self.available!r}, product={self.product!r}, " \
            "store_location={self.store_location!r})".format(self=self)
        return template


class InstalledPackageMetadata(PackageMetadata):
    @classmethod
    def from_egg(cls, path, ctime, store_location):
        """
        Create an instance from an egg filename.
        """
        with ZipFile(path) as zp:
            metadata = info_from_z(zp)
        metadata["packages"] = metadata.get("packages", [])
        metadata["ctime"] = ctime
        metadata["store_location"] = store_location
        metadata["key"] = os.path.basename(path)
        return cls.from_installed_meta_dict(metadata)

    @classmethod
    def from_meta_dir(cls, meta_dir):
        meta_dict = info_from_metadir(meta_dir)
        if meta_dict is None:
            message = "No installed metadata found in {0!r}".format(meta_dir)
            raise EnstallerException(message)
        else:
            return cls.from_installed_meta_dict(meta_dict)

    @classmethod
    def from_installed_meta_dict(cls, json_dict):
        key = json_dict["key"]
        name = json_dict["name"]
        upstream_version = json_dict["version"]
        build = json_dict.get("build", 1)
        version = EnpkgVersion.from_upstream_and_build(upstream_version, build)
        packages = json_dict.get("packages", [])
        python = json_dict.get("python", PY_VER)
        ctime = json_dict.get("ctime", time.ctime(0.0))
        store_location = json_dict.get("store_location", "")
        return cls(key, name, version, packages, python, ctime,
                   store_location)

    def __init__(self, key, name, version, packages, python, ctime,
                 store_location):
        super(InstalledPackageMetadata, self).__init__(key, name, version,
                                                       packages, python)

        self.ctime = ctime
        self.store_location = store_location


def egg_name_to_name_version(egg_name):
    """
    Convert a eggname (filename) to a (name, version) pair.

    Parameters
    ----------
    egg_name: str
        The egg filename

    Returns
    -------
    name: str
        The name
    version: str
        The *full* version (e.g. for 'numpy-1.8.0-1.egg', the full version is
        '1.8.0-1')
    """
    basename = os.path.splitext(os.path.basename(egg_name))[0]
    parts = basename.split("-", 1)
    if len(parts) != 2:
        raise ValueError("Invalid egg name: {0!r}".format(egg_name))
    else:
        return parts[0].lower(), parts[1]


def _valid_meta_dir_iterator(prefixes):
    for prefix in prefixes:
        egg_info_root = os.path.join(prefix, "EGG-INFO")
        if os.path.isdir(egg_info_root):
            for path in os.listdir(egg_info_root):
                meta_dir = os.path.join(egg_info_root, path)
                yield prefix, egg_info_root, meta_dir


[docs]class Repository(object): """ A Repository is a set of package, and knows about which package it contains. """ def _populate_from_prefixes(self, prefixes): if prefixes is None: # pragma: nocover prefixes = [sys.prefix] for prefix, egg_info_root, meta_dir in _valid_meta_dir_iterator(prefixes): info = info_from_metadir(meta_dir) if info is not None: info["store_location"] = prefix package = \ InstalledPackageMetadata.from_installed_meta_dict(info) self.add_package(package) @classmethod def _from_prefixes(cls, prefixes=None): """ Create a repository representing the *installed* packages. Parameters ---------- prefixes: seq List of prefixes. [sys.prefix] by default """ repository = cls() repository._populate_from_prefixes(prefixes) return repository def __init__(self, packages=None): self._name_to_packages = collections.defaultdict(list) self._store_info = "" packages = packages or [] for package in packages: self.add_package(package) def __len__(self): return sum(len(self._name_to_packages[p]) for p in self._name_to_packages) def add_package(self, package_metadata): self._name_to_packages[package_metadata.name].append(package_metadata)
[docs] def delete_package(self, package_metadata): """ Remove the given package. Removing a non-existent package is an error. Parameters ---------- package_metadata : PackageMetadata The package to remove """ if not self.has_package(package_metadata): msg = "Package '{0}-{1}' not found".format( package_metadata.name, package_metadata.version) raise NoSuchPackage(msg) else: candidates = [p for p in self._name_to_packages[package_metadata.name] if p.full_version != package_metadata.full_version] self._name_to_packages[package_metadata.name] = candidates
[docs] def has_package(self, package_metadata): """Returns True if the given package is available in this repository Parameters ---------- package_metadata : PackageMetadata The package to look for. Returns ------- ret : bool True if the package is in the repository, false otherwise. """ candidates = self._name_to_packages.get(package_metadata.name, []) for candidate in candidates: if candidate.full_version == package_metadata.full_version: return True return False
[docs] def find_package(self, name, version): """Search for the first match of a package with the given name and version. Parameters ---------- name : str The package name to look for. version : str The full version string to look for (e.g. '1.8.0-1'). Returns ------- package : RepositoryPackageMetadata The corresponding metadata. """ version = EnpkgVersion.from_string(version) candidates = self._name_to_packages.get(name, []) for candidate in candidates: if candidate.version == version: return candidate raise NoSuchPackage("Package '{0}-{1}' not found".format(name, version))
[docs] def find_package_from_requirement(self, requirement): """Search for latest package matching the given requirement. Parameters ---------- requirement : Requirement The requirement to match for. Returns ------- package : RepositoryPackageMetadata The corresponding metadata. """ name = requirement.name version = requirement.version build = requirement.build if version is None: return self.find_latest_package(name) else: if build is None: upstream = PEP386WorkaroundVersion.from_string(version) candidates = [p for p in self.find_packages(name) if p.version.upstream == upstream] candidates.sort(key=operator.attrgetter("version")) if len(candidates) == 0: msg = "No package found for requirement {0!r}" raise NoSuchPackage(msg.format(requirement)) return candidates[-1] else: version = EnpkgVersion.from_upstream_and_build(version, build) return self.find_package(name, str(version))
[docs] def find_latest_package(self, name): """Returns the latest package with the given name. Parameters ---------- name : str The package's name Returns ------- package : PackageMetadata """ packages = self.find_sorted_packages(name) if len(packages) < 1: raise NoSuchPackage("No package with name {0!r}".format(name)) else: return packages[-1]
[docs] def find_sorted_packages(self, name): """Returns a list of package metadata with the given name and version, sorted from lowest to highest version (when possible). Parameters ---------- name : str The package's name Returns ------- packages : iterable Iterable of RepositoryPackageMetadata. """ packages = self.find_packages(name) try: return sorted(packages, key=operator.attrgetter("version")) except TypeError: # FIXME: allowing uncomparable versions should be disallowed at # some point return packages
[docs] def find_packages(self, name, version=None): """ Returns a list of package metadata with the given name and version Parameters ---------- name : str The package's name version : str or None If not None, the version to look for Returns ------- packages : iterable Iterable of RepositoryPackageMetadata-like (order is unspecified) """ candidates = self._name_to_packages.get(name, []) if version is None: return [package for package in candidates] else: return [package for package in candidates if package.full_version == version]
[docs] def iter_packages(self): """Iter over each package of the repository Returns ------- packages : iterable Iterable of RepositoryPackageMetadata-like. """ for packages_set in self._name_to_packages.values(): for package in packages_set: yield package
[docs] def iter_most_recent_packages(self): """Iter over each package of the repository, but only the most recent version of a given package Returns ------- packages : iterable Iterable of the corresponding RepositoryPackageMetadata-like instances. """ for name, packages in self._name_to_packages.items(): sorted_by_version = sorted(packages, key=operator.attrgetter("version")) yield sorted_by_version[-1]