Untitled
unknown
python
a month ago
6.8 kB
2
Indexable
Never
import os import uuid import requests import srt_equalizer import assemblyai as aai import subprocess from typing import List from moviepy.editor import ( VideoFileClip, concatenate_videoclips, TextClip, CompositeVideoClip, AudioFileClip, concatenate_audioclips, ) from termcolor import colored from dotenv import load_dotenv from datetime import timedelta from moviepy.video.fx.all import crop from moviepy.video.tools.subtitles import SubtitlesClip load_dotenv("../.env") ASSEMBLY_AI_API_KEY = os.getenv("ASSEMBLY_AI_API_KEY") def save_video(video_url: str, directory: str = "../temp") -> str: video_id = uuid.uuid4() video_path = f"{directory}/{video_id}.mp4" try: response = requests.get(video_url, stream=True) response.raise_for_status() with open(video_path, "wb") as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) if not verify_video_file(video_path): print(colored(f"[-] Downloaded video is invalid: {video_path}", "red")) os.remove(video_path) return "" return video_path except requests.exceptions.RequestException as e: print(colored(f"[-] Error downloading video: {e}", "red")) return "" def save_video_with_retry(video_url: str, directory: str = "../temp", retries: int = 3) -> str: for attempt in range(retries): video_path = save_video(video_url, directory) if video_path: return video_path print(colored(f"[-] Retry {attempt + 1}/{retries} failed for video {video_url}", "yellow")) return "" def verify_video_file(video_path: str) -> bool: try: result = subprocess.run( ["ffmpeg", "-v", "error", "-i", video_path, "-f", "null", "-"], stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) if result.returncode == 0: return True else: print(colored(f"[-] ffmpeg error: {result.stderr.decode()}", "red")) return False except Exception as e: print(colored(f"[-] An error occurred while verifying video file: {e}", "red")) return False def __generate_subtitles_assemblyai(audio_path: str, voice: str) -> str: language_mapping = { "br": "pt", "id": "en", # AssemblyAI doesn't have Indonesian "jp": "ja", "kr": "ko", } if voice in language_mapping: lang_code = language_mapping[voice] else: lang_code = voice aai.settings.api_key = ASSEMBLY_AI_API_KEY config = aai.TranscriptionConfig(language_code=lang_code) transcriber = aai.Transcriber(config=config) transcript = transcriber.transcribe(audio_path) subtitles = transcript.export_subtitles_srt() return subtitles def __generate_subtitles_locally(sentences: List[str], audio_clips: List[AudioFileClip]) -> str: def convert_to_srt_time_format(total_seconds): if total_seconds == 0: return "0:00:00,0" return str(timedelta(seconds=total_seconds)).rstrip("0").replace(".", ",") start_time = 0 subtitles = [] for i, (sentence, audio_clip) in enumerate(zip(sentences, audio_clips), start=1): duration = audio_clip.duration end_time = start_time + duration subtitle_entry = ( f"{i}\n{convert_to_srt_time_format(start_time)} --> {convert_to_srt_time_format(end_time)}\n{sentence}\n" ) subtitles.append(subtitle_entry) start_time += duration return "\n".join(subtitles) def generate_subtitles( audio_path: str, sentences: List[str], audio_clips: List[AudioFileClip], voice: str ) -> str: def equalize_subtitles(srt_path: str, max_chars: int = 10) -> None: srt_equalizer.equalize_srt_file(srt_path, srt_path, max_chars) subtitles_path = f"../subtitles/{uuid.uuid4()}.srt" if ASSEMBLY_AI_API_KEY: print(colored("[+] Creating subtitles using AssemblyAI", "blue")) subtitles = __generate_subtitles_assemblyai(audio_path, voice) else: print(colored("[+] Creating subtitles locally", "blue")) subtitles = __generate_subtitles_locally(sentences, audio_clips) with open(subtitles_path, "w") as file: file.write(subtitles) equalize_subtitles(subtitles_path) print(colored("[+] Subtitles generated.", "green")) return subtitles_path def combine_videos( video_paths: List[str], max_duration: int, max_clip_duration: int, threads: int ) -> str: video_id = uuid.uuid4() combined_video_path = f"../temp/{video_id}.mp4" valid_clips = [] tot_dur = 0 print(colored("[+] Combining videos...", "blue")) req_dur = max_duration / len(video_paths) print(colored(f"[+] Each clip will be maximum {req_dur} seconds long.", "blue")) for video_path in video_paths: if not os.path.exists(video_path): continue try: clip = VideoFileClip(video_path).without_audio() if (max_duration - tot_dur) < clip.duration: clip = clip.subclip(0, (max_duration - tot_dur)) if max_clip_duration < clip.duration: clip = clip.subclip(0, max_clip_duration) clip = clip.set_fps(30) clip = clip.resize((1080, 1920)) valid_clips.append(clip) tot_dur += clip.duration except Exception as e: print(colored(f"[-] Skipping invalid video: {video_path} ({e})", "red")) continue if valid_clips: final_clip = concatenate_videoclips(valid_clips) final_clip.write_videofile(combined_video_path, threads=threads) return combined_video_path else: print(colored("[-] No valid video clips to combine.", "red")) return "" def generate_video( combined_video_path: str, tts_path: str, subtitles_path: str, threads: int, subtitles_position: str, text_color: str, ) -> str: generator = lambda txt: TextClip( txt, font="../fonts/bold_font.ttf", fontsize=100, color=text_color, stroke_color="black", stroke_width=5, ) horizontal_subtitles_position, vertical_subtitles_position = subtitles_position.split( "," ) subtitles = SubtitlesClip(subtitles_path, generator) result = CompositeVideoClip( [ VideoFileClip(combined_video_path), subtitles.set_pos((horizontal_subtitles_position, vertical_subtitles_position)), ] ) audio = AudioFileClip(tts_path) result = result.set_audio(audio) result.write_videofile("../temp/output.mp4", threads=threads or 2) return "output.mp4"
Leave a Comment