Untitled
unknown
python
a year ago
7.1 kB
7
Indexable
from pyrogram.handlers import MessageHandler
import os
import logging
import asyncio
import html
import re
from contextlib import redirect_stderr, redirect_stdout
from io import StringIO
from time import perf_counter
from traceback import print_exc
from pyrogram import Client, types, enums, filters, idle
# import aiohttp
# import requests
# from pyrogram.types import InputMediaDocument
# import asyncio
# import io
# from pyrogram import Client, idle
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)

# Characters accepted in front of a command name when the handler filter
# is built (passed as ``prefixes=`` to ``filters.command``).
prefix = ["/", "!", "."]
async def aexec(code, *args, timeout=None):
    """Run *code* as the body of an async function and return what it printed.

    The snippet is wrapped in ``async def __todo(client, message, *args)`` so
    that it may use ``await``. Inside the snippet these shortcuts exist:

    * ``app`` / ``client`` - the first positional argument
    * ``m`` / ``message`` - the second positional argument
    * ``r`` - ``m.reply_to_message``; ``u`` - ``m.from_user``
    * ``ru`` - ``r.from_user`` (``None`` when unavailable)
    * ``here`` - ``m.chat.id`` (``None`` when unavailable)
    * ``p`` - alias of ``print``
    * ``get_raw(chat_id=None, message_id=None)`` - fetch the raw message,
      defaulting to the current chat and message
    * ``raw``, ``types``, ``enums`` - imported from ``pyrogram``

    Args:
        code: Python source to execute.
        *args: Forwarded to the generated function (``client, message``).
        timeout: Seconds to wait before giving up; ``None`` waits forever.

    Returns:
        Everything the snippet wrote to ``sys.stdout``.

    Raises:
        asyncio.TimeoutError: The snippet did not finish within *timeout*.
        Exception: Anything raised while compiling or running the snippet.
    """
    # SECURITY: this executes arbitrary text with full interpreter access.
    # Callers must make sure only trusted users can reach it.
    source = (
        "async def __todo(client, message, *args):\n"
        "    from pyrogram import raw, types, enums\n"
        "    app = client\n"
        "    m = message\n"
        "    r = m.reply_to_message\n"
        "    u = m.from_user\n"
        "    ru = getattr(r, 'from_user', None)\n"
        "    here = getattr(m.chat, 'id', None)\n"
        "    p = print\n"
        "    async def get_raw(chat_id=None, message_id=None):\n"
        "        if not chat_id:\n"
        "            chat_id = here\n"
        "        if not message_id:\n"
        "            message_id = m.id\n"
        "        peer = await app.resolve_peer(chat_id)\n"
        "        ids = [raw.types.InputMessageID(id=message_id)]\n"
        "        if isinstance(peer, raw.types.InputPeerChannel):\n"
        "            rpc = raw.functions.channels.GetMessages(channel=peer, id=ids)\n"
        "        else:\n"
        "            rpc = raw.functions.messages.GetMessages(id=ids)\n"
        "        return await app.invoke(rpc, sleep_threshold=-1)\n"
        # Indent every user line one level so it becomes the function body.
        + "".join(f"\n    {line}" for line in code.split("\n"))
    )

    # Use an explicit namespace for the definition: reading it back through
    # locals() stops working once locals() returns independent snapshots
    # (Python 3.13+). The module globals stay visible to the snippet.
    namespace = {}
    exec(source, globals(), namespace)

    captured = StringIO()
    # NOTE: redirect_stdout swaps sys.stdout process-wide, so output printed
    # by other tasks while the snippet is awaiting is captured as well.
    with redirect_stdout(captured):
        await asyncio.wait_for(namespace["__todo"](*args), timeout=timeout)
    return captured.getvalue()
# HTML template for the reply that reports an execution. Placeholders:
# emoji_id, language, pre_language, code and result.
code_result = "\n".join(
    (
        "<b><emoji id={emoji_id}>🌐</emoji> Language:</b>",
        "<code>{language}</code>",
        "",
        "<b><emoji id=5431376038628171216>💻</emoji> Code:</b>",
        '<pre language="{pre_language}">{code}</pre>',
        "",
        "{result}",
    )
)
async def paste_neko2(code: str, m):
    """Send *code* as a ``code.txt`` document in reply to message *m*.

    The text is written to a file inside a per-call temporary directory, so
    concurrent calls cannot overwrite each other's file and the file is
    removed even when the upload fails.

    Args:
        code: Text to place in the document.
        m: Message object exposing an awaitable ``reply_document``.
    """
    import tempfile  # local import: not among the file-level imports

    with tempfile.TemporaryDirectory() as tmp_dir:
        # Keep the base name "code.txt" so the attachment name is unchanged.
        path = os.path.join(tmp_dir, "code.txt")
        with open(path, "w", encoding="utf-8") as file:
            file.write(code)
        await m.reply_document(document=path, quote=True)
# Disabled earlier variant of the paste helper, kept only as an inert string
# literal (it is never executed). It wrote the text to code.txt and attached
# it to the message through edit_media instead of sending a reply.
'''
async def paste_neko(code: str, m: types.Message):
with open("code.txt", "w", encoding="utf-8") as file:
file.write(code)
with open("code.txt", "rb") as file:
await m.edit_media(types.InputMediaDocument("code.txt", caption=m.text))
os.remove("code.txt")
'''
def _render_exec_reply(code: str, result: str) -> str:
    """Fill the ``code_result`` template for a Python run of *code*."""
    return code_result.format(
        emoji_id=5260480440971570446,
        language="Python",
        pre_language="python",
        code="/ub " + html.escape(code),
        result=result,
    )


async def python_exec(client: Client, message: types.Message):
    """
    Execute Python code from a Telegram message and report the outcome.

    For any command other than ``rpy`` the code is everything after the
    command word. For ``rpy`` the code is the text of the replied-to message
    (or its ``python`` pre-formatted block when the text matches the
    expected header).

    The status message (the command itself when sent by this account,
    otherwise a new reply) is edited to show the code together with the
    captured output, a timeout notice, or the traceback. Output or tracebacks
    longer than 3072 characters are sent as a document instead.

    Args:
        client: The client that received the message.
        message: The command message.
    """
    if message.from_user and message.from_user.id != client.me.id:
        # Someone else's message cannot be edited, so answer with a reply.
        message_to_edit = await message.reply_text(
            "<b><emoji id=5472164874886846699>✨</emoji> Executing...</b>"
        )
    else:
        message_to_edit = message

    if message.command[0] == "rpy":
        replied = message.reply_to_message
        code = (replied.text if replied else None) or ""
        # NOTE(review): a reply rendered from code_result starts with an
        # emoji, while this literal starts with a space, so the check may
        # never match - confirm the intended prefix.
        if code.startswith(" Language:"):
            for entity in replied.entities or []:
                if (entity.type == enums.MessageEntityType.PRE
                        and entity.language == "python"):
                    # NOTE(review): Telegram counts entity offsets in UTF-16
                    # code units; this slice may be shifted when the text
                    # holds characters outside the BMP - confirm.
                    code = replied.text[
                        entity.offset:entity.offset + entity.length
                    ]
                    break
    else:
        # A bare command has nothing after the command word.
        parts = (message.text or message.caption or "").split(maxsplit=1)
        code = parts[1] if len(parts) > 1 else ""

    code = code.replace("\u00A0", "").strip()
    if not code:
        await message_to_edit.edit_text(
            "<b><emoji id=5465665476971471368>❌</emoji> No code to execute.</b>"
        )
        return

    await message_to_edit.edit_text(
        "<b><emoji id=5821116867309210830></emoji> Executing...</b>"
    )

    start_time = perf_counter()
    try:
        # Only the user snippet is guarded; failures while sending the report
        # are left to the framework instead of being mistaken for code errors.
        result = await aexec(code, client, message, timeout=120)
    except asyncio.TimeoutError:
        await message_to_edit.edit_text(
            _render_exec_reply(
                code,
                "<b><emoji id=5465665476971471368>❌</emoji> Timeout Error!</b>",
            ),
            disable_web_page_preview=True,
        )
        return
    except Exception:
        err = StringIO()
        with redirect_stderr(err):
            print_exc()
        error_text = err.getvalue() or "No traceback available"
        error_text = re.sub(r'Testing', 'testing', error_text, flags=re.IGNORECASE)
        header = "<b><emoji id=5465665476971471368>❌</emoji> Error!</b>"
        if len(error_text) > 3072:
            # Too long for a message body: attach the traceback as a file.
            await message_to_edit.edit_text(
                _render_exec_reply(code, header),
                disable_web_page_preview=True,
            )
            await paste_neko2(error_text, message)
        else:
            await message_to_edit.edit_text(
                _render_exec_reply(
                    code,
                    f"{header}\n"
                    f"<pre language=python>{html.escape(error_text)}</pre>",
                ),
                disable_web_page_preview=True,
            )
        return
    elapsed = round(perf_counter() - start_time, 5)

    result = re.sub(r'testing', 'tt', str(result), flags=re.IGNORECASE)
    if client.me.phone_number:
        # Never echo the account's real phone number back into the chat.
        result = result.replace(client.me.phone_number, "88806524973")

    if len(result) > 3072:
        await message_to_edit.edit_text(
            _render_exec_reply(
                code,
                f"<b><emoji id=5472164874886846699>✨</emoji> Result</b>:\n"
                f"<b>Completed in {elapsed}s.</b>",
            ),
            disable_web_page_preview=True,
        )
        await paste_neko2(result, message)
        return

    if re.match(r"^(https?):\/\/[^\s\/$.?#].[^\s]*$", result):
        # A bare URL is shown as plain text rather than inside a code block.
        result = html.escape(result)
    else:
        result = f"<pre language=python>{html.escape(result)}</pre>"
    await message_to_edit.edit_text(
        _render_exec_reply(
            code,
            f"<b><emoji id=5472164874886846699>✨</emoji> Result</b>:\n"
            f"{result}\n"
            f"<b>Completed in {elapsed}s.</b>",
        ),
        disable_web_page_preview=True,
    )
async def main():
    """Start every configured client, serve until interrupted, then stop them.

    Each client gets a handler that routes the ``ub`` command (with any of
    the characters in ``prefix``) from user 942416754 to ``python_exec``.
    The first name of each logged-in account is printed after start-up.
    """
    apps = [
        Client(name="Jeff",
               session_string="blabla"),
        Client(name="Carter",
               session_string="blabla"),
    ]
    started = []
    try:
        for app in apps:
            await app.start()
            started.append(app)
            app.add_handler(MessageHandler(python_exec, filters.user(942416754) & filters.command('ub', prefixes=prefix)))
            user = await app.get_me()
            print(user.first_name)
        await idle()
    finally:
        # Stop only the clients that actually started, including when
        # start-up or idle() fails part-way.
        for app in started:
            await app.stop()
# Script entry point: obtain the event loop and block until main() returns.
# NOTE(review): asyncio.get_event_loop() without a running loop is deprecated
# in recent Python versions - confirm whether the asyncio.run() call below
# can replace it for this setup.
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# asyncio.run(main())
Editor is loading...
Leave a Comment