Coverage for src/bob/pad/base/script/run_pipeline.py: 95%

94 statements  

coverage.py v7.3.2, created at 2023-12-06 21:56 +0100

1"""Executes PAD pipeline""" 

2 

3 

4import logging 

5 

6import click 

7 

8from clapper.click import ( 

9 ConfigCommand, 

10 ResourceOption, 

11 log_parameters, 

12 verbosity_option, 

13) 

14 

15from bob.pipelines.distributed import ( 

16 VALID_DASK_CLIENT_STRINGS, 

17 dask_get_partition_size, 

18) 

19 

20logger = logging.getLogger(__name__) 

21 

22 

23@click.command( 

24 entry_point_group="bob.pad.config", 

25 cls=ConfigCommand, 

26 epilog="""\b 

27 Command line examples\n 

28 ----------------------- 

29 

30 

31 $ bob pad run-pipeline my_experiment.py -vv 

32""", 

33) 

@click.option(
    "--pipeline",
    "-p",
    required=True,
    entry_point_group="bob.pad.pipeline",
    help="The PAD pipeline to run.",
    cls=ResourceOption,
)

@click.option(
    "--decision_function",
    "-f",
    show_default=True,
    default="decision_function",
    help="Name of the pipeline step to call for results, e.g. ``predict_proba``.",
    cls=ResourceOption,
)
@click.option(
    "--database",
    "-d",
    required=True,
    entry_point_group="bob.pad.database",
    help="PAD database connector (class that implements the methods: `fit_samples`, `predict_samples`).",
    cls=ResourceOption,
)
@click.option(
    "--dask-client",
    "-l",
    entry_point_group="dask.client",
    string_exceptions=VALID_DASK_CLIENT_STRINGS,
    default="single-threaded",
    help="Dask client for the execution of the pipeline.",
    cls=ResourceOption,
)
@click.option(
    "--group",
    "-g",
    "groups",
    type=click.Choice(["train", "dev", "eval"]),
    multiple=True,
    default=("dev", "eval"),
    help="If given, limits the experiment to the given group(s).",
    cls=ResourceOption,
)
@click.option(
    "-o",
    "--output",
    show_default=True,
    default="results",
    help="Saves scores (and checkpoints) in this folder.",
    cls=ResourceOption,
)
@click.option(
    "--checkpoint/--memory",
    "checkpoint",
    default=True,
    help="If --checkpoint (the default), all steps of the pipeline will be saved. Checkpoints will be saved in `--output`.",
    cls=ResourceOption,
)
@click.option(
    "--dask-partition-size",
    "-s",
    help="If using Dask, this option defines the size of each dask.bag partition. "
    "Use this option if the current heuristic that sets this value doesn't suit your experiment "
    "(https://docs.dask.org/en/latest/bag-api.html?highlight=partition_size#dask.bag.from_sequence).",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@click.option(
    "--dask-n-workers",
    "-n",
    help="If using Dask, this option defines the number of workers used to start the experiment. "
    "Dask automatically scales the number of workers up or down according to the current load of tasks. "
    "Use this option if the default number of starting workers doesn't suit your experiment.",
    default=None,
    type=click.INT,
    cls=ResourceOption,
)
@click.option(
    "--no-dask",
    is_flag=True,
    help="If set, Dask will not be used to execute the pipeline.",
    cls=ResourceOption,
)
@verbosity_option(cls=ResourceOption, logger=logger)

def run_pipeline(
    pipeline,
    decision_function,
    database,
    dask_client,
    groups,
    output,
    checkpoint,
    dask_partition_size,
    dask_n_workers,
    no_dask,
    **kwargs,
):
    """Runs the simplest PAD pipeline."""

    log_parameters(logger)

    execute_pipeline(
        pipeline=pipeline,
        database=database,
        decision_function=decision_function,
        output=output,
        groups=groups,
        checkpoint=checkpoint,
        dask_client=dask_client,
        dask_partition_size=dask_partition_size,
        dask_n_workers=dask_n_workers,
        no_dask=no_dask,
    )
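

# A minimal sketch of a configuration file for this command. Names such as
# ``MyPadDatabase`` below are hypothetical placeholders, not shipped resources; with
# ``ConfigCommand``/``ResourceOption``, a Python config passed on the command line can
# set variables named after the options above (or name entry points from the
# ``bob.pad.pipeline`` / ``bob.pad.database`` groups):
#
#     # my_experiment.py
#     from sklearn.pipeline import Pipeline
#     from sklearn.svm import SVC
#     import bob.pipelines as mario
#
#     pipeline = Pipeline([("classifier", mario.wrap(["sample"], SVC()))])
#     database = MyPadDatabase()  # must provide fit_samples() and predict_samples(group=...)
#
# The experiment would then run with ``bob pad run-pipeline my_experiment.py -vv``.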



def execute_pipeline(
    pipeline,
    database,
    decision_function="decision_function",
    output="results",
    groups=("dev", "eval"),
    checkpoint=False,
    dask_client="single-threaded",
    dask_partition_size=None,
    dask_n_workers=None,
    no_dask=False,
):
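    """Runs the PAD pipeline programmatically (the function behind ``run_pipeline``).

    A sketch of the expected inputs, inferred from the code below: ``pipeline`` is a
    scikit-learn-style estimator (optionally already wrapped with ``bob.pipelines``),
    ``database`` provides ``fit_samples()`` and ``predict_samples(group=...)``, and
    ``decision_function`` names the pipeline method used to produce scores. The
    remaining arguments control checkpointing into ``output`` and the Dask execution
    (client, partition size, number of workers, or ``no_dask`` to disable Dask).
    """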

    import os
    import sys

    import dask.bag

    import bob.pipelines as mario

    from bob.pipelines import DaskWrapper, is_pipeline_wrapped
    from bob.pipelines.distributed.sge import get_resource_requirements

    if no_dask:
        dask_client = None

    os.makedirs(output, exist_ok=True)

    if checkpoint:
        pipeline = mario.wrap(
            ["checkpoint"], pipeline, features_dir=output, model_path=output
        )

    # Fetching samples
    fit_samples = database.fit_samples()
    total_samples = len(fit_samples)
    predict_samples = dict()
    for group in groups:
        predict_samples[group] = database.predict_samples(group=group)
        total_samples += len(predict_samples[group])

    # Checking if the pipeline is dask-wrapped
    if (
        not any(is_pipeline_wrapped(pipeline, DaskWrapper))
    ) and dask_client is not None:
        # Scaling up if necessary
        if dask_n_workers is not None and not isinstance(dask_client, str):
            dask_client.cluster.scale(dask_n_workers)

        # Defining the partition size
        partition_size = None
        if not isinstance(dask_client, str):
            lower_bound = 1  # lower bound of 1 video per chunk, since videos are usually already big
            partition_size = dask_get_partition_size(
                dask_client.cluster, total_samples, lower_bound=lower_bound
            )
        if dask_partition_size is not None:
            partition_size = dask_partition_size

        pipeline = mario.wrap(["dask"], pipeline, partition_size=partition_size)

    # create an experiment info file
    with open(os.path.join(output, "Experiment_info.txt"), "wt") as f:
        f.write(f"{sys.argv!r}\n")
        f.write(f"database={database!r}\n")
        f.write("Pipeline steps:\n")
        for i, name, estimator in pipeline._iter():
            f.write(f"Step {i}: {name}\n{estimator!r}\n")

    # train the pipeline
    pipeline.fit(fit_samples)

    for group in groups:
        logger.info(f"Running PAD pipeline for group {group}")
        result = getattr(pipeline, decision_function)(predict_samples[group])

        resources = None
        if isinstance(result, dask.bag.core.Bag):
            resources = get_resource_requirements(pipeline)

        save_sample_scores(
            result=result,
            output=output,
            group=group,
            dask_client=dask_client,
            resources=resources,
        )

    logger.info("PAD experiment finished!")



def _get_csv_columns(sample):
    """Returns a dict of {csv_column_name: sample_attr_name} given a sample."""
    # Mandatory columns and their corresponding fields
    columns_attr = {
        "claimed_id": "subject",
        "test_label": "key",
        "is_bonafide": "is_bonafide",
        "attack_type": "attack_type",
        "score": "data",
    }
    # Preventing duplicates and unwanted data
    ignored_fields = list(columns_attr.values()) + ["annotations"]
    # Retrieving custom metadata attribute names
    metadata_fields = [
        k
        for k in sample.__dict__.keys()
        if not k.startswith("_") and k not in ignored_fields
    ]
    for field in metadata_fields:
        columns_attr[field] = field
    return columns_attr



def sample_to_dict_row(sample, columns_fields):
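    """Maps one sample to a CSV row dict: ``{column_name: getattr(sample, attr, None)}``."""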

    row_values = {
        col: getattr(sample, attr, None) for col, attr in columns_fields.items()
    }
    return row_values



def score_samples_to_dataframe(samples):
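    """Builds a pandas DataFrame of scores, one row per sample, using the columns
    derived from the first sample by ``_get_csv_columns``."""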

    import pandas as pd

    rows, column_fields = [], None
    for sample in samples:
        if column_fields is None:
            column_fields = _get_csv_columns(sample)
        row_values = sample_to_dict_row(sample, column_fields)
        rows.append(row_values)
    df = pd.DataFrame(rows)
    return df



def save_sample_scores(
    result,
    output,
    group,
    dask_client,
    resources=None,
):
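    """Writes the scores of one group to ``scores-{group}.csv`` inside ``output``.

    Dask bags are converted partition-wise to dataframes and written through the
    given ``dask_client`` (with optional ``resources`` requirements); any other
    result is converted in memory and written directly.
    """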

    import os

    import dask.bag
    import dask.dataframe as dd

    scores_path = os.path.join(output, f"scores-{group}.csv")

    if isinstance(result, dask.bag.core.Bag):
        # convert score samples to dataframes
        result = result.map_partitions(score_samples_to_dataframe)
        result = dd.from_delayed(result.to_delayed())
        result.to_csv(
            scores_path,
            single_file=True,
            compute_kwargs=dict(scheduler=dask_client, resources=resources),
            index=False,
        )
    else:
        # convert score samples to dataframes
        result = score_samples_to_dataframe(result)
        result.to_csv(scores_path, index=False)