# Untitled
"""Telegram userbot that executes Python snippets sent as chat commands.

A message such as ``/ub print(1)`` (prefixes ``/``, ``!`` or ``.``) from the
allowed user is compiled into a coroutine, run with a timeout, and its captured
stdout is edited back into the chat.

SECURITY NOTE: this script runs arbitrary code via ``exec``. That is its whole
purpose, so the only protection is the ``filters.user(...)`` restriction applied
when the handler is registered in ``main``. Never widen that filter.
"""
import asyncio
import html
import logging
import os
import re
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from time import perf_counter
from traceback import print_exc

from pyrogram import Client, types, enums, filters, idle
from pyrogram.handlers import MessageHandler

# import aiohttp
# import requests
# from pyrogram.types import InputMediaDocument
# import asyncio
# import io
# from pyrogram import Client, idle

_logger = logging.getLogger(__name__)

# Command prefixes accepted by the handler registered in ``main``.
prefix = ["/", "!", "."]

# Source prepended to every snippet: wraps it in a coroutine and exposes the
# short aliases (app, m, r, u, ru, here, p) plus the ``get_raw`` helper.
_AEXEC_PRELUDE = (
    "async def __todo(client, message, *args):\n"
    "    from pyrogram import raw, types, enums\n"
    "    app = client\n"
    "    m = message\n"
    "    r = m.reply_to_message\n"
    "    u = m.from_user\n"
    "    ru = getattr(r, 'from_user', None)\n"
    "    here = getattr(m.chat, 'id', None)\n"
    "    p = print\n"
    "    async def get_raw(chat_id=None, message_id=None):\n"
    "        if not chat_id:\n"
    "            chat_id = here\n"
    "        if not message_id:\n"
    "            message_id = m.id\n"
    "        peer = await app.resolve_peer(chat_id)\n"
    "        ids = [raw.types.InputMessageID(id=message_id)]\n"
    "        if isinstance(peer, raw.types.InputPeerChannel):\n"
    "            rpc = raw.functions.channels.GetMessages(channel=peer, id=ids)\n"
    "        else:\n"
    "            rpc = raw.functions.messages.GetMessages(id=ids)\n"
    "        return await app.invoke(rpc, sleep_threshold=-1)\n"
)


async def aexec(code, *args, timeout=None):
    """Run ``code`` as the body of an async function and return its stdout.

    Args:
        code: Python source to execute; each line is indented into the body of
            the generated ``__todo`` coroutine.
        *args: Positional arguments passed to the coroutine. The first two are
            bound to ``client`` and ``message``.
        timeout: Seconds to wait before ``asyncio.wait_for`` cancels the
            coroutine; ``None`` waits indefinitely.

    Returns:
        Everything the snippet wrote to ``sys.stdout`` while it ran.

    Raises:
        asyncio.TimeoutError: If the snippet did not finish within ``timeout``.
        Exception: Anything raised while compiling or running the snippet.
    """
    source = _AEXEC_PRELUDE + "".join(f"\n    {_l}" for _l in code.split("\n"))

    # An explicit locals mapping is required: since Python 3.13 (PEP 667) a
    # bare exec() inside a function writes to a throw-away snapshot, so a later
    # locals()["__todo"] lookup fails. Module globals stay visible to the
    # snippet exactly as before.
    namespace = {}
    exec(source, globals(), namespace)

    f = StringIO()
    # redirect_stdout swaps the process-wide sys.stdout, so output printed by
    # other tasks while the snippet is awaiting is captured here as well.
    with redirect_stdout(f):
        await asyncio.wait_for(namespace["__todo"](*args), timeout=timeout)
    return f.getvalue()


# HTML template for every status message the bot edits into the chat.
code_result = (
    "<b><emoji id={emoji_id}>🌐</emoji> Language:</b>\n"
    "<code>{language}</code>\n\n"
    "<b><emoji id=5431376038628171216>💻</emoji> Code:</b>\n"
    '<pre language="{pre_language}">{code}</pre>\n\n'
    "{result}"
)


async def paste_neko2(code: str, m):
    """Send ``code`` as a ``code.txt`` document replying to message ``m``.

    The temporary file is removed even when the upload fails.
    """
    # NOTE: the fixed file name is what the recipient sees; concurrent calls
    # share it and may overwrite each other.
    with open("code.txt", "w", encoding="utf-8") as file:
        file.write(code)
    try:
        await m.reply_document(document="code.txt", quote=True)
    finally:
        os.remove("code.txt")


'''
async def paste_neko(code: str, m: types.Message):
    with open("code.txt", "w", encoding="utf-8") as file:
        file.write(code)
    with open("code.txt", "rb") as file:
        await m.edit_media(types.InputMediaDocument("code.txt", caption=m.text))
    os.remove("code.txt")
'''


def _utf16_slice(text, offset, length):
    """Return the part of ``text`` addressed by a Telegram entity.

    Entity offsets and lengths count UTF-16 code units, not Python code points,
    so the slice is taken on the UTF-16 encoding of the text.
    """
    encoded = text.encode("utf-16-le")
    return encoded[offset * 2:(offset + length) * 2].decode("utf-16-le")


def _extract_code(message):
    """Return the source to run for ``message``, or ``None`` if there is none."""
    if message.command[0] == "rpy":
        replied = message.reply_to_message
        text = getattr(replied, "text", None)
        if not text:
            return None
        # NOTE(review): the rendered result template starts with an emoji
        # before " Language:", so this literal may have lost a character;
        # confirm against real messages before changing it.
        if text.startswith(" Language:"):
            for entity in replied.entities or []:
                if (entity.type == enums.MessageEntityType.PRE
                        and entity.language == "python"):
                    return _utf16_slice(text, entity.offset, entity.length)
        return text

    parts = (message.text or message.caption or "").split(maxsplit=1)
    return parts[1] if len(parts) > 1 else None


def _render(code, result):
    """Fill ``code_result`` for a Python run of ``code`` with ``result``."""
    return code_result.format(
        emoji_id=5260480440971570446,
        language="Python",
        pre_language="python",
        code="/ub " + html.escape(code),
        result=result,
    )


async def python_exec(client: Client, message: types.Message):
    """
    Execute Python code from Telegram messages.
    Supports direct code execution (/ub) and executing code from replies (/rpy).

    The status is edited into the command message when it was sent by this
    account, otherwise into a fresh reply. Output longer than 3072 characters
    is sent as a document instead of inline text.
    """
    if message.from_user and message.from_user.id != client.me.id:
        # await message.delete()
        message_to_edit = await message.reply_text(
            "<b><emoji id=5472164874886846699>✨</emoji> Executing...</b>"
        )
    else:
        message_to_edit = message

    code = _extract_code(message)
    if code is None:
        # Previously this path crashed with IndexError / AttributeError.
        await message_to_edit.edit_text(
            "<b><emoji id=5465665476971471368>❌</emoji> No code provided.</b>"
        )
        return

    # Non-breaking spaces sneak in from copy/paste and break compilation.
    code = code.replace("\u00A0", "").strip()
    await message_to_edit.edit_text(
        "<b><emoji id=5821116867309210830></emoji> Executing...</b>"
    )

    try:
        start_time = perf_counter()
        result = await aexec(code, client, message, timeout=120)
        stop_time = perf_counter()

        result = re.sub(r'testing', 'tt', str(result), flags=re.IGNORECASE)
        # Never echo the account's own phone number back into a chat.
        if client.me.phone_number:
            result = result.replace(client.me.phone_number, "88806524973")

        elapsed = f"<b>Completed in {round(stop_time - start_time, 5)}s.</b>"

        if len(result) > 3072:
            await message_to_edit.edit_text(
                _render(
                    code,
                    "<b><emoji id=5472164874886846699>✨</emoji> Result</b>:\n"
                    + elapsed,
                ),
                disable_web_page_preview=True,
            )
            await paste_neko2(result, message)
            return

        # A bare URL is shown as a link; anything else as a code block.
        if re.match(r"^(https?):\/\/[^\s\/$.?#].[^\s]*$", result):
            result = html.escape(result)
        else:
            result = f"<pre language=python>{html.escape(result)}</pre>"

        await message_to_edit.edit_text(
            _render(
                code,
                "<b><emoji id=5472164874886846699>✨</emoji> Result</b>:\n"
                f"{result}\n"
                + elapsed,
            ),
            disable_web_page_preview=True,
        )
    except asyncio.TimeoutError:
        await message_to_edit.edit_text(
            _render(
                code,
                "<b><emoji id=5465665476971471368>❌</emoji> Timeout Error!</b>",
            ),
            disable_web_page_preview=True,
        )
    except Exception:
        # Boundary handler: whatever the snippet raised is shown to the user
        # instead of being silently dropped.
        err = StringIO()
        with redirect_stderr(err):
            print_exc()
        error_text = err.getvalue() if err.getvalue() else "No traceback available"
        error_text = re.sub(r'Testing', 'testing', error_text, flags=re.IGNORECASE)

        if len(error_text) > 3072:
            await message_to_edit.edit_text(
                _render(
                    code,
                    "<b><emoji id=5465665476971471368>❌</emoji> Error!</b>",
                ),
                disable_web_page_preview=True,
            )
            await paste_neko2(error_text, message)
            return

        await message_to_edit.edit_text(
            _render(
                code,
                "<b><emoji id=5465665476971471368>❌</emoji> Error!</b>\n"
                f"<pre language=python>{html.escape(error_text)}</pre>",
            ),
            disable_web_page_preview=True,
        )


async def main():
    """Start every client, register the exec handler, and run until stopped."""
    apps = [
        Client(name="Jeff", session_string="blabla"),
        Client(name="Carter", session_string="blabla"),
    ]
    for app in apps:
        await app.start()
        # app.add_handler(MessageHandler(python_exec, filters.user(942416754) & filters.command('ub', prefixes=['/'])))
        app.add_handler(
            MessageHandler(
                python_exec,
                filters.user(942416754) & filters.command('ub', prefixes=prefix),
            )
        )
        user = await app.get_me()
        print(user.first_name)

    await idle()

    for app in apps:
        await app.stop()


if __name__ == "__main__":
    # NOTE(review): asyncio.run(main()) was left commented out by the author;
    # the explicit-loop form is kept until it is verified with pyrogram.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    # asyncio.run(main())
# Leave a Comment