Coverage for src/bob/bio/spear/audio_processing.py: 95%

273 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-12 22:52 +0200

1#!/usr/bin/env python 

2# vim: set fileencoding=utf-8 : 

3# Elie Khoury <Elie.Khoury@idiap.ch> 

4# Andre Anjos <andre.anjos@idiap.ch> 

5# Pavel Korshunov <Pavel.Korshunov@idiap.ch> 

6# Amir Mohammadi <amir.mohammadi@idiap.ch> 

7 

8import logging 

9import math 

10import sys 

11 

12from typing import Optional, Tuple, Union 

13 

14import numpy 

15import torch 

16import torchaudio 

17 

18logger = logging.getLogger(__name__) 

19 

20 

def fft(src, dst=None):
    """Compute the discrete Fourier transform of ``src``.

    Thin wrapper over :func:`numpy.fft.fft` that, when ``dst`` is given,
    also copies the result into it in place.

    Parameters
    ----------
    src:
        Input sequence to transform.
    dst:
        Optional pre-allocated output buffer; filled via slice assignment.

    Returns
    -------
    numpy.ndarray
        The complex spectrum of ``src``.
    """
    result = numpy.fft.fft(src)
    if dst is None:
        return result
    dst[:] = result
    return result

26 

27 

def init_hamming_kernel(win_length):
    """Return a Hamming window of ``win_length`` samples.

    Uses the standard 0.54/0.46 coefficients:
    ``w[i] = 0.54 - 0.46 * cos(2*pi*i / (win_length - 1))``.

    Delegates to :func:`numpy.hamming`, which computes the identical
    formula in one vectorized call and additionally handles the
    ``win_length == 1`` edge case (returns ``[1.0]``) instead of raising
    ZeroDivisionError like the previous hand-rolled loop.

    Parameters
    ----------
    win_length:
        Number of window samples.

    Returns
    -------
    numpy.ndarray
        Float64 array of length ``win_length``.
    """
    return numpy.hamming(win_length)

36 

37 

def init_freqfilter(rate, win_size, mel_scale, n_filters, f_min, f_max):
    """Compute the filter-bank cut-off positions in (fractional) FFT-bin units.

    Returns ``n_filters + 2`` bin positions spaced uniformly either on the
    mel scale or linearly in frequency between ``f_min`` and ``f_max``.

    Idiom fixes over the previous version: the redundant
    ``numpy.array(numpy.zeros(...))`` wrapper is removed and the per-index
    Python loops are replaced with vectorized numpy expressions.

    Parameters
    ----------
    rate:
        Sampling rate in Hz.
    win_size:
        FFT size in samples.
    mel_scale:
        When True, space the cut-offs uniformly on the mel scale.
    n_filters:
        Number of filters (the result has ``n_filters + 2`` entries).
    f_min, f_max:
        Frequency range in Hz.

    Returns
    -------
    numpy.ndarray
        Float64 array of ``n_filters + 2`` fractional bin positions.
    """
    # n_filters + 2 uniformly spaced interpolation weights in [0, 1]
    # (alpha[i] = i / (n_filters + 1), exactly as the former loop computed).
    alphas = numpy.linspace(0.0, 1.0, n_filters + 2)

    if mel_scale:
        # Interpolate uniformly on the mel scale, then map back to Hz.
        m_min = mel_python(f_min)
        m_max = mel_python(f_max)
        freqs = mel_inv_python(m_min * (1.0 - alphas) + m_max * alphas)
    else:
        # Linear frequency spacing.
        freqs = f_min * (1.0 - alphas) + f_max * alphas

    # Convert frequencies (Hz) to fractional FFT-bin positions.
    return win_size * freqs / rate

58 

59 

def init_dct_kernel(n_filters, n_ceps, dct_norm):
    """Build the (n_ceps x n_filters) DCT-II basis for cepstral analysis.

    ``kernel[i, j] = c * cos(pi * i * (j + 0.5) / n_filters)`` where ``c``
    is 1 without normalization. When ``dct_norm`` is set, every row is
    scaled by ``sqrt(2 / n_filters)`` and the first row additionally by
    ``sqrt(0.5)`` (orthonormal DCT-II).

    Returns
    -------
    numpy.ndarray
        Float64 array of shape (n_ceps, n_filters).
    """
    rows = numpy.arange(n_ceps, dtype=numpy.float64).reshape(-1, 1)
    cols = numpy.arange(n_filters, dtype=numpy.float64).reshape(1, -1)

    scale = math.sqrt(2.0 / n_filters) if dct_norm else 1.0
    dct_kernel = scale * numpy.cos(
        math.pi * rows * (cols + 0.5) / float(n_filters)
    )

    if dct_norm:
        # First basis row gets an extra sqrt(0.5); the others stay as-is.
        dct_kernel[0, :] *= math.sqrt(0.5)

    return dct_kernel

82 

83 

def resample(
    audio: Union[numpy.ndarray, torch.Tensor],
    rate: int,
    new_rate: int,
    **kwargs,
) -> Union[numpy.ndarray, torch.Tensor]:
    """Resample the audio signal to ``new_rate``.

    The input is returned unchanged when the two rates already match.
    numpy arrays are converted to torch tensors for resampling and
    converted back afterwards, so the output type matches the input type.

    Parameters
    ----------
    audio:
        The audio to resample.
    rate:
        The original sample rate of the audio.
    new_rate:
        The wanted sample rate of the output audio.
    kwargs:
        Arguments passed to :py:class:`torchaudio.transforms.Resample`.
    """
    if rate == new_rate:
        return audio

    is_numpy_input = isinstance(audio, numpy.ndarray)
    tensor = torch.from_numpy(audio) if is_numpy_input else audio

    transform = torchaudio.transforms.Resample(
        rate, new_rate, dtype=torch.float32, **kwargs
    )
    resampled = transform(tensor)

    return resampled.numpy() if is_numpy_input else resampled

118 

119 

def read(
    filename: str,
    channel: Optional[int] = None,
    force_sample_rate: Optional[int] = None,
) -> Tuple[numpy.ndarray, int]:
    """Reads audio file and returns the signal and the sampling rate

    Parameters
    ----------
    filename:
        The full path to the audio file to load.
    channel:
        The channel to load. If None and the file has multiple channels, only
        the first channel is kept and a 1D array is returned (a 2D
        all-channels array is never returned by this function).
    force_sample_rate:
        If specified, the audio will be resampled to the specified rate.
        Otherwise, the sample rate of the file will be used.

    Returns
    -------
    signal and sampling rate
        The signal in int16 range (-32768 to 32767) and float32 format, and the
        sampling rate in Hz.
    """

    data, rate = torchaudio.load(str(filename))

    if channel is not None:
        data = data[channel]
    else:
        if data.ndim > 1:
            # Only channel 0 is kept when no channel was requested.
            data = data[0]

    if force_sample_rate is not None:
        data = resample(data, rate, force_sample_rate)
        rate = force_sample_rate

    # Expected data is in float32 format and int16 range (-32768. to 32767.)
    data = data.numpy().astype(numpy.float32) * 32768

    return data, rate

161 

162 

def audio_info(filename: str):
    """Returns the audio info of a file.

    Parameters
    ----------
    filename:
        The full path to the audio file to load.

    Returns
    -------
    info: torchaudio.backend.common.AudioMetaData
        A metadata object describing the file (sample rate, number of
        frames, number of channels, etc.).
    """

    return torchaudio.info(str(filename))

178 

179 

def compare(v1, v2, width):
    """Return True when ``v1`` and ``v2`` differ by at most ``width``."""
    delta = v1 - v2
    return -width <= delta <= width

182 

183 

def mel_python(f):
    """Convert a frequency ``f`` in Hz to the mel scale (HTK formula)."""
    ratio = 1.0 + f / 700.0
    return 2595.0 * math.log10(ratio)

186 

187 

def mel_inv_python(value):
    """Convert a mel-scale ``value`` back to a frequency in Hz."""
    exponent = value / 2595.0
    return 700.0 * (10 ** exponent - 1)

190 

191 

def sig_norm(win_length, frame, flag):
    """Log-energy of the first ``win_length`` samples of ``frame``.

    The energy is floored at 1.0 before taking the log (so the minimum
    return value is 0.0). When ``flag`` is set and the result is nonzero,
    the first ``win_length`` samples of ``frame`` are divided by it in
    place.

    Returns
    -------
    float
        ``log(max(sum(frame[:win_length]**2), 1.0))``.
    """
    ENERGY_FLOOR = 1.0

    acc = 0.0
    for idx in range(win_length):
        acc = acc + frame[idx] * frame[idx]

    gain = math.log(acc) if acc >= ENERGY_FLOOR else math.log(ENERGY_FLOOR)

    if flag and gain != 0.0:
        for idx in range(win_length):
            frame[idx] = frame[idx] / gain

    return gain

207 

208 

def pre_emphasis(frame, win_shift, coef, last_frame_elem):
    """Apply a first-order pre-emphasis filter to ``frame``.

    Each sample becomes ``x[t] - coef * x[t-1]``; the very first sample is
    filtered against ``last_frame_elem``, the carried-over element from the
    previous frame.

    Parameters
    ----------
    frame:
        1D array of samples to filter.
    win_shift:
        Frame shift in samples; ``frame[win_shift - 1]`` is returned so the
        caller can carry it into the next frame.
    coef:
        Pre-emphasis coefficient; must be in (0, 1].
    last_frame_elem:
        Last carried-over element of the previous frame.

    Returns
    -------
    tuple or None
        ``(filtered_frame, last_element)`` on success; ``None`` (after
        logging an error) when ``coef`` is out of range.
    """
    if (coef <= 0.0) or (coef > 1.0):
        # Report through the module logger instead of print() so the error
        # reaches configured logging handlers; returning None is unchanged.
        logger.error("The emphasis coeff. should be between 0 and 1")
        return None

    last_element = frame[win_shift - 1]
    filtered = numpy.append(
        frame[0] - coef * last_frame_elem, frame[1:] - coef * frame[:-1]
    )
    return filtered, last_element

222 

223 

def hamming_window(vector, hamming_kernel, win_length):
    """Multiply the first ``win_length`` samples of ``vector`` by the
    Hamming kernel, in place, and return ``vector``."""
    for idx in range(win_length):
        vector[idx] *= hamming_kernel[idx]
    return vector

228 

229 

def log_filter_bank(frame, n_filters, p_index, win_size, energy_filter):
    """Compute log filter-bank outputs from a time-domain frame.

    Takes the FFT of ``frame``, writes the magnitude (or power, when
    ``energy_filter`` is set) of the first ``win_size / 2 + 1`` bins back
    into ``frame`` in place, then applies the triangular filter bank.

    Returns
    -------
    tuple
        ``(filters, frame)`` — the log filter-bank energies and the
        (mutated) frame whose first half now holds the spectrum.
    """
    # The trivial fft() wrapper is inlined here: it just calls numpy.fft.fft.
    spectrum = numpy.fft.fft(numpy.asarray(frame, dtype=numpy.complex128))
    magnitude = numpy.absolute(spectrum)

    half = int(win_size / 2) + 1
    half_frame = magnitude[:half]
    if energy_filter:
        # Energy is basically magnitude in power of 2.
        half_frame = half_frame**2

    frame[:half] = half_frame
    filters = log_triangular_bank(frame, n_filters, p_index)
    return filters, frame

243 

244 

def log_triangular_bank(data, n_filters, p_index):
    """Apply a triangular filter bank to ``data`` and return log energies.

    ``p_index`` holds ``n_filters + 2`` (fractional) bin positions; filter
    ``i`` rises from ``p_index[i]`` to ``p_index[i + 1]`` and falls back to
    ``p_index[i + 2]``. Outputs are floored at machine epsilon before the
    log so the result is always finite.
    """
    output = numpy.zeros(n_filters, dtype=numpy.float64)

    # Inverse widths of the rising / falling slopes of each filter.
    denominator = 1.0 / (
        p_index[1 : n_filters + 2] - p_index[0 : n_filters + 1]
    )

    for band in range(0, n_filters):
        left = int(math.floor(p_index[band] + 1))
        middle = int(math.floor(p_index[band + 1]))
        right = int(math.floor(p_index[band + 2]))
        # Widen degenerate (or first) filters by one bin on the left.
        if band == 0 or left == right:
            left -= 1

        rising = numpy.arange(left, middle + 1)
        falling = numpy.arange(middle + 1, right + 1)
        output[band] = numpy.sum(
            data[rising] * denominator[band] * (rising - p_index[band])
        ) + numpy.sum(
            data[falling]
            * denominator[band + 1]
            * (p_index[band + 2] - falling)
        )
        # alternative but equivalent implementation:
        # filt = numpy.zeros(right-left+1, dtype=numpy.float64)
        # filt_l = denominator[band] * (rising-p_index[band])
        # filt_p = denominator[band+1] * (p_index[band+2]-falling)
        # filt = numpy.append(filt_l, filt_p)
        # vect_full = numpy.arange(left, right+1)
        # output[band] = numpy.sum(data[vect_full] * filt)

    FBANK_OUT_FLOOR = sys.float_info.epsilon
    return numpy.log(
        numpy.where(output < FBANK_OUT_FLOOR, FBANK_OUT_FLOOR, output)
    )

276 

277 

def dct_transform(filters, n_filters, dct_kernel, n_ceps):
    """Project the first ``n_filters`` filter-bank values onto each of the
    ``n_ceps`` DCT basis rows in ``dct_kernel``.

    Returns
    -------
    numpy.ndarray
        Float array of ``n_ceps`` cepstral coefficients.
    """
    bank = numpy.asarray(filters)[:n_filters]
    ceps = numpy.zeros(n_ceps)
    for row in range(n_ceps):
        ceps[row] = numpy.sum(bank * dct_kernel[row])
    return ceps

286 

287 

def energy(data, rate, *, win_length_ms=20.0, win_shift_ms=10.0):
    """Compute the per-frame log-energy of a 1D signal.

    The signal is split into frames of ``win_length_ms`` shifted by
    ``win_shift_ms``; each frame is mean-normalized (over the power-of-two
    frame buffer) and its log-energy — floored at log(1.0) = 0.0 — is
    returned, one value per frame.

    Parameters
    ----------
    data:
        1D signal array.
    rate:
        Sampling rate in Hz.
    win_length_ms, win_shift_ms:
        Analysis window length and shift, in milliseconds.

    Returns
    -------
    numpy.ndarray
        One log-energy value per frame.
    """
    # --- initialisation -----------------------------------------------------
    win_length = int(rate * win_length_ms / 1000)
    win_shift = int(rate * win_shift_ms / 1000)
    # Frame buffer size: next power of two >= win_length.
    win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))

    # --- core ---------------------------------------------------------------
    data_size = data.shape[0]
    n_frames = int(1 + (data_size - win_length) / win_shift)

    frame_energies = []
    sample_index = numpy.arange(win_length)

    for frame_no in range(n_frames):
        # Copy the samples of this frame into a zero-padded buffer.
        frame = numpy.zeros(win_size, dtype=numpy.float64)
        frame[sample_index] = data[sample_index + frame_no * win_shift]
        # Mean normalization (mean taken over the full padded buffer).
        mean = numpy.sum(frame) / win_size
        frame[sample_index] -= mean

        # Log-energy with floor (inlined from the small sig_norm helper,
        # called with flag=False: no in-place scaling).
        acc = 0.0
        for sample in frame[:win_length]:
            acc = acc + sample * sample
        frame_energies.append(0.0 if acc < 1.0 else math.log(acc))

    return numpy.array(frame_energies)

327 

328 

def spectrogram(
    data,
    rate,
    *,
    win_length_ms=20.0,
    win_shift_ms=10.0,
    n_filters=24,
    f_min=0.0,
    f_max=4000.0,
    pre_emphasis_coef=0.95,
    mel_scale=True,
    energy_filter=False,
    log_filter=True,
    energy_bands=False,
):
    """Compute a spectrogram of ``data`` (one row per analysis frame).

    Each frame is mean-normalized, pre-emphasized and Hamming-windowed;
    the magnitude spectrum (squared magnitude when ``energy_filter`` is
    True) of its first ``win_size / 2 + 1`` FFT bins becomes one row of
    the output. The filter-bank outputs computed by ``log_filter_bank``
    are discarded here — only the spectrum rows are kept.

    Parameters
    ----------
    data:
        1D signal array.
    rate:
        Sampling rate in Hz.
    win_length_ms, win_shift_ms:
        Analysis window length and shift, in milliseconds.
    n_filters, f_min, f_max, mel_scale:
        Filter-bank configuration forwarded to the helpers.
    pre_emphasis_coef:
        Pre-emphasis coefficient in (0, 1].
    energy_filter:
        When True, rows contain power (magnitude squared) instead of
        magnitude.
    log_filter, energy_bands:
        NOTE(review): currently unused by this function — confirm whether
        they were meant to alter the output.

    Returns
    -------
    numpy.ndarray
        Array of shape (n_frames, win_size / 2 + 1).
    """
    #########################
    # Initialisation part ##
    #########################

    win_length = int(rate * win_length_ms / 1000)
    win_shift = int(rate * win_shift_ms / 1000)
    # Frame buffer size: next power of two >= win_length (for the FFT).
    win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))

    # Hamming initialisation
    hamming_kernel = init_hamming_kernel(win_length)

    # Compute cut-off frequencies
    p_index = init_freqfilter(
        rate, win_size, mel_scale, n_filters, f_min, f_max
    )

    ######################################
    # End of the Initialisation part ###
    ######################################

    ######################################
    # Core code ###
    ######################################

    data_size = data.shape[0]
    n_frames = int(1 + (data_size - win_length) / win_shift)

    # create features set
    features = numpy.zeros(
        [n_frames, int(win_size / 2) + 1], dtype=numpy.float64
    )

    # Pre-emphasis state carried across frames.
    last_frame_elem = 0
    # compute cepstral coefficients
    for i in range(n_frames):
        # create a frame
        frame = numpy.zeros(win_size, dtype=numpy.float64)
        vec = numpy.arange(win_length)
        frame[vec] = data[vec + i * win_shift]
        som = numpy.sum(frame)
        som = som / win_size
        frame[vec] -= som  # normalization by mean here

        frame_, last_frame_elem = pre_emphasis(
            frame[vec], win_shift, pre_emphasis_coef, last_frame_elem
        )
        frame[vec] = frame_

        # Hamming windowing
        frame = hamming_window(frame, hamming_kernel, win_length)

        # log_filter_bank writes the (half) spectrum back into frame;
        # its filter outputs are intentionally ignored here.
        _, spec_row = log_filter_bank(
            frame, n_filters, p_index, win_size, energy_filter
        )

        features[i] = spec_row[0 : int(win_size / 2) + 1]

    return numpy.array(features)

402 

403 

def cepstral(
    data,
    rate,
    *,
    win_length_ms=20,
    win_shift_ms=10,
    n_filters=20,
    f_min=0.0,
    f_max=4000.0,
    pre_emphasis_coef=1.0,
    mel_scale=True,
    n_ceps=19,
    delta_win=2,
    dct_norm=True,
    with_energy=True,
    with_delta=True,
    with_delta_delta=True,
):
    """Extract cepstral (MFCC-like) features from a 1D signal.

    Per frame: mean removal, optional log-energy, pre-emphasis, Hamming
    window, log (mel or linear) filter bank, then DCT. The log-energy is
    optionally appended to each row, and first/second-order delta
    coefficients are optionally appended after all frames are computed.
    Deltas are left UNNORMALIZED — the division by ``som`` is deliberately
    commented out below.

    Parameters
    ----------
    data:
        1D signal array.
    rate:
        Sampling rate in Hz.
    win_length_ms, win_shift_ms:
        Analysis window length and shift, in milliseconds.
    n_filters, f_min, f_max, mel_scale:
        Filter-bank configuration.
    pre_emphasis_coef:
        Pre-emphasis coefficient in (0, 1].
    n_ceps:
        Number of cepstral coefficients kept per frame.
    delta_win:
        Half-window (in frames) of the delta regression.
    dct_norm:
        Use the orthonormal DCT-II basis when True.
    with_energy, with_delta, with_delta_delta:
        Toggle the log-energy column and the delta / delta-delta blocks.
        Delta-delta is forced off when delta is disabled.

    Returns
    -------
    numpy.ndarray
        (n_frames, dim) array where one block is d1 = n_ceps (+1 when
        with_energy) wide and dim is d1 times the number of enabled
        blocks (static / delta / delta-delta).
    """

    #########################
    # Initialisation part ##
    #########################

    win_length = int(rate * win_length_ms / 1000)
    win_shift = int(rate * win_shift_ms / 1000)
    # Frame buffer size: next power of two >= win_length (for the FFT).
    win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2)))

    # Hamming initialisation
    hamming_kernel = init_hamming_kernel(win_length)

    # Compute cut-off frequencies
    p_index = init_freqfilter(
        rate,
        win_size,
        mel_scale,
        n_filters,
        f_min,
        f_max,
    )

    # Cosine transform initialisation
    dct_kernel = init_dct_kernel(n_filters, n_ceps, dct_norm)

    ######################################
    # End of the Initialisation part ###
    ######################################

    ######################################
    # Core code ###
    ######################################

    data_size = data.shape[0]
    n_frames = int(1 + (data_size - win_length) / win_shift)

    # create features set
    # dim0 is the width of one feature block; the final row width depends
    # on which delta blocks are enabled.
    dim0 = n_ceps
    if with_energy:
        dim0 += +1
    dim = dim0
    if with_delta:
        dim += dim0
        if with_delta_delta:
            dim += dim0
    else:
        # delta-delta makes no sense without delta
        with_delta_delta = False

    features = numpy.zeros([n_frames, dim], dtype=numpy.float64)

    # Pre-emphasis state carried across frames.
    last_frame_elem = 0
    # compute cepstral coefficients
    for i in range(n_frames):
        # create a frame
        frame = numpy.zeros(win_size, dtype=numpy.float64)
        vec = numpy.arange(win_length)
        frame[vec] = data[vec + i * win_shift]
        som = numpy.sum(frame)
        som = som / win_size
        frame[vec] -= som  # normalization by mean here

        if with_energy:
            # NOTE: this local deliberately reuses the name of the
            # module-level energy() function (it shadows it here).
            energy = sig_norm(win_length, frame, False)

        # pre-emphasis filtering
        frame_, last_frame_elem = pre_emphasis(
            frame[vec], win_shift, pre_emphasis_coef, last_frame_elem
        )
        frame[vec] = frame_

        # Hamming windowing
        frame = hamming_window(frame, hamming_kernel, win_length)

        # FFT and filters
        filters, _ = log_filter_bank(
            frame, n_filters, p_index, win_size, energy_filter=False
        )

        # apply DCT
        ceps = dct_transform(filters, n_filters, dct_kernel, n_ceps)

        ######################################
        # Deltas and Delta-Deltas ###
        ######################################

        # d1 is the width of the static block (cepstra [+ energy]).
        d1 = n_ceps
        if with_energy:
            d1 = n_ceps + 1
            ceps = numpy.append(ceps, energy)

        # stock the results in features matrix
        vec = numpy.arange(d1)
        features[i][0:d1] = ceps[vec]

    # compute Delta coefficient
    if with_delta:
        # som would be the regression normalizer; kept for reference but
        # unused since normalization is disabled below.
        som = 0.0
        for i in range(1, delta_win + 1):
            som = som + i * i
        som = som * 2

        for i in range(n_frames):
            for k in range(n_ceps):
                features[i][d1 + k] = 0.0
                for ll in range(1, delta_win + 1):
                    # Clamp the regression window at the signal edges.
                    if i + ll < n_frames:
                        p_ind = i + ll
                    else:
                        p_ind = n_frames - 1
                    if i - ll > 0:
                        n_ind = i - ll
                    else:
                        n_ind = 0
                    features[i][d1 + k] = features[i][d1 + k] + ll * (
                        features[p_ind][k] - features[n_ind][k]
                    )
                # features[i][d1+k] = features[i][d1+k] / som # do not normalize anymore

    # compute Delta of the Energy
    if with_delta and with_energy:
        som = 0.0

        vec = numpy.arange(1, delta_win + 1)
        som = 2.0 * numpy.sum(vec * vec)

        for i in range(n_frames):
            # The energy column sits at index d1 - 1 == n_ceps of each block.
            k = n_ceps
            features[i][d1 + k] = 0.0
            for ll in range(1, delta_win + 1):
                if i + ll < n_frames:
                    p_ind = i + ll
                else:
                    p_ind = n_frames - 1
                if i - ll > 0:
                    n_ind = i - ll
                else:
                    n_ind = 0
                features[i][d1 + k] = features[i][d1 + k] + ll * (
                    features[p_ind][k] - features[n_ind][k]
                )
            # features[i][d1+k] = features[i][d1+k] / som # do not normalize anymore

    # compute Delta Delta of the coefficients
    if with_delta_delta:
        som = 0.0
        for i in range(1, delta_win + 1):
            som = som + i * i
        som = som * 2
        for i in range(n_frames):
            for k in range(n_ceps):
                features[i][2 * d1 + k] = 0.0
                for ll in range(1, delta_win + 1):
                    if i + ll < n_frames:
                        p_ind = i + ll
                    else:
                        p_ind = n_frames - 1
                    if i - ll > 0:
                        n_ind = i - ll
                    else:
                        n_ind = 0
                    # Delta-delta regresses over the delta block (d1 + k).
                    features[i][2 * d1 + k] = features[i][2 * d1 + k] + ll * (
                        features[p_ind][d1 + k] - features[n_ind][d1 + k]
                    )
                # features[i][2*d1+k] = features[i][2*d1+k] / som # do not normalize anymore

    # compute Delta Delta of the energy
    if with_delta_delta and with_energy:
        som = 0.0
        for i in range(1, delta_win + 1):
            som = som + i * i
        som = som * 2
        for i in range(n_frames):
            k = n_ceps
            features[i][2 * d1 + k] = 0.0
            for ll in range(1, delta_win + 1):
                if i + ll < n_frames:
                    p_ind = i + ll
                else:
                    p_ind = n_frames - 1
                if i - ll > 0:
                    n_ind = i - ll
                else:
                    n_ind = 0
                features[i][2 * d1 + k] = features[i][2 * d1 + k] + ll * (
                    features[p_ind][d1 + k] - features[n_ind][d1 + k]
                )
            # features[i][2*d1+k] = features[i][2*d1+k] / som # do not normalize anymore

    return numpy.array(features)