Coverage for src/bob/bio/base/database/utils.py: 48%

227 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-12 22:34 +0200

1import bz2 

2import glob 

3import hashlib 

4import io 

5import logging 

6import tarfile 

7import warnings 

8import zipfile 

9 

10from fnmatch import fnmatch 

11from os import PathLike 

12from pathlib import Path 

13from typing import IO, Any, Callable, TextIO, Union 

14 

15import requests 

16 

17from clapper.rc import UserDefaults 

18 

19logger = logging.getLogger(__name__) 

20 

21 

def _get_local_data_directory() -> Path:
    """Return the root directory used for locally stored data.

    Reads the ``bob_data_dir`` key from the ``bobrc.toml`` user
    configuration, falling back to ``~/bob_data`` when the key is unset.
    """
    defaults = UserDefaults("bobrc.toml")
    fallback = Path.home() / "bob_data"
    return Path(defaults.get("bob_data_dir", default=fallback))

27 

28 

29def _path_and_subdir( 

30 archive_path: Union[str, PathLike[str]], 

31) -> tuple[Path, Union[Path, None]]: 

32 """Splits an archive's path from a sub directory (separated by ``:``).""" 

33 archive_path_str = Path(archive_path).as_posix() 

34 if ":" in archive_path_str: 

35 archive, sub_dir = archive_path_str.rsplit(":", 1) 

36 return Path(archive), Path(sub_dir) 

37 return Path(archive_path), None 

38 

39 

40def _is_bz2(path: Union[str, PathLike[str]]) -> bool: 

41 try: 

42 with bz2.BZ2File(path) as f: 

43 f.read(1024) 

44 return True 

45 except (OSError, EOFError): 

46 return False 

47 

48 

def is_archive(path: Union[str, PathLike[str]]) -> bool:
    """Returns whether the path points in an archive.

    Any path pointing to a valid tar or zip archive or to a valid bz2
    file will return ``True``. A trailing ``:inner/dir`` suffix is
    stripped before testing.
    """
    # Split the ``:inner`` suffix exactly once. The original code called
    # _path_and_subdir a second time on the already-split result, which
    # mis-handled paths containing more than one ':'.
    archive = _path_and_subdir(path)[0]
    try:
        return any(
            tester(archive)
            for tester in (tarfile.is_tarfile, zipfile.is_zipfile, _is_bz2)
        )
    except (FileNotFoundError, IsADirectoryError):
        return False

63 

64 

def search_in_archive_and_open(
    search_pattern: str,
    archive_path: Union[str, PathLike[str]],
    inner_dir: Union[str, PathLike[str], None] = None,
    open_as_binary: bool = False,
) -> Union[IO[bytes], TextIO, None]:
    """Returns a read-only stream of a file matching a pattern in an archive.

    Wildcards (``*``, ``?``, and ``**``) are supported (using
    :meth:`pathlib.Path.glob`).

    The first matching file will be open and returned; ``None`` is returned
    when no member matches.

    examples:

    .. code-block: text

        archive.tar.gz
        + subdir1
        | + file1.txt
        | + file2.txt
        |
        + subdir2
          + file1.txt

    ``search_and_open("archive.tar.gz", "file1.txt")``
    opens ``archive.tar.gz/subdir1/file1.txt``

    ``search_and_open("archive.tar.gz:subdir2", "file1.txt")``
    opens ``archive.tar.gz/subdir2/file1.txt``

    ``search_and_open("archive.tar.gz", "*.txt")``
    opens ``archive.tar.gz/subdir1/file1.txt``


    Parameters
    ----------
    archive_path
        The ``.tar.gz`` archive file containing the wanted file. To match
        ``search_pattern`` in a sub path in that archive, append the sub path
        to ``archive_path`` with a ``:`` (e.g.
        ``/path/to/archive.tar.gz:sub/dir/``).
    search_pattern
        A string to match to the file. Wildcards are supported (Unix pattern
        matching).
    inner_dir
        A directory inside the archive to anchor the pattern to; may also be
        given as the ``:``-suffix of ``archive_path``.
    open_as_binary
        Return a binary stream instead of a utf-8 text stream.

    Returns
    -------
    io.TextIOBase or io.BytesIO
        A read-only file stream, or ``None`` when nothing matched.

    Raises
    ------
    ValueError
        When the archive extension is neither ``.tar*`` nor ``.zip``.
    """

    archive_path = Path(archive_path)

    if inner_dir is None:
        archive_path, inner_dir = _path_and_subdir(archive_path)

    # Patterns are anchored at the archive root ('/').
    if inner_dir is not None:
        pattern = (Path("/") / inner_dir / search_pattern).as_posix()
    else:
        pattern = (Path("/") / search_pattern).as_posix()

    if ".tar" in archive_path.suffixes:
        # NOTE: the archive object must remain open while the returned
        # stream is in use (tar member streams read from it lazily), so it
        # is only closed on the no-match path.
        tar_arch = tarfile.open(archive_path)
        for member in tar_arch:
            if member.isfile() and fnmatch("/" + member.name, pattern):
                break
        else:
            logger.debug(
                "No file matching '%s' were found in '%s'.",
                pattern,
                archive_path,
            )
            tar_arch.close()  # nothing matched: release the handle
            return None

        if open_as_binary:
            return tar_arch.extractfile(member)
        return io.TextIOWrapper(tar_arch.extractfile(member), encoding="utf-8")

    elif archive_path.suffix == ".zip":
        zip_arch = zipfile.ZipFile(archive_path)
        for name in zip_arch.namelist():
            if fnmatch("/" + name, pattern):
                break
        else:
            # Bug fix: previously this branch fell through and opened the
            # last (or an undefined) member when nothing matched.
            logger.debug(
                "No file matching '%s' were found in '%s'.",
                pattern,
                archive_path,
            )
            zip_arch.close()
            return None
        # Bug fix: honor open_as_binary, consistent with the tar branch.
        if open_as_binary:
            return zip_arch.open(name)
        return io.TextIOWrapper(zip_arch.open(name), encoding="utf-8")

    raise ValueError(
        f"Unknown file extension '{''.join(archive_path.suffixes)}'"
    )

160 

161 

def list_dir_in_archive(
    archive_path: Union[str, PathLike[str]],
    inner_dir: Union[str, PathLike[str], None] = None,
    show_dirs: bool = True,
    show_files: bool = True,
) -> list[Path]:
    """Returns a list of all the elements in an archive or inner directory.

    Only direct children of ``inner_dir`` are listed (non-recursive).

    Parameters
    ----------
    archive_path
        A path to an archive, or an inner directory of an archive (appended
        with a ``:``).
    inner_dir
        A path inside the archive with its root at the archive's root.
    show_dirs
        Returns directories.
    show_files
        Returns files.

    Returns
    -------
    list[pathlib.Path]
        The matching members, anchored at ``/`` (the archive root), sorted.

    Raises
    ------
    ValueError
        When the archive extension is unsupported, or an inner directory is
        requested for a ``.bz2`` file (which has no inner structure).
    """

    archive_path, arch_inner_dir = _path_and_subdir(archive_path)
    # Explicit ``inner_dir`` wins over a ``:``-suffix; default to the root.
    inner_dir = Path(inner_dir or arch_inner_dir or Path("."))

    results = []
    # Read the archive info and iterate over the paths. Return the ones we want.
    if ".tar" in archive_path.suffixes:
        with tarfile.open(archive_path) as arch:
            for info in arch.getmembers():
                path = Path(info.name)
                # Keep only direct children of ``inner_dir``.
                if path.parent != inner_dir:
                    continue
                if info.isdir() and show_dirs:
                    results.append(Path("/") / path)
                if info.isfile() and show_files:
                    results.append(Path("/") / path)
    elif archive_path.suffix == ".zip":
        with zipfile.ZipFile(archive_path) as arch:
            for zip_info in arch.infolist():
                # zipfile.Path gives a reliable is_dir() for each member.
                zip_path = zipfile.Path(archive_path, zip_info.filename)
                if Path(zip_info.filename).parent != inner_dir:
                    continue
                if zip_path.is_dir() and show_dirs:
                    results.append(Path("/") / zip_info.filename)
                if not zip_path.is_dir() and show_files:
                    results.append(Path("/") / zip_info.filename)
    elif archive_path.suffix == ".bz2":
        if inner_dir != Path("."):
            raise ValueError(
                ".bz2 files don't have an inner structure (tried to access "
                f"'{archive_path}:{inner_dir}')."
            )
        # A bare .bz2 holds exactly one file, named after the archive stem.
        results.extend([Path(archive_path.stem)] if show_files else [])
    else:
        raise ValueError(
            f"Unsupported archive extension '{''.join(archive_path.suffixes)}'."
        )
    return sorted(results)  # Fixes inconsistent file ordering across platforms

220 

221 

def extract_archive(
    archive_path: Union[str, PathLike[str]],
    inner_path: Union[str, PathLike[str], None] = None,
    destination: Union[str, PathLike[str], None] = None,
) -> Path:
    """Extract an archive and returns the location of the extracted data.

    Supports ``.zip``, ``.tar.gz``, ``.tar.bz2``, ``.tgz``, and ``.tbz2``
    archives.
    Can also extract ``.bz2`` compressed files.

    Parameters
    ----------
    archive_path
        The compressed archive location. Pointing to a location inside a
        tarball can be achieved by appending ``:`` and the desired member to
        extract.
    inner_path
        A path with its root at the root of the archive file pointing to a
        specific file to extract.
    destination
        The desired location of the extracted file or directory. If not
        provided, the archive will be extracted where it stands (the parent of
        ``archive_path``).

    Returns
    -------
    pathlib.Path
        The extracted file or directory location.
        As an archive can contain any number of members, the parent directory
        is returned (where the archive content is extracted).

    Raises
    ------
    ValueError
        When ``archive_path`` does not point to a file with a known extension.
    """

    archive_path, arch_inner_dir = _path_and_subdir(archive_path)
    # An explicit ``inner_path`` wins over a ``:``-suffix in ``archive_path``.
    sub_dir = inner_path or arch_inner_dir

    # Bug fix: normalize to Path early so the `/` joins below and the return
    # value behave the same whether a str or a Path was given.
    if destination is None:
        destination = archive_path.parent
    destination = Path(destination)

    # Bug fix: also accept single-suffix tarballs (.tgz/.tbz2), which the
    # docstring advertises but `".tar" in suffixes` alone rejects.
    if ".tar" in archive_path.suffixes or archive_path.suffix in (
        ".tgz",
        ".tbz2",
    ):
        # NOTE(review): members are extracted as-is; for untrusted archives
        # consider tarfile's extraction filters — confirm the trust model.
        with tarfile.open(archive_path, mode="r") as arch:
            if sub_dir is None:
                arch.extractall(destination)
            else:
                arch.extract(Path(sub_dir).as_posix(), destination)
    elif ".zip" == archive_path.suffix:
        with zipfile.ZipFile(archive_path) as arch:
            if sub_dir is None:
                arch.extractall(destination)
            else:
                arch.extract(Path(sub_dir).as_posix(), destination)
    elif ".bz2" == archive_path.suffix:
        if sub_dir is not None:
            warnings.warn(
                f"Ignored sub directory ({sub_dir}). Not supported for `.bz2` "
                "files.",
                RuntimeWarning,
            )
        # A bare .bz2 decompresses to a single file named after the stem.
        extracted_file = destination / Path(archive_path.stem)
        with bz2.BZ2File(archive_path) as arch, extracted_file.open(
            "wb"
        ) as dest:
            dest.write(arch.read())
    else:
        raise ValueError(
            f"Unknown file extension: {''.join(archive_path.suffixes)}"
        )
    return Path(destination)

295 

296 

def search_and_open(
    search_pattern: str,
    base_dir: Union[PathLike, None] = None,
    sub_dir: Union[PathLike, None] = None,
    open_as_binary: bool = False,
    **kwargs,
) -> Union[IO[bytes], TextIO, None]:
    """Recursively searches a directory for a matching file and opens it.

    When ``base_dir`` points to an archive, the pattern is searched inside
    that archive instead. Wildcards (``*``, ``?``, and ``**``) are supported
    (using :meth:`pathlib.Path.glob`).

    Parameters
    ----------
    search_pattern
        The wanted path pattern of the file to open. Supports ``fnmatch``
        notation (``*``, ``**``, and ``?``).
    base_dir
        The directory to search into. Defaults to the ``data_path`` user
        configuration.
    sub_dir
        A sub directory of ``base_dir`` to search into instead. Useful when
        keeping the default ``base_dir`` but targeting a sub directory.
    open_as_binary
        Open the file as a binary stream instead of a text file.

    Returns
    -------
    IO
        A read-only open file stream.
    """

    if base_dir is None:
        base_dir = _get_local_data_directory()

    if is_archive(base_dir):
        # Delegate to the archive-aware variant.
        return search_in_archive_and_open(
            search_pattern=search_pattern,
            archive_path=base_dir,
            inner_dir=sub_dir,  # TODO not ok with config data_path / subdir
            **kwargs,
        )

    # Plain directory (or direct file path) on the local filesystem.
    base_dir = Path(base_dir)
    final_dir = base_dir / sub_dir if sub_dir else base_dir
    mode = "rb" if open_as_binary else "rt"

    if not final_dir.is_dir():
        return open(final_dir, mode=mode)

    # './' is prepended because search_pattern might start with '/'
    pattern = final_dir / "**" / f"./{search_pattern}"
    for hit in glob.iglob(pattern.as_posix(), recursive=True):
        if Path(hit).is_file():
            return open(hit, mode=mode)
    raise FileNotFoundError(
        f"Unable to locate and open a file that matches '{pattern}' in "
        f"'{final_dir}'."
    )

360 

361 

def list_dir(
    base_directory: PathLike,
    sub_directory: Union[PathLike, None] = None,
    show_files: bool = True,
    show_dirs: bool = True,
) -> list[Path]:
    """Lists all directories and/or files in a directory (non-recursively).

    Parameters
    ----------
    base_directory
        The directory (or archive) to list.
    sub_directory
        An optional sub directory of ``base_directory`` to list instead.
    show_files
        Include files in the result.
    show_dirs
        Include directories in the result.
    """
    base_directory = Path(base_directory)

    if is_archive(base_directory):
        return list_dir_in_archive(
            archive_path=base_directory,
            inner_dir=sub_directory,
            show_dirs=show_dirs,
            show_files=show_files,
        )

    # Not an archive
    final_directory = (
        base_directory
        if sub_directory is None
        else base_directory / sub_directory
    )
    # Renamed from ``glob``: the local previously shadowed the module-level
    # ``glob`` import.
    entries = list(final_directory.glob("*"))
    if not show_dirs:
        entries = [e for e in entries if not e.is_dir()]
    if not show_files:
        entries = [e for e in entries if not e.is_file()]
    return entries

391 

392 

def md5_hash(readable: Any, chunk_size: int = 65535) -> str:
    """Computes the MD5 hex digest of any object with a read method."""
    digest = hashlib.md5()
    while True:
        block = readable.read(chunk_size)
        if block == b"":
            break
        digest.update(block)
    return digest.hexdigest()

399 

400 

def sha256_hash(readable: Any, chunk_size: int = 65535) -> str:
    """Computes the SHA256 hex digest of any object with a read method."""
    digest = hashlib.sha256()
    while True:
        block = readable.read(chunk_size)
        if block == b"":
            break
        digest.update(block)
    return digest.hexdigest()

407 

408 

def verify_file(
    file_path: Union[str, PathLike[str]],
    file_hash: str,
    hash_fct: Callable[[Any, int], str] = sha256_hash,
    full_match: bool = False,
) -> bool:
    """Returns True if the file computed hash corresponds to `file_hash`.

    For comfort, we allow ``file_hash`` to match with the first
    characters of the digest, allowing storing only e.g. the first 8
    char.

    Parameters
    ----------
    file_path
        The path to the file needing verification.
    file_hash
        The expected file hash digest.
    hash_fct
        A function taking a path and returning a digest. Defaults to SHA256.
    full_match
        If set to False, allows ``file_hash`` to match the first characters of
        the files digest (this allows storing e.g. 8 chars of a digest instead
        of the whole 64 characters of SHA256, and still matching.)
    """
    with Path(file_path).open("rb") as stream:
        computed = hash_fct(stream, 65535)
    if full_match:
        return computed == file_hash
    return computed.startswith(file_hash)

438 

439 

def compute_crc(
    file_path: Union[str, PathLike[str]],
    hash_fct: Callable[[Any, int], str] = sha256_hash,
) -> str:
    """Returns the CRC of a file."""
    with Path(file_path).open("rb") as stream:
        return hash_fct(stream, 65535)

448 

449 

450def _infer_filename_from_urls(urls=Union[list[str], str]) -> str: 

451 """Retrieves the remote filename from the URLs. 

452 

453 Parameters 

454 ---------- 

455 urls 

456 One or multiple URLs pointing to files with the same name. 

457 

458 Returns 

459 ------- 

460 The remote file name. 

461 

462 Raises 

463 ------ 

464 ValueError 

465 When urls point to files with different names. 

466 """ 

467 if isinstance(urls, str): 

468 return urls.split("/")[-1] 

469 

470 # Check that all urls point to the same file name 

471 names = [u.split("/")[-1] for u in urls] 

472 if not all(n == names[0] for n in names): 

473 raise ValueError( 

474 "Cannot infer file name when urls point to different files " 

475 f"({names=})." 

476 ) 

477 return urls[0].split("/")[-1] 

478 

479 

def download_file(
    urls: Union[list[str], str],
    destination_directory: Union[str, PathLike[str], None] = None,
    destination_sub_directory: Union[str, None] = None,
    destination_filename: Union[str, None] = None,
    checksum: Union[str, None] = None,
    checksum_fct: Callable[[Any, int], str] = sha256_hash,
    force: bool = False,
    extract: bool = False,
    makedirs: bool = True,
    checksum_mismatch_download_attempts: int = 2,
) -> Path:
    """Downloads a remote file locally.

    This will overwrite any existing file with the same name.

    Parameters
    ----------
    urls
        The remote location of the server. If multiple addresses are given, we
        will try to download from them in order until one succeeds.
    destination_directory
        A path to a local directory where the file will be saved. If omitted,
        the file will be saved in the folder pointed by the ``bob_data_dir`` key
        in the user configuration.
    destination_sub_directory
        An additional layer added to the destination directory (useful when
        using ``destination_directory=None``).
    destination_filename
        The final name of the local file. If omitted, the file will keep the
        name of the remote file.
    checksum
        When provided, will compute the file's checksum and compare to this.
    force
        Re-download and overwrite any existing file with the same name.
    extract
        Extract an archive or zip file next to the downloaded file.
        If this is set, the parent directory path will be returned.
    makedirs
        Automatically make the parent directories of the new local file.
    checksum_mismatch_download_attempts
        Number of download attempts when the checksum does not match after
        downloading, must be 1 or more.

    Returns
    -------
    The path to the new local file (or the parent directory if ``extract`` is
    True).

    Raises
    ------
    RuntimeError
        When the URLs provided are all invalid.
    ValueError
        - When ``destination_filename`` is omitted and URLs point to files with
          different names.
        - When the checksum of the file does not correspond to the provided
          ``checksum``.
    """

    if destination_filename is None:
        destination_filename = _infer_filename_from_urls(urls=urls)

    if destination_directory is None:
        destination_directory = _get_local_data_directory()

    destination_directory = Path(destination_directory)

    if destination_sub_directory is not None:
        destination_directory = (
            destination_directory / destination_sub_directory
        )

    # Clamp the attempt count to at least one try.
    if checksum_mismatch_download_attempts < 1:
        logger.warning(
            "'Checksum_mismatch_download_attempts' must be greater than 0 "
            "(got %d). Setting it to 1.",
            checksum_mismatch_download_attempts,
        )
        checksum_mismatch_download_attempts = 1

    local_file = destination_directory / destination_filename
    needs_download = True

    # Decide whether a download is required.
    if force or not local_file.is_file():
        if not force:
            logger.info(f"File {local_file} is not present. Needs download.")
        needs_download = True
    elif local_file.is_file():
        # NOTE(review): when ``checksum`` is None this passes None down to
        # verify_file (str.startswith(None) raises TypeError) — confirm
        # callers always provide a checksum when the file may already exist.
        file_ok = verify_file(local_file, checksum, hash_fct=checksum_fct)
        if not file_ok:
            logger.info(
                f"File {local_file} does not checksum to '{checksum=}'."
            )
            needs_download = True
        elif not force and checksum is not None and file_ok:
            logger.info(f"File {local_file} already exists, skipping download.")
            needs_download = False

    if needs_download:
        # NOTE(review): the loop body contains no break/continue tied to the
        # checksum result, so each iteration re-downloads unconditionally —
        # confirm the intended retry-on-mismatch behavior.
        for current_download_try in range(checksum_mismatch_download_attempts):
            if isinstance(urls, str):
                urls = [urls]

            # Try each URL in order until one responds successfully.
            for tries, url in enumerate(urls):
                logger.debug("Retrieving file from '%s'.", url)
                try:
                    response = requests.get(url=url, timeout=10)
                except requests.exceptions.ConnectionError as e:
                    if tries < len(urls) - 1:
                        logger.info(
                            "Could not connect to %s. Trying other URLs.",
                            url,
                        )
                        logger.debug(e)
                        continue
                # NOTE(review): a ConnectionError on the *last* URL falls
                # through here with ``response`` possibly unbound — confirm.

                logger.debug(
                    "http response: '%d: %s'.",
                    response.status_code,
                    response.reason,
                )

                if response.ok:
                    logger.debug("Got file from %s.", url)
                    break
                elif tries < len(urls) - 1:
                    logger.info(
                        "Failed to get file from %s, trying other URLs.", url
                    )
                    logger.debug("requests.response was:\n%s", response)
                else:
                    raise RuntimeError(
                        "Could not retrieve file from any of the provided URLs! "
                        f"({urls=})"
                    )

            if makedirs:
                local_file.parent.mkdir(parents=True, exist_ok=True)

            # The whole payload is written at once (response not streamed).
            with local_file.open("wb") as f:
                f.write(response.content)

    if checksum is not None:
        if not verify_file(local_file, checksum, hash_fct=checksum_fct):
            if not needs_download:
                raise ValueError(
                    f"The local file hash does not correspond to '{checksum}' "
                    f"and {force=} prevents overwriting."
                )
            raise ValueError(
                "The downloaded file hash ('"
                f"{compute_crc(local_file, hash_fct=checksum_fct)}') does not "
                f"correspond to '{checksum}'."
            )

    if extract:
        # Extract only if the file was re-downloaded
        if needs_download:
            local_file = extract_archive(local_file)
        else:
            # Mimic the behavior of extract_archive
            local_file = local_file.parent

    return local_file