Coverage for src/bob/bio/spear/audio_processing.py: 94%
282 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-06 22:04 +0100
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-06 22:04 +0100
1#!/usr/bin/env python
2# vim: set fileencoding=utf-8 :
3# Elie Khoury <Elie.Khoury@idiap.ch>
4# Andre Anjos <andre.anjos@idiap.ch>
5# Pavel Korshunov <Pavel.Korshunov@idiap.ch>
6# Amir Mohammadi <amir.mohammadi@idiap.ch>
8import importlib
9import logging
10import math
11import sys
13from typing import Optional, Tuple, Union
15import numpy
16import torch
18logger = logging.getLogger(__name__)
21def fft(src, dst=None):
22 out = numpy.fft.fft(src)
23 if dst is not None:
24 dst[:] = out
25 return out
28def init_hamming_kernel(win_length):
29 # Hamming initialization
30 cst = 2 * math.pi / (win_length - 1.0)
31 hamming_kernel = numpy.zeros(win_length)
33 for i in range(win_length):
34 hamming_kernel[i] = 0.54 - 0.46 * math.cos(i * cst)
35 return hamming_kernel
38def init_freqfilter(rate, win_size, mel_scale, n_filters, f_min, f_max):
39 # Compute cut-off frequencies
40 p_index = numpy.array(numpy.zeros(n_filters + 2), dtype=numpy.float64)
41 if mel_scale:
42 # Mel scale
43 m_max = mel_python(f_max)
44 m_min = mel_python(f_min)
46 for i in range(n_filters + 2):
47 alpha = float(i) / (n_filters + 1)
48 f = mel_inv_python(m_min * (1 - alpha) + m_max * alpha)
49 factor = float(f) / rate
50 p_index[i] = win_size * factor
51 else:
52 # linear scale
53 for i in range(n_filters + 2):
54 alpha = float(i) / (n_filters + 1)
55 f = f_min * (1.0 - alpha) + f_max * alpha
56 p_index[i] = float(win_size) / rate * f
57 return p_index
60def init_dct_kernel(n_filters, n_ceps, dct_norm):
61 dct_kernel = numpy.zeros([n_ceps, n_filters], dtype=numpy.float64)
63 dct_coeff = 1.0
64 if dct_norm:
65 dct_coeff = math.sqrt(2.0 / n_filters)
67 for i in range(0, n_ceps):
68 for j in range(0, n_filters):
69 dct_kernel[i][j] = dct_coeff * math.cos(
70 math.pi * i * (j + 0.5) / float(n_filters)
71 )
73 if dct_norm:
74 column_multiplier = numpy.ones(n_ceps, dtype=numpy.float64)
75 column_multiplier[0] = math.sqrt(
76 0.5
77 ) # first element sqrt(0.5), the rest are 1.
78 for j in range(0, n_filters):
79 dct_kernel[:, j] = column_multiplier * dct_kernel[:, j]
81 return dct_kernel
84def resample(
85 audio: Union[numpy.ndarray, torch.Tensor],
86 rate: int,
87 new_rate: int,
88 **kwargs,
89) -> Union[numpy.ndarray, torch.Tensor]:
90 """Resamples the audio to a new sample rate.
92 Parameters
93 ----------
94 audio:
95 The audio to resample.
96 rate:
97 The original sample rate of the audio.
98 new_rate:
99 The wanted sample rate of the output audio.
100 kwargs:
101 Arguments passed to :py:class:``torchaudio.transforms.Resample``.
102 """
104 import torchaudio
106 if rate == new_rate:
107 return audio
109 was_numpy = False
110 if isinstance(audio, numpy.ndarray):
111 audio = torch.from_numpy(audio)
112 was_numpy = True
114 resampler = torchaudio.transforms.Resample(
115 rate, new_rate, dtype=torch.float32, **kwargs
116 )
117 audio = resampler(audio)
119 return audio.numpy() if was_numpy else audio
122def read(
123 filename: str,
124 channel: Optional[int] = None,
125 force_sample_rate: Optional[int] = None,
126) -> Tuple[numpy.ndarray, int]:
127 """Reads audio file and returns the signal and the sampling rate
129 Parameters
130 ----------
131 filename:
132 The full path to the audio file to load.
133 channel:
134 The channel to load. If None, all channels will be loaded and returned in a 2D
135 array in the shape (n_channels, n_samples).
136 force_sample_rate:
137 If specified, the audio will be resampled to the specified rate. Otherwise, the
138 sample rate of the file will be used.
140 Returns
141 -------
142 signal and sampling rate
143 The signal in int16 range (-32768 to 32767) and float32 format, and the
144 sampling rate in Hz.
145 """
147 import torchaudio
149 try:
150 importlib.__import__(
151 "soundfile"
152 ) # Try to import missing soundfile may throw an OSError.
153 torchaudio.set_audio_backend("soundfile") # May throw a RuntimeError.
154 except (RuntimeError, OSError) as e:
155 logger.warning(
156 "'soundfile' could not be imported or specified as torchaudio "
157 "backend. torchaudio may have trouble loading '.sph' files."
158 )
159 logger.info("error was %s", e)
161 data, rate = torchaudio.load(str(filename))
163 if channel is not None:
164 data = data[channel]
165 else:
166 if data.ndim > 1:
167 data = data[0]
169 if force_sample_rate is not None:
170 data = resample(data, rate, force_sample_rate)
171 rate = force_sample_rate
173 # Expected data is in float32 format and int16 range (-32768. to 32767.)
174 data = data.numpy().astype(numpy.float32) * 32768
176 return data, rate
179def audio_info(filename: str):
180 """Returns the audio info of a file.
182 Parameters
183 ----------
184 filename:
185 The full path to the audio file to load.
187 Returns
188 -------
189 info: torchaudio.backend.common.AudioMetaData
190 A dictionary containing the audio information.
191 """
193 import torchaudio
195 return torchaudio.info(str(filename))
198def compare(v1, v2, width):
199 return abs(v1 - v2) <= width
202def mel_python(f):
203 return 2595.0 * math.log10(1.0 + f / 700.0)
206def mel_inv_python(value):
207 return 700.0 * (10 ** (value / 2595.0) - 1)
210def sig_norm(win_length, frame, flag):
211 gain = 0.0
212 for i in range(win_length):
213 gain = gain + frame[i] * frame[i]
215 ENERGY_FLOOR = 1.0
216 if gain < ENERGY_FLOOR:
217 gain = math.log(ENERGY_FLOOR)
218 else:
219 gain = math.log(gain)
221 if flag and gain != 0.0:
222 for i in range(win_length):
223 frame[i] = frame[i] / gain
224 return gain
227def pre_emphasis(frame, win_shift, coef, last_frame_elem):
229 if (coef <= 0.0) or (coef > 1.0):
230 print("Error: The emphasis coeff. should be between 0 and 1")
231 return None
233 last_element = frame[win_shift - 1]
234 return (
235 numpy.append(
236 frame[0] - coef * last_frame_elem, frame[1:] - coef * frame[:-1]
237 ),
238 last_element,
239 )
242def hamming_window(vector, hamming_kernel, win_length):
243 for i in range(win_length):
244 vector[i] = vector[i] * hamming_kernel[i]
245 return vector
248def log_filter_bank(frame, n_filters, p_index, win_size, energy_filter):
249 x1 = numpy.array(frame, dtype=numpy.complex128)
250 complex_ = fft(x1)
251 abscomplex = numpy.absolute(complex_)
252 half_frame = abscomplex[0 : int(win_size / 2) + 1]
254 if energy_filter:
255 # Energy is basically magnitude in power of 2
256 half_frame = half_frame**2
258 frame[0 : int(win_size / 2) + 1] = half_frame
259 filters = log_triangular_bank(frame, n_filters, p_index)
260 return filters, frame
263def log_triangular_bank(data, n_filters, p_index):
264 res_ = numpy.zeros(n_filters, dtype=numpy.float64)
266 denominator = 1.0 / (
267 p_index[1 : n_filters + 2] - p_index[0 : n_filters + 1]
268 )
270 for i in range(0, n_filters):
271 li = int(math.floor(p_index[i] + 1))
272 mi = int(math.floor(p_index[i + 1]))
273 ri = int(math.floor(p_index[i + 2]))
274 if i == 0 or li == ri:
275 li -= 1
277 vec_left = numpy.arange(li, mi + 1)
278 vec_right = numpy.arange(mi + 1, ri + 1)
279 res_[i] = numpy.sum(
280 data[vec_left] * denominator[i] * (vec_left - p_index[i])
281 ) + numpy.sum(
282 data[vec_right] * denominator[i + 1] * (p_index[i + 2] - vec_right)
283 )
284 # alternative but equivalent implementation:
285 # filt = numpy.zeros(ri-li+1, dtype=numpy.float64)
286 # filt_l = denominator[i] * (vec_left-p_index[i])
287 # filt_p = denominator[i+1] * (p_index[i+2]-vec_right)
288 # filt = numpy.append(filt_l, filt_p)
289 # vect_full = numpy.arange(li, ri+1)
290 # res_[i] = numpy.sum(data[vect_full] * filt)
292 FBANK_OUT_FLOOR = sys.float_info.epsilon
293 return numpy.log(numpy.where(res_ < FBANK_OUT_FLOOR, FBANK_OUT_FLOOR, res_))
296def dct_transform(filters, n_filters, dct_kernel, n_ceps):
298 ceps = numpy.zeros(n_ceps)
299 vec = numpy.array(range(0, n_filters))
300 for i in range(0, n_ceps):
301 ceps[i] = numpy.sum(filters[vec] * dct_kernel[i])
303 return ceps
306def energy(data, rate, *, win_length_ms=20.0, win_shift_ms=10.0):
308 #########################
309 # Initialisation part ##
310 #########################
312 win_length = int(rate * win_length_ms / 1000)
313 win_shift = int(rate * win_shift_ms / 1000)
314 win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))
316 ######################################
317 # End of the Initialisation part ###
318 ######################################
320 ######################################
321 # Core code ###
322 ######################################
324 data_size = data.shape[0]
325 n_frames = int(1 + (data_size - win_length) / win_shift)
327 # create features set
329 features = [0 for j in range(n_frames)]
331 # compute cepstral coefficients
332 for i in range(n_frames):
333 # create a frame
334 frame = numpy.zeros(win_size, dtype=numpy.float64)
335 vec = numpy.arange(win_length)
336 frame[vec] = data[vec + i * win_shift]
337 som = numpy.sum(frame)
338 som = som / win_size
339 frame[vec] -= som # normalization by mean here
341 energy = sig_norm(win_length, frame, False)
342 features[i] = energy
344 return numpy.array(features)
347def spectrogram(
348 data,
349 rate,
350 *,
351 win_length_ms=20.0,
352 win_shift_ms=10.0,
353 n_filters=24,
354 f_min=0.0,
355 f_max=4000.0,
356 pre_emphasis_coef=0.95,
357 mel_scale=True,
358 energy_filter=False,
359 log_filter=True,
360 energy_bands=False,
361):
362 #########################
363 # Initialisation part ##
364 #########################
366 win_length = int(rate * win_length_ms / 1000)
367 win_shift = int(rate * win_shift_ms / 1000)
368 win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))
370 # Hamming initialisation
371 hamming_kernel = init_hamming_kernel(win_length)
373 # Compute cut-off frequencies
374 p_index = init_freqfilter(
375 rate, win_size, mel_scale, n_filters, f_min, f_max
376 )
378 ######################################
379 # End of the Initialisation part ###
380 ######################################
382 ######################################
383 # Core code ###
384 ######################################
386 data_size = data.shape[0]
387 n_frames = int(1 + (data_size - win_length) / win_shift)
389 # create features set
390 features = numpy.zeros(
391 [n_frames, int(win_size / 2) + 1], dtype=numpy.float64
392 )
394 last_frame_elem = 0
395 # compute cepstral coefficients
396 for i in range(n_frames):
397 # create a frame
398 frame = numpy.zeros(win_size, dtype=numpy.float64)
399 vec = numpy.arange(win_length)
400 frame[vec] = data[vec + i * win_shift]
401 som = numpy.sum(frame)
402 som = som / win_size
403 frame[vec] -= som # normalization by mean here
405 frame_, last_frame_elem = pre_emphasis(
406 frame[vec], win_shift, pre_emphasis_coef, last_frame_elem
407 )
408 frame[vec] = frame_
410 # Hamming windowing
411 frame = hamming_window(frame, hamming_kernel, win_length)
413 _, spec_row = log_filter_bank(
414 frame, n_filters, p_index, win_size, energy_filter
415 )
417 features[i] = spec_row[0 : int(win_size / 2) + 1]
419 return numpy.array(features)
422def cepstral(
423 data,
424 rate,
425 *,
426 win_length_ms=20,
427 win_shift_ms=10,
428 n_filters=20,
429 f_min=0.0,
430 f_max=4000.0,
431 pre_emphasis_coef=1.0,
432 mel_scale=True,
433 n_ceps=19,
434 delta_win=2,
435 dct_norm=True,
436 with_energy=True,
437 with_delta=True,
438 with_delta_delta=True,
439):
441 #########################
442 # Initialisation part ##
443 #########################
445 win_length = int(rate * win_length_ms / 1000)
446 win_shift = int(rate * win_shift_ms / 1000)
447 win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))
449 # Hamming initialisation
450 hamming_kernel = init_hamming_kernel(win_length)
452 # Compute cut-off frequencies
453 p_index = init_freqfilter(
454 rate,
455 win_size,
456 mel_scale,
457 n_filters,
458 f_min,
459 f_max,
460 )
462 # Cosine transform initialisation
463 dct_kernel = init_dct_kernel(n_filters, n_ceps, dct_norm)
465 ######################################
466 # End of the Initialisation part ###
467 ######################################
469 ######################################
470 # Core code ###
471 ######################################
473 data_size = data.shape[0]
474 n_frames = int(1 + (data_size - win_length) / win_shift)
476 # create features set
477 dim0 = n_ceps
478 if with_energy:
479 dim0 += +1
480 dim = dim0
481 if with_delta:
482 dim += dim0
483 if with_delta_delta:
484 dim += dim0
485 else:
486 with_delta_delta = False
488 features = numpy.zeros([n_frames, dim], dtype=numpy.float64)
490 last_frame_elem = 0
491 # compute cepstral coefficients
492 for i in range(n_frames):
493 # create a frame
494 frame = numpy.zeros(win_size, dtype=numpy.float64)
495 vec = numpy.arange(win_length)
496 frame[vec] = data[vec + i * win_shift]
497 som = numpy.sum(frame)
498 som = som / win_size
499 frame[vec] -= som # normalization by mean here
501 if with_energy:
502 energy = sig_norm(win_length, frame, False)
504 # pre-emphasis filtering
505 frame_, last_frame_elem = pre_emphasis(
506 frame[vec], win_shift, pre_emphasis_coef, last_frame_elem
507 )
508 frame[vec] = frame_
510 # Hamming windowing
511 frame = hamming_window(frame, hamming_kernel, win_length)
513 # FFT and filters
514 filters, _ = log_filter_bank(
515 frame, n_filters, p_index, win_size, energy_filter=False
516 )
518 # apply DCT
519 ceps = dct_transform(filters, n_filters, dct_kernel, n_ceps)
521 ######################################
522 # Deltas and Delta-Deltas ###
523 ######################################
525 d1 = n_ceps
526 if with_energy:
527 d1 = n_ceps + 1
528 ceps = numpy.append(ceps, energy)
530 # stock the results in features matrix
531 vec = numpy.arange(d1)
532 features[i][0:d1] = ceps[vec]
534 # compute Delta coefficient
535 if with_delta:
536 som = 0.0
537 for i in range(1, delta_win + 1):
538 som = som + i * i
539 som = som * 2
541 for i in range(n_frames):
542 for k in range(n_ceps):
543 features[i][d1 + k] = 0.0
544 for ll in range(1, delta_win + 1):
545 if i + ll < n_frames:
546 p_ind = i + ll
547 else:
548 p_ind = n_frames - 1
549 if i - ll > 0:
550 n_ind = i - ll
551 else:
552 n_ind = 0
553 features[i][d1 + k] = features[i][d1 + k] + ll * (
554 features[p_ind][k] - features[n_ind][k]
555 )
556 # features[i][d1+k] = features[i][d1+k] / som # do not normalize anymore
558 # compute Delta of the Energy
559 if with_delta and with_energy:
560 som = 0.0
562 vec = numpy.arange(1, delta_win + 1)
563 som = 2.0 * numpy.sum(vec * vec)
565 for i in range(n_frames):
566 k = n_ceps
567 features[i][d1 + k] = 0.0
568 for ll in range(1, delta_win + 1):
569 if i + ll < n_frames:
570 p_ind = i + ll
571 else:
572 p_ind = n_frames - 1
573 if i - ll > 0:
574 n_ind = i - ll
575 else:
576 n_ind = 0
577 features[i][d1 + k] = features[i][d1 + k] + ll * (
578 features[p_ind][k] - features[n_ind][k]
579 )
580 # features[i][d1+k] = features[i][d1+k] / som # do not normalize anymore
582 # compute Delta Delta of the coefficients
583 if with_delta_delta:
584 som = 0.0
585 for i in range(1, delta_win + 1):
586 som = som + i * i
587 som = som * 2
588 for i in range(n_frames):
589 for k in range(n_ceps):
590 features[i][2 * d1 + k] = 0.0
591 for ll in range(1, delta_win + 1):
592 if i + ll < n_frames:
593 p_ind = i + ll
594 else:
595 p_ind = n_frames - 1
596 if i - ll > 0:
597 n_ind = i - ll
598 else:
599 n_ind = 0
600 features[i][2 * d1 + k] = features[i][2 * d1 + k] + ll * (
601 features[p_ind][d1 + k] - features[n_ind][d1 + k]
602 )
603 # features[i][2*d1+k] = features[i][2*d1+k] / som # do not normalize anymore
605 # compute Delta Delta of the energy
606 if with_delta_delta and with_energy:
607 som = 0.0
608 for i in range(1, delta_win + 1):
609 som = som + i * i
610 som = som * 2
611 for i in range(n_frames):
612 k = n_ceps
613 features[i][2 * d1 + k] = 0.0
614 for ll in range(1, delta_win + 1):
615 if i + ll < n_frames:
616 p_ind = i + ll
617 else:
618 p_ind = n_frames - 1
619 if i - ll > 0:
620 n_ind = i - ll
621 else:
622 n_ind = 0
623 features[i][2 * d1 + k] = features[i][2 * d1 + k] + ll * (
624 features[p_ind][d1 + k] - features[n_ind][d1 + k]
625 )
626 # features[i][2*d1+k] = features[i][2*d1+k] / som # do not normalize anymore
628 return numpy.array(features)