Untitled
unknown
python
a year ago
6.8 kB
15
Indexable
import os
import uuid
import requests
import srt_equalizer
import assemblyai as aai
import subprocess
from typing import List
from moviepy.editor import (
VideoFileClip,
concatenate_videoclips,
TextClip,
CompositeVideoClip,
AudioFileClip,
concatenate_audioclips,
)
from termcolor import colored
from dotenv import load_dotenv
from datetime import timedelta
from moviepy.video.fx.all import crop
from moviepy.video.tools.subtitles import SubtitlesClip
# Load settings from the .env file one directory above the current working
# directory. The path is relative, so it depends on where the process starts.
load_dotenv("../.env")
# AssemblyAI API key read from the environment; None when the variable is not
# set. generate_subtitles falls back to local subtitle generation when this
# value is falsy.
ASSEMBLY_AI_API_KEY = os.getenv("ASSEMBLY_AI_API_KEY")
def save_video(video_url: str, directory: str = "../temp", timeout: float = 30.0) -> str:
    """Download a video to ``directory`` and verify it with ffmpeg.

    Args:
        video_url: URL of the video to download.
        directory: Folder the file is written to; created if it is missing.
        timeout: Value handed to ``requests.get`` as its ``timeout`` so a
            stalled server cannot block the caller indefinitely.

    Returns:
        The path of the saved ``.mp4`` file, or ``""`` when the download
        failed or the downloaded file did not pass ``verify_video_file``.
    """
    video_id = uuid.uuid4()
    video_path = f"{directory}/{video_id}.mp4"

    os.makedirs(directory, exist_ok=True)

    try:
        # The context manager releases the streamed connection even when the
        # body is not fully consumed or an error is raised mid-download.
        with requests.get(video_url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(video_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
    except requests.exceptions.RequestException as e:
        print(colored(f"[-] Error downloading video: {e}", "red"))
        # A failure while streaming leaves a truncated file behind; remove it
        # so it cannot be picked up later as if it were a valid video.
        if os.path.exists(video_path):
            os.remove(video_path)
        return ""

    # Verify only after the file handle is closed so ffmpeg sees all the data.
    if not verify_video_file(video_path):
        print(colored(f"[-] Downloaded video is invalid: {video_path}", "red"))
        os.remove(video_path)
        return ""

    return video_path
def save_video_with_retry(video_url: str, directory: str = "../temp", retries: int = 3) -> str:
    """Call ``save_video`` up to ``retries`` times until one attempt succeeds.

    Args:
        video_url: URL of the video to download.
        directory: Folder passed through to ``save_video``.
        retries: Maximum number of download attempts.

    Returns:
        The path returned by the first successful ``save_video`` call, or
        ``""`` when every attempt failed (or ``retries`` is not positive).
    """
    for attempt in range(retries):
        downloaded_path = save_video(video_url, directory)
        if not downloaded_path:
            # An empty path signals failure; report it and try again.
            print(colored(f"[-] Retry {attempt + 1}/{retries} failed for video {video_url}", "yellow"))
            continue
        return downloaded_path

    return ""
def verify_video_file(video_path: str) -> bool:
    """Check that ffmpeg can process ``video_path`` without reporting errors.

    The file is run through ffmpeg with the ``null`` output format, so nothing
    is written; only the exit code and stderr are inspected.

    Args:
        video_path: Path of the video file to check.

    Returns:
        True when ffmpeg exits with code 0, False when it exits with any other
        code or when running it raises an exception.
    """
    ffmpeg_command = ["ffmpeg", "-v", "error", "-i", video_path, "-f", "null", "-"]

    try:
        result = subprocess.run(
            ffmpeg_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if result.returncode != 0:
            print(colored(f"[-] ffmpeg error: {result.stderr.decode()}", "red"))
            return False
        return True
    except Exception as e:
        # Any failure to run ffmpeg is treated the same as an invalid file.
        print(colored(f"[-] An error occurred while verifying video file: {e}", "red"))
        return False
def __generate_subtitles_assemblyai(audio_path: str, voice: str) -> str:
    """Transcribe an audio file with AssemblyAI and return SRT subtitles.

    Args:
        audio_path: Path of the audio file handed to the transcriber.
        voice: Voice code; mapped to a language code where the two differ,
            otherwise used as the language code unchanged.

    Returns:
        The subtitles exported by the transcript as SRT text.
    """
    # Voice codes whose transcription language code is different.
    voice_to_language = {
        "br": "pt",
        "id": "en",  # AssemblyAI doesn't have Indonesian
        "jp": "ja",
        "kr": "ko",
    }
    lang_code = voice_to_language.get(voice, voice)

    aai.settings.api_key = ASSEMBLY_AI_API_KEY
    transcriber = aai.Transcriber(
        config=aai.TranscriptionConfig(language_code=lang_code)
    )
    return transcriber.transcribe(audio_path).export_subtitles_srt()
def __generate_subtitles_locally(sentences: List[str], audio_clips: List[AudioFileClip]) -> str:
    """Build SRT subtitles by lining each sentence up with its audio clip.

    Sentence ``i`` is shown for the duration of audio clip ``i``; entries are
    laid end to end starting at 0 seconds. Only the ``duration`` attribute of
    each clip is read. If the two lists differ in length, the extra items of
    the longer one are ignored (``zip`` semantics).

    Args:
        sentences: Subtitle text, one entry per audio clip.
        audio_clips: Clips whose ``duration`` (seconds) sets each entry's length.

    Returns:
        The subtitles as SRT text; ``""`` when there are no entries.
    """

    def convert_to_srt_time_format(total_seconds: float) -> str:
        """Format seconds as an SRT timestamp ``HH:MM:SS,mmm``."""
        # Work in integer milliseconds so whole-second values keep their
        # trailing zeros (stripping zeros from str(timedelta) turned 10 s
        # into "0:00:1") and every timestamp has the millisecond part.
        total_milliseconds = max(0, int(round(total_seconds * 1000)))
        hours, remainder = divmod(total_milliseconds, 3_600_000)
        minutes, remainder = divmod(remainder, 60_000)
        seconds, milliseconds = divmod(remainder, 1000)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"

    start_time = 0
    subtitles = []

    for i, (sentence, audio_clip) in enumerate(zip(sentences, audio_clips), start=1):
        duration = audio_clip.duration
        end_time = start_time + duration

        subtitle_entry = (
            f"{i}\n{convert_to_srt_time_format(start_time)} --> {convert_to_srt_time_format(end_time)}\n{sentence}\n"
        )
        subtitles.append(subtitle_entry)

        # The next entry starts exactly where this one ended.
        start_time = end_time

    return "\n".join(subtitles)
def generate_subtitles(
    audio_path: str, sentences: List[str], audio_clips: List[AudioFileClip], voice: str
) -> str:
    """Create an SRT file for the narration and return its path.

    AssemblyAI is used when ``ASSEMBLY_AI_API_KEY`` is set; otherwise the
    subtitles are derived locally from the sentences and clip durations. The
    written file is then equalized in place with ``srt_equalizer``.

    Args:
        audio_path: Audio file to transcribe (AssemblyAI path only).
        sentences: Subtitle sentences (local path only).
        audio_clips: Audio clips matching ``sentences`` (local path only).
        voice: Voice code used to choose the transcription language.

    Returns:
        Path of the generated ``.srt`` file under ``../subtitles``.
    """
    max_chars = 10  # limit handed to srt_equalizer for each subtitle chunk

    subtitles_path = f"../subtitles/{uuid.uuid4()}.srt"

    if not ASSEMBLY_AI_API_KEY:
        print(colored("[+] Creating subtitles locally", "blue"))
        subtitles = __generate_subtitles_locally(sentences, audio_clips)
    else:
        print(colored("[+] Creating subtitles using AssemblyAI", "blue"))
        subtitles = __generate_subtitles_assemblyai(audio_path, voice)

    with open(subtitles_path, "w") as file:
        file.write(subtitles)

    # Rewrite the file in place (same path for input and output).
    srt_equalizer.equalize_srt_file(subtitles_path, subtitles_path, max_chars)

    print(colored("[+] Subtitles generated.", "green"))
    return subtitles_path
def combine_videos(
    video_paths: List[str], max_duration: int, max_clip_duration: int, threads: int
) -> str:
    """Concatenate several videos into one silent 1080x1920, 30 fps video.

    Clips are taken in order. Each is stripped of audio, trimmed to at most
    ``max_clip_duration`` seconds and to whatever is left of ``max_duration``,
    then resized. Missing files and clips that fail to load are skipped.

    Args:
        video_paths: Paths of the source videos.
        max_duration: Upper bound, in seconds, for the combined video.
        max_clip_duration: Upper bound, in seconds, for any single clip.
        threads: Thread count passed to ``write_videofile``.

    Returns:
        Path of the combined video under ``../temp``, or ``""`` when there
        was no usable clip.
    """
    video_id = uuid.uuid4()
    combined_video_path = f"../temp/{video_id}.mp4"

    valid_clips = []
    tot_dur = 0

    print(colored("[+] Combining videos...", "blue"))

    # Nothing to combine; also avoids dividing by zero below.
    if not video_paths:
        print(colored("[-] No valid video clips to combine.", "red"))
        return ""

    req_dur = max_duration / len(video_paths)
    print(colored(f"[+] Each clip will be maximum {req_dur} seconds long.", "blue"))

    for video_path in video_paths:
        remaining = max_duration - tot_dur
        if remaining <= 0:
            # Target length reached: stop instead of opening more files and
            # appending zero-length clips.
            break

        if not os.path.exists(video_path):
            continue

        clip = None
        try:
            clip = VideoFileClip(video_path).without_audio()
            if remaining < clip.duration:
                clip = clip.subclip(0, remaining)
            if max_clip_duration < clip.duration:
                clip = clip.subclip(0, max_clip_duration)
            clip = clip.set_fps(30)
            clip = clip.resize((1080, 1920))
            valid_clips.append(clip)
            tot_dur += clip.duration
        except Exception as e:
            print(colored(f"[-] Skipping invalid video: {video_path} ({e})", "red"))
            # Release the reader of a clip that opened but could not be used.
            if clip is not None:
                clip.close()
            continue

    if not valid_clips:
        print(colored("[-] No valid video clips to combine.", "red"))
        return ""

    final_clip = concatenate_videoclips(valid_clips)
    try:
        final_clip.write_videofile(combined_video_path, threads=threads)
    finally:
        # Close the clips whether or not rendering succeeded so their file
        # readers are not left open.
        final_clip.close()
        for clip in valid_clips:
            clip.close()

    return combined_video_path
def generate_video(
    combined_video_path: str,
    tts_path: str,
    subtitles_path: str,
    threads: int,
    subtitles_position: str,
    text_color: str,
) -> str:
    """Render the final video: background clip, burned-in subtitles, TTS audio.

    Args:
        combined_video_path: Path of the background video.
        tts_path: Path of the narration audio set as the video's audio track.
        subtitles_path: Path of the ``.srt`` file to overlay.
        threads: Thread count for ``write_videofile``; 2 is used when falsy.
        subtitles_position: ``"horizontal,vertical"`` position of the
            subtitles; must contain exactly one comma.
        text_color: Fill colour of the subtitle text.

    Returns:
        The file name ``"output.mp4"``; the file itself is written to
        ``../temp/output.mp4``.

    Raises:
        ValueError: If ``subtitles_position`` does not split into two parts.
    """

    def generator(txt):
        """Create the styled text clip for one subtitle line."""
        return TextClip(
            txt,
            font="../fonts/bold_font.ttf",
            fontsize=100,
            color=text_color,
            stroke_color="black",
            stroke_width=5,
        )

    horizontal_subtitles_position, vertical_subtitles_position = subtitles_position.split(
        ","
    )

    subtitles = SubtitlesClip(subtitles_path, generator)
    background = VideoFileClip(combined_video_path)
    audio = None
    try:
        result = CompositeVideoClip(
            [
                background,
                subtitles.set_pos((horizontal_subtitles_position, vertical_subtitles_position)),
            ]
        )

        audio = AudioFileClip(tts_path)
        result = result.set_audio(audio)

        result.write_videofile("../temp/output.mp4", threads=threads or 2)
    finally:
        # Close the file-backed clips whether or not rendering succeeded so
        # their readers are not left open.
        if audio is not None:
            audio.close()
        background.close()

    return "output.mp4"
Editor is loading...
Leave a Comment