Merge 81ef7f3bad23a00dc75336b461145453ebcb65a5 into 173ff7dd1d9fb1c4fddea0d41d704cfefeb8908c

This commit is contained in:
Felipe 2024-11-13 03:36:40 +03:00 committed by GitHub
commit ad261ea460
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -7,41 +7,39 @@ import numpy as np
import torch import torch
import torch.nn.functional as F import torch.nn.functional as F
from .utils import exact_div # Importação corrigida
from utils import exact_div
# hard-coded audio hyperparameters # Constants for audio processing
SAMPLE_RATE = 16000 class AudioConstants:
N_FFT = 400 SAMPLE_RATE = 16000
HOP_LENGTH = 160 N_FFT = 400
CHUNK_LENGTH = 30 HOP_LENGTH = 160
N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE # 480000 samples in a 30-second chunk CHUNK_LENGTH = 30
N_FRAMES = exact_div(N_SAMPLES, HOP_LENGTH) # 3000 frames in a mel spectrogram input N_SAMPLES = CHUNK_LENGTH * SAMPLE_RATE # 480000 samples in a 30-second chunk
N_FRAMES = exact_div(N_SAMPLES, HOP_LENGTH) # 3000 frames in a mel spectrogram input
N_SAMPLES_PER_TOKEN = HOP_LENGTH * 2 # Initial convolutions have stride 2
FRAMES_PER_SECOND = exact_div(SAMPLE_RATE, HOP_LENGTH) # 10ms per audio frame
TOKENS_PER_SECOND = exact_div(SAMPLE_RATE, N_SAMPLES_PER_TOKEN) # 20ms per audio token
N_SAMPLES_PER_TOKEN = HOP_LENGTH * 2 # the initial convolutions has stride 2 def load_audio(file: str, sr: int = AudioConstants.SAMPLE_RATE) -> np.ndarray:
FRAMES_PER_SECOND = exact_div(SAMPLE_RATE, HOP_LENGTH) # 10ms per audio frame
TOKENS_PER_SECOND = exact_div(SAMPLE_RATE, N_SAMPLES_PER_TOKEN) # 20ms per audio token
def load_audio(file: str, sr: int = SAMPLE_RATE):
""" """
Open an audio file and read as mono waveform, resampling as necessary Open an audio file and read as mono waveform, resampling as necessary.
Parameters Parameters
---------- ----------
file: str file: str
The audio file to open The audio file to open.
sr: int sr: int
The sample rate to resample the audio if necessary The sample rate to resample the audio if necessary.
Returns Returns
------- -------
np.ndarray
A NumPy array containing the audio waveform, in float32 dtype. A NumPy array containing the audio waveform, in float32 dtype.
""" """
# This launches a subprocess to decode audio while down-mixing
# and resampling as necessary. Requires the ffmpeg CLI in PATH.
# fmt: off
cmd = [ cmd = [
"ffmpeg", "ffmpeg",
"-nostdin", "-nostdin",
@ -53,32 +51,47 @@ def load_audio(file: str, sr: int = SAMPLE_RATE):
"-ar", str(sr), "-ar", str(sr),
"-" "-"
] ]
# fmt: on
try: try:
out = run(cmd, capture_output=True, check=True).stdout out = run(cmd, capture_output=True, check=True).stdout
except CalledProcessError as e: except CalledProcessError as e:
raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e raise RuntimeError(f"Failed to load audio from {file}: {e.stderr.decode()}") from e
return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0 return np.frombuffer(out, np.int16).flatten().astype(np.float32) / 32768.0
def pad_or_trim(array: Union[np.ndarray, torch.Tensor], length: int = AudioConstants.N_SAMPLES, *, axis: int = -1) -> Union[np.ndarray, torch.Tensor]:
"""
Pad or trim the audio array to a specified length.
def pad_or_trim(array, length: int = N_SAMPLES, *, axis: int = -1): Parameters
----------
array: Union[np.ndarray, torch.Tensor]
The input array (NumPy or PyTorch tensor).
length: int
The target length to pad or trim to.
axis: int
The axis along which to pad or trim.
Returns
-------
Union[np.ndarray, torch.Tensor]
The padded or trimmed array.
""" """
Pad or trim the audio array to N_SAMPLES, as expected by the encoder.
""" if isinstance(array, torch.Tensor):
if torch.is_tensor(array):
if array.shape[axis] > length: if array.shape[axis] > length:
array = array.index_select( array = array.index_select(dim=axis, index=torch.arange(length, device=array.device))
dim=axis, index=torch.arange(length, device=array.device)
)
if array.shape[axis] < length: if array.shape[axis] < length:
pad_widths = [(0, 0)] * array.ndim pad_widths = [(0, 0)] * array.ndim
pad_widths[axis] = (0, length - array.shape[axis]) pad_widths[axis] = (0, length - array.shape[axis])
array = F.pad(array, [pad for sizes in pad_widths[::-1] for pad in sizes]) array = F.pad(array, [pad for sizes in pad_widths[::-1] for pad in sizes])
else:
else: # Assume it's a NumPy array
if array.shape[axis] > length: if array.shape[axis] > length:
array = array.take(indices=range(length), axis=axis) array = np.take(array, indices=range(length), axis=axis)
if array.shape[axis] < length: if array.shape[axis] < length:
pad_widths = [(0, 0)] * array.ndim pad_widths = [(0, 0)] * array.ndim
@ -87,71 +100,126 @@ def pad_or_trim(array, length: int = N_SAMPLES, *, axis: int = -1):
return array return array
@lru_cache(maxsize=None) @lru_cache(maxsize=None)
def mel_filters(device, n_mels: int) -> torch.Tensor: def mel_filters(device: torch.device, n_mels: int) -> torch.Tensor:
""" """
load the mel filterbank matrix for projecting STFT into a Mel spectrogram. Load the mel filterbank matrix for projecting STFT into a Mel spectrogram.
Allows decoupling librosa dependency; saved using:
np.savez_compressed( Parameters
"mel_filters.npz", ----------
mel_80=librosa.filters.mel(sr=16000, n_fft=400, n_mels=80), device: torch.device
mel_128=librosa.filters.mel(sr=16000, n_fft=400, n_mels=128), The device to which the tensor will be moved.
)
n_mels: int
The number of Mel-frequency filters.
Returns
-------
torch.Tensor
A tensor containing the mel filterbank.
Raises
------
AssertionError
If n_mels is not supported.
""" """
assert n_mels in {80, 128}, f"Unsupported n_mels: {n_mels}" assert n_mels in {80, 128}, f"Unsupported n_mels: {n_mels}"
filters_path = os.path.join(os.path.dirname(__file__), "assets", "mel_filters.npz") filters_path = os.path.join(os.path.dirname(__file__), "assets", "mel_filters.npz")
with np.load(filters_path, allow_pickle=False) as f: with np.load(filters_path, allow_pickle=False) as f:
return torch.from_numpy(f[f"mel_{n_mels}"]).to(device) return torch.from_numpy(f[f"mel_{n_mels}"]).to(device)
def get_hann_window(size: int, device: torch.device) -> torch.Tensor:
"""
Get a Hann window of specified size on the given device.
Parameters
----------
size: int
The size of the window.
device: torch.device
The device to which the window will be moved.
Returns
-------
torch.Tensor
A Hann window tensor.
Cache the windows for efficiency.
"""
# Cache for Hann windows based on size and device.
if not hasattr(get_hann_window, 'cache'):
get_hann_window.cache = {}
key = (size, str(device))
if key not in get_hann_window.cache:
get_hann_window.cache[key] = torch.hann_window(size).to(device)
return get_hann_window.cache[key]
def log_mel_spectrogram( def log_mel_spectrogram(
audio: Union[str, np.ndarray, torch.Tensor], audio: Union[str, np.ndarray, torch.Tensor],
n_mels: int = 80, n_mels: int = 80,
padding: int = 0, padding: int = 0,
device: Optional[Union[str, torch.device]] = None, device: Optional[Union[str, torch.device]] = None,
): ) -> torch.Tensor:
""" """
Compute the log-Mel spectrogram of Compute the log-Mel spectrogram of an audio waveform.
Parameters Parameters
---------- ----------
audio: Union[str, np.ndarray, torch.Tensor], shape = (*) audio: Union[str, np.ndarray, torch.Tensor]
The path to audio or either a NumPy array or Tensor containing the audio waveform in 16 kHz The path to audio or a NumPy array or Tensor containing the audio waveform.
n_mels: int n_mels: int
The number of Mel-frequency filters, only 80 is supported The number of Mel-frequency filters (only supports 80 and 128).
padding: int padding: int
Number of zero samples to pad to the right Number of zero samples to pad to the right.
device: Optional[Union[str, torch.device]] device: Optional[Union[str, torch.device]]
If given, the audio tensor is moved to this device before STFT If given, moves the audio tensor to this device before STFT.
Returns Returns
------- -------
torch.Tensor, shape = (80, n_frames) torch.Tensor
A Tensor that contains the Mel spectrogram A Tensor containing the log-Mel spectrogram.
""" """
if not torch.is_tensor(audio):
# Load audio if necessary and convert to tensor if needed.
if isinstance(audio, str): if isinstance(audio, str):
audio = load_audio(audio) audio_tensor = load_audio(audio)
audio = torch.from_numpy(audio) audio_tensor = torch.from_numpy(audio_tensor)
else:
audio_tensor = audio if isinstance(audio, torch.Tensor) else torch.from_numpy(audio)
# Move to specified device if provided.
if device is not None: if device is not None:
audio = audio.to(device) audio_tensor = audio_tensor.to(device)
if padding > 0:
audio = F.pad(audio, (0, padding))
window = torch.hann_window(N_FFT).to(audio.device)
stft = torch.stft(audio, N_FFT, HOP_LENGTH, window=window, return_complex=True)
magnitudes = stft[..., :-1].abs() ** 2
filters = mel_filters(audio.device, n_mels) # Padding the audio tensor.
if padding > 0:
audio_tensor = F.pad(audio_tensor, (0, padding))
# Compute STFT and magnitudes.
window = get_hann_window(AudioConstants.N_FFT, audio_tensor.device)
stft = torch.stft(audio_tensor, AudioConstants.N_FFT,
AudioConstants.HOP_LENGTH,
window=window,
return_complex=True)
magnitudes = stft.abs() ** 2
# Calculate Mel spectrogram and apply logarithmic scaling.
filters = mel_filters(audio_tensor.device, n_mels)
mel_spec = filters @ magnitudes mel_spec = filters @ magnitudes
log_spec = torch.clamp(mel_spec, min=1e-10).log10() log_spec = torch.clamp(mel_spec, min=1e-10).log10()
log_spec = torch.maximum(log_spec, log_spec.max() - 8.0)
log_spec = (log_spec + 4.0) / 4.0 log_spec_normalized = (log_spec + 4.0) / 4.0
return log_spec
return log_spec_normalized