# src/bob/bio/base/pipelines/entry_points.py

import glob
import logging
import os
import pickle
import random

from typing import Optional, Union

import dask.bag
import dask.distributed

from dask.delayed import Delayed
from sklearn.pipeline import Pipeline

from bob.bio.base.pipelines import (
    BioAlgDaskWrapper,
    CSVScoreWriter,
    Database,
    FourColumnsScoreWriter,
    PipelineScoreNorm,
    PipelineSimple,
    TNormScores,
    ZNormScores,
    checkpoint_pipeline_simple,
    dask_bio_pipeline,
    is_biopipeline_checkpointed,
)
from bob.pipelines import (
    DaskWrapper,
    estimator_requires_fit,
    is_instance_nested,
    wrap,
)
from bob.pipelines.distributed import dask_get_partition_size
from bob.pipelines.distributed.sge import SGEMultipleQueuesCluster

logger = logging.getLogger(__name__)


def compute_scores(result, dask_client):
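    """Computes the result of a deferred dask graph, if there is one.

    ``Delayed`` and ``Bag`` results are computed on ``dask_client`` when one is
    set (otherwise the single-threaded scheduler is used and a warning is
    logged); any other ``result`` is returned unchanged.
    """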

    if isinstance(result, Delayed) or isinstance(result, dask.bag.Bag):
        if dask_client is not None:
            result = result.compute(scheduler=dask_client)
        else:
            logger.warning(
                "`dask_client` not set. Your pipeline will run locally"
            )
            result = result.compute(scheduler="single-threaded")
    return result


def post_process_scores(pipeline, scores, path):
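    """Writes the scores with the pipeline's score writer and post-processes
    them into ``path``."""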

    written_scores = pipeline.write_scores(scores)
    return pipeline.post_process(written_scores, path)


def execute_pipeline_simple(
    pipeline,
    database,
    dask_client,
    groups,
    output,
    write_metadata_scores,
    checkpoint,
    dask_n_partitions,
    dask_partition_size,
    dask_n_workers,
    checkpoint_dir=None,
    force=False,
):
    """
    Function that executes the PipelineSimple.

    This is called when using the ``bob bio pipeline simple`` command.

    This is also callable from a script without fear of interrupting the running
    Dask instance, allowing chaining multiple experiments while keeping the
    workers alive.

    When using Dask, keep in mind that the data and the processing time have to
    be spread over multiple workers. There is no recipe that makes everything
    work on every system. If you run into balancing problems (only a few of the
    available workers actually working while the rest wait, or the scheduler
    being overloaded trying to organize millions of tiny tasks), you can specify
    ``dask_n_partitions`` or ``dask_partition_size``.
    The first splits each set of data into the given number of chunks (ideally,
    one per worker), while the second creates similar-sized partitions in each
    set.
    If the memory on the workers is not sufficient, try reducing the size of the
    partitions (or increasing their number).

    Parameters
    ----------

    pipeline: Instance of :py:class:`bob.bio.base.pipelines.PipelineSimple`
        A constructed PipelineSimple object.

    database: Instance of :py:class:`bob.bio.base.pipelines.abstract_class.Database`
        A database interface instance.

    dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
        A Dask client instance used to run the experiment in parallel on multiple
        machines, or locally.
        Basic configs can be found in ``bob.pipelines.config.distributed``.

    dask_n_partitions: int or None
        Specifies the number of partitions to split the data into.

    dask_partition_size: int or None
        Specifies the data partition size when using dask. Ignored when
        ``dask_n_partitions`` is set.

    dask_n_workers: int or None
        Sets the starting number of Dask workers. Does not prevent Dask from
        requesting more or releasing workers depending on load.

    groups: list of str
        Groups of the dataset that will be requested from the database interface.

    output: str
        Path where the scores will be saved.

    write_metadata_scores: bool
        Use the CSVScoreWriter instead of the FourColumnsScoreWriter when True.

    checkpoint: bool
        Whether checkpoint files will be created for every step of the pipeline.

    checkpoint_dir: str
        If ``checkpoint`` is set, this path will be used to save the checkpoints.
        If ``None``, the content of ``output`` will be used.

    force: bool
        If set, it forces the regeneration of all the checkpoints of the
        experiment. This option has no effect if ``--memory`` is set.
    """
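    # A minimal usage sketch, assuming hypothetical ``my_pipeline`` and
    # ``my_database`` objects constructed elsewhere (e.g. loaded from a
    # configuration file):
    #
    #   execute_pipeline_simple(
    #       pipeline=my_pipeline,
    #       database=my_database,
    #       dask_client=None,  # or a dask.distributed.Client instance
    #       groups=["dev"],
    #       output="./results",
    #       write_metadata_scores=True,
    #       checkpoint=True,
    #       dask_n_partitions=None,
    #       dask_partition_size=None,
    #       dask_n_workers=None,
    #   )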

    if not os.path.exists(output):
        os.makedirs(output, exist_ok=True)

    # Setting the `checkpoint_dir`
    if checkpoint_dir is None:
        checkpoint_dir = output
    else:
        os.makedirs(checkpoint_dir, exist_ok=True)

    # Scores are written on `output`
    if write_metadata_scores:
        pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
    else:
        pipeline.score_writer = FourColumnsScoreWriter(
            os.path.join(output, "./tmp")
        )

    # Checkpoint if it's not already checkpointed
    if checkpoint and not is_biopipeline_checkpointed(pipeline):
        hash_fn = database.hash_fn if hasattr(database, "hash_fn") else None
        pipeline = checkpoint_pipeline_simple(
            pipeline, checkpoint_dir, hash_fn=hash_fn, force=force
        )

    # Load the background model samples only if the transformer requires fitting
    if estimator_requires_fit(pipeline.transformer):
        background_model_samples = database.background_model_samples()
    else:
        background_model_samples = []

    for group in groups:
        score_file_name = os.path.join(
            output,
            f"scores-{group}" + (".csv" if write_metadata_scores else ""),
        )
        biometric_references = database.references(group=group)
        probes = database.probes(group=group)

        # If there's no data to be processed, continue
        if len(biometric_references) == 0 or len(probes) == 0:
            logger.warning(
                f"Current dataset ({database}) does not have `{group}` set. The experiment will not be executed."
            )
            continue

        if dask_client is not None and not is_instance_nested(
            pipeline.biometric_algorithm,
            "biometric_algorithm",
            BioAlgDaskWrapper,
        ):
            # Scaling up
            if dask_n_workers is not None and not isinstance(dask_client, str):
                dask_client.cluster.scale(dask_n_workers)

            # Data partitioning.
            # - Too many small partitions: the scheduler takes more time scheduling
            #   than the computations.
            # - Too few big partitions: We don't use all the available workers and thus
            #   run slower.
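            # (For illustration only: with, say, 10 workers and ~1000 samples
            # per set, ``dask_n_partitions=10`` yields ~100 samples per
            # partition, while ``dask_partition_size=10`` would create ~100
            # partitions.)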

            if dask_partition_size is not None:
                logger.debug(
                    f"Splitting data with fixed size partitions: {dask_partition_size}."
                )
                pipeline = dask_bio_pipeline(
                    pipeline,
                    partition_size=dask_partition_size,
                )
            elif dask_n_partitions is not None or dask_n_workers is not None:
                # Divide each Set in a user-defined number of partitions
                n_partitions = dask_n_partitions or dask_n_workers
                logger.debug(
                    f"Splitting data with fixed number of partitions: {n_partitions}."
                )
                pipeline = dask_bio_pipeline(pipeline, npartitions=n_partitions)
            else:
                # Split in max_jobs partitions or revert to the default behavior of
                # dask.Bag from_sequence: partition_size = 100
                n_jobs = None
                if not isinstance(dask_client, str) and isinstance(
                    dask_client.cluster, SGEMultipleQueuesCluster
                ):
                    logger.debug(
                        "Splitting data according to the number of available workers."
                    )
                    n_jobs = dask_client.cluster.sge_job_spec["default"][
                        "max_jobs"
                    ]
                    logger.debug(f"{n_jobs} partitions will be created.")
                pipeline = dask_bio_pipeline(pipeline, npartitions=n_jobs)

        logger.info(f"Running the PipelineSimple for group {group}")
        score_all_vs_all = (
            database.score_all_vs_all
            if hasattr(database, "score_all_vs_all")
            else False
        )

        result = pipeline(
            background_model_samples,
            biometric_references,
            probes,
            score_all_vs_all=score_all_vs_all,
        )

        post_processed_scores = post_process_scores(
            pipeline, result, score_file_name
        )
        compute_scores(post_processed_scores, dask_client)


def execute_pipeline_score_norm(
    pipeline,
    database,
    dask_client,
    groups,
    output,
    write_metadata_scores,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    checkpoint_dir=None,
    top_norm=False,
    top_norm_score_fraction=0.8,
    score_normalization_type="znorm",
    force=False,
):
    """
    Function that extends the capabilities of the PipelineSimple to run score normalization.

    This is called when using the ``bob bio pipeline score-norm`` command.

    This is also callable from a script without fear of interrupting the running
    Dask instance, allowing chaining multiple experiments while keeping the
    workers alive.

    Parameters
    ----------

    pipeline: Instance of :py:class:`bob.bio.base.pipelines.PipelineSimple`
        A constructed PipelineSimple object.

    database: Instance of :py:class:`bob.bio.base.pipelines.abstract_class.Database`
        A database interface instance.

    dask_client: instance of :py:class:`dask.distributed.Client` or ``None``
        A Dask client instance used to run the experiment in parallel on multiple
        machines, or locally.
        Basic configs can be found in ``bob.pipelines.config.distributed``.

    groups: list of str
        Groups of the dataset that will be requested from the database interface.

    output: str
        Path where the results and checkpoints will be saved to.

    write_metadata_scores: bool
        Use the CSVScoreWriter instead of the FourColumnsScoreWriter when True.

    checkpoint: bool
        Whether checkpoint files will be created for every step of the pipeline.

    dask_partition_size: int
        If using Dask, this option defines the size of each dask.bag partition.
        Use it if the heuristic that sets this value doesn't suit your experiment
        (https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).

    dask_n_workers: int
        If using Dask, this option defines the number of workers used to start
        the experiment. Dask automatically scales the number of workers up and
        down depending on the current load of tasks. Use this option if the
        default starting number of workers doesn't suit your setup.

    top_norm: bool
        Whether to apply top-norm, i.e. use only a fraction of the normalization
        cohort (see ``top_norm_score_fraction``).

    top_norm_score_fraction: float
        Sets the fraction of samples used for t-norm and z-norm. Sometimes you
        don't want to use all the t/z samples for normalization.

    checkpoint_dir: str
        If ``checkpoint`` is set, this path will be used to save the checkpoints.
        If ``None``, the content of ``output`` will be used.
    """
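    # A minimal usage sketch, again with hypothetical ``my_pipeline`` and
    # ``my_database`` objects built elsewhere (e.g. in a configuration file):
    #
    #   execute_pipeline_score_norm(
    #       pipeline=my_pipeline,
    #       database=my_database,
    #       dask_client=None,
    #       groups=["dev"],
    #       output="./results",
    #       write_metadata_scores=True,
    #       checkpoint=True,
    #       dask_partition_size=None,
    #       dask_n_workers=None,
    #       score_normalization_type="znorm",  # or "tnorm"
    #   )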

    if not os.path.exists(output):
        os.makedirs(output, exist_ok=True)

    # Setting the `checkpoint_dir`
    if checkpoint_dir is None:
        checkpoint_dir = output
    else:
        os.makedirs(checkpoint_dir, exist_ok=True)

    # Scores are written on `output`
    if write_metadata_scores:
        pipeline.score_writer = CSVScoreWriter(os.path.join(output, "./tmp"))
    else:
        pipeline.score_writer = FourColumnsScoreWriter(
            os.path.join(output, "./tmp")
        )

    # Checkpoint if it's not already checkpointed
    if checkpoint and not is_biopipeline_checkpointed(pipeline):
        pipeline = checkpoint_pipeline_simple(
            pipeline, checkpoint_dir, force=force
        )

    # PICKING THE TYPE OF POST-PROCESSING
    if score_normalization_type == "znorm":
        post_processor = ZNormScores(
            top_norm=top_norm,
            top_norm_score_fraction=top_norm_score_fraction,
        )
    elif score_normalization_type == "tnorm":
        post_processor = TNormScores(
            top_norm=top_norm,
            top_norm_score_fraction=top_norm_score_fraction,
        )
    else:
        raise ValueError(
            f"score_normalization_type {score_normalization_type} is not valid"
        )

    if checkpoint and not is_biopipeline_checkpointed(post_processor):
        score_stats_path = os.path.join(
            checkpoint_dir,
            f"{score_normalization_type}-scores",
            "norm",
            "stats",
        )
        # we cannot checkpoint "features" because sample.keys are not unique.
        post_processor = wrap(
            ["checkpoint"],
            post_processor,
            model_path=score_stats_path,
            force=force,
        )

    pipeline = PipelineScoreNorm(pipeline, post_processor)

    background_model_samples = database.background_model_samples()

    # treferences = database.treferences(proportion=ztnorm_cohort_proportion)
    for group in groups:
        # Changing the score normalization stats file name as a function of the group
        if checkpoint and not is_biopipeline_checkpointed(post_processor):
            post_processor.model_path = f"{score_stats_path}_{group}.pkl"

        if score_normalization_type == "znorm":
            score_normalization_samples = database.zprobes(group=group)
        elif score_normalization_type == "tnorm":
            score_normalization_samples = database.treferences()

        score_file_name = os.path.join(output, f"scores-{group}")

        biometric_references = database.references(group=group)
        probes = database.probes(group=group)

        # If there's no data to be processed, continue
        if len(biometric_references) == 0 or len(probes) == 0:
            logger.warning(
                f"Current dataset ({database}) does not have `{group}` set. The experiment will not be executed."
            )
            continue

        if dask_client is not None and not is_instance_nested(
            pipeline.biometric_algorithm,
            "biometric_algorithm",
            BioAlgDaskWrapper,
        ):
            # Scaling up
            if dask_n_workers is not None and not isinstance(dask_client, str):
                dask_client.cluster.scale(dask_n_workers)

            n_objects = max(
                len(background_model_samples),
                len(biometric_references),
                len(probes),
            )
            partition_size = None
            if not isinstance(dask_client, str):
                partition_size = dask_get_partition_size(
                    dask_client.cluster, n_objects
                )
            if dask_partition_size is not None:
                partition_size = dask_partition_size

            pipeline = dask_bio_pipeline(
                pipeline,
                partition_size=partition_size,
            )

        logger.info(f"Running PipelineSimple for group {group}")
        score_all_vs_all = (
            database.score_all_vs_all
            if hasattr(database, "score_all_vs_all")
            else False
        )

        (
            raw_scores,
            score_normed_scores,
        ) = pipeline(
            background_model_samples,
            biometric_references,
            probes,
            score_normalization_samples,
            score_all_vs_all=score_all_vs_all,
        )

        def _build_filename(score_file_name, suffix):
            return os.path.join(score_file_name, suffix)

        # Running RAW_SCORES

        raw_scores = post_process_scores(
            pipeline,
            raw_scores,
            _build_filename(score_file_name, "raw_scores.csv"),
        )
        _ = compute_scores(raw_scores, dask_client)

        # NORMALIZED SCORES (z-norm or t-norm)
        score_normed_scores = post_process_scores(
            pipeline,
            score_normed_scores,
            _build_filename(score_file_name, f"{score_normalization_type}.csv"),
        )
        _ = compute_scores(score_normed_scores, dask_client)

        # T-SCORES
        """
        t_normed_scores = post_process_scores(
            pipeline,
            t_normed_scores,
            _build_filename(score_file_name, "t_normed_scores.csv"),
        )
        _ = compute_scores(t_normed_scores, dask_client)

        # S-SCORES
        s_normed_scores = post_process_scores(
            pipeline,
            s_normed_scores,
            _build_filename(score_file_name, "s_normed_scores.csv"),
        )
        _ = compute_scores(s_normed_scores, dask_client)

        # ZT-SCORES
        zt_normed_scores = post_process_scores(
            pipeline,
            zt_normed_scores,
            _build_filename(score_file_name, "zt_normed_scores.csv"),
        )
        _ = compute_scores(zt_normed_scores, dask_client)
        """


def execute_pipeline_train(
    pipeline: Union[PipelineSimple, Pipeline],
    database: Database,
    dask_client: Optional[dask.distributed.Client] = None,
    output: str = "./results",
    checkpoint: bool = True,
    dask_n_partitions: Optional[int] = None,
    dask_partition_size: Optional[int] = None,
    dask_n_workers: Optional[int] = None,
    checkpoint_dir: Optional[str] = None,
    force: bool = False,
    split_training: bool = False,
    n_splits: int = 3,
    **kwargs,
):
    """Executes only the training part of a pipeline.

    When running from a script, use this function instead of the click command
    in ``bob.bio.base.script.pipeline_train``.

    Parameters
    ----------

    pipeline:
        A constructed ``PipelineSimple`` object (the ``transformer`` will be
        extracted for training), or an ``sklearn.Pipeline``.

    database:
        A database interface instance.

    dask_client:
        A Dask client instance used to run the experiment in parallel on multiple
        machines, or locally.
        Basic configs can be found in ``bob.pipelines.config.distributed``.

    dask_n_partitions:
        Specifies the number of partitions to split the data into.

    dask_partition_size:
        Specifies the data partition size when using dask. Ignored when
        ``dask_n_partitions`` is set.

    dask_n_workers:
        Sets the starting number of Dask workers. Does not prevent Dask from
        requesting more or releasing workers depending on load.

    output:
        Path where the trained model files (and, by default, the checkpoints)
        will be saved.

    checkpoint:
        Whether checkpoint files will be created for every step of the pipeline.

    checkpoint_dir:
        If ``checkpoint`` is set, this path will be used to save the checkpoints.
        If ``None``, the content of ``output`` will be used.

    force:
        If set, it forces the regeneration of all the checkpoints of the
        experiment. This option has no effect if ``--memory`` is set.

    split_training:
        If set, the background model will be trained on multiple partitions of
        the data.

    n_splits:
        Number of splits to use when splitting the data.
    """
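    # A minimal usage sketch (``my_pipeline`` and ``my_database`` are
    # hypothetical, pre-configured objects); only the training step runs:
    #
    #   execute_pipeline_train(
    #       pipeline=my_pipeline,  # PipelineSimple or sklearn Pipeline
    #       database=my_database,
    #       output="./results",
    #       checkpoint=True,
    #   )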

    logger.debug(f"Unused arguments: {kwargs=}")
    if not os.path.exists(output):
        os.makedirs(output, exist_ok=True)

    # Setting the `checkpoint_dir`
    if checkpoint_dir is None:
        checkpoint_dir = output
    else:
        os.makedirs(checkpoint_dir, exist_ok=True)

    if isinstance(pipeline, PipelineSimple):
        pipeline = pipeline.transformer

    # Checkpoint (only features, not the model)
    if checkpoint:
        hash_fn = database.hash_fn if hasattr(database, "hash_fn") else None
        wrap(
            ["checkpoint"],
            pipeline,
            features_dir=checkpoint_dir,
            model_path=None,
            hash_fn=hash_fn,
            force=force,
        )

    if not estimator_requires_fit(pipeline):
        raise ValueError(
            "Estimator does not require fitting. No training necessary."
        )

    background_model_samples = database.background_model_samples()

    if dask_client is not None:
        # Scaling up
        if dask_n_workers is not None and not isinstance(dask_client, str):
            dask_client.cluster.scale(dask_n_workers)

        if dask_partition_size is not None:
            logger.debug(
                f"Splitting data with fixed size partitions: {dask_partition_size}."
            )
            pipeline = wrap(
                ["dask"], pipeline, partition_size=dask_partition_size
            )
        elif dask_n_partitions is not None or dask_n_workers is not None:
            # Divide each Set in a user-defined number of partitions
            n_partitions = dask_n_partitions or dask_n_workers
            logger.debug(
                f"Splitting data with fixed number of partitions: {n_partitions}."
            )
            pipeline = wrap(["dask"], pipeline, npartitions=n_partitions)
        else:
            # Split in max_jobs partitions or revert to the default behavior of
            # dask.Bag from_sequence: partition_size = 100
            n_jobs = None
            if not isinstance(dask_client, str) and isinstance(
                dask_client.cluster, SGEMultipleQueuesCluster
            ):
                logger.debug(
                    "Splitting data according to the number of available workers."
                )
                n_jobs = dask_client.cluster.sge_job_spec["default"]["max_jobs"]
                logger.debug(f"{n_jobs} partitions will be created.")
            pipeline = wrap(["dask"], pipeline, npartitions=n_jobs)

    logger.info("Running the pipeline training")
    if split_training:
        start_step = -1
        # Look at step files, and assess if we can load the last one
        for step_file in glob.glob(
            os.path.join(output, "train_pipeline_step_*.pkl")
        ):
            to_rem = os.path.join(output, "train_pipeline_step_")
            file_step = int(step_file.replace(to_rem, "").replace(".pkl", ""))
            start_step = max(start_step, file_step)
        if start_step > -1:
            logger.debug("Found pipeline training step. Loading it.")
            last_step_file = os.path.join(
                output, f"train_pipeline_step_{start_step}.pkl"
            )
            with open(last_step_file, "rb") as start_file:
                pipeline = pickle.load(start_file)
            start_step += 1  # Loaded step is i -> training starts at i+1
            logger.info(f"Starting from training step {start_step}")

        random.seed(0)
        random.shuffle(background_model_samples)

        for partition_i in range(start_step, n_splits):
            logger.info(
                f"Training with partition {partition_i} ({partition_i+1}/{n_splits})"
            )
            start = len(background_model_samples) // n_splits * partition_i
            end = len(background_model_samples) // n_splits * (partition_i + 1)
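            # For illustration: with 1000 samples and n_splits=3, the splits
            # cover indices [0:333], [333:666] and [666:999]; remainder samples
            # (if any) are left out of the training subsets.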

            _ = pipeline.fit(background_model_samples[start:end])
            step_path = os.path.join(
                output, f"train_pipeline_step_{partition_i}.pkl"
            )
            with open(step_path, "wb") as f:
                pickle.dump(pipeline, f)
    else:
        _ = pipeline.fit(background_model_samples)

    # Save each fitted transformer
    for transformer_name, transformer in pipeline.steps:
        if transformer._get_tags()["requires_fit"]:
            if isinstance(transformer, DaskWrapper):
                transformer = transformer.estimator
            step_path = os.path.join(output, f"{transformer_name}.pkl")
            with open(step_path, "wb") as f:
                pickle.dump(transformer, f)