diff --git a/whisper/optimization/__init__.py b/whisper/optimization/__init__.py new file mode 100644 index 0000000..4b2993f --- /dev/null +++ b/whisper/optimization/__init__.py @@ -0,0 +1,22 @@ +# Whisper Optimization Module +""" +This module provides memory and performance optimization utilities for OpenAI Whisper. +Includes GPU memory management, efficient chunking, and performance monitoring. +""" + +from .memory_manager import MemoryManager, GPUMemoryManager, ChunkingStrategy +from .chunk_processor import ChunkProcessor, AdaptiveChunker +from .performance_monitor import PerformanceMonitor, BenchmarkRunner + +__all__ = [ + 'MemoryManager', + 'GPUMemoryManager', + 'ChunkingStrategy', + 'ChunkProcessor', + 'AdaptiveChunker', + 'PerformanceMonitor', + 'BenchmarkRunner' +] + +# Version info +__version__ = "1.0.0" \ No newline at end of file diff --git a/whisper/optimization/chunk_processor.py b/whisper/optimization/chunk_processor.py new file mode 100644 index 0000000..5d3fe7f --- /dev/null +++ b/whisper/optimization/chunk_processor.py @@ -0,0 +1,743 @@ +""" +Chunk processor for memory-efficient transcription of large audio files. + +This module handles intelligent chunking and processing of large audio files +to prevent memory issues and optimize performance. +""" + +import time +import logging +from typing import List, Dict, Any, Optional, Tuple, Callable, Union +from dataclasses import dataclass +import threading +from concurrent.futures import ThreadPoolExecutor, as_completed +from enum import Enum + +try: + import numpy as np + NUMPY_AVAILABLE = True +except ImportError: + NUMPY_AVAILABLE = False + +from .memory_manager import MemoryManager, ChunkingStrategy + + +class ProcessingMode(Enum): + """Processing modes for chunk processing.""" + SEQUENTIAL = "sequential" # Process chunks one by one + PARALLEL = "parallel" # Process chunks in parallel + ADAPTIVE = "adaptive" # Adapt based on system resources + + +@dataclass +class ChunkResult: + """Result of processing a single chunk.""" + chunk_id: int + start_time: float + end_time: float + text: str + segments: List[Dict[str, Any]] + processing_time: float + memory_used_gb: float + confidence_score: float = 0.0 + error: Optional[str] = None + metadata: Optional[Dict[str, Any]] = None + + +@dataclass +class ProcessingStats: + """Statistics from chunk processing.""" + total_chunks: int + processed_chunks: int + failed_chunks: int + total_processing_time: float + total_audio_duration: float + average_processing_time: float + realtime_factor: float + peak_memory_usage_gb: float + memory_cleanups: int + + +class ChunkProcessor: + """ + Intelligent chunk processor for large audio files. + + Handles chunking, processing, and result aggregation with memory management + and performance optimization. + """ + + def __init__( + self, + memory_manager: Optional[MemoryManager] = None, + max_workers: Optional[int] = None, + processing_mode: ProcessingMode = ProcessingMode.ADAPTIVE, + enable_progress_callback: bool = True + ): + """ + Initialize chunk processor. 
+ + Args: + memory_manager: Memory manager instance (created if None) + max_workers: Maximum number of parallel workers (auto-detect if None) + processing_mode: Processing mode (sequential, parallel, adaptive) + enable_progress_callback: Enable progress callbacks + """ + self.memory_manager = memory_manager or MemoryManager() + self.max_workers = max_workers or self._determine_optimal_workers() + self.processing_mode = processing_mode + self.enable_progress_callback = enable_progress_callback + + # Processing state + self.current_stats = ProcessingStats( + total_chunks=0, + processed_chunks=0, + failed_chunks=0, + total_processing_time=0.0, + total_audio_duration=0.0, + average_processing_time=0.0, + realtime_factor=0.0, + peak_memory_usage_gb=0.0, + memory_cleanups=0 + ) + + # Callbacks + self.progress_callback: Optional[Callable[[int, int, float], None]] = None + self.chunk_callback: Optional[Callable[[ChunkResult], None]] = None + self.error_callback: Optional[Callable[[Exception], None]] = None + + # Setup logging + self.logger = logging.getLogger(__name__) + + def _determine_optimal_workers(self) -> int: + """Determine optimal number of workers based on system resources.""" + try: + import psutil + cpu_count = psutil.cpu_count(logical=False) or 1 + memory_gb = psutil.virtual_memory().total / (1024**3) + + # Conservative worker count based on memory + # Assume each worker needs ~2GB for Whisper processing + memory_workers = max(1, int(memory_gb // 2)) + + # Use fewer workers than CPU cores to avoid overload + cpu_workers = max(1, cpu_count - 1) + + # Take the minimum to avoid resource exhaustion + optimal_workers = min(memory_workers, cpu_workers, 4) # Cap at 4 workers + + self.logger.info(f"Determined optimal workers: {optimal_workers}") + return optimal_workers + + except Exception as e: + self.logger.warning(f"Error determining optimal workers: {e}") + return 1 + + def process_audio_file( + self, + audio_path: str, + model, + model_size: str = "base", + language: Optional[str] = None, + task: str = "transcribe", + chunk_duration: Optional[float] = None, + overlap_duration: float = 1.0, + **transcribe_options + ) -> Dict[str, Any]: + """ + Process a large audio file using intelligent chunking. 
+ + Args: + audio_path: Path to audio file + model: Loaded Whisper model + model_size: Model size for memory optimization + language: Source language (None for auto-detect) + task: Task type (transcribe/translate) + chunk_duration: Chunk duration (auto-calculate if None) + overlap_duration: Overlap between chunks + **transcribe_options: Additional options for transcription + + Returns: + Dictionary with aggregated results + """ + start_time = time.time() + + try: + # Load audio file + audio_data, audio_duration = self._load_audio_file(audio_path) + + # Calculate chunks + chunks = self._calculate_chunks( + audio_duration, model_size, chunk_duration, overlap_duration + ) + + # Initialize stats + self.current_stats = ProcessingStats( + total_chunks=len(chunks), + processed_chunks=0, + failed_chunks=0, + total_processing_time=0.0, + total_audio_duration=audio_duration, + average_processing_time=0.0, + realtime_factor=0.0, + peak_memory_usage_gb=0.0, + memory_cleanups=0 + ) + + # Process chunks + chunk_results = self._process_chunks( + audio_data, chunks, model, language, task, **transcribe_options + ) + + # Aggregate results + final_result = self._aggregate_results( + chunk_results, audio_duration, overlap_duration + ) + + # Finalize stats + total_time = time.time() - start_time + self.current_stats.total_processing_time = total_time + self.current_stats.average_processing_time = ( + total_time / max(1, self.current_stats.processed_chunks) + ) + self.current_stats.realtime_factor = audio_duration / total_time + + # Add processing metadata + final_result.update({ + "processing_stats": self.current_stats.__dict__, + "chunk_info": { + "total_chunks": len(chunks), + "chunk_duration": chunk_duration or "auto", + "overlap_duration": overlap_duration, + "processing_mode": self.processing_mode.value + } + }) + + self.logger.info(f"Processed {audio_duration:.1f}s audio in {total_time:.1f}s " + f"(RTF: {self.current_stats.realtime_factor:.2f})") + + return final_result + + except Exception as e: + self.logger.error(f"Error processing audio file: {e}") + if self.error_callback: + self.error_callback(e) + raise + + def _load_audio_file(self, audio_path: str) -> Tuple[np.ndarray, float]: + """Load audio file and return data with duration.""" + try: + import whisper + audio_data = whisper.load_audio(audio_path) + duration = len(audio_data) / 16000.0 # Whisper uses 16kHz + + self.logger.info(f"Loaded audio file: {audio_path} ({duration:.1f}s)") + return audio_data, duration + + except Exception as e: + self.logger.error(f"Error loading audio file {audio_path}: {e}") + raise + + def _calculate_chunks( + self, + audio_duration: float, + model_size: str, + chunk_duration: Optional[float], + overlap_duration: float + ) -> List[Tuple[float, float]]: + """Calculate optimal chunks for the audio.""" + if chunk_duration is None: + # Use memory manager to determine optimal chunk size + chunking_strategy = ChunkingStrategy(self.memory_manager) + chunks = chunking_strategy.calculate_optimal_chunks( + audio_duration, + model_size, + overlap_seconds=overlap_duration + ) + else: + # Use specified chunk duration + chunks = [] + current_start = 0.0 + while current_start < audio_duration: + current_end = min(current_start + chunk_duration, audio_duration) + chunks.append((current_start, current_end)) + current_start = current_end - overlap_duration + if current_start >= current_end: + break + + self.logger.info(f"Split {audio_duration:.1f}s audio into {len(chunks)} chunks") + return chunks + + def _process_chunks( + self, + 
audio_data: np.ndarray, + chunks: List[Tuple[float, float]], + model, + language: Optional[str], + task: str, + **transcribe_options + ) -> List[ChunkResult]: + """Process audio chunks using the configured processing mode.""" + if self.processing_mode == ProcessingMode.SEQUENTIAL: + return self._process_chunks_sequential( + audio_data, chunks, model, language, task, **transcribe_options + ) + elif self.processing_mode == ProcessingMode.PARALLEL: + return self._process_chunks_parallel( + audio_data, chunks, model, language, task, **transcribe_options + ) + else: # ADAPTIVE + return self._process_chunks_adaptive( + audio_data, chunks, model, language, task, **transcribe_options + ) + + def _process_chunks_sequential( + self, + audio_data: np.ndarray, + chunks: List[Tuple[float, float]], + model, + language: Optional[str], + task: str, + **transcribe_options + ) -> List[ChunkResult]: + """Process chunks sequentially.""" + results = [] + + for i, (start_time, end_time) in enumerate(chunks): + try: + with self.memory_manager.memory_context(): + result = self._process_single_chunk( + audio_data, i, start_time, end_time, model, + language, task, **transcribe_options + ) + results.append(result) + + # Update progress + self.current_stats.processed_chunks += 1 + if self.progress_callback: + self.progress_callback( + self.current_stats.processed_chunks, + self.current_stats.total_chunks, + (self.current_stats.processed_chunks / self.current_stats.total_chunks) * 100 + ) + + # Call chunk callback + if self.chunk_callback: + self.chunk_callback(result) + + except Exception as e: + self.logger.error(f"Error processing chunk {i}: {e}") + self.current_stats.failed_chunks += 1 + + # Create error result + error_result = ChunkResult( + chunk_id=i, + start_time=start_time, + end_time=end_time, + text="", + segments=[], + processing_time=0.0, + memory_used_gb=0.0, + error=str(e) + ) + results.append(error_result) + + return results + + def _process_chunks_parallel( + self, + audio_data: np.ndarray, + chunks: List[Tuple[float, float]], + model, + language: Optional[str], + task: str, + **transcribe_options + ) -> List[ChunkResult]: + """Process chunks in parallel.""" + results = [None] * len(chunks) # Pre-allocate results array + + with ThreadPoolExecutor(max_workers=self.max_workers) as executor: + # Submit all chunks + future_to_index = {} + for i, (start_time, end_time) in enumerate(chunks): + future = executor.submit( + self._process_single_chunk_with_context, + audio_data, i, start_time, end_time, model, + language, task, **transcribe_options + ) + future_to_index[future] = i + + # Collect results as they complete + for future in as_completed(future_to_index): + chunk_index = future_to_index[future] + try: + result = future.result() + results[chunk_index] = result + self.current_stats.processed_chunks += 1 + + # Update progress + if self.progress_callback: + self.progress_callback( + self.current_stats.processed_chunks, + self.current_stats.total_chunks, + (self.current_stats.processed_chunks / self.current_stats.total_chunks) * 100 + ) + + # Call chunk callback + if self.chunk_callback: + self.chunk_callback(result) + + except Exception as e: + self.logger.error(f"Error processing chunk {chunk_index}: {e}") + self.current_stats.failed_chunks += 1 + + start_time, end_time = chunks[chunk_index] + error_result = ChunkResult( + chunk_id=chunk_index, + start_time=start_time, + end_time=end_time, + text="", + segments=[], + processing_time=0.0, + memory_used_gb=0.0, + error=str(e) + ) + results[chunk_index] = 
error_result + + # Filter out None results (shouldn't happen, but safety check) + return [r for r in results if r is not None] + + def _process_chunks_adaptive( + self, + audio_data: np.ndarray, + chunks: List[Tuple[float, float]], + model, + language: Optional[str], + task: str, + **transcribe_options + ) -> List[ChunkResult]: + """Process chunks adaptively based on system resources.""" + # Check memory status to decide processing mode + memory_status = self.memory_manager.check_memory_status() + cpu_info = memory_status["cpu"] + + # Use parallel processing if we have sufficient resources + if cpu_info.usage_percent < 70 and len(chunks) > 2: + self.logger.info("Using parallel processing mode (sufficient resources)") + return self._process_chunks_parallel( + audio_data, chunks, model, language, task, **transcribe_options + ) + else: + self.logger.info("Using sequential processing mode (limited resources)") + return self._process_chunks_sequential( + audio_data, chunks, model, language, task, **transcribe_options + ) + + def _process_single_chunk_with_context(self, *args, **kwargs) -> ChunkResult: + """Process a single chunk with memory context (for parallel execution).""" + with self.memory_manager.memory_context(): + return self._process_single_chunk(*args, **kwargs) + + def _process_single_chunk( + self, + audio_data: np.ndarray, + chunk_id: int, + start_time: float, + end_time: float, + model, + language: Optional[str], + task: str, + **transcribe_options + ) -> ChunkResult: + """Process a single audio chunk.""" + chunk_start_time = time.time() + + try: + # Extract audio chunk + sample_rate = 16000 # Whisper standard sample rate + start_sample = int(start_time * sample_rate) + end_sample = int(end_time * sample_rate) + chunk_audio = audio_data[start_sample:end_sample] + + # Monitor memory before processing + memory_before = self.memory_manager.check_memory_status() + + # Transcribe chunk + result = model.transcribe( + chunk_audio, + language=language, + task=task, + **transcribe_options + ) + + # Monitor memory after processing + memory_after = self.memory_manager.check_memory_status() + memory_used = memory_after["cpu"].used_gb - memory_before["cpu"].used_gb + + # Update peak memory usage + current_usage = memory_after["cpu"].used_gb + if current_usage > self.current_stats.peak_memory_usage_gb: + self.current_stats.peak_memory_usage_gb = current_usage + + # Calculate confidence score + confidence = self._calculate_chunk_confidence(result) + + # Adjust segment timestamps to global time + adjusted_segments = [] + for segment in result.get("segments", []): + adjusted_segment = segment.copy() + adjusted_segment["start"] += start_time + adjusted_segment["end"] += start_time + adjusted_segments.append(adjusted_segment) + + processing_time = time.time() - chunk_start_time + + return ChunkResult( + chunk_id=chunk_id, + start_time=start_time, + end_time=end_time, + text=result.get("text", ""), + segments=adjusted_segments, + processing_time=processing_time, + memory_used_gb=max(0.0, memory_used), + confidence_score=confidence, + metadata={ + "language": result.get("language"), + "chunk_duration": end_time - start_time + } + ) + + except Exception as e: + processing_time = time.time() - chunk_start_time + self.logger.error(f"Error processing chunk {chunk_id}: {e}") + + return ChunkResult( + chunk_id=chunk_id, + start_time=start_time, + end_time=end_time, + text="", + segments=[], + processing_time=processing_time, + memory_used_gb=0.0, + error=str(e) + ) + + def _calculate_chunk_confidence(self, 
result: Dict[str, Any]) -> float: + """Calculate confidence score for a chunk result.""" + try: + segments = result.get("segments", []) + if not segments: + return 0.0 + + # Use average log probability as confidence indicator + log_probs = [] + for segment in segments: + if "avg_logprob" in segment: + log_probs.append(segment["avg_logprob"]) + + if log_probs: + avg_log_prob = sum(log_probs) / len(log_probs) + # Convert log probability to confidence (rough approximation) + confidence = max(0.1, min(0.99, 1.0 + avg_log_prob / 2.0)) + return confidence + + return 0.5 # Default confidence when no log probs available + + except Exception: + return 0.5 + + def _aggregate_results( + self, + chunk_results: List[ChunkResult], + total_duration: float, + overlap_duration: float + ) -> Dict[str, Any]: + """Aggregate chunk results into final transcription.""" + # Filter out failed chunks + successful_results = [r for r in chunk_results if r.error is None] + + if not successful_results: + return { + "text": "", + "segments": [], + "language": "en", + "error": "All chunks failed to process" + } + + # Combine text with overlap handling + combined_text = self._combine_text_with_overlap( + successful_results, overlap_duration + ) + + # Combine segments + all_segments = [] + segment_id = 0 + for result in successful_results: + for segment in result.segments: + segment_copy = segment.copy() + segment_copy["id"] = segment_id + all_segments.append(segment_copy) + segment_id += 1 + + # Determine primary language + languages = [r.metadata.get("language") for r in successful_results if r.metadata] + languages = [lang for lang in languages if lang] + primary_language = max(set(languages), key=languages.count) if languages else "en" + + # Calculate overall confidence + confidences = [r.confidence_score for r in successful_results] + overall_confidence = sum(confidences) / len(confidences) if confidences else 0.0 + + return { + "text": combined_text, + "segments": all_segments, + "language": primary_language, + "confidence": overall_confidence, + "chunk_results": [r.__dict__ for r in chunk_results] # Include all chunk info + } + + def _combine_text_with_overlap( + self, + results: List[ChunkResult], + overlap_duration: float + ) -> str: + """Combine text from chunks, handling overlaps intelligently.""" + if not results: + return "" + + if len(results) == 1: + return results[0].text.strip() + + combined_parts = [] + + for i, result in enumerate(results): + text = result.text.strip() + + if i == 0: + # First chunk: use full text + combined_parts.append(text) + else: + # Subsequent chunks: try to detect and remove overlap + if overlap_duration > 0 and combined_parts: + # Simple overlap detection: split text and look for common endings/beginnings + words = text.split() + if len(words) > 5: + # Try to find overlap by comparing last words of previous chunk + # with first words of current chunk + prev_words = combined_parts[-1].split() + + # Look for overlap (up to 1/3 of the words in the chunk) + max_overlap = min(len(prev_words), len(words)) // 3 + + best_overlap = 0 + for overlap_size in range(1, max_overlap + 1): + if (prev_words[-overlap_size:] == words[:overlap_size] and + overlap_size > best_overlap): + best_overlap = overlap_size + + if best_overlap > 0: + # Remove overlapped words from current chunk + text = " ".join(words[best_overlap:]) + + if text: # Only add if there's remaining text + combined_parts.append(text) + + return " ".join(combined_parts) + + def set_progress_callback(self, callback: Callable[[int, 
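# Worked example of the overlap heuristic above, pulled out as a standalone helper
# (the real method additionally requires overlap_duration > 0 and more than five words):
def trim_overlap(prev_text: str, next_text: str) -> str:
    prev_words, words = prev_text.split(), next_text.split()
    max_overlap = min(len(prev_words), len(words)) // 3
    best = 0
    for size in range(1, max_overlap + 1):
        if prev_words[-size:] == words[:size]:
            best = size
    return " ".join(words[best:])

print(trim_overlap("we will meet again next week at noon",
                   "at noon the meeting will be online ok"))
# -> "the meeting will be online ok"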
int, float], None]) -> None: + """Set progress callback function.""" + self.progress_callback = callback + + def set_chunk_callback(self, callback: Callable[[ChunkResult], None]) -> None: + """Set chunk completion callback function.""" + self.chunk_callback = callback + + def set_error_callback(self, callback: Callable[[Exception], None]) -> None: + """Set error callback function.""" + self.error_callback = callback + + def get_current_stats(self) -> ProcessingStats: + """Get current processing statistics.""" + return self.current_stats + + +class AdaptiveChunker: + """Adaptive chunker that adjusts chunk size based on system performance.""" + + def __init__(self, memory_manager: Optional[MemoryManager] = None): + """Initialize adaptive chunker.""" + self.memory_manager = memory_manager or MemoryManager() + self.performance_history: List[Tuple[float, float, float]] = [] # (chunk_size, processing_time, memory_used) + self.logger = logging.getLogger(__name__) + + def get_adaptive_chunk_size( + self, + base_chunk_size: float, + model_size: str = "base", + target_realtime_factor: float = 1.0 + ) -> float: + """ + Get adaptively adjusted chunk size based on performance history. + + Args: + base_chunk_size: Base chunk size in seconds + model_size: Whisper model size + target_realtime_factor: Target real-time factor (1.0 = real-time) + + Returns: + Adjusted chunk size in seconds + """ + if not self.performance_history: + return base_chunk_size + + # Analyze recent performance + recent_history = self.performance_history[-10:] # Last 10 chunks + + avg_processing_time = sum(h[1] for h in recent_history) / len(recent_history) + avg_chunk_size = sum(h[0] for h in recent_history) / len(recent_history) + avg_memory_used = sum(h[2] for h in recent_history) / len(recent_history) + + current_rtf = avg_chunk_size / avg_processing_time if avg_processing_time > 0 else 1.0 + + # Adjust chunk size based on performance + if current_rtf < target_realtime_factor * 0.8: + # Too slow, reduce chunk size + adjustment_factor = 0.8 + self.logger.info(f"Reducing chunk size due to slow processing (RTF: {current_rtf:.2f})") + elif current_rtf > target_realtime_factor * 1.5: + # Too fast, can increase chunk size + adjustment_factor = 1.2 + self.logger.info(f"Increasing chunk size due to fast processing (RTF: {current_rtf:.2f})") + else: + # Performance is acceptable + adjustment_factor = 1.0 + + # Also consider memory usage + memory_status = self.memory_manager.check_memory_status() + if memory_status["cpu"].usage_percent > 80: + adjustment_factor *= 0.9 # Reduce size if memory is high + + adjusted_size = base_chunk_size * adjustment_factor + + # Apply reasonable bounds + min_size = 5.0 # Minimum 5 seconds + max_size = 300.0 # Maximum 5 minutes + adjusted_size = max(min_size, min(adjusted_size, max_size)) + + return adjusted_size + + def record_performance( + self, + chunk_size: float, + processing_time: float, + memory_used: float + ) -> None: + """Record performance for adaptive adjustment.""" + self.performance_history.append((chunk_size, processing_time, memory_used)) + + # Keep only recent history + if len(self.performance_history) > 100: + self.performance_history = self.performance_history[-50:] + + def reset_history(self) -> None: + """Reset performance history.""" + self.performance_history.clear() \ No newline at end of file diff --git a/whisper/optimization/memory_manager.py b/whisper/optimization/memory_manager.py new file mode 100644 index 0000000..dc12793 --- /dev/null +++ 
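# Sketch of the feedback loop AdaptiveChunker is built for: measure each chunk,
# record it, and let the chunker nudge the next chunk size toward the target RTF
# (the timing and memory figures below are hypothetical stand-ins).
from whisper.optimization.chunk_processor import AdaptiveChunker

chunker = AdaptiveChunker()
chunk_size = 30.0                                   # seconds
for _ in range(3):
    chunk_size = chunker.get_adaptive_chunk_size(
        chunk_size, model_size="base", target_realtime_factor=1.0
    )
    processing_time, memory_used_gb = 25.0, 1.2     # replace with real measurements
    chunker.record_performance(chunk_size, processing_time, memory_used_gb)
print(chunk_size)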
b/whisper/optimization/memory_manager.py @@ -0,0 +1,620 @@ +""" +Memory management utilities for Whisper transcription. + +This module provides intelligent memory management for GPU and CPU resources, +preventing out-of-memory errors and optimizing performance for large audio files. +""" + +import gc +import psutil +import time +import logging +from typing import Optional, Dict, List, Callable, Any, Tuple +from dataclasses import dataclass +from contextlib import contextmanager +from enum import Enum + +try: + import torch + TORCH_AVAILABLE = True +except ImportError: + TORCH_AVAILABLE = False + +try: + import numpy as np + NUMPY_AVAILABLE = True +except ImportError: + NUMPY_AVAILABLE = False + + +class MemoryLevel(Enum): + """Memory usage levels.""" + LOW = "low" # < 30% usage + MODERATE = "moderate" # 30-60% usage + HIGH = "high" # 60-85% usage + CRITICAL = "critical" # > 85% usage + + +@dataclass +class MemoryInfo: + """Memory information container.""" + total_gb: float + used_gb: float + available_gb: float + usage_percent: float + level: MemoryLevel + + +class MemoryManager: + """ + System memory manager for CPU and GPU resources. + + Monitors memory usage and provides automatic cleanup and optimization + to prevent out-of-memory errors during long transcriptions. + """ + + def __init__( + self, + cpu_threshold_percent: float = 80.0, + gpu_threshold_percent: float = 85.0, + cleanup_interval_seconds: float = 30.0, + enable_automatic_cleanup: bool = True + ): + """ + Initialize the memory manager. + + Args: + cpu_threshold_percent: CPU memory threshold for triggering cleanup + gpu_threshold_percent: GPU memory threshold for triggering cleanup + cleanup_interval_seconds: Interval between automatic cleanup checks + enable_automatic_cleanup: Enable automatic memory cleanup + """ + self.cpu_threshold = cpu_threshold_percent + self.gpu_threshold = gpu_threshold_percent + self.cleanup_interval = cleanup_interval_seconds + self.enable_automatic_cleanup = enable_automatic_cleanup + + # State tracking + self.last_cleanup_time = 0.0 + self.cleanup_callbacks: List[Callable[[], None]] = [] + self.memory_history: List[Tuple[float, MemoryInfo]] = [] + + # Setup logging + self.logger = logging.getLogger(__name__) + + # Initialize GPU manager if available + self.gpu_manager = GPUMemoryManager() if TORCH_AVAILABLE else None + + def get_cpu_memory_info(self) -> MemoryInfo: + """Get current CPU memory information.""" + memory = psutil.virtual_memory() + + total_gb = memory.total / (1024**3) + used_gb = memory.used / (1024**3) + available_gb = memory.available / (1024**3) + usage_percent = memory.percent + + # Determine memory level + if usage_percent < 30: + level = MemoryLevel.LOW + elif usage_percent < 60: + level = MemoryLevel.MODERATE + elif usage_percent < 85: + level = MemoryLevel.HIGH + else: + level = MemoryLevel.CRITICAL + + return MemoryInfo( + total_gb=total_gb, + used_gb=used_gb, + available_gb=available_gb, + usage_percent=usage_percent, + level=level + ) + + def get_gpu_memory_info(self) -> Optional[MemoryInfo]: + """Get current GPU memory information.""" + if self.gpu_manager: + return self.gpu_manager.get_memory_info() + return None + + def check_memory_status(self) -> Dict[str, MemoryInfo]: + """Check current memory status for all devices.""" + status = {"cpu": self.get_cpu_memory_info()} + + gpu_info = self.get_gpu_memory_info() + if gpu_info: + status["gpu"] = gpu_info + + # Store in history + current_time = time.time() + self.memory_history.append((current_time, status["cpu"])) + + # 
Keep only recent history (last hour) + hour_ago = current_time - 3600 + self.memory_history = [(t, info) for t, info in self.memory_history if t > hour_ago] + + return status + + def is_memory_critical(self) -> bool: + """Check if any memory is at critical level.""" + status = self.check_memory_status() + + for device, info in status.items(): + if device == "cpu" and info.usage_percent > self.cpu_threshold: + return True + elif device == "gpu" and info.usage_percent > self.gpu_threshold: + return True + + return False + + def cleanup_memory(self, force: bool = False) -> Dict[str, float]: + """ + Perform memory cleanup. + + Args: + force: Force cleanup even if thresholds aren't met + + Returns: + Dictionary with memory freed for each device + """ + if not force and not self.is_memory_critical(): + return {} + + memory_before = self.check_memory_status() + cleanup_results = {} + + # Run cleanup callbacks + for callback in self.cleanup_callbacks: + try: + callback() + except Exception as e: + self.logger.warning(f"Cleanup callback failed: {e}") + + # Python garbage collection + gc.collect() + + # GPU memory cleanup + if self.gpu_manager: + gpu_freed = self.gpu_manager.cleanup_memory() + if gpu_freed > 0: + cleanup_results["gpu"] = gpu_freed + + # Check memory after cleanup + memory_after = self.check_memory_status() + + # Calculate freed memory + for device in memory_before: + if device in memory_after: + freed = memory_before[device].used_gb - memory_after[device].used_gb + if freed > 0: + cleanup_results[device] = freed + + self.last_cleanup_time = time.time() + + if cleanup_results: + self.logger.info(f"Memory cleanup freed: {cleanup_results}") + + return cleanup_results + + def add_cleanup_callback(self, callback: Callable[[], None]) -> None: + """Add a cleanup callback function.""" + self.cleanup_callbacks.append(callback) + + def remove_cleanup_callback(self, callback: Callable[[], None]) -> None: + """Remove a cleanup callback function.""" + if callback in self.cleanup_callbacks: + self.cleanup_callbacks.remove(callback) + + @contextmanager + def memory_context(self, cleanup_after: bool = True): + """ + Context manager for automatic memory management. 
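# Example of the cleanup API above (assumes psutil is installed): register a
# callback that drops cached buffers, force one cleanup pass, and inspect memory.
from whisper.optimization import MemoryManager

manager = MemoryManager(cpu_threshold_percent=80.0)
buffers = []
manager.add_cleanup_callback(buffers.clear)   # called on every cleanup pass

freed = manager.cleanup_memory(force=True)
info = manager.get_cpu_memory_info()
print(freed, info.level.value, f"{info.usage_percent:.0f}% used")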
+ + Args: + cleanup_after: Perform cleanup when exiting context + """ + initial_memory = self.check_memory_status() + + try: + yield self + finally: + if cleanup_after: + self.cleanup_memory() + + # Log memory usage + final_memory = self.check_memory_status() + for device in initial_memory: + if device in final_memory: + initial_used = initial_memory[device].used_gb + final_used = final_memory[device].used_gb + memory_delta = final_used - initial_used + + if abs(memory_delta) > 0.1: # Only log significant changes + direction = "increased" if memory_delta > 0 else "decreased" + self.logger.info( + f"{device.upper()} memory {direction} by {abs(memory_delta):.2f}GB " + f"({final_used:.2f}GB used, {final_memory[device].usage_percent:.1f}%)" + ) + + def auto_cleanup_if_needed(self) -> bool: + """Automatically cleanup if thresholds are exceeded.""" + if not self.enable_automatic_cleanup: + return False + + current_time = time.time() + + # Check if enough time has passed since last cleanup + if current_time - self.last_cleanup_time < self.cleanup_interval: + return False + + # Check if cleanup is needed + if self.is_memory_critical(): + self.cleanup_memory() + return True + + return False + + def get_memory_recommendations(self) -> List[str]: + """Get memory optimization recommendations.""" + recommendations = [] + status = self.check_memory_status() + + cpu_info = status.get("cpu") + if cpu_info and cpu_info.level == MemoryLevel.CRITICAL: + recommendations.append("Reduce batch size or chunk duration") + recommendations.append("Enable memory cleanup callbacks") + recommendations.append("Consider processing audio in smaller segments") + + gpu_info = status.get("gpu") + if gpu_info and gpu_info.level == MemoryLevel.CRITICAL: + recommendations.append("Use smaller Whisper model (e.g., base instead of large)") + recommendations.append("Enable mixed precision (fp16)") + recommendations.append("Reduce GPU batch size") + recommendations.append("Clear GPU cache between segments") + + if cpu_info and cpu_info.level in [MemoryLevel.HIGH, MemoryLevel.CRITICAL]: + recommendations.append("Close other applications to free memory") + recommendations.append("Consider using swap memory if available") + + return recommendations + + def get_optimal_chunk_size(self, audio_length_seconds: float, target_memory_gb: float = 2.0) -> float: + """ + Calculate optimal chunk size based on available memory. 
+ + Args: + audio_length_seconds: Total audio length + target_memory_gb: Target memory usage per chunk + + Returns: + Recommended chunk size in seconds + """ + cpu_info = self.get_cpu_memory_info() + available_memory = min(cpu_info.available_gb, target_memory_gb) + + # Rough estimate: 1 second of audio ~ 0.1GB memory for processing + # This is very approximate and depends on model size and batch size + memory_per_second = 0.1 + + if self.gpu_manager and self.gpu_manager.is_cuda_available(): + # GPU processing is more memory efficient for longer segments + memory_per_second = 0.05 + + optimal_chunk_seconds = available_memory / memory_per_second + + # Clamp to reasonable bounds + optimal_chunk_seconds = max(10.0, min(optimal_chunk_seconds, 300.0)) # 10s to 5min + + # Ensure we don't exceed total audio length + optimal_chunk_seconds = min(optimal_chunk_seconds, audio_length_seconds) + + return optimal_chunk_seconds + + +class GPUMemoryManager: + """GPU-specific memory management utilities.""" + + def __init__(self): + """Initialize GPU memory manager.""" + self.logger = logging.getLogger(__name__) + self.device_count = 0 + + if TORCH_AVAILABLE and torch.cuda.is_available(): + self.device_count = torch.cuda.device_count() + + def is_cuda_available(self) -> bool: + """Check if CUDA is available.""" + return TORCH_AVAILABLE and torch.cuda.is_available() + + def get_memory_info(self, device_id: int = 0) -> Optional[MemoryInfo]: + """Get GPU memory information for specified device.""" + if not self.is_cuda_available() or device_id >= self.device_count: + return None + + try: + # Get memory info from PyTorch + memory_allocated = torch.cuda.memory_allocated(device_id) / (1024**3) # GB + memory_reserved = torch.cuda.memory_reserved(device_id) / (1024**3) # GB + memory_total = torch.cuda.get_device_properties(device_id).total_memory / (1024**3) # GB + + memory_free = memory_total - memory_reserved + usage_percent = (memory_reserved / memory_total) * 100 + + # Determine memory level + if usage_percent < 30: + level = MemoryLevel.LOW + elif usage_percent < 60: + level = MemoryLevel.MODERATE + elif usage_percent < 85: + level = MemoryLevel.HIGH + else: + level = MemoryLevel.CRITICAL + + return MemoryInfo( + total_gb=memory_total, + used_gb=memory_reserved, + available_gb=memory_free, + usage_percent=usage_percent, + level=level + ) + + except Exception as e: + self.logger.error(f"Error getting GPU memory info: {e}") + return None + + def cleanup_memory(self, device_id: Optional[int] = None) -> float: + """ + Cleanup GPU memory. 
+ + Args: + device_id: Specific device to cleanup (None for all devices) + + Returns: + Amount of memory freed in GB + """ + if not self.is_cuda_available(): + return 0.0 + + memory_before = 0.0 + memory_after = 0.0 + + devices_to_cleanup = [device_id] if device_id is not None else range(self.device_count) + + for device in devices_to_cleanup: + if device < self.device_count: + try: + # Get memory before cleanup + if device == 0: # Only measure for first device to avoid overhead + memory_before += torch.cuda.memory_allocated(device) / (1024**3) + + # Clear cache + torch.cuda.empty_cache() + + # Force garbage collection + gc.collect() + + # Get memory after cleanup + if device == 0: + memory_after += torch.cuda.memory_allocated(device) / (1024**3) + + except Exception as e: + self.logger.warning(f"Error cleaning GPU {device}: {e}") + + memory_freed = max(0.0, memory_before - memory_after) + + if memory_freed > 0.1: # Log only significant memory freeing + self.logger.info(f"GPU memory cleanup freed {memory_freed:.2f}GB") + + return memory_freed + + @contextmanager + def gpu_memory_context(self, device_id: int = 0): + """Context manager for GPU memory management.""" + if not self.is_cuda_available(): + yield + return + + # Set device + original_device = torch.cuda.current_device() + torch.cuda.set_device(device_id) + + initial_memory = torch.cuda.memory_allocated(device_id) / (1024**3) + + try: + yield + finally: + # Cleanup and restore device + torch.cuda.empty_cache() + torch.cuda.set_device(original_device) + + final_memory = torch.cuda.memory_allocated(device_id) / (1024**3) + memory_delta = final_memory - initial_memory + + if abs(memory_delta) > 0.1: + direction = "increased" if memory_delta > 0 else "decreased" + self.logger.debug( + f"GPU {device_id} memory {direction} by {abs(memory_delta):.2f}GB" + ) + + def optimize_model_for_memory(self, model, use_half_precision: bool = True) -> Any: + """ + Optimize model for memory usage. 
+ + Args: + model: PyTorch model to optimize + use_half_precision: Use FP16 to reduce memory usage + + Returns: + Optimized model + """ + if not self.is_cuda_available(): + return model + + try: + # Move to GPU if not already + if hasattr(model, 'device') and model.device.type == 'cpu': + model = model.cuda() + + # Enable half precision if supported and requested + if use_half_precision and hasattr(model, 'half'): + if hasattr(torch.cuda, 'get_device_capability'): + # Check if GPU supports half precision + capability = torch.cuda.get_device_capability() + if capability[0] >= 6: # Pascal architecture or newer + model = model.half() + self.logger.info("Enabled half precision (FP16) for memory optimization") + else: + model = model.half() + self.logger.info("Enabled half precision (FP16)") + + # Enable optimizations + if hasattr(torch, 'compile') and hasattr(model, 'forward'): + # PyTorch 2.0 compilation (if available) + try: + model = torch.compile(model, mode='reduce-overhead') + self.logger.info("Applied PyTorch 2.0 compilation optimizations") + except Exception: + pass # Compilation may not be available + + except Exception as e: + self.logger.warning(f"Error optimizing model for memory: {e}") + + return model + + def get_device_info(self) -> List[Dict[str, Any]]: + """Get information about available GPU devices.""" + if not self.is_cuda_available(): + return [] + + devices = [] + for i in range(self.device_count): + try: + props = torch.cuda.get_device_properties(i) + memory_info = self.get_memory_info(i) + + device_info = { + "device_id": i, + "name": props.name, + "total_memory_gb": props.total_memory / (1024**3), + "compute_capability": f"{props.major}.{props.minor}", + "multiprocessor_count": props.multi_processor_count, + "current_memory_info": memory_info.__dict__ if memory_info else None + } + + devices.append(device_info) + + except Exception as e: + self.logger.warning(f"Error getting info for GPU {i}: {e}") + + return devices + + +class ChunkingStrategy: + """Strategy for chunking large audio files based on memory constraints.""" + + def __init__(self, memory_manager: MemoryManager): + """Initialize chunking strategy with memory manager.""" + self.memory_manager = memory_manager + self.logger = logging.getLogger(__name__) + + def calculate_optimal_chunks( + self, + audio_length_seconds: float, + model_size: str = "base", + target_memory_gb: float = 2.0, + min_chunk_seconds: float = 10.0, + max_chunk_seconds: float = 300.0, + overlap_seconds: float = 1.0 + ) -> List[Tuple[float, float]]: + """ + Calculate optimal chunk boundaries for processing large audio. 
+ + Args: + audio_length_seconds: Total audio length + model_size: Whisper model size for memory estimation + target_memory_gb: Target memory usage per chunk + min_chunk_seconds: Minimum chunk size + max_chunk_seconds: Maximum chunk size + overlap_seconds: Overlap between chunks + + Returns: + List of (start_time, end_time) tuples + """ + # Get optimal chunk size from memory manager + optimal_chunk_size = self.memory_manager.get_optimal_chunk_size( + audio_length_seconds, target_memory_gb + ) + + # Adjust based on model size + model_multipliers = { + "tiny": 0.5, + "base": 1.0, + "small": 1.5, + "medium": 2.0, + "large": 3.0, + "large-v2": 3.5, + "large-v3": 3.5 + } + + multiplier = model_multipliers.get(model_size, 1.0) + adjusted_chunk_size = optimal_chunk_size / multiplier + + # Apply bounds + chunk_size = max(min_chunk_seconds, min(adjusted_chunk_size, max_chunk_seconds)) + + # Generate chunks + chunks = [] + current_start = 0.0 + + while current_start < audio_length_seconds: + current_end = min(current_start + chunk_size, audio_length_seconds) + chunks.append((current_start, current_end)) + + # Next chunk starts with overlap + current_start = current_end - overlap_seconds + if current_start >= current_end: + break + + self.logger.info(f"Generated {len(chunks)} chunks for {audio_length_seconds:.1f}s audio") + self.logger.info(f"Chunk size: {chunk_size:.1f}s, Overlap: {overlap_seconds:.1f}s") + + return chunks + + def get_memory_efficient_batch_size(self, model_size: str = "base") -> int: + """Get memory-efficient batch size for the current system.""" + cpu_info = self.memory_manager.get_cpu_memory_info() + gpu_info = self.memory_manager.get_gpu_memory_info() + + # Base batch size recommendations + base_batch_sizes = { + "tiny": 8, + "base": 4, + "small": 2, + "medium": 1, + "large": 1, + "large-v2": 1, + "large-v3": 1 + } + + base_batch_size = base_batch_sizes.get(model_size, 1) + + # Adjust based on available memory + if gpu_info: + # GPU processing + if gpu_info.level == MemoryLevel.CRITICAL: + return 1 # Minimal batch size + elif gpu_info.level == MemoryLevel.HIGH: + return max(1, base_batch_size // 2) + elif gpu_info.level == MemoryLevel.LOW: + return base_batch_size * 2 + else: + # CPU processing + if cpu_info.level == MemoryLevel.CRITICAL: + return 1 + elif cpu_info.level == MemoryLevel.HIGH: + return max(1, base_batch_size // 2) + elif cpu_info.level == MemoryLevel.LOW: + return min(base_batch_size * 2, 8) # Cap CPU batch size + + return base_batch_size \ No newline at end of file diff --git a/whisper/optimization/performance_monitor.py b/whisper/optimization/performance_monitor.py new file mode 100644 index 0000000..2cbe803 --- /dev/null +++ b/whisper/optimization/performance_monitor.py @@ -0,0 +1,676 @@ +""" +Performance monitoring and benchmarking utilities for Whisper optimization. + +This module provides comprehensive performance monitoring, benchmarking, +and optimization recommendations for Whisper transcription. 
+""" + +import time +import psutil +import logging +import json +from typing import Dict, List, Any, Optional, Callable +from dataclasses import dataclass, asdict +from contextlib import contextmanager +from collections import defaultdict, deque +import threading + +try: + import torch + TORCH_AVAILABLE = True +except ImportError: + TORCH_AVAILABLE = False + +try: + import numpy as np + NUMPY_AVAILABLE = True +except ImportError: + NUMPY_AVAILABLE = False + + +@dataclass +class PerformanceMetrics: + """Container for performance metrics.""" + processing_time_seconds: float + audio_duration_seconds: float + realtime_factor: float + cpu_usage_percent: float + memory_usage_gb: float + gpu_memory_usage_gb: Optional[float] + model_size: str + device: str + batch_size: int + timestamp: float + + +@dataclass +class BenchmarkResult: + """Container for benchmark results.""" + model_name: str + device: str + audio_duration: float + processing_time: float + realtime_factor: float + memory_peak_gb: float + gpu_memory_peak_gb: Optional[float] + accuracy_score: Optional[float] + configuration: Dict[str, Any] + system_info: Dict[str, Any] + + +class PerformanceMonitor: + """ + Real-time performance monitoring for Whisper transcription. + + Tracks processing performance, resource usage, and provides + optimization recommendations. + """ + + def __init__( + self, + max_history_size: int = 1000, + enable_gpu_monitoring: bool = True, + sampling_interval: float = 0.1 + ): + """ + Initialize performance monitor. + + Args: + max_history_size: Maximum number of metrics to keep in history + enable_gpu_monitoring: Enable GPU memory monitoring + sampling_interval: Interval between resource usage samples + """ + self.max_history_size = max_history_size + self.enable_gpu_monitoring = enable_gpu_monitoring and TORCH_AVAILABLE + self.sampling_interval = sampling_interval + + # Performance history + self.metrics_history: deque = deque(maxlen=max_history_size) + + # Real-time monitoring + self.current_session = { + "start_time": None, + "total_audio_processed": 0.0, + "total_processing_time": 0.0, + "segments_processed": 0, + "peak_memory_usage": 0.0, + "peak_gpu_memory_usage": 0.0 if self.enable_gpu_monitoring else None + } + + # Resource monitoring + self.resource_history: List[Dict[str, Any]] = [] + self.monitoring_thread: Optional[threading.Thread] = None + self.stop_monitoring = threading.Event() + + # Setup logging + self.logger = logging.getLogger(__name__) + + def start_session(self) -> None: + """Start a monitoring session.""" + self.current_session = { + "start_time": time.time(), + "total_audio_processed": 0.0, + "total_processing_time": 0.0, + "segments_processed": 0, + "peak_memory_usage": 0.0, + "peak_gpu_memory_usage": 0.0 if self.enable_gpu_monitoring else None + } + + # Start background resource monitoring + self.stop_monitoring.clear() + self.monitoring_thread = threading.Thread( + target=self._monitor_resources, + daemon=True + ) + self.monitoring_thread.start() + + self.logger.info("Performance monitoring session started") + + def stop_session(self) -> Dict[str, Any]: + """Stop monitoring session and return summary.""" + if self.monitoring_thread: + self.stop_monitoring.set() + self.monitoring_thread.join(timeout=1.0) + + session_duration = time.time() - (self.current_session["start_time"] or time.time()) + + summary = { + "session_duration": session_duration, + "total_audio_processed": self.current_session["total_audio_processed"], + "total_processing_time": 
self.current_session["total_processing_time"], + "segments_processed": self.current_session["segments_processed"], + "peak_memory_usage_gb": self.current_session["peak_memory_usage"], + "average_rtf": ( + self.current_session["total_audio_processed"] / + max(0.001, self.current_session["total_processing_time"]) + ), + "throughput_minutes_per_hour": ( + (self.current_session["total_audio_processed"] / 60) / + max(0.001, session_duration / 3600) + ) + } + + if self.enable_gpu_monitoring: + summary["peak_gpu_memory_usage_gb"] = self.current_session["peak_gpu_memory_usage"] + + self.logger.info(f"Performance monitoring session ended: RTF={summary['average_rtf']:.2f}") + return summary + + @contextmanager + def monitor_transcription( + self, + model_size: str, + device: str, + batch_size: int = 1 + ): + """ + Context manager for monitoring a transcription operation. + + Args: + model_size: Whisper model size + device: Processing device (cpu/cuda) + batch_size: Batch size used + """ + start_time = time.time() + start_memory = self._get_memory_usage() + start_gpu_memory = self._get_gpu_memory_usage() if self.enable_gpu_monitoring else None + + try: + yield self + finally: + end_time = time.time() + end_memory = self._get_memory_usage() + end_gpu_memory = self._get_gpu_memory_usage() if self.enable_gpu_monitoring else None + + processing_time = end_time - start_time + + # Create metrics (will be completed by record_transcription) + self._processing_start_time = start_time + self._processing_time = processing_time + self._memory_usage = max(start_memory, end_memory) + self._gpu_memory_usage = max(start_gpu_memory or 0, end_gpu_memory or 0) + self._model_size = model_size + self._device = device + self._batch_size = batch_size + + def record_transcription( + self, + audio_duration: float, + processing_time: Optional[float] = None, + model_size: Optional[str] = None, + device: Optional[str] = None, + batch_size: Optional[int] = None + ) -> PerformanceMetrics: + """ + Record metrics for a completed transcription. 
+ + Args: + audio_duration: Duration of processed audio + processing_time: Time taken for processing + model_size: Model size used + device: Processing device + batch_size: Batch size used + + Returns: + PerformanceMetrics object + """ + # Use values from context manager if available + processing_time = processing_time or getattr(self, '_processing_time', 0.0) + model_size = model_size or getattr(self, '_model_size', 'unknown') + device = device or getattr(self, '_device', 'unknown') + batch_size = batch_size or getattr(self, '_batch_size', 1) + + # Calculate metrics + realtime_factor = audio_duration / max(0.001, processing_time) + cpu_usage = psutil.cpu_percent() + memory_usage = getattr(self, '_memory_usage', self._get_memory_usage()) + gpu_memory_usage = getattr(self, '_gpu_memory_usage', None) + + metrics = PerformanceMetrics( + processing_time_seconds=processing_time, + audio_duration_seconds=audio_duration, + realtime_factor=realtime_factor, + cpu_usage_percent=cpu_usage, + memory_usage_gb=memory_usage, + gpu_memory_usage_gb=gpu_memory_usage, + model_size=model_size, + device=device, + batch_size=batch_size, + timestamp=time.time() + ) + + # Add to history + self.metrics_history.append(metrics) + + # Update session stats + self.current_session["total_audio_processed"] += audio_duration + self.current_session["total_processing_time"] += processing_time + self.current_session["segments_processed"] += 1 + self.current_session["peak_memory_usage"] = max( + self.current_session["peak_memory_usage"], memory_usage + ) + + if gpu_memory_usage is not None: + self.current_session["peak_gpu_memory_usage"] = max( + self.current_session["peak_gpu_memory_usage"] or 0, gpu_memory_usage + ) + + # Clean up context manager attributes + for attr in ['_processing_start_time', '_processing_time', '_memory_usage', + '_gpu_memory_usage', '_model_size', '_device', '_batch_size']: + if hasattr(self, attr): + delattr(self, attr) + + return metrics + + def get_performance_summary(self, last_n: Optional[int] = None) -> Dict[str, Any]: + """ + Get performance summary statistics. 
+ + Args: + last_n: Number of recent metrics to analyze (None for all) + + Returns: + Dictionary with performance statistics + """ + if not self.metrics_history: + return {"error": "No performance data available"} + + # Select metrics to analyze + metrics_to_analyze = list(self.metrics_history) + if last_n is not None: + metrics_to_analyze = metrics_to_analyze[-last_n:] + + # Calculate statistics + rtf_values = [m.realtime_factor for m in metrics_to_analyze] + processing_times = [m.processing_time_seconds for m in metrics_to_analyze] + audio_durations = [m.audio_duration_seconds for m in metrics_to_analyze] + memory_usage = [m.memory_usage_gb for m in metrics_to_analyze] + + # GPU memory (if available) + gpu_memory = [m.gpu_memory_usage_gb for m in metrics_to_analyze if m.gpu_memory_usage_gb is not None] + + summary = { + "total_samples": len(metrics_to_analyze), + "performance": { + "average_rtf": sum(rtf_values) / len(rtf_values), + "median_rtf": sorted(rtf_values)[len(rtf_values) // 2], + "min_rtf": min(rtf_values), + "max_rtf": max(rtf_values), + "average_processing_time": sum(processing_times) / len(processing_times), + "total_audio_processed": sum(audio_durations), + "total_processing_time": sum(processing_times) + }, + "resources": { + "average_memory_usage_gb": sum(memory_usage) / len(memory_usage), + "peak_memory_usage_gb": max(memory_usage), + "min_memory_usage_gb": min(memory_usage) + } + } + + if gpu_memory: + summary["resources"]["average_gpu_memory_gb"] = sum(gpu_memory) / len(gpu_memory) + summary["resources"]["peak_gpu_memory_gb"] = max(gpu_memory) + + # Add model/device breakdown + model_stats = defaultdict(list) + device_stats = defaultdict(list) + + for metric in metrics_to_analyze: + model_stats[metric.model_size].append(metric.realtime_factor) + device_stats[metric.device].append(metric.realtime_factor) + + summary["breakdown"] = { + "by_model": { + model: { + "count": len(rtfs), + "average_rtf": sum(rtfs) / len(rtfs), + "median_rtf": sorted(rtfs)[len(rtfs) // 2] + } + for model, rtfs in model_stats.items() + }, + "by_device": { + device: { + "count": len(rtfs), + "average_rtf": sum(rtfs) / len(rtfs), + "median_rtf": sorted(rtfs)[len(rtfs) // 2] + } + for device, rtfs in device_stats.items() + } + } + + return summary + + def get_optimization_recommendations(self) -> List[str]: + """Get optimization recommendations based on performance history.""" + if not self.metrics_history: + return ["No performance data available for recommendations"] + + recommendations = [] + recent_metrics = list(self.metrics_history)[-20:] # Last 20 measurements + + # Analyze RTF performance + avg_rtf = sum(m.realtime_factor for m in recent_metrics) / len(recent_metrics) + + if avg_rtf < 0.5: + recommendations.append("Consider using a smaller Whisper model (e.g., base instead of large)") + recommendations.append("Enable GPU acceleration if available") + recommendations.append("Reduce audio chunk size to lower memory usage") + recommendations.append("Close other applications to free system resources") + + elif avg_rtf < 1.0: + recommendations.append("Performance is below real-time. Consider GPU acceleration") + recommendations.append("Monitor system resources for bottlenecks") + + elif avg_rtf > 5.0: + recommendations.append("Performance is excellent! 
Consider using a larger model for better accuracy") + recommendations.append("You can increase chunk size for more efficient processing") + + # Memory analysis + avg_memory = sum(m.memory_usage_gb for m in recent_metrics) / len(recent_metrics) + + if avg_memory > 8.0: + recommendations.append("High memory usage detected. Consider smaller chunk sizes") + recommendations.append("Enable memory cleanup between segments") + + # GPU memory analysis (if available) + gpu_metrics = [m for m in recent_metrics if m.gpu_memory_usage_gb is not None] + if gpu_metrics: + avg_gpu_memory = sum(m.gpu_memory_usage_gb for m in gpu_metrics) / len(gpu_metrics) + if avg_gpu_memory > 4.0: + recommendations.append("High GPU memory usage. Consider using fp16 precision") + recommendations.append("Reduce batch size or use smaller model") + + # Device-specific recommendations + device_usage = defaultdict(int) + for metric in recent_metrics: + device_usage[metric.device] += 1 + + if device_usage.get('cpu', 0) > device_usage.get('cuda', 0): + if TORCH_AVAILABLE and torch.cuda.is_available(): + recommendations.append("GPU is available but not being used. Enable GPU acceleration") + + # Consistency analysis + rtf_variance = np.var([m.realtime_factor for m in recent_metrics]) if NUMPY_AVAILABLE else 0 + if rtf_variance > 1.0: + recommendations.append("Performance is inconsistent. Check for background processes") + recommendations.append("Consider using fixed chunk sizes for more predictable performance") + + return recommendations or ["Performance looks good! No specific recommendations."] + + def export_metrics(self, filepath: str, format: str = "json") -> None: + """ + Export performance metrics to file. + + Args: + filepath: Path to output file + format: Export format ("json", "csv") + """ + if format.lower() == "json": + self._export_json(filepath) + elif format.lower() == "csv": + self._export_csv(filepath) + else: + raise ValueError(f"Unsupported export format: {format}") + + def _export_json(self, filepath: str) -> None: + """Export metrics as JSON.""" + data = { + "session_info": self.current_session, + "metrics": [asdict(metric) for metric in self.metrics_history], + "summary": self.get_performance_summary(), + "recommendations": self.get_optimization_recommendations(), + "export_timestamp": time.time() + } + + with open(filepath, 'w') as f: + json.dump(data, f, indent=2) + + self.logger.info(f"Exported {len(self.metrics_history)} metrics to {filepath}") + + def _export_csv(self, filepath: str) -> None: + """Export metrics as CSV.""" + import csv + + with open(filepath, 'w', newline='') as f: + if not self.metrics_history: + return + + writer = csv.DictWriter(f, fieldnames=asdict(self.metrics_history[0]).keys()) + writer.writeheader() + + for metric in self.metrics_history: + writer.writerow(asdict(metric)) + + self.logger.info(f"Exported {len(self.metrics_history)} metrics to {filepath}") + + def _monitor_resources(self) -> None: + """Background thread for monitoring system resources.""" + while not self.stop_monitoring.wait(self.sampling_interval): + try: + resource_sample = { + "timestamp": time.time(), + "cpu_percent": psutil.cpu_percent(), + "memory_percent": psutil.virtual_memory().percent, + "memory_gb": psutil.virtual_memory().used / (1024**3) + } + + if self.enable_gpu_monitoring: + gpu_memory = self._get_gpu_memory_usage() + if gpu_memory is not None: + resource_sample["gpu_memory_gb"] = gpu_memory + + self.resource_history.append(resource_sample) + + # Keep only recent history (last hour) + cutoff_time 
= time.time() - 3600 + self.resource_history = [ + sample for sample in self.resource_history + if sample["timestamp"] > cutoff_time + ] + + except Exception as e: + self.logger.warning(f"Error in resource monitoring: {e}") + + def _get_memory_usage(self) -> float: + """Get current memory usage in GB.""" + return psutil.virtual_memory().used / (1024**3) + + def _get_gpu_memory_usage(self) -> Optional[float]: + """Get current GPU memory usage in GB.""" + if not self.enable_gpu_monitoring: + return None + + try: + return torch.cuda.memory_allocated() / (1024**3) + except Exception: + return None + + +class BenchmarkRunner: + """ + Comprehensive benchmarking suite for Whisper models. + + Provides standardized benchmarks for comparing performance across + different models, devices, and configurations. + """ + + def __init__(self, performance_monitor: Optional[PerformanceMonitor] = None): + """Initialize benchmark runner.""" + self.performance_monitor = performance_monitor or PerformanceMonitor() + self.logger = logging.getLogger(__name__) + + def benchmark_model( + self, + model_name: str, + device: str = "auto", + test_audio_duration: float = 60.0, + num_runs: int = 3, + warmup_runs: int = 1, + custom_config: Optional[Dict[str, Any]] = None + ) -> BenchmarkResult: + """ + Benchmark a specific Whisper model configuration. + + Args: + model_name: Whisper model to benchmark + device: Device for testing ("cpu", "cuda", "auto") + test_audio_duration: Duration of test audio + num_runs: Number of benchmark runs + warmup_runs: Number of warmup runs + custom_config: Custom configuration parameters + + Returns: + BenchmarkResult with performance metrics + """ + self.logger.info(f"Starting benchmark: {model_name} on {device}") + + try: + # Load model + import whisper + model = whisper.load_model(model_name, device=device) + + # Generate test audio + test_audio = self._generate_test_audio(test_audio_duration) + + # Configuration + config = { + "temperature": 0.0, + "language": None, + "task": "transcribe" + } + if custom_config: + config.update(custom_config) + + # Warmup runs + self.logger.info(f"Running {warmup_runs} warmup iterations...") + for _ in range(warmup_runs): + model.transcribe(test_audio, **config) + if TORCH_AVAILABLE and torch.cuda.is_available(): + torch.cuda.empty_cache() + + # Benchmark runs + self.logger.info(f"Running {num_runs} benchmark iterations...") + results = [] + + for run in range(num_runs): + self.performance_monitor.start_session() + + with self.performance_monitor.monitor_transcription( + model_name, device, batch_size=1 + ): + start_time = time.time() + result = model.transcribe(test_audio, **config) + end_time = time.time() + + processing_time = end_time - start_time + + # Record metrics + metrics = self.performance_monitor.record_transcription( + audio_duration=test_audio_duration, + processing_time=processing_time, + model_size=model_name, + device=device + ) + + session_summary = self.performance_monitor.stop_session() + results.append((metrics, session_summary, result)) + + self.logger.info(f"Run {run + 1}/{num_runs}: RTF={metrics.realtime_factor:.2f}") + + # Clean up between runs + if TORCH_AVAILABLE and torch.cuda.is_available(): + torch.cuda.empty_cache() + + # Calculate aggregate results + processing_times = [r[0].processing_time_seconds for r in results] + rtf_values = [r[0].realtime_factor for r in results] + memory_peaks = [r[1]["peak_memory_usage_gb"] for r in results] + + # System info + system_info = self._get_system_info() + + benchmark_result = 
BenchmarkResult( + model_name=model_name, + device=device, + audio_duration=test_audio_duration, + processing_time=sum(processing_times) / len(processing_times), + realtime_factor=sum(rtf_values) / len(rtf_values), + memory_peak_gb=max(memory_peaks), + gpu_memory_peak_gb=max([ + r[1].get("peak_gpu_memory_usage_gb", 0) or 0 for r in results + ]) if TORCH_AVAILABLE else None, + accuracy_score=None, # Would need reference transcription + configuration=config, + system_info=system_info + ) + + self.logger.info(f"Benchmark completed: RTF={benchmark_result.realtime_factor:.2f}") + return benchmark_result + + except Exception as e: + self.logger.error(f"Benchmark failed: {e}") + raise + + def compare_models( + self, + model_names: List[str], + device: str = "auto", + test_audio_duration: float = 60.0 + ) -> Dict[str, BenchmarkResult]: + """ + Compare multiple models on the same test conditions. + + Args: + model_names: List of model names to compare + device: Device for testing + test_audio_duration: Duration of test audio + + Returns: + Dictionary mapping model names to benchmark results + """ + results = {} + + for model_name in model_names: + try: + self.logger.info(f"Benchmarking {model_name}...") + results[model_name] = self.benchmark_model( + model_name, device, test_audio_duration + ) + except Exception as e: + self.logger.error(f"Failed to benchmark {model_name}: {e}") + results[model_name] = None + + return results + + def _generate_test_audio(self, duration: float) -> np.ndarray: + """Generate synthetic test audio.""" + sample_rate = 16000 + samples = int(duration * sample_rate) + + # Generate speech-like audio with varying frequencies + t = np.linspace(0, duration, samples) + frequencies = 440 + 200 * np.sin(2 * np.pi * 0.5 * t) # Varying pitch + audio = 0.3 * np.sin(2 * np.pi * frequencies * t) + + # Add some noise to make it more realistic + noise = 0.05 * np.random.randn(samples) + audio = audio + noise + + return audio.astype(np.float32) + + def _get_system_info(self) -> Dict[str, Any]: + """Get system information for benchmark context.""" + info = { + "cpu_count": psutil.cpu_count(), + "cpu_freq": psutil.cpu_freq()._asdict() if psutil.cpu_freq() else None, + "memory_total_gb": psutil.virtual_memory().total / (1024**3), + "python_version": f"{__import__('sys').version_info.major}.{__import__('sys').version_info.minor}", + } + + if TORCH_AVAILABLE and torch.cuda.is_available(): + info["gpu_count"] = torch.cuda.device_count() + info["gpu_name"] = torch.cuda.get_device_name(0) if torch.cuda.device_count() > 0 else None + info["cuda_version"] = torch.version.cuda + + return info \ No newline at end of file diff --git a/whisper/transcribe.py b/whisper/transcribe.py index 0a4cc36..4e37dff 100644 --- a/whisper/transcribe.py +++ b/whisper/transcribe.py @@ -34,6 +34,15 @@ from .utils import ( if TYPE_CHECKING: from .model import Whisper +# Memory optimization imports +try: + from .optimization.memory_manager import MemoryManager + from .optimization.chunk_processor import ChunkProcessor + from .optimization.performance_monitor import PerformanceMonitor + OPTIMIZATION_AVAILABLE = True +except ImportError: + OPTIMIZATION_AVAILABLE = False + def transcribe( model: "Whisper", @@ -52,6 +61,12 @@ def transcribe( append_punctuations: str = "\"'.。,,!!??::”)]}、", clip_timestamps: Union[str, List[float]] = "0", hallucination_silence_threshold: Optional[float] = None, + # Memory optimization parameters + enable_memory_optimization: bool = False, + memory_optimization_mode: str = "adaptive", # 
"adaptive", "aggressive", "conservative" + auto_chunk_large_files: bool = True, + max_memory_usage_gb: Optional[float] = None, + enable_performance_monitoring: bool = False, **decode_options, ): """ @@ -119,6 +134,26 @@ def transcribe( When word_timestamps is True, skip silent periods longer than this threshold (in seconds) when a possible hallucination is detected + enable_memory_optimization: bool + Enable automatic memory management and optimization features. + Helps prevent out-of-memory errors for large audio files. + + memory_optimization_mode: str + Memory optimization strategy: "adaptive" (default), "aggressive", or "conservative". + Adaptive mode balances performance and memory usage based on system resources. + + auto_chunk_large_files: bool + Automatically chunk large audio files to fit in available memory. + Uses intelligent chunking with overlap handling. + + max_memory_usage_gb: Optional[float] + Maximum memory usage limit in GB. If None, uses system-dependent defaults. + Helps prevent system overload during processing. + + enable_performance_monitoring: bool + Enable real-time performance monitoring and optimization recommendations. + Provides detailed metrics on processing speed and resource usage. + Returns ------- A dictionary containing the resulting text ("text") and segment-level details ("segments"), and @@ -135,6 +170,70 @@ def transcribe( if dtype == torch.float32: decode_options["fp16"] = False + # Initialize memory optimization if enabled + memory_manager = None + performance_monitor = None + chunk_processor = None + + if enable_memory_optimization and OPTIMIZATION_AVAILABLE: + # Initialize memory manager + memory_manager = MemoryManager( + enable_automatic_cleanup=(memory_optimization_mode in ["adaptive", "aggressive"]) + ) + + # Initialize performance monitoring + if enable_performance_monitoring: + performance_monitor = PerformanceMonitor() + performance_monitor.start_session() + + # Check if we should use chunk processing for large files + if auto_chunk_large_files and isinstance(audio, str): + try: + import whisper + audio_data = whisper.load_audio(audio) + duration = len(audio_data) / SAMPLE_RATE + + # Use chunk processing for files longer than 5 minutes or if memory is limited + memory_info = memory_manager.get_cpu_memory_info() + should_chunk = (duration > 300.0 or + memory_info.usage_percent > 70 or + (max_memory_usage_gb and memory_info.used_gb > max_memory_usage_gb)) + + if should_chunk: + chunk_processor = ChunkProcessor( + memory_manager=memory_manager, + processing_mode={ + "conservative": "sequential", + "adaptive": "adaptive", + "aggressive": "parallel" + }.get(memory_optimization_mode, "adaptive") + ) + + # Use chunk processor for large files + model_size = getattr(model, 'model_name', 'base') + result = chunk_processor.process_audio_file( + audio, model, model_size, + language=decode_options.get("language"), + task=decode_options.get("task", "transcribe"), + **{k: v for k, v in decode_options.items() if k not in ["language", "task"]} + ) + + if performance_monitor: + session_summary = performance_monitor.stop_session() + result["performance_summary"] = session_summary + + return result + + except Exception as e: + if verbose: + warnings.warn(f"Memory optimization failed, falling back to standard processing: {e}") + + elif enable_memory_optimization and not OPTIMIZATION_AVAILABLE: + warnings.warn( + "Memory optimization was requested but optimization modules are not available. " + "Falling back to standard Whisper behavior." 
+ ) + # Pad 30-seconds of silence to the input audio, for slicing mel = log_mel_spectrogram(audio, model.dims.n_mels, padding=N_SAMPLES) content_frames = mel.shape[-1] - N_FRAMES @@ -507,11 +606,34 @@ def transcribe( # update progress bar pbar.update(min(content_frames, seek) - previous_seek) - return dict( - text=tokenizer.decode(all_tokens[len(initial_prompt_tokens) :]), - segments=all_segments, - language=language, - ) + # Finalize memory optimization and performance monitoring + result_dict = { + "text": tokenizer.decode(all_tokens[len(initial_prompt_tokens) :]), + "segments": all_segments, + "language": language, + } + + # Add performance monitoring results if enabled + if performance_monitor: + try: + session_summary = performance_monitor.stop_session() + result_dict["performance_summary"] = session_summary + result_dict["optimization_recommendations"] = performance_monitor.get_optimization_recommendations() + except Exception as e: + if verbose: + warnings.warn(f"Performance monitoring failed: {e}") + + # Perform final memory cleanup if enabled + if memory_manager and enable_memory_optimization: + try: + cleanup_results = memory_manager.cleanup_memory(force=True) + if cleanup_results and verbose: + print(f"Memory cleanup freed: {cleanup_results}") + except Exception as e: + if verbose: + warnings.warn(f"Memory cleanup failed: {e}") + + return result_dict def cli(): @@ -564,6 +686,13 @@ def cli(): parser.add_argument("--threads", type=optional_int, default=0, help="number of threads used by torch for CPU inference; supercedes MKL_NUM_THREADS/OMP_NUM_THREADS") parser.add_argument("--clip_timestamps", type=str, default="0", help="comma-separated list start,end,start,end,... timestamps (in seconds) of clips to process, where the last end timestamp defaults to the end of the file") parser.add_argument("--hallucination_silence_threshold", type=optional_float, help="(requires --word_timestamps True) skip silent periods longer than this threshold (in seconds) when a possible hallucination is detected") + + # Memory optimization arguments + parser.add_argument("--enable_memory_optimization", type=str2bool, default=False, help="enable automatic memory management and GPU optimization") + parser.add_argument("--memory_optimization_mode", type=str, default="adaptive", choices=["adaptive", "aggressive", "conservative"], help="memory optimization strategy") + parser.add_argument("--auto_chunk_large_files", type=str2bool, default=True, help="automatically chunk large audio files to prevent memory issues") + parser.add_argument("--max_memory_usage_gb", type=optional_float, default=None, help="maximum memory usage limit in GB") + parser.add_argument("--enable_performance_monitoring", type=str2bool, default=False, help="enable performance monitoring and optimization recommendations") # fmt: on args = parser.parse_args().__dict__
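Usage sketch for the monitoring and export path in performance_monitor.py above, assuming the optimization package from this diff is importable as whisper.optimization.performance_monitor; the file names, model size, and device are illustrative examples, not part of the patch.

# Minimal sketch: record one transcription and export the collected metrics.
# "sample.wav" and "whisper_metrics.json" are hypothetical paths.
import time
import whisper
from whisper.optimization.performance_monitor import PerformanceMonitor

monitor = PerformanceMonitor()
monitor.start_session()

model = whisper.load_model("base")
audio = whisper.load_audio("sample.wav")      # hypothetical input file
audio_duration = len(audio) / 16000           # load_audio resamples to 16 kHz

start = time.time()
result = model.transcribe(audio)
monitor.record_transcription(
    audio_duration=audio_duration,
    processing_time=time.time() - start,
    model_size="base",
    device="cpu",
)

print(monitor.stop_session())
print(monitor.get_optimization_recommendations())
monitor.export_metrics("whisper_metrics.json", format="json")   # or format="csv"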
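A similar sketch for the BenchmarkRunner defined above; the model names, device, and 30-second test duration are arbitrary choices for illustration.

# Minimal sketch: benchmark one model, then compare two sizes on the same
# synthetic audio, using only the BenchmarkRunner API added in this diff.
from whisper.optimization.performance_monitor import BenchmarkRunner

runner = BenchmarkRunner()

single = runner.benchmark_model("tiny", device="cpu", test_audio_duration=30.0, num_runs=2)
print(f"{single.model_name}: RTF={single.realtime_factor:.2f}, peak RAM={single.memory_peak_gb:.1f} GB")

comparison = runner.compare_models(["tiny", "base"], device="cpu", test_audio_duration=30.0)
for name, res in comparison.items():
    # compare_models stores None for models that failed to benchmark
    if res is not None:
        print(f"{name}: {res.realtime_factor:.2f}x realtime")

Passing device="cpu" explicitly keeps the sketch independent of whether CUDA is present on the reviewing machine.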
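Finally, a sketch of the new transcribe() switches from the Python API; the input path and the 8 GB cap are made-up values, and the keyword arguments are the parameters introduced in this diff.

# Minimal sketch: opt into the memory-optimized transcription path.
# "long_meeting.mp3" and max_memory_usage_gb=8.0 are hypothetical.
import whisper

model = whisper.load_model("base")
result = model.transcribe(
    "long_meeting.mp3",
    enable_memory_optimization=True,
    memory_optimization_mode="adaptive",   # "adaptive", "aggressive", or "conservative"
    auto_chunk_large_files=True,
    max_memory_usage_gb=8.0,
    enable_performance_monitoring=True,
)

print(result["text"][:200])
# Only present when performance monitoring was enabled and completed:
print(result.get("performance_summary"))
print(result.get("optimization_recommendations"))

The same run from the command line would use the new flags added to cli(), for example: whisper long_meeting.mp3 --enable_memory_optimization True --memory_optimization_mode adaptive --max_memory_usage_gb 8 --enable_performance_monitoring True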