Coverage for src/bob/bio/spear/annotator/mod_4hz.py: 98%

110 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-06 22:04 +0100

1#!/usr/bin/env python 

2# vim: set fileencoding=utf-8 : 

3# Elie Khoury <Elie.Khoury@idiap.ch> 

4# Tue 9 Jun 16:56:01 CEST 2015 

5# 

6# Copyright (C) 2012-2015 Idiap Research Institute, Martigny, Switzerland 

7# 

8# This program is free software: you can redistribute it and/or modify 

9# it under the terms of the GNU General Public License as published by 

10# the Free Software Foundation, version 3 of the License. 

11# 

12# This program is distributed in the hope that it will be useful, 

13# but WITHOUT ANY WARRANTY; without even the implied warranty of 

14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

15# GNU General Public License for more details. 

16# 

17# You should have received a copy of the GNU General Public License 

18# along with this program. If not, see <http://www.gnu.org/licenses/>. 

19 

20"""{4Hz modulation energy and energy}-based voice activity detection for speaker recognition""" 

21 

22import logging 

23 

24import numpy 

25import scipy.signal 

26 

27from bob.bio.base.annotator import Annotator 

28 

29from .. import audio_processing as ap 

30from .. import utils 

31 

32logger = logging.getLogger(__name__) 

33 

34 

35class Mod_4Hz(Annotator): 

36 """VAD based on the modulation of the energy around 4 Hz and the energy""" 

37 

38 def __init__( 

39 self, 

40 max_iterations=10, # 10 iterations for the 

41 convergence_threshold=0.0005, 

42 variance_threshold=0.0005, 

43 win_length_ms=20.0, # 20 ms 

44 win_shift_ms=10.0, # 10 ms 

45 smoothing_window=10, # 10 frames (i.e. 100 ms) 

46 n_filters=40, 

47 f_min=0.0, # 0 Hz 

48 f_max=4000, # 4 KHz 

49 pre_emphasis_coef=1.0, 

50 ratio_threshold=0.1, # 0.1 of the maximum energy 

51 **kwargs 

52 ): 

53 super().__init__(**kwargs) 

54 self.max_iterations = max_iterations 

55 self.convergence_threshold = convergence_threshold 

56 self.variance_threshold = variance_threshold 

57 self.win_length_ms = win_length_ms 

58 self.win_shift_ms = win_shift_ms 

59 self.smoothing_window = smoothing_window 

60 self.n_filters = n_filters 

61 self.f_min = f_min 

62 self.f_max = f_max 

63 self.pre_emphasis_coef = pre_emphasis_coef 

64 self.ratio_threshold = ratio_threshold 

65 

66 def _voice_activity_detection(self, energy, mod_4hz): 

67 

68 n_samples = len(energy) 

69 threshold = numpy.max(energy) - numpy.log( 

70 (1.0 / self.ratio_threshold) * (1.0 / self.ratio_threshold) 

71 ) 

72 labels = numpy.array(numpy.zeros(n_samples), dtype=numpy.int16) 

73 # if energy does not change a lot, it's not audio maybe? 

74 if numpy.std(energy) < 10e-5: 

75 return labels * 0 

76 

77 for i in range(n_samples): 

78 if energy[i] > threshold and mod_4hz[i] > 0.9: 

79 labels[i] = 1 

80 

81 # If speech part less then 10 seconds and less than the half of the segment duration, try to find speech with more risk 

82 if ( 

83 numpy.sum(labels) < 2000 

84 and float(numpy.sum(labels)) / float(len(labels)) < 0.5 

85 ): 

86 # TRY WITH MORE RISK 1... 

87 for i in range(n_samples): 

88 if energy[i] > threshold and mod_4hz[i] > 0.5: 

89 labels[i] = 1 

90 

91 if ( 

92 numpy.sum(labels) < 2000 

93 and float(numpy.sum(labels)) / float(len(labels)) < 0.5 

94 ): 

95 # TRY WITH MORE RISK 2... 

96 for i in range(n_samples): 

97 if energy[i] > threshold and mod_4hz[i] > 0.2: 

98 labels[i] = 1 

99 

100 if ( 

101 numpy.sum(labels) < 2000 

102 and float(numpy.sum(labels)) / float(len(labels)) < 0.5 

103 ): # This is special for short segments (less than 2s)... 

104 # TRY WITH MORE RISK 3... 

105 if ( 

106 (len(energy) < 200) 

107 or (numpy.sum(labels) == 0) 

108 or (numpy.mean(labels) < 0.025) 

109 ): 

110 for i in range(n_samples): 

111 if energy[i] > threshold: 

112 labels[i] = 1 

113 return labels 

114 

    def averaging(self, list_1s_shift):
        """Smooth a per-frame sequence with a running mean of up to 100 frames.

        Three regimes over the output array:
        - head: cumulative (growing-window) average computed forward;
        - middle: plain 100-frame trailing mean;
        - tail: cumulative (growing-window) average computed backward from the
          last element, which overwrites the trailing part of the middle pass.

        Returns a float array of the same length as ``list_1s_shift``.
        (100 frames presumably correspond to 1 s at a 10 ms shift, matching the
        method name — TODO confirm against win_shift_ms.)
        """
        len_list = len(list_1s_shift)
        sample_level_value = numpy.array(numpy.zeros(len_list, dtype=float))
        sample_level_value[0] = numpy.array(list_1s_shift[0])
        # Forward growing-window averages for indices 1 .. min(len, 100) - 2.
        for j in range(2, numpy.min([len_list, 100])):
            sample_level_value[j - 1] = ((j - 1.0) / j) * sample_level_value[
                j - 2
            ] + (1.0 / j) * numpy.array(list_1s_shift[j - 1])
        # Full 100-frame trailing mean once enough history is available.
        for j in range(numpy.min([len_list, 100]), len_list - 100 + 1):
            sample_level_value[j - 1] = numpy.array(
                numpy.mean(list_1s_shift[j - 100 : j])
            )
        # Backward growing-window averages anchored on the last element.
        sample_level_value[len_list - 1] = list_1s_shift[len_list - 1]
        for j in range(2, numpy.min([len_list, 100]) + 1):
            sample_level_value[len_list - j] = (
                (j - 1.0) / j
            ) * sample_level_value[len_list + 1 - j] + (1.0 / j) * numpy.array(
                list_1s_shift[len_list - j]
            )
        return sample_level_value

135 

136 def bandpass_firwin(self, ntaps, lowcut, highcut, fs, window="hamming"): 

137 nyq = 0.5 * fs 

138 taps = scipy.signal.firwin( 

139 ntaps, 

140 [lowcut, highcut], 

141 nyq=nyq, 

142 pass_zero=False, 

143 window=window, 

144 scale=True, 

145 ) 

146 return taps 

147 

148 def pass_band_filtering(self, energy_bands, fs): 

149 energy_bands = energy_bands.T 

150 order = 8 

151 Wo = 4.0 

152 num_taps = self.bandpass_firwin(order + 1, (Wo - 0.5), (Wo + 0.5), fs) 

153 res = scipy.signal.lfilter(num_taps, 1.0, energy_bands) 

154 return res 

155 

156 def modulation_4hz(self, filtering_res, data, sample_rate): 

157 fs = sample_rate 

158 win_length = int(fs * self.win_length_ms / 1000) 

159 win_shift = int(fs * self.win_shift_ms / 1000) 

160 Energy = filtering_res.sum(axis=0) 

161 mean_Energy = numpy.mean(Energy) 

162 Energy = Energy / mean_Energy 

163 

164 # win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2))) 

165 n_frames = 1 + (data.shape[0] - win_length) // win_shift 

166 range_modulation = int(fs / win_length) # This corresponds to 1 sec 

167 res = numpy.zeros(n_frames) 

168 if n_frames < range_modulation: 

169 return res 

170 for w in range(0, n_frames - range_modulation): 

171 E_range = Energy[ 

172 w : w + range_modulation 

173 ] # computes the modulation every 10 ms 

174 if (E_range <= 0.0).any(): 

175 res[w] = 0 

176 else: 

177 res[w] = numpy.var(numpy.log(E_range)) 

178 res[n_frames - range_modulation : n_frames] = res[ 

179 n_frames - range_modulation - 1 

180 ] 

181 return res 

182 

183 def mod_4hz(self, data, sample_rate): 

184 """Computes and returns the 4Hz modulation energy features for the given input wave file""" 

185 

186 energy_bands = ap.spectrogram( 

187 data, 

188 sample_rate, 

189 win_length_ms=self.win_length_ms, 

190 win_shift_ms=self.win_shift_ms, 

191 n_filters=self.n_filters, 

192 f_min=self.f_min, 

193 f_max=self.f_max, 

194 pre_emphasis_coef=self.pre_emphasis_coef, 

195 energy_filter=True, 

196 log_filter=False, 

197 energy_bands=True, 

198 ) 

199 filtering_res = self.pass_band_filtering(energy_bands, sample_rate) 

200 mod_4hz = self.modulation_4hz(filtering_res, data, sample_rate) 

201 mod_4hz = self.averaging(mod_4hz) 

202 energy_array = ap.energy( 

203 data, 

204 sample_rate, 

205 win_length_ms=self.win_length_ms, 

206 win_shift_ms=self.win_shift_ms, 

207 ) 

208 labels = self._voice_activity_detection(energy_array, mod_4hz) 

209 labels = utils.smoothing( 

210 labels, self.smoothing_window 

211 ) # discard isolated speech less than 100ms 

212 logger.info( 

213 "After Mod-4Hz based VAD there are %d frames remaining over %d", 

214 numpy.sum(labels), 

215 len(labels), 

216 ) 

217 return labels, energy_array, mod_4hz 

218 

219 def transform_one(self, data, sample_rate): 

220 """labels speech (1) and non-speech (0) parts of the given input wave file using 4Hz modulation energy and energy 

221 Input parameter: 

222 * input_signal[0] --> rate 

223 * input_signal[1] --> signal TODO doc 

224 """ 

225 [labels, energy_array, mod_4hz] = self.mod_4hz(data, sample_rate) 

226 if (labels == 0).all(): 

227 logger.warning("No Audio was detected in the sample!") 

228 return None 

229 

230 return labels 

231 

232 def transform( 

233 self, audio_signals: "list[numpy.ndarray]", sample_rates: "list[int]" 

234 ): 

235 results = [] 

236 for audio_signal, sample_rate in zip(audio_signals, sample_rates): 

237 results.append(self.transform_one(audio_signal, sample_rate)) 

238 return results 

239 

240 def _more_tags(self): 

241 return { 

242 "requires_fit": False, 

243 "bob_transform_extra_input": (("sample_rates", "rate"),), 

244 "bob_output": "annotations", 

245 }