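"""Enhanced lip-sync stickman.

Captures microphone audio on a background thread, extracts windowed spectral
features, classifies each frame into one of six mouth shapes with a combined
HMM/RNN vote, and animates a pygame stickman using smoothed, interpolated
mouth parameters.
"""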
import pyaudio
import numpy as np
import pygame
import time
from hmmlearn import hmm
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense, Input
from sklearn.preprocessing import StandardScaler
from collections import deque
import scipy.signal as signal
import threading
import queue
pygame.init()
screen = pygame.display.set_mode((500, 500))
pygame.display.set_caption("Enhanced Lip-Sync Stickman")
clock = pygame.time.Clock()
MOUTH_SHAPES = {
"rest": "Neutral, lips together, relaxed position",
"open": "Mouth open as in 'ah' sound",
"wide": "Lips spread wide as in 'ee' sound",
"round": "Lips rounded as in 'oh' or 'oo' sound",
"teeth": "Teeth visible, as in 's', 'f', 't' sounds",
"plosive": "Lips pressed together as in 'p', 'b', 'm' sounds"
}
IDX_TO_MOUTH = {
0: 'rest',
1: 'open',
2: 'wide',
3: 'round',
4: 'teeth',
5: 'plosive'
}
# Mouth shape interpolation mappings
MOUTH_SHAPE_VALUES = {
'rest': {'jaw_position': 0, 'mouth_width': 40, 'mouth_height': 2, 'mouth_openness': 0},
'open': {'jaw_position': 15, 'mouth_width': 40, 'mouth_height': 25, 'mouth_openness': 0.8},
'wide': {'jaw_position': 5, 'mouth_width': 60, 'mouth_height': 10, 'mouth_openness': 0.5},
'round': {'jaw_position': 7, 'mouth_width': 40, 'mouth_height': 40, 'mouth_openness': 0.7},
'teeth': {'jaw_position': 10, 'mouth_width': 50, 'mouth_height': 15, 'mouth_openness': 0.6},
'plosive': {'jaw_position': 3, 'mouth_width': 50, 'mouth_height': 4, 'mouth_openness': 0.2}
}
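# Note: the dimensions above are pixels in the 500x500 window; mouth_openness
# is a normalized 0-1 value that is currently only shown in the on-screen
# debug readout, not used for drawing.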
# Animation state variables for smooth transitions
current_mouth_params = {
'jaw_position': 0,
'mouth_width': 40,
'mouth_height': 2,
'mouth_openness': 0,
'current_shape': 'rest'
}
# Queue for smoothing mouth shape predictions
mouth_shape_history = deque(maxlen=5)
for _ in range(5):
mouth_shape_history.append('rest')
# Create a queue for audio data
audio_queue = queue.Queue(maxsize=10)
# Flag to indicate if audio processing should continue
processing_active = True
def interpolate_mouth_shape(target_shape, transition_speed=0.3):
"""Smoothly transition mouth parameters toward the target shape"""
global current_mouth_params
# Get target parameters
target_params = MOUTH_SHAPE_VALUES[target_shape]
# Interpolate each parameter
for param in ['jaw_position', 'mouth_width', 'mouth_height', 'mouth_openness']:
current_value = current_mouth_params[param]
target_value = target_params[param]
# Linear interpolation
new_value = current_value + (target_value - current_value) * transition_speed
current_mouth_params[param] = new_value
# Update current shape
current_mouth_params['current_shape'] = target_shape
def get_smoothed_mouth_shape(predicted_shape):
"""Use a history-based approach to smooth out mouth shape predictions"""
# Add new prediction to history
mouth_shape_history.append(predicted_shape)
# Count occurrences of each shape in history
shape_counts = {}
for shape in mouth_shape_history:
if shape in shape_counts:
shape_counts[shape] += 1
else:
shape_counts[shape] = 1
# Find most common shape
smoothed_shape = max(shape_counts, key=shape_counts.get)
    # If the newest prediction disagrees with the majority vote but has
    # already appeared at least twice in the history, switch to it so the
    # animation stays responsive to genuine transitions
    if smoothed_shape != predicted_shape:
        if shape_counts.get(predicted_shape, 0) >= 2:
            smoothed_shape = predicted_shape
return smoothed_shape
def draw_stickman_with_jaw():
"""Draw a stickman with different mouth shapes using interpolated parameters"""
screen.fill((255, 255, 255))
# Extract current parameters
jaw_position = current_mouth_params['jaw_position']
mouth_width = current_mouth_params['mouth_width']
mouth_height = current_mouth_params['mouth_height']
mouth_openness = current_mouth_params['mouth_openness']
current_shape = current_mouth_params['current_shape']
# Head - constant
pygame.draw.circle(screen, (0, 0, 0), (250, 150), 60, 2)
# Body and limbs - constant
pygame.draw.line(screen, (0, 0, 0), (250, 210), (250, 350), 2) # Body
pygame.draw.line(screen, (0, 0, 0), (250, 250), (200, 300), 2) # Left Arm
pygame.draw.line(screen, (0, 0, 0), (250, 250), (300, 300), 2) # Right Arm
pygame.draw.line(screen, (0, 0, 0), (250, 350), (200, 450), 2) # Left Leg
pygame.draw.line(screen, (0, 0, 0), (250, 350), (300, 450), 2) # Right Leg
# Eyes - constant
pygame.draw.circle(screen, (0, 0, 0), (230, 140), 5) # Left eye
pygame.draw.circle(screen, (0, 0, 0), (270, 140), 5) # Right eye
# Calculate mouth position based on current parameters
mouth_center_x = 250
mouth_center_y = 180 + jaw_position * 0.5 # Move down slightly with jaw
# Base mouth positioning
mouth_left = mouth_center_x - mouth_width / 2
mouth_top = mouth_center_y - mouth_height / 2
# Draw different mouth shapes based on the current shape but with interpolated parameters
if current_shape == 'rest':
# Neutral, closed mouth - just a line
pygame.draw.line(screen, (0, 0, 0),
(mouth_center_x - mouth_width / 2, mouth_center_y),
(mouth_center_x + mouth_width / 2, mouth_center_y), 2)
elif current_shape == 'open':
# Open mouth - ellipse
pygame.draw.ellipse(screen, (0, 0, 0),
(mouth_left, mouth_top, mouth_width, mouth_height), 2)
# Darker inside mouth
inner_width = max(5, mouth_width * 0.7)
inner_height = max(3, mouth_height * 0.7)
pygame.draw.ellipse(screen, (50, 50, 50),
(mouth_center_x - inner_width / 2,
mouth_center_y - inner_height / 2,
inner_width, inner_height))
elif current_shape == 'wide':
# Wide mouth - flat ellipse or arc
mouth_rect = pygame.Rect(mouth_left, mouth_top, mouth_width, mouth_height * 1.2)
pygame.draw.arc(screen, (0, 0, 0), mouth_rect, 0.2, 2.9, 2)
# Slight darkness inside
if mouth_height > 5:
inner_rect = pygame.Rect(mouth_left + 5, mouth_top + 2, mouth_width - 10, mouth_height - 4)
pygame.draw.arc(screen, (100, 100, 100), inner_rect, 0.3, 2.8, 2)
elif current_shape == 'round':
        # Round mouth: a circle with a guaranteed minimum visible size
        radius = min(30, max(12, (mouth_width + mouth_height) / 4))
pygame.draw.circle(screen, (0, 0, 0),
(mouth_center_x, mouth_center_y),
radius, 2)
# Darker inside for depth
if radius > 6:
pygame.draw.circle(screen, (80, 80, 80),
(mouth_center_x, mouth_center_y),
max(radius - 3, 5)) # Ensure inner circle is visible
elif current_shape == 'teeth':
# Teeth visible - rectangle with teeth lines
pygame.draw.rect(screen, (0, 0, 0),
(mouth_left, mouth_top, mouth_width, mouth_height), 2)
# Draw teeth based on mouth width
num_teeth = max(3, int(mouth_width / 8))
teeth_spacing = mouth_width / (num_teeth + 1)
for i in range(num_teeth):
tooth_x = mouth_left + (i + 1) * teeth_spacing
# Upper teeth
pygame.draw.line(screen, (0, 0, 0),
(tooth_x, mouth_top),
(tooth_x, mouth_top + mouth_height * 0.5), 2)
# Pink tongue
tongue_width = mouth_width * 0.6
pygame.draw.arc(screen, (255, 150, 150),
(mouth_center_x - tongue_width / 2,
mouth_center_y + mouth_height * 0.1,
tongue_width, mouth_height * 0.6),
0, 3.14, 2)
elif current_shape == 'plosive':
# Pressed lips
        line_thickness = 4  # Fixed, clearly visible thickness
pygame.draw.line(screen, (0, 0, 0),
(mouth_left, mouth_center_y),
(mouth_left + mouth_width, mouth_center_y),
line_thickness)
# Bulge in middle for pressure indication
bulge_width = mouth_width * 0.3
pygame.draw.arc(screen, (0, 0, 0),
(mouth_center_x - bulge_width / 2,
mouth_center_y - 5, # Fixed position for visibility
bulge_width, 10), # Fixed size for visibility
3.14, 6.28, 2)
    # Air-burst lines to suggest the puff of air from a plosive
    if current_shape == 'plosive':
        burst_color = (0, 0, 200)  # Full intensity
pygame.draw.line(screen, burst_color,
(mouth_left + mouth_width + 5, mouth_center_y - 2),
(mouth_left + mouth_width + 15, mouth_center_y - 5),
2)
pygame.draw.line(screen, burst_color,
(mouth_left + mouth_width + 5, mouth_center_y),
(mouth_left + mouth_width + 18, mouth_center_y),
2)
pygame.draw.line(screen, burst_color,
(mouth_left + mouth_width + 5, mouth_center_y + 2),
(mouth_left + mouth_width + 15, mouth_center_y + 5),
2)
# Display current mouth shape
font = pygame.font.SysFont('Arial', 20)
text = font.render(f"Mouth: {current_shape}", True, (0, 0, 0))
screen.blit(text, (20, 20))
# Display animation parameters
param_font = pygame.font.SysFont('Arial', 12)
param_text = param_font.render(
f"Jaw: {jaw_position:.1f}, Width: {mouth_width:.1f}, Height: {mouth_height:.1f}, Open: {mouth_openness:.1f}",
True, (100, 100, 100))
screen.blit(param_text, (20, 70))
# Show description of the current mouth shape
if current_shape in MOUTH_SHAPES:
small_font = pygame.font.SysFont('Arial', 16)
description = small_font.render(MOUTH_SHAPES.get(current_shape, ""), True, (0, 0, 0))
screen.blit(description, (20, 45))
# Show real-time audio status
status_font = pygame.font.SysFont('Arial', 12)
status_text = status_font.render(
f"Audio Buffer: {'Active' if not audio_queue.empty() else 'Waiting'} | Queue size: {audio_queue.qsize()}/10",
True, (0, 0, 200))
screen.blit(status_text, (20, 90))
pygame.display.flip()
def extract_features_with_windowing(audio_data, sr=22050):
"""Extract features using proper windowing techniques"""
if len(audio_data) == 0 or np.max(np.abs(audio_data)) < 0.01:
return np.zeros(8) # Extended feature set
try:
# Apply Hamming window to reduce spectral leakage
windowed_audio = audio_data * signal.windows.hamming(len(audio_data))
# Basic features
rms = np.sqrt(np.mean(np.square(windowed_audio)))
        # Zero-crossing rate with windowing (np.signbit returns booleans and
        # np.diff does not support boolean subtraction, so cast to int8 first)
        zero_crossings = np.sum(np.abs(np.diff(np.signbit(windowed_audio).astype(np.int8)))) / len(windowed_audio)
# Spectral features with windowing
# Using properly windowed FFT
fft_size = 2048 # Power of 2 for efficient FFT
if len(windowed_audio) < fft_size:
# Zero-pad if necessary
padded_audio = np.pad(windowed_audio, (0, fft_size - len(windowed_audio)))
else:
padded_audio = windowed_audio[:fft_size]
spec = np.abs(np.fft.rfft(padded_audio))
freqs = np.fft.rfftfreq(len(padded_audio), 1 / sr)
# Spectral centroid (brightness)
if len(spec) > 0 and np.sum(spec) > 0:
spectral_centroid = np.sum(freqs * spec) / np.sum(spec)
else:
spectral_centroid = 0
# Spectral rolloff
if len(spec) > 0 and np.sum(spec) > 0:
cumsum = np.cumsum(spec)
spectral_rolloff = freqs[np.argmax(cumsum >= 0.85 * cumsum[-1])]
else:
spectral_rolloff = 0
        # Spectral flatness (tonality vs. noise)
        # np.prod(spec) underflows to 0 for any realistic spectrum length,
        # which would permanently disable this branch, so gate on the sum
        # instead; the 1e-10 floor keeps the log finite for zero bins
        if len(spec) > 0 and np.sum(spec) > 0:
            geometric_mean = np.exp(np.mean(np.log(spec + 1e-10)))
            arithmetic_mean = np.mean(spec)
            spectral_flatness = geometric_mean / arithmetic_mean
else:
spectral_flatness = 0
# Energy in different frequency bands with better frequency resolution
if len(spec) > 10:
# Get frequency band indices
# Low (< 500 Hz), Mid (500-2000 Hz), High (> 2000 Hz)
low_idx = np.sum(freqs < 500)
mid_idx = np.sum(freqs < 2000)
# Calculate energy in each band
low_energy = np.sum(spec[:low_idx]) / np.sum(spec) if np.sum(spec) > 0 else 0
mid_energy = np.sum(spec[low_idx:mid_idx]) / np.sum(spec) if np.sum(spec) > 0 else 0
high_energy = np.sum(spec[mid_idx:]) / np.sum(spec) if np.sum(spec) > 0 else 0
else:
low_energy = mid_energy = high_energy = 0
# Combine all features
features = np.array([
rms, # Volume
zero_crossings, # Frequency indicator
spectral_centroid / 1000, # Scaled centroid
spectral_rolloff / 1000, # Scaled rolloff
spectral_flatness, # Tonality vs. noise
low_energy, # Low freq energy
mid_energy, # Mid freq energy
high_energy # High freq energy
])
return features
except Exception as e:
print(f"Feature extraction error: {e}")
return np.zeros(8) # Return zeros for all features
def train_hmm_model(training_data, lengths=None, n_components=6):
"""Train and return a GaussianHMM model"""
if lengths is None:
lengths = [len(training_data)]
model = hmm.GaussianHMM(
n_components=n_components,
covariance_type="diag",
n_iter=100,
random_state=42
)
model.fit(training_data, lengths)
return model
def create_rnn_model(input_shape, num_classes=6):
"""Create a simple RNN model for phoneme recognition"""
model = Sequential([
Input(shape=input_shape),
SimpleRNN(16, activation='relu'),
Dense(16, activation='relu'),
Dense(num_classes, activation='softmax')
])
model.compile(
optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy']
)
return model
def generate_training_data(n_samples=1000, n_features=8):
"""Generate synthetic training data for mouth shapes with windowed features"""
X_train = []
y_train = []
# Different characteristics for each mouth shape - expanded for new features
characteristics = {
# rest: low everything
0: {'rms': 0.1, 'zcr': 0.2, 'centroid': 0.2, 'rolloff': 0.2,
'flatness': 0.8, 'low': 0.5, 'mid': 0.3, 'high': 0.1},
# open: high rms, medium centroid
1: {'rms': 0.8, 'zcr': 0.4, 'centroid': 0.5, 'rolloff': 0.4,
'flatness': 0.4, 'low': 0.7, 'mid': 0.5, 'high': 0.3},
# wide: high centroid, high high-energy
2: {'rms': 0.6, 'zcr': 0.7, 'centroid': 0.8, 'rolloff': 0.7,
'flatness': 0.3, 'low': 0.3, 'mid': 0.5, 'high': 0.8},
# round: medium rms, low centroid
3: {'rms': 0.5, 'zcr': 0.3, 'centroid': 0.3, 'rolloff': 0.4,
'flatness': 0.5, 'low': 0.8, 'mid': 0.4, 'high': 0.2},
# teeth: high zcr, high centroid, high high-energy
4: {'rms': 0.4, 'zcr': 0.9, 'centroid': 0.9, 'rolloff': 0.8,
'flatness': 0.7, 'low': 0.2, 'mid': 0.4, 'high': 0.9},
# plosive: medium zcr, medium rms
5: {'rms': 0.5, 'zcr': 0.5, 'centroid': 0.4, 'rolloff': 0.5,
'flatness': 0.6, 'low': 0.5, 'mid': 0.5, 'high': 0.5}
}
# Generate data for each class
for class_idx in range(6):
char = characteristics[class_idx]
# Base values for this class
base = np.array([
char['rms'],
char['zcr'],
char['centroid'],
char['rolloff'],
char['flatness'],
char['low'],
char['mid'],
char['high']
])
# Generate samples with noise
class_samples = np.random.normal(
loc=base,
scale=0.1, # Add some noise
size=(n_samples // 6, n_features)
)
X_train.append(class_samples)
y_train.extend([class_idx] * (n_samples // 6))
# Combine all data
X_train = np.vstack(X_train)
y_train = np.array(y_train)
# Shuffle data
indices = np.arange(len(y_train))
np.random.shuffle(indices)
X_train = X_train[indices]
y_train = y_train[indices]
return X_train, y_train
def classify_phoneme(features, hmm_model, rnn_model, scaler=None):
"""Classify audio features into mouth shape using both models"""
# Check for silence
if np.sum(np.abs(features)) < 0.1:
return 'rest'
# Scale features if scaler is provided
if scaler is not None:
features = scaler.transform(features.reshape(1, -1))
else:
# Just reshape
features = features.reshape(1, -1)
# For RNN, reshape to (samples, timesteps, features)
rnn_features = features.reshape(1, features.shape[1], 1)
# Get predictions from both models
try:
hmm_pred = hmm_model.predict(features)[0]
rnn_pred = np.argmax(rnn_model.predict(rnn_features, verbose=0), axis=1)[0]
# Combine both predictions with weighting
# Giving more weight to RNN prediction
hmm_weight = 0.3
rnn_weight = 0.7
# Simple weighted voting
if hmm_pred == rnn_pred:
final_prediction = hmm_pred
else:
# If they disagree, use the RNN prediction more often
if np.random.random() < rnn_weight:
final_prediction = rnn_pred
else:
final_prediction = hmm_pred
return IDX_TO_MOUTH.get(final_prediction, 'rest')
except Exception as e:
print(f"Prediction error: {e}")
return 'rest'
def audio_capture_thread(chunk_size, format_type, channels, rate, stream):
"""Thread function to continuously capture audio data"""
global processing_active
audio_buffer = np.zeros(chunk_size * 2, dtype=np.float32)
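    # The buffer holds two chunks; each read shifts it left by one chunk, so
    # consecutive analysis windows overlap by 50%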
while processing_active:
try:
# Read audio data
data = np.frombuffer(stream.read(chunk_size, exception_on_overflow=False), dtype=np.int16)
# Check if we have actual sound
if np.max(np.abs(data)) > 100:
                # Convert to float before normalizing; np.abs(-32768)
                # overflows in int16, so take the max after the cast
                audio_float = data.astype(np.float32)
                audio_float /= np.max(np.abs(audio_float))
# Shift buffer and add new data (implementing overlap)
audio_buffer[:-chunk_size] = audio_buffer[chunk_size:]
audio_buffer[-chunk_size:] = audio_float
# Try to add to queue if not full
try:
if not audio_queue.full():
audio_queue.put(audio_buffer.copy(), block=False)
except queue.Full:
# If queue is full, just continue
pass
# Small sleep to prevent CPU hogging
time.sleep(0.01)
except Exception as e:
print(f"Error in audio capture thread: {e}")
time.sleep(0.1) # Sleep a bit longer on error
print("Audio capture thread stopped")
def audio_stream():
"""Main function to process audio and animate stickman"""
global processing_active
CHUNK = 1024 # Smaller chunk for more responsive updates
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 22050
# Initialize PyAudio
p = pyaudio.PyAudio()
stream = p.open(
format=FORMAT,
channels=CHANNELS,
rate=RATE,
input=True,
frames_per_buffer=CHUNK
)
# Generate training data and train models with expanded feature set
print("Generating training data...")
X_train, y_train = generate_training_data(n_samples=1800, n_features=8)
# Create a scaler to normalize the features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
# Convert labels to one-hot encoding
y_train_onehot = tf.keras.utils.to_categorical(y_train, num_classes=6)
# Create and train the RNN model
print("Training RNN model...")
    input_shape = (X_train_scaled.shape[1], 1)  # (timesteps, features): 8 one-dim steps
rnn_model = create_rnn_model(input_shape, num_classes=6)
# Train the model with fewer epochs for testing
rnn_model.fit(
X_train_scaled.reshape(-1, X_train_scaled.shape[1], 1),
y_train_onehot,
epochs=5,
batch_size=32,
verbose=1
)
# Train HMM model
print("Training HMM model...")
hmm_model = train_hmm_model(X_train_scaled, n_components=6)
# Start the audio capture thread
capture_thread = threading.Thread(
target=audio_capture_thread,
args=(CHUNK, FORMAT, CHANNELS, RATE, stream),
daemon=True
)
capture_thread.start()
print("Starting audio stream. Press Q to quit.")
running = True
frame_counter = 0
last_process_time = time.time()
processing_interval = 0.05 # Process audio every 50ms for more responsive updates
while running:
# Check for quit events
for event in pygame.event.get():
if event.type == pygame.QUIT:
running = False
if event.type == pygame.KEYDOWN:
if event.key == pygame.K_q:
running = False
# Process audio data if available and enough time has passed
current_time = time.time()
if current_time - last_process_time >= processing_interval:
try:
# Get audio data from queue if available
try:
audio_buffer = audio_queue.get(block=False)
# Process with overlapping windows
features = extract_features_with_windowing(audio_buffer, RATE)
# Classify and get mouth shape
raw_mouth_shape = classify_phoneme(features, hmm_model, rnn_model, scaler)
# Apply smoothing to the mouth shape
smoothed_mouth_shape = get_smoothed_mouth_shape(raw_mouth_shape)
# Interpolate toward the target mouth shape
interpolate_mouth_shape(smoothed_mouth_shape)
# Debug output
if frame_counter % 10 == 0:
print(f"Raw: {raw_mouth_shape}, Smoothed: {smoothed_mouth_shape}")
except queue.Empty:
# If no audio data, gradually transition to rest position
interpolate_mouth_shape('rest', transition_speed=0.1)
except Exception as e:
print(f"Error in audio processing: {e}")
interpolate_mouth_shape('rest')
last_process_time = current_time
# Draw the stickman with the current interpolated mouth shape
draw_stickman_with_jaw()
# Increment frame counter
frame_counter += 1
# Cap the frame rate
clock.tick(60) # Increased frame rate for smoother animation
# Clean up
processing_active = False
capture_thread.join(timeout=1.0)
stream.stop_stream()
stream.close()
p.terminate()
pygame.quit()
print("Program ended.")
if __name__ == "__main__":
print("Starting enhanced lip-sync program...")
    audio_stream()