# Untitled
#
#  avatar
# unknown
# plain_text
# 5 months ago
# 22 kB
# 15
# Indexable
import sounddevice as sd
import numpy as np
import speech_recognition as sr
from scipy.io.wavfile import write
import os
import tempfile
from datetime import datetime
import threading
import queue
import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
from tkinter.filedialog import asksaveasfilename
from deep_translator import GoogleTranslator
import joblib
import librosa

# Configuration
# Audio capture settings. SAMPLE_RATE, CHANNELS and DTYPE are passed to the
# sounddevice input stream; SAMPLE_RATE is also the rate written into the
# temporary WAV files handed to the recognizer.
SAMPLE_RATE = 16000
CHANNELS = 1
# Amount of audio accumulated before each recognition request.
CHUNK_DURATION = 10  # seconds
DTYPE = np.int16

# Available languages for translation
# Keys are the display names listed in the language combobox; values are the
# language codes passed as the translator's ``target``.
LANGUAGES = {
    'English': 'en',
    'Spanish': 'es',
    'French': 'fr',
    'German': 'de',
    'Italian': 'it',
    'Portuguese': 'pt',
    'Russian': 'ru',
    'Japanese': 'ja',
    'Korean': 'ko',
    'Chinese (Simplified)': 'zh-CN',
    'Arabic': 'ar',
    'Hindi': 'hi',
    'Vietnamese': 'vi',
    'Thai': 'th',
    'Turkish': 'tr',
    'Polish': 'pl',
    'Dutch': 'nl',
    'Swedish': 'sv',
    'Indonesian': 'id',
    'Filipino': 'tl'
}

def extract_speech_feature(file_name, label=None, **kwargs):
    """
    Extract summary audio features (MFCC and, optionally, mel) from a file.

    The clip is loaded at 16 kHz, passed through ``librosa.util.normalize``,
    trimmed of leading/trailing silence (``top_db=20``) and then zero-padded
    or truncated to exactly five seconds before any feature is computed.

    Args:
        file_name: Path of the audio file to analyse.
        label: Optional value stored under the ``"label"`` key of the result.
        **kwargs: Feature switches. ``mfcc`` (default ``True``) adds the mean
            and standard deviation of each of the 40 MFCC rows as
            ``mfcc_{i}`` / ``mfcc_{i}_std``; ``mel`` (default ``False``) adds
            the mean of each mel-spectrogram row as ``mel_{i}``.

    Returns:
        A dict mapping feature names to values, or ``None`` when
        ``file_name`` does not exist.
    """
    use_mfcc = kwargs.get("mfcc", True)
    use_mel = kwargs.get("mel", False)

    # Guard clause: nothing to extract from a missing file.
    if not os.path.exists(file_name):
        return None

    # Load, normalise the level, then drop silence at both ends.
    signal, sample_rate = librosa.load(file_name, sr=16000)
    signal = librosa.util.normalize(signal)
    signal, _ = librosa.effects.trim(signal, top_db=20)

    # Force every clip to the same length (5 seconds) so features are comparable.
    desired_length = sample_rate * 5
    missing = desired_length - len(signal)
    if missing > 0:
        signal = np.pad(signal, (0, missing), mode='constant')
    else:
        signal = signal[:desired_length]

    features = {}

    if use_mfcc:
        coefficients = librosa.feature.mfcc(y=signal, sr=sample_rate, n_mfcc=40)
        for index, row in enumerate(coefficients):
            features[f"mfcc_{index}"] = np.mean(row)
            features[f"mfcc_{index}_std"] = np.std(row)

    if use_mel:
        spectrogram = librosa.feature.melspectrogram(y=signal, sr=sample_rate)
        for index, row in enumerate(spectrogram):
            features[f"mel_{index}"] = np.mean(row)

    if label is not None:
        features["label"] = label
    return features


def classify_voice(wav_file, models_dict, scaler, threshold=0.40):
    """
    Identify the speaker in a WAV file by unanimous vote of several models.

    The 40 mean-MFCC features of the clip are scaled with ``scaler`` and fed
    to every model. A speaker is reported only when all models predict the
    same label and every model's confidence is at least ``threshold``.

    Args:
        wav_file: Path of the WAV file to classify.
        models_dict: Mapping of model name to a ``(model, accuracy)`` pair.
            Only the model is used here.
        scaler: Fitted transformer exposing ``transform``.
        threshold: Minimum per-model confidence (0..1) required to accept
            the unanimous prediction.

    Returns:
        ``{"error": ...}`` when the audio file cannot be read; otherwise a
        dict with ``"result"`` (display string) and ``"details"`` (per-model
        label and confidence in percent).
    """
    label_map = {0: 'Nhựt', 1: 'Linh', 2: 'Việt', 3: 'Huy'}

    feature_dict = extract_speech_feature(wav_file, mfcc=True)
    if feature_dict is None:
        return {"error": "❌ File âm thanh không hợp lệ!"}

    mfcc_features = np.array([feature_dict[f'mfcc_{i}'] for i in range(40)])
    X = scaler.transform(mfcc_features.reshape(1, -1))

    predictions = []
    confidences = []
    results_detail = {}

    for model_name, (model, _acc) in models_dict.items():
        pred_label = model.predict(X)[0]

        if hasattr(model, "predict_proba"):
            proba = model.predict_proba(X)[0]
            # predict_proba columns follow model.classes_, not the label
            # values, so look up the column of the predicted label instead of
            # indexing by the label itself.
            classes = getattr(model, "classes_", None)
            if classes is None:
                column = pred_label
            else:
                column = list(classes).index(pred_label)
            confidence = float(proba[column])
        else:
            # Models without probability estimates are trusted fully.
            confidence = 1.0

        predictions.append(pred_label)
        confidences.append(confidence)

        results_detail[model_name] = {
            "label": label_map[pred_label],
            "confidence": round(confidence * 100, 2)
        }

    # An empty models_dict yields no votes; report "cannot classify" instead
    # of failing on predictions[0].
    unanimous = bool(predictions) and all(p == predictions[0] for p in predictions)
    if unanimous and all(c >= threshold for c in confidences):
        final_label = label_map[predictions[0]]
        return {
            "result": f"✅ Người nói: {final_label}",
            "details": results_detail
        }

    return {
        "result": "❌ Không thể phân loại",
        "details": results_detail
    }


class SpeechRecognitionGUI:
    def __init__(self, root):
        """
        Configure the main window, initialise state and build the widgets.

        Args:
            root: The Tk root window that hosts the application.
        """
        self.root = root
        root.title("Speech Recognition & Translation")
        root.geometry("1400x700")
        root.configure(bg='#2b2b2b')

        # Recognition / recording state
        self.recognizer = sr.Recognizer()
        self.audio_queue = queue.Queue()
        self.is_running = False
        self.full_transcript = []
        self.full_translation = []
        self.stream = None
        self.target_language = 'es'  # matches the combobox default (Spanish)

        # Speaker-identification models are optional: any loading problem just
        # disables the feature. NOTE: joblib.load unpickles the files, so they
        # must come from a trusted source.
        try:
            loaded_models = joblib.load("models.pkl")
            loaded_scaler = joblib.load("scaler.pkl")
        except Exception as e:
            print(f"Warning: Could not load voice prediction models: {e}")
            loaded_models = None
            loaded_scaler = None
            models_available = False
        else:
            models_available = True

        self.models_dict = loaded_models
        self.scaler = loaded_scaler
        self.voice_prediction_enabled = models_available

        # Widgets read voice_prediction_enabled, so build them last.
        self.setup_gui()
        
    def setup_gui(self):
        """
        Create and lay out all widgets of the main window.

        Builds, top to bottom: the title bar, the status row (status text and
        clock), the control row (start/stop/clear/save buttons plus the target
        language selector), three side-by-side text panes (original
        transcript, translation, voice prediction) and the footer. Widgets
        that other methods update are stored as attributes on ``self``.
        Reads ``self.voice_prediction_enabled`` to decide whether to show the
        "models not found" notice in the prediction pane.
        """
        # Title
        title_frame = tk.Frame(self.root, bg='#1e1e1e', height=60)
        title_frame.pack(fill=tk.X, padx=10, pady=(10, 0))
        # Keep the explicit height instead of shrinking to fit the label
        title_frame.pack_propagate(False)

        title_label = tk.Label(
            title_frame, 
            text="🎤 Speech Recognition & Translation",
            font=('Helvetica', 20, 'bold'),
            bg='#1e1e1e',
            fg='#ffffff'
        )
        title_label.pack(pady=15)

        # Status frame
        status_frame = tk.Frame(self.root, bg='#2b2b2b')
        status_frame.pack(fill=tk.X, padx=10, pady=10)

        # Status text on the left; updated through update_status()
        self.status_label = tk.Label(
            status_frame,
            text="● Ready to start",
            font=('Helvetica', 12),
            bg='#2b2b2b',
            fg='#4CAF50'
        )
        self.status_label.pack(side=tk.LEFT, padx=10)

        # Clock on the right; updated through update_time_display()
        self.time_label = tk.Label(
            status_frame,
            text="00:00:00",
            font=('Helvetica', 12, 'bold'),
            bg='#2b2b2b',
            fg='#ffffff'
        )
        self.time_label.pack(side=tk.RIGHT, padx=10)

        # Control buttons and language selector
        button_frame = tk.Frame(self.root, bg='#2b2b2b')
        button_frame.pack(fill=tk.X, padx=10, pady=10)

        self.start_button = tk.Button(
            button_frame,
            text="▶ Start Recording",
            command=self.start_recording,
            font=('Helvetica', 12, 'bold'),
            bg='#4CAF50',
            fg='white',
            activebackground='#45a049',
            relief=tk.FLAT,
            cursor='hand2',
            padx=20,
            pady=10
        )
        self.start_button.pack(side=tk.LEFT, padx=5)

        # Stop starts disabled; start_recording()/stop_recording() toggle it
        self.stop_button = tk.Button(
            button_frame,
            text="■ Stop Recording",
            command=self.stop_recording,
            font=('Helvetica', 12, 'bold'),
            bg='#f44336',
            fg='white',
            activebackground='#da190b',
            relief=tk.FLAT,
            cursor='hand2',
            padx=20,
            pady=10,
            state=tk.DISABLED
        )
        self.stop_button.pack(side=tk.LEFT, padx=5)

        self.clear_button = tk.Button(
            button_frame,
            text="🗑 Clear",
            command=self.clear_transcript,
            font=('Helvetica', 12),
            bg='#757575',
            fg='white',
            activebackground='#616161',
            relief=tk.FLAT,
            cursor='hand2',
            padx=20,
            pady=10
        )
        self.clear_button.pack(side=tk.LEFT, padx=5)

        self.save_button = tk.Button(
            button_frame,
            text="💾 Save",
            command=self.save_transcript,
            font=('Helvetica', 12),
            bg='#2196F3',
            fg='white',
            activebackground='#0b7dda',
            relief=tk.FLAT,
            cursor='hand2',
            padx=20,
            pady=10
        )
        self.save_button.pack(side=tk.LEFT, padx=5)

        # Language selector
        lang_frame = tk.Frame(button_frame, bg='#2b2b2b')
        lang_frame.pack(side=tk.RIGHT, padx=10)

        lang_label = tk.Label(
            lang_frame,
            text="Translate to:",
            font=('Helvetica', 11),
            bg='#2b2b2b',
            fg='#ffffff'
        )
        lang_label.pack(side=tk.LEFT, padx=(0, 5))

        # The combobox lists the LANGUAGES display names; selection changes
        # are handled by on_language_change()
        self.language_var = tk.StringVar(value='Spanish')
        self.language_combo = ttk.Combobox(
            lang_frame,
            textvariable=self.language_var,
            values=list(LANGUAGES.keys()),
            state='readonly',
            font=('Helvetica', 10),
            width=20
        )
        self.language_combo.pack(side=tk.LEFT)
        self.language_combo.bind('<<ComboboxSelected>>', self.on_language_change)

        # Main content frame with three columns
        content_frame = tk.Frame(self.root, bg='#2b2b2b')
        content_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # Left column - Original transcript
        left_frame = tk.Frame(content_frame, bg='#2b2b2b')
        left_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=(0, 5))

        transcript_label = tk.Label(
            left_frame,
            text="📝 Original Transcript (English):",
            font=('Helvetica', 11, 'bold'),
            bg='#2b2b2b',
            fg='#ffffff',
            anchor='w'
        )
        transcript_label.pack(fill=tk.X, pady=(0, 5))

        self.transcript_text = scrolledtext.ScrolledText(
            left_frame,
            font=('Consolas', 11),
            bg='#1e1e1e',
            fg='#ffffff',
            insertbackground='white',
            relief=tk.FLAT,
            padx=10,
            pady=10,
            wrap=tk.WORD
        )
        self.transcript_text.pack(fill=tk.BOTH, expand=True)

        # Middle column - Translation
        middle_frame = tk.Frame(content_frame, bg='#2b2b2b')
        middle_frame.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=5)

        # Kept on self because on_language_change() rewrites its text
        self.translation_label = tk.Label(
            middle_frame,
            text="🌐 Translation (Spanish):",
            font=('Helvetica', 11, 'bold'),
            bg='#2b2b2b',
            fg='#ffffff',
            anchor='w'
        )
        self.translation_label.pack(fill=tk.X, pady=(0, 5))

        self.translation_text = scrolledtext.ScrolledText(
            middle_frame,
            font=('Consolas', 11),
            bg='#1e1e1e',
            fg='#4CAF50',
            insertbackground='white',
            relief=tk.FLAT,
            padx=10,
            pady=10,
            wrap=tk.WORD
        )
        self.translation_text.pack(fill=tk.BOTH, expand=True)

        # Right column - Voice Prediction
        right_frame = tk.Frame(content_frame, bg='#2b2b2b')
        right_frame.pack(side=tk.RIGHT, fill=tk.BOTH, expand=True, padx=(5, 0))

        prediction_label = tk.Label(
            right_frame,
            text="🎯 Voice Prediction:",
            font=('Helvetica', 11, 'bold'),
            bg='#2b2b2b',
            fg='#ffffff',
            anchor='w'
        )
        prediction_label.pack(fill=tk.X, pady=(0, 5))

        self.prediction_text = scrolledtext.ScrolledText(
            right_frame,
            font=('Consolas', 10),
            bg='#1e1e1e',
            fg='#FFC107',
            insertbackground='white',
            relief=tk.FLAT,
            padx=10,
            pady=10,
            wrap=tk.WORD
        )
        self.prediction_text.pack(fill=tk.BOTH, expand=True)

        # Show model status
        # When the models failed to load, explain why the pane stays empty
        if not self.voice_prediction_enabled:
            self.prediction_text.insert(tk.END, "⚠️ Voice prediction models not found.\n")
            self.prediction_text.insert(tk.END, "Please ensure 'models.pkl' and 'scaler.pkl' are in the same directory.\n")
            self.prediction_text.config(fg='#FF5722')

        # Footer
        footer_frame = tk.Frame(self.root, bg='#1e1e1e', height=30)
        footer_frame.pack(fill=tk.X, side=tk.BOTTOM)
        footer_frame.pack_propagate(False)

        footer_label = tk.Label(
            footer_frame,
            text="Press Start to begin recording • Google Speech Recognition & Translation",
            font=('Helvetica', 9),
            bg='#1e1e1e',
            fg='#888888'
        )
        footer_label.pack(pady=5)
        
    def on_language_change(self, event):
        """
        React to a new selection in the language combobox.

        Stores the matching language code as the translation target and
        refreshes the translation pane heading and the status line.

        Args:
            event: The Tk event that triggered the callback (unused).
        """
        language_name = self.language_var.get()
        language_code = LANGUAGES[language_name]
        self.target_language = language_code

        heading = f"🌐 Translation ({language_name}):"
        self.translation_label.config(text=heading)

        notice = f"● Translation language set to {language_name}"
        self.update_status(notice, '#4CAF50')
    
    def predict_voice(self, wav_file, threshold=0.40):
        """Predict voice characteristics from audio file"""
        if not self.voice_prediction_enabled:
            return None
        
        try:
            result = classify_voice(wav_file, self.models_dict, self.scaler, threshold)
            return result
        except Exception as e:
            print(f"Voice prediction error: {e}")
            return None
    
    def format_prediction_result(self, result):
        """Format prediction result for display"""
        if not result:
            return ""
        
        timestamp = datetime.now().strftime("%H:%M:%S")
        
        if "error" in result:
            return f"[{timestamp}] {result['error']}\n\n"
        
        output = f"[{timestamp}] {result['result']}\n"
        
        for model_name, detail in result["details"].items():
            output += f"  {model_name.upper()}: {detail['label']} | {detail['confidence']}%\n"
        
        output += "\n"
        return output
    
    def translate_text(self, text):
        """Translate text to target language"""
        try:
            translator = GoogleTranslator(source='en', target=self.target_language)
            translated = translator.translate(text)
            return translated
        except Exception as e:
            return f"[Translation error: {e}]"
    
    def audio_callback(self, indata, frames, time, status):
        """Callback for sounddevice stream"""
        if status and self.is_running:
            self.update_status(f"⚠ Audio warning: {status}", '#FF9800')
        self.audio_queue.put(indata.copy())
    
    def process_audio(self):
        """
        Worker loop: collect queued audio and transcribe it in fixed chunks.

        Runs on a background thread while ``self.is_running`` is true. Audio
        blocks are accumulated until ``CHUNK_DURATION`` seconds' worth of
        frames are buffered, then handed to ``_transcribe_chunk``. The
        threshold is tracked in frames rather than in number of blocks, so it
        stays correct when blocks differ in size. Unexpected errors are shown
        in the status bar and the loop keeps running.
        """
        target_frames = SAMPLE_RATE * CHUNK_DURATION
        audio_buffer = []
        buffered_frames = 0

        while self.is_running:
            try:
                # Short timeout so the loop notices is_running turning false.
                chunk = self.audio_queue.get(timeout=0.1)
            except queue.Empty:
                continue

            try:
                audio_buffer.append(chunk)
                buffered_frames += len(chunk)
                if buffered_frames < target_frames:
                    continue

                audio_data = np.concatenate(audio_buffer)
                audio_buffer = []
                buffered_frames = 0

                self._transcribe_chunk(audio_data)
            except Exception as e:
                if self.is_running:
                    self.root.after(0, self.update_status, f"❌ Error: {e}", '#f44336')

    def _transcribe_chunk(self, audio_data):
        """
        Recognise, translate and speaker-classify one chunk of audio.

        The chunk is written to a temporary WAV file, which is always removed
        afterwards, even when writing or recognition fails. Results are
        appended to the transcript/translation history and scheduled for
        display on the Tk thread.

        Args:
            audio_data: Array of captured samples for the whole chunk.
        """
        # Reserve a unique path, then close the handle before writing to it.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as temp_file:
            temp_filename = temp_file.name

        try:
            write(temp_filename, SAMPLE_RATE, audio_data)

            with sr.AudioFile(temp_filename) as source:
                audio = self.recognizer.record(source)

            text = self.recognizer.recognize_google(audio, language="en-US")
            timestamp = datetime.now().strftime("%H:%M:%S")

            # Translate the text
            translated_text = self.translate_text(text)

            # Predict voice characteristics
            prediction_result = None
            if self.voice_prediction_enabled:
                prediction_result = self.predict_voice(temp_filename)

            transcript_line = f"[{timestamp}] {text}"
            translation_line = f"[{timestamp}] {translated_text}"

            self.full_transcript.append(transcript_line)
            self.full_translation.append(translation_line)

            # Widgets may only be touched from the Tk thread.
            self.root.after(0, self.add_transcript_line, transcript_line, translation_line, prediction_result)

        except sr.UnknownValueError:
            # Nothing intelligible in this chunk; skip it silently.
            pass
        except sr.RequestError as e:
            self.root.after(0, self.update_status, f"❌ API Error: {e}", '#f44336')
        finally:
            try:
                os.unlink(temp_filename)
            except OSError:
                # Best effort: a leftover temp file must not stop the worker.
                pass
    
    def add_transcript_line(self, transcript_line, translation_line, prediction_result=None):
        """
        Append one recognised line to each display pane and refresh the clock.

        Args:
            transcript_line: Text appended to the original-transcript pane.
            translation_line: Text appended to the translation pane.
            prediction_result: Optional classification result; shown in the
                prediction pane only when truthy and voice prediction is
                enabled.
        """
        panes = (
            (self.transcript_text, transcript_line),
            (self.translation_text, translation_line),
        )
        for pane, line in panes:
            pane.insert(tk.END, line + "\n")
            pane.see(tk.END)  # keep the newest line visible

        if prediction_result and self.voice_prediction_enabled:
            formatted = self.format_prediction_result(prediction_result)
            self.prediction_text.insert(tk.END, formatted)
            self.prediction_text.see(tk.END)

        self.update_time_display()
    
    def update_status(self, message, color='#4CAF50'):
        """Update status label"""
        self.status_label.config(text=message, fg=color)
    
    def update_time_display(self):
        """Update time display"""
        if self.full_transcript:
            current_time = datetime.now().strftime("%H:%M:%S")
            self.time_label.config(text=current_time)
    
    def start_recording(self):
        """Start speech recognition"""
        if self.is_running:
            return
        
        self.is_running = True
        self.start_button.config(state=tk.DISABLED)
        self.stop_button.config(state=tk.NORMAL)
        self.language_combo.config(state=tk.DISABLED)
        self.update_status("● Recording...", '#f44336')
        
        # Clear queue
        while not self.audio_queue.empty():
            self.audio_queue.get()
        
        # Start processing thread
        process_thread = threading.Thread(target=self.process_audio, daemon=True)
        process_thread.start()
        
        # Start audio stream
        try:
            self.stream = sd.InputStream(
                samplerate=SAMPLE_RATE,
                channels=CHANNELS,
                dtype=DTYPE,
                callback=self.audio_callback
            )
            self.stream.start()
        except Exception as e:
            self.update_status(f"❌ Failed to start: {e}", '#f44336')
            self.stop_recording()
    
    def stop_recording(self):
        """
        Stop capturing audio and return the controls to their idle state.

        Clearing ``is_running`` lets the processing thread leave its loop.
        The input stream, if any, is stopped and closed; a failure while
        tearing it down is reported on stdout but does not prevent the
        buttons, language selector and status line from being reset, so the
        GUI cannot get stuck in "recording" state.
        """
        self.is_running = False

        # Detach the stream before touching it so a failed teardown never
        # leaves a dead handle on self.
        stream, self.stream = self.stream, None
        if stream:
            try:
                try:
                    stream.stop()
                finally:
                    stream.close()
            except Exception as e:
                print(f"Warning: could not close audio stream cleanly: {e}")

        self.start_button.config(state=tk.NORMAL)
        self.stop_button.config(state=tk.DISABLED)
        self.language_combo.config(state='readonly')
        self.update_status("● Stopped", '#FF9800')
    
    def clear_transcript(self):
        """
        Erase all three panes and the stored history after confirmation.

        Nothing happens when no transcript has been recorded yet or when the
        user declines the confirmation dialog.
        """
        if not self.full_transcript:
            return

        confirmed = messagebox.askyesno(
            "Clear Transcript",
            "Are you sure you want to clear all data (transcript, translation, and predictions)?"
        )
        if not confirmed:
            return

        for pane in (self.transcript_text, self.translation_text, self.prediction_text):
            pane.delete(1.0, tk.END)

        self.full_transcript = []
        self.full_translation = []
        self.update_status("● All data cleared", '#4CAF50')
    
    def save_transcript(self):
        """
        Write the transcript and its translation to a user-chosen text file.

        Shows an information dialog when there is nothing to save, asks for a
        destination otherwise, and reports success or failure in a dialog.
        Cancelling the file dialog leaves everything untouched.
        """
        if not self.full_transcript:
            messagebox.showinfo("Save Transcript", "No transcript to save!")
            return

        suggested_name = f"transcript_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
        filename = asksaveasfilename(
            defaultextension=".txt",
            filetypes=[("Text files", "*.txt"), ("All files", "*.*")],
            initialfile=suggested_name
        )

        if not filename:
            return

        try:
            with open(filename, 'w', encoding='utf-8') as f:
                rule = "=" * 60
                # Two sections, each introduced by a ruled heading.
                sections = [
                    rule + "\n",
                    "ORIGINAL TRANSCRIPT (ENGLISH)\n",
                    rule + "\n\n",
                    '\n'.join(self.full_transcript),
                    "\n\n" + rule + "\n",
                    f"TRANSLATION ({self.language_var.get().upper()})\n",
                    rule + "\n\n",
                    '\n'.join(self.full_translation),
                ]
                f.writelines(sections)
            messagebox.showinfo("Save Transcript", f"Transcript saved to:\n{filename}")
            self.update_status("✓ Transcript saved", '#4CAF50')
        except Exception as e:
            messagebox.showerror("Save Error", f"Failed to save transcript:\n{e}")
    
    def on_closing(self):
        """Handle window close"""
        if self.is_running:
            self.stop_recording()
        self.root.destroy()


def main():
    """Create the Tk window, attach the application and run the event loop."""
    window = tk.Tk()
    gui = SpeechRecognitionGUI(window)
    # Route the window-manager close button through the app's shutdown logic.
    window.protocol("WM_DELETE_WINDOW", gui.on_closing)
    window.mainloop()


# Launch the GUI only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()
# Editor is loading...
# Leave a Comment