Python program for Telegram

jjj
 avatar
unknown
plain_text
3 years ago
16 kB
475
No Index
import os
import re
import html
import time
import shutil
import logging
import asyncio
import zipfile
import tempfile
import traceback
from pyrogram import StopTransmission
from collections import defaultdict
from natsort import natsorted
from pyrogram.parser import html as pyrogram_html
from pyrogram.errors.exceptions.bad_request_400 import MessageIdInvalid, MessageNotModified
from .. import PROGRESS_UPDATE_DELAY, ADMIN_CHATS, preserved_logs, TESTMODE, SendAsZipFlag, ForceDocumentFlag, LICHER_CHAT, LICHER_STICKER, LICHER_FOOTER, LICHER_PARSE_EPISODE, IGNORE_PADDING_FILE
from .misc import split_files, get_file_mimetype, format_bytes, get_video_info, generate_thumbnail, return_progress_string, calculate_eta, watermark_photo

# Jobs waiting to be uploaded; each item is the tuple unpacked by upload_worker().
upload_queue = asyncio.Queue()
# (chat id, message id) of a job's status message -> (running upload task, user id).
upload_statuses = {}
# Held whenever upload_waits is read or modified.
upload_tamper_lock = asyncio.Lock()
# chat id -> ids of progress messages that progress_callback() is allowed to edit.
message_exists = defaultdict(set)
# Held while a progress message is edited or removed from message_exists.
message_exists_lock = asyncio.Lock()
async def upload_worker():
    """Process queued upload jobs forever, one at a time.

    Each job taken from ``upload_queue`` is run through ``_upload_worker`` as
    a task that is registered in ``upload_statuses`` under the status
    message's ``(chat id, message id)``. A cancelled job is reported to the
    user; any other exception is logged, stored in ``preserved_logs`` and sent
    to the user and to every chat in ``ADMIN_CHATS``. Afterwards any entries
    this job left behind in ``upload_waits`` are removed and their messages
    deleted, and the download directory is removed unless ``TESTMODE`` is set.
    """
    while True:
        client, message, reply, torrent_info, user_id, flags = await upload_queue.get()
        # The status message identifies this job in both upload_statuses and upload_waits.
        worker_identifier = (reply.chat.id, reply.message_id)
        try:
            if SendAsZipFlag not in flags:
                asyncio.create_task(reply.edit_text('Download successful, uploading files...'))
            task = asyncio.create_task(_upload_worker(client, message, reply, torrent_info, user_id, flags))
            upload_statuses[worker_identifier] = task, user_id
            await task
        except asyncio.CancelledError:
            text = 'Your leech has been cancelled.'
            await asyncio.gather(reply.edit_text(text), message.reply_text(text))
        except Exception as ex:
            preserved_logs.append((message, torrent_info, ex))
            logging.exception('%s %s', message, torrent_info)
            await message.reply_text(traceback.format_exc(), parse_mode=None)
            for admin_chat in ADMIN_CHATS:
                await client.send_message(admin_chat, traceback.format_exc(), parse_mode=None)
        finally:
            upload_queue.task_done()
        async with upload_tamper_lock:
            # Collect the matching keys first: removing entries while iterating
            # the dict itself raises RuntimeError on the next iteration step.
            leftover_keys = [
                key for key, (_, owner_identifier) in upload_waits.items()
                if owner_identifier == worker_identifier
            ]
            for key in leftover_keys:
                upload_waits.pop(key, None)
        # key is (chat id, message id); only the message id is needed for deletion.
        to_delete = [key[1] for key in leftover_keys]
        delete_task = None
        if to_delete:
            delete_task = asyncio.create_task(client.delete_messages(reply.chat.id, to_delete))
        # Tolerate a missing entry so a failure before registration cannot kill the worker here.
        upload_statuses.pop(worker_identifier, None)
        if not TESTMODE:
            shutil.rmtree(torrent_info['dir'])
        if delete_task:
            await delete_task

# (chat id, message id) of an "upload will start" message -> (user id, worker identifier).
upload_waits = {}


async def _upload_worker(client, message, reply, torrent_info, user_id, flags):
    """Upload the files of a finished download and post a summary of links.

    When ``SendAsZipFlag`` is in ``flags`` all files are packed into a single
    zip archive (created in a temporary directory under ``str(user_id)``) and
    only that archive is uploaded; otherwise every file is uploaded on its
    own. Files are handed to ``_upload_file`` in natural sort order of their
    paths. The summary is sent as one or more replies to ``message`` and the
    status message ``reply`` is finally edited to link to the first of them.
    """
    def _relative_name(path):
        # Strip the download directory (with trailing separator) from the path.
        return path.replace(os.path.join(torrent_info['dir'], ''), '', 1)

    def _is_padding_file(name):
        return IGNORE_PADDING_FILE and re.match(r'(?i)^_+padding_file', name) is not None

    pending = {}  # path on disk -> name shown to the user
    uploaded = []
    with tempfile.TemporaryDirectory(dir=str(user_id)) as zip_tempdir:
        if SendAsZipFlag not in flags:
            for entry in torrent_info['files']:
                source_path = entry['path']
                display_name = _relative_name(source_path)
                if _is_padding_file(display_name):
                    continue
                if LICHER_PARSE_EPISODE:
                    # Drop bracketed/parenthesised tags and a three-character
                    # extension; keep the relative name if nothing is left.
                    cleaned = re.sub(r'\s*(?:\[.+?\]|\(.+?\))\s*|\.[a-z][a-z0-9]{2}$', '', os.path.basename(source_path)).strip()
                    display_name = cleaned or display_name
                pending[source_path] = display_name
        else:
            bittorrent = torrent_info.get('bittorrent')
            if bittorrent:
                archive_name = bittorrent['info']['name']
            else:
                archive_name = os.path.basename(torrent_info['files'][0]['path'])
            # Keep at most the last 251 characters before appending the extension.
            archive_name = archive_name[-251:] + '.zip'
            archive_path = os.path.join(zip_tempdir, archive_name)

            def _build_archive():
                with zipfile.ZipFile(archive_path, 'x') as archive:
                    for entry in torrent_info['files']:
                        member_name = _relative_name(entry['path'])
                        if _is_padding_file(member_name):
                            continue
                        archive.write(entry['path'], member_name)

            # Zipping is blocking work, so it runs in the default executor
            # while the status message is being edited.
            zipping_notice = reply.edit_text('Download successful, zipping files...')
            zipping = client.loop.run_in_executor(None, _build_archive)
            await asyncio.gather(zipping_notice, zipping)
            asyncio.create_task(reply.edit_text('Download successful, uploading files...'))
            pending[archive_path] = archive_name
        for source_path in natsorted(pending):
            uploaded.extend(await _upload_file(client, message, reply, pending[source_path], source_path, ForceDocumentFlag in flags))

    summary = 'Files:\n'
    parser = pyrogram_html.HTML(client)
    quote = None
    first_message = None
    entries_in_chunk = 1
    for name, link in uploaded:
        escaped_name = html.escape(name)
        if link:
            line = f'- <a href="{link}">{escaped_name}</a>'
        else:
            line = f'- {escaped_name} (empty)'
        line += '\n'
        candidate = summary + line
        # Start a new summary message once the current one holds more than 100
        # entries or its parsed text would exceed 4096 characters.
        if entries_in_chunk > 100 or len((await parser.parse(candidate))['message']) > 4096:
            posted = await message.reply_text(summary, quote=quote, disable_web_page_preview=True)
            if first_message is None:
                first_message = posted
            quote = False
            candidate = line
            entries_in_chunk = 1
            await asyncio.sleep(PROGRESS_UPDATE_DELAY)
        entries_in_chunk += 1
        summary = candidate

    if not uploaded:
        summary = 'Files: None'
    elif LICHER_CHAT and LICHER_STICKER and message.chat.id in ADMIN_CHATS:
        await client.send_sticker(LICHER_CHAT, LICHER_STICKER)
    posted = await message.reply_text(summary, quote=quote, disable_web_page_preview=True)
    if first_message is None:
        first_message = posted
    asyncio.create_task(reply.edit_text(f'Download successful, files uploaded.\nFiles: {first_message.link}', disable_web_page_preview=True))

async def _upload_file(client, message, reply, filename, filepath, force_document):
    """Upload a single file as replies to ``reply``, splitting it when too large.

    Args:
        client: Client used for the optional forward to ``LICHER_CHAT``.
        message: The user's original message; errors are reported as replies to it.
        reply: The job's status message; uploads and progress messages reply to it.
        filename: Name shown to the user (caption and summary entry).
        filepath: Path of the file on disk.
        force_document: When true, videos are sent as plain documents.

    Returns:
        A list of ``(basename, link)`` tuples, one per uploaded part. An empty
        file yields ``[(basename, None)]`` without uploading anything. The list
        may be shorter than the number of parts if the upload is stopped via
        ``stop_uploads`` or a part fails.
    """
    file_size = os.path.getsize(filepath)
    if not file_size:
        return [(os.path.basename(filename), None)]
    worker_identifier = (reply.chat.id, reply.message_id)
    user_id = message.from_user.id
    user_dir = str(user_id)
    user_thumbnail = os.path.join(user_dir, 'thumbnail.jpg')
    user_watermark = os.path.join(user_dir, 'watermark.jpg')
    user_watermarked_thumbnail = os.path.join(user_dir, 'watermarked_thumbnail.jpg')
    # Files above this size are split into parts before uploading.
    file_has_big = file_size > 2097152000
    upload_wait = await reply.reply_text(f'Upload of {html.escape(filename)} will start in {PROGRESS_UPDATE_DELAY}s')
    message_exists[upload_wait.chat.id].add(upload_wait.message_id)
    upload_identifier = (upload_wait.chat.id, upload_wait.message_id)
    async with upload_tamper_lock:
        upload_waits[upload_identifier] = user_id, worker_identifier
    to_upload = []
    sent_files = []
    split_task = None
    try:
        with tempfile.TemporaryDirectory(dir=user_dir) as tempdir:
            if file_has_big:
                async def _split_files():
                    splitted = await split_files(filepath, tempdir, force_document)
                    for a, split in enumerate(splitted, 1):
                        to_upload.append((split, filename + f' (part {a})'))
                # Split in the background while the start delay below elapses.
                split_task = asyncio.create_task(_split_files())
            else:
                to_upload.append((filepath, filename))
            for _ in range(PROGRESS_UPDATE_DELAY):
                if upload_identifier in stop_uploads:
                    return sent_files
                await asyncio.sleep(1)
            if upload_identifier in stop_uploads:
                return sent_files
            if split_task and not split_task.done():
                await upload_wait.edit_text(f'Splitting {html.escape(filename)}...')
                while not split_task.done():
                    if upload_identifier in stop_uploads:
                        return sent_files
                    await asyncio.sleep(1)
            if upload_identifier in stop_uploads:
                return sent_files
            if split_task:
                # The task is finished here; awaiting it surfaces a splitting
                # failure instead of silently uploading nothing.
                try:
                    await split_task
                except Exception:
                    await message.reply_text(traceback.format_exc(), parse_mode=None)
                    return sent_files
            for a, (part_path, part_name) in enumerate(to_upload):
                if a:
                    # Every part after the first gets its own wait/progress message.
                    previous_wait = upload_wait
                    async with upload_tamper_lock:
                        upload_waits.pop(upload_identifier, None)
                        upload_wait = await reply.reply_text(f'Upload of {html.escape(part_name)} will start in {PROGRESS_UPDATE_DELAY}s')
                        upload_identifier = (upload_wait.chat.id, upload_wait.message_id)
                        upload_waits[upload_identifier] = user_id, worker_identifier
                    # progress_callback ignores messages missing from
                    # message_exists, so the new message must be registered for
                    # progress updates and stop requests to take effect.
                    async with message_exists_lock:
                        message_exists[previous_wait.chat.id].discard(previous_wait.message_id)
                        message_exists[upload_wait.chat.id].add(upload_wait.message_id)
                    for _ in range(PROGRESS_UPDATE_DELAY):
                        if upload_identifier in stop_uploads:
                            return sent_files
                        await asyncio.sleep(1)
                    if upload_identifier in stop_uploads:
                        return sent_files
                # The last existing candidate wins, so a watermarked thumbnail
                # takes precedence over the plain one.
                thumbnail = None
                for i in (user_thumbnail, user_watermarked_thumbnail):
                    thumbnail = i if os.path.isfile(i) else thumbnail
                mimetype = await get_file_mimetype(part_path)
                progress_args = (client, message, upload_wait, part_name, user_id)
                # Defaults so the forwarding call below never sees unbound names.
                duration = width = height = 0
                try:
                    if not force_document and mimetype.startswith('video/'):
                        duration = 0
                        video_json = await get_video_info(part_path)
                        video_format = video_json.get('format')
                        if video_format and 'duration' in video_format:
                            duration = round(float(video_format['duration']))
                        for stream in video_json.get('streams', ()):
                            if stream['codec_type'] == 'video':
                                width = stream.get('width')
                                height = stream.get('height')
                                if width and height:
                                    if not thumbnail:
                                        thumbnail = os.path.join(tempdir, '0.jpg')
                                        await generate_thumbnail(part_path, thumbnail)
                                        if os.path.isfile(thumbnail) and os.path.isfile(user_watermark):
                                            othumbnail = thumbnail
                                            thumbnail = os.path.join(tempdir, '1.jpg')
                                            await watermark_photo(othumbnail, user_watermark, thumbnail)
                                            # Fall back to the plain thumbnail if watermarking produced nothing.
                                            if not os.path.isfile(thumbnail):
                                                thumbnail = othumbnail
                                        if not os.path.isfile(thumbnail):
                                            thumbnail = None
                                    break
                        else:
                            # No video stream with usable dimensions was found.
                            width = height = 0
                        resp = await reply.reply_video(part_path, thumb=thumbnail, caption=part_name,
                                                       duration=duration, width=width, height=height,
                                                       parse_mode=None, progress=progress_callback,
                                                       progress_args=progress_args)
                    else:
                        resp = await reply.reply_document(part_path, thumb=thumbnail, caption=part_name,
                                                          parse_mode=None, progress=progress_callback,
                                                          progress_args=progress_args)
                except StopTransmission:
                    resp = None
                except Exception:
                    # Report the failure and move on to the next part.
                    await message.reply_text(traceback.format_exc(), parse_mode=None)
                    continue
                if not resp:
                    # Upload was stopped: give up on the remaining parts.
                    return sent_files
                sent_files.append((os.path.basename(part_name), resp.link))
                if LICHER_CHAT and reply.chat.id in ADMIN_CHATS and mimetype.startswith('video/') and resp.video:
                    await client.send_video(LICHER_CHAT, resp.video.file_id, thumb=thumbnail,
                                            caption=part_name + LICHER_FOOTER, duration=duration,
                                            width=width, height=height, parse_mode=None)
        return sent_files
    finally:
        if split_task:
            split_task.cancel()
        async with message_exists_lock:
            message_exists[upload_wait.chat.id].discard(upload_wait.message_id)
        asyncio.create_task(upload_wait.delete())
        async with upload_tamper_lock:
            # Default avoids a KeyError here masking the exception that got us into finally.
            upload_waits.pop(upload_identifier, None)

# (chat id, message id) of a progress message -> (last edit time, last text, start time, user id).
progress_callback_data = dict()
# (chat id, message id) of progress messages whose upload should be aborted.
stop_uploads = set()
async def progress_callback(current, total, client, message, reply, filename, user_id):
    """Upload progress hook: refresh the progress message or abort the upload.

    Args:
        current: Bytes uploaded so far.
        total: Total size of the upload in bytes.
        client: Client performing the upload.
        message: The user's original message, used for error reports.
        reply: The progress message to edit.
        filename: Name shown in the progress text.
        user_id: Stored alongside the progress state for this message.

    Raises:
        StopTransmission: When ``reply`` is listed in ``stop_uploads``; this is
            raised by ``client.stop_transmission()`` and must reach the caller.
    """
    try:
        if reply.message_id not in message_exists[reply.chat.id]:
            return
        message_identifier = (reply.chat.id, reply.message_id)
        last_edit_time, prevtext, start_time, user_id = progress_callback_data.get(message_identifier, (0, None, time.time(), user_id))
        if message_identifier in stop_uploads or current == total:
            asyncio.create_task(reply.delete())
            progress_callback_data.pop(message_identifier, None)
            if message_identifier in stop_uploads:
                client.stop_transmission()
        elif (time.time() - last_edit_time) > PROGRESS_UPDATE_DELAY:
            elapsed = time.time() - start_time
            if last_edit_time and elapsed > 0:
                # Average speed is what has been uploaded so far over the
                # elapsed time (not the bytes still remaining).
                upload_speed = format_bytes(current / elapsed)
            else:
                upload_speed = '0 B'
            text = f'''Uploading {html.escape(filename)}...
<code>{html.escape(return_progress_string(current, total))}</code>
<b>Total Size:</b> {format_bytes(total)}
<b>Uploaded Size:</b> {format_bytes(current)}
<b>Upload Speed:</b> {upload_speed}/s
<b>ETA:</b> {calculate_eta(current, total, start_time)}'''
            if prevtext != text and reply.message_id in message_exists[reply.chat.id]:
                async with message_exists_lock:
                    # Re-check under the lock: the message may have been
                    # dropped while this coroutine was waiting for it.
                    if reply.message_id not in message_exists[reply.chat.id]:
                        return
                    try:
                        await reply.edit_text(text)
                    except MessageIdInvalid:
                        message_exists[reply.chat.id].discard(reply.message_id)
                        return
                    except MessageNotModified:
                        pass
                prevtext = text
                last_edit_time = time.time()
                progress_callback_data[message_identifier] = last_edit_time, prevtext, start_time, user_id
    except StopTransmission:
        # StopTransmission is an Exception subclass; without this clause the
        # handler below would swallow it and the upload would never be aborted.
        raise
    except Exception as ex:
        preserved_logs.append((message, None, ex))
        logging.exception('%s', message)
        await message.reply_text(traceback.format_exc(), parse_mode=None)
        for admin_chat in ADMIN_CHATS:
            await client.send_message(admin_chat, traceback.format_exc(), parse_mode=None)
Editor is loading...