Untitled

 avatar
unknown
plain_text
2 years ago
18 kB
5
Indexable
from __future__ import annotations

import argparse
import json
import os
import time
from configparser import ConfigParser
from pathlib import Path

import pyrogram
from pyrogram.errors import ChannelInvalid, FloodWait, PeerIdInvalid

from setup import version

# Seconds slept by wait_a_moment() after a message is forwarded. This value is
# only a fallback: the script section at the bottom of the file replaces it
# with the configured user/bot delay once MODE is known.
DELAY_AMOUNT = 10


def get_config_data(path_file_config):
    """Load the "default" section of an ini configuration file.

    Args:
        path_file_config (str): path of the ini file to read

    Returns:
        dict: option names of the "default" section mapped to their values
    """

    ini = ConfigParser()
    ini.read(path_file_config)
    section = ini["default"]
    return {option: value for option, value in section.items()}


def foward_photo(message, destination_chat):
    """Send the photo of a message to the destination chat, retrying on error.

    The photo is sent by its ``file_id`` together with the caption returned
    by ``get_caption``. The call is repeated until it succeeds.

    Args:
        message: source message; ``message.photo.file_id`` is read
        destination_chat: chat_id the photo is sent to
    """

    caption = get_caption(message)
    photo_id = message.photo.file_id
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_photo(
                chat_id=destination_chat,
                photo=photo_id,
                caption=caption,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_text(message, destination_chat):
    """Send the text of a message to the destination chat, retrying on error.

    The markdown form of the text is sent silently and without web page
    preview. The call is repeated until it succeeds.

    Args:
        message: source message; ``message.text.markdown`` is read
        destination_chat: chat_id the text is sent to
    """

    text = message.text.markdown
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_message(
                chat_id=destination_chat,
                text=text,
                disable_notification=True,
                disable_web_page_preview=True,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_sticker(message, destination_chat):
    """Send the sticker of a message to the destination chat, retrying on error.

    The sticker is sent by its ``file_id``. The call is repeated until it
    succeeds.

    Args:
        message: source message; ``message.sticker.file_id`` is read
        destination_chat: chat_id the sticker is sent to
    """

    sticker_id = message.sticker.file_id
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_sticker(chat_id=destination_chat, sticker=sticker_id)
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_document(message, destination_chat):
    """Send the document of a message to the destination chat, retrying on error.

    The document is sent silently by its ``file_id`` together with the
    caption returned by ``get_caption``. The call is repeated until it
    succeeds.

    Args:
        message: source message; ``message.document.file_id`` is read
        destination_chat: chat_id the document is sent to
    """

    caption = get_caption(message)
    document_id = message.document.file_id
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_document(
                chat_id=destination_chat,
                document=document_id,
                disable_notification=True,
                caption=caption,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_animation(message, destination_chat):
    """Send the animation of a message to the destination chat, retrying on error.

    The animation is sent silently by its ``file_id`` together with the
    caption returned by ``get_caption``. The call is repeated until it
    succeeds.

    Args:
        message: source message; ``message.animation.file_id`` is read
        destination_chat: chat_id the animation is sent to
    """

    caption = get_caption(message)
    animation_id = message.animation.file_id
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_animation(
                chat_id=destination_chat,
                animation=animation_id,
                disable_notification=True,
                caption=caption,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_audio(message, destination_chat):
    """Send the audio of a message to the destination chat, retrying on error.

    The audio is sent silently by its ``file_id`` together with the caption
    returned by ``get_caption``. The call is repeated until it succeeds.

    Args:
        message: source message; ``message.audio.file_id`` is read
        destination_chat: chat_id the audio is sent to
    """

    caption = get_caption(message)
    audio_id = message.audio.file_id
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_audio(
                chat_id=destination_chat,
                audio=audio_id,
                disable_notification=True,
                caption=caption,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_voice(message, destination_chat):
    """Send the voice note of a message to the destination chat, retrying on error.

    The voice note is sent silently by its ``file_id`` together with the
    caption returned by ``get_caption``. The call is repeated until it
    succeeds.

    Args:
        message: source message; ``message.voice.file_id`` is read
        destination_chat: chat_id the voice note is sent to
    """

    caption = get_caption(message)
    voice_id = message.voice.file_id
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_voice(
                chat_id=destination_chat,
                voice=voice_id,
                disable_notification=True,
                caption=caption,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_video_note(message, destination_chat):
    """Send the video note of a message to the destination chat, retrying on error.

    The video note is sent silently by its ``file_id``. The call is repeated
    until it succeeds.

    Args:
        message: source message; ``message.video_note.file_id`` is read
        destination_chat: chat_id the video note is sent to
    """

    video_note_id = message.video_note.file_id
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_video_note(
                chat_id=destination_chat,
                video_note=video_note_id,
                disable_notification=True,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_video(message, destination_chat):
    """Send the video of a message to the destination chat, retrying on error.

    The video is sent silently by its ``file_id`` together with the caption
    returned by ``get_caption``. The call is repeated until it succeeds.

    Args:
        message: source message; ``message.video.file_id`` is read
        destination_chat: chat_id the video is sent to
    """

    caption = get_caption(message)
    video_id = message.video.file_id
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_video(
                chat_id=destination_chat,
                video=video_id,
                disable_notification=True,
                caption=caption,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def foward_poll(message, destination_chat):
    """Re-create a regular poll in the destination chat, retrying on error.

    Polls whose type is not "regular" are skipped without sending anything.
    For regular polls a new poll is sent silently with the same question,
    option texts, anonymity and multiple-answer settings; the call is
    repeated until it succeeds.

    Args:
        message: source message; ``message.poll`` is read
        destination_chat: chat_id the poll is sent to
    """

    poll = message.poll
    # The poll type may be a plain string or an enum member whose value is
    # that string; an enum member never compares equal to "regular" itself.
    # NOTE(review): Pyrogram 2.x appears to expose an enum here - confirm.
    poll_type = getattr(poll.type, "value", poll.type)
    if poll_type != "regular":
        return
    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            tg.send_poll(
                chat_id=destination_chat,
                question=poll.question,
                options=[option.text for option in poll.options],
                is_anonymous=poll.is_anonymous,
                allows_multiple_answers=poll.allows_multiple_answers,
                disable_notification=True,
            )
            return
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def get_caption(message):
    """Return the markdown caption of a message, or None when it has none.

    Args:
        message: message whose ``caption`` attribute is inspected

    Returns:
        ``message.caption.markdown`` when the caption is truthy, else None
    """

    return message.caption.markdown if message.caption else None


def get_sender(message):
    """Pick the forwarding function that matches the content of a message.

    The message attributes are tested in a fixed order and the function
    paired with the first truthy one is returned.

    Args:
        message: message to classify

    Returns:
        function: one of the ``foward_*`` functions

    Raises:
        Exception: when none of the known attributes is set; the message is
            printed before raising
    """

    # Order matters: the first populated attribute decides the sender.
    senders_by_attribute = (
        ("photo", foward_photo),
        ("text", foward_text),
        ("document", foward_document),
        ("sticker", foward_sticker),
        ("animation", foward_animation),
        ("audio", foward_audio),
        ("voice", foward_voice),
        ("video", foward_video),
        ("video_note", foward_video_note),
        ("poll", foward_poll),
    )
    for attribute, sender in senders_by_attribute:
        if getattr(message, attribute):
            return sender

    print("\nNot recognized message type:\n")
    print(message)
    raise Exception


def get_input_type_to_copy():
    """Show the menu of message types and read the user's selection.

    Returns:
        str: the raw text typed by the user
    """

    menu_lines = (
        "0 - All files",
        "1 - Photos",
        "2 - Text",
        "3 - Documents (pdf, zip, rar, ...)",
        "4 - Stickers",
        "5 - Animation",
        "6 - Audio files (music)",
        "7 - Voice message",
        "8 - Videos",
        "9 - Polls\n",
        "Enter the number(s) of the file type to clone, separating by comma.",
        "For example, to copy photos and documents type: 1,3",
    )
    # One print of the joined lines emits the same text as one print per line.
    print("\n".join(menu_lines))
    return input("Your answer: ")


def get_files_type_excluded_by_input(input_string):
    """Translate a type selection such as "1,3" into the senders to skip.

    Every option digit (1-9) that does not occur in ``input_string`` adds the
    matching ``foward_*`` function to the result. An empty selection, or one
    containing "0", excludes nothing. When no valid digit is present the
    user is asked again through ``get_input_type_to_copy`` instead of
    re-evaluating the same invalid text forever.

    Args:
        input_string (str): selection typed by the user or given by --type

    Returns:
        list: ``foward_*`` functions whose message types must not be cloned
    """

    while True:
        if input_string == "" or "0" in input_string:
            return []

        # NOTE(review): foward_video_note has no option number, so video
        # notes can never be excluded - confirm this is intended.
        senders_by_option = (
            ("1", foward_photo),
            ("2", foward_text),
            ("3", foward_document),
            ("4", foward_sticker),
            ("5", foward_animation),
            ("6", foward_audio),
            ("7", foward_voice),
            ("8", foward_video),
            ("9", foward_poll),
        )
        files_type_excluded = [
            sender
            for option, sender in senders_by_option
            if option not in input_string
        ]
        if len(files_type_excluded) < len(senders_by_option):
            return files_type_excluded

        # Nothing was selected: the text held no valid option at all.
        print("Invalid option! Try again")
        input_string = get_input_type_to_copy()


def get_message(origin_chat, message_id):
    """Fetch one message from the origin chat, retrying until it succeeds.

    Args:
        origin_chat: chat_id of the chat to read from
        message_id: id of the message to fetch

    Returns:
        the object returned by ``tg.get_messages`` for that id
    """

    # Retry with a loop rather than by calling itself: a long outage would
    # otherwise pile up stack frames until RecursionError.
    while True:
        try:
            return tg.get_messages(origin_chat, message_id)
        except FloodWait as e:
            # Wait for the amount of time carried by the exception.
            print(f"..FloodWait {e.value} seconds..")
            time.sleep(e.value)
        except Exception as e:
            print(f"trying again... Due to: {e}")
            time.sleep(10)


def task_type():
    """Ask whether to start a new cloning task or resume a previous one.

    The question is repeated until the user answers "1" or "2".

    Returns:
        int: 1 for a new task, 2 to resume
    """

    answers = {"1": 1, "2": 2}
    while True:
        print("New cloning or continuation?\n1 = new\n2 = resume")
        reply = input("Your answer: ")
        if reply in answers:
            return answers[reply]
        print("\nInvalid answer.\n")


def get_list_posted(int_task_type):
    """Return the ids already handled for the current task.

    For a new task (1) any existing cache file is deleted and an empty list
    is returned. Otherwise the list stored in the cache file is returned, or
    an empty list when the file does not exist.

    Args:
        int_task_type (int): 1 = new task, anything else = resume

    Returns:
        list: content of the cache file, or an empty list
    """

    cache_exists = os.path.exists(CACHE_FILE)

    if int_task_type == 1:  # new task: discard any previous progress
        if cache_exists:
            os.remove(CACHE_FILE)
        return []

    if not cache_exists:  # resume requested, but nothing was saved
        return []

    with open(CACHE_FILE, mode="r") as cache:
        return json.load(cache)


def wait_a_moment(message_id, skip=False):
    """Pause between messages; nothing is done for message id 1.

    Args:
        message_id (int): id of the message just handled
        skip (bool): True sleeps DELAY_SKIP seconds, False sleeps
            DELAY_AMOUNT seconds
    """

    if message_id == 1:
        return
    time.sleep(DELAY_SKIP if skip else DELAY_AMOUNT)


def update_cache(CACHE_FILE, list_posted):
    """Persist the list of handled message ids as JSON.

    The data is written to a temporary sibling file that then replaces the
    cache file, so an interruption in the middle of a write cannot leave a
    truncated cache behind (which would break resuming the task).

    Args:
        CACHE_FILE (str): path of the cache file to (over)write
        list_posted (list): JSON-serializable list of handled message ids
    """

    temp_file = f"{CACHE_FILE}.tmp"
    with open(temp_file, mode="w") as file:
        file.write(json.dumps(list_posted))
    # os.replace overwrites the destination in a single rename step.
    os.replace(temp_file, CACHE_FILE)


def get_last_message_id(origin_chat):
    """Return the id of the first message yielded by the chat history.

    The history is always read through the user account client.
    NOTE(review): presumably the history is yielded newest first, making
    this the highest message id - confirm against the Pyrogram docs.

    Args:
        origin_chat: chat_id of the chat to inspect

    Returns:
        the ``id`` of the first message of ``get_chat_history``
    """

    newest_message = next(useraccount.get_chat_history(origin_chat))
    return newest_message.id


def get_files_type_excluded():
    """Return the excluded senders, asking the user when none are set yet.

    When the module-level FILES_TYPE_EXCLUDED already exists it is returned
    unchanged. Otherwise the type menu is shown, the answer is converted to
    a list of excluded senders, stored in FILES_TYPE_EXCLUDED and returned.

    Returns:
        list: ``foward_*`` functions whose message types must not be cloned
    """

    global FILES_TYPE_EXCLUDED
    # Explicit existence test instead of a bare ``except:`` around a
    # self-assignment, which would also hide unrelated errors.
    if "FILES_TYPE_EXCLUDED" not in globals():
        FILES_TYPE_EXCLUDED = get_files_type_excluded_by_input(
            get_input_type_to_copy()
        )
    return FILES_TYPE_EXCLUDED


def is_empty_message(message, message_id, last_message_id) -> bool:
    """Tell whether a message has nothing to clone, logging and pausing if so.

    A message counts as empty when its ``empty``, ``service``, ``dice`` or
    ``location`` attribute is truthy.

    Args:
        message: message to inspect
        message_id (int): id shown in the progress line
        last_message_id (int): last id of the chat, shown in the progress line

    Returns:
        bool: True when the message must be skipped
    """

    nothing_to_clone = (
        message.empty or message.service or message.dice or message.location
    )
    if not nothing_to_clone:
        return False

    print(f"{message_id}/{last_message_id} (blank id)")
    wait_a_moment(message_id, skip=True)
    return True


def must_be_ignored(func_sender, message_id, last_message_id) -> bool:
    """Tell whether a message type was excluded, logging and pausing if so.

    Args:
        func_sender: forwarding function chosen for the message
        message_id (int): id shown in the progress line
        last_message_id (int): last id of the chat, shown in the progress line

    Returns:
        bool: True when ``func_sender`` is listed in FILES_TYPE_EXCLUDED
    """

    if func_sender not in FILES_TYPE_EXCLUDED:
        return False

    print(f"{message_id}/{last_message_id} (skip by type)")
    wait_a_moment(message_id, skip=True)
    return True


def get_first_message_id(list_posted) -> int:
    """Return the id the cloning loop starts from.

    Args:
        list_posted (list): ids already handled

    Returns:
        int: the last element of ``list_posted``, or 1 when it is empty
    """

    return list_posted[-1] if len(list_posted) > 0 else 1


def ensure_folder_existence(folder_path):
    """Create the folder when it does not exist yet.

    Missing parent folders are created too, and an already existing folder
    is left untouched without a separate existence check.

    Args:
        folder_path (str): folder path
    """

    os.makedirs(folder_path, exist_ok=True)


def get_task_file(ORIGIN_CHAT_TITLE, destination_chat):
    """Build the path of the cache file of a cloning task.

    The folder ``user/tasks`` is created when missing. The file is named
    ``<origin title>-<destination chat>.json``; characters of the title that
    cannot be part of a file name on the running system are replaced by "_"
    so that the path can always be opened.

    Args:
        ORIGIN_CHAT_TITLE: title of the origin chat
        destination_chat: chat_id of the destination chat

    Returns:
        str: path of the task file inside ``user/tasks``
    """

    tasks_folder = os.path.join("user", "tasks")
    os.makedirs(tasks_folder, exist_ok=True)

    # Path separators would point into a non-existent subfolder.
    unsafe_chars = {os.sep, os.altsep} - {None}
    if os.name == "nt":
        # Characters that Windows forbids in file names.
        unsafe_chars.update('<>:"|?*')
    safe_title = "".join(
        "_" if char in unsafe_chars else char
        for char in str(ORIGIN_CHAT_TITLE)
    )

    task_file_name = f"{safe_title}-{destination_chat}.json"
    return os.path.join(tasks_folder, task_file_name)


def check_chat_id(chat_id):
    """Check that a chat is reachable and return its title.

    Args:
        chat_id: id of the chat to look up

    Returns:
        the ``title`` of the chat, or False when the chat is not accessible
        or the id is invalid (an explanation is printed in both cases)
    """

    try:
        return tg.get_chat(chat_id).title
    except ChannelInvalid:  # When you are not part of the channel
        print("\nNon-accessible chat")
        if MODE == "bot":
            # The two sentences need a space between them when joined.
            print(
                "\nCheck that the bot is part of the chat as an administrator. "
                + "It is necessary for bot mode."
            )
        else:
            print("\nCheck that the user account is part of the chat.")
        return False
    except PeerIdInvalid:  # When the chat_id is invalid
        print(f"\nInvalid chat_id: {chat_id}")
        return False


def ensure_connection(client_name):
    """Return a started Pyrogram client for the "user" or "bot" session.

    When a ``<client_name>.session`` file exists, a client is started from
    it. If that fails, or no session file exists, the credentials are asked
    interactively (api_id, api_hash and, for the bot, bot_token) until a
    client starts successfully.

    Args:
        client_name (str): "user" or "bot"

    Returns:
        pyrogram.Client: the started client, or None for any other name

    Raises:
        EOFError: when credentials are needed but no input is available
    """

    if client_name not in ("user", "bot"):
        return None

    if Path(f"{client_name}.session").exists():
        try:
            client = pyrogram.Client(client_name)
            client.start()
            return client
        except Exception:
            # Not a bare except: Ctrl-C must still be able to stop the script.
            print("Delete Session file and try again.")

    while True:
        try:
            api_id = int(input("Enter your api_id: "))
            api_hash = input("Enter your api_hash: ")
            extra_credentials = {}
            if client_name == "bot":
                extra_credentials["bot_token"] = input(
                    "Enter your bot_token: "
                )

            client = pyrogram.Client(
                client_name, api_id, api_hash, **extra_credentials
            )
            client.start()
            return client
        except EOFError:
            # No input stream to answer the prompts: retrying cannot succeed.
            raise
        except Exception:
            print("\nError. Try again.\n")


def main():
    """Clone the pending messages of the origin chat into the destination chat.

    Walks the message ids from the resume point up to the last id of the
    origin chat. Each message is fetched, skipped when empty or when its
    type is excluded, or sent with the matching ``foward_*`` function. The
    handled ids are saved to CACHE_FILE after every message so that the task
    can be resumed.
    """

    print(
        f"\n....:: Clonechat - v{version} ::....\n"
        + "github.com/apenasrr/clonechat/\n"
    )

    global FILES_TYPE_EXCLUDED
    FILES_TYPE_EXCLUDED = get_files_type_excluded()
    last_message_id = get_last_message_id(origin_chat)

    # Ask for the task type only when it was not decided before main().
    int_task_type = task_type() if NEW is None else NEW
    list_posted = get_list_posted(int_task_type)

    message_id = get_first_message_id(list_posted)
    if not list_posted:
        # The loop increments before use, so start one below the first id;
        # otherwise message 1 of a fresh task would never be processed.
        message_id = 0

    while message_id < last_message_id:
        message_id = message_id + 1
        if message_id in list_posted:
            continue

        message = get_message(origin_chat, message_id)

        if is_empty_message(message, message_id, last_message_id):
            list_posted.append(message.id)
            # Persist skipped ids too, so a resume does not re-fetch them.
            update_cache(CACHE_FILE, list_posted)
            continue

        func_sender = get_sender(message)

        if must_be_ignored(func_sender, message_id, last_message_id):
            list_posted.append(message.id)
            update_cache(CACHE_FILE, list_posted)
            continue

        func_sender(message, destination_chat)
        print(f"{message_id}/{last_message_id}")

        list_posted.append(message.id)
        update_cache(CACHE_FILE, list_posted)

        wait_a_moment(message_id)

    # Name the real cache file; this script never creates a "posted.json".
    print(
        "\nChat cloning finished! :)\n"
        + "If you are not going to continue this task for these chats, "
        + f"delete the file {CACHE_FILE}"
    )


# Script section: load the settings, parse the command line, connect the
# client(s), resolve both chats and finally run main().
config_data = get_config_data(
    path_file_config=os.path.join("user", "config.ini")
)

# Delays in seconds, read from the "default" section of user/config.ini.
USER_DELAY_SECONDS = float(config_data.get("user_delay_seconds"))
BOT_DELAY_SECONDS = float(config_data.get("bot_delay_seconds"))
SKIP_DELAY_SECONDS = float(config_data.get("skip_delay_seconds"))

parser = argparse.ArgumentParser()
parser.add_argument("--orig", help="chat_id of origin channel/group")
parser.add_argument("--dest", help="chat_id of destination channel/group")
parser.add_argument(
    "--mode",
    choices=["user", "bot"],
    help='"user" is slow. "bot" requires token_bot in credentials',
)
parser.add_argument(
    "--new", type=int, choices=[1, 2], help="1 = new, 2 = resume"
)
help_type = """list separated by comma of message type to be cloned:
Ex. for documents and videos: 3,8 || Options:
0 = All files
1 = Photos
2 = Text
3 = Documents (pdf, zip, rar...)
4 = Stickers
5 = Animation
6 = Audio files (music)
7 = Voice message
8 = Videos
9 = Polls"""
parser.add_argument("--type", help=help_type)
options = parser.parse_args()

# The command line wins over the mode stored in the config file.
if options.mode is None:
    MODE = config_data.get("mode")
else:
    MODE = options.mode


# The user account is always connected: get_last_message_id() reads the
# chat history through it, whatever the sending mode is.
useraccount = ensure_connection("user")
print(f"{MODE=}")
if MODE == "bot":
    bot = ensure_connection("bot")
    tg = bot
    DELAY_AMOUNT = BOT_DELAY_SECONDS
elif MODE == "user":
    tg = useraccount
    DELAY_AMOUNT = USER_DELAY_SECONDS
else:
    # Without this check "tg" stays undefined and the first use of it
    # fails later with an unrelated NameError.
    raise ValueError(f'Invalid mode {MODE!r}: expected "user" or "bot"')

DELAY_SKIP = SKIP_DELAY_SECONDS

NEW = options.new

if options.orig is None:  # Menu interface
    while True:
        origin_chat = int(input("Enter the origin id_chat:"))
        ORIGIN_CHAT_TITLE = check_chat_id(origin_chat)
        # check_chat_id returns False on failure; a chat without a title
        # is still valid, as in the CLI branch below.
        if ORIGIN_CHAT_TITLE is not False:
            break
else:  # CLI interface
    origin_chat = int(options.orig)
    ORIGIN_CHAT_TITLE = check_chat_id(origin_chat)
    if ORIGIN_CHAT_TITLE is False:
        raise AttributeError("Fix the origin chat_id")
    # In CLI mode nothing is asked interactively: clone every type and
    # start a new task unless --type / --new say otherwise.
    FILES_TYPE_EXCLUDED = []
    if NEW is None:
        NEW = 1
    else:
        NEW = int(NEW)

if options.dest is None:  # Menu interface
    while True:
        destination_chat = int(input("Enter the destination id_chat:"))
        # Validate the destination itself (not the origin a second time).
        DESTINATION_CHAT_TITLE = check_chat_id(destination_chat)
        if DESTINATION_CHAT_TITLE is not False:
            break
else:  # CLI interface
    destination_chat = int(options.dest)
    # Validate the destination itself (not the origin a second time).
    DESTINATION_CHAT_TITLE = check_chat_id(destination_chat)
    if DESTINATION_CHAT_TITLE is False:
        raise AttributeError("Fix the destination chat_id")

if options.type is not None:
    TYPE = options.type
    FILES_TYPE_EXCLUDED = get_files_type_excluded_by_input(TYPE)

CACHE_FILE = get_task_file(ORIGIN_CHAT_TITLE, destination_chat)

main()
Editor is loading...