import pyaudio
import numpy as np
import librosa
import pygame
import time
from hmmlearn import hmm
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import SimpleRNN, Dense, Input
from sklearn.preprocessing import StandardScaler
from collections import deque
import scipy.signal as signal
import threading
import queue

pygame.init()
screen = pygame.display.set_mode((500, 500))
pygame.display.set_caption("Enhanced Lip-Sync Stickman")
clock = pygame.time.Clock()

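# Human-readable descriptions of each mouth shape (viseme), shown in the on-screen overlay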
MOUTH_SHAPES = {
    "rest": "Neutral, lips together, relaxed position",
    "open": "Mouth open as in 'ah' sound",
    "wide": "Lips spread wide as in 'ee' sound",
    "round": "Lips rounded as in 'oh' or 'oo' sound",
    "teeth": "Teeth visible, as in 's', 'f', 't' sounds",
    "plosive": "Lips pressed together as in 'p', 'b', 'm' sounds"
}

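# Maps classifier output indices (0-5) to mouth-shape names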
IDX_TO_MOUTH = {
    0: 'rest',
    1: 'open',
    2: 'wide',
    3: 'round',
    4: 'teeth',
    5: 'plosive'
}

# Mouth shape interpolation mappings
MOUTH_SHAPE_VALUES = {
    'rest': {'jaw_position': 0, 'mouth_width': 40, 'mouth_height': 2, 'mouth_openness': 0},
    'open': {'jaw_position': 15, 'mouth_width': 40, 'mouth_height': 25, 'mouth_openness': 0.8},
    'wide': {'jaw_position': 5, 'mouth_width': 60, 'mouth_height': 10, 'mouth_openness': 0.5},
    'round': {'jaw_position': 7, 'mouth_width': 40, 'mouth_height': 40, 'mouth_openness': 0.7},
    'teeth': {'jaw_position': 10, 'mouth_width': 50, 'mouth_height': 15, 'mouth_openness': 0.6},
    'plosive': {'jaw_position': 3, 'mouth_width': 50, 'mouth_height': 4, 'mouth_openness': 0.2}
}

# Animation state variables for smooth transitions
current_mouth_params = {
    'jaw_position': 0,
    'mouth_width': 40,
    'mouth_height': 2,
    'mouth_openness': 0,
    'current_shape': 'rest'
}

# Queue for smoothing mouth shape predictions
mouth_shape_history = deque(maxlen=5)
for _ in range(5):
    mouth_shape_history.append('rest')

# Create a queue for audio data
audio_queue = queue.Queue(maxsize=10)
# Flag to indicate if audio processing should continue
processing_active = True


def interpolate_mouth_shape(target_shape, transition_speed=0.3):
    """Smoothly transition mouth parameters toward the target shape"""
    global current_mouth_params

    # Get target parameters
    target_params = MOUTH_SHAPE_VALUES[target_shape]

    # Interpolate each parameter
    for param in ['jaw_position', 'mouth_width', 'mouth_height', 'mouth_openness']:
        current_value = current_mouth_params[param]
        target_value = target_params[param]

        # Linear interpolation
        new_value = current_value + (target_value - current_value) * transition_speed
        current_mouth_params[param] = new_value

    # Update current shape
    current_mouth_params['current_shape'] = target_shape


def get_smoothed_mouth_shape(predicted_shape):
    """Use a history-based approach to smooth out mouth shape predictions"""
    # Add new prediction to history
    mouth_shape_history.append(predicted_shape)

    # Count occurrences of each shape in history
    shape_counts = {}
    for shape in mouth_shape_history:
        if shape in shape_counts:
            shape_counts[shape] += 1
        else:
            shape_counts[shape] = 1

    # Find most common shape
    smoothed_shape = max(shape_counts, key=shape_counts.get)

    # If the predicted shape is very different, use a weighted approach
    if smoothed_shape != predicted_shape:
        # If new prediction appears at least twice in history, use it
        if shape_counts.get(predicted_shape, 0) >= 2:
            smoothed_shape = predicted_shape

    return smoothed_shape


def draw_stickman_with_jaw():
    """Draw a stickman with different mouth shapes using interpolated parameters"""
    screen.fill((255, 255, 255))

    # Extract current parameters
    jaw_position = current_mouth_params['jaw_position']
    mouth_width = current_mouth_params['mouth_width']
    mouth_height = current_mouth_params['mouth_height']
    mouth_openness = current_mouth_params['mouth_openness']
    current_shape = current_mouth_params['current_shape']

    # Head - constant
    pygame.draw.circle(screen, (0, 0, 0), (250, 150), 60, 2)

    # Body and limbs - constant
    pygame.draw.line(screen, (0, 0, 0), (250, 210), (250, 350), 2)  # Body
    pygame.draw.line(screen, (0, 0, 0), (250, 250), (200, 300), 2)  # Left Arm
    pygame.draw.line(screen, (0, 0, 0), (250, 250), (300, 300), 2)  # Right Arm
    pygame.draw.line(screen, (0, 0, 0), (250, 350), (200, 450), 2)  # Left Leg
    pygame.draw.line(screen, (0, 0, 0), (250, 350), (300, 450), 2)  # Right Leg

    # Eyes - constant
    pygame.draw.circle(screen, (0, 0, 0), (230, 140), 5)  # Left eye
    pygame.draw.circle(screen, (0, 0, 0), (270, 140), 5)  # Right eye

    # Calculate mouth position based on current parameters
    mouth_center_x = 250
    mouth_center_y = 180 + jaw_position * 0.5  # Move down slightly with jaw

    # Base mouth positioning
    mouth_left = mouth_center_x - mouth_width / 2
    mouth_top = mouth_center_y - mouth_height / 2

    # Draw different mouth shapes based on the current shape but with interpolated parameters
    if current_shape == 'rest':
        # Neutral, closed mouth - just a line
        pygame.draw.line(screen, (0, 0, 0),
                         (mouth_center_x - mouth_width / 2, mouth_center_y),
                         (mouth_center_x + mouth_width / 2, mouth_center_y), 2)

    elif current_shape == 'open':
        # Open mouth - ellipse
        pygame.draw.ellipse(screen, (0, 0, 0),
                            (mouth_left, mouth_top, mouth_width, mouth_height), 2)
        # Darker inside mouth
        inner_width = max(5, mouth_width * 0.7)
        inner_height = max(3, mouth_height * 0.7)
        pygame.draw.ellipse(screen, (50, 50, 50),
                            (mouth_center_x - inner_width / 2,
                             mouth_center_y - inner_height / 2,
                             inner_width, inner_height))

    elif current_shape == 'wide':
        # Wide mouth - flat ellipse or arc
        mouth_rect = pygame.Rect(mouth_left, mouth_top, mouth_width, mouth_height * 1.2)
        pygame.draw.arc(screen, (0, 0, 0), mouth_rect, 0.2, 2.9, 2)

        # Slight darkness inside
        if mouth_height > 5:
            inner_rect = pygame.Rect(mouth_left + 5, mouth_top + 2, mouth_width - 10, mouth_height - 4)
            pygame.draw.arc(screen, (100, 100, 100), inner_rect, 0.3, 2.8, 2)

    elif current_shape == 'round':
        # Round mouth - circle - make sure this is visible
        radius = min(30, max(12, (mouth_width + mouth_height) / 4))  # Ensure minimum visible size
        pygame.draw.circle(screen, (0, 0, 0),
                           (mouth_center_x, mouth_center_y),
                           radius, 2)
        # Darker inside for depth
        if radius > 6:
            pygame.draw.circle(screen, (80, 80, 80),
                               (mouth_center_x, mouth_center_y),
                               max(radius - 3, 5))  # Ensure inner circle is visible

    elif current_shape == 'teeth':
        # Teeth visible - rectangle with teeth lines
        pygame.draw.rect(screen, (0, 0, 0),
                         (mouth_left, mouth_top, mouth_width, mouth_height), 2)

        # Draw teeth based on mouth width
        num_teeth = max(3, int(mouth_width / 8))
        teeth_spacing = mouth_width / (num_teeth + 1)

        for i in range(num_teeth):
            tooth_x = mouth_left + (i + 1) * teeth_spacing
            # Upper teeth
            pygame.draw.line(screen, (0, 0, 0),
                             (tooth_x, mouth_top),
                             (tooth_x, mouth_top + mouth_height * 0.5), 2)

        # Pink tongue
        tongue_width = mouth_width * 0.6
        pygame.draw.arc(screen, (255, 150, 150),
                        (mouth_center_x - tongue_width / 2,
                         mouth_center_y + mouth_height * 0.1,
                         tongue_width, mouth_height * 0.6),
                        0, 3.14, 2)

    elif current_shape == 'plosive':
        # Pressed lips
        line_thickness = 4  # Fixed thickness so the pressed lips stay visible
        pygame.draw.line(screen, (0, 0, 0),
                         (mouth_left, mouth_center_y),
                         (mouth_left + mouth_width, mouth_center_y),
                         line_thickness)

        # Bulge in middle for pressure indication
        bulge_width = mouth_width * 0.3
        pygame.draw.arc(screen, (0, 0, 0),
                        (mouth_center_x - bulge_width / 2,
                         mouth_center_y - 5,  # Fixed position for visibility
                         bulge_width, 10),  # Fixed size for visibility
                        3.14, 6.28, 2)

        # Air burst lines to suggest the plosive release
        burst_color = (0, 0, 200)  # Always full intensity

        pygame.draw.line(screen, burst_color,
                         (mouth_left + mouth_width + 5, mouth_center_y - 2),
                         (mouth_left + mouth_width + 15, mouth_center_y - 5),
                         2)
        pygame.draw.line(screen, burst_color,
                         (mouth_left + mouth_width + 5, mouth_center_y),
                         (mouth_left + mouth_width + 18, mouth_center_y),
                         2)
        pygame.draw.line(screen, burst_color,
                         (mouth_left + mouth_width + 5, mouth_center_y + 2),
                         (mouth_left + mouth_width + 15, mouth_center_y + 5),
                         2)

    # Display current mouth shape
    font = pygame.font.SysFont('Arial', 20)
    text = font.render(f"Mouth: {current_shape}", True, (0, 0, 0))
    screen.blit(text, (20, 20))

    # Display animation parameters
    param_font = pygame.font.SysFont('Arial', 12)
    param_text = param_font.render(
        f"Jaw: {jaw_position:.1f}, Width: {mouth_width:.1f}, Height: {mouth_height:.1f}, Open: {mouth_openness:.1f}",
        True, (100, 100, 100))
    screen.blit(param_text, (20, 70))

    # Show description of the current mouth shape
    if current_shape in MOUTH_SHAPES:
        small_font = pygame.font.SysFont('Arial', 16)
        description = small_font.render(MOUTH_SHAPES.get(current_shape, ""), True, (0, 0, 0))
        screen.blit(description, (20, 45))

    # Show real-time audio status
    status_font = pygame.font.SysFont('Arial', 12)
    status_text = status_font.render(
        f"Audio Buffer: {'Active' if not audio_queue.empty() else 'Waiting'} | Queue size: {audio_queue.qsize()}/10",
        True, (0, 0, 200))
    screen.blit(status_text, (20, 90))

    pygame.display.flip()


def extract_features_with_windowing(audio_data, sr=22050):
    """Extract features using proper windowing techniques"""
    if len(audio_data) == 0 or np.max(np.abs(audio_data)) < 0.01:
        return np.zeros(8)  # Extended feature set

    try:
        # Apply Hamming window to reduce spectral leakage
        windowed_audio = audio_data * signal.windows.hamming(len(audio_data))

        # Basic features
        rms = np.sqrt(np.mean(np.square(windowed_audio)))

        # Zero crossing rate with windowing
        zero_crossings = np.sum(np.abs(np.diff(np.signbit(windowed_audio)))) / len(windowed_audio)

        # Spectral features with windowing
        # Using properly windowed FFT
        fft_size = 2048  # Power of 2 for efficient FFT
        if len(windowed_audio) < fft_size:
            # Zero-pad if necessary
            padded_audio = np.pad(windowed_audio, (0, fft_size - len(windowed_audio)))
        else:
            padded_audio = windowed_audio[:fft_size]

        spec = np.abs(np.fft.rfft(padded_audio))
        freqs = np.fft.rfftfreq(len(padded_audio), 1 / sr)

        # Spectral centroid (brightness)
        if len(spec) > 0 and np.sum(spec) > 0:
            spectral_centroid = np.sum(freqs * spec) / np.sum(spec)
        else:
            spectral_centroid = 0

        # Spectral rolloff
        if len(spec) > 0 and np.sum(spec) > 0:
            cumsum = np.cumsum(spec)
            spectral_rolloff = freqs[np.argmax(cumsum >= 0.85 * cumsum[-1])]
        else:
            spectral_rolloff = 0

        # Spectral flatness (tonality vs. noise)
        # Guard on the mean rather than np.prod: the product over ~1000 spectrum
        # bins under/overflows, and the +1e-10 inside the log already handles zeros
        if len(spec) > 0 and np.mean(spec) > 0:
            geometric_mean = np.exp(np.mean(np.log(spec + 1e-10)))
            arithmetic_mean = np.mean(spec)
            spectral_flatness = geometric_mean / arithmetic_mean
        else:
            spectral_flatness = 0

        # Energy in different frequency bands with better frequency resolution
        if len(spec) > 10:
            # Get frequency band indices
            # Low (< 500 Hz), Mid (500-2000 Hz), High (> 2000 Hz)
            low_idx = np.sum(freqs < 500)
            mid_idx = np.sum(freqs < 2000)

            # Calculate energy in each band
            low_energy = np.sum(spec[:low_idx]) / np.sum(spec) if np.sum(spec) > 0 else 0
            mid_energy = np.sum(spec[low_idx:mid_idx]) / np.sum(spec) if np.sum(spec) > 0 else 0
            high_energy = np.sum(spec[mid_idx:]) / np.sum(spec) if np.sum(spec) > 0 else 0
        else:
            low_energy = mid_energy = high_energy = 0

        # Combine all features
        features = np.array([
            rms,  # Volume
            zero_crossings,  # Frequency indicator
            spectral_centroid / 1000,  # Scaled centroid
            spectral_rolloff / 1000,  # Scaled rolloff
            spectral_flatness,  # Tonality vs. noise
            low_energy,  # Low freq energy
            mid_energy,  # Mid freq energy
            high_energy  # High freq energy
        ])

        return features

    except Exception as e:
        print(f"Feature extraction error: {e}")
        return np.zeros(8)  # Return zeros for all features


def train_hmm_model(training_data, lengths=None, n_components=6):
    """Train and return a GaussianHMM model"""
    if lengths is None:
        lengths = [len(training_data)]

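    # Note: GaussianHMM learns its hidden states unsupervised, so the state
    # indices it predicts are not guaranteed to align with the mouth-shape
    # labels; classify_phoneme() therefore weights the supervised RNN more heavily.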
    model = hmm.GaussianHMM(
        n_components=n_components,
        covariance_type="diag",
        n_iter=100,
        random_state=42
    )
    model.fit(training_data, lengths)
    return model


def create_rnn_model(input_shape, num_classes=6):
    """Create a simple RNN model for phoneme recognition"""
    model = Sequential([
        Input(shape=input_shape),
        SimpleRNN(16, activation='relu'),
        Dense(16, activation='relu'),
        Dense(num_classes, activation='softmax')
    ])

    model.compile(
        optimizer='adam',
        loss='categorical_crossentropy',
        metrics=['accuracy']
    )

    return model


def generate_training_data(n_samples=1000, n_features=8):
    """Generate synthetic training data for mouth shapes with windowed features"""
    X_train = []
    y_train = []

    # Different characteristics for each mouth shape - expanded for new features
    characteristics = {
        # rest: low everything
        0: {'rms': 0.1, 'zcr': 0.2, 'centroid': 0.2, 'rolloff': 0.2,
            'flatness': 0.8, 'low': 0.5, 'mid': 0.3, 'high': 0.1},

        # open: high rms, medium centroid
        1: {'rms': 0.8, 'zcr': 0.4, 'centroid': 0.5, 'rolloff': 0.4,
            'flatness': 0.4, 'low': 0.7, 'mid': 0.5, 'high': 0.3},

        # wide: high centroid, high high-energy
        2: {'rms': 0.6, 'zcr': 0.7, 'centroid': 0.8, 'rolloff': 0.7,
            'flatness': 0.3, 'low': 0.3, 'mid': 0.5, 'high': 0.8},

        # round: medium rms, low centroid
        3: {'rms': 0.5, 'zcr': 0.3, 'centroid': 0.3, 'rolloff': 0.4,
            'flatness': 0.5, 'low': 0.8, 'mid': 0.4, 'high': 0.2},

        # teeth: high zcr, high centroid, high high-energy
        4: {'rms': 0.4, 'zcr': 0.9, 'centroid': 0.9, 'rolloff': 0.8,
            'flatness': 0.7, 'low': 0.2, 'mid': 0.4, 'high': 0.9},

        # plosive: medium zcr, medium rms
        5: {'rms': 0.5, 'zcr': 0.5, 'centroid': 0.4, 'rolloff': 0.5,
            'flatness': 0.6, 'low': 0.5, 'mid': 0.5, 'high': 0.5}
    }

    # Generate data for each class
    for class_idx in range(6):
        char = characteristics[class_idx]
        # Base values for this class
        base = np.array([
            char['rms'],
            char['zcr'],
            char['centroid'],
            char['rolloff'],
            char['flatness'],
            char['low'],
            char['mid'],
            char['high']
        ])

        # Generate samples with noise
        class_samples = np.random.normal(
            loc=base,
            scale=0.1,  # Add some noise
            size=(n_samples // 6, n_features)
        )

        X_train.append(class_samples)
        y_train.extend([class_idx] * (n_samples // 6))

    # Combine all data
    X_train = np.vstack(X_train)
    y_train = np.array(y_train)

    # Shuffle data
    indices = np.arange(len(y_train))
    np.random.shuffle(indices)
    X_train = X_train[indices]
    y_train = y_train[indices]

    return X_train, y_train


def classify_phoneme(features, hmm_model, rnn_model, scaler=None):
    """Classify audio features into mouth shape using both models"""
    # Check for silence
    if np.sum(np.abs(features)) < 0.1:
        return 'rest'

    # Scale features if scaler is provided
    if scaler is not None:
        features = scaler.transform(features.reshape(1, -1))
    else:
        # Just reshape
        features = features.reshape(1, -1)

    # For RNN, reshape to (samples, timesteps, features)
    rnn_features = features.reshape(1, features.shape[1], 1)

    # Get predictions from both models
    try:
        hmm_pred = hmm_model.predict(features)[0]
        rnn_pred = np.argmax(rnn_model.predict(rnn_features, verbose=0), axis=1)[0]

        # Combine both predictions: if the models agree, keep that answer;
        # otherwise favour the RNN with probability rnn_weight (stochastic vote)
        rnn_weight = 0.7

        if hmm_pred == rnn_pred:
            final_prediction = hmm_pred
        else:
            # Models disagree: pick the RNN prediction more often
            if np.random.random() < rnn_weight:
                final_prediction = rnn_pred
            else:
                final_prediction = hmm_pred

        return IDX_TO_MOUTH.get(final_prediction, 'rest')

    except Exception as e:
        print(f"Prediction error: {e}")
        return 'rest'


def audio_capture_thread(chunk_size, format_type, channels, rate, stream):
    """Thread function to continuously capture audio data"""
    global processing_active

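    # Rolling buffer holds two chunks so consecutive analysis windows overlap by 50%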
    audio_buffer = np.zeros(chunk_size * 2, dtype=np.float32)

    while processing_active:
        try:
            # Read audio data
            data = np.frombuffer(stream.read(chunk_size, exception_on_overflow=False), dtype=np.int16)

            # Check if we have actual sound
            if np.max(np.abs(data)) > 100:
                # Convert to float and normalize by this chunk's own peak
                # (this discards absolute volume, so the RMS feature mainly
                #  reflects within-chunk dynamics)
                audio_float = data.astype(np.float32) / np.max(np.abs(data))

                # Shift buffer and add new data (implementing overlap)
                audio_buffer[:-chunk_size] = audio_buffer[chunk_size:]
                audio_buffer[-chunk_size:] = audio_float

                # Try to add to queue if not full
                try:
                    if not audio_queue.full():
                        audio_queue.put(audio_buffer.copy(), block=False)
                except queue.Full:
                    # If queue is full, just continue
                    pass

            # Small sleep to prevent CPU hogging
            time.sleep(0.01)

        except Exception as e:
            print(f"Error in audio capture thread: {e}")
            time.sleep(0.1)  # Sleep a bit longer on error

    print("Audio capture thread stopped")


def audio_stream():
    """Main function to process audio and animate stickman"""
    global processing_active

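    # Pipeline: capture thread -> windowed feature extraction -> HMM + RNN
    # classification -> history smoothing -> parameter interpolation -> drawing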
    CHUNK = 1024  # Smaller chunk for more responsive updates
    FORMAT = pyaudio.paInt16
    CHANNELS = 1
    RATE = 22050

    # Initialize PyAudio
    p = pyaudio.PyAudio()
    stream = p.open(
        format=FORMAT,
        channels=CHANNELS,
        rate=RATE,
        input=True,
        frames_per_buffer=CHUNK
    )

    # Generate training data and train models with expanded feature set
    print("Generating training data...")
    X_train, y_train = generate_training_data(n_samples=1800, n_features=8)

    # Create a scaler to normalize the features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)

    # Convert labels to one-hot encoding
    y_train_onehot = tf.keras.utils.to_categorical(y_train, num_classes=6)

    # Create and train the RNN model
    print("Training RNN model...")
    input_shape = (X_train_scaled.shape[1], 1)  # 8 features fed as 8 timesteps of 1 value each
    rnn_model = create_rnn_model(input_shape, num_classes=6)

    # Train the model with fewer epochs for testing
    rnn_model.fit(
        X_train_scaled.reshape(-1, X_train_scaled.shape[1], 1),
        y_train_onehot,
        epochs=5,
        batch_size=32,
        verbose=1
    )

    # Train HMM model
    print("Training HMM model...")
    hmm_model = train_hmm_model(X_train_scaled, n_components=6)

    # Start the audio capture thread
    capture_thread = threading.Thread(
        target=audio_capture_thread,
        args=(CHUNK, FORMAT, CHANNELS, RATE, stream),
        daemon=True
    )
    capture_thread.start()

    print("Starting audio stream. Press Q to quit.")

    running = True
    frame_counter = 0
    last_process_time = time.time()
    processing_interval = 0.05  # Process audio every 50ms for more responsive updates

    while running:
        # Check for quit events
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                running = False
            if event.type == pygame.KEYDOWN:
                if event.key == pygame.K_q:
                    running = False

        # Process audio data if available and enough time has passed
        current_time = time.time()
        if current_time - last_process_time >= processing_interval:
            try:
                # Get audio data from queue if available
                try:
                    audio_buffer = audio_queue.get(block=False)

                    # Process with overlapping windows
                    features = extract_features_with_windowing(audio_buffer, RATE)

                    # Classify and get mouth shape
                    raw_mouth_shape = classify_phoneme(features, hmm_model, rnn_model, scaler)

                    # Apply smoothing to the mouth shape
                    smoothed_mouth_shape = get_smoothed_mouth_shape(raw_mouth_shape)

                    # Interpolate toward the target mouth shape
                    interpolate_mouth_shape(smoothed_mouth_shape)

                    # Debug output
                    if frame_counter % 10 == 0:
                        print(f"Raw: {raw_mouth_shape}, Smoothed: {smoothed_mouth_shape}")

                except queue.Empty:
                    # If no audio data, gradually transition to rest position
                    interpolate_mouth_shape('rest', transition_speed=0.1)

            except Exception as e:
                print(f"Error in audio processing: {e}")
                interpolate_mouth_shape('rest')

            last_process_time = current_time

        # Draw the stickman with the current interpolated mouth shape
        draw_stickman_with_jaw()

        # Increment frame counter
        frame_counter += 1

        # Cap the frame rate
        clock.tick(60)  # Increased frame rate for smoother animation

    # Clean up
    processing_active = False
    capture_thread.join(timeout=1.0)
    stream.stop_stream()
    stream.close()
    p.terminate()
    pygame.quit()
    print("Program ended.")


if __name__ == "__main__":
    print("Starting enhanced lip-sync program...")
    audio_stream()