Coverage for src/bob/bio/spear/audio_processing.py: 94%

282 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-06 22:04 +0100

1#!/usr/bin/env python 

2# vim: set fileencoding=utf-8 : 

3# Elie Khoury <Elie.Khoury@idiap.ch> 

4# Andre Anjos <andre.anjos@idiap.ch> 

5# Pavel Korshunov <Pavel.Korshunov@idiap.ch> 

6# Amir Mohammadi <amir.mohammadi@idiap.ch> 

7 

8import importlib 

9import logging 

10import math 

11import sys 

12 

13from typing import Optional, Tuple, Union 

14 

15import numpy 

16import torch 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21def fft(src, dst=None): 

22 out = numpy.fft.fft(src) 

23 if dst is not None: 

24 dst[:] = out 

25 return out 

26 

27 

28def init_hamming_kernel(win_length): 

29 # Hamming initialization 

30 cst = 2 * math.pi / (win_length - 1.0) 

31 hamming_kernel = numpy.zeros(win_length) 

32 

33 for i in range(win_length): 

34 hamming_kernel[i] = 0.54 - 0.46 * math.cos(i * cst) 

35 return hamming_kernel 

36 

37 

38def init_freqfilter(rate, win_size, mel_scale, n_filters, f_min, f_max): 

39 # Compute cut-off frequencies 

40 p_index = numpy.array(numpy.zeros(n_filters + 2), dtype=numpy.float64) 

41 if mel_scale: 

42 # Mel scale 

43 m_max = mel_python(f_max) 

44 m_min = mel_python(f_min) 

45 

46 for i in range(n_filters + 2): 

47 alpha = float(i) / (n_filters + 1) 

48 f = mel_inv_python(m_min * (1 - alpha) + m_max * alpha) 

49 factor = float(f) / rate 

50 p_index[i] = win_size * factor 

51 else: 

52 # linear scale 

53 for i in range(n_filters + 2): 

54 alpha = float(i) / (n_filters + 1) 

55 f = f_min * (1.0 - alpha) + f_max * alpha 

56 p_index[i] = float(win_size) / rate * f 

57 return p_index 

58 

59 

60def init_dct_kernel(n_filters, n_ceps, dct_norm): 

61 dct_kernel = numpy.zeros([n_ceps, n_filters], dtype=numpy.float64) 

62 

63 dct_coeff = 1.0 

64 if dct_norm: 

65 dct_coeff = math.sqrt(2.0 / n_filters) 

66 

67 for i in range(0, n_ceps): 

68 for j in range(0, n_filters): 

69 dct_kernel[i][j] = dct_coeff * math.cos( 

70 math.pi * i * (j + 0.5) / float(n_filters) 

71 ) 

72 

73 if dct_norm: 

74 column_multiplier = numpy.ones(n_ceps, dtype=numpy.float64) 

75 column_multiplier[0] = math.sqrt( 

76 0.5 

77 ) # first element sqrt(0.5), the rest are 1. 

78 for j in range(0, n_filters): 

79 dct_kernel[:, j] = column_multiplier * dct_kernel[:, j] 

80 

81 return dct_kernel 

82 

83 

84def resample( 

85 audio: Union[numpy.ndarray, torch.Tensor], 

86 rate: int, 

87 new_rate: int, 

88 **kwargs, 

89) -> Union[numpy.ndarray, torch.Tensor]: 

90 """Resamples the audio to a new sample rate. 

91 

92 Parameters 

93 ---------- 

94 audio: 

95 The audio to resample. 

96 rate: 

97 The original sample rate of the audio. 

98 new_rate: 

99 The wanted sample rate of the output audio. 

100 kwargs: 

101 Arguments passed to :py:class:``torchaudio.transforms.Resample``. 

102 """ 

103 

104 import torchaudio 

105 

106 if rate == new_rate: 

107 return audio 

108 

109 was_numpy = False 

110 if isinstance(audio, numpy.ndarray): 

111 audio = torch.from_numpy(audio) 

112 was_numpy = True 

113 

114 resampler = torchaudio.transforms.Resample( 

115 rate, new_rate, dtype=torch.float32, **kwargs 

116 ) 

117 audio = resampler(audio) 

118 

119 return audio.numpy() if was_numpy else audio 

120 

121 

122def read( 

123 filename: str, 

124 channel: Optional[int] = None, 

125 force_sample_rate: Optional[int] = None, 

126) -> Tuple[numpy.ndarray, int]: 

127 """Reads audio file and returns the signal and the sampling rate 

128 

129 Parameters 

130 ---------- 

131 filename: 

132 The full path to the audio file to load. 

133 channel: 

134 The channel to load. If None, all channels will be loaded and returned in a 2D 

135 array in the shape (n_channels, n_samples). 

136 force_sample_rate: 

137 If specified, the audio will be resampled to the specified rate. Otherwise, the 

138 sample rate of the file will be used. 

139 

140 Returns 

141 ------- 

142 signal and sampling rate 

143 The signal in int16 range (-32768 to 32767) and float32 format, and the 

144 sampling rate in Hz. 

145 """ 

146 

147 import torchaudio 

148 

149 try: 

150 importlib.__import__( 

151 "soundfile" 

152 ) # Try to import missing soundfile may throw an OSError. 

153 torchaudio.set_audio_backend("soundfile") # May throw a RuntimeError. 

154 except (RuntimeError, OSError) as e: 

155 logger.warning( 

156 "'soundfile' could not be imported or specified as torchaudio " 

157 "backend. torchaudio may have trouble loading '.sph' files." 

158 ) 

159 logger.info("error was %s", e) 

160 

161 data, rate = torchaudio.load(str(filename)) 

162 

163 if channel is not None: 

164 data = data[channel] 

165 else: 

166 if data.ndim > 1: 

167 data = data[0] 

168 

169 if force_sample_rate is not None: 

170 data = resample(data, rate, force_sample_rate) 

171 rate = force_sample_rate 

172 

173 # Expected data is in float32 format and int16 range (-32768. to 32767.) 

174 data = data.numpy().astype(numpy.float32) * 32768 

175 

176 return data, rate 

177 

178 

179def audio_info(filename: str): 

180 """Returns the audio info of a file. 

181 

182 Parameters 

183 ---------- 

184 filename: 

185 The full path to the audio file to load. 

186 

187 Returns 

188 ------- 

189 info: torchaudio.backend.common.AudioMetaData 

190 A dictionary containing the audio information. 

191 """ 

192 

193 import torchaudio 

194 

195 return torchaudio.info(str(filename)) 

196 

197 

198def compare(v1, v2, width): 

199 return abs(v1 - v2) <= width 

200 

201 

202def mel_python(f): 

203 return 2595.0 * math.log10(1.0 + f / 700.0) 

204 

205 

206def mel_inv_python(value): 

207 return 700.0 * (10 ** (value / 2595.0) - 1) 

208 

209 

210def sig_norm(win_length, frame, flag): 

211 gain = 0.0 

212 for i in range(win_length): 

213 gain = gain + frame[i] * frame[i] 

214 

215 ENERGY_FLOOR = 1.0 

216 if gain < ENERGY_FLOOR: 

217 gain = math.log(ENERGY_FLOOR) 

218 else: 

219 gain = math.log(gain) 

220 

221 if flag and gain != 0.0: 

222 for i in range(win_length): 

223 frame[i] = frame[i] / gain 

224 return gain 

225 

226 

227def pre_emphasis(frame, win_shift, coef, last_frame_elem): 

228 

229 if (coef <= 0.0) or (coef > 1.0): 

230 print("Error: The emphasis coeff. should be between 0 and 1") 

231 return None 

232 

233 last_element = frame[win_shift - 1] 

234 return ( 

235 numpy.append( 

236 frame[0] - coef * last_frame_elem, frame[1:] - coef * frame[:-1] 

237 ), 

238 last_element, 

239 ) 

240 

241 

242def hamming_window(vector, hamming_kernel, win_length): 

243 for i in range(win_length): 

244 vector[i] = vector[i] * hamming_kernel[i] 

245 return vector 

246 

247 

248def log_filter_bank(frame, n_filters, p_index, win_size, energy_filter): 

249 x1 = numpy.array(frame, dtype=numpy.complex128) 

250 complex_ = fft(x1) 

251 abscomplex = numpy.absolute(complex_) 

252 half_frame = abscomplex[0 : int(win_size / 2) + 1] 

253 

254 if energy_filter: 

255 # Energy is basically magnitude in power of 2 

256 half_frame = half_frame**2 

257 

258 frame[0 : int(win_size / 2) + 1] = half_frame 

259 filters = log_triangular_bank(frame, n_filters, p_index) 

260 return filters, frame 

261 

262 

263def log_triangular_bank(data, n_filters, p_index): 

264 res_ = numpy.zeros(n_filters, dtype=numpy.float64) 

265 

266 denominator = 1.0 / ( 

267 p_index[1 : n_filters + 2] - p_index[0 : n_filters + 1] 

268 ) 

269 

270 for i in range(0, n_filters): 

271 li = int(math.floor(p_index[i] + 1)) 

272 mi = int(math.floor(p_index[i + 1])) 

273 ri = int(math.floor(p_index[i + 2])) 

274 if i == 0 or li == ri: 

275 li -= 1 

276 

277 vec_left = numpy.arange(li, mi + 1) 

278 vec_right = numpy.arange(mi + 1, ri + 1) 

279 res_[i] = numpy.sum( 

280 data[vec_left] * denominator[i] * (vec_left - p_index[i]) 

281 ) + numpy.sum( 

282 data[vec_right] * denominator[i + 1] * (p_index[i + 2] - vec_right) 

283 ) 

284 # alternative but equivalent implementation: 

285 # filt = numpy.zeros(ri-li+1, dtype=numpy.float64) 

286 # filt_l = denominator[i] * (vec_left-p_index[i]) 

287 # filt_p = denominator[i+1] * (p_index[i+2]-vec_right) 

288 # filt = numpy.append(filt_l, filt_p) 

289 # vect_full = numpy.arange(li, ri+1) 

290 # res_[i] = numpy.sum(data[vect_full] * filt) 

291 

292 FBANK_OUT_FLOOR = sys.float_info.epsilon 

293 return numpy.log(numpy.where(res_ < FBANK_OUT_FLOOR, FBANK_OUT_FLOOR, res_)) 

294 

295 

296def dct_transform(filters, n_filters, dct_kernel, n_ceps): 

297 

298 ceps = numpy.zeros(n_ceps) 

299 vec = numpy.array(range(0, n_filters)) 

300 for i in range(0, n_ceps): 

301 ceps[i] = numpy.sum(filters[vec] * dct_kernel[i]) 

302 

303 return ceps 

304 

305 

306def energy(data, rate, *, win_length_ms=20.0, win_shift_ms=10.0): 

307 

308 ######################### 

309 # Initialisation part ## 

310 ######################### 

311 

312 win_length = int(rate * win_length_ms / 1000) 

313 win_shift = int(rate * win_shift_ms / 1000) 

314 win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2))) 

315 

316 ###################################### 

317 # End of the Initialisation part ### 

318 ###################################### 

319 

320 ###################################### 

321 # Core code ### 

322 ###################################### 

323 

324 data_size = data.shape[0] 

325 n_frames = int(1 + (data_size - win_length) / win_shift) 

326 

327 # create features set 

328 

329 features = [0 for j in range(n_frames)] 

330 

331 # compute cepstral coefficients 

332 for i in range(n_frames): 

333 # create a frame 

334 frame = numpy.zeros(win_size, dtype=numpy.float64) 

335 vec = numpy.arange(win_length) 

336 frame[vec] = data[vec + i * win_shift] 

337 som = numpy.sum(frame) 

338 som = som / win_size 

339 frame[vec] -= som # normalization by mean here 

340 

341 energy = sig_norm(win_length, frame, False) 

342 features[i] = energy 

343 

344 return numpy.array(features) 

345 

346 

347def spectrogram( 

348 data, 

349 rate, 

350 *, 

351 win_length_ms=20.0, 

352 win_shift_ms=10.0, 

353 n_filters=24, 

354 f_min=0.0, 

355 f_max=4000.0, 

356 pre_emphasis_coef=0.95, 

357 mel_scale=True, 

358 energy_filter=False, 

359 log_filter=True, 

360 energy_bands=False, 

361): 

362 ######################### 

363 # Initialisation part ## 

364 ######################### 

365 

366 win_length = int(rate * win_length_ms / 1000) 

367 win_shift = int(rate * win_shift_ms / 1000) 

368 win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2))) 

369 

370 # Hamming initialisation 

371 hamming_kernel = init_hamming_kernel(win_length) 

372 

373 # Compute cut-off frequencies 

374 p_index = init_freqfilter( 

375 rate, win_size, mel_scale, n_filters, f_min, f_max 

376 ) 

377 

378 ###################################### 

379 # End of the Initialisation part ### 

380 ###################################### 

381 

382 ###################################### 

383 # Core code ### 

384 ###################################### 

385 

386 data_size = data.shape[0] 

387 n_frames = int(1 + (data_size - win_length) / win_shift) 

388 

389 # create features set 

390 features = numpy.zeros( 

391 [n_frames, int(win_size / 2) + 1], dtype=numpy.float64 

392 ) 

393 

394 last_frame_elem = 0 

395 # compute cepstral coefficients 

396 for i in range(n_frames): 

397 # create a frame 

398 frame = numpy.zeros(win_size, dtype=numpy.float64) 

399 vec = numpy.arange(win_length) 

400 frame[vec] = data[vec + i * win_shift] 

401 som = numpy.sum(frame) 

402 som = som / win_size 

403 frame[vec] -= som # normalization by mean here 

404 

405 frame_, last_frame_elem = pre_emphasis( 

406 frame[vec], win_shift, pre_emphasis_coef, last_frame_elem 

407 ) 

408 frame[vec] = frame_ 

409 

410 # Hamming windowing 

411 frame = hamming_window(frame, hamming_kernel, win_length) 

412 

413 _, spec_row = log_filter_bank( 

414 frame, n_filters, p_index, win_size, energy_filter 

415 ) 

416 

417 features[i] = spec_row[0 : int(win_size / 2) + 1] 

418 

419 return numpy.array(features) 

420 

421 

422def cepstral( 

423 data, 

424 rate, 

425 *, 

426 win_length_ms=20, 

427 win_shift_ms=10, 

428 n_filters=20, 

429 f_min=0.0, 

430 f_max=4000.0, 

431 pre_emphasis_coef=1.0, 

432 mel_scale=True, 

433 n_ceps=19, 

434 delta_win=2, 

435 dct_norm=True, 

436 with_energy=True, 

437 with_delta=True, 

438 with_delta_delta=True, 

439): 

440 

441 ######################### 

442 # Initialisation part ## 

443 ######################### 

444 

445 win_length = int(rate * win_length_ms / 1000) 

446 win_shift = int(rate * win_shift_ms / 1000) 

447 win_size = int(2.0 ** math.ceil(math.log(win_length) / math.log(2))) 

448 

449 # Hamming initialisation 

450 hamming_kernel = init_hamming_kernel(win_length) 

451 

452 # Compute cut-off frequencies 

453 p_index = init_freqfilter( 

454 rate, 

455 win_size, 

456 mel_scale, 

457 n_filters, 

458 f_min, 

459 f_max, 

460 ) 

461 

462 # Cosine transform initialisation 

463 dct_kernel = init_dct_kernel(n_filters, n_ceps, dct_norm) 

464 

465 ###################################### 

466 # End of the Initialisation part ### 

467 ###################################### 

468 

469 ###################################### 

470 # Core code ### 

471 ###################################### 

472 

473 data_size = data.shape[0] 

474 n_frames = int(1 + (data_size - win_length) / win_shift) 

475 

476 # create features set 

477 dim0 = n_ceps 

478 if with_energy: 

479 dim0 += +1 

480 dim = dim0 

481 if with_delta: 

482 dim += dim0 

483 if with_delta_delta: 

484 dim += dim0 

485 else: 

486 with_delta_delta = False 

487 

488 features = numpy.zeros([n_frames, dim], dtype=numpy.float64) 

489 

490 last_frame_elem = 0 

491 # compute cepstral coefficients 

492 for i in range(n_frames): 

493 # create a frame 

494 frame = numpy.zeros(win_size, dtype=numpy.float64) 

495 vec = numpy.arange(win_length) 

496 frame[vec] = data[vec + i * win_shift] 

497 som = numpy.sum(frame) 

498 som = som / win_size 

499 frame[vec] -= som # normalization by mean here 

500 

501 if with_energy: 

502 energy = sig_norm(win_length, frame, False) 

503 

504 # pre-emphasis filtering 

505 frame_, last_frame_elem = pre_emphasis( 

506 frame[vec], win_shift, pre_emphasis_coef, last_frame_elem 

507 ) 

508 frame[vec] = frame_ 

509 

510 # Hamming windowing 

511 frame = hamming_window(frame, hamming_kernel, win_length) 

512 

513 # FFT and filters 

514 filters, _ = log_filter_bank( 

515 frame, n_filters, p_index, win_size, energy_filter=False 

516 ) 

517 

518 # apply DCT 

519 ceps = dct_transform(filters, n_filters, dct_kernel, n_ceps) 

520 

521 ###################################### 

522 # Deltas and Delta-Deltas ### 

523 ###################################### 

524 

525 d1 = n_ceps 

526 if with_energy: 

527 d1 = n_ceps + 1 

528 ceps = numpy.append(ceps, energy) 

529 

530 # stock the results in features matrix 

531 vec = numpy.arange(d1) 

532 features[i][0:d1] = ceps[vec] 

533 

534 # compute Delta coefficient 

535 if with_delta: 

536 som = 0.0 

537 for i in range(1, delta_win + 1): 

538 som = som + i * i 

539 som = som * 2 

540 

541 for i in range(n_frames): 

542 for k in range(n_ceps): 

543 features[i][d1 + k] = 0.0 

544 for ll in range(1, delta_win + 1): 

545 if i + ll < n_frames: 

546 p_ind = i + ll 

547 else: 

548 p_ind = n_frames - 1 

549 if i - ll > 0: 

550 n_ind = i - ll 

551 else: 

552 n_ind = 0 

553 features[i][d1 + k] = features[i][d1 + k] + ll * ( 

554 features[p_ind][k] - features[n_ind][k] 

555 ) 

556 # features[i][d1+k] = features[i][d1+k] / som # do not normalize anymore 

557 

558 # compute Delta of the Energy 

559 if with_delta and with_energy: 

560 som = 0.0 

561 

562 vec = numpy.arange(1, delta_win + 1) 

563 som = 2.0 * numpy.sum(vec * vec) 

564 

565 for i in range(n_frames): 

566 k = n_ceps 

567 features[i][d1 + k] = 0.0 

568 for ll in range(1, delta_win + 1): 

569 if i + ll < n_frames: 

570 p_ind = i + ll 

571 else: 

572 p_ind = n_frames - 1 

573 if i - ll > 0: 

574 n_ind = i - ll 

575 else: 

576 n_ind = 0 

577 features[i][d1 + k] = features[i][d1 + k] + ll * ( 

578 features[p_ind][k] - features[n_ind][k] 

579 ) 

580 # features[i][d1+k] = features[i][d1+k] / som # do not normalize anymore 

581 

582 # compute Delta Delta of the coefficients 

583 if with_delta_delta: 

584 som = 0.0 

585 for i in range(1, delta_win + 1): 

586 som = som + i * i 

587 som = som * 2 

588 for i in range(n_frames): 

589 for k in range(n_ceps): 

590 features[i][2 * d1 + k] = 0.0 

591 for ll in range(1, delta_win + 1): 

592 if i + ll < n_frames: 

593 p_ind = i + ll 

594 else: 

595 p_ind = n_frames - 1 

596 if i - ll > 0: 

597 n_ind = i - ll 

598 else: 

599 n_ind = 0 

600 features[i][2 * d1 + k] = features[i][2 * d1 + k] + ll * ( 

601 features[p_ind][d1 + k] - features[n_ind][d1 + k] 

602 ) 

603 # features[i][2*d1+k] = features[i][2*d1+k] / som # do not normalize anymore 

604 

605 # compute Delta Delta of the energy 

606 if with_delta_delta and with_energy: 

607 som = 0.0 

608 for i in range(1, delta_win + 1): 

609 som = som + i * i 

610 som = som * 2 

611 for i in range(n_frames): 

612 k = n_ceps 

613 features[i][2 * d1 + k] = 0.0 

614 for ll in range(1, delta_win + 1): 

615 if i + ll < n_frames: 

616 p_ind = i + ll 

617 else: 

618 p_ind = n_frames - 1 

619 if i - ll > 0: 

620 n_ind = i - ll 

621 else: 

622 n_ind = 0 

623 features[i][2 * d1 + k] = features[i][2 * d1 + k] + ll * ( 

624 features[p_ind][d1 + k] - features[n_ind][d1 + k] 

625 ) 

626 # features[i][2*d1+k] = features[i][2*d1+k] / som # do not normalize anymore 

627 

628 return numpy.array(features)