Coverage for src/bob/bio/spear/annotator/mod_4hz.py: 98%
110 statements
« prev ^ index » next — coverage.py v7.3.2, created at 2023-12-06 22:04 +0100
#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
# Elie Khoury <Elie.Khoury@idiap.ch>
# Tue 9 Jun 16:56:01 CEST 2015
#
# Copyright (C) 2012-2015 Idiap Research Institute, Martigny, Switzerland
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3 of the License.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
20"""{4Hz modulation energy and energy}-based voice activity detection for speaker recognition"""
22import logging
24import numpy
25import scipy.signal
27from bob.bio.base.annotator import Annotator
29from .. import audio_processing as ap
30from .. import utils
# Module-level logger, named after this module per the standard convention.
logger = logging.getLogger(__name__)
class Mod_4Hz(Annotator):
    """VAD based on the modulation of the energy around 4 Hz and the energy.

    Labels each analysis frame of an audio signal as speech (1) or
    non-speech (0) using two cues: the per-frame log-energy and the
    modulation of the filter-bank energies around 4 Hz (a classical
    correlate of the speech syllabic rate).
    """

    def __init__(
        self,
        max_iterations=10,  # stored but not used by the code in this class
        convergence_threshold=0.0005,  # stored but not used here
        variance_threshold=0.0005,  # stored but not used here
        win_length_ms=20.0,  # analysis window length: 20 ms
        win_shift_ms=10.0,  # frame shift: 10 ms (100 frames per second)
        smoothing_window=10,  # smoothing span: 10 frames (i.e. 100 ms)
        n_filters=40,  # number of filter-bank channels
        f_min=0.0,  # lowest filter-bank frequency: 0 Hz
        f_max=4000,  # highest filter-bank frequency: 4 kHz
        pre_emphasis_coef=1.0,  # pre-emphasis coefficient
        ratio_threshold=0.1,  # energy threshold: 0.1 of the maximum energy
        **kwargs
    ):
        super().__init__(**kwargs)
        self.max_iterations = max_iterations
        self.convergence_threshold = convergence_threshold
        self.variance_threshold = variance_threshold
        self.win_length_ms = win_length_ms
        self.win_shift_ms = win_shift_ms
        self.smoothing_window = smoothing_window
        self.n_filters = n_filters
        self.f_min = f_min
        self.f_max = f_max
        self.pre_emphasis_coef = pre_emphasis_coef
        self.ratio_threshold = ratio_threshold

    def _voice_activity_detection(self, energy, mod_4hz):
        """Label each frame as speech (1) or non-speech (0).

        A frame is speech when its log-energy exceeds a threshold derived
        from the maximum energy AND its 4 Hz modulation is high.  When too
        little speech is found, the modulation requirement is relaxed in
        stages (0.9 -> 0.5 -> 0.2 -> energy only).

        Parameters
        ----------
        energy : numpy.ndarray
            Per-frame log-energy values.
        mod_4hz : numpy.ndarray
            Per-frame 4 Hz modulation values; assumed at least as long as
            ``energy``.

        Returns
        -------
        numpy.ndarray
            ``int16`` array of 0/1 labels, one per frame.
        """
        n_samples = len(energy)
        # threshold = max(energy) - 2*log(1/ratio_threshold): candidate
        # speech frames lie within a fixed log-energy band below the peak.
        threshold = numpy.max(energy) - numpy.log(
            (1.0 / self.ratio_threshold) * (1.0 / self.ratio_threshold)
        )
        labels = numpy.zeros(n_samples, dtype=numpy.int16)

        # A (nearly) constant energy track is most likely not audio at all.
        if numpy.std(energy) < 10e-5:
            return labels

        energetic = energy > threshold
        mod = numpy.asarray(mod_4hz)[:n_samples]
        labels[energetic & (mod > 0.9)] = 1

        def _too_little_speech():
            # "Too little": fewer than 2000 speech frames (~20 s at the
            # default 10 ms shift) AND less than half of the segment.
            return (
                numpy.sum(labels) < 2000
                and float(numpy.sum(labels)) / float(len(labels)) < 0.5
            )

        # TRY WITH MORE RISK 1...
        if _too_little_speech():
            labels[energetic & (mod > 0.5)] = 1
        # TRY WITH MORE RISK 2...
        if _too_little_speech():
            labels[energetic & (mod > 0.2)] = 1
        # TRY WITH MORE RISK 3... (special for short segments, < 2 s)
        if _too_little_speech():
            if (
                (len(energy) < 200)
                or (numpy.sum(labels) == 0)
                or (numpy.mean(labels) < 0.025)
            ):
                # Fall back to an energy-only decision.
                labels[energetic] = 1
        return labels

    def averaging(self, list_1s_shift):
        """Smooth a per-frame sequence with an up-to-100-frame sliding mean.

        The beginning and end of the sequence use growing/shrinking running
        means so the output has the same length as the input.

        Parameters
        ----------
        list_1s_shift : sequence of float
            Raw per-frame values (here: 4 Hz modulation energies).

        Returns
        -------
        numpy.ndarray
            Smoothed values, same length as the input.
        """
        len_list = len(list_1s_shift)
        sample_level_value = numpy.array(numpy.zeros(len_list, dtype=float))
        sample_level_value[0] = numpy.array(list_1s_shift[0])
        # Ramp-up: incremental running mean over the first j samples.
        for j in range(2, numpy.min([len_list, 100])):
            sample_level_value[j - 1] = ((j - 1.0) / j) * sample_level_value[
                j - 2
            ] + (1.0 / j) * numpy.array(list_1s_shift[j - 1])
        # Steady state: plain mean over the trailing 100-sample window.
        for j in range(numpy.min([len_list, 100]), len_list - 100 + 1):
            sample_level_value[j - 1] = numpy.array(
                numpy.mean(list_1s_shift[j - 100 : j])
            )
        sample_level_value[len_list - 1] = list_1s_shift[len_list - 1]
        # Ramp-down: shrinking running mean over the last j samples.
        for j in range(2, numpy.min([len_list, 100]) + 1):
            sample_level_value[len_list - j] = (
                (j - 1.0) / j
            ) * sample_level_value[len_list + 1 - j] + (1.0 / j) * numpy.array(
                list_1s_shift[len_list - j]
            )
        return sample_level_value

    def bandpass_firwin(self, ntaps, lowcut, highcut, fs, window="hamming"):
        """Design a linear-phase FIR band-pass filter.

        Parameters
        ----------
        ntaps : int
            Number of filter taps.
        lowcut, highcut : float
            Band edges in Hz.
        fs : float
            Sampling frequency in Hz.
        window : str
            Window function passed to :func:`scipy.signal.firwin`.

        Returns
        -------
        numpy.ndarray
            The filter coefficients.
        """
        # The deprecated ``nyq=`` keyword was removed in SciPy 1.12; passing
        # the sampling frequency via ``fs=`` is mathematically equivalent
        # (cutoffs stay expressed in Hz: cutoff/(fs/2) == cutoff/nyq).
        taps = scipy.signal.firwin(
            ntaps,
            [lowcut, highcut],
            fs=fs,
            pass_zero=False,
            window=window,
            scale=True,
        )
        return taps

    def pass_band_filtering(self, energy_bands, fs):
        """Band-pass filter the energy bands around 4 Hz.

        Parameters
        ----------
        energy_bands : numpy.ndarray
            Filter-bank energies, one row per frame (transposed internally).
        fs : float
            Sampling frequency in Hz.

        Returns
        -------
        numpy.ndarray
            The filtered bands (bands x frames).
        """
        energy_bands = energy_bands.T
        order = 8
        Wo = 4.0  # center frequency of the pass band, in Hz
        # 9-tap FIR pass band of [3.5, 4.5] Hz.
        num_taps = self.bandpass_firwin(order + 1, (Wo - 0.5), (Wo + 0.5), fs)
        res = scipy.signal.lfilter(num_taps, 1.0, energy_bands)
        return res

    def modulation_4hz(self, filtering_res, data, sample_rate):
        """Compute the 4 Hz modulation energy for every analysis frame.

        Parameters
        ----------
        filtering_res : numpy.ndarray
            Band-pass filtered energy bands (bands x frames).
        data : numpy.ndarray
            Raw audio samples (used only to derive the frame count).
        sample_rate : int
            Sampling rate of ``data`` in Hz.

        Returns
        -------
        numpy.ndarray
            One modulation value per frame.
        """
        fs = sample_rate
        win_length = int(fs * self.win_length_ms / 1000)
        win_shift = int(fs * self.win_shift_ms / 1000)
        # Total energy across the filtered bands, normalized by its mean.
        Energy = filtering_res.sum(axis=0)
        mean_Energy = numpy.mean(Energy)
        Energy = Energy / mean_Energy

        n_frames = 1 + (data.shape[0] - win_length) // win_shift
        # int(fs / win_length) = 1000 / win_length_ms frames.  NOTE(review):
        # the original comment called this "1 sec", but at the default 10 ms
        # shift it spans ~0.5 s -- confirm the intended window length.
        range_modulation = int(fs / win_length)
        res = numpy.zeros(n_frames)
        if n_frames < range_modulation:
            return res  # segment too short to measure modulation
        for w in range(0, n_frames - range_modulation):
            # Modulation = variance of log-energy over the sliding window,
            # computed at every frame shift.
            E_range = Energy[w : w + range_modulation]
            if (E_range <= 0.0).any():
                res[w] = 0  # log would be undefined: treat as no modulation
            else:
                res[w] = numpy.var(numpy.log(E_range))
        # Pad the tail with the last computed value.
        res[n_frames - range_modulation : n_frames] = res[
            n_frames - range_modulation - 1
        ]
        return res

    def mod_4hz(self, data, sample_rate):
        """Computes and returns the 4Hz modulation energy features for the given input wave file

        Parameters
        ----------
        data : numpy.ndarray
            The raw audio samples.
        sample_rate : int
            Sampling rate of ``data`` in Hz.

        Returns
        -------
        tuple
            ``(labels, energy_array, mod_4hz)``: 0/1 speech labels, the
            per-frame energies, and the smoothed 4 Hz modulation values.
        """
        energy_bands = ap.spectrogram(
            data,
            sample_rate,
            win_length_ms=self.win_length_ms,
            win_shift_ms=self.win_shift_ms,
            n_filters=self.n_filters,
            f_min=self.f_min,
            f_max=self.f_max,
            pre_emphasis_coef=self.pre_emphasis_coef,
            energy_filter=True,
            log_filter=False,
            energy_bands=True,
        )
        filtering_res = self.pass_band_filtering(energy_bands, sample_rate)
        mod_4hz = self.modulation_4hz(filtering_res, data, sample_rate)
        mod_4hz = self.averaging(mod_4hz)
        energy_array = ap.energy(
            data,
            sample_rate,
            win_length_ms=self.win_length_ms,
            win_shift_ms=self.win_shift_ms,
        )
        labels = self._voice_activity_detection(energy_array, mod_4hz)
        labels = utils.smoothing(
            labels, self.smoothing_window
        )  # discard isolated speech less than 100ms
        logger.info(
            "After Mod-4Hz based VAD there are %d frames remaining over %d",
            numpy.sum(labels),
            len(labels),
        )
        return labels, energy_array, mod_4hz

    def transform_one(self, data, sample_rate):
        """Labels speech (1) and non-speech (0) parts of one audio signal.

        Parameters
        ----------
        data : numpy.ndarray
            The raw audio samples.
        sample_rate : int
            Sampling rate of ``data`` in Hz.

        Returns
        -------
        numpy.ndarray or None
            Per-frame 0/1 speech labels, or ``None`` when no speech frame
            was detected at all.
        """
        [labels, energy_array, mod_4hz] = self.mod_4hz(data, sample_rate)
        if (labels == 0).all():
            logger.warning("No Audio was detected in the sample!")
            return None

        return labels

    def transform(
        self, audio_signals: "list[numpy.ndarray]", sample_rates: "list[int]"
    ):
        """Annotate a batch of audio signals; one result per input signal."""
        results = []
        for audio_signal, sample_rate in zip(audio_signals, sample_rates):
            results.append(self.transform_one(audio_signal, sample_rate))
        return results

    def _more_tags(self):
        # scikit-learn / bob pipeline metadata: stateless transformer that
        # receives sample rates as extra input and outputs annotations.
        return {
            "requires_fit": False,
            "bob_transform_extra_input": (("sample_rates", "rate"),),
            "bob_output": "annotations",
        }