#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Tiago de Freitas Pereira <tiago.pereira@idiap.ch>


import logging

import click

from clapper.click import ConfigCommand, ResourceOption, verbosity_option

from bob.pipelines import is_pipeline_wrapped
from bob.pipelines.distributed import (
    VALID_DASK_CLIENT_STRINGS,
    dask_get_partition_size,
)
from bob.pipelines.wrappers import CheckpointWrapper, DaskWrapper, wrap

logger = logging.getLogger(__name__)

EPILOG = """\b
Command line examples\n
---------------------\n


Below is an example of how to extract ArcFace features from a database:\n

`bob bio transform my_database iresnet100 -vv`\n\n

To "dask" the execution of the pipeline, use the `--dask-client` option.\n
The example below shows how to use it to start a Dask cluster on SGE.\n

`bob bio transform my_database iresnet100 --dask-client sge -vv`\n\n

\b

Creating my own transformer\n
---------------------------\n

This command accepts a configuration file as input.\n
For example, if you want to customize your transformer, you can use the following configuration file:\n
\b\b


```py\n
from sklearn.base import BaseEstimator, TransformerMixin\n
from sklearn.pipeline import make_pipeline\n
from bob.pipelines import wrap\n

class MyTransformer(TransformerMixin, BaseEstimator):\n
    def _more_tags(self):\n
        return {"requires_fit": False}\n

    def transform(self, X):\n
        # do something\n
        return X\n

transformer = wrap(["sample"], make_pipeline(MyTransformer()))\n
```

Then, you can use the above configuration file to run the command:

\b
`bob bio pipelines transform my_database my_transformer.py --dask-client sge -vv`

\b\b

Leveraging FunctionTransformer\n
------------------------------\n

You can also use `FunctionTransformer` to create a transformer:\n

```py\n
from sklearn.preprocessing import FunctionTransformer\n
from sklearn.pipeline import make_pipeline\n
from bob.pipelines import wrap\n

\b

def my_transformer(X):\n
    # do something\n
    return X\n


transformer = wrap(["sample"], make_pipeline(FunctionTransformer(my_transformer)))\n
```

Then, you can use the above configuration file to run the command:\n

`bob bio pipelines transform my_database my_transformer.py --dask-client sge -vv`


\b\b
"""


@click.command(
    name="transform",
    entry_point_group="bob.bio.config",
    cls=ConfigCommand,
    epilog=EPILOG,
)
@click.option(
    "--transformer",
    "-t",
    required=True,
    entry_point_group="bob.bio.transformer",
    help="A scikit-learn Pipeline containing the set of transformations.",
    cls=ResourceOption,
)
@click.option(
    "--database",
    "-d",
    entry_point_group="bob.bio.database",
    required=True,
    help="Biometric database connector (a class that implements the methods `background_model_samples`, `references` and `probes`).",
    cls=ResourceOption,
)
@click.option(
    "--dask-client",
    "-l",
    entry_point_group="dask.client",
    string_exceptions=VALID_DASK_CLIENT_STRINGS,
    default="single-threaded",
    help="Dask client for the execution of the pipeline.",
    cls=ResourceOption,
)
@click.option(
    "-c",
    "--checkpoint-dir",
    show_default=True,
    default="./checkpoints",
    help="Name of the output directory where the checkpoints will be saved.",
    cls=ResourceOption,
)
@click.option(
    "--force",
    "-f",
    is_flag=True,
    help="If set, force the regeneration of all the checkpoints of the experiment. This option does not work if `--memory` is set.",
    cls=ResourceOption,
)
@click.option(
    "--dask-partition-size",
    "-s",
    help="If using Dask, this option defines the size of each dask.bag.partition. "
    "Use this option if the current heuristic that sets this value doesn't suit your experiment "
    "(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@verbosity_option(cls=ResourceOption, logger=logger)
def pipeline_transform(
    transformer,
    database,
    dask_client,
    checkpoint_dir,
    force,
    dask_partition_size,
    **kwargs,
):
    """
    This CLI command executes a pipeline (a scikit-learn Pipeline) on a given database.

    It can be used, for example, to extract features, compute face crops, preprocess audio files, and so on.
    """

    logger.info(f"Transforming samples from {database}")

    # Idiap SETUP. This avoids having directories with more than 1000 files/sub-directories.
    hash_fn = database.hash_fn if hasattr(database, "hash_fn") else None
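    # A minimal sketch of what such a hash function could look like
    # (hypothetical; actual databases define their own `hash_fn`):
    #
    #   import hashlib
    #
    #   def hash_fn(key):
    #       # Spread files over 256 sub-directories, keyed by the first
    #       # two hex digits of the SHA-1 of the sample key.
    #       return hashlib.sha1(key.encode()).hexdigest()[:2]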

    # If NONE of the steps are checkpointed, we checkpoint them all
    if not any(is_pipeline_wrapped(transformer, CheckpointWrapper)):
        logger.info("Checkpointing the pipeline")
        transformer = wrap(
            ["checkpoint"],
            transformer,
            features_dir=checkpoint_dir,
            hash_fn=hash_fn,
            force=force,
        )
    else:
        # At least one step is already checkpointed, so we don't wrap the pipeline again
        logger.warning(
            f"The pipeline {transformer} contains steps that are already "
            "checkpointed. Hence, we are not checkpointing them again."
        )

    # Fetching all samples
    samples = database.all_samples()

    # is_pipeline_wrapped returns one flag per pipeline step; in a fully
    # dasked pipeline every step but the leading ToDaskBag is wrapped,
    # i.e. the number of dasked elements is the number of steps - 1
    dasked_elements = is_pipeline_wrapped(transformer, DaskWrapper)
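    # For illustration (hypothetical values): for a two-step pipeline
    # wrapped with ["dask"], this would be a list of booleans such as
    # [False, True, True], where the leading False corresponds to the
    # prepended ToDaskBag step.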

    if any(dasked_elements):
        logger.warning(
            "The pipeline is already dasked; hence, we are not dasking it again."
        )
    else:
        if not isinstance(dask_client, str):
            dask_partition_size = (
                dask_get_partition_size(
                    dask_client.cluster, len(samples), lower_bound=200
                )
                if dask_partition_size is None
                else dask_partition_size
            )

        logger.info(
            f"Dask wrapping it with partition size {dask_partition_size}"
        )
        transformer = wrap(
            ["dask"], transformer, partition_size=dask_partition_size
        )

    transformer.transform(samples).compute(
        scheduler="single-threaded" if dask_client is None else dask_client
    )

    logger.info("Transformation finished!")