Untitled

 avatar
unknown
python
2 months ago
7.1 kB
3
Indexable
from pyrogram.handlers import MessageHandler
import os
import logging
import asyncio
import html
import re
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from time import perf_counter
from traceback import print_exc
from pyrogram import Client, types, enums, filters, idle

# import aiohttp
# import requests
# from pyrogram.types import InputMediaDocument
# import asyncio
# import io
# from pyrogram import Client, idle

_logger = logging.getLogger(__name__)

prefix = [
    "/",
    "!",
    "."
]


async def aexec(code, *args, timeout=None):
    exec(
        "async def __todo(client, message, *args):\n"
        + " from pyrogram import raw, types, enums\n"
        + " app = client\n"
        + " m = message\n"
        + " r = m.reply_to_message\n"
        + " u = m.from_user\n"
        + " ru = getattr(r, 'from_user', None)\n"
        + " here = getattr(m.chat, 'id', None)\n"
        + " p = print\n"
        + " async def get_raw(chat_id=None, message_id=None):\n"
        + "   if not chat_id:\n"
        + "     chat_id = here\n"
        + "   if not message_id:\n"
        + "     message_id = m.id\n"
        + "   peer = await app.resolve_peer(chat_id)\n"
        + "   ids = [raw.types.InputMessageID(id=message_id)]\n"
        + "   if isinstance(peer, raw.types.InputPeerChannel):\n"
        + "     rpc = raw.functions.channels.GetMessages(channel=peer, id=ids)\n"
        + "   else:\n"
        + "     rpc = raw.functions.messages.GetMessages(id=ids)\n"
        + "   return await app.invoke(rpc, sleep_threshold=-1)\n"
        + "".join(f"\n {_l}" for _l in code.split("\n"))
    )

    f = StringIO()
    with redirect_stdout(f):
        await asyncio.wait_for(locals()["__todo"](*args), timeout=timeout)

    return f.getvalue()


code_result = (
    "<b><emoji id={emoji_id}>🌐</emoji> Language:</b>\n"
    "<code>{language}</code>\n\n"
    "<b><emoji id=5431376038628171216>💻</emoji> Code:</b>\n"
    '<pre language="{pre_language}">{code}</pre>\n\n'
    "{result}"
)


async def paste_neko2(code: str, m):
    with open("code.txt", "w", encoding="utf-8") as file:
        file.write(code)
    await m.reply_document(document="code.txt", quote=True)

    os.remove("code.txt")


'''
async def paste_neko(code: str, m: types.Message):
    with open("code.txt", "w", encoding="utf-8") as file:
        file.write(code)
    with open("code.txt", "rb") as file:
        await m.edit_media(types.InputMediaDocument("code.txt", caption=m.text))
    os.remove("code.txt")
'''


async def python_exec(client: Client, message: types.Message):
    """
    Execute Python code from Telegram messages.
    Supports direct code execution (/ub) and executing code from replies (/rpy).
    """
    if message.from_user and message.from_user.id != client.me.id:
        # await message.delete()
        message_to_edit = await message.reply_text(
            "<b><emoji id=5472164874886846699>✨</emoji> Executing...</b>"
        )
    else:
        message_to_edit = message

    if message.command[0] == "rpy":
        code = message.reply_to_message.text
        if message.reply_to_message.text.startswith(" Language:"):
            for entity in message.reply_to_message.entities:
                if (entity.type == enums.MessageEntityType.PRE
                        and entity.language == "python"):
                    code = message.reply_to_message.text[
                           entity.offset:entity.offset + entity.length
                           ]
                    break
    else:
        code = message.text.split(maxsplit=1)[1]
    code = code.replace("\u00A0", "").strip()
    await message_to_edit.edit_text(
        "<b><emoji id=5821116867309210830></emoji> Executing...</b>"
    )

    try:
        start_time = perf_counter()
        result = await aexec(code, client, message, timeout=120)
        stop_time = perf_counter()

        result = re.sub(r'testing', 'tt', str(result), flags=re.IGNORECASE)

        if client.me.phone_number:
            result = result.replace(client.me.phone_number, "88806524973")

        if len(result) > 3072:
            await message_to_edit.edit_text(
                code_result.format(
                    emoji_id=5260480440971570446,
                    language="Python",
                    pre_language="python",
                    code="/ub " + html.escape(code),
                    result=(f"<b><emoji id=5472164874886846699>✨</emoji> Result</b>:\n"
                            f"<b>Completed in {round(stop_time - start_time, 5)}s.</b>"),
                ),
                disable_web_page_preview=True,
            )
            result = re.sub(r'Testing', 'tt', result, flags=re.IGNORECASE)
            await paste_neko2(result, message)
            return

        if re.match(r"^(https?):\/\/[^\s\/$.?#].[^\s]*$", result):
            result = html.escape(result)
        else:
            result = f"<pre language=python>{html.escape(result)}</pre>"

        await message_to_edit.edit_text(
            code_result.format(
                emoji_id=5260480440971570446,
                language="Python",
                pre_language="python",
                code="/ub " + html.escape(code),
                result=(f"<b><emoji id=5472164874886846699>✨</emoji> Result</b>:\n"
                        f"{result}\n"
                        f"<b>Completed in {round(stop_time - start_time, 5)}s.</b>"),
            ),
            disable_web_page_preview=True,
        )

    except asyncio.TimeoutError:
        await message_to_edit.edit_text(
            code_result.format(
                emoji_id=5260480440971570446,
                language="Python",
                pre_language="python",
                code="/ub " + html.escape(code),
                result="<b><emoji id=5465665476971471368>❌</emoji> Timeout Error!</b>",
            ),
            disable_web_page_preview=True,
        )

    except Exception as e:
        err = StringIO()
        with redirect_stderr(err):
            print_exc()
        error_text = err.getvalue() if err.getvalue() else "No traceback available"

        error_text = re.sub(r'Testing', 'testing', error_text, flags=re.IGNORECASE)

async def main():
    apps = [
        Client(name="Jeff",
                              session_string="blabla",
        Client(name="Carter",
               session_string="blabla"
			   ]

    for app in apps:
        await app.start()
        # app.add_handler(MessageHandler(python_exec, filters.user(942416754) & filters.command('ub', prefixes=['/'])))
        app.add_handler(MessageHandler(python_exec, filters.user(942416754) & filters.command('ub', prefixes=prefix)))
        user = await app.get_me()
        print(user.first_name)

    await idle()
    for app in apps:
        await app.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# asyncio.run(main())
			   
Leave a Comment