#!/usr/bin/env python
# src/bob/learn/em/kmeans.py
# @author: Yannick Dayer <yannick.dayer@idiap.ch>
# @date: Tue 27 Jul 2021 11:04:10 UTC+02

import logging

from typing import Union

import dask
import dask.array as da
import dask.delayed
import numpy as np
import scipy.spatial.distance

from dask_ml.cluster.k_means import k_init
from sklearn.base import BaseEstimator

from .utils import array_to_delayed_list, check_and_persist_dask_input

logger = logging.getLogger(__name__)


def get_centroids_distance(x: np.ndarray, means: np.ndarray) -> np.ndarray:
    """Returns the distance values between x and each cluster's centroid.

    The returned values are squared Euclidean distances.

    Parameters
    ----------
    x: ndarray of shape (n_samples, n_features)
        A series of data points.
    means: ndarray of shape (n_clusters, n_features)
        The centroids.

    Returns
    -------
    distances: ndarray of shape (n_clusters, n_samples)
        For each cluster, the squared Euclidean distance (or distances) to x.
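
    Examples
    --------
    A minimal sketch with toy values (not from the original test suite):

    >>> import numpy as np
    >>> x = np.array([[0.0, 0.0], [1.0, 1.0]])
    >>> means = np.array([[0.0, 0.0], [2.0, 2.0]])
    >>> get_centroids_distance(x, means)
    array([[0., 2.],
           [8., 2.]])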

39 """ 

40 x = np.atleast_2d(x) 

41 if isinstance(x, da.Array): 

42 distances = [] 

43 for i in range(means.shape[0]): 

44 distances.append(np.sum((means[i] - x) ** 2, axis=-1)) 

45 return da.vstack(distances) 

46 else: 

47 return scipy.spatial.distance.cdist(means, x, metric="sqeuclidean") 

48 

49 

def get_closest_centroid_index(centroids_dist: np.ndarray) -> np.ndarray:
    """Returns the index of the closest cluster mean for each data point.

    Parameters
    ----------
    centroids_dist: ndarray of shape (n_clusters, n_samples)
        The squared Euclidean distance (or distances) to each cluster mean.

    Returns
    -------
    closest_centroid_indices: ndarray of shape (n_samples,)
        The index of the closest cluster mean for each data point.
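
    Examples
    --------
    A minimal sketch; ties resolve to the lowest index, as with ``np.argmin``:

    >>> import numpy as np
    >>> centroids_dist = np.array([[0.0, 2.0], [8.0, 2.0]])
    >>> get_closest_centroid_index(centroids_dist)
    array([0, 0])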

62 """ 

63 return np.argmin(centroids_dist, axis=0) 

64 

65 

def e_step(data, means):
    """Computes the zero-th and first order statistics and the sum of the
    minimal squared distances for a chunk of data.

    Parameters
    ----------
    data : array-like, shape (n_samples, n_features)
        The data.

    means : array-like, shape (n_clusters, n_features)
        The cluster centers.

    Returns
    -------
    zeroeth_order_statistics : array-like, shape (n_clusters,)
        The zero-th order statistics (number of samples closest to each
        centroid).
    first_order_statistics : array-like, shape (n_clusters, n_features)
        The first order statistics (sum of the samples closest to each
        centroid).
    sum_min_dist : float
        The sum of the minimal squared distances between each sample and its
        closest centroid.
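
    Examples
    --------
    A minimal sketch with toy values:

    >>> import numpy as np
    >>> data = np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 2.0]])
    >>> means = np.array([[0.0, 0.0], [2.0, 2.0]])
    >>> zeroeth, first, dist_sum = e_step(data, means)
    >>> zeroeth
    array([2, 1])
    >>> first
    array([[1., 1.],
           [2., 2.]])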

86 """ 

87 n_clusters = len(means) 

88 distances = get_centroids_distance(data, means) 

89 closest_k_indices = get_closest_centroid_index(distances) 

90 zeroeth_order_statistics = np.bincount( 

91 closest_k_indices, minlength=n_clusters 

92 ) 

93 # Compute first_order_statistics in a memory efficient way 

94 first_order_statistics = np.zeros((n_clusters, data.shape[1])) 

95 for i in range(n_clusters): 

96 first_order_statistics[i] = np.sum(data[closest_k_indices == i], axis=0) 

97 min_distance = np.min(distances, axis=0) 

98 average_min_distance = min_distance.mean() 

99 return ( 

100 zeroeth_order_statistics, 

101 first_order_statistics, 

102 average_min_distance, 

103 ) 

104 

105 

def m_step(stats, n_samples):
    """Computes the cluster centers and average minimum distance.

    Parameters
    ----------
    stats : list
        A list which contains the results of the :any:`e_step` function applied
        on each chunk of data.
    n_samples : int
        The total number of samples.

    Returns
    -------
    means : array-like, shape (n_clusters, n_features)
        The cluster centers.
    avg_min_dist : float
        The average minimum distance.
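
    Examples
    --------
    A minimal sketch chaining :any:`e_step` on two chunks of toy data:

    >>> import numpy as np
    >>> data = np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 2.0]])
    >>> means = np.array([[0.0, 0.0], [2.0, 2.0]])
    >>> stats = [e_step(data[:2], means), e_step(data[2:], means)]
    >>> new_means, avg_dist = m_step(stats, n_samples=3)
    >>> new_means
    array([[0.5, 0.5],
           [2. , 2. ]])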

123 """ 

124 ( 

125 zeroeth_order_statistics, 

126 first_order_statistics, 

127 average_min_distance, 

128 ) = (0, 0, 0) 

129 for zeroeth_, first_, average_ in stats: 

130 zeroeth_order_statistics += zeroeth_ 

131 first_order_statistics += first_ 

132 average_min_distance += average_ 

133 average_min_distance /= n_samples 

134 

135 means = first_order_statistics / zeroeth_order_statistics[:, None] 

136 return means, average_min_distance 

137 

138 

def accumulate_indices_means_vars(data, means):
    """Accumulates the statistics needed to compute the weights and variances of the clusters."""
    n_clusters, n_features = len(means), data.shape[1]
    dist = get_centroids_distance(data, means)
    closest_centroid_indices = get_closest_centroid_index(dist)
    # means_sum and variances_sum must be initialized with zero here since
    # they get accumulated in reduce_indices_means_vars
    means_sum = np.zeros((n_clusters, n_features), like=data)
    variances_sum = np.zeros((n_clusters, n_features), like=data)
    for i in range(n_clusters):
        cluster_data = data[closest_centroid_indices == i]
        means_sum[i] = np.sum(cluster_data, axis=0)
        variances_sum[i] = np.sum(cluster_data**2, axis=0)
    return closest_centroid_indices, means_sum, variances_sum


def reduce_indices_means_vars(stats):
    """Computes the weights and variances of the clusters given the statistics."""
    closest_centroid_indices = [s[0] for s in stats]
    means_sum = [s[1] for s in stats]
    variances_sum = [s[2] for s in stats]

    closest_centroid_indices = np.concatenate(closest_centroid_indices, axis=0)
    means_sum = np.sum(means_sum, axis=0)
    variances_sum = np.sum(variances_sum, axis=0)

    n_clusters = len(means_sum)
    weights_count = np.bincount(closest_centroid_indices, minlength=n_clusters)
    weights = weights_count / weights_count.sum()
    means = means_sum / weights_count[:, None]
    # Var[x] = E[x^2] - E[x]^2, computed per cluster and per feature
    variances = (variances_sum / weights_count[:, None]) - (means**2)

    return variances, weights


class KMeansMachine(BaseEstimator):
    """Stores the k-means clusters parameters (the centroid of each cluster).

    Allows the clustering of data with the ``fit`` method.

    The training works in two phases:

    - An initialization (setting the initial values of the centroids)
    - An e-m loop reducing the total distance between the data points and their
      closest centroid.

    The initialization can use an iterative process to find the best set of
    coordinates, use random starting points, or take specified coordinates. The
    ``init_method`` parameter specifies which of these behaviors is used.

    Attributes
    ----------
    centroids_: ndarray of shape (n_clusters, n_features)
        The current cluster centroids. Available after fitting.
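
    Examples
    --------
    A minimal usage sketch with toy data; the cluster labels depend on the
    random initialization, so the output check is skipped:

    >>> import numpy as np
    >>> data = np.array([[0.0, 0.0], [0.5, 0.5], [10.0, 10.0]])
    >>> machine = KMeansMachine(n_clusters=2, random_state=0).fit(data)
    >>> machine.predict(np.array([[0.1, 0.1], [9.0, 9.0]]))  # doctest: +SKIP
    array([0, 1])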

194 """ 

195 

196 def __init__( 

197 self, 

198 n_clusters: int, 

199 init_method: Union[str, np.ndarray] = "k-means||", 

200 convergence_threshold: float = 1e-5, 

201 max_iter: int = 20, 

202 random_state: Union[int, np.random.RandomState] = 0, 

203 init_max_iter: Union[int, None] = 5, 

204 oversampling_factor: float = 2, 

205 **kwargs, 

206 ) -> None: 

207 """ 

208 Parameters 

209 ---------- 

210 n_clusters: int 

211 The number of represented clusters. 

212 init_method: 

213 One of: "random", "k-means++", or "k-means||", or an array with the wanted 

214 starting values of the centroids. 

215 max_iter: 

216 The maximum number of iterations for the e-m part. 

217 random_state: 

218 A seed or RandomState used for the initialization part. 

219 init_max_iter: 

220 The maximum number of iterations for the initialization part. 

221 """ 

        super().__init__(**kwargs)

        if n_clusters < 1:
            raise ValueError("The number of clusters should be greater than 0.")
        self.n_clusters = n_clusters
        self.init_method = init_method
        self.convergence_threshold = convergence_threshold
        self.max_iter = max_iter
        self.random_state = random_state
        self.init_max_iter = init_max_iter
        self.oversampling_factor = oversampling_factor
        self.average_min_distance = np.inf
        self.zeroeth_order_statistics = None
        self.first_order_statistics = None
        self.centroids_ = None

    @property
    def means(self) -> np.ndarray:
        """An alias for `centroids_`."""
        return self.centroids_

    @means.setter
    def means(self, value: np.ndarray):
        self.centroids_ = value

    def __eq__(self, obj) -> bool:
        return self.is_similar_to(obj, r_epsilon=0, a_epsilon=0)

    def is_similar_to(self, obj, r_epsilon=1e-05, a_epsilon=1e-08) -> bool:
        if self.centroids_ is not None and obj.centroids_ is not None:
            return np.allclose(
                self.centroids_, obj.centroids_, rtol=r_epsilon, atol=a_epsilon
            )
        else:
            logger.warning(
                "KMeansMachine `centroids_` was not set. You should call 'fit' first."
            )
            return False

    def get_variances_and_weights_for_each_cluster(self, data: np.ndarray):
        """Returns the clusters' variances and weights for data clustered by the machine.

        For each cluster, finds the subset of the samples that is closest to that
        centroid, and calculates:

        1) the variance of that subset (the cluster variance)
        2) the proportion of samples represented by that subset (the cluster weight)

        Parameters
        ----------
        data:
            The data to compute the variance of.

        Returns
        -------
        Tuple of arrays:
            variances: ndarray of shape (n_clusters, n_features)
                For each cluster, the variance in each dimension of the data.
            weights: ndarray of shape (n_clusters,)
                The weight (proportion of data points) of each cluster.
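
        Examples
        --------
        A minimal sketch with plain numpy input (which skips the dask path)
        and manually set centroids:

        >>> import numpy as np
        >>> machine = KMeansMachine(n_clusters=2)
        >>> machine.centroids_ = np.array([[0.0, 0.0], [2.0, 2.0]])
        >>> data = np.array([[0.0, 0.0], [1.0, 1.0], [2.0, 2.0]])
        >>> variances, weights = machine.get_variances_and_weights_for_each_cluster(data)
        >>> weights
        array([0.66666667, 0.33333333])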

282 """ 

283 input_is_dask, data = check_and_persist_dask_input(data) 

284 data = array_to_delayed_list(data, input_is_dask) 

285 

286 if input_is_dask: 

287 stats = [ 

288 dask.delayed(accumulate_indices_means_vars)( 

289 xx, means=self.centroids_ 

290 ) 

291 for xx in data 

292 ] 

293 variances, weights = dask.compute( 

294 dask.delayed(reduce_indices_means_vars)(stats) 

295 )[0] 

296 else: 

297 # Accumulate 

298 stats = accumulate_indices_means_vars(data, self.centroids_) 

299 # Reduce 

300 variances, weights = reduce_indices_means_vars([stats]) 

301 

302 return variances, weights 

303 

    def initialize(self, data: np.ndarray):
        """Assigns the means to an initial value using a specified method or randomly."""
        logger.debug("k-means initialization")
        logger.debug(f"Initializing k-means means with '{self.init_method}'.")
        # k_init requires da.Array as input.
        logger.debug("Transform k-means data to dask array")
        data = da.array(data)
        # Reassign the rechunked array (rechunk returns a new array rather
        # than modifying in place); prevents issues with large arrays.
        data = data.rechunk((1, data.shape[-1]))
        logger.debug("Get k-means centroids")
        self.centroids_ = k_init(
            X=data,
            n_clusters=self.n_clusters,
            init=self.init_method,
            random_state=self.random_state,
            max_iter=self.init_max_iter,
            oversampling_factor=self.oversampling_factor,
        )
        logger.debug("End of k-means initialization")

    def fit(self, X, y=None):
        """Fits this machine on data samples."""

        input_is_dask, X = check_and_persist_dask_input(X)

        logger.debug("Initializing trainer.")
        self.initialize(data=X)

        logger.debug("Get the number of samples")
        n_samples = len(X)

        logger.debug("Transform X array to delayed list")
        X = array_to_delayed_list(X, input_is_dask)

        logger.info("Training k-means.")
        distance = np.inf
        step = 0
        while self.max_iter is None or step < self.max_iter:
            step += 1
            logger.info(
                f"Iteration {step:3d}"
                + (f"/{self.max_iter:3d}" if self.max_iter else "")
            )
            distance_previous = distance

            # Compute the e-m steps
            if input_is_dask:
                stats = [
                    dask.delayed(e_step)(xx, means=self.centroids_) for xx in X
                ]
                self.centroids_, self.average_min_distance = dask.compute(
                    dask.delayed(m_step)(stats, n_samples)
                )[0]
            else:
                stats = [e_step(X, means=self.centroids_)]
                self.centroids_, self.average_min_distance = m_step(
                    stats, n_samples
                )

            distance = self.average_min_distance

            logger.debug(
                f"Average minimal squared Euclidean distance = {distance}"
            )

            if step > 1:
                convergence_value = abs(
                    (distance_previous - distance) / distance_previous
                )
                logger.debug(
                    f"Convergence value = {convergence_value} and threshold is {self.convergence_threshold}"
                )

                # Terminate if converged (and a threshold is set)
                if (
                    self.convergence_threshold is not None
                    and convergence_value <= self.convergence_threshold
                ):
                    logger.info(
                        "Reached convergence threshold. Training stopped."
                    )
                    break

        else:
            logger.info(
                "Reached the maximum number of iterations. Training stopped without convergence."
            )
        return self

    def transform(self, X):
        """Returns all the distances between the data and each cluster's mean.

        Parameters
        ----------
        X: ndarray of shape (n_samples, n_features)
            Series of data points.

        Returns
        -------
        distances: ndarray of shape (n_clusters, n_samples)
            For each mean, for each point, the squared Euclidean distance between them.
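
        Examples
        --------
        A minimal sketch with manually set centroids:

        >>> import numpy as np
        >>> machine = KMeansMachine(n_clusters=2)
        >>> machine.centroids_ = np.array([[0.0, 0.0], [2.0, 2.0]])
        >>> machine.transform(np.array([[1.0, 1.0]]))
        array([[2.],
               [2.]])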

404 """ 

405 return get_centroids_distance(X, self.centroids_) 

406 

    def predict(self, X):
        """Returns the labels of the closest cluster centroid for each data point.

        Parameters
        ----------
        X: ndarray of shape (n_samples, n_features)
            Series of data points.

        Returns
        -------
        indices: ndarray of shape (n_samples,)
            The indices of the closest cluster for each data point.
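
        Examples
        --------
        A minimal sketch with manually set centroids:

        >>> import numpy as np
        >>> machine = KMeansMachine(n_clusters=2)
        >>> machine.centroids_ = np.array([[0.0, 0.0], [2.0, 2.0]])
        >>> machine.predict(np.array([[0.1, 0.1], [1.9, 1.9]]))
        array([0, 1])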

419 """ 

420 return get_closest_centroid_index( 

421 get_centroids_distance(X, self.centroids_) 

422 )