Python program for Telegram
jjjunknown
plain_text
3 years ago
16 kB
475
No Index
import os import re import html import time import shutil import logging import asyncio import zipfile import tempfile import traceback from pyrogram import StopTransmission from collections import defaultdict from natsort import natsorted from pyrogram.parser import html as pyrogram_html from pyrogram.errors.exceptions.bad_request_400 import MessageIdInvalid, MessageNotModified from .. import PROGRESS_UPDATE_DELAY, ADMIN_CHATS, preserved_logs, TESTMODE, SendAsZipFlag, ForceDocumentFlag, LICHER_CHAT, LICHER_STICKER, LICHER_FOOTER, LICHER_PARSE_EPISODE, IGNORE_PADDING_FILE from .misc import split_files, get_file_mimetype, format_bytes, get_video_info, generate_thumbnail, return_progress_string, calculate_eta, watermark_photo upload_queue = asyncio.Queue() upload_statuses = dict() upload_tamper_lock = asyncio.Lock() message_exists = defaultdict(set) message_exists_lock = asyncio.Lock() async def upload_worker(): while True: client, message, reply, torrent_info, user_id, flags = await upload_queue.get() try: message_identifier = (reply.chat.id, reply.message_id) if SendAsZipFlag not in flags: asyncio.create_task(reply.edit_text('Download successful, uploading files...')) task = asyncio.create_task(_upload_worker(client, message, reply, torrent_info, user_id, flags)) upload_statuses[message_identifier] = task, user_id await task except asyncio.CancelledError: text = 'Your leech has been cancelled.' await asyncio.gather(reply.edit_text(text), message.reply_text(text)) except Exception as ex: preserved_logs.append((message, torrent_info, ex)) logging.exception('%s %s', message, torrent_info) await message.reply_text(traceback.format_exc(), parse_mode=None) for admin_chat in ADMIN_CHATS: await client.send_message(admin_chat, traceback.format_exc(), parse_mode=None) finally: upload_queue.task_done() worker_identifier = (reply.chat.id, reply.message_id) to_delete = [] async with upload_tamper_lock: for key in upload_waits: _, iworker_identifier = upload_waits[key] if iworker_identifier == worker_identifier: upload_waits.pop(key) to_delete.append(key[1]) task = None if to_delete: task = asyncio.create_task(client.delete_messages(reply.chat.id, to_delete)) upload_statuses.pop(message_identifier) if not TESTMODE: shutil.rmtree(torrent_info['dir']) if task: await task upload_waits = dict() async def _upload_worker(client, message, reply, torrent_info, user_id, flags): files = dict() sent_files = [] with tempfile.TemporaryDirectory(dir=str(user_id)) as zip_tempdir: if SendAsZipFlag in flags: if torrent_info.get('bittorrent'): filename = torrent_info['bittorrent']['info']['name'] else: filename = os.path.basename(torrent_info['files'][0]['path']) filename = filename[-251:] + '.zip' filepath = os.path.join(zip_tempdir, filename) def _zip_files(): with zipfile.ZipFile(filepath, 'x') as zipf: for file in torrent_info['files']: filename = file['path'].replace(os.path.join(torrent_info['dir'], ''), '', 1) if IGNORE_PADDING_FILE and re.match(r'(?i)^_+padding_file', filename) is not None: continue zipf.write(file['path'], filename) await asyncio.gather(reply.edit_text('Download successful, zipping files...'), client.loop.run_in_executor(None, _zip_files)) asyncio.create_task(reply.edit_text('Download successful, uploading files...')) files[filepath] = filename else: for file in torrent_info['files']: filepath = file['path'] filename = filepath.replace(os.path.join(torrent_info['dir'], ''), '', 1) if IGNORE_PADDING_FILE and re.match(r'(?i)^_+padding_file', filename) is not None: continue if LICHER_PARSE_EPISODE: filename = re.sub(r'\s*(?:\[.+?\]|\(.+?\))\s*|\.[a-z][a-z0-9]{2}$', '', os.path.basename(filepath)).strip() or filename files[filepath] = filename for filepath in natsorted(files): sent_files.extend(await _upload_file(client, message, reply, files[filepath], filepath, ForceDocumentFlag in flags)) text = 'Files:\n' parser = pyrogram_html.HTML(client) quote = None first_index = None all_amount = 1 for filename, filelink in sent_files: if filelink: atext = f'- <a href="{filelink}">{html.escape(filename)}</a>' else: atext = f'- {html.escape(filename)} (empty)' atext += '\n' futtext = text + atext if all_amount > 100 or len((await parser.parse(futtext))['message']) > 4096: thing = await message.reply_text(text, quote=quote, disable_web_page_preview=True) if first_index is None: first_index = thing quote = False futtext = atext all_amount = 1 await asyncio.sleep(PROGRESS_UPDATE_DELAY) all_amount += 1 text = futtext if not sent_files: text = 'Files: None' elif LICHER_CHAT and LICHER_STICKER and message.chat.id in ADMIN_CHATS: await client.send_sticker(LICHER_CHAT, LICHER_STICKER) thing = await message.reply_text(text, quote=quote, disable_web_page_preview=True) if first_index is None: first_index = thing asyncio.create_task(reply.edit_text(f'Download successful, files uploaded.\nFiles: {first_index.link}', disable_web_page_preview=True)) async def _upload_file(client, message, reply, filename, filepath, force_document): if not os.path.getsize(filepath): return [(os.path.basename(filename), None)] worker_identifier = (reply.chat.id, reply.message_id) user_id = message.from_user.id user_thumbnail = os.path.join(str(user_id), 'thumbnail.jpg') user_watermark = os.path.join(str(user_id), 'watermark.jpg') user_watermarked_thumbnail = os.path.join(str(user_id), 'watermarked_thumbnail.jpg') file_has_big = os.path.getsize(filepath) > 2097152000 upload_wait = await reply.reply_text(f'Upload of {html.escape(filename)} will start in {PROGRESS_UPDATE_DELAY}s') message_exists[upload_wait.chat.id].add(upload_wait.message_id) upload_identifier = (upload_wait.chat.id, upload_wait.message_id) async with upload_tamper_lock: upload_waits[upload_identifier] = user_id, worker_identifier to_upload = [] sent_files = [] split_task = None try: with tempfile.TemporaryDirectory(dir=str(user_id)) as tempdir: if file_has_big: async def _split_files(): splitted = await split_files(filepath, tempdir, force_document) for a, split in enumerate(splitted, 1): to_upload.append((split, filename + f' (part {a})')) split_task = asyncio.create_task(_split_files()) else: to_upload.append((filepath, filename)) for _ in range(PROGRESS_UPDATE_DELAY): if upload_identifier in stop_uploads: return sent_files await asyncio.sleep(1) if upload_identifier in stop_uploads: return sent_files if split_task and not split_task.done(): await upload_wait.edit_text(f'Splitting {html.escape(filename)}...') while not split_task.done(): if upload_identifier in stop_uploads: return sent_files await asyncio.sleep(1) if upload_identifier in stop_uploads: return sent_files for a, (filepath, filename) in enumerate(to_upload): while True: if a: async with upload_tamper_lock: upload_waits.pop(upload_identifier) upload_wait = await reply.reply_text(f'Upload of {html.escape(filename)} will start in {PROGRESS_UPDATE_DELAY}s') upload_identifier = (upload_wait.chat.id, upload_wait.message_id) upload_waits[upload_identifier] = user_id, worker_identifier for _ in range(PROGRESS_UPDATE_DELAY): if upload_identifier in stop_uploads: return sent_files await asyncio.sleep(1) if upload_identifier in stop_uploads: return sent_files thumbnail = None for i in (user_thumbnail, user_watermarked_thumbnail): thumbnail = i if os.path.isfile(i) else thumbnail mimetype = await get_file_mimetype(filepath) progress_args = (client, message, upload_wait, filename, user_id) try: if not force_document and mimetype.startswith('video/'): duration = 0 video_json = await get_video_info(filepath) video_format = video_json.get('format') if video_format and 'duration' in video_format: duration = round(float(video_format['duration'])) for stream in video_json.get('streams', ()): if stream['codec_type'] == 'video': width = stream.get('width') height = stream.get('height') if width and height: if not thumbnail: thumbnail = os.path.join(tempdir, '0.jpg') await generate_thumbnail(filepath, thumbnail) if os.path.isfile(thumbnail) and os.path.isfile(user_watermark): othumbnail = thumbnail thumbnail = os.path.join(tempdir, '1.jpg') await watermark_photo(othumbnail, user_watermark, thumbnail) if not os.path.isfile(thumbnail): thumbnail = othumbnail if not os.path.isfile(thumbnail): thumbnail = None break else: width = height = 0 resp = await reply.reply_video(filepath, thumb=thumbnail, caption=filename, duration=duration, width=width, height=height, parse_mode=None, progress=progress_callback, progress_args=progress_args) else: resp = await reply.reply_document(filepath, thumb=thumbnail, caption=filename, parse_mode=None, progress=progress_callback, progress_args=progress_args) except StopTransmission: resp = None except Exception: await message.reply_text(traceback.format_exc(), parse_mode=None) break if resp: sent_files.append((os.path.basename(filename), resp.link)) if LICHER_CHAT and reply.chat.id in ADMIN_CHATS and mimetype.startswith('video/') and resp.video: await client.send_video(LICHER_CHAT, resp.video.file_id, thumb=thumbnail, caption=filename + LICHER_FOOTER, duration=duration, width=width, height=height, parse_mode=None) break return sent_files return sent_files finally: if split_task: split_task.cancel() async with message_exists_lock: message_exists[upload_wait.chat.id].discard(upload_wait.message_id) asyncio.create_task(upload_wait.delete()) async with upload_tamper_lock: upload_waits.pop(upload_identifier) progress_callback_data = dict() stop_uploads = set() async def progress_callback(current, total, client, message, reply, filename, user_id): try: if reply.message_id not in message_exists[reply.chat.id]: return message_identifier = (reply.chat.id, reply.message_id) last_edit_time, prevtext, start_time, user_id = progress_callback_data.get(message_identifier, (0, None, time.time(), user_id)) if message_identifier in stop_uploads or current == total: asyncio.create_task(reply.delete()) try: progress_callback_data.pop(message_identifier) except KeyError: pass if message_identifier in stop_uploads: client.stop_transmission() elif (time.time() - last_edit_time) > PROGRESS_UPDATE_DELAY: if last_edit_time: upload_speed = format_bytes((total - current) / (time.time() - start_time)) else: upload_speed = '0 B' text = f'''Uploading {html.escape(filename)}... <code>{html.escape(return_progress_string(current, total))}</code> <b>Total Size:</b> {format_bytes(total)} <b>Uploaded Size:</b> {format_bytes(current)} <b>Upload Speed:</b> {upload_speed}/s <b>ETA:</b> {calculate_eta(current, total, start_time)}''' if prevtext != text and reply.message_id in message_exists[reply.chat.id]: async with message_exists_lock: if reply.message_id not in message_exists[reply.chat.id]: return try: await reply.edit_text(text) except MessageIdInvalid: message_exists[reply.chat.id].discard(reply.message_id) return except MessageNotModified: pass prevtext = text last_edit_time = time.time() progress_callback_data[message_identifier] = last_edit_time, prevtext, start_time, user_id except Exception as ex: preserved_logs.append((message, None, ex)) logging.exception('%s', message) await message.reply_text(traceback.format_exc(), parse_mode=None) for admin_chat in ADMIN_CHATS: await client.send_message(admin_chat, traceback.format_exc(), parse_mode=None)
Editor is loading...