Token Sys
plain_text
a month ago
3.2 kB
11
Indexable
Never
"""Token-gated access helper for a Pyrogram Telegram bot.

Users obtain a one-time token through a ``/start <token>`` deep link; once
refreshed, the token stays valid for ``TOKEN_TIMEOUT`` seconds.
"""

from asyncio import sleep
from datetime import datetime, timedelta, timezone
import os
from os import environ
from re import match as re_match
from time import time
from uuid import uuid4

from pyrogram import Client, filters
from pyrogram.filters import command
from pyrogram.handlers import MessageHandler, CallbackQueryHandler
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup

# NOTE(review): presumably project-local helper modules -- confirm.
from button_build import ButtonMaker
from message_utils import sendMessage, get_readable_time, deleteMessage
from message_utils import LOGGER
from multishort import short_url


def _parse_timeout(raw):
    """Return *raw* as a non-negative number of seconds, or 0 if unusable.

    Environment variables are always strings, so the value must be converted
    before it can be compared with a ``time()`` difference. Anything that is
    not a plain non-negative integer string (``''``, ``None``, ``'abc'``)
    yields 0, which callers treat as "token check disabled".
    """
    if isinstance(raw, (int, float)):
        return max(raw, 0)
    text = str(raw).strip()
    return int(text) if text.isdigit() else 0


bot_name = environ.get('bot_name', '')
# Token lifetime in seconds; 0 disables the token check entirely.
TOKEN_TIMEOUT = _parse_timeout(environ.get('TOKEN_TIMEOUT', '60'))
config_dict = {"TOKEN_TIMEOUT": TOKEN_TIMEOUT}
# Per-user state keyed by Telegram user id: {'token': str, 'time': float}.
user_data = {}

# Create a new Pyrogram client
SESSION = "TOKENBOT"  # Replace with your desired session name
API_ID = ""  # Replace with your actual API ID
API_HASH = ""  # Replace with your actual API hash
BOT_TOKEN = ""  # Replace with your actual bot token

# Create a new Pyrogram client
app = Client(
    name=SESSION,
    api_id=API_ID,
    api_hash=API_HASH,
    bot_token=BOT_TOKEN
)


@app.on_message(filters.command("start") & filters.private)
async def start(_, message):
    """Handle ``/start`` in private chats.

    With an argument (``/start <token>``) the token is checked against the
    one stored for the sender; on a match a fresh token is stored together
    with the current timestamp, which restarts the validity window.
    Without an argument a welcome text is sent.

    Args:
        _: The client instance passed by Pyrogram (unused).
        message: The incoming message.

    Returns:
        Whatever ``sendMessage`` returns for the token branches; ``None``
        for the plain welcome branch.
    """
    if len(message.command) > 1:
        userid = message.from_user.id
        input_token = message.command[1]
        if userid not in user_data:
            return await sendMessage(message, 'Who are you?')
        data = user_data[userid]
        if 'token' not in data or data['token'] != input_token:
            return await sendMessage(message, 'This is a token already expired')
        # Rotate the token so the same deep link cannot be replayed, and
        # stamp the refresh time that checking_access() measures against.
        data['token'] = str(uuid4())
        data['time'] = time()
        return await sendMessage(message, 'Token refreshed successfully!')
    else:
        start_string = '🌹 Welcome To One Of A Modified Anasty Mirror Bot\n' \
                       'This bot can Mirror all your links To Google Drive!\n' \
                       '👨🏽💻 Powered By: @JMDKH_Team'
        await sendMessage(message, start_string)


def checking_access(user_id, button=None):
    """Check whether *user_id* currently holds an unexpired token.

    Args:
        user_id: Key into ``user_data`` identifying the user.
        button: Optional existing button builder to extend; a new
            ``ButtonMaker`` is created when the token is expired and none
            was supplied.

    Returns:
        ``(None, button)`` when access is allowed or the token system is
        disabled; otherwise ``(error_message, button)`` where *button* has
        received a 'Refresh Token' entry pointing at the bot's ``/start``
        deep link for the user's pending token.
    """
    # Parse again here: config_dict may have been altered after import, and
    # a string value would make the comparison below raise TypeError.
    timeout = _parse_timeout(config_dict['TOKEN_TIMEOUT'])
    if not timeout:
        return None, button

    data = user_data.setdefault(user_id, {})
    expire = data.get('time')
    is_expired = expire is None or (time() - expire) > timeout
    if not is_expired:
        return None, button

    if expire is None and 'token' in data:
        # A token was already issued but never redeemed: reuse it so a
        # previously sent refresh link keeps working.
        token = data['token']
    else:
        token = str(uuid4())
    if expire is not None:
        del data['time']
    data['token'] = token

    if button is None:
        button = ButtonMaker()
    button.ubutton('Refresh Token', short_url(
        f'https://t.me/{bot_name}?start={token}'))
    return 'Token is expired, refresh your token and try again.', button


async def main():
    """Register the ``/start`` handler on the client.

    NOTE(review): the script entry point below never calls this coroutine,
    and ``start`` is already registered through the ``@app.on_message``
    decorator; calling this as well would register the handler a second
    time -- confirm whether it is still needed.
    """
    # Add handlers to the client
    app.add_handler(MessageHandler(
        start, filters=filters.command("start") & filters.private))


# Start the client
if __name__ == "__main__":
    app.run()