Merge e34e2b466105540fb8b78e5aba8b22659203041c into c0d2f624c09dc18e709e37c2ad90c039a4eb72a2

safayavatsal 2025-10-19 18:18:22 +00:00 committed by GitHub
commit dcf6ed841c
5 changed files with 2195 additions and 5 deletions

whisper/optimization/__init__.py (new file)

@@ -0,0 +1,22 @@
# Whisper Optimization Module
"""
This module provides memory and performance optimization utilities for OpenAI Whisper.
Includes GPU memory management, efficient chunking, and performance monitoring.
"""
from .memory_manager import MemoryManager, GPUMemoryManager, ChunkingStrategy
from .chunk_processor import ChunkProcessor, AdaptiveChunker
from .performance_monitor import PerformanceMonitor, BenchmarkRunner
__all__ = [
'MemoryManager',
'GPUMemoryManager',
'ChunkingStrategy',
'ChunkProcessor',
'AdaptiveChunker',
'PerformanceMonitor',
'BenchmarkRunner'
]
# Version info
__version__ = "1.0.0"
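
For orientation, a minimal usage sketch of the exported classes (illustrative only, not part of the diff; the whisper.optimization import path follows the relative imports above):

# Illustrative only; assumes the package is importable as whisper.optimization.
from whisper.optimization import MemoryManager

manager = MemoryManager()
status = manager.check_memory_status()   # {"cpu": MemoryInfo, plus "gpu" when CUDA is available}
print(status["cpu"].usage_percent, status["cpu"].level)
print(manager.get_optimal_chunk_size(audio_length_seconds=600.0))  # suggested chunk size in seconds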

whisper/optimization/chunk_processor.py (new file)

@@ -0,0 +1,743 @@
"""
Chunk processor for memory-efficient transcription of large audio files.
This module handles intelligent chunking and processing of large audio files
to prevent memory issues and optimize performance.
"""
import time
import logging
from typing import List, Dict, Any, Optional, Tuple, Callable, Union
from dataclasses import dataclass
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from enum import Enum
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
from .memory_manager import MemoryManager, ChunkingStrategy
class ProcessingMode(Enum):
"""Processing modes for chunk processing."""
SEQUENTIAL = "sequential" # Process chunks one by one
PARALLEL = "parallel" # Process chunks in parallel
ADAPTIVE = "adaptive" # Adapt based on system resources
@dataclass
class ChunkResult:
"""Result of processing a single chunk."""
chunk_id: int
start_time: float
end_time: float
text: str
segments: List[Dict[str, Any]]
processing_time: float
memory_used_gb: float
confidence_score: float = 0.0
error: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
@dataclass
class ProcessingStats:
"""Statistics from chunk processing."""
total_chunks: int
processed_chunks: int
failed_chunks: int
total_processing_time: float
total_audio_duration: float
average_processing_time: float
realtime_factor: float
peak_memory_usage_gb: float
memory_cleanups: int
class ChunkProcessor:
"""
Intelligent chunk processor for large audio files.
Handles chunking, processing, and result aggregation with memory management
and performance optimization.
"""
def __init__(
self,
memory_manager: Optional[MemoryManager] = None,
max_workers: Optional[int] = None,
processing_mode: ProcessingMode = ProcessingMode.ADAPTIVE,
enable_progress_callback: bool = True
):
"""
Initialize chunk processor.
Args:
memory_manager: Memory manager instance (created if None)
max_workers: Maximum number of parallel workers (auto-detect if None)
processing_mode: Processing mode (sequential, parallel, adaptive)
enable_progress_callback: Enable progress callbacks
"""
self.memory_manager = memory_manager or MemoryManager()
# Set up logging before worker auto-detection, which logs its result
self.logger = logging.getLogger(__name__)
self.max_workers = max_workers or self._determine_optimal_workers()
self.processing_mode = processing_mode
self.enable_progress_callback = enable_progress_callback
# Processing state
self.current_stats = ProcessingStats(
total_chunks=0,
processed_chunks=0,
failed_chunks=0,
total_processing_time=0.0,
total_audio_duration=0.0,
average_processing_time=0.0,
realtime_factor=0.0,
peak_memory_usage_gb=0.0,
memory_cleanups=0
)
# Callbacks
self.progress_callback: Optional[Callable[[int, int, float], None]] = None
self.chunk_callback: Optional[Callable[[ChunkResult], None]] = None
self.error_callback: Optional[Callable[[Exception], None]] = None
def _determine_optimal_workers(self) -> int:
"""Determine optimal number of workers based on system resources."""
try:
import psutil
cpu_count = psutil.cpu_count(logical=False) or 1
memory_gb = psutil.virtual_memory().total / (1024**3)
# Conservative worker count based on memory
# Assume each worker needs ~2GB for Whisper processing
memory_workers = max(1, int(memory_gb // 2))
# Use fewer workers than CPU cores to avoid overload
cpu_workers = max(1, cpu_count - 1)
# Take the minimum to avoid resource exhaustion
optimal_workers = min(memory_workers, cpu_workers, 4) # Cap at 4 workers
self.logger.info(f"Determined optimal workers: {optimal_workers}")
return optimal_workers
except Exception as e:
self.logger.warning(f"Error determining optimal workers: {e}")
return 1
def process_audio_file(
self,
audio_path: str,
model,
model_size: str = "base",
language: Optional[str] = None,
task: str = "transcribe",
chunk_duration: Optional[float] = None,
overlap_duration: float = 1.0,
**transcribe_options
) -> Dict[str, Any]:
"""
Process a large audio file using intelligent chunking.
Args:
audio_path: Path to audio file
model: Loaded Whisper model
model_size: Model size for memory optimization
language: Source language (None for auto-detect)
task: Task type (transcribe/translate)
chunk_duration: Chunk duration (auto-calculate if None)
overlap_duration: Overlap between chunks
**transcribe_options: Additional options for transcription
Returns:
Dictionary with aggregated results
"""
start_time = time.time()
try:
# Load audio file
audio_data, audio_duration = self._load_audio_file(audio_path)
# Calculate chunks
chunks = self._calculate_chunks(
audio_duration, model_size, chunk_duration, overlap_duration
)
# Initialize stats
self.current_stats = ProcessingStats(
total_chunks=len(chunks),
processed_chunks=0,
failed_chunks=0,
total_processing_time=0.0,
total_audio_duration=audio_duration,
average_processing_time=0.0,
realtime_factor=0.0,
peak_memory_usage_gb=0.0,
memory_cleanups=0
)
# Process chunks
chunk_results = self._process_chunks(
audio_data, chunks, model, language, task, **transcribe_options
)
# Aggregate results
final_result = self._aggregate_results(
chunk_results, audio_duration, overlap_duration
)
# Finalize stats
total_time = time.time() - start_time
self.current_stats.total_processing_time = total_time
self.current_stats.average_processing_time = (
total_time / max(1, self.current_stats.processed_chunks)
)
self.current_stats.realtime_factor = audio_duration / total_time
# Add processing metadata
final_result.update({
"processing_stats": self.current_stats.__dict__,
"chunk_info": {
"total_chunks": len(chunks),
"chunk_duration": chunk_duration or "auto",
"overlap_duration": overlap_duration,
"processing_mode": self.processing_mode.value
}
})
self.logger.info(f"Processed {audio_duration:.1f}s audio in {total_time:.1f}s "
f"(RTF: {self.current_stats.realtime_factor:.2f})")
return final_result
except Exception as e:
self.logger.error(f"Error processing audio file: {e}")
if self.error_callback:
self.error_callback(e)
raise
def _load_audio_file(self, audio_path: str) -> Tuple[np.ndarray, float]:
"""Load audio file and return data with duration."""
try:
import whisper
audio_data = whisper.load_audio(audio_path)
duration = len(audio_data) / 16000.0 # Whisper uses 16kHz
self.logger.info(f"Loaded audio file: {audio_path} ({duration:.1f}s)")
return audio_data, duration
except Exception as e:
self.logger.error(f"Error loading audio file {audio_path}: {e}")
raise
def _calculate_chunks(
self,
audio_duration: float,
model_size: str,
chunk_duration: Optional[float],
overlap_duration: float
) -> List[Tuple[float, float]]:
"""Calculate optimal chunks for the audio."""
if chunk_duration is None:
# Use memory manager to determine optimal chunk size
chunking_strategy = ChunkingStrategy(self.memory_manager)
chunks = chunking_strategy.calculate_optimal_chunks(
audio_duration,
model_size,
overlap_seconds=overlap_duration
)
else:
# Use specified chunk duration
chunks = []
current_start = 0.0
while current_start < audio_duration:
current_end = min(current_start + chunk_duration, audio_duration)
chunks.append((current_start, current_end))
current_start = current_end - overlap_duration
if current_start >= current_end:
break
self.logger.info(f"Split {audio_duration:.1f}s audio into {len(chunks)} chunks")
return chunks
def _process_chunks(
self,
audio_data: np.ndarray,
chunks: List[Tuple[float, float]],
model,
language: Optional[str],
task: str,
**transcribe_options
) -> List[ChunkResult]:
"""Process audio chunks using the configured processing mode."""
if self.processing_mode == ProcessingMode.SEQUENTIAL:
return self._process_chunks_sequential(
audio_data, chunks, model, language, task, **transcribe_options
)
elif self.processing_mode == ProcessingMode.PARALLEL:
return self._process_chunks_parallel(
audio_data, chunks, model, language, task, **transcribe_options
)
else: # ADAPTIVE
return self._process_chunks_adaptive(
audio_data, chunks, model, language, task, **transcribe_options
)
def _process_chunks_sequential(
self,
audio_data: np.ndarray,
chunks: List[Tuple[float, float]],
model,
language: Optional[str],
task: str,
**transcribe_options
) -> List[ChunkResult]:
"""Process chunks sequentially."""
results = []
for i, (start_time, end_time) in enumerate(chunks):
try:
with self.memory_manager.memory_context():
result = self._process_single_chunk(
audio_data, i, start_time, end_time, model,
language, task, **transcribe_options
)
results.append(result)
# Update progress
self.current_stats.processed_chunks += 1
if self.progress_callback:
self.progress_callback(
self.current_stats.processed_chunks,
self.current_stats.total_chunks,
(self.current_stats.processed_chunks / self.current_stats.total_chunks) * 100
)
# Call chunk callback
if self.chunk_callback:
self.chunk_callback(result)
except Exception as e:
self.logger.error(f"Error processing chunk {i}: {e}")
self.current_stats.failed_chunks += 1
# Create error result
error_result = ChunkResult(
chunk_id=i,
start_time=start_time,
end_time=end_time,
text="",
segments=[],
processing_time=0.0,
memory_used_gb=0.0,
error=str(e)
)
results.append(error_result)
return results
def _process_chunks_parallel(
self,
audio_data: np.ndarray,
chunks: List[Tuple[float, float]],
model,
language: Optional[str],
task: str,
**transcribe_options
) -> List[ChunkResult]:
"""Process chunks in parallel."""
results = [None] * len(chunks) # Pre-allocate results array
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
# Submit all chunks
future_to_index = {}
for i, (start_time, end_time) in enumerate(chunks):
future = executor.submit(
self._process_single_chunk_with_context,
audio_data, i, start_time, end_time, model,
language, task, **transcribe_options
)
future_to_index[future] = i
# Collect results as they complete
for future in as_completed(future_to_index):
chunk_index = future_to_index[future]
try:
result = future.result()
results[chunk_index] = result
self.current_stats.processed_chunks += 1
# Update progress
if self.progress_callback:
self.progress_callback(
self.current_stats.processed_chunks,
self.current_stats.total_chunks,
(self.current_stats.processed_chunks / self.current_stats.total_chunks) * 100
)
# Call chunk callback
if self.chunk_callback:
self.chunk_callback(result)
except Exception as e:
self.logger.error(f"Error processing chunk {chunk_index}: {e}")
self.current_stats.failed_chunks += 1
start_time, end_time = chunks[chunk_index]
error_result = ChunkResult(
chunk_id=chunk_index,
start_time=start_time,
end_time=end_time,
text="",
segments=[],
processing_time=0.0,
memory_used_gb=0.0,
error=str(e)
)
results[chunk_index] = error_result
# Filter out None results (shouldn't happen, but safety check)
return [r for r in results if r is not None]
def _process_chunks_adaptive(
self,
audio_data: np.ndarray,
chunks: List[Tuple[float, float]],
model,
language: Optional[str],
task: str,
**transcribe_options
) -> List[ChunkResult]:
"""Process chunks adaptively based on system resources."""
# Check memory status to decide processing mode
memory_status = self.memory_manager.check_memory_status()
cpu_info = memory_status["cpu"]
# Use parallel processing if we have sufficient resources
if cpu_info.usage_percent < 70 and len(chunks) > 2:
self.logger.info("Using parallel processing mode (sufficient resources)")
return self._process_chunks_parallel(
audio_data, chunks, model, language, task, **transcribe_options
)
else:
self.logger.info("Using sequential processing mode (limited resources)")
return self._process_chunks_sequential(
audio_data, chunks, model, language, task, **transcribe_options
)
def _process_single_chunk_with_context(self, *args, **kwargs) -> ChunkResult:
"""Process a single chunk with memory context (for parallel execution)."""
with self.memory_manager.memory_context():
return self._process_single_chunk(*args, **kwargs)
def _process_single_chunk(
self,
audio_data: np.ndarray,
chunk_id: int,
start_time: float,
end_time: float,
model,
language: Optional[str],
task: str,
**transcribe_options
) -> ChunkResult:
"""Process a single audio chunk."""
chunk_start_time = time.time()
try:
# Extract audio chunk
sample_rate = 16000 # Whisper standard sample rate
start_sample = int(start_time * sample_rate)
end_sample = int(end_time * sample_rate)
chunk_audio = audio_data[start_sample:end_sample]
# Monitor memory before processing
memory_before = self.memory_manager.check_memory_status()
# Transcribe chunk
result = model.transcribe(
chunk_audio,
language=language,
task=task,
**transcribe_options
)
# Monitor memory after processing
memory_after = self.memory_manager.check_memory_status()
memory_used = memory_after["cpu"].used_gb - memory_before["cpu"].used_gb
# Update peak memory usage
current_usage = memory_after["cpu"].used_gb
if current_usage > self.current_stats.peak_memory_usage_gb:
self.current_stats.peak_memory_usage_gb = current_usage
# Calculate confidence score
confidence = self._calculate_chunk_confidence(result)
# Adjust segment timestamps to global time
adjusted_segments = []
for segment in result.get("segments", []):
adjusted_segment = segment.copy()
adjusted_segment["start"] += start_time
adjusted_segment["end"] += start_time
adjusted_segments.append(adjusted_segment)
processing_time = time.time() - chunk_start_time
return ChunkResult(
chunk_id=chunk_id,
start_time=start_time,
end_time=end_time,
text=result.get("text", ""),
segments=adjusted_segments,
processing_time=processing_time,
memory_used_gb=max(0.0, memory_used),
confidence_score=confidence,
metadata={
"language": result.get("language"),
"chunk_duration": end_time - start_time
}
)
except Exception as e:
processing_time = time.time() - chunk_start_time
self.logger.error(f"Error processing chunk {chunk_id}: {e}")
return ChunkResult(
chunk_id=chunk_id,
start_time=start_time,
end_time=end_time,
text="",
segments=[],
processing_time=processing_time,
memory_used_gb=0.0,
error=str(e)
)
def _calculate_chunk_confidence(self, result: Dict[str, Any]) -> float:
"""Calculate confidence score for a chunk result."""
try:
segments = result.get("segments", [])
if not segments:
return 0.0
# Use average log probability as confidence indicator
log_probs = []
for segment in segments:
if "avg_logprob" in segment:
log_probs.append(segment["avg_logprob"])
if log_probs:
avg_log_prob = sum(log_probs) / len(log_probs)
# Convert log probability to confidence (rough approximation)
confidence = max(0.1, min(0.99, 1.0 + avg_log_prob / 2.0))
return confidence
return 0.5 # Default confidence when no log probs available
except Exception:
return 0.5
def _aggregate_results(
self,
chunk_results: List[ChunkResult],
total_duration: float,
overlap_duration: float
) -> Dict[str, Any]:
"""Aggregate chunk results into final transcription."""
# Filter out failed chunks
successful_results = [r for r in chunk_results if r.error is None]
if not successful_results:
return {
"text": "",
"segments": [],
"language": "en",
"error": "All chunks failed to process"
}
# Combine text with overlap handling
combined_text = self._combine_text_with_overlap(
successful_results, overlap_duration
)
# Combine segments
all_segments = []
segment_id = 0
for result in successful_results:
for segment in result.segments:
segment_copy = segment.copy()
segment_copy["id"] = segment_id
all_segments.append(segment_copy)
segment_id += 1
# Determine primary language
languages = [r.metadata.get("language") for r in successful_results if r.metadata]
languages = [lang for lang in languages if lang]
primary_language = max(set(languages), key=languages.count) if languages else "en"
# Calculate overall confidence
confidences = [r.confidence_score for r in successful_results]
overall_confidence = sum(confidences) / len(confidences) if confidences else 0.0
return {
"text": combined_text,
"segments": all_segments,
"language": primary_language,
"confidence": overall_confidence,
"chunk_results": [r.__dict__ for r in chunk_results] # Include all chunk info
}
def _combine_text_with_overlap(
self,
results: List[ChunkResult],
overlap_duration: float
) -> str:
"""Combine text from chunks, handling overlaps intelligently."""
if not results:
return ""
if len(results) == 1:
return results[0].text.strip()
combined_parts = []
for i, result in enumerate(results):
text = result.text.strip()
if i == 0:
# First chunk: use full text
combined_parts.append(text)
else:
# Subsequent chunks: try to detect and remove overlap
if overlap_duration > 0 and combined_parts:
# Simple overlap detection: split text and look for common endings/beginnings
words = text.split()
if len(words) > 5:
# Try to find overlap by comparing last words of previous chunk
# with first words of current chunk
prev_words = combined_parts[-1].split()
# Look for overlap (up to 1/3 of the words in the chunk)
max_overlap = min(len(prev_words), len(words)) // 3
best_overlap = 0
for overlap_size in range(1, max_overlap + 1):
if (prev_words[-overlap_size:] == words[:overlap_size] and
overlap_size > best_overlap):
best_overlap = overlap_size
if best_overlap > 0:
# Remove overlapped words from current chunk
text = " ".join(words[best_overlap:])
if text: # Only add if there's remaining text
combined_parts.append(text)
return " ".join(combined_parts)
def set_progress_callback(self, callback: Callable[[int, int, float], None]) -> None:
"""Set progress callback function."""
self.progress_callback = callback
def set_chunk_callback(self, callback: Callable[[ChunkResult], None]) -> None:
"""Set chunk completion callback function."""
self.chunk_callback = callback
def set_error_callback(self, callback: Callable[[Exception], None]) -> None:
"""Set error callback function."""
self.error_callback = callback
def get_current_stats(self) -> ProcessingStats:
"""Get current processing statistics."""
return self.current_stats
class AdaptiveChunker:
"""Adaptive chunker that adjusts chunk size based on system performance."""
def __init__(self, memory_manager: Optional[MemoryManager] = None):
"""Initialize adaptive chunker."""
self.memory_manager = memory_manager or MemoryManager()
self.performance_history: List[Tuple[float, float, float]] = [] # (chunk_size, processing_time, memory_used)
self.logger = logging.getLogger(__name__)
def get_adaptive_chunk_size(
self,
base_chunk_size: float,
model_size: str = "base",
target_realtime_factor: float = 1.0
) -> float:
"""
Get adaptively adjusted chunk size based on performance history.
Args:
base_chunk_size: Base chunk size in seconds
model_size: Whisper model size
target_realtime_factor: Target real-time factor (1.0 = real-time)
Returns:
Adjusted chunk size in seconds
"""
if not self.performance_history:
return base_chunk_size
# Analyze recent performance
recent_history = self.performance_history[-10:] # Last 10 chunks
avg_processing_time = sum(h[1] for h in recent_history) / len(recent_history)
avg_chunk_size = sum(h[0] for h in recent_history) / len(recent_history)
avg_memory_used = sum(h[2] for h in recent_history) / len(recent_history)
current_rtf = avg_chunk_size / avg_processing_time if avg_processing_time > 0 else 1.0
# Adjust chunk size based on performance
if current_rtf < target_realtime_factor * 0.8:
# Too slow, reduce chunk size
adjustment_factor = 0.8
self.logger.info(f"Reducing chunk size due to slow processing (RTF: {current_rtf:.2f})")
elif current_rtf > target_realtime_factor * 1.5:
# Too fast, can increase chunk size
adjustment_factor = 1.2
self.logger.info(f"Increasing chunk size due to fast processing (RTF: {current_rtf:.2f})")
else:
# Performance is acceptable
adjustment_factor = 1.0
# Also consider memory usage
memory_status = self.memory_manager.check_memory_status()
if memory_status["cpu"].usage_percent > 80:
adjustment_factor *= 0.9 # Reduce size if memory is high
adjusted_size = base_chunk_size * adjustment_factor
# Apply reasonable bounds
min_size = 5.0 # Minimum 5 seconds
max_size = 300.0 # Maximum 5 minutes
adjusted_size = max(min_size, min(adjusted_size, max_size))
return adjusted_size
def record_performance(
self,
chunk_size: float,
processing_time: float,
memory_used: float
) -> None:
"""Record performance for adaptive adjustment."""
self.performance_history.append((chunk_size, processing_time, memory_used))
# Keep only recent history
if len(self.performance_history) > 100:
self.performance_history = self.performance_history[-50:]
def reset_history(self) -> None:
"""Reset performance history."""
self.performance_history.clear()
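
A hedged usage sketch of the ChunkProcessor API above; the audio path and the "base" model choice are placeholders, and the import path is assumed from the package layout:

# Illustrative only; paths and model size are placeholders.
import whisper
from whisper.optimization.chunk_processor import ChunkProcessor, ProcessingMode

model = whisper.load_model("base")
processor = ChunkProcessor(processing_mode=ProcessingMode.ADAPTIVE)

# Progress callback receives (processed_chunks, total_chunks, percent).
processor.set_progress_callback(
    lambda done, total, pct: print(f"{done}/{total} chunks ({pct:.0f}%)")
)

result = processor.process_audio_file(
    "long_recording.wav",     # placeholder path
    model,
    model_size="base",
    overlap_duration=1.0,
)
print(result["text"][:200])
print(result["processing_stats"]["realtime_factor"])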

whisper/optimization/memory_manager.py (new file)

@@ -0,0 +1,620 @@
"""
Memory management utilities for Whisper transcription.
This module provides intelligent memory management for GPU and CPU resources,
preventing out-of-memory errors and optimizing performance for large audio files.
"""
import gc
import psutil
import time
import logging
from typing import Optional, Dict, List, Callable, Any, Tuple
from dataclasses import dataclass
from contextlib import contextmanager
from enum import Enum
try:
import torch
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
class MemoryLevel(Enum):
"""Memory usage levels."""
LOW = "low" # < 30% usage
MODERATE = "moderate" # 30-60% usage
HIGH = "high" # 60-85% usage
CRITICAL = "critical" # > 85% usage
@dataclass
class MemoryInfo:
"""Memory information container."""
total_gb: float
used_gb: float
available_gb: float
usage_percent: float
level: MemoryLevel
class MemoryManager:
"""
System memory manager for CPU and GPU resources.
Monitors memory usage and provides automatic cleanup and optimization
to prevent out-of-memory errors during long transcriptions.
"""
def __init__(
self,
cpu_threshold_percent: float = 80.0,
gpu_threshold_percent: float = 85.0,
cleanup_interval_seconds: float = 30.0,
enable_automatic_cleanup: bool = True
):
"""
Initialize the memory manager.
Args:
cpu_threshold_percent: CPU memory threshold for triggering cleanup
gpu_threshold_percent: GPU memory threshold for triggering cleanup
cleanup_interval_seconds: Interval between automatic cleanup checks
enable_automatic_cleanup: Enable automatic memory cleanup
"""
self.cpu_threshold = cpu_threshold_percent
self.gpu_threshold = gpu_threshold_percent
self.cleanup_interval = cleanup_interval_seconds
self.enable_automatic_cleanup = enable_automatic_cleanup
# State tracking
self.last_cleanup_time = 0.0
self.cleanup_callbacks: List[Callable[[], None]] = []
self.memory_history: List[Tuple[float, MemoryInfo]] = []
# Setup logging
self.logger = logging.getLogger(__name__)
# Initialize GPU manager if available
self.gpu_manager = GPUMemoryManager() if TORCH_AVAILABLE else None
def get_cpu_memory_info(self) -> MemoryInfo:
"""Get current CPU memory information."""
memory = psutil.virtual_memory()
total_gb = memory.total / (1024**3)
used_gb = memory.used / (1024**3)
available_gb = memory.available / (1024**3)
usage_percent = memory.percent
# Determine memory level
if usage_percent < 30:
level = MemoryLevel.LOW
elif usage_percent < 60:
level = MemoryLevel.MODERATE
elif usage_percent < 85:
level = MemoryLevel.HIGH
else:
level = MemoryLevel.CRITICAL
return MemoryInfo(
total_gb=total_gb,
used_gb=used_gb,
available_gb=available_gb,
usage_percent=usage_percent,
level=level
)
def get_gpu_memory_info(self) -> Optional[MemoryInfo]:
"""Get current GPU memory information."""
if self.gpu_manager:
return self.gpu_manager.get_memory_info()
return None
def check_memory_status(self) -> Dict[str, MemoryInfo]:
"""Check current memory status for all devices."""
status = {"cpu": self.get_cpu_memory_info()}
gpu_info = self.get_gpu_memory_info()
if gpu_info:
status["gpu"] = gpu_info
# Store in history
current_time = time.time()
self.memory_history.append((current_time, status["cpu"]))
# Keep only recent history (last hour)
hour_ago = current_time - 3600
self.memory_history = [(t, info) for t, info in self.memory_history if t > hour_ago]
return status
def is_memory_critical(self) -> bool:
"""Check if any memory is at critical level."""
status = self.check_memory_status()
for device, info in status.items():
if device == "cpu" and info.usage_percent > self.cpu_threshold:
return True
elif device == "gpu" and info.usage_percent > self.gpu_threshold:
return True
return False
def cleanup_memory(self, force: bool = False) -> Dict[str, float]:
"""
Perform memory cleanup.
Args:
force: Force cleanup even if thresholds aren't met
Returns:
Dictionary with memory freed for each device
"""
if not force and not self.is_memory_critical():
return {}
memory_before = self.check_memory_status()
cleanup_results = {}
# Run cleanup callbacks
for callback in self.cleanup_callbacks:
try:
callback()
except Exception as e:
self.logger.warning(f"Cleanup callback failed: {e}")
# Python garbage collection
gc.collect()
# GPU memory cleanup
if self.gpu_manager:
gpu_freed = self.gpu_manager.cleanup_memory()
if gpu_freed > 0:
cleanup_results["gpu"] = gpu_freed
# Check memory after cleanup
memory_after = self.check_memory_status()
# Calculate freed memory
for device in memory_before:
if device in memory_after:
freed = memory_before[device].used_gb - memory_after[device].used_gb
if freed > 0:
cleanup_results[device] = freed
self.last_cleanup_time = time.time()
if cleanup_results:
self.logger.info(f"Memory cleanup freed: {cleanup_results}")
return cleanup_results
def add_cleanup_callback(self, callback: Callable[[], None]) -> None:
"""Add a cleanup callback function."""
self.cleanup_callbacks.append(callback)
def remove_cleanup_callback(self, callback: Callable[[], None]) -> None:
"""Remove a cleanup callback function."""
if callback in self.cleanup_callbacks:
self.cleanup_callbacks.remove(callback)
@contextmanager
def memory_context(self, cleanup_after: bool = True):
"""
Context manager for automatic memory management.
Args:
cleanup_after: Perform cleanup when exiting context
"""
initial_memory = self.check_memory_status()
try:
yield self
finally:
if cleanup_after:
self.cleanup_memory()
# Log memory usage
final_memory = self.check_memory_status()
for device in initial_memory:
if device in final_memory:
initial_used = initial_memory[device].used_gb
final_used = final_memory[device].used_gb
memory_delta = final_used - initial_used
if abs(memory_delta) > 0.1: # Only log significant changes
direction = "increased" if memory_delta > 0 else "decreased"
self.logger.info(
f"{device.upper()} memory {direction} by {abs(memory_delta):.2f}GB "
f"({final_used:.2f}GB used, {final_memory[device].usage_percent:.1f}%)"
)
def auto_cleanup_if_needed(self) -> bool:
"""Automatically cleanup if thresholds are exceeded."""
if not self.enable_automatic_cleanup:
return False
current_time = time.time()
# Check if enough time has passed since last cleanup
if current_time - self.last_cleanup_time < self.cleanup_interval:
return False
# Check if cleanup is needed
if self.is_memory_critical():
self.cleanup_memory()
return True
return False
def get_memory_recommendations(self) -> List[str]:
"""Get memory optimization recommendations."""
recommendations = []
status = self.check_memory_status()
cpu_info = status.get("cpu")
if cpu_info and cpu_info.level == MemoryLevel.CRITICAL:
recommendations.append("Reduce batch size or chunk duration")
recommendations.append("Enable memory cleanup callbacks")
recommendations.append("Consider processing audio in smaller segments")
gpu_info = status.get("gpu")
if gpu_info and gpu_info.level == MemoryLevel.CRITICAL:
recommendations.append("Use smaller Whisper model (e.g., base instead of large)")
recommendations.append("Enable mixed precision (fp16)")
recommendations.append("Reduce GPU batch size")
recommendations.append("Clear GPU cache between segments")
if cpu_info and cpu_info.level in [MemoryLevel.HIGH, MemoryLevel.CRITICAL]:
recommendations.append("Close other applications to free memory")
recommendations.append("Consider using swap memory if available")
return recommendations
def get_optimal_chunk_size(self, audio_length_seconds: float, target_memory_gb: float = 2.0) -> float:
"""
Calculate optimal chunk size based on available memory.
Args:
audio_length_seconds: Total audio length
target_memory_gb: Target memory usage per chunk
Returns:
Recommended chunk size in seconds
"""
cpu_info = self.get_cpu_memory_info()
available_memory = min(cpu_info.available_gb, target_memory_gb)
# Rough estimate: 1 second of audio ~ 0.1GB memory for processing
# This is very approximate and depends on model size and batch size
memory_per_second = 0.1
if self.gpu_manager and self.gpu_manager.is_cuda_available():
# GPU processing is more memory efficient for longer segments
memory_per_second = 0.05
optimal_chunk_seconds = available_memory / memory_per_second
# Clamp to reasonable bounds
optimal_chunk_seconds = max(10.0, min(optimal_chunk_seconds, 300.0)) # 10s to 5min
# Ensure we don't exceed total audio length
optimal_chunk_seconds = min(optimal_chunk_seconds, audio_length_seconds)
return optimal_chunk_seconds
class GPUMemoryManager:
"""GPU-specific memory management utilities."""
def __init__(self):
"""Initialize GPU memory manager."""
self.logger = logging.getLogger(__name__)
self.device_count = 0
if TORCH_AVAILABLE and torch.cuda.is_available():
self.device_count = torch.cuda.device_count()
def is_cuda_available(self) -> bool:
"""Check if CUDA is available."""
return TORCH_AVAILABLE and torch.cuda.is_available()
def get_memory_info(self, device_id: int = 0) -> Optional[MemoryInfo]:
"""Get GPU memory information for specified device."""
if not self.is_cuda_available() or device_id >= self.device_count:
return None
try:
# Get memory info from PyTorch
memory_allocated = torch.cuda.memory_allocated(device_id) / (1024**3) # GB
memory_reserved = torch.cuda.memory_reserved(device_id) / (1024**3) # GB
memory_total = torch.cuda.get_device_properties(device_id).total_memory / (1024**3) # GB
memory_free = memory_total - memory_reserved
usage_percent = (memory_reserved / memory_total) * 100
# Determine memory level
if usage_percent < 30:
level = MemoryLevel.LOW
elif usage_percent < 60:
level = MemoryLevel.MODERATE
elif usage_percent < 85:
level = MemoryLevel.HIGH
else:
level = MemoryLevel.CRITICAL
return MemoryInfo(
total_gb=memory_total,
used_gb=memory_reserved,
available_gb=memory_free,
usage_percent=usage_percent,
level=level
)
except Exception as e:
self.logger.error(f"Error getting GPU memory info: {e}")
return None
def cleanup_memory(self, device_id: Optional[int] = None) -> float:
"""
Cleanup GPU memory.
Args:
device_id: Specific device to cleanup (None for all devices)
Returns:
Amount of memory freed in GB
"""
if not self.is_cuda_available():
return 0.0
memory_before = 0.0
memory_after = 0.0
devices_to_cleanup = [device_id] if device_id is not None else range(self.device_count)
for device in devices_to_cleanup:
if device < self.device_count:
try:
# Get memory before cleanup
if device == 0: # Only measure for first device to avoid overhead
memory_before += torch.cuda.memory_allocated(device) / (1024**3)
# Clear cache
torch.cuda.empty_cache()
# Force garbage collection
gc.collect()
# Get memory after cleanup
if device == 0:
memory_after += torch.cuda.memory_allocated(device) / (1024**3)
except Exception as e:
self.logger.warning(f"Error cleaning GPU {device}: {e}")
memory_freed = max(0.0, memory_before - memory_after)
if memory_freed > 0.1: # Log only significant memory freeing
self.logger.info(f"GPU memory cleanup freed {memory_freed:.2f}GB")
return memory_freed
@contextmanager
def gpu_memory_context(self, device_id: int = 0):
"""Context manager for GPU memory management."""
if not self.is_cuda_available():
yield
return
# Set device
original_device = torch.cuda.current_device()
torch.cuda.set_device(device_id)
initial_memory = torch.cuda.memory_allocated(device_id) / (1024**3)
try:
yield
finally:
# Cleanup and restore device
torch.cuda.empty_cache()
torch.cuda.set_device(original_device)
final_memory = torch.cuda.memory_allocated(device_id) / (1024**3)
memory_delta = final_memory - initial_memory
if abs(memory_delta) > 0.1:
direction = "increased" if memory_delta > 0 else "decreased"
self.logger.debug(
f"GPU {device_id} memory {direction} by {abs(memory_delta):.2f}GB"
)
def optimize_model_for_memory(self, model, use_half_precision: bool = True) -> Any:
"""
Optimize model for memory usage.
Args:
model: PyTorch model to optimize
use_half_precision: Use FP16 to reduce memory usage
Returns:
Optimized model
"""
if not self.is_cuda_available():
return model
try:
# Move to GPU if not already
if hasattr(model, 'device') and model.device.type == 'cpu':
model = model.cuda()
# Enable half precision if supported and requested
if use_half_precision and hasattr(model, 'half'):
if hasattr(torch.cuda, 'get_device_capability'):
# Check if GPU supports half precision
capability = torch.cuda.get_device_capability()
if capability[0] >= 6: # Pascal architecture or newer
model = model.half()
self.logger.info("Enabled half precision (FP16) for memory optimization")
else:
model = model.half()
self.logger.info("Enabled half precision (FP16)")
# Enable optimizations
if hasattr(torch, 'compile') and hasattr(model, 'forward'):
# PyTorch 2.0 compilation (if available)
try:
model = torch.compile(model, mode='reduce-overhead')
self.logger.info("Applied PyTorch 2.0 compilation optimizations")
except Exception:
pass # Compilation may not be available
except Exception as e:
self.logger.warning(f"Error optimizing model for memory: {e}")
return model
def get_device_info(self) -> List[Dict[str, Any]]:
"""Get information about available GPU devices."""
if not self.is_cuda_available():
return []
devices = []
for i in range(self.device_count):
try:
props = torch.cuda.get_device_properties(i)
memory_info = self.get_memory_info(i)
device_info = {
"device_id": i,
"name": props.name,
"total_memory_gb": props.total_memory / (1024**3),
"compute_capability": f"{props.major}.{props.minor}",
"multiprocessor_count": props.multi_processor_count,
"current_memory_info": memory_info.__dict__ if memory_info else None
}
devices.append(device_info)
except Exception as e:
self.logger.warning(f"Error getting info for GPU {i}: {e}")
return devices
class ChunkingStrategy:
"""Strategy for chunking large audio files based on memory constraints."""
def __init__(self, memory_manager: MemoryManager):
"""Initialize chunking strategy with memory manager."""
self.memory_manager = memory_manager
self.logger = logging.getLogger(__name__)
def calculate_optimal_chunks(
self,
audio_length_seconds: float,
model_size: str = "base",
target_memory_gb: float = 2.0,
min_chunk_seconds: float = 10.0,
max_chunk_seconds: float = 300.0,
overlap_seconds: float = 1.0
) -> List[Tuple[float, float]]:
"""
Calculate optimal chunk boundaries for processing large audio.
Args:
audio_length_seconds: Total audio length
model_size: Whisper model size for memory estimation
target_memory_gb: Target memory usage per chunk
min_chunk_seconds: Minimum chunk size
max_chunk_seconds: Maximum chunk size
overlap_seconds: Overlap between chunks
Returns:
List of (start_time, end_time) tuples
"""
# Get optimal chunk size from memory manager
optimal_chunk_size = self.memory_manager.get_optimal_chunk_size(
audio_length_seconds, target_memory_gb
)
# Adjust based on model size
model_multipliers = {
"tiny": 0.5,
"base": 1.0,
"small": 1.5,
"medium": 2.0,
"large": 3.0,
"large-v2": 3.5,
"large-v3": 3.5
}
multiplier = model_multipliers.get(model_size, 1.0)
adjusted_chunk_size = optimal_chunk_size / multiplier
# Apply bounds
chunk_size = max(min_chunk_seconds, min(adjusted_chunk_size, max_chunk_seconds))
# Generate chunks
chunks = []
current_start = 0.0
while current_start < audio_length_seconds:
current_end = min(current_start + chunk_size, audio_length_seconds)
chunks.append((current_start, current_end))
# Next chunk starts with overlap
current_start = current_end - overlap_seconds
if current_start >= current_end:
break
self.logger.info(f"Generated {len(chunks)} chunks for {audio_length_seconds:.1f}s audio")
self.logger.info(f"Chunk size: {chunk_size:.1f}s, Overlap: {overlap_seconds:.1f}s")
return chunks
def get_memory_efficient_batch_size(self, model_size: str = "base") -> int:
"""Get memory-efficient batch size for the current system."""
cpu_info = self.memory_manager.get_cpu_memory_info()
gpu_info = self.memory_manager.get_gpu_memory_info()
# Base batch size recommendations
base_batch_sizes = {
"tiny": 8,
"base": 4,
"small": 2,
"medium": 1,
"large": 1,
"large-v2": 1,
"large-v3": 1
}
base_batch_size = base_batch_sizes.get(model_size, 1)
# Adjust based on available memory
if gpu_info:
# GPU processing
if gpu_info.level == MemoryLevel.CRITICAL:
return 1 # Minimal batch size
elif gpu_info.level == MemoryLevel.HIGH:
return max(1, base_batch_size // 2)
elif gpu_info.level == MemoryLevel.LOW:
return base_batch_size * 2
else:
# CPU processing
if cpu_info.level == MemoryLevel.CRITICAL:
return 1
elif cpu_info.level == MemoryLevel.HIGH:
return max(1, base_batch_size // 2)
elif cpu_info.level == MemoryLevel.LOW:
return min(base_batch_size * 2, 8) # Cap CPU batch size
return base_batch_size
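
A minimal sketch of the memory utilities above, assuming the same import path; the thresholds, one-hour duration, and model size are illustrative values:

# Illustrative only; thresholds and durations are example values.
from whisper.optimization.memory_manager import MemoryManager, ChunkingStrategy

manager = MemoryManager(cpu_threshold_percent=80.0, gpu_threshold_percent=85.0)

# cleanup_memory() runs automatically when the context exits (cleanup_after=True by default).
with manager.memory_context():
    pass  # load a model / transcribe a segment here

strategy = ChunkingStrategy(manager)
chunks = strategy.calculate_optimal_chunks(
    audio_length_seconds=3600.0,   # one hour of audio
    model_size="medium",
    overlap_seconds=1.0,
)
print(len(chunks), chunks[0])
print(manager.get_memory_recommendations())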

whisper/optimization/performance_monitor.py (new file)

@@ -0,0 +1,676 @@
"""
Performance monitoring and benchmarking utilities for Whisper optimization.
This module provides comprehensive performance monitoring, benchmarking,
and optimization recommendations for Whisper transcription.
"""
import time
import psutil
import logging
import json
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from contextlib import contextmanager
from collections import defaultdict, deque
import threading
try:
import torch
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
try:
import numpy as np
NUMPY_AVAILABLE = True
except ImportError:
NUMPY_AVAILABLE = False
@dataclass
class PerformanceMetrics:
"""Container for performance metrics."""
processing_time_seconds: float
audio_duration_seconds: float
realtime_factor: float
cpu_usage_percent: float
memory_usage_gb: float
gpu_memory_usage_gb: Optional[float]
model_size: str
device: str
batch_size: int
timestamp: float
@dataclass
class BenchmarkResult:
"""Container for benchmark results."""
model_name: str
device: str
audio_duration: float
processing_time: float
realtime_factor: float
memory_peak_gb: float
gpu_memory_peak_gb: Optional[float]
accuracy_score: Optional[float]
configuration: Dict[str, Any]
system_info: Dict[str, Any]
class PerformanceMonitor:
"""
Real-time performance monitoring for Whisper transcription.
Tracks processing performance, resource usage, and provides
optimization recommendations.
"""
def __init__(
self,
max_history_size: int = 1000,
enable_gpu_monitoring: bool = True,
sampling_interval: float = 0.1
):
"""
Initialize performance monitor.
Args:
max_history_size: Maximum number of metrics to keep in history
enable_gpu_monitoring: Enable GPU memory monitoring
sampling_interval: Interval between resource usage samples
"""
self.max_history_size = max_history_size
self.enable_gpu_monitoring = enable_gpu_monitoring and TORCH_AVAILABLE
self.sampling_interval = sampling_interval
# Performance history
self.metrics_history: deque = deque(maxlen=max_history_size)
# Real-time monitoring
self.current_session = {
"start_time": None,
"total_audio_processed": 0.0,
"total_processing_time": 0.0,
"segments_processed": 0,
"peak_memory_usage": 0.0,
"peak_gpu_memory_usage": 0.0 if self.enable_gpu_monitoring else None
}
# Resource monitoring
self.resource_history: List[Dict[str, Any]] = []
self.monitoring_thread: Optional[threading.Thread] = None
self.stop_monitoring = threading.Event()
# Setup logging
self.logger = logging.getLogger(__name__)
def start_session(self) -> None:
"""Start a monitoring session."""
self.current_session = {
"start_time": time.time(),
"total_audio_processed": 0.0,
"total_processing_time": 0.0,
"segments_processed": 0,
"peak_memory_usage": 0.0,
"peak_gpu_memory_usage": 0.0 if self.enable_gpu_monitoring else None
}
# Start background resource monitoring
self.stop_monitoring.clear()
self.monitoring_thread = threading.Thread(
target=self._monitor_resources,
daemon=True
)
self.monitoring_thread.start()
self.logger.info("Performance monitoring session started")
def stop_session(self) -> Dict[str, Any]:
"""Stop monitoring session and return summary."""
if self.monitoring_thread:
self.stop_monitoring.set()
self.monitoring_thread.join(timeout=1.0)
session_duration = time.time() - (self.current_session["start_time"] or time.time())
summary = {
"session_duration": session_duration,
"total_audio_processed": self.current_session["total_audio_processed"],
"total_processing_time": self.current_session["total_processing_time"],
"segments_processed": self.current_session["segments_processed"],
"peak_memory_usage_gb": self.current_session["peak_memory_usage"],
"average_rtf": (
self.current_session["total_audio_processed"] /
max(0.001, self.current_session["total_processing_time"])
),
"throughput_minutes_per_hour": (
(self.current_session["total_audio_processed"] / 60) /
max(0.001, session_duration / 3600)
)
}
if self.enable_gpu_monitoring:
summary["peak_gpu_memory_usage_gb"] = self.current_session["peak_gpu_memory_usage"]
self.logger.info(f"Performance monitoring session ended: RTF={summary['average_rtf']:.2f}")
return summary
@contextmanager
def monitor_transcription(
self,
model_size: str,
device: str,
batch_size: int = 1
):
"""
Context manager for monitoring a transcription operation.
Args:
model_size: Whisper model size
device: Processing device (cpu/cuda)
batch_size: Batch size used
"""
start_time = time.time()
start_memory = self._get_memory_usage()
start_gpu_memory = self._get_gpu_memory_usage() if self.enable_gpu_monitoring else None
try:
yield self
finally:
end_time = time.time()
end_memory = self._get_memory_usage()
end_gpu_memory = self._get_gpu_memory_usage() if self.enable_gpu_monitoring else None
processing_time = end_time - start_time
# Create metrics (will be completed by record_transcription)
self._processing_start_time = start_time
self._processing_time = processing_time
self._memory_usage = max(start_memory, end_memory)
self._gpu_memory_usage = max(start_gpu_memory or 0, end_gpu_memory or 0)
self._model_size = model_size
self._device = device
self._batch_size = batch_size
def record_transcription(
self,
audio_duration: float,
processing_time: Optional[float] = None,
model_size: Optional[str] = None,
device: Optional[str] = None,
batch_size: Optional[int] = None
) -> PerformanceMetrics:
"""
Record metrics for a completed transcription.
Args:
audio_duration: Duration of processed audio
processing_time: Time taken for processing
model_size: Model size used
device: Processing device
batch_size: Batch size used
Returns:
PerformanceMetrics object
"""
# Use values from context manager if available
processing_time = processing_time or getattr(self, '_processing_time', 0.0)
model_size = model_size or getattr(self, '_model_size', 'unknown')
device = device or getattr(self, '_device', 'unknown')
batch_size = batch_size or getattr(self, '_batch_size', 1)
# Calculate metrics
realtime_factor = audio_duration / max(0.001, processing_time)
cpu_usage = psutil.cpu_percent()
memory_usage = getattr(self, '_memory_usage', self._get_memory_usage())
gpu_memory_usage = getattr(self, '_gpu_memory_usage', None)
metrics = PerformanceMetrics(
processing_time_seconds=processing_time,
audio_duration_seconds=audio_duration,
realtime_factor=realtime_factor,
cpu_usage_percent=cpu_usage,
memory_usage_gb=memory_usage,
gpu_memory_usage_gb=gpu_memory_usage,
model_size=model_size,
device=device,
batch_size=batch_size,
timestamp=time.time()
)
# Add to history
self.metrics_history.append(metrics)
# Update session stats
self.current_session["total_audio_processed"] += audio_duration
self.current_session["total_processing_time"] += processing_time
self.current_session["segments_processed"] += 1
self.current_session["peak_memory_usage"] = max(
self.current_session["peak_memory_usage"], memory_usage
)
if gpu_memory_usage is not None:
self.current_session["peak_gpu_memory_usage"] = max(
self.current_session["peak_gpu_memory_usage"] or 0, gpu_memory_usage
)
# Clean up context manager attributes
for attr in ['_processing_start_time', '_processing_time', '_memory_usage',
'_gpu_memory_usage', '_model_size', '_device', '_batch_size']:
if hasattr(self, attr):
delattr(self, attr)
return metrics
def get_performance_summary(self, last_n: Optional[int] = None) -> Dict[str, Any]:
"""
Get performance summary statistics.
Args:
last_n: Number of recent metrics to analyze (None for all)
Returns:
Dictionary with performance statistics
"""
if not self.metrics_history:
return {"error": "No performance data available"}
# Select metrics to analyze
metrics_to_analyze = list(self.metrics_history)
if last_n is not None:
metrics_to_analyze = metrics_to_analyze[-last_n:]
# Calculate statistics
rtf_values = [m.realtime_factor for m in metrics_to_analyze]
processing_times = [m.processing_time_seconds for m in metrics_to_analyze]
audio_durations = [m.audio_duration_seconds for m in metrics_to_analyze]
memory_usage = [m.memory_usage_gb for m in metrics_to_analyze]
# GPU memory (if available)
gpu_memory = [m.gpu_memory_usage_gb for m in metrics_to_analyze if m.gpu_memory_usage_gb is not None]
summary = {
"total_samples": len(metrics_to_analyze),
"performance": {
"average_rtf": sum(rtf_values) / len(rtf_values),
"median_rtf": sorted(rtf_values)[len(rtf_values) // 2],
"min_rtf": min(rtf_values),
"max_rtf": max(rtf_values),
"average_processing_time": sum(processing_times) / len(processing_times),
"total_audio_processed": sum(audio_durations),
"total_processing_time": sum(processing_times)
},
"resources": {
"average_memory_usage_gb": sum(memory_usage) / len(memory_usage),
"peak_memory_usage_gb": max(memory_usage),
"min_memory_usage_gb": min(memory_usage)
}
}
if gpu_memory:
summary["resources"]["average_gpu_memory_gb"] = sum(gpu_memory) / len(gpu_memory)
summary["resources"]["peak_gpu_memory_gb"] = max(gpu_memory)
# Add model/device breakdown
model_stats = defaultdict(list)
device_stats = defaultdict(list)
for metric in metrics_to_analyze:
model_stats[metric.model_size].append(metric.realtime_factor)
device_stats[metric.device].append(metric.realtime_factor)
summary["breakdown"] = {
"by_model": {
model: {
"count": len(rtfs),
"average_rtf": sum(rtfs) / len(rtfs),
"median_rtf": sorted(rtfs)[len(rtfs) // 2]
}
for model, rtfs in model_stats.items()
},
"by_device": {
device: {
"count": len(rtfs),
"average_rtf": sum(rtfs) / len(rtfs),
"median_rtf": sorted(rtfs)[len(rtfs) // 2]
}
for device, rtfs in device_stats.items()
}
}
return summary
def get_optimization_recommendations(self) -> List[str]:
"""Get optimization recommendations based on performance history."""
if not self.metrics_history:
return ["No performance data available for recommendations"]
recommendations = []
recent_metrics = list(self.metrics_history)[-20:] # Last 20 measurements
# Analyze RTF performance
avg_rtf = sum(m.realtime_factor for m in recent_metrics) / len(recent_metrics)
if avg_rtf < 0.5:
recommendations.append("Consider using a smaller Whisper model (e.g., base instead of large)")
recommendations.append("Enable GPU acceleration if available")
recommendations.append("Reduce audio chunk size to lower memory usage")
recommendations.append("Close other applications to free system resources")
elif avg_rtf < 1.0:
recommendations.append("Performance is below real-time. Consider GPU acceleration")
recommendations.append("Monitor system resources for bottlenecks")
elif avg_rtf > 5.0:
recommendations.append("Performance is excellent! Consider using a larger model for better accuracy")
recommendations.append("You can increase chunk size for more efficient processing")
# Memory analysis
avg_memory = sum(m.memory_usage_gb for m in recent_metrics) / len(recent_metrics)
if avg_memory > 8.0:
recommendations.append("High memory usage detected. Consider smaller chunk sizes")
recommendations.append("Enable memory cleanup between segments")
# GPU memory analysis (if available)
gpu_metrics = [m for m in recent_metrics if m.gpu_memory_usage_gb is not None]
if gpu_metrics:
avg_gpu_memory = sum(m.gpu_memory_usage_gb for m in gpu_metrics) / len(gpu_metrics)
if avg_gpu_memory > 4.0:
recommendations.append("High GPU memory usage. Consider using fp16 precision")
recommendations.append("Reduce batch size or use smaller model")
# Device-specific recommendations
device_usage = defaultdict(int)
for metric in recent_metrics:
device_usage[metric.device] += 1
if device_usage.get('cpu', 0) > device_usage.get('cuda', 0):
if TORCH_AVAILABLE and torch.cuda.is_available():
recommendations.append("GPU is available but not being used. Enable GPU acceleration")
# Consistency analysis
rtf_variance = np.var([m.realtime_factor for m in recent_metrics]) if NUMPY_AVAILABLE else 0
if rtf_variance > 1.0:
recommendations.append("Performance is inconsistent. Check for background processes")
recommendations.append("Consider using fixed chunk sizes for more predictable performance")
return recommendations or ["Performance looks good! No specific recommendations."]
def export_metrics(self, filepath: str, format: str = "json") -> None:
"""
Export performance metrics to file.
Args:
filepath: Path to output file
format: Export format ("json", "csv")
"""
if format.lower() == "json":
self._export_json(filepath)
elif format.lower() == "csv":
self._export_csv(filepath)
else:
raise ValueError(f"Unsupported export format: {format}")
def _export_json(self, filepath: str) -> None:
"""Export metrics as JSON."""
data = {
"session_info": self.current_session,
"metrics": [asdict(metric) for metric in self.metrics_history],
"summary": self.get_performance_summary(),
"recommendations": self.get_optimization_recommendations(),
"export_timestamp": time.time()
}
with open(filepath, 'w') as f:
json.dump(data, f, indent=2)
self.logger.info(f"Exported {len(self.metrics_history)} metrics to {filepath}")
def _export_csv(self, filepath: str) -> None:
"""Export metrics as CSV."""
import csv
with open(filepath, 'w', newline='') as f:
if not self.metrics_history:
return
writer = csv.DictWriter(f, fieldnames=asdict(self.metrics_history[0]).keys())
writer.writeheader()
for metric in self.metrics_history:
writer.writerow(asdict(metric))
self.logger.info(f"Exported {len(self.metrics_history)} metrics to {filepath}")
def _monitor_resources(self) -> None:
"""Background thread for monitoring system resources."""
while not self.stop_monitoring.wait(self.sampling_interval):
try:
resource_sample = {
"timestamp": time.time(),
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"memory_gb": psutil.virtual_memory().used / (1024**3)
}
if self.enable_gpu_monitoring:
gpu_memory = self._get_gpu_memory_usage()
if gpu_memory is not None:
resource_sample["gpu_memory_gb"] = gpu_memory
self.resource_history.append(resource_sample)
# Keep only recent history (last hour)
cutoff_time = time.time() - 3600
self.resource_history = [
sample for sample in self.resource_history
if sample["timestamp"] > cutoff_time
]
except Exception as e:
self.logger.warning(f"Error in resource monitoring: {e}")
def _get_memory_usage(self) -> float:
"""Get current memory usage in GB."""
return psutil.virtual_memory().used / (1024**3)
def _get_gpu_memory_usage(self) -> Optional[float]:
"""Get current GPU memory usage in GB."""
if not self.enable_gpu_monitoring:
return None
try:
return torch.cuda.memory_allocated() / (1024**3)
except Exception:
return None
class BenchmarkRunner:
"""
Comprehensive benchmarking suite for Whisper models.
Provides standardized benchmarks for comparing performance across
different models, devices, and configurations.
"""
def __init__(self, performance_monitor: Optional[PerformanceMonitor] = None):
"""Initialize benchmark runner."""
self.performance_monitor = performance_monitor or PerformanceMonitor()
self.logger = logging.getLogger(__name__)
def benchmark_model(
self,
model_name: str,
device: str = "auto",
test_audio_duration: float = 60.0,
num_runs: int = 3,
warmup_runs: int = 1,
custom_config: Optional[Dict[str, Any]] = None
) -> BenchmarkResult:
"""
Benchmark a specific Whisper model configuration.
Args:
model_name: Whisper model to benchmark
device: Device for testing ("cpu", "cuda", "auto")
test_audio_duration: Duration of test audio
num_runs: Number of benchmark runs
warmup_runs: Number of warmup runs
custom_config: Custom configuration parameters
Returns:
BenchmarkResult with performance metrics
"""
self.logger.info(f"Starting benchmark: {model_name} on {device}")
try:
# Load model
import whisper
model = whisper.load_model(model_name, device=device)
# Generate test audio
test_audio = self._generate_test_audio(test_audio_duration)
# Configuration
config = {
"temperature": 0.0,
"language": None,
"task": "transcribe"
}
if custom_config:
config.update(custom_config)
# Warmup runs
self.logger.info(f"Running {warmup_runs} warmup iterations...")
for _ in range(warmup_runs):
model.transcribe(test_audio, **config)
if TORCH_AVAILABLE and torch.cuda.is_available():
torch.cuda.empty_cache()
# Benchmark runs
self.logger.info(f"Running {num_runs} benchmark iterations...")
results = []
for run in range(num_runs):
self.performance_monitor.start_session()
with self.performance_monitor.monitor_transcription(
model_name, device, batch_size=1
):
start_time = time.time()
result = model.transcribe(test_audio, **config)
end_time = time.time()
processing_time = end_time - start_time
# Record metrics
metrics = self.performance_monitor.record_transcription(
audio_duration=test_audio_duration,
processing_time=processing_time,
model_size=model_name,
device=device
)
session_summary = self.performance_monitor.stop_session()
results.append((metrics, session_summary, result))
self.logger.info(f"Run {run + 1}/{num_runs}: RTF={metrics.realtime_factor:.2f}")
# Clean up between runs
if TORCH_AVAILABLE and torch.cuda.is_available():
torch.cuda.empty_cache()
# Calculate aggregate results
processing_times = [r[0].processing_time_seconds for r in results]
rtf_values = [r[0].realtime_factor for r in results]
memory_peaks = [r[1]["peak_memory_usage_gb"] for r in results]
# System info
system_info = self._get_system_info()
benchmark_result = BenchmarkResult(
model_name=model_name,
device=device,
audio_duration=test_audio_duration,
processing_time=sum(processing_times) / len(processing_times),
realtime_factor=sum(rtf_values) / len(rtf_values),
memory_peak_gb=max(memory_peaks),
gpu_memory_peak_gb=max([
r[1].get("peak_gpu_memory_usage_gb", 0) or 0 for r in results
]) if TORCH_AVAILABLE else None,
accuracy_score=None, # Would need reference transcription
configuration=config,
system_info=system_info
)
self.logger.info(f"Benchmark completed: RTF={benchmark_result.realtime_factor:.2f}")
return benchmark_result
except Exception as e:
self.logger.error(f"Benchmark failed: {e}")
raise
def compare_models(
self,
model_names: List[str],
device: str = "auto",
test_audio_duration: float = 60.0
) -> Dict[str, BenchmarkResult]:
"""
Compare multiple models on the same test conditions.
Args:
model_names: List of model names to compare
device: Device for testing
test_audio_duration: Duration of test audio
Returns:
Dictionary mapping model names to benchmark results
"""
results = {}
for model_name in model_names:
try:
self.logger.info(f"Benchmarking {model_name}...")
results[model_name] = self.benchmark_model(
model_name, device, test_audio_duration
)
except Exception as e:
self.logger.error(f"Failed to benchmark {model_name}: {e}")
results[model_name] = None
return results
def _generate_test_audio(self, duration: float) -> np.ndarray:
"""Generate synthetic test audio."""
sample_rate = 16000
samples = int(duration * sample_rate)
# Generate speech-like audio with varying frequencies
t = np.linspace(0, duration, samples)
frequencies = 440 + 200 * np.sin(2 * np.pi * 0.5 * t) # Varying pitch
audio = 0.3 * np.sin(2 * np.pi * frequencies * t)
# Add some noise to make it more realistic
noise = 0.05 * np.random.randn(samples)
audio = audio + noise
return audio.astype(np.float32)
def _get_system_info(self) -> Dict[str, Any]:
"""Get system information for benchmark context."""
info = {
"cpu_count": psutil.cpu_count(),
"cpu_freq": psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None,
"memory_total_gb": psutil.virtual_memory().total / (1024**3),
"python_version": f"{__import__('sys').version_info.major}.{__import__('sys').version_info.minor}",
}
if TORCH_AVAILABLE and torch.cuda.is_available():
info["gpu_count"] = torch.cuda.device_count()
info["gpu_name"] = torch.cuda.get_device_name(0) if torch.cuda.device_count() > 0 else None
info["cuda_version"] = torch.version.cuda
return info
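For orientation, a minimal usage sketch of the benchmarking API above. It assumes BenchmarkRunner() supports default construction and that the named checkpoints can be loaded; it only touches call shapes and BenchmarkResult fields that appear in the code, and is an illustration rather than a tested script.

# Minimal sketch: benchmark one model, then compare several (assumes default construction works).
from whisper.optimization import BenchmarkRunner

runner = BenchmarkRunner()

# Single model: positional arguments mirror the call made inside compare_models().
single = runner.benchmark_model("base", "cpu", 30.0)
print(f"base on cpu: RTF={single.realtime_factor:.2f}, peak RAM={single.memory_peak_gb:.2f} GB")

# Several models under identical conditions; models that fail to benchmark map to None.
comparison = runner.compare_models(["tiny", "base"], device="cpu", test_audio_duration=30.0)
for name, res in comparison.items():
    if res is not None:
        print(f"{name}: avg processing {res.processing_time:.1f}s, RTF={res.realtime_factor:.2f}")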

View File

@ -34,6 +34,15 @@ from .utils import (
if TYPE_CHECKING:
from .model import Whisper
# Memory optimization imports
try:
from .optimization.memory_manager import MemoryManager
from .optimization.chunk_processor import ChunkProcessor, ProcessingMode
from .optimization.performance_monitor import PerformanceMonitor
OPTIMIZATION_AVAILABLE = True
except ImportError:
OPTIMIZATION_AVAILABLE = False
def transcribe(
model: "Whisper",
@ -52,6 +61,12 @@ def transcribe(
append_punctuations: str = "\"'.。,!?::”)]}、",
clip_timestamps: Union[str, List[float]] = "0",
hallucination_silence_threshold: Optional[float] = None,
# Memory optimization parameters
enable_memory_optimization: bool = False,
memory_optimization_mode: str = "adaptive", # "adaptive", "aggressive", "conservative"
auto_chunk_large_files: bool = True,
max_memory_usage_gb: Optional[float] = None,
enable_performance_monitoring: bool = False,
**decode_options,
):
"""
@ -119,6 +134,26 @@ def transcribe(
When word_timestamps is True, skip silent periods longer than this threshold (in seconds)
when a possible hallucination is detected
enable_memory_optimization: bool
Enable automatic memory management and optimization features.
Helps prevent out-of-memory errors for large audio files.
memory_optimization_mode: str
Memory optimization strategy: "adaptive" (default), "aggressive", or "conservative".
Adaptive mode balances performance and memory usage based on system resources.
auto_chunk_large_files: bool
Automatically chunk large audio files to fit in available memory.
Uses intelligent chunking with overlap handling.
max_memory_usage_gb: Optional[float]
Maximum memory usage limit in GB. If None, uses system-dependent defaults.
Helps prevent system overload during processing.
enable_performance_monitoring: bool
Enable real-time performance monitoring and optimization recommendations.
Provides detailed metrics on processing speed and resource usage.
Returns
-------
A dictionary containing the resulting text ("text") and segment-level details ("segments"), and
@ -135,6 +170,70 @@ def transcribe(
if dtype == torch.float32:
decode_options["fp16"] = False
# Initialize memory optimization if enabled
memory_manager = None
performance_monitor = None
chunk_processor = None
if enable_memory_optimization and OPTIMIZATION_AVAILABLE:
# Initialize memory manager
memory_manager = MemoryManager(
enable_automatic_cleanup=(memory_optimization_mode in ["adaptive", "aggressive"])
)
# Initialize performance monitoring
if enable_performance_monitoring:
performance_monitor = PerformanceMonitor()
performance_monitor.start_session()
# Check if we should use chunk processing for large files
if auto_chunk_large_files and isinstance(audio, str):
try:
from .audio import load_audio  # package-relative import instead of re-importing the top-level whisper package
audio_data = load_audio(audio)
duration = len(audio_data) / SAMPLE_RATE
# Use chunk processing for files longer than 5 minutes or if memory is limited
memory_info = memory_manager.get_cpu_memory_info()
should_chunk = (duration > 300.0 or
memory_info.usage_percent > 70 or
(max_memory_usage_gb and memory_info.used_gb > max_memory_usage_gb))
if should_chunk:
# Map the user-facing string option onto the ProcessingMode enum expected by ChunkProcessor
chunk_processor = ChunkProcessor(
memory_manager=memory_manager,
processing_mode={
"conservative": ProcessingMode.SEQUENTIAL,
"adaptive": ProcessingMode.ADAPTIVE,
"aggressive": ProcessingMode.PARALLEL
}.get(memory_optimization_mode, ProcessingMode.ADAPTIVE)
)
# Use chunk processor for large files
model_size = getattr(model, 'model_name', 'base')
result = chunk_processor.process_audio_file(
audio, model, model_size,
language=decode_options.get("language"),
task=decode_options.get("task", "transcribe"),
**{k: v for k, v in decode_options.items() if k not in ["language", "task"]}
)
if performance_monitor:
session_summary = performance_monitor.stop_session()
result["performance_summary"] = session_summary
return result
except Exception as e:
if verbose:
warnings.warn(f"Memory optimization failed, falling back to standard processing: {e}")
elif enable_memory_optimization and not OPTIMIZATION_AVAILABLE:
warnings.warn(
"Memory optimization was requested but optimization modules are not available. "
"Falling back to standard Whisper behavior."
)
# Pad 30-seconds of silence to the input audio, for slicing
mel = log_mel_spectrogram(audio, model.dims.n_mels, padding=N_SAMPLES)
content_frames = mel.shape[-1] - N_FRAMES
@ -507,11 +606,34 @@ def transcribe(
# update progress bar
pbar.update(min(content_frames, seek) - previous_seek)
# Finalize memory optimization and performance monitoring
result_dict = {
"text": tokenizer.decode(all_tokens[len(initial_prompt_tokens) :]),
"segments": all_segments,
"language": language,
}
# Add performance monitoring results if enabled
if performance_monitor:
try:
session_summary = performance_monitor.stop_session()
result_dict["performance_summary"] = session_summary
result_dict["optimization_recommendations"] = performance_monitor.get_optimization_recommendations()
except Exception as e:
if verbose:
warnings.warn(f"Performance monitoring failed: {e}")
# Perform final memory cleanup if enabled
if memory_manager and enable_memory_optimization:
try:
cleanup_results = memory_manager.cleanup_memory(force=True)
if cleanup_results and verbose:
print(f"Memory cleanup freed: {cleanup_results}")
except Exception as e:
if verbose:
warnings.warn(f"Memory cleanup failed: {e}")
return result_dict
def cli():
@ -564,6 +686,13 @@ def cli():
parser.add_argument("--threads", type=optional_int, default=0, help="number of threads used by torch for CPU inference; supercedes MKL_NUM_THREADS/OMP_NUM_THREADS") parser.add_argument("--threads", type=optional_int, default=0, help="number of threads used by torch for CPU inference; supercedes MKL_NUM_THREADS/OMP_NUM_THREADS")
parser.add_argument("--clip_timestamps", type=str, default="0", help="comma-separated list start,end,start,end,... timestamps (in seconds) of clips to process, where the last end timestamp defaults to the end of the file") parser.add_argument("--clip_timestamps", type=str, default="0", help="comma-separated list start,end,start,end,... timestamps (in seconds) of clips to process, where the last end timestamp defaults to the end of the file")
parser.add_argument("--hallucination_silence_threshold", type=optional_float, help="(requires --word_timestamps True) skip silent periods longer than this threshold (in seconds) when a possible hallucination is detected") parser.add_argument("--hallucination_silence_threshold", type=optional_float, help="(requires --word_timestamps True) skip silent periods longer than this threshold (in seconds) when a possible hallucination is detected")
# Memory optimization arguments
parser.add_argument("--enable_memory_optimization", type=str2bool, default=False, help="enable automatic memory management and GPU optimization")
parser.add_argument("--memory_optimization_mode", type=str, default="adaptive", choices=["adaptive", "aggressive", "conservative"], help="memory optimization strategy")
parser.add_argument("--auto_chunk_large_files", type=str2bool, default=True, help="automatically chunk large audio files to prevent memory issues")
parser.add_argument("--max_memory_usage_gb", type=optional_float, default=None, help="maximum memory usage limit in GB")
parser.add_argument("--enable_performance_monitoring", type=str2bool, default=False, help="enable performance monitoring and optimization recommendations")
# fmt: on
args = parser.parse_args().__dict__
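To close, a short sketch of how the parameters added in this diff could be exercised from Python. The file path is a placeholder, and the performance keys are only attached when monitoring is enabled and the optimization modules import cleanly; treat this as a minimal sketch, not part of the change set.

# Sketch: exercising the new optimization parameters (file path is a placeholder).
import whisper

model = whisper.load_model("base")
result = model.transcribe(
    "long_recording.wav",
    enable_memory_optimization=True,       # opt in to the memory-managed path
    memory_optimization_mode="adaptive",   # "adaptive" | "aggressive" | "conservative"
    auto_chunk_large_files=True,           # chunk files over ~5 minutes or under memory pressure
    max_memory_usage_gb=8.0,               # soft limit consulted by the chunking heuristic
    enable_performance_monitoring=True,    # attaches performance_summary to the result
)

print(result["text"][:200])
# Only present when monitoring ran; see the finalization block in the diff above.
print(result.get("performance_summary"))
print(result.get("optimization_recommendations"))

# Equivalent CLI flags added above:
#   whisper long_recording.wav --enable_memory_optimization True \
#       --memory_optimization_mode adaptive --enable_performance_monitoring True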