Coverage for src/bob/bio/spear/annotator/mod_4hz.py: 98%
108 statements
« prev ^ index » next coverage.py v7.6.5, created at 2024-11-14 22:07 +0100
1#!/usr/bin/env python
2# vim: set fileencoding=utf-8 :
3# Elie Khoury <Elie.Khoury@idiap.ch>
4# Tue 9 Jun 16:56:01 CEST 2015
5#
6# Copyright (C) 2012-2015 Idiap Research Institute, Martigny, Switzerland
7#
8# This program is free software: you can redistribute it and/or modify
9# it under the terms of the GNU General Public License as published by
10# the Free Software Foundation, version 3 of the License.
11#
12# This program is distributed in the hope that it will be useful,
13# but WITHOUT ANY WARRANTY; without even the implied warranty of
14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15# GNU General Public License for more details.
16#
17# You should have received a copy of the GNU General Public License
18# along with this program. If not, see <http://www.gnu.org/licenses/>.
20"""{4Hz modulation energy and energy}-based voice activity detection for speaker recognition"""
22import logging
24import numpy
25import scipy.signal
27from bob.bio.base.annotator import Annotator
29from .. import audio_processing as ap
30from .. import utils
32logger = logging.getLogger(__name__)
class Mod_4Hz(Annotator):
    """VAD based on the modulation of the energy around 4 Hz and the energy.

    Labels each frame of an audio signal as speech (1) or non-speech (0) by
    combining a frame-energy threshold with the variance of the log energy
    band-pass filtered around 4 Hz (roughly the syllabic rate of speech).
    """

    def __init__(
        self,
        max_iterations=10,  # kept for config compatibility; not used here — TODO confirm external use
        convergence_threshold=0.0005,
        variance_threshold=0.0005,
        win_length_ms=20.0,  # analysis window length: 20 ms
        win_shift_ms=10.0,  # analysis window shift: 10 ms
        smoothing_window=10,  # 10 frames (i.e. 100 ms)
        n_filters=40,
        f_min=0.0,  # 0 Hz
        f_max=4000,  # 4 kHz
        pre_emphasis_coef=1.0,
        ratio_threshold=0.1,  # 0.1 of the maximum energy
        **kwargs
    ):
        super().__init__(**kwargs)
        self.max_iterations = max_iterations
        self.convergence_threshold = convergence_threshold
        self.variance_threshold = variance_threshold
        self.win_length_ms = win_length_ms
        self.win_shift_ms = win_shift_ms
        self.smoothing_window = smoothing_window
        self.n_filters = n_filters
        self.f_min = f_min
        self.f_max = f_max
        self.pre_emphasis_coef = pre_emphasis_coef
        self.ratio_threshold = ratio_threshold

    def _voice_activity_detection(self, energy, mod_4hz):
        """Label frames as speech (1) / non-speech (0).

        A frame is speech when its energy exceeds an adaptive threshold AND
        its 4Hz modulation value exceeds a modulation threshold.  When too
        little speech is found, the modulation threshold is progressively
        relaxed (0.9 -> 0.5 -> 0.2 -> energy only).

        Parameters
        ----------
        energy : 1D array of per-frame energies (log domain is assumed by the
            threshold formula — TODO confirm against ``ap.energy``).
        mod_4hz : 1D array of smoothed 4Hz modulation values, indexed in
            lockstep with ``energy``.

        Returns
        -------
        numpy.ndarray of int16 labels, one per frame.
        """
        n_samples = len(energy)
        # Threshold = max energy - log(1/r^2), with r = ratio_threshold.
        threshold = numpy.max(energy) - numpy.log(
            (1.0 / self.ratio_threshold) * (1.0 / self.ratio_threshold)
        )
        labels = numpy.zeros(n_samples, dtype=numpy.int16)

        # If the energy barely varies, assume there is no audio at all.
        # NOTE(review): 10e-5 is 1e-4; confirm this was not meant to be 1e-5.
        if numpy.std(energy) < 10e-5:
            return labels

        def mark_speech(mod_threshold):
            # Mark frames passing both the energy and modulation criteria.
            for i in range(n_samples):
                if energy[i] > threshold and mod_4hz[i] > mod_threshold:
                    labels[i] = 1

        def not_enough_speech():
            # "Too little speech": fewer than 2000 frames AND under half of
            # the segment.  (Original comment said "10 seconds"; at a 10 ms
            # shift 2000 frames is 20 s — TODO confirm intent.)
            n_speech = numpy.sum(labels)
            return n_speech < 2000 and float(n_speech) / float(len(labels)) < 0.5

        # First pass: strict modulation requirement.
        mark_speech(0.9)

        # Retry with progressively lower modulation thresholds while the
        # detected speech remains too short.
        for riskier_threshold in (0.5, 0.2):
            if not_enough_speech():
                mark_speech(riskier_threshold)

        if not_enough_speech():
            # Last resort, aimed at short segments (less than 2 s) or nearly
            # empty label tracks: fall back to the energy criterion alone.
            if (
                len(energy) < 200
                or numpy.sum(labels) == 0
                or numpy.mean(labels) < 0.025
            ):
                for i in range(n_samples):
                    if energy[i] > threshold:
                        labels[i] = 1
        return labels

    def averaging(self, list_1s_shift):
        """Smooth a per-frame sequence with a trailing window of up to 100 frames.

        The first and last ~100 samples use a running (incremental) mean so the
        window grows/shrinks at the borders; the middle uses a plain 100-frame
        trailing mean.  Returns a float array of the same length as the input.
        """
        len_list = len(list_1s_shift)
        sample_level_value = numpy.zeros(len_list, dtype=float)
        sample_level_value[0] = list_1s_shift[0]
        # Growing window at the start: running mean over the first j samples.
        for j in range(2, numpy.min([len_list, 100])):
            sample_level_value[j - 1] = ((j - 1.0) / j) * sample_level_value[
                j - 2
            ] + (1.0 / j) * list_1s_shift[j - 1]
        # Steady state: trailing mean over the previous 100 samples.
        for j in range(numpy.min([len_list, 100]), len_list - 100 + 1):
            sample_level_value[j - 1] = numpy.mean(list_1s_shift[j - 100 : j])
        sample_level_value[len_list - 1] = list_1s_shift[len_list - 1]
        # Shrinking window at the end, computed backwards from the last sample.
        for j in range(2, numpy.min([len_list, 100]) + 1):
            sample_level_value[len_list - j] = (
                (j - 1.0) / j
            ) * sample_level_value[len_list + 1 - j] + (1.0 / j) * list_1s_shift[
                len_list - j
            ]
        return sample_level_value

    def bandpass_firwin(self, ntaps, lowcut, highcut, fs, window="hamming"):
        """Design a band-pass FIR filter with ``scipy.signal.firwin``.

        ``pass_zero=False`` makes [lowcut, highcut] (in Hz, given ``fs``) the
        pass band; returns the ``ntaps`` filter coefficients.
        """
        taps = scipy.signal.firwin(
            ntaps,
            [lowcut, highcut],
            fs=fs,
            pass_zero=False,
            window=window,
            scale=True,
        )
        return taps

    def pass_band_filtering(self, energy_bands, fs):
        """Band-pass filter each energy band around 4 Hz (3.5-4.5 Hz).

        ``energy_bands`` is transposed so filtering runs along the time axis;
        returns the filtered array (bands x frames layout after transpose).
        """
        energy_bands = energy_bands.T
        order = 8
        center_hz = 4.0  # center of the modulation band
        num_taps = self.bandpass_firwin(
            order + 1, (center_hz - 0.5), (center_hz + 0.5), fs
        )
        return scipy.signal.lfilter(num_taps, 1.0, energy_bands)

    def modulation_4hz(self, filtering_res, data, sample_rate):
        """Compute the per-frame 4Hz modulation energy.

        For each frame, the modulation value is the variance of the log of the
        normalized total band energy over a 1-second window; frames whose
        window contains non-positive energy get 0.  The tail (last second) is
        padded with the last computed value.
        """
        fs = sample_rate
        win_length = int(fs * self.win_length_ms / 1000)
        win_shift = int(fs * self.win_shift_ms / 1000)
        # Total energy across bands, normalized by its mean.
        Energy = filtering_res.sum(axis=0)
        Energy = Energy / numpy.mean(Energy)

        n_frames = 1 + (data.shape[0] - win_length) // win_shift
        range_modulation = int(fs / win_length)  # this corresponds to 1 sec
        res = numpy.zeros(n_frames)
        if n_frames < range_modulation:
            # Signal shorter than the modulation window: all zeros.
            return res
        for w in range(0, n_frames - range_modulation):
            # Modulation over a sliding 1-second window (every 10 ms).
            E_range = Energy[w : w + range_modulation]
            if (E_range <= 0.0).any():
                res[w] = 0  # log undefined for non-positive energies
            else:
                res[w] = numpy.var(numpy.log(E_range))
        # Pad the last second with the last valid value.
        res[n_frames - range_modulation : n_frames] = res[
            n_frames - range_modulation - 1
        ]
        return res

    def mod_4hz(self, data, sample_rate):
        """Compute 4Hz-modulation-based VAD labels for the given signal.

        Returns a ``(labels, energy_array, mod_4hz)`` tuple: int16 per-frame
        speech labels, per-frame energies, and smoothed modulation values.
        """
        energy_bands = ap.spectrogram(
            data,
            sample_rate,
            win_length_ms=self.win_length_ms,
            win_shift_ms=self.win_shift_ms,
            n_filters=self.n_filters,
            f_min=self.f_min,
            f_max=self.f_max,
            pre_emphasis_coef=self.pre_emphasis_coef,
            energy_filter=True,
            log_filter=False,
            energy_bands=True,
        )
        filtering_res = self.pass_band_filtering(energy_bands, sample_rate)
        mod_4hz = self.modulation_4hz(filtering_res, data, sample_rate)
        mod_4hz = self.averaging(mod_4hz)
        energy_array = ap.energy(
            data,
            sample_rate,
            win_length_ms=self.win_length_ms,
            win_shift_ms=self.win_shift_ms,
        )
        labels = self._voice_activity_detection(energy_array, mod_4hz)
        labels = utils.smoothing(
            labels, self.smoothing_window
        )  # discard isolated speech less than 100ms
        logger.info(
            "After Mod-4Hz based VAD there are %d frames remaining over %d",
            numpy.sum(labels),
            len(labels),
        )
        return labels, energy_array, mod_4hz

    def transform_one(self, data, sample_rate):
        """Label speech (1) and non-speech (0) parts of one wave signal.

        Parameters
        ----------
        data : 1D numpy array with the audio samples.
        sample_rate : sampling rate of ``data`` in Hz.

        Returns
        -------
        The per-frame speech labels, or ``None`` when no frame was labelled
        as speech (i.e. no audio detected).
        """
        labels, energy_array, mod_4hz = self.mod_4hz(data, sample_rate)
        if (labels == 0).all():
            logger.warning("No Audio was detected in the sample!")
            return None
        return labels

    def transform(
        self, audio_signals: "list[numpy.ndarray]", sample_rates: "list[int]"
    ):
        """Apply :meth:`transform_one` to each (signal, rate) pair."""
        return [
            self.transform_one(audio_signal, sample_rate)
            for audio_signal, sample_rate in zip(audio_signals, sample_rates)
        ]

    def _more_tags(self):
        # sklearn-style tags: no fitting needed; "rate" is forwarded to
        # transform as the extra "sample_rates" input; output key name.
        return {
            "requires_fit": False,
            "bob_transform_extra_input": (("sample_rates", "rate"),),
            "bob_output": "annotations",
        }