import pyaudio
import numpy as np
import pygame
import time
from hmmlearn import hmm
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense, Input
from sklearn.preprocessing import StandardScaler
from collections import deque
import scipy.signal as signal
import threading
import queue

pygame.init()
screen = pygame.display.set_mode((500, 500))
pygame.display.set_caption("Enhanced Lip-Sync Stickman")
clock = pygame.time.Clock()

MOUTH_SHAPES = {
    "rest": "Neutral, lips together, relaxed position",
    "open": "Mouth open as in 'ah' sound",
    "wide": "Lips spread wide as in 'ee' sound",
    "round": "Lips rounded as in 'oh' or 'oo' sound",
    "teeth": "Teeth visible, as in 's', 'f', 't' sounds",
    "plosive": "Lips pressed together as in 'p', 'b', 'm' sounds"
}

IDX_TO_MOUTH = {
    0: 'rest',
    1: 'open',
    2: 'wide',
    3: 'round',
    4: 'teeth',
    5: 'plosive'
}

# Mouth shape interpolation mappings
MOUTH_SHAPE_VALUES = {
    'rest': {'jaw_position': 0, 'mouth_width': 40, 'mouth_height': 2, 'mouth_openness': 0},
    'open': {'jaw_position': 15, 'mouth_width': 40, 'mouth_height': 25, 'mouth_openness': 0.8},
    'wide': {'jaw_position': 5, 'mouth_width': 60, 'mouth_height': 10, 'mouth_openness': 0.5},
    'round': {'jaw_position': 7, 'mouth_width': 40, 'mouth_height': 40, 'mouth_openness': 0.7},
    'teeth': {'jaw_position': 10, 'mouth_width': 50, 'mouth_height': 15, 'mouth_openness': 0.6},
    'plosive': {'jaw_position': 3, 'mouth_width': 50, 'mouth_height': 4, 'mouth_openness': 0.2}
}

# Animation state variables for smooth transitions
current_mouth_params = {
    'jaw_position': 0,
    'mouth_width': 40,
    'mouth_height': 2,
    'mouth_openness': 0,
    'current_shape': 'rest'
}

# Queue for smoothing mouth shape predictions
mouth_shape_history = deque(maxlen=5)
for _ in range(5):
    mouth_shape_history.append('rest')

# Create a queue for audio data
audio_queue = queue.Queue(maxsize=10)

# Flag to indicate if audio processing should continue
processing_active = True


def interpolate_mouth_shape(target_shape, transition_speed=0.3):
    """Smoothly transition mouth parameters toward the target shape"""
    global current_mouth_params

    # Get target parameters
    target_params = MOUTH_SHAPE_VALUES[target_shape]

    # Interpolate each parameter
    for param in ['jaw_position', 'mouth_width', 'mouth_height', 'mouth_openness']:
        current_value = current_mouth_params[param]
        target_value = target_params[param]
        # Linear interpolation
        new_value = current_value + (target_value - current_value) * transition_speed
        current_mouth_params[param] = new_value

    # Update current shape
    current_mouth_params['current_shape'] = target_shape


def get_smoothed_mouth_shape(predicted_shape):
    """Use a history-based approach to smooth out mouth shape predictions"""
    # Add new prediction to history
    mouth_shape_history.append(predicted_shape)

    # Count occurrences of each shape in history
    shape_counts = {}
    for shape in mouth_shape_history:
        if shape in shape_counts:
            shape_counts[shape] += 1
        else:
            shape_counts[shape] = 1

    # Find most common shape
    smoothed_shape = max(shape_counts, key=shape_counts.get)

    # If the predicted shape is very different, use a weighted approach
    if smoothed_shape != predicted_shape:
        # If new prediction appears at least twice in history, use it
        if shape_counts.get(predicted_shape, 0) >= 2:
            smoothed_shape = predicted_shape

    return smoothed_shape
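
# --- Illustrative sketch (not called by the main program) ---------------------
# A minimal demo of the two animation helpers above: it feeds a noisy stream of
# predictions through get_smoothed_mouth_shape() and lets
# interpolate_mouth_shape() converge on the target parameters. The prediction
# sequence below is invented purely for illustration; calling this mutates the
# module-level animation state.
def _demo_smoothing_and_interpolation():
    noisy_predictions = ['open', 'open', 'teeth', 'open', 'open', 'rest', 'open']
    for pred in noisy_predictions:
        smoothed = get_smoothed_mouth_shape(pred)   # majority vote over history
        interpolate_mouth_shape(smoothed)           # one interpolation step
        print(f"raw={pred:8s} smoothed={smoothed:8s} "
              f"height={current_mouth_params['mouth_height']:.1f}")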
def draw_stickman_with_jaw():
    """Draw a stickman with different mouth shapes using interpolated parameters"""
    screen.fill((255, 255, 255))

    # Extract current parameters
    jaw_position = current_mouth_params['jaw_position']
    mouth_width = current_mouth_params['mouth_width']
    mouth_height = current_mouth_params['mouth_height']
    mouth_openness = current_mouth_params['mouth_openness']
    current_shape = current_mouth_params['current_shape']

    # Head - constant
    pygame.draw.circle(screen, (0, 0, 0), (250, 150), 60, 2)

    # Body and limbs - constant
    pygame.draw.line(screen, (0, 0, 0), (250, 210), (250, 350), 2)  # Body
    pygame.draw.line(screen, (0, 0, 0), (250, 250), (200, 300), 2)  # Left arm
    pygame.draw.line(screen, (0, 0, 0), (250, 250), (300, 300), 2)  # Right arm
    pygame.draw.line(screen, (0, 0, 0), (250, 350), (200, 450), 2)  # Left leg
    pygame.draw.line(screen, (0, 0, 0), (250, 350), (300, 450), 2)  # Right leg

    # Eyes - constant
    pygame.draw.circle(screen, (0, 0, 0), (230, 140), 5)  # Left eye
    pygame.draw.circle(screen, (0, 0, 0), (270, 140), 5)  # Right eye

    # Calculate mouth position based on current parameters
    mouth_center_x = 250
    mouth_center_y = 180 + jaw_position * 0.5  # Move down slightly with jaw

    # Base mouth positioning
    mouth_left = mouth_center_x - mouth_width / 2
    mouth_top = mouth_center_y - mouth_height / 2

    # Draw different mouth shapes based on the current shape but with interpolated parameters
    if current_shape == 'rest':
        # Neutral, closed mouth - just a line
        pygame.draw.line(screen, (0, 0, 0),
                         (mouth_center_x - mouth_width / 2, mouth_center_y),
                         (mouth_center_x + mouth_width / 2, mouth_center_y), 2)

    elif current_shape == 'open':
        # Open mouth - ellipse
        pygame.draw.ellipse(screen, (0, 0, 0),
                            (mouth_left, mouth_top, mouth_width, mouth_height), 2)
        # Darker inside mouth
        inner_width = max(5, mouth_width * 0.7)
        inner_height = max(3, mouth_height * 0.7)
        pygame.draw.ellipse(screen, (50, 50, 50),
                            (mouth_center_x - inner_width / 2,
                             mouth_center_y - inner_height / 2,
                             inner_width, inner_height))

    elif current_shape == 'wide':
        # Wide mouth - flat ellipse or arc
        mouth_rect = pygame.Rect(mouth_left, mouth_top, mouth_width, mouth_height * 1.2)
        pygame.draw.arc(screen, (0, 0, 0), mouth_rect, 0.2, 2.9, 2)
        # Slight darkness inside
        if mouth_height > 5:
            inner_rect = pygame.Rect(mouth_left + 5, mouth_top + 2,
                                     mouth_width - 10, mouth_height - 4)
            pygame.draw.arc(screen, (100, 100, 100), inner_rect, 0.3, 2.8, 2)

    elif current_shape == 'round':
        # Round mouth - circle - make sure this is visible
        radius = min(30, max(12, (mouth_width + mouth_height) / 4))  # Ensure minimum visible size
        pygame.draw.circle(screen, (0, 0, 0), (mouth_center_x, mouth_center_y), radius, 2)
        # Darker inside for depth
        if radius > 6:
            pygame.draw.circle(screen, (80, 80, 80),
                               (mouth_center_x, mouth_center_y),
                               max(radius - 3, 5))  # Ensure inner circle is visible

    elif current_shape == 'teeth':
        # Teeth visible - rectangle with teeth lines
        pygame.draw.rect(screen, (0, 0, 0),
                         (mouth_left, mouth_top, mouth_width, mouth_height), 2)
        # Draw teeth based on mouth width
        num_teeth = max(3, int(mouth_width / 8))
        teeth_spacing = mouth_width / (num_teeth + 1)
        for i in range(num_teeth):
            tooth_x = mouth_left + (i + 1) * teeth_spacing
            # Upper teeth
            pygame.draw.line(screen, (0, 0, 0),
                             (tooth_x, mouth_top),
                             (tooth_x, mouth_top + mouth_height * 0.5), 2)
        # Pink tongue
        tongue_width = mouth_width * 0.6
        pygame.draw.arc(screen, (255, 150, 150),
                        (mouth_center_x - tongue_width / 2,
                         mouth_center_y + mouth_height * 0.1,
                         tongue_width, mouth_height * 0.6),
                        0, 3.14, 2)

    elif current_shape == 'plosive':
        # Pressed lips
        line_thickness = 4  # Ensure visible thickness
        pygame.draw.line(screen, (0, 0, 0),
                         (mouth_left, mouth_center_y),
                         (mouth_left + mouth_width, mouth_center_y), line_thickness)
        # Bulge in middle for pressure indication
        bulge_width = mouth_width * 0.3
        pygame.draw.arc(screen, (0, 0, 0),
                        (mouth_center_x - bulge_width / 2,
                         mouth_center_y - 5,  # Fixed position for visibility
                         bulge_width, 10),    # Fixed size for visibility
                        3.14, 6.28, 2)

    # Air burst lines - always shown while the plosive shape is active
    if current_shape == 'plosive':
        burst_color = (0, 0, 200)  # Always full intensity
        pygame.draw.line(screen, burst_color,
                         (mouth_left + mouth_width + 5, mouth_center_y - 2),
                         (mouth_left + mouth_width + 15, mouth_center_y - 5), 2)
        pygame.draw.line(screen, burst_color,
                         (mouth_left + mouth_width + 5, mouth_center_y),
                         (mouth_left + mouth_width + 18, mouth_center_y), 2)
        pygame.draw.line(screen, burst_color,
                         (mouth_left + mouth_width + 5, mouth_center_y + 2),
                         (mouth_left + mouth_width + 15, mouth_center_y + 5), 2)

    # Display current mouth shape
    font = pygame.font.SysFont('Arial', 20)
    text = font.render(f"Mouth: {current_shape}", True, (0, 0, 0))
    screen.blit(text, (20, 20))

    # Display animation parameters
    param_font = pygame.font.SysFont('Arial', 12)
    param_text = param_font.render(
        f"Jaw: {jaw_position:.1f}, Width: {mouth_width:.1f}, "
        f"Height: {mouth_height:.1f}, Open: {mouth_openness:.1f}",
        True, (100, 100, 100))
    screen.blit(param_text, (20, 70))

    # Show description of the current mouth shape
    if current_shape in MOUTH_SHAPES:
        small_font = pygame.font.SysFont('Arial', 16)
        description = small_font.render(MOUTH_SHAPES.get(current_shape, ""), True, (0, 0, 0))
        screen.blit(description, (20, 45))

    # Show real-time audio status
    status_font = pygame.font.SysFont('Arial', 12)
    status_text = status_font.render(
        f"Audio Buffer: {'Active' if not audio_queue.empty() else 'Waiting'} "
        f"| Queue size: {audio_queue.qsize()}/10",
        True, (0, 0, 200))
    screen.blit(status_text, (20, 90))

    pygame.display.flip()
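
# --- Illustrative sketch (not called by the main program) ---------------------
# A quick visual check of draw_stickman_with_jaw(): cycle through every mouth
# shape for about a second each, with interpolation supplying the in-between
# frames. Handy for tuning MOUTH_SHAPE_VALUES without a microphone; to try it,
# call _preview_mouth_shapes() in place of audio_stream() under __main__.
def _preview_mouth_shapes(seconds_per_shape=1.0):
    for shape in MOUTH_SHAPES:
        end_time = time.time() + seconds_per_shape
        while time.time() < end_time:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    return
            interpolate_mouth_shape(shape)   # ease toward this shape's parameters
            draw_stickman_with_jaw()
            clock.tick(60)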
def extract_features_with_windowing(audio_data, sr=22050):
    """Extract features using proper windowing techniques"""
    if len(audio_data) == 0 or np.max(np.abs(audio_data)) < 0.01:
        return np.zeros(8)  # Extended feature set

    try:
        # Apply Hamming window to reduce spectral leakage
        windowed_audio = audio_data * signal.windows.hamming(len(audio_data))

        # Basic features
        rms = np.sqrt(np.mean(np.square(windowed_audio)))

        # Zero crossing rate (cast the sign bits to int before diff:
        # np.diff on a boolean array raises a TypeError in modern NumPy)
        sign_bits = np.signbit(windowed_audio).astype(np.int8)
        zero_crossings = np.sum(np.abs(np.diff(sign_bits))) / len(windowed_audio)

        # Spectral features using a properly windowed FFT
        fft_size = 2048  # Power of 2 for efficient FFT
        if len(windowed_audio) < fft_size:
            # Zero-pad if necessary
            padded_audio = np.pad(windowed_audio, (0, fft_size - len(windowed_audio)))
        else:
            padded_audio = windowed_audio[:fft_size]

        spec = np.abs(np.fft.rfft(padded_audio))
        freqs = np.fft.rfftfreq(len(padded_audio), 1 / sr)

        # Spectral centroid (brightness)
        if len(spec) > 0 and np.sum(spec) > 0:
            spectral_centroid = np.sum(freqs * spec) / np.sum(spec)
        else:
            spectral_centroid = 0

        # Spectral rolloff
        if len(spec) > 0 and np.sum(spec) > 0:
            cumsum = np.cumsum(spec)
            spectral_rolloff = freqs[np.argmax(cumsum >= 0.85 * cumsum[-1])]
        else:
            spectral_rolloff = 0

        # Spectral flatness (tonality vs. noise). Guard on the sum rather than
        # np.prod(spec), which silently underflows to zero for long spectra.
        if len(spec) > 0 and np.sum(spec) > 0:
            geometric_mean = np.exp(np.mean(np.log(spec + 1e-10)))
            arithmetic_mean = np.mean(spec)
            spectral_flatness = geometric_mean / arithmetic_mean
        else:
            spectral_flatness = 0

        # Energy in different frequency bands with better frequency resolution
        if len(spec) > 10:
            # Get frequency band indices:
            # low (< 500 Hz), mid (500-2000 Hz), high (> 2000 Hz)
            low_idx = np.sum(freqs < 500)
            mid_idx = np.sum(freqs < 2000)
            # Calculate energy in each band
            low_energy = np.sum(spec[:low_idx]) / np.sum(spec) if np.sum(spec) > 0 else 0
            mid_energy = np.sum(spec[low_idx:mid_idx]) / np.sum(spec) if np.sum(spec) > 0 else 0
            high_energy = np.sum(spec[mid_idx:]) / np.sum(spec) if np.sum(spec) > 0 else 0
        else:
            low_energy = mid_energy = high_energy = 0

        # Combine all features
        features = np.array([
            rms,                       # Volume
            zero_crossings,            # Frequency indicator
            spectral_centroid / 1000,  # Scaled centroid
            spectral_rolloff / 1000,   # Scaled rolloff
            spectral_flatness,         # Tonality vs. noise
            low_energy,                # Low freq energy
            mid_energy,                # Mid freq energy
            high_energy                # High freq energy
        ])
        return features

    except Exception as e:
        print(f"Feature extraction error: {e}")
        return np.zeros(8)  # Return zeros for all features
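
# --- Illustrative sketch (not called by the main program) ---------------------
# Sanity-check the feature extractor on synthetic input: a pure 440 Hz sine
# should show low spectral flatness with energy concentrated in the low band,
# while white noise should look flat and broadband. Both test signals are
# invented for the demo; 2048 samples matches the extractor's FFT size.
def _demo_feature_extraction(sr=22050):
    t = np.arange(2048) / sr
    tone = 0.5 * np.sin(2 * np.pi * 440.0 * t).astype(np.float32)
    noise = np.random.uniform(-0.5, 0.5, 2048).astype(np.float32)
    names = ['rms', 'zcr', 'centroid', 'rolloff', 'flatness', 'low', 'mid', 'high']
    for label, samples in [('tone', tone), ('noise', noise)]:
        feats = extract_features_with_windowing(samples, sr)
        print(label, {n: round(float(v), 3) for n, v in zip(names, feats)})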
def train_hmm_model(training_data, lengths=None, n_components=6):
    """Train and return a GaussianHMM model.

    Note: HMM hidden-state indices are not guaranteed to line up with the
    mouth-shape class indices; treating them as class labels in
    classify_phoneme() is a heuristic, which is one reason the RNN
    prediction is weighted higher there.
    """
    if lengths is None:
        lengths = [len(training_data)]
    model = hmm.GaussianHMM(
        n_components=n_components,
        covariance_type="diag",
        n_iter=100,
        random_state=42
    )
    model.fit(training_data, lengths)
    return model


def create_rnn_model(input_shape, num_classes=6):
    """Create a simple RNN model for phoneme recognition"""
    model = Sequential([
        Input(shape=input_shape),
        SimpleRNN(16, activation='relu'),
        Dense(16, activation='relu'),
        Dense(num_classes, activation='softmax')
    ])
    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )
    return model
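
# --- Illustrative sketch (not called by the main program) ---------------------
# Shape check for the RNN classifier: each 8-dimensional feature vector is fed
# to the SimpleRNN as 8 timesteps of 1 value, and the softmax head returns one
# probability per mouth shape. The random batch below is dummy data.
def _demo_rnn_shapes():
    model = create_rnn_model(input_shape=(8, 1), num_classes=6)
    dummy_batch = np.random.randn(4, 8, 1).astype(np.float32)
    probs = model.predict(dummy_batch, verbose=0)
    print("output shape:", probs.shape)                          # (4, 6)
    print("rows sum to 1:", np.allclose(probs.sum(axis=1), 1.0)) # softmax check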
def generate_training_data(n_samples=1000, n_features=8):
    """Generate synthetic training data for mouth shapes with windowed features"""
    X_train = []
    y_train = []

    # Different characteristics for each mouth shape - expanded for new features
    characteristics = {
        # rest: low everything
        0: {'rms': 0.1, 'zcr': 0.2, 'centroid': 0.2, 'rolloff': 0.2,
            'flatness': 0.8, 'low': 0.5, 'mid': 0.3, 'high': 0.1},
        # open: high rms, medium centroid
        1: {'rms': 0.8, 'zcr': 0.4, 'centroid': 0.5, 'rolloff': 0.4,
            'flatness': 0.4, 'low': 0.7, 'mid': 0.5, 'high': 0.3},
        # wide: high centroid, high high-energy
        2: {'rms': 0.6, 'zcr': 0.7, 'centroid': 0.8, 'rolloff': 0.7,
            'flatness': 0.3, 'low': 0.3, 'mid': 0.5, 'high': 0.8},
        # round: medium rms, low centroid
        3: {'rms': 0.5, 'zcr': 0.3, 'centroid': 0.3, 'rolloff': 0.4,
            'flatness': 0.5, 'low': 0.8, 'mid': 0.4, 'high': 0.2},
        # teeth: high zcr, high centroid, high high-energy
        4: {'rms': 0.4, 'zcr': 0.9, 'centroid': 0.9, 'rolloff': 0.8,
            'flatness': 0.7, 'low': 0.2, 'mid': 0.4, 'high': 0.9},
        # plosive: medium zcr, medium rms
        5: {'rms': 0.5, 'zcr': 0.5, 'centroid': 0.4, 'rolloff': 0.5,
            'flatness': 0.6, 'low': 0.5, 'mid': 0.5, 'high': 0.5}
    }

    # Generate data for each class
    for class_idx in range(6):
        char = characteristics[class_idx]
        # Base values for this class
        base = np.array([
            char['rms'], char['zcr'], char['centroid'], char['rolloff'],
            char['flatness'], char['low'], char['mid'], char['high']
        ])
        # Generate samples with noise
        class_samples = np.random.normal(
            loc=base,
            scale=0.1,  # Add some noise
            size=(n_samples // 6, n_features)
        )
        X_train.append(class_samples)
        y_train.extend([class_idx] * (n_samples // 6))

    # Combine all data
    X_train = np.vstack(X_train)
    y_train = np.array(y_train)

    # Shuffle data
    indices = np.arange(len(y_train))
    np.random.shuffle(indices)
    X_train = X_train[indices]
    y_train = y_train[indices]

    return X_train, y_train


def classify_phoneme(features, hmm_model, rnn_model, scaler=None):
    """Classify audio features into mouth shape using both models"""
    # Check for silence
    if np.sum(np.abs(features)) < 0.1:
        return 'rest'

    # Scale features if scaler is provided
    if scaler is not None:
        features = scaler.transform(features.reshape(1, -1))
    else:
        # Just reshape
        features = features.reshape(1, -1)

    # For RNN, reshape to (samples, timesteps, features)
    rnn_features = features.reshape(1, features.shape[1], 1)

    # Get predictions from both models
    try:
        hmm_pred = hmm_model.predict(features)[0]
        rnn_pred = np.argmax(rnn_model.predict(rnn_features, verbose=0), axis=1)[0]

        # Combine both predictions, giving more weight to the RNN
        rnn_weight = 0.7  # the HMM implicitly gets the remaining 0.3

        # Simple weighted voting
        if hmm_pred == rnn_pred:
            final_prediction = hmm_pred
        else:
            # On disagreement, pick the RNN prediction more often
            # (stochastic tie-breaking rather than a fixed rule)
            if np.random.random() < rnn_weight:
                final_prediction = rnn_pred
            else:
                final_prediction = hmm_pred

        return IDX_TO_MOUTH.get(final_prediction, 'rest')

    except Exception as e:
        print(f"Prediction error: {e}")
        return 'rest'


def audio_capture_thread(chunk_size, stream):
    """Thread function to continuously capture audio data"""
    global processing_active
    audio_buffer = np.zeros(chunk_size * 2, dtype=np.float32)

    while processing_active:
        try:
            # Read audio data
            data = np.frombuffer(stream.read(chunk_size, exception_on_overflow=False),
                                 dtype=np.int16)

            # Check if we have actual sound
            if np.max(np.abs(data)) > 100:
                # Convert to float and normalize
                audio_float = data.astype(np.float32) / np.max(np.abs(data))

                # Shift buffer and add new data (implementing overlap)
                audio_buffer[:-chunk_size] = audio_buffer[chunk_size:]
                audio_buffer[-chunk_size:] = audio_float

                # Try to add to queue if not full
                try:
                    if not audio_queue.full():
                        audio_queue.put(audio_buffer.copy(), block=False)
                except queue.Full:
                    # If queue is full, just continue
                    pass

            # Small sleep to prevent CPU hogging
            time.sleep(0.01)

        except Exception as e:
            print(f"Error in audio capture thread: {e}")
            time.sleep(0.1)  # Sleep a bit longer on error

    print("Audio capture thread stopped")
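
# --- Illustrative sketch (not called by the main program) ---------------------
# End-to-end smoke test of the classification path without a microphone: train
# both models on a small synthetic set, then classify a silent frame (which
# should short-circuit to 'rest') and one sample drawn from the synthetic
# 'wide' class. Dataset size and epoch count are kept tiny on purpose; the
# second printed label depends on model initialization and the stochastic
# vote, so treat this as a smoke test rather than an accuracy check.
def _demo_offline_classification():
    X, y = generate_training_data(n_samples=600, n_features=8)
    demo_scaler = StandardScaler()
    X_scaled = demo_scaler.fit_transform(X)
    y_onehot = tf.keras.utils.to_categorical(y, num_classes=6)

    rnn = create_rnn_model(input_shape=(8, 1), num_classes=6)
    rnn.fit(X_scaled.reshape(-1, 8, 1), y_onehot, epochs=2, batch_size=32, verbose=0)
    hmm_m = train_hmm_model(X_scaled, n_components=6)

    print("silence ->", classify_phoneme(np.zeros(8), hmm_m, rnn, demo_scaler))
    wide_sample = X[y == 2][0]  # raw (unscaled) features, as the real pipeline passes them
    print("'wide' sample ->", classify_phoneme(wide_sample, hmm_m, rnn, demo_scaler))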
def audio_stream():
    """Main function to process audio and animate the stickman"""
    global processing_active

    CHUNK = 1024  # Smaller chunk for more responsive updates
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 22050

    # Initialize PyAudio
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK
    )

    # Generate training data and train models with expanded feature set
    print("Generating training data...")
    X_train, y_train = generate_training_data(n_samples=1800, n_features=8)

    # Create a scaler to normalize the features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)

    # Convert labels to one-hot encoding
    y_train_onehot = tf.keras.utils.to_categorical(y_train, num_classes=6)

    # Create and train the RNN model
    print("Training RNN model...")
    input_shape = (X_train_scaled.shape[1], 1)  # (timesteps, features): each scalar feature is one timestep
    rnn_model = create_rnn_model(input_shape, num_classes=6)

    # Train the model with fewer epochs for testing
    rnn_model.fit(
        X_train_scaled.reshape(-1, X_train_scaled.shape[1], 1),
        y_train_onehot,
        epochs=5,
        batch_size=32,
        verbose=1
    )

    # Train HMM model
    print("Training HMM model...")
    hmm_model = train_hmm_model(X_train_scaled, n_components=6)

    # Start the audio capture thread
    capture_thread = threading.Thread(
        target=audio_capture_thread,
        args=(CHUNK, stream),
        daemon=True
    )
    capture_thread.start()

    print("Starting audio stream. Press Q to quit.")

    running = True
    frame_counter = 0
    last_process_time = time.time()
    processing_interval = 0.05  # Process audio every 50 ms for more responsive updates

    while running:
        # Check for quit events
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            if event.type == pygame.KEYDOWN:
                if event.key == pygame.K_q:
                    running = False

        # Process audio data if available and enough time has passed
        current_time = time.time()
        if current_time - last_process_time >= processing_interval:
            try:
                # Get audio data from queue if available
                try:
                    audio_buffer = audio_queue.get(block=False)

                    # Process with overlapping windows
                    features = extract_features_with_windowing(audio_buffer, RATE)

                    # Classify and get mouth shape
                    raw_mouth_shape = classify_phoneme(features, hmm_model, rnn_model, scaler)

                    # Apply smoothing to the mouth shape
                    smoothed_mouth_shape = get_smoothed_mouth_shape(raw_mouth_shape)

                    # Interpolate toward the target mouth shape
                    interpolate_mouth_shape(smoothed_mouth_shape)

                    # Debug output
                    if frame_counter % 10 == 0:
                        print(f"Raw: {raw_mouth_shape}, Smoothed: {smoothed_mouth_shape}")

                except queue.Empty:
                    # If no audio data, gradually transition to rest position
                    interpolate_mouth_shape('rest', transition_speed=0.1)

            except Exception as e:
                print(f"Error in audio processing: {e}")
                interpolate_mouth_shape('rest')

            last_process_time = current_time

        # Draw the stickman with the current interpolated mouth shape
        draw_stickman_with_jaw()

        # Increment frame counter
        frame_counter += 1

        # Cap the frame rate
        clock.tick(60)  # Higher frame rate for smoother animation

    # Clean up
    processing_active = False
    capture_thread.join(timeout=1.0)
    stream.stop_stream()
    stream.close()
    p.terminate()
    pygame.quit()
    print("Program ended.")


if __name__ == "__main__":
    print("Starting enhanced lip-sync program...")
    audio_stream()
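
# --- Usage notes (assumptions, not part of the original paste) ----------------
# One plausible install line for the dependencies (exact versions untested):
#   pip install pyaudio numpy pygame hmmlearn tensorflow scikit-learn scipy
# PyAudio needs the PortAudio system library before pip can build it, e.g.
# `apt install portaudio19-dev` on Debian/Ubuntu or `brew install portaudio`
# on macOS. Save this script under any name (the filename is arbitrary), run it
# with `python <filename>.py`, speak into the default microphone, and press Q
# or close the window to quit.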