Coverage for src/bob/bio/base/database/utils.py: 48%
227 statements
« prev ^ index » next — coverage.py v7.6.5, created at 2024-11-14 21:41 +0100
1import bz2
2import glob
3import hashlib
4import io
5import logging
6import tarfile
7import warnings
8import zipfile
10from fnmatch import fnmatch
11from os import PathLike
12from pathlib import Path
13from typing import IO, Any, Callable, TextIO, Union
15import requests
17from clapper.rc import UserDefaults
19logger = logging.getLogger(__name__)
def _get_local_data_directory() -> Path:
    """Returns the local directory used for bob data storage.

    Reads the ``bob_data_dir`` key from the ``bobrc.toml`` user
    configuration, falling back to ``~/bob_data`` when the key is unset.
    """
    defaults = UserDefaults("bobrc.toml")
    data_dir = defaults.get("bob_data_dir", default=Path.home() / "bob_data")
    return Path(data_dir)
29def _path_and_subdir(
30 archive_path: Union[str, PathLike[str]],
31) -> tuple[Path, Union[Path, None]]:
32 """Splits an archive's path from a sub directory (separated by ``:``)."""
33 archive_path_str = Path(archive_path).as_posix()
34 if ":" in archive_path_str:
35 archive, sub_dir = archive_path_str.rsplit(":", 1)
36 return Path(archive), Path(sub_dir)
37 return Path(archive_path), None
40def _is_bz2(path: Union[str, PathLike[str]]) -> bool:
41 try:
42 with bz2.BZ2File(path) as f:
43 f.read(1024)
44 return True
45 except (OSError, EOFError):
46 return False
def is_archive(path: Union[str, PathLike[str]]) -> bool:
    """Returns whether the path points in an archive.

    Any path pointing to a valid tar or zip archive or to a valid bz2
    file will return ``True``.
    """
    # Strip the optional ":inner/dir" suffix once. (The previous version
    # redundantly ran the already-split path through _path_and_subdir a
    # second time inside the generator.)
    archive = _path_and_subdir(path)[0]
    try:
        return any(
            tester(archive)
            for tester in (tarfile.is_tarfile, zipfile.is_zipfile, _is_bz2)
        )
    except (FileNotFoundError, IsADirectoryError):
        # Missing paths and plain directories are simply "not an archive".
        return False
def search_in_archive_and_open(
    search_pattern: str,
    archive_path: Union[str, PathLike[str]],
    inner_dir: Union[str, PathLike[str], None] = None,
    open_as_binary: bool = False,
) -> Union[IO[bytes], TextIO, None]:
    """Returns a read-only stream of a file matching a pattern in an archive.

    Wildcards (``*``, ``?``, and ``**``) are supported (using
    :meth:`pathlib.Path.glob`).

    The first matching file will be open and returned.

    examples:

    .. code-block: text

        archive.tar.gz
        + subdir1
        | + file1.txt
        | + file2.txt
        |
        + subdir2
          + file1.txt

    ``search_and_open("archive.tar.gz", "file1.txt")``
    opens ``archive.tar.gz/subdir1/file1.txt``

    ``search_and_open("archive.tar.gz:subdir2", "file1.txt")``
    opens ``archive.tar.gz/subdir2/file1.txt``

    ``search_and_open("archive.tar.gz", "*.txt")``
    opens ``archive.tar.gz/subdir1/file1.txt``

    Parameters
    ----------
    archive_path
        The ``.tar.gz`` archive file containing the wanted file. To match
        ``search_pattern`` in a sub path in that archive, append the sub path
        to ``archive_path`` with a ``:`` (e.g.
        ``/path/to/archive.tar.gz:sub/dir/``).
    search_pattern
        A string to match to the file. Wildcards are supported (Unix pattern
        matching).
    inner_dir
        A directory inside the archive to search into (when omitted, taken
        from the ``:`` suffix of ``archive_path``, if any).
    open_as_binary
        Return a binary stream instead of a text one.

    Returns
    -------
    io.TextIOBase or io.BytesIO
        A read-only file stream, or ``None`` when no member matches.

    Raises
    ------
    ValueError
        When ``archive_path`` has neither a ``.tar*`` nor a ``.zip``
        extension.
    """

    archive_path = Path(archive_path)

    if inner_dir is None:
        archive_path, inner_dir = _path_and_subdir(archive_path)

    # Anchor the pattern at the archive root so fnmatch compares full names.
    if inner_dir is not None:
        pattern = (Path("/") / inner_dir / search_pattern).as_posix()
    else:
        pattern = (Path("/") / search_pattern).as_posix()

    if ".tar" in archive_path.suffixes:
        # The archive object deliberately stays open on the success path:
        # the returned stream reads lazily from it, and closing the archive
        # would invalidate the stream.
        tar_arch = tarfile.open(archive_path)
        for member in tar_arch:
            if member.isfile() and fnmatch("/" + member.name, pattern):
                break
        else:
            logger.debug(
                "No file matching '%s' were found in '%s'.",
                pattern,
                archive_path,
            )
            tar_arch.close()
            return None

        stream = tar_arch.extractfile(member)
        if open_as_binary:
            return stream
        return io.TextIOWrapper(stream, encoding="utf-8")

    if archive_path.suffix == ".zip":
        zip_arch = zipfile.ZipFile(archive_path)
        for name in zip_arch.namelist():
            if fnmatch("/" + name, pattern):
                break
        else:
            logger.debug(
                "No file matching '%s' were found in '%s'.",
                pattern,
                archive_path,
            )
            zip_arch.close()
            # BUG FIX: the previous version fell through here and called
            # ``zip_arch.open(name)``, opening the LAST (non-matching)
            # member, or raising NameError on an empty archive.
            return None

        stream = zip_arch.open(name)
        if open_as_binary:
            return stream
        # BUG FIX: honor ``open_as_binary`` for zip archives too (the old
        # code always returned a binary stream, contradicting the docstring
        # and the tar branch).
        return io.TextIOWrapper(stream, encoding="utf-8")

    raise ValueError(
        f"Unknown file extension '{''.join(archive_path.suffixes)}'"
    )
def list_dir_in_archive(
    archive_path: Union[str, PathLike[str]],
    inner_dir: Union[str, PathLike[str], None] = None,
    show_dirs: bool = True,
    show_files: bool = True,
) -> list[Path]:
    """Lists the entries of an archive, or of a directory inside it.

    Parameters
    ----------
    archive_path
        A path to an archive, or an inner directory of an archive (appended
        with a ``:``).
    inner_dir
        A path inside the archive with its root at the archive's root.
    show_dirs
        Include directories in the result.
    show_files
        Include files in the result.

    Raises
    ------
    ValueError
        When an inner directory is requested on a ``.bz2`` file, or the
        extension is not a supported archive format.
    """
    archive_path, arch_inner_dir = _path_and_subdir(archive_path)
    # Explicit inner_dir wins over the ":"-suffix; default is the root.
    inner_dir = Path(inner_dir or arch_inner_dir or Path("."))

    results: list[Path] = []
    if ".tar" in archive_path.suffixes:
        with tarfile.open(archive_path) as arch:
            for member in arch.getmembers():
                member_path = Path(member.name)
                # Non-recursive: keep only direct children of inner_dir.
                if member_path.parent != inner_dir:
                    continue
                if (member.isdir() and show_dirs) or (
                    member.isfile() and show_files
                ):
                    results.append(Path("/") / member_path)
    elif archive_path.suffix == ".zip":
        with zipfile.ZipFile(archive_path) as arch:
            for zip_info in arch.infolist():
                entry = zipfile.Path(archive_path, zip_info.filename)
                if Path(zip_info.filename).parent != inner_dir:
                    continue
                if entry.is_dir():
                    if show_dirs:
                        results.append(Path("/") / zip_info.filename)
                elif show_files:
                    results.append(Path("/") / zip_info.filename)
    elif archive_path.suffix == ".bz2":
        if inner_dir != Path("."):
            raise ValueError(
                ".bz2 files don't have an inner structure (tried to access "
                f"'{archive_path}:{inner_dir}')."
            )
        # A bz2 file wraps exactly one file, named after the archive stem.
        if show_files:
            results.append(Path(archive_path.stem))
    else:
        raise ValueError(
            f"Unsupported archive extension '{''.join(archive_path.suffixes)}'."
        )
    return sorted(results)  # Fixes inconsistent file ordering across platforms
def extract_archive(
    archive_path: Union[str, PathLike[str]],
    inner_path: Union[str, PathLike[str], None] = None,
    destination: Union[str, PathLike[str], None] = None,
) -> Path:
    """Extract an archive and returns the location of the extracted data.

    Supports ``.zip``, ``.tar.gz``, ``.tar.bz2``, ``.tar.tgz``, and
    ``.tar.tbz2`` archives.
    Can also extract ``.bz2`` compressed files.

    Parameters
    ----------
    archive_path
        The compressed archive location. Pointing to a location inside a
        tarball can be achieved by appending ``:`` and the desired member to
        extract.
    inner_path
        A path with its root at the root of the archive file pointing to a
        specific file to extract.
    destination
        The desired location of the extracted file or directory. If not
        provided, the archive will be extracted where it stands (the parent of
        ``archive_path``).

    Returns
    -------
    pathlib.Path
        The extracted file or directory location.
        As an archive can contain any number of members, the parent directory
        is returned (where the archive content is extracted).

    Raises
    ------
    ValueError
        When ``archive_path`` does not point to a file with a known extension.
    """

    # Split the optional ":member" suffix off the archive path (inlined
    # equivalent of _path_and_subdir, kept local so this function is
    # self-contained).
    posix_path = Path(archive_path).as_posix()
    arch_inner_dir = None
    if ":" in posix_path:
        posix_path, _, inner = posix_path.rpartition(":")
        arch_inner_dir = Path(inner)
    archive_path = Path(posix_path)
    sub_dir = inner_path or arch_inner_dir

    if destination is None:
        destination = archive_path.parent
    # BUG FIX: ``destination`` may be given as a str; the ".bz2" branch below
    # computes ``destination / file``, which requires a Path.
    destination = Path(destination)

    if ".tar" in archive_path.suffixes:
        with tarfile.open(archive_path, mode="r") as arch:
            if sub_dir is None:
                arch.extractall(destination)
            else:
                arch.extract(Path(sub_dir).as_posix(), destination)
    elif ".zip" == archive_path.suffix:
        with zipfile.ZipFile(archive_path) as arch:
            if sub_dir is None:
                arch.extractall(destination)
            else:
                arch.extract(Path(sub_dir).as_posix(), destination)
    elif ".bz2" == archive_path.suffix:
        if sub_dir is not None:
            warnings.warn(
                f"Ignored sub directory ({sub_dir}). Not supported for `.bz2` "
                "files.",
                RuntimeWarning,
            )
        # A bz2 file wraps a single file; decompress it next to destination,
        # named after the archive stem (e.g. "msg.txt.bz2" -> "msg.txt").
        extracted_file = destination / Path(archive_path.stem)
        with bz2.BZ2File(archive_path) as arch, extracted_file.open(
            "wb"
        ) as dest:
            dest.write(arch.read())
    else:
        raise ValueError(
            f"Unknown file extension: {''.join(archive_path.suffixes)}"
        )
    return destination
def search_and_open(
    search_pattern: str,
    base_dir: Union[PathLike, None] = None,
    sub_dir: Union[PathLike, None] = None,
    open_as_binary: bool = False,
    **kwargs,
) -> Union[IO[bytes], TextIO, None]:
    """Searches for a matching file recursively in a directory.

    If ``base_dir`` points to an archive, the pattern will be searched inside
    that archive.

    Wildcards (``*``, ``?``, and ``**``) are supported (using
    :meth:`pathlib.Path.glob`).

    Parameters
    ----------
    search_pattern
        A string containing the wanted path pattern of the file to open.
        Supports ``fnmatch`` notation (``*``, ``**``, and ``?``).
    base_dir
        A path to a directory to search into. By default, will use the
        ``data_path`` user configuration.
    sub_dir
        A sub directory of ``base_dir`` to search into instead. Useful when
        using the default value of ``base_dir`` but still wanting to use a
        sub directory in there.
    open_as_binary
        Will open the file as a binary stream instead of a text file.

    Returns
    -------
    IO
        A read-only open file stream.

    Raises
    ------
    FileNotFoundError
        When ``base_dir``/``sub_dir`` is a local directory and nothing in it
        matches ``search_pattern``.
    """

    if base_dir is None:
        base_dir = _get_local_data_directory()

    # Archive case: delegate the lookup to the archive-aware helper.
    # ``kwargs`` (e.g. open_as_binary) are forwarded to it.
    if is_archive(base_dir):
        return search_in_archive_and_open(
            search_pattern=search_pattern,
            archive_path=base_dir,
            inner_dir=sub_dir,  # TODO not ok with config data_path / subdir
            **kwargs,
        )

    # If the input is local
    base_dir = Path(base_dir)
    final_dir = base_dir / sub_dir if sub_dir else base_dir
    if final_dir.is_dir():
        # we prepend './' to search_pattern because it might start with '/'
        pattern = final_dir / "**" / f"./{search_pattern}"
        # Recursive glob: the first matching regular file wins (directories
        # matched by the pattern are skipped).
        for path in glob.iglob(pattern.as_posix(), recursive=True):
            if not Path(path).is_file():
                continue
            return open(path, mode="rb" if open_as_binary else "rt")
        raise FileNotFoundError(
            f"Unable to locate and open a file that matches '{pattern}' in "
            f"'{final_dir}'."
        )

    # ``final_dir`` is not a directory: assume it is a file and open it
    # directly (``search_pattern`` is ignored in that case).
    return open(final_dir, mode="rb" if open_as_binary else "rt")
def list_dir(
    base_directory: PathLike,
    sub_directory: Union[PathLike, None] = None,
    show_files: bool = True,
    show_dirs: bool = True,
) -> list[Path]:
    """Lists all directories and/or files in a directory (non-recursively).

    If ``base_directory`` points to an archive, the listing is done inside
    that archive instead.

    Parameters
    ----------
    base_directory
        The directory (or archive) to list.
    sub_directory
        An optional sub directory of ``base_directory`` to list instead.
    show_files
        Include files in the result.
    show_dirs
        Include directories in the result.
    """
    base_directory = Path(base_directory)

    if is_archive(base_directory):
        return list_dir_in_archive(
            archive_path=base_directory,
            inner_dir=sub_directory,
            show_dirs=show_dirs,
            show_files=show_files,
        )

    # Not an archive
    final_directory = (
        base_directory
        if sub_directory is None
        else base_directory / sub_directory
    )
    # Renamed local from "glob": it shadowed the module-level ``glob`` import.
    entries = list(final_directory.glob("*"))
    if not show_dirs:
        entries = [entry for entry in entries if not entry.is_dir()]
    if not show_files:
        entries = [entry for entry in entries if not entry.is_file()]
    return entries
def md5_hash(readable: Any, chunk_size: int = 65535) -> str:
    """Computes the md5 digest of a binary stream, read chunk by chunk."""
    digest = hashlib.md5()
    chunk = readable.read(chunk_size)
    while chunk != b"":
        digest.update(chunk)
        chunk = readable.read(chunk_size)
    return digest.hexdigest()
def sha256_hash(readable: Any, chunk_size: int = 65535) -> str:
    """Computes the SHA256 digest of a binary stream, read chunk by chunk."""
    digest = hashlib.sha256()
    chunk = readable.read(chunk_size)
    while chunk != b"":
        digest.update(chunk)
        chunk = readable.read(chunk_size)
    return digest.hexdigest()
def verify_file(
    file_path: Union[str, PathLike[str]],
    file_hash: str,
    hash_fct: Callable[[Any, int], str] = sha256_hash,
    full_match: bool = False,
) -> bool:
    """Returns True if the file computed hash corresponds to `file_hash`.

    For comfort, we allow ``file_hash`` to match with the first
    characters of the digest, allowing storing only e.g. the first 8
    char.

    Parameters
    ----------
    file_path
        The path to the file needing verification.
    file_hash
        The expected file hash digest.
    hash_fct
        A function taking a path and returning a digest. Defaults to SHA256.
    full_match
        If set to False, allows ``file_hash`` to match the first characters of
        the files digest (this allows storing e.g. 8 chars of a digest instead
        of the whole 64 characters of SHA256, and still matching.)
    """
    with Path(file_path).open("rb") as stream:
        digest = hash_fct(stream, 65535)
    if full_match:
        return digest == file_hash
    return digest.startswith(file_hash)
def compute_crc(
    file_path: Union[str, PathLike[str]],
    hash_fct: Callable[[Any, int], str] = sha256_hash,
) -> str:
    """Returns the digest of a file computed with ``hash_fct``."""
    with Path(file_path).open("rb") as stream:
        return hash_fct(stream, 65535)
450def _infer_filename_from_urls(urls=Union[list[str], str]) -> str:
451 """Retrieves the remote filename from the URLs.
453 Parameters
454 ----------
455 urls
456 One or multiple URLs pointing to files with the same name.
458 Returns
459 -------
460 The remote file name.
462 Raises
463 ------
464 ValueError
465 When urls point to files with different names.
466 """
467 if isinstance(urls, str):
468 return urls.split("/")[-1]
470 # Check that all urls point to the same file name
471 names = [u.split("/")[-1] for u in urls]
472 if not all(n == names[0] for n in names):
473 raise ValueError(
474 "Cannot infer file name when urls point to different files "
475 f"({names=})."
476 )
477 return urls[0].split("/")[-1]
def download_file(
    urls: Union[list[str], str],
    destination_directory: Union[str, PathLike[str], None] = None,
    destination_sub_directory: Union[str, None] = None,
    destination_filename: Union[str, None] = None,
    checksum: Union[str, None] = None,
    checksum_fct: Callable[[Any, int], str] = sha256_hash,
    force: bool = False,
    extract: bool = False,
    makedirs: bool = True,
    checksum_mismatch_download_attempts: int = 2,
) -> Path:
    """Downloads a remote file locally.

    This will overwrite any existing file with the same name.

    Parameters
    ----------
    urls
        The remote location of the server. If multiple addresses are given, we
        will try to download from them in order until one succeeds.
    destination_directory
        A path to a local directory where the file will be saved. If omitted,
        the file will be saved in the folder pointed by the ``bob_data_dir`` key
        in the user configuration.
    destination_sub_directory
        An additional layer added to the destination directory (useful when
        using ``destination_directory=None``).
    destination_filename
        The final name of the local file. If omitted, the file will keep the
        name of the remote file.
    checksum
        When provided, will compute the file's checksum and compare to this.
    checksum_fct
        The hashing function used to compute the checksum (defaults to SHA256).
    force
        Re-download and overwrite any existing file with the same name.
    extract
        Extract an archive or zip file next to the downloaded file.
        If this is set, the parent directory path will be returned.
    makedirs
        Automatically make the parent directories of the new local file.
    checksum_mismatch_download_attempts
        Number of download attempts when the checksum does not match after
        downloading, must be 1 or more.

    Returns
    -------
    The path to the new local file (or the parent directory if ``extract`` is
    True).

    Raises
    ------
    RuntimeError
        When the URLs provided are all invalid.
    ValueError
        - When ``destination_filename`` is omitted and URLs point to files with
          different names.
        - When the checksum of the file does not correspond to the provided
          ``checksum``.
    """

    # Resolve the local file name and parent directory (defaults come from
    # the URLs and the user configuration).
    if destination_filename is None:
        destination_filename = _infer_filename_from_urls(urls=urls)

    if destination_directory is None:
        destination_directory = _get_local_data_directory()

    destination_directory = Path(destination_directory)

    if destination_sub_directory is not None:
        destination_directory = (
            destination_directory / destination_sub_directory
        )

    # Invalid retry counts are clamped to 1 (with a warning) instead of
    # raising.
    if checksum_mismatch_download_attempts < 1:
        logger.warning(
            "'Checksum_mismatch_download_attempts' must be greater than 0 "
            "(got %d). Setting it to 1.",
            checksum_mismatch_download_attempts,
        )
        checksum_mismatch_download_attempts = 1

    local_file = destination_directory / destination_filename
    needs_download = True

    # Decide whether a download is required: missing file or ``force`` means
    # yes; an existing file is checked against the checksum.
    if force or not local_file.is_file():
        if not force:
            logger.info(f"File {local_file} is not present. Needs download.")
        needs_download = True
    elif local_file.is_file():
        # Reached only when force is False and the file already exists.
        # NOTE(review): verify_file is called even when ``checksum`` is None,
        # which would make str.startswith(None) raise a TypeError — confirm
        # whether an existing file without a checksum is expected here.
        file_ok = verify_file(local_file, checksum, hash_fct=checksum_fct)
        if not file_ok:
            logger.info(
                f"File {local_file} does not checksum to '{checksum=}'."
            )
            needs_download = True
        elif not force and checksum is not None and file_ok:
            logger.info(f"File {local_file} already exists, skipping download.")
            needs_download = False

    if needs_download:
        # NOTE(review): indentation reconstructed from a coverage dump — each
        # attempt downloads and writes the file, but the checksum check below
        # runs only once after this loop, so the extra attempts never seem to
        # be triggered by a mismatch. Confirm against the project history.
        for current_download_try in range(checksum_mismatch_download_attempts):
            if isinstance(urls, str):
                urls = [urls]

            # Try each mirror in order until one answers with an OK status.
            for tries, url in enumerate(urls):
                logger.debug("Retrieving file from '%s'.", url)
                try:
                    response = requests.get(url=url, timeout=10)
                except requests.exceptions.ConnectionError as e:
                    if tries < len(urls) - 1:
                        logger.info(
                            "Could not connect to %s. Trying other URLs.",
                            url,
                        )
                    logger.debug(e)
                    continue

                logger.debug(
                    "http response: '%d: %s'.",
                    response.status_code,
                    response.reason,
                )

                if response.ok:
                    logger.debug("Got file from %s.", url)
                    break
                elif tries < len(urls) - 1:
                    logger.info(
                        "Failed to get file from %s, trying other URLs.", url
                    )
                    logger.debug("requests.response was:\n%s", response)
            else:
                # The inner loop never broke: no URL gave an OK response.
                raise RuntimeError(
                    "Could not retrieve file from any of the provided URLs! "
                    f"({urls=})"
                )

            if makedirs:
                local_file.parent.mkdir(parents=True, exist_ok=True)

            # Write the whole payload at once (``response.content`` is fully
            # buffered in memory).
            with local_file.open("wb") as f:
                f.write(response.content)

    # Final integrity check of whatever file is now on disk.
    if checksum is not None:
        if not verify_file(local_file, checksum, hash_fct=checksum_fct):
            if not needs_download:
                raise ValueError(
                    f"The local file hash does not correspond to '{checksum}' "
                    f"and {force=} prevents overwriting."
                )
            raise ValueError(
                "The downloaded file hash ('"
                f"{compute_crc(local_file, hash_fct=checksum_fct)}') does not "
                f"correspond to '{checksum}'."
            )

    if extract:
        # Extract only if the file was re-downloaded
        if needs_download:
            local_file = extract_archive(local_file)
        else:
            # Mimic the behavior of extract_archive
            local_file = local_file.parent

    return local_file