Coverage for src/bob/learn/em/kmeans.py: 99%
146 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-06-16 14:34 +0200
« prev ^ index » next coverage.py v7.0.5, created at 2023-06-16 14:34 +0200
1#!/usr/bin/env python
2# @author: Yannick Dayer <yannick.dayer@idiap.ch>
3# @date: Tue 27 Jul 2021 11:04:10 UTC+02
5import logging
7from typing import Union
9import dask
10import dask.array as da
11import dask.delayed
12import numpy as np
13import scipy.spatial.distance
15from dask_ml.cluster.k_means import k_init
16from sklearn.base import BaseEstimator
18from .utils import array_to_delayed_list, check_and_persist_dask_input
20logger = logging.getLogger(__name__)
23def get_centroids_distance(x: np.ndarray, means: np.ndarray) -> np.ndarray:
24 """Returns the distance values between x and each cluster's centroid.
26 The returned values are squared Euclidean distances.
28 Parameters
29 ----------
30 x: ndarray of shape (n_samples, n_features)
31 A series of data points.
32 means: ndarray of shape (n_clusters, n_features)
33 The centroids.
35 Returns
36 -------
37 distances: ndarray of shape (n_clusters, n_samples)
38 For each cluster, the squared Euclidian distance (or distances) to x.
39 """
40 x = np.atleast_2d(x)
41 if isinstance(x, da.Array):
42 distances = []
43 for i in range(means.shape[0]):
44 distances.append(np.sum((means[i] - x) ** 2, axis=-1))
45 return da.vstack(distances)
46 else:
47 return scipy.spatial.distance.cdist(means, x, metric="sqeuclidean")
50def get_closest_centroid_index(centroids_dist: np.ndarray) -> np.ndarray:
51 """Returns the index of the closest cluster mean to x.
53 Parameters
54 ----------
55 centroids_dist: ndarray of shape (n_clusters, n_samples)
56 The squared Euclidian distance (or distances) to each cluster mean.
58 Returns
59 -------
60 closest_centroid_indices: ndarray of shape (n_samples,)
61 The index of the closest cluster mean to x.
62 """
63 return np.argmin(centroids_dist, axis=0)
66def e_step(data, means):
67 """Computes the zero-th and first order statistics and average min distance
68 for each data point.
70 Parameters
71 ----------
72 data : array-like, shape (n_samples, n_features)
73 The data.
75 means : array-like, shape (n_clusters, n_features)
76 The cluster centers.
79 Returns
80 -------
81 zeroeth_order_statistics : array-like, shape (n_samples,)
82 The zero-th order statistics.
83 first_order_statistics : array-like, shape (n_samples, n_clusters)
84 The first order statistics.
85 avg_min_dist : float
86 """
87 n_clusters = len(means)
88 distances = get_centroids_distance(data, means)
89 closest_k_indices = get_closest_centroid_index(distances)
90 zeroeth_order_statistics = np.bincount(
91 closest_k_indices, minlength=n_clusters
92 )
93 # Compute first_order_statistics in a memory efficient way
94 first_order_statistics = np.zeros((n_clusters, data.shape[1]))
95 for i in range(n_clusters):
96 first_order_statistics[i] = np.sum(data[closest_k_indices == i], axis=0)
97 min_distance = np.min(distances, axis=0)
98 average_min_distance = min_distance.mean()
99 return (
100 zeroeth_order_statistics,
101 first_order_statistics,
102 average_min_distance,
103 )
106def m_step(stats, n_samples):
107 """Computes the cluster centers and average minimum distance.
109 Parameters
110 ----------
111 stats : list
112 A list which contains the results of the :any:`e_step` function applied
113 on each chunk of data.
114 n_samples : int
115 The total number of samples.
117 Returns
118 -------
119 means : array-like, shape (n_clusters, n_features)
120 The cluster centers.
121 avg_min_dist : float
122 The average minimum distance.
123 """
124 (
125 zeroeth_order_statistics,
126 first_order_statistics,
127 average_min_distance,
128 ) = (0, 0, 0)
129 for zeroeth_, first_, average_ in stats:
130 zeroeth_order_statistics += zeroeth_
131 first_order_statistics += first_
132 average_min_distance += average_
133 average_min_distance /= n_samples
135 means = first_order_statistics / zeroeth_order_statistics[:, None]
136 return means, average_min_distance
139def accumulate_indices_means_vars(data, means):
140 """Accumulates statistics needed to compute weights and variances of the clusters."""
141 n_clusters, n_features = len(means), data.shape[1]
142 dist = get_centroids_distance(data, means)
143 closest_centroid_indices = get_closest_centroid_index(dist)
144 # the means_sum and variances_sum must be initialized with zero here since
145 # they get accumulated in the next function
146 means_sum = np.zeros((n_clusters, n_features), like=data)
147 variances_sum = np.zeros((n_clusters, n_features), like=data)
148 for i in range(n_clusters):
149 means_sum[i] = np.sum(data[closest_centroid_indices == i], axis=0)
150 for i in range(n_clusters):
151 variances_sum[i] = np.sum(
152 data[closest_centroid_indices == i] ** 2, axis=0
153 )
154 return closest_centroid_indices, means_sum, variances_sum
157def reduce_indices_means_vars(stats):
158 """Computes weights and variances of the clusters given the statistics."""
159 closest_centroid_indices = [s[0] for s in stats]
160 means_sum = [s[1] for s in stats]
161 variances_sum = [s[2] for s in stats]
163 closest_centroid_indices = np.concatenate(closest_centroid_indices, axis=0)
164 means_sum = np.sum(means_sum, axis=0)
165 variances_sum = np.sum(variances_sum, axis=0)
167 n_clusters = len(means_sum)
168 weights_count = np.bincount(closest_centroid_indices, minlength=n_clusters)
169 weights = weights_count / weights_count.sum()
170 means = means_sum / weights_count[:, None]
171 variances = (variances_sum / weights_count[:, None]) - (means**2)
173 return variances, weights
176class KMeansMachine(BaseEstimator):
177 """Stores the k-means clusters parameters (centroid of each cluster).
179 Allows the clustering of data with the ``fit`` method.
181 The training works in two phases:
182 - An initialization (setting the initial values of the centroids)
183 - An e-m loop reducing the total distance between the data points and their
184 closest centroid.
186 The initialization can use an iterative process to find the best set of
187 coordinates, use random starting points, or take specified coordinates. The
188 ``init_method`` parameter specifies which of these behavior is considered.
190 Attributes
191 ----------
192 centroids_: ndarray of shape (n_clusters, n_features)
193 The current clusters centroids. Available after fitting.
194 """
196 def __init__(
197 self,
198 n_clusters: int,
199 init_method: Union[str, np.ndarray] = "k-means||",
200 convergence_threshold: float = 1e-5,
201 max_iter: int = 20,
202 random_state: Union[int, np.random.RandomState] = 0,
203 init_max_iter: Union[int, None] = 5,
204 oversampling_factor: float = 2,
205 **kwargs,
206 ) -> None:
207 """
208 Parameters
209 ----------
210 n_clusters: int
211 The number of represented clusters.
212 init_method:
213 One of: "random", "k-means++", or "k-means||", or an array with the wanted
214 starting values of the centroids.
215 max_iter:
216 The maximum number of iterations for the e-m part.
217 random_state:
218 A seed or RandomState used for the initialization part.
219 init_max_iter:
220 The maximum number of iterations for the initialization part.
221 """
223 super().__init__(**kwargs)
225 if n_clusters < 1:
226 raise ValueError("The Number of cluster should be greater thant 0.")
227 self.n_clusters = n_clusters
228 self.init_method = init_method
229 self.convergence_threshold = convergence_threshold
230 self.max_iter = max_iter
231 self.random_state = random_state
232 self.init_max_iter = init_max_iter
233 self.oversampling_factor = oversampling_factor
234 self.average_min_distance = np.inf
235 self.zeroeth_order_statistics = None
236 self.first_order_statistics = None
237 self.centroids_ = None
239 @property
240 def means(self) -> np.ndarray:
241 """An alias for `centroids_`."""
242 return self.centroids_
244 @means.setter
245 def means(self, value: np.ndarray):
246 self.centroids_ = value
248 def __eq__(self, obj) -> bool:
249 return self.is_similar_to(obj, r_epsilon=0, a_epsilon=0)
251 def is_similar_to(self, obj, r_epsilon=1e-05, a_epsilon=1e-08) -> bool:
252 if self.centroids_ is not None and obj.centroids_ is not None:
253 return np.allclose(
254 self.centroids_, obj.centroids_, rtol=r_epsilon, atol=a_epsilon
255 )
256 else:
257 logger.warning(
258 "KMeansMachine `centroids_` was not set. You should call 'fit' first."
259 )
260 return False
262 def get_variances_and_weights_for_each_cluster(self, data: np.ndarray):
263 """Returns the clusters variance and weight for data clustered by the machine.
265 For each cluster, finds the subset of the samples that is closest to that
266 centroid, and calculates:
267 1) the variance of that subset (the cluster variance)
268 2) the proportion of samples represented by that subset (the cluster weight)
270 Parameters
271 ----------
272 data:
273 The data to compute the variance of.
275 Returns
276 -------
277 Tuple of arrays:
278 variances: ndarray of shape (n_clusters, n_features)
279 For each cluster, the variance in each dimension of the data.
280 weights: ndarray of shape (n_clusters, )
281 Weight (proportion of quantity of data point) of each cluster.
282 """
283 input_is_dask, data = check_and_persist_dask_input(data)
284 data = array_to_delayed_list(data, input_is_dask)
286 if input_is_dask:
287 stats = [
288 dask.delayed(accumulate_indices_means_vars)(
289 xx, means=self.centroids_
290 )
291 for xx in data
292 ]
293 variances, weights = dask.compute(
294 dask.delayed(reduce_indices_means_vars)(stats)
295 )[0]
296 else:
297 # Accumulate
298 stats = accumulate_indices_means_vars(data, self.centroids_)
299 # Reduce
300 variances, weights = reduce_indices_means_vars([stats])
302 return variances, weights
304 def initialize(self, data: np.ndarray):
305 """Assigns the means to an initial value using a specified method or randomly."""
306 logger.debug("k-means initialization")
307 logger.debug(f"Initializing k-means means with '{self.init_method}'.")
308 # k_init requires da.Array as input.
309 logger.debug("Transform k-means data to dask array")
310 data = da.array(data)
311 data.rechunk(1, data.shape[-1]) # Prevents issue with large arrays.
312 logger.debug("Get k-means centroids")
313 self.centroids_ = k_init(
314 X=data,
315 n_clusters=self.n_clusters,
316 init=self.init_method,
317 random_state=self.random_state,
318 max_iter=self.init_max_iter,
319 oversampling_factor=self.oversampling_factor,
320 )
321 logger.debug("End of k-means initialization")
323 def fit(self, X, y=None):
324 """Fits this machine on data samples."""
326 input_is_dask, X = check_and_persist_dask_input(X)
328 logger.debug("Initializing trainer.")
329 self.initialize(data=X)
331 logger.debug("Get the number of samples")
332 n_samples = len(X)
334 logger.debug("Transform X array to delayed list")
335 X = array_to_delayed_list(X, input_is_dask)
337 logger.info("Training k-means.")
338 distance = np.inf
339 step = 0
340 while self.max_iter is None or step < self.max_iter:
341 step += 1
342 logger.info(
343 f"Iteration {step:3d}"
344 + (f"/{self.max_iter:3d}" if self.max_iter else "")
345 )
346 distance_previous = distance
348 # compute the e-m steps
349 if input_is_dask:
350 stats = [
351 dask.delayed(e_step)(xx, means=self.centroids_) for xx in X
352 ]
353 self.centroids_, self.average_min_distance = dask.compute(
354 dask.delayed(m_step)(stats, n_samples)
355 )[0]
356 else:
357 stats = [e_step(X, means=self.centroids_)]
358 self.centroids_, self.average_min_distance = m_step(
359 stats, n_samples
360 )
362 distance = self.average_min_distance
364 logger.debug(
365 f"Average minimal squared Euclidean distance = {distance}"
366 )
368 if step > 1:
369 convergence_value = abs(
370 (distance_previous - distance) / distance_previous
371 )
372 logger.debug(
373 f"Convergence value = {convergence_value} and threshold is {self.convergence_threshold}"
374 )
376 # Terminates if converged (and threshold is set)
377 if (
378 self.convergence_threshold is not None
379 and convergence_value <= self.convergence_threshold
380 ):
381 logger.info(
382 "Reached convergence threshold. Training stopped."
383 )
384 break
386 else:
387 logger.info(
388 "Reached maximum step. Training stopped without convergence."
389 )
390 return self
392 def transform(self, X):
393 """Returns all the distances between the data and each cluster's mean.
395 Parameters
396 ----------
397 X: ndarray of shape (n_samples, n_features)
398 Series of data points.
400 Returns
401 -------
402 distances: ndarray of shape (n_clusters, n_samples)
403 For each mean, for each point, the squared Euclidian distance between them.
404 """
405 return get_centroids_distance(X, self.centroids_)
407 def predict(self, X):
408 """Returns the labels of the closest cluster centroid to the data.
410 Parameters
411 ----------
412 X: ndarray of shape (n_samples, n_features)
413 Series of data points.
415 Returns
416 -------
417 indices: ndarray of shape (n_samples)
418 The indices of the closest cluster for each data point.
419 """
420 return get_closest_centroid_index(
421 get_centroids_distance(X, self.centroids_)
422 )