#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Andre Anjos <andre.dos.anjos@gmail.com>
# Wed 22 Aug 10:20:07 2012
"""Generic tools for Bob buildout recipes
"""
import os
import sys
import site
import fnmatch
import pkg_resources
import distutils
import zc.buildout
import zc.buildout.easy_install
from zc.buildout.buildout import bool_option, MissingOption
import logging
logger = logging.getLogger(__name__)
[docs]def site_paths(buildout, prefixes):
"""Filters the site paths to make sure we don't get mistaken when filtering
user directories.
"""
def is_buildout_dir(path):
return path.startswith(buildout['eggs-directory']) or \
path.startswith(buildout['develop-eggs-directory'])
def is_in_prefixes(path):
return any([path.startswith(k) for k in prefixes])
retval = [os.path.realpath(k) for k in site.sys.path]
return [k for k in retval if not (is_buildout_dir(k) or is_in_prefixes(k))]
[docs]def uniq(seq, idfun=None):
"""Order preserving, fast de-duplication for lists"""
if idfun is None:
def idfun(x): return x
seen = {}
result = []
for item in seq:
marker = idfun(item)
if marker in seen: continue
seen[marker] = 1
result.append(item)
return result
[docs]def parse_list(l):
"""Parses a ini-style list from buildout and solves complex nesting"""
if not l: return []
return uniq([k.strip() for k in l.split() if len(k.strip()) > 0])
[docs]def add_eggs(eggs, l):
"""Adds eggs from a list into the buildout option"""
return '\n'.join(uniq(eggs + l))
[docs]def prepend_path(path, paths):
"""Prepends a path to the list of paths making sure it remains unique"""
if path in paths: paths.remove(path)
paths.insert(0, path)
[docs]def is_directory(filename):
"""Tells if the file is a directory"""
return os.path.isdir(filename)
[docs]def directory_readlines(package, filename):
"""Read all lines of a given file in a directory"""
try:
return open(os.path.join(package, filename), 'rt').readlines()
except:
pass
return []
[docs]def is_zipfile(filename):
"""Tells if the file is a zip file"""
import zipfile
return zipfile.is_zipfile(filename)
[docs]def zipfile_readlines(package, filename):
"""Read all lines of a given file in a tar ball"""
import zipfile
f = None
try:
f = zipfile.ZipFile(package)
try:
package_dir = os.path.splitext(os.path.basename(package))[0]
return [line.decode('utf-8') if isinstance(line, bytes) else line for line in f.open(os.path.join(package_dir, filename), 'rU').readlines()]
except:
pass
finally:
if f is not None:
f.close()
return []
[docs]def is_tarfile(filename):
"""Tells if the file is a tar ball"""
import tarfile
return tarfile.is_tarfile(filename)
[docs]def tarfile_readlines(package, filename):
"""Read all lines of a given file in tar ball"""
import tarfile
try:
f = tarfile.open(package)
try:
package_dir = os.path.splitext(os.path.dirname(package))[0]
if package_dir.endswith('.tar'): package_dir = package_dir[:-4]
return [line.decode('utf-8') if isinstance(line, bytes) else line for line in f.extractfile(os.path.join(package_dir, filename)).readlines()]
except:
pass
finally:
if f is not None:
f.close()
return []
[docs]def package_readlines(package, filename):
"""Extracts a single file contents from a given package"""
if is_directory(package):
return directory_readlines(package, filename)
elif is_zipfile(package):
return zipfile_readlines(package, filename)
elif is_tarfile(package):
return tarfile_readlines(package, filename)
else:
raise RuntimeError("package format not recognized: `%s'" % package)
[docs]def requirement_is_satisfied(requirement, working_set, newest):
"""Checks if the specifications for a requirement are satisfied by the
current working set"""
if newest: return False
try:
working_set.require(requirement)
return True
except:
pass
return False
[docs]def unsatisfied_requirements(buildout, package, working_set):
"""Reads and extracts the unsatisfied requirements from the package
"""
# read all lines from "requirements.txt"
specs = [k.strip() for k in package_readlines(package, 'requirements.txt')]
# discard empty lines and comments
specs = [k for k in specs if k and k[0] not in ('#', '-')]
# do not consider packages which are already installed, with a reasonable
# version matching the user specification, either on the current working
# set, the installed eggs or the system paths
newest = bool_option(buildout, 'newest', 'true')
left_over = []
for k in specs:
if requirement_is_satisfied(k, working_set, newest):
dist = working_set.require(k)[0]
logger.info("taking requirement `%s' (%s) from `%s'", dist.key,
dist.version, dist.location)
else:
left_over.append(k)
specs = left_over
return left_over
[docs]def merge_working_sets(self, other):
"""Merge two working sets, results are put on the first one"""
for dist in other.by_key.values(): self.add(dist)
return self
[docs]def install_package(buildout, specification, working_set):
"""Installs a package on either the eggs directory or development-eggs
directory. Updates the working set"""
new_ws = zc.buildout.easy_install.install(
specs = [specification],
dest = buildout['eggs-directory'],
links = buildout.get('find-links', '').split(),
index = buildout.get('index', None),
path = [buildout['develop-eggs-directory']],
working_set = working_set,
newest = bool_option(buildout, 'newest', 'true'),
)
merge_working_sets(working_set, new_ws)
return working_set
[docs]def satisfy_requirements(buildout, package, working_set):
"""Makes sure all requirements from a given package are installed properly
before we try to install the package itself."""
requirements = unsatisfied_requirements(buildout, package, working_set)
if not requirements: return
# only installs if not on "offline" mode
if offline(buildout):
raise zc.buildout.UserError("We don't have a distribution for %s\n"
"and can't install one in offline (no-install) mode.\n"
% ','.join(requirements))
# installs all missing dependencies, if required, updates working set
for req in requirements:
logger.info("Installing `%s' for package `%s'...", req, package)
working_set = install_package(buildout, req, working_set)
[docs]def get_pythonpath(working_set, buildout, prefixes):
"""Returns the PYTHONPATH setting for a particular working set"""
# get all paths available in the current working set
paths = list(working_set.entries)
if hasattr(zc.buildout.easy_install, 'distribute_loc'):
prepend_path(zc.buildout.easy_install.distribute_loc, paths)
elif hasattr(zc.buildout.easy_install, 'setuptools_loc'):
prepend_path(zc.buildout.easy_install.setuptools_loc, paths)
else:
prepend_path(zc.buildout.easy_install.setuptools_path, paths)
return [k for k in working_set.entries \
if os.path.realpath(k) not in site_paths(buildout, prefixes)]
[docs]def get_prefixes(buildout):
"""Returns a list of prefixes set on the buildout section"""
prefixes = parse_list(buildout.get('prefixes', ''))
return [os.path.abspath(k) for k in prefixes if os.path.exists(k)]
[docs]def find_site_packages(prefixes):
"""Finds python packages on prefixes"""
from distutils.sysconfig import get_python_lib
# Standard prefixes to check
PYTHONDIR = 'python%d.%d' % sys.version_info[0:2]
SUFFIXES = uniq([
get_python_lib(prefix=''),
os.path.join('lib', PYTHONDIR, 'site-packages'),
os.path.join('lib32', PYTHONDIR, 'site-packages'),
os.path.join('lib64', PYTHONDIR, 'site-packages'),
])
retval = []
for k in prefixes:
for suffix in SUFFIXES:
candidate = os.path.realpath(os.path.join(k, suffix))
if os.path.exists(candidate) and candidate not in retval:
retval.append(candidate)
return retval
[docs]def has_distribution(path):
"""Tests if a given path really has installed python distributions"""
ws = pkg_resources.WorkingSet([path])
return bool(ws.entry_keys[path])
[docs]def order_egg_dirs(buildout):
"""Orders the egg directories and returns them newest first"""
eggdir = buildout['eggs-directory']
eggs = [os.path.join(eggdir, k) for k in os.listdir(eggdir)]
distros = {}
for egg in eggs:
working_set = pkg_resources.WorkingSet([egg])
for key in working_set.entry_keys[egg]:
distro = working_set.by_key[key]
distro_version = distutils.version.LooseVersion(distro.version)
if key in distros:
if distro_version <= distros[key][0]: continue
distros[key] = (distro_version, egg)
return [k[1] for k in distros.values()]
[docs]def working_set(buildout):
"""Creates and returns a new working set based on user prefixes and existing
packages already installed"""
working_set = pkg_resources.WorkingSet([])
# add development directory first
dev_dir = buildout['develop-eggs-directory']
for path in fnmatch.filter(os.listdir(dev_dir), '*.egg-link'):
full_path = os.path.join(dev_dir, path)
python_path = open(full_path, 'rt').read().split('\n')[0]
distro = None
wants = os.path.splitext(path)[0]
distro = [k for k in pkg_resources.find_distributions(python_path) \
if k.project_name == wants]
if not distro:
raise RuntimeError("Could not find a distribution for `%s' under `%s'" \
" - check egg-link at `%s'" % (wants, python_path, full_path))
working_set.add(distro[0])
# add all egg directories, newest first
for path in order_egg_dirs(buildout): working_set.add_entry(path)
# adds the user paths
for path in find_site_packages(get_prefixes(buildout)):
if has_distribution(path) and path not in working_set.entries:
working_set.add_entry(path)
# finally, adds the system path
for path in site.sys.path:
if has_distribution(path) and path not in working_set.entries:
working_set.add_entry(path)
return working_set
[docs]def filter_working_set_hard(working_set, requirements):
"""Returns a new working set which contains only the paths to the required
packages. Raises if a requirement cannot be met."""
retval = pkg_resources.WorkingSet([])
for req in requirements:
dists = working_set.require(req)
for dist in dists: retval.add(dist)
return retval
[docs]def filter_working_set_soft(working_set, requirements):
"""Returns a new working set which contains only the paths to the required
packages. requirements that cannot be fulfilled are returned"""
unmet_requirements = []
retval = pkg_resources.WorkingSet([])
for req in requirements:
try:
dists = working_set.require(req)
for dist in dists: retval.add(dist)
except:
unmet_requirements.append(req)
return retval, unmet_requirements
[docs]def newest(buildout):
return bool_option(buildout, 'newest', 'true')
[docs]def offline(buildout):
return bool_option(buildout, 'offline', 'false')
[docs]def debug(buildout):
return bool_option(buildout, 'debug', 'false')
[docs]def verbose(buildout):
return bool_option(buildout, 'verbose', 'false')
[docs]def prefer_final(buildout):
return bool_option(buildout, 'prefer-final', 'true')
[docs]def eggs(buildout, options, name):
retval = options.get('eggs', buildout.get('eggs', ''))
return parse_list(retval)