#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
"""Mirroring functionality for conda channels
Some constructs are bluntly copied from
https://github.com/valassis-digital-media/conda-mirror
"""
import bz2
import fnmatch
import hashlib
import json
import os
import random
import tempfile
import time
import requests
from .log import get_logger
logger = get_logger(__name__)
def _download(url, target_directory):
"""Download `url` to `target_directory`
Parameters
----------
url : str
The url to download
target_directory : str
The path to a directory where `url` should be downloaded
Returns
-------
file_size: int
The size in bytes of the file that was downloaded
"""
file_size = 0
chunk_size = 1024 # 1KB chunks
logger.info("Download %s -> %s", url, target_directory)
# create a temporary file
target_filename = url.split("/")[-1]
download_filename = os.path.join(target_directory, target_filename)
with open(download_filename, "w+b") as tf:
ret = requests.get(url, stream=True)
size = ret.headers.get("Content-length", "??")
logger.debug("Saving to %s (%s bytes)", download_filename, size)
for data in ret.iter_content(chunk_size):
tf.write(data)
file_size = os.path.getsize(download_filename)
return file_size
def _list_conda_packages(local_dir):
"""List the conda packages (*.tar.bz2 or *.conda files) in `local_dir`
Parameters
----------
local_dir : str
Some local directory with (hopefully) some conda packages in it
Returns
-------
list
List of conda packages in `local_dir`
"""
contents = os.listdir(local_dir)
return fnmatch.filter(contents, "*.conda") + fnmatch.filter(
contents, "*.tar.bz2"
)
[docs]def get_json(channel, platform, name):
"""Get a JSON file for a channel/platform combo on conda channel
Parameters
----------
channel : str
Complete channel URL
platform : {'linux-64', 'linux-aarch64', 'osx-64', 'osx-arm64', 'noarch'}
The platform of interest
name : str
The name of the file to retrieve. If the name ends in '.bz2', then it
is auto-decompressed
Returns
-------
repodata : dict
contents of repodata.json
Raises
------
RuntimeError :
If the URL cannot be reached
"""
url = channel + "/" + platform + "/" + name
logger.debug("[checking] %s...", url)
r = requests.get(url, allow_redirects=True, stream=True)
if r.status_code == 404:
raise RuntimeError("URL '%s' does not exist" % url)
size = r.headers.get("Content-length", "??")
logger.info("[download] %s (%s bytes)...", url, size)
if name.endswith(".bz2"):
# just in case transport encoding was applied
r.raw.decode_content = True
data = bz2.decompress(r.raw.read())
return json.loads(data)
# else, just decodes the response
return r.json()
[docs]def get_local_contents(path, arch):
"""Returns the local package contents as a set"""
path_arch = os.path.join(path, arch)
if not os.path.exists(path_arch):
return set()
# path exists, lists currently available packages
logger.info("Listing package contents of %s...", path_arch)
contents = os.listdir(path_arch)
return set(
fnmatch.filter(contents, "*.tar.bz2")
+ fnmatch.filter(contents, "*.conda")
)
[docs]def load_glob_list(path):
"""Loads a list of globs from a configuration file
Excludes comments and empty lines
"""
retval = [str(k.strip()) for k in open(path, "rt")]
return [k for k in retval if k and k[0] not in ("#", "-")]
[docs]def blacklist_filter(packages, globs):
"""Filters **out** the input package set with the glob list"""
to_remove = set()
for k in globs:
to_remove |= set(fnmatch.filter(packages, k))
return packages - to_remove
[docs]def whitelist_filter(packages, globs):
"""Filters **in** the input package set with the glob list"""
to_keep = set()
for k in globs:
to_keep |= set(fnmatch.filter(packages, k))
return to_keep
def _sha256sum(filename):
"""Calculates and returns the sha-256 sum given a file name"""
h = hashlib.sha256()
b = bytearray(128 * 1024)
mv = memoryview(b)
with open(filename, "rb", buffering=0) as f:
for n in iter(lambda: f.readinto(mv), 0):
h.update(mv[:n])
return h.hexdigest()
def _md5sum(filename):
"""Calculates and returns the md5 sum given a file name"""
h = hashlib.md5()
b = bytearray(128 * 1024)
mv = memoryview(b)
with open(filename, "rb", buffering=0) as f:
for n in iter(lambda: f.readinto(mv), 0):
h.update(mv[:n])
return h.hexdigest()
[docs]def download_packages(packages, repodata, channel_url, dest_dir, arch, dry_run):
"""Downloads remote packages to a download directory
Packages are downloaded first to a temporary directory, then validated
according to the expected sha256/md5 sum and then moved, one by one, to the
destination directory. An error is raised if the package cannot be
correctly downloaded.
Parameters
----------
packages : list of str
List of packages to download from the remote channel
repodata: dict
A dictionary containing the remote repodata.json contents
channel_url: str
The complete channel URL
dest_dir: str
The local directory where the channel is being mirrored
arch: str
The current architecture which we are mirroring
dry_run: bool
A boolean flag indicating if this is just a dry-run (simulation),
flagging so we don't really do anything (set to ``True``).
"""
# download files into temporary directory, that is removed by the end of
# the procedure, or if something bad occurs
with tempfile.TemporaryDirectory() as download_dir:
total = len(packages)
for k, p in enumerate(packages):
k += 1 # adjust to produce correct order on printouts
# checksum to verify
if p.endswith(".tar.bz2"):
expected_hash = repodata["packages"][p].get(
"sha256", repodata["packages"][p]["md5"]
)
else:
expected_hash = repodata["packages.conda"][p].get(
"sha256", repodata["packages.conda"][p]["md5"]
)
# download package to file in our temporary directory
url = channel_url + "/" + arch + "/" + p
temp_dest = os.path.join(download_dir, p)
logger.info("[download: %d/%d] %s -> %s", k, total, url, temp_dest)
package_retries = 10
while package_retries:
if not dry_run:
logger.debug("[checking: %d/%d] %s", k, total, url)
r = requests.get(url, stream=True, allow_redirects=True)
size = r.headers.get("Content-length", "??")
logger.info(
"[download: %d/%d] %s -> %s (%s bytes)",
k,
total,
url,
temp_dest,
size,
)
open(temp_dest, "wb").write(r.raw.read())
# verify that checksum matches
if len(expected_hash) == 32: # md5
logger.info(
"[verify: %d/%d] md5(%s) == %s?",
k,
total,
temp_dest,
expected_hash,
)
else: # sha256
logger.info(
"[verify: %d/%d] sha256(%s) == %s?",
k,
total,
temp_dest,
expected_hash,
)
if not dry_run:
if len(expected_hash) == 32: # md5
actual_hash = _md5sum(temp_dest)
else: # sha256
actual_hash = _sha256sum(temp_dest)
if actual_hash != expected_hash:
wait_time = random.randint(10, 61)
logger.warning(
"Checksum of locally downloaded "
"version of %s does not match "
"(actual:%r != %r:expected) - retrying "
"after %d seconds",
url,
actual_hash,
expected_hash,
wait_time,
)
os.unlink(temp_dest)
time.sleep(wait_time)
package_retries -= 1
continue
else:
break
# final check, before we continue
assert actual_hash == expected_hash, (
"Checksum of locally "
"downloaded version of %s does not match "
"(actual:%r != %r:expected)" % (url, actual_hash, expected_hash)
)
# move
local_dest = os.path.join(dest_dir, arch, p)
logger.info(
"[move: %d/%d] %s -> %s", k, total, temp_dest, local_dest
)
# check local directory is available before moving
dirname = os.path.dirname(local_dest)
if not os.path.exists(dirname):
logger.info("[mkdir] %s", dirname)
if not dry_run:
os.makedirs(dirname)
if not dry_run:
os.rename(temp_dest, local_dest)
[docs]def remove_packages(packages, dest_dir, arch, dry_run):
"""Removes local packages that no longer matter"""
total = len(packages)
for k, p in enumerate(packages):
k += 1 # adjust to produce correct order on printouts
path = os.path.join(dest_dir, arch, p)
logger.info("[remove: %d/%d] %s", k, total, path)
if not dry_run:
os.unlink(path)
def _cleanup_json(data, packages):
"""Cleans-up the contents of conda JSON looking at existing packages"""
# only keys to clean-up here, othere keys remain unchanged
for key in ("packages", "packages.conda"):
if key not in data:
continue
data[key] = dict((k, v) for k, v in data[key].items() if k in packages)
return data
def _save_json(data, dest_dir, arch, name, dry_run):
"""Saves contents of conda JSON"""
destfile = os.path.join(dest_dir, arch, name)
if not dry_run:
with open(destfile, "w") as outfile:
json.dump(data, outfile, ensure_ascii=True, indent=2)
return destfile
[docs]def copy_and_clean_json(url, dest_dir, arch, name, dry_run):
"""Copies and cleans conda JSON file"""
data = get_json(url, arch, name)
packages = get_local_contents(dest_dir, arch)
data = _cleanup_json(data, packages)
return _save_json(data, dest_dir, arch, name, dry_run)
[docs]def copy_and_clean_patch(url, dest_dir, arch, name, dry_run):
"""Copies and cleans conda patch_instructions JSON file"""
data = get_json(url, arch, name)
packages = get_local_contents(dest_dir, arch)
data = _cleanup_json(data, packages)
# cleanup specific patch_instructions.json fields
for key in ["remove", "revoke"]:
data[key] = [k for k in data[key] if k in packages]
return _save_json(data, dest_dir, arch, name, dry_run)
[docs]def checksum_packages(repodata, dest_dir, arch, packages):
"""Checksums packages on the local mirror and compare to remote repository
Parameters
----------
repodata : dict
Data loaded from `repodata.json` on the remote repository
dest_dir : str
Path leading to local mirror
arch : str
Current architecture being considered (e.g. noarch, linux-64,
linux-aarch64, osx-64, osx-arm64)
packages : list
List of packages that are available locally, by name
Returns
-------
issues : list
List of matching errors
"""
issues = set()
total = len(packages)
for k, p in enumerate(packages):
path_to_package = os.path.join(dest_dir, arch, p)
# checksum to verify
if p.endswith(".tar.bz2"):
expected_hash = repodata["packages"][p].get(
"sha256", repodata["packages"][p]["md5"]
)
else:
expected_hash = repodata["packages.conda"][p].get(
"sha256", repodata["packages.conda"][p]["md5"]
)
# verify that checksum matches
if len(expected_hash) == 32: # md5
logger.debug(
"[verify: %d/%d] md5(%s) == %s?",
k,
total,
path_to_package,
expected_hash,
)
else: # sha256
logger.debug(
"[verify: %d/%d] sha256(%s) == %s?",
k,
total,
path_to_package,
expected_hash,
)
if len(expected_hash) == 32: # md5
actual_hash = _md5sum(path_to_package)
else: # sha256
actual_hash = _sha256sum(path_to_package)
if actual_hash != expected_hash:
logger.warning(
"Checksum of %s does not match remote "
"repository description (actual:%r != %r:expected)",
path_to_package,
actual_hash,
expected_hash,
)
issues.add(p)
return issues