Coverage for src/bob/bio/spear/audio_processing.py: 95%
273 statements
« prev ^ index » next — coverage.py v7.6.0, created at 2024-07-12 22:52 +0200
1#!/usr/bin/env python
2# vim: set fileencoding=utf-8 :
3# Elie Khoury <Elie.Khoury@idiap.ch>
4# Andre Anjos <andre.anjos@idiap.ch>
5# Pavel Korshunov <Pavel.Korshunov@idiap.ch>
6# Amir Mohammadi <amir.mohammadi@idiap.ch>
8import logging
9import math
10import sys
12from typing import Optional, Tuple, Union
14import numpy
15import torch
16import torchaudio
18logger = logging.getLogger(__name__)
def fft(src, dst=None):
    """Compute the discrete Fourier transform of ``src``.

    When ``dst`` is provided, the result is also copied into it in place.
    The transform is always returned.
    """
    result = numpy.fft.fft(src)
    if dst is not None:
        dst[:] = result
    return result
def init_hamming_kernel(win_length):
    """Build a Hamming window of ``win_length`` samples.

    Uses the classic ``0.54 - 0.46 * cos(2*pi*i / (N-1))`` coefficients.
    Vectorized replacement for the original per-sample loop; ``i * cst`` is
    evaluated in the same order, so the values are unchanged.
    """
    cst = 2 * math.pi / (win_length - 1.0)
    return 0.54 - 0.46 * numpy.cos(numpy.arange(win_length) * cst)
def init_freqfilter(rate, win_size, mel_scale, n_filters, f_min, f_max):
    """Compute the ``n_filters + 2`` filter-bank edge positions, expressed as
    (fractional) FFT-bin indices for an FFT of ``win_size`` points.

    The edges are spaced linearly either on the mel axis (``mel_scale``) or
    directly on the Hz axis between ``f_min`` and ``f_max``.
    """
    n_edges = n_filters + 2
    edges = numpy.zeros(n_edges, dtype=numpy.float64)
    if mel_scale:
        # interpolate linearly on the mel axis, then map back to Hz
        mel_hi = mel_python(f_max)
        mel_lo = mel_python(f_min)
        for k in range(n_edges):
            alpha = float(k) / (n_filters + 1)
            freq = mel_inv_python(mel_lo * (1 - alpha) + mel_hi * alpha)
            edges[k] = win_size * (float(freq) / rate)
    else:
        # interpolate linearly on the Hz axis
        for k in range(n_edges):
            alpha = float(k) / (n_filters + 1)
            freq = f_min * (1.0 - alpha) + f_max * alpha
            edges[k] = float(win_size) / rate * freq
    return edges
def init_dct_kernel(n_filters, n_ceps, dct_norm):
    """Build the ``(n_ceps, n_filters)`` DCT-II matrix used to project log
    filter-bank outputs onto cepstral coefficients.

    When ``dct_norm`` is set, the orthonormal scaling is applied: an overall
    ``sqrt(2 / n_filters)`` factor, with an extra ``sqrt(0.5)`` on row 0.
    """
    scale = math.sqrt(2.0 / n_filters) if dct_norm else 1.0

    kernel = numpy.zeros([n_ceps, n_filters], dtype=numpy.float64)
    for row in range(n_ceps):
        for col in range(n_filters):
            kernel[row][col] = scale * math.cos(
                math.pi * row * (col + 0.5) / float(n_filters)
            )

    if dct_norm:
        # first row gets sqrt(0.5); the remaining rows keep their scaling
        multiplier = numpy.ones(n_ceps, dtype=numpy.float64)
        multiplier[0] = math.sqrt(0.5)
        for col in range(n_filters):
            kernel[:, col] = multiplier * kernel[:, col]

    return kernel
def resample(
    audio: Union[numpy.ndarray, torch.Tensor],
    rate: int,
    new_rate: int,
    **kwargs,
) -> Union[numpy.ndarray, torch.Tensor]:
    """Resamples the audio to a new sample rate.

    The input is returned unchanged when the rates already match. A numpy
    input comes back as numpy; a torch tensor comes back as a tensor.

    Parameters
    ----------
    audio:
        The audio to resample.
    rate:
        The original sample rate of the audio.
    new_rate:
        The wanted sample rate of the output audio.
    kwargs:
        Arguments passed to :py:class:``torchaudio.transforms.Resample``.
    """
    if rate == new_rate:
        return audio

    is_numpy_input = isinstance(audio, numpy.ndarray)
    tensor = torch.from_numpy(audio) if is_numpy_input else audio

    transform = torchaudio.transforms.Resample(
        rate, new_rate, dtype=torch.float32, **kwargs
    )
    resampled = transform(tensor)

    if is_numpy_input:
        return resampled.numpy()
    return resampled
def read(
    filename: str,
    channel: Optional[int] = None,
    force_sample_rate: Optional[int] = None,
) -> Tuple[numpy.ndarray, int]:
    """Reads audio file and returns the signal and the sampling rate

    Parameters
    ----------
    filename:
        The full path to the audio file to load.
    channel:
        The channel to load. If None and the file is multi-channel, only the
        FIRST channel is kept, and a 1D array is returned.
    force_sample_rate:
        If specified, the audio will be resampled to the specified rate. Otherwise, the
        sample rate of the file will be used.

    Returns
    -------
    signal and sampling rate
        The signal in int16 range (-32768 to 32767) and float32 format, and the
        sampling rate in Hz.
    """
    # NOTE(review): the previous docstring claimed that channel=None returns
    # all channels as a 2D array; the code below actually keeps only the
    # first channel. The docs now describe the real behavior.
    data, rate = torchaudio.load(str(filename))

    if channel is not None:
        data = data[channel]
    else:
        if data.ndim > 1:
            data = data[0]

    if force_sample_rate is not None:
        data = resample(data, rate, force_sample_rate)
        rate = force_sample_rate

    # Expected data is in float32 format and int16 range (-32768. to 32767.)
    data = data.numpy().astype(numpy.float32) * 32768

    return data, rate
def audio_info(filename: str):
    """Retrieves the metadata of an audio file without loading its samples.

    Parameters
    ----------
    filename:
        The full path to the audio file to inspect.

    Returns
    -------
    info: torchaudio.backend.common.AudioMetaData
        The audio metadata (sample rate, number of frames, channels, ...).
    """
    path = str(filename)
    return torchaudio.info(path)
def compare(v1, v2, width):
    """Tells whether ``v1`` and ``v2`` differ by at most ``width``."""
    delta = abs(v1 - v2)
    return delta <= width
def mel_python(f):
    """Converts a frequency ``f`` in Hz to the mel scale."""
    warped = 1.0 + f / 700.0
    return 2595.0 * math.log10(warped)
188def mel_inv_python(value):
189 return 700.0 * (10 ** (value / 2595.0) - 1)
def sig_norm(win_length, frame, flag):
    """Returns the log-energy of the first ``win_length`` samples of ``frame``.

    The raw energy is floored at 1.0 before the log, so the result is never
    negative. When ``flag`` is set (and the log-energy is non-zero), ``frame``
    is additionally divided by it in place.
    """
    ENERGY_FLOOR = 1.0

    # sequential accumulation, sample by sample
    gain = 0.0
    for k in range(win_length):
        gain += frame[k] * frame[k]

    gain = math.log(gain) if gain >= ENERGY_FLOOR else math.log(ENERGY_FLOOR)

    if flag and gain != 0.0:
        for k in range(win_length):
            frame[k] /= gain
    return gain
def pre_emphasis(frame, win_shift, coef, last_frame_elem):
    """Applies a first-order pre-emphasis filter to ``frame``.

    Parameters
    ----------
    frame:
        The samples of the current window.
    win_shift:
        The hop size in samples; ``frame[win_shift - 1]`` is remembered so
        the filter stays continuous across overlapping windows.
    coef:
        The pre-emphasis coefficient; must be in (0, 1].
    last_frame_elem:
        The boundary sample remembered from the previous window.

    Returns
    -------
    The filtered frame and the element to pass as ``last_frame_elem`` on the
    next call, or ``None`` when ``coef`` is out of range.
    """
    if (coef <= 0.0) or (coef > 1.0):
        # Keep the legacy None return, but report through the module logger
        # instead of printing to stdout.
        logger.error("Error: The emphasis coeff. should be between 0 and 1")
        return None

    last_element = frame[win_shift - 1]
    filtered = numpy.append(
        frame[0] - coef * last_frame_elem, frame[1:] - coef * frame[:-1]
    )
    return filtered, last_element
def hamming_window(vector, hamming_kernel, win_length):
    """Multiplies the first ``win_length`` samples of ``vector`` in place by
    the Hamming kernel, and returns ``vector``.

    Vectorized replacement for the original per-sample loop; the element-wise
    products are identical.
    """
    vector[:win_length] = vector[:win_length] * hamming_kernel[:win_length]
    return vector
def log_filter_bank(frame, n_filters, p_index, win_size, energy_filter):
    """Computes the half magnitude (or power) spectrum of ``frame`` — written
    back into ``frame`` in place — and the log triangular filter-bank outputs.

    Returns the tuple ``(filter outputs, frame)``.
    """
    spectrum = fft(numpy.array(frame, dtype=numpy.complex128))
    magnitude = numpy.absolute(spectrum)

    n_bins = int(win_size / 2) + 1
    half_frame = magnitude[0:n_bins]
    if energy_filter:
        # power spectrum (magnitude squared) instead of magnitude
        half_frame = half_frame**2

    frame[0:n_bins] = half_frame
    filters = log_triangular_bank(frame, n_filters, p_index)
    return filters, frame
def log_triangular_bank(data, n_filters, p_index):
    """Applies ``n_filters`` triangular filters to the spectrum in ``data``
    and returns the log of each filter's output.

    Parameters
    ----------
    data:
        Spectrum values indexed by FFT bin; only the bins covered by
        ``p_index`` are read.
    n_filters:
        Number of triangular filters.
    p_index:
        ``n_filters + 2`` fractional bin positions: filter ``i`` rises from
        ``p_index[i]`` to ``p_index[i + 1]`` and falls back down at
        ``p_index[i + 2]``.

    Returns
    -------
    numpy array of length ``n_filters`` with the log filter outputs, floored
    at machine epsilon so the log never sees zero.
    """
    res_ = numpy.zeros(n_filters, dtype=numpy.float64)

    # inverse widths of the rising slope of filter i (denominator[i]) and of
    # its falling slope (denominator[i + 1])
    denominator = 1.0 / (
        p_index[1 : n_filters + 2] - p_index[0 : n_filters + 1]
    )

    for i in range(0, n_filters):
        # integer bin range [li, mi] of the rising part and (mi, ri] of the
        # falling part of triangle i
        li = int(math.floor(p_index[i] + 1))
        mi = int(math.floor(p_index[i + 1]))
        ri = int(math.floor(p_index[i + 2]))
        # widen degenerate triangles (the first filter, or zero-width ones)
        # one bin to the left so they still collect some energy
        if i == 0 or li == ri:
            li -= 1
        vec_left = numpy.arange(li, mi + 1)
        vec_right = numpy.arange(mi + 1, ri + 1)
        # weighted sum of the spectrum over both slopes of the triangle
        res_[i] = numpy.sum(
            data[vec_left] * denominator[i] * (vec_left - p_index[i])
        ) + numpy.sum(
            data[vec_right] * denominator[i + 1] * (p_index[i + 2] - vec_right)
        )
        # alternative but equivalent implementation:
        # filt = numpy.zeros(ri-li+1, dtype=numpy.float64)
        # filt_l = denominator[i] * (vec_left-p_index[i])
        # filt_p = denominator[i+1] * (p_index[i+2]-vec_right)
        # filt = numpy.append(filt_l, filt_p)
        # vect_full = numpy.arange(li, ri+1)
        # res_[i] = numpy.sum(data[vect_full] * filt)

    FBANK_OUT_FLOOR = sys.float_info.epsilon
    return numpy.log(numpy.where(res_ < FBANK_OUT_FLOOR, FBANK_OUT_FLOOR, res_))
def dct_transform(filters, n_filters, dct_kernel, n_ceps):
    """Projects ``n_filters`` filter-bank outputs onto the DCT kernel rows,
    yielding ``n_ceps`` cepstral coefficients.
    """
    bank = filters[numpy.arange(n_filters)]
    return numpy.array(
        [numpy.sum(bank * dct_kernel[row]) for row in range(n_ceps)]
    )
def energy(data, rate, *, win_length_ms=20.0, win_shift_ms=10.0):
    """Computes the per-frame log-energy of ``data``.

    The signal is cut into windows of ``win_length_ms`` milliseconds taken
    every ``win_shift_ms`` milliseconds; each window is mean-normalized and
    its log-energy (see :func:`sig_norm`) becomes one output value.
    """
    # --- framing parameters ---
    win_length = int(rate * win_length_ms / 1000)
    win_shift = int(rate * win_shift_ms / 1000)
    # FFT-style buffer size: next power of two >= the window length
    win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))

    n_frames = int(1 + (data.shape[0] - win_length) / win_shift)
    sample_idx = numpy.arange(win_length)

    # --- per-frame log-energy ---
    features = []
    for frame_no in range(n_frames):
        # copy the window into a zero-padded buffer of size win_size
        frame = numpy.zeros(win_size, dtype=numpy.float64)
        frame[sample_idx] = data[sample_idx + frame_no * win_shift]

        # remove the mean (computed over the padded size, as done elsewhere
        # in this module)
        mean = numpy.sum(frame) / win_size
        frame[sample_idx] -= mean

        features.append(sig_norm(win_length, frame, False))

    return numpy.array(features)
def spectrogram(
    data,
    rate,
    *,
    win_length_ms=20.0,
    win_shift_ms=10.0,
    n_filters=24,
    f_min=0.0,
    f_max=4000.0,
    pre_emphasis_coef=0.95,
    mel_scale=True,
    energy_filter=False,
    log_filter=True,
    energy_bands=False,
):
    """Computes the half spectrum of each frame of ``data``.

    Each window of ``win_length_ms`` ms (hop ``win_shift_ms`` ms) is
    mean-normalized, pre-emphasized, Hamming-windowed and passed through an
    FFT; row ``i`` of the result holds the first ``win_size / 2 + 1``
    magnitude (or, with ``energy_filter``, power) bins of frame ``i``.

    NOTE(review): ``log_filter`` and ``energy_bands`` are accepted but never
    read in this body, and ``n_filters``/``f_min``/``f_max``/``mel_scale``
    only affect the filter-bank outputs that are discarded here — confirm
    before removing them from the signature.
    """
    #########################
    # Initialisation part ##
    #########################

    win_length = int(rate * win_length_ms / 1000)
    win_shift = int(rate * win_shift_ms / 1000)
    # FFT size: next power of two >= the window length
    win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))

    # Hamming initialisation
    hamming_kernel = init_hamming_kernel(win_length)

    # Compute cut-off frequencies
    p_index = init_freqfilter(
        rate, win_size, mel_scale, n_filters, f_min, f_max
    )

    ######################################
    # End of the Initialisation part ###
    ######################################

    ######################################
    # Core code ###
    ######################################

    data_size = data.shape[0]
    n_frames = int(1 + (data_size - win_length) / win_shift)

    # create features set
    features = numpy.zeros(
        [n_frames, int(win_size / 2) + 1], dtype=numpy.float64
    )

    last_frame_elem = 0
    # compute cepstral coefficients
    for i in range(n_frames):
        # create a frame
        frame = numpy.zeros(win_size, dtype=numpy.float64)
        vec = numpy.arange(win_length)
        frame[vec] = data[vec + i * win_shift]
        som = numpy.sum(frame)
        som = som / win_size
        frame[vec] -= som  # normalization by mean here

        # pre-emphasis carries the boundary sample of the previous hop so the
        # filter stays continuous across frames
        frame_, last_frame_elem = pre_emphasis(
            frame[vec], win_shift, pre_emphasis_coef, last_frame_elem
        )
        frame[vec] = frame_

        # Hamming windowing
        frame = hamming_window(frame, hamming_kernel, win_length)

        # log_filter_bank writes the half spectrum into `frame` in place;
        # the filter-bank outputs themselves are discarded here
        _, spec_row = log_filter_bank(
            frame, n_filters, p_index, win_size, energy_filter
        )

        features[i] = spec_row[0 : int(win_size / 2) + 1]

    return numpy.array(features)
def cepstral(
    data,
    rate,
    *,
    win_length_ms=20,
    win_shift_ms=10,
    n_filters=20,
    f_min=0.0,
    f_max=4000.0,
    pre_emphasis_coef=1.0,
    mel_scale=True,
    n_ceps=19,
    delta_win=2,
    dct_norm=True,
    with_energy=True,
    with_delta=True,
    with_delta_delta=True,
):
    """Extracts cepstral (MFCC/LFCC-style) features from ``data``.

    Each frame is mean-normalized, pre-emphasized, Hamming-windowed, passed
    through a (mel or linear) triangular filter bank and a DCT. Optionally
    the per-frame log-energy is appended, and delta / delta-delta
    coefficients are computed over a ``delta_win``-frame window clamped at
    the signal edges. Rows hold ``d1``, ``2 * d1`` or ``3 * d1`` values with
    ``d1 = n_ceps`` (+1 with energy). Delta-deltas are silently disabled
    when deltas are off.
    """
    #########################
    # Initialisation part ##
    #########################

    win_length = int(rate * win_length_ms / 1000)
    win_shift = int(rate * win_shift_ms / 1000)
    # FFT size: next power of two >= the window length
    win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))

    # Hamming initialisation
    hamming_kernel = init_hamming_kernel(win_length)

    # Compute cut-off frequencies
    p_index = init_freqfilter(
        rate,
        win_size,
        mel_scale,
        n_filters,
        f_min,
        f_max,
    )

    # Cosine transform initialisation
    dct_kernel = init_dct_kernel(n_filters, n_ceps, dct_norm)

    ######################################
    # End of the Initialisation part ###
    ######################################

    ######################################
    # Core code ###
    ######################################

    data_size = data.shape[0]
    n_frames = int(1 + (data_size - win_length) / win_shift)

    # create features set
    # dim0 is the size of one "static" block: n_ceps cepstra plus,
    # optionally, the log-energy; dim is the total row width
    dim0 = n_ceps
    if with_energy:
        dim0 += +1
    dim = dim0
    if with_delta:
        dim += dim0
        if with_delta_delta:
            dim += dim0
    else:
        # delta-deltas require deltas; disable them otherwise
        with_delta_delta = False

    features = numpy.zeros([n_frames, dim], dtype=numpy.float64)

    last_frame_elem = 0
    # compute cepstral coefficients
    for i in range(n_frames):
        # create a frame
        frame = numpy.zeros(win_size, dtype=numpy.float64)
        vec = numpy.arange(win_length)
        frame[vec] = data[vec + i * win_shift]
        som = numpy.sum(frame)
        som = som / win_size
        frame[vec] -= som  # normalization by mean here

        # log-energy is measured BEFORE pre-emphasis and windowing
        # (local `energy` shadows the module-level energy() function)
        if with_energy:
            energy = sig_norm(win_length, frame, False)

        # pre-emphasis filtering
        frame_, last_frame_elem = pre_emphasis(
            frame[vec], win_shift, pre_emphasis_coef, last_frame_elem
        )
        frame[vec] = frame_

        # Hamming windowing
        frame = hamming_window(frame, hamming_kernel, win_length)

        # FFT and filters
        filters, _ = log_filter_bank(
            frame, n_filters, p_index, win_size, energy_filter=False
        )

        # apply DCT
        ceps = dct_transform(filters, n_filters, dct_kernel, n_ceps)

        ######################################
        # Deltas and Delta-Deltas ###
        ######################################

        # d1 equals dim0; it survives the loop and is reused below
        d1 = n_ceps
        if with_energy:
            d1 = n_ceps + 1
            ceps = numpy.append(ceps, energy)

        # stock the results in features matrix
        vec = numpy.arange(d1)
        features[i][0:d1] = ceps[vec]

    # compute Delta coefficient
    if with_delta:
        # som would be the normalization constant; kept for reference, but
        # normalization is disabled (see commented lines below)
        som = 0.0
        for i in range(1, delta_win + 1):
            som = som + i * i
        som = som * 2

        for i in range(n_frames):
            for k in range(n_ceps):
                features[i][d1 + k] = 0.0
                for ll in range(1, delta_win + 1):
                    # clamp the delta window at the first/last frame
                    if i + ll < n_frames:
                        p_ind = i + ll
                    else:
                        p_ind = n_frames - 1
                    if i - ll > 0:
                        n_ind = i - ll
                    else:
                        n_ind = 0
                    features[i][d1 + k] = features[i][d1 + k] + ll * (
                        features[p_ind][k] - features[n_ind][k]
                    )
                # features[i][d1+k] = features[i][d1+k] / som # do not normalize anymore

    # compute Delta of the Energy
    if with_delta and with_energy:
        som = 0.0

        # vectorized variant of the same normalization constant (unused)
        vec = numpy.arange(1, delta_win + 1)
        som = 2.0 * numpy.sum(vec * vec)

        for i in range(n_frames):
            # the energy sits in the last slot (index n_ceps) of each block
            k = n_ceps
            features[i][d1 + k] = 0.0
            for ll in range(1, delta_win + 1):
                # clamp the delta window at the first/last frame
                if i + ll < n_frames:
                    p_ind = i + ll
                else:
                    p_ind = n_frames - 1
                if i - ll > 0:
                    n_ind = i - ll
                else:
                    n_ind = 0
                features[i][d1 + k] = features[i][d1 + k] + ll * (
                    features[p_ind][k] - features[n_ind][k]
                )
            # features[i][d1+k] = features[i][d1+k] / som # do not normalize anymore

    # compute Delta Delta of the coefficients
    if with_delta_delta:
        som = 0.0
        for i in range(1, delta_win + 1):
            som = som + i * i
        som = som * 2
        for i in range(n_frames):
            for k in range(n_ceps):
                # delta-deltas are deltas of the delta block [d1, 2*d1)
                features[i][2 * d1 + k] = 0.0
                for ll in range(1, delta_win + 1):
                    # clamp the delta window at the first/last frame
                    if i + ll < n_frames:
                        p_ind = i + ll
                    else:
                        p_ind = n_frames - 1
                    if i - ll > 0:
                        n_ind = i - ll
                    else:
                        n_ind = 0
                    features[i][2 * d1 + k] = features[i][2 * d1 + k] + ll * (
                        features[p_ind][d1 + k] - features[n_ind][d1 + k]
                    )
                # features[i][2*d1+k] = features[i][2*d1+k] / som # do not normalize anymore

    # compute Delta Delta of the energy
    if with_delta_delta and with_energy:
        som = 0.0
        for i in range(1, delta_win + 1):
            som = som + i * i
        som = som * 2
        for i in range(n_frames):
            # the energy sits in the last slot (index n_ceps) of each block
            k = n_ceps
            features[i][2 * d1 + k] = 0.0
            for ll in range(1, delta_win + 1):
                # clamp the delta window at the first/last frame
                if i + ll < n_frames:
                    p_ind = i + ll
                else:
                    p_ind = n_frames - 1
                if i - ll > 0:
                    n_ind = i - ll
                else:
                    n_ind = 0
                features[i][2 * d1 + k] = features[i][2 * d1 + k] + ll * (
                    features[p_ind][d1 + k] - features[n_ind][d1 + k]
                )
            # features[i][2*d1+k] = features[i][2*d1+k] / som # do not normalize anymore

    return numpy.array(features)