Untitled

mail@pastecode.io avatar
unknown
python
a month ago
6.8 kB
2
Indexable
Never
import os
import uuid
import requests
import srt_equalizer
import assemblyai as aai
import subprocess

from typing import List
from moviepy.editor import (
    VideoFileClip,
    concatenate_videoclips,
    TextClip,
    CompositeVideoClip,
    AudioFileClip,
    concatenate_audioclips,
)
from termcolor import colored
from dotenv import load_dotenv
from datetime import timedelta
from moviepy.video.fx.all import crop
from moviepy.video.tools.subtitles import SubtitlesClip

load_dotenv("../.env")

ASSEMBLY_AI_API_KEY = os.getenv("ASSEMBLY_AI_API_KEY")


def save_video(video_url: str, directory: str = "../temp") -> str:
    video_id = uuid.uuid4()
    video_path = f"{directory}/{video_id}.mp4"

    try:
        response = requests.get(video_url, stream=True)
        response.raise_for_status()

        with open(video_path, "wb") as f:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)

        if not verify_video_file(video_path):
            print(colored(f"[-] Downloaded video is invalid: {video_path}", "red"))
            os.remove(video_path)
            return ""

        return video_path

    except requests.exceptions.RequestException as e:
        print(colored(f"[-] Error downloading video: {e}", "red"))
        return ""


def save_video_with_retry(video_url: str, directory: str = "../temp", retries: int = 3) -> str:
    for attempt in range(retries):
        video_path = save_video(video_url, directory)
        if video_path:
            return video_path
        print(colored(f"[-] Retry {attempt + 1}/{retries} failed for video {video_url}", "yellow"))
    return ""


def verify_video_file(video_path: str) -> bool:
    try:
        result = subprocess.run(
            ["ffmpeg", "-v", "error", "-i", video_path, "-f", "null", "-"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        if result.returncode == 0:
            return True
        else:
            print(colored(f"[-] ffmpeg error: {result.stderr.decode()}", "red"))
            return False
    except Exception as e:
        print(colored(f"[-] An error occurred while verifying video file: {e}", "red"))
        return False


def __generate_subtitles_assemblyai(audio_path: str, voice: str) -> str:
    language_mapping = {
        "br": "pt",
        "id": "en",  # AssemblyAI doesn't have Indonesian
        "jp": "ja",
        "kr": "ko",
    }

    if voice in language_mapping:
        lang_code = language_mapping[voice]
    else:
        lang_code = voice

    aai.settings.api_key = ASSEMBLY_AI_API_KEY
    config = aai.TranscriptionConfig(language_code=lang_code)
    transcriber = aai.Transcriber(config=config)
    transcript = transcriber.transcribe(audio_path)
    subtitles = transcript.export_subtitles_srt()

    return subtitles


def __generate_subtitles_locally(sentences: List[str], audio_clips: List[AudioFileClip]) -> str:
    def convert_to_srt_time_format(total_seconds):
        if total_seconds == 0:
            return "0:00:00,0"
        return str(timedelta(seconds=total_seconds)).rstrip("0").replace(".", ",")

    start_time = 0
    subtitles = []

    for i, (sentence, audio_clip) in enumerate(zip(sentences, audio_clips), start=1):
        duration = audio_clip.duration
        end_time = start_time + duration

        subtitle_entry = (
            f"{i}\n{convert_to_srt_time_format(start_time)} --> {convert_to_srt_time_format(end_time)}\n{sentence}\n"
        )
        subtitles.append(subtitle_entry)

        start_time += duration

    return "\n".join(subtitles)


def generate_subtitles(
    audio_path: str, sentences: List[str], audio_clips: List[AudioFileClip], voice: str
) -> str:
    def equalize_subtitles(srt_path: str, max_chars: int = 10) -> None:
        srt_equalizer.equalize_srt_file(srt_path, srt_path, max_chars)

    subtitles_path = f"../subtitles/{uuid.uuid4()}.srt"

    if ASSEMBLY_AI_API_KEY:
        print(colored("[+] Creating subtitles using AssemblyAI", "blue"))
        subtitles = __generate_subtitles_assemblyai(audio_path, voice)
    else:
        print(colored("[+] Creating subtitles locally", "blue"))
        subtitles = __generate_subtitles_locally(sentences, audio_clips)

    with open(subtitles_path, "w") as file:
        file.write(subtitles)

    equalize_subtitles(subtitles_path)

    print(colored("[+] Subtitles generated.", "green"))

    return subtitles_path


def combine_videos(
    video_paths: List[str], max_duration: int, max_clip_duration: int, threads: int
) -> str:
    video_id = uuid.uuid4()
    combined_video_path = f"../temp/{video_id}.mp4"

    valid_clips = []
    tot_dur = 0

    print(colored("[+] Combining videos...", "blue"))

    req_dur = max_duration / len(video_paths)
    print(colored(f"[+] Each clip will be maximum {req_dur} seconds long.", "blue"))

    for video_path in video_paths:
        if not os.path.exists(video_path):
            continue

        try:
            clip = VideoFileClip(video_path).without_audio()

            if (max_duration - tot_dur) < clip.duration:
                clip = clip.subclip(0, (max_duration - tot_dur))

            if max_clip_duration < clip.duration:
                clip = clip.subclip(0, max_clip_duration)

            clip = clip.set_fps(30)
            clip = clip.resize((1080, 1920))

            valid_clips.append(clip)
            tot_dur += clip.duration

        except Exception as e:
            print(colored(f"[-] Skipping invalid video: {video_path} ({e})", "red"))
            continue

    if valid_clips:
        final_clip = concatenate_videoclips(valid_clips)
        final_clip.write_videofile(combined_video_path, threads=threads)
        return combined_video_path
    else:
        print(colored("[-] No valid video clips to combine.", "red"))
        return ""


def generate_video(
    combined_video_path: str,
    tts_path: str,
    subtitles_path: str,
    threads: int,
    subtitles_position: str,
    text_color: str,
) -> str:
    generator = lambda txt: TextClip(
        txt,
        font="../fonts/bold_font.ttf",
        fontsize=100,
        color=text_color,
        stroke_color="black",
        stroke_width=5,
    )

    horizontal_subtitles_position, vertical_subtitles_position = subtitles_position.split(
        ","
    )

    subtitles = SubtitlesClip(subtitles_path, generator)
    result = CompositeVideoClip(
        [
            VideoFileClip(combined_video_path),
            subtitles.set_pos((horizontal_subtitles_position, vertical_subtitles_position)),
        ]
    )

    audio = AudioFileClip(tts_path)
    result = result.set_audio(audio)

    result.write_videofile("../temp/output.mp4", threads=threads or 2)

    return "output.mp4"
Leave a Comment