Untitled
unknown
plain_text
2 years ago
7.8 kB
5
Indexable
# TOKEN = '6125868858:AAH_F-s_DS0Tfgs0ihDHQzZIsO0gUdNGh4o' # DEFAULT_API_KEY: Final = 'df6fce504580a5484bae747566a5c4f722930b30' print('Starting up bot...') from aiogram import types, Bot, Dispatcher, executor from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup import requests from urllib.parse import urlparse from datetime import date import json from datetime import date DATA_FILE = "bot_data.json" API_TOKENS_FILE = "api_tokens.json" CHANNEL_ID = "@whisper_dz" bot = Bot(token='6125868858:AAH_F-s_DS0Tfgs0ihDHQzZIsO0gUdNGh4o') dp = Dispatcher(bot) ADMIN_ID = 1408780397 user_api_tokens = {} try: with open(DATA_FILE, "r") as file: bot_stats = json.load(file) except FileNotFoundError: # If the file doesn't exist, initialize the bot_stats object bot_stats = { "members": 0, "links_shortened": 0, "links_shortened_today": 0, "today": str(date.today()), "user_data": {} } # Load the user API tokens from the file on bot startup try: with open(API_TOKENS_FILE, "r") as file: user_api_tokens = json.load(file) except FileNotFoundError: # If the file doesn't exist, initialize the user_api_tokens dictionary user_api_tokens = {} announcement_message = None def get_qr_code_url(url): return f"https://qr-code.abderrahmanemeh.repl.co/generate_qr?url={url}" def get_admin_markup(): keyboard_markup = types.InlineKeyboardMarkup() keyboard_markup.row( types.InlineKeyboardButton("Members", callback_data="member_count"), types.InlineKeyboardButton("Total", callback_data="total_links")) keyboard_markup.row( types.InlineKeyboardButton("Today", callback_data="today_links"), types.InlineKeyboardButton("See", callback_data="see_members")) keyboard_markup.row( types.InlineKeyboardButton("Announce", callback_data="announce"), types.InlineKeyboardButton("Send Message", callback_data="send_message")) # New button return keyboard_markup def get_start_markup(): keyboard_markup = types.InlineKeyboardMarkup() keyboard_markup.row( types.InlineKeyboardButton("Shorten", callback_data="shorten")) return keyboard_markup @dp.message_handler(commands=['start']) async def process_start_command(message: types.Message): api_key = message.get_args() if api_key: user_api_tokens[message.from_user.id] = api_key bot_stats["members"] += 1 bot_stats["user_data"][message.from_user.id] = message.from_user.username save_bot_data() save_api_tokens() await bot.send_message(message.from_user.id, "تم توصيل حسابك في cut2us بنجاح مع البوت", reply_markup=get_start_markup()) else: await bot.send_message(message.from_user.id, "اظغط على الزر Shorten لأختصار رابطك", reply_markup=get_start_markup()) @dp.callback_query_handler(lambda c: c.data == "shorten") async def process_shorten_callback(callback_query: types.CallbackQuery): user_id = callback_query.from_user.id api_key = user_api_tokens.get(user_id) if not api_key: await bot.send_message( user_id, "حسابك ليس متصلا مع البوت لتوصيله اذهب الى لوحة التحكم على الموقع و اظغط على ايقونة البوت . هذا رابط لوحة التحكم : https://cut2.us/member/dashboard" ) else: await bot.send_message(user_id, "يرجى تقديم الرابط الذي ترغب في اختصاره.") def save_bot_data(): # Save the bot_stats to the file with open(DATA_FILE, "w") as file: json.dump(bot_stats, file) def save_api_tokens(): # Save the user_api_tokens to the file with open(API_TOKENS_FILE, "w") as file: json.dump(user_api_tokens, file) @dp.message_handler(commands=['admin']) async def process_admin_command(message: types.Message): if message.from_user.id == ADMIN_ID: await bot.send_message(message.from_user.id, "Select an option:", reply_markup=get_admin_markup()) else: await bot.send_message(message.from_user.id, "You are not authorized to use this command.") @dp.callback_query_handler(lambda c: c.data in [ "member_count", "total_links", "today_links", "see_members", "announce", "send_message" ]) async def process_callback(callback_query: types.CallbackQuery): global announcement_message if callback_query.from_user.id == ADMIN_ID: if callback_query.data == "send_message": # New condition announcement_message = "Please type the user id and the message you want to send in the format: user_id message" await bot.send_message(callback_query.from_user.id, announcement_message) if callback_query.from_user.id == ADMIN_ID: if callback_query.data == "member_count": await bot.answer_callback_query( callback_query.id, f"Number of members: {bot_stats['members']}") elif callback_query.data == "total_links": await bot.answer_callback_query( callback_query.id, f"Number of links shortened: {bot_stats['links_shortened']}") elif callback_query.data == "today_links": await bot.answer_callback_query( callback_query.id, f"Number of links shortened today: {bot_stats['links_shortened_today']}" ) elif callback_query.data == "see_members": members = "\n".join([ f"{id}: {username}" for id, username in bot_stats["user_data"].items() ]) await bot.send_message(callback_query.from_user.id, members) elif callback_query.data == "announce": announcement_message = "Please type your announcement message." await bot.send_message(callback_query.from_user.id, announcement_message) else: await bot.answer_callback_query( callback_query.id, "You are not authorized to use this command.") @dp.message_handler() async def echo_message(msg: types.Message): global announcement_message user_id = msg.from_user.id api_key = user_api_tokens.get(user_id) if user_id == ADMIN_ID and announcement_message is not None: user_id, message = msg.text.split(" ", 1) if user_id.isdigit(): # Check if the user_id is a number await bot.send_message(int(user_id), message, disable_notification=False) announcement_message = None return if not api_key: await bot.send_message( msg.from_user.id, "حسابك ليس متصلا مع البوت لتوصيله اذهب الى لوحة التحكم على الموقع و اظغط على ايقونة البوت . هذا رابط لوحة التحكم : https://cut2.us/member/dashboard" ) return parsed_url = urlparse(msg.text) if not all([parsed_url.scheme, parsed_url.netloc]): await bot.send_message(user_id, "ها ليس رابطًا صالحًا. يرجى المحاولة مرة أخرى.") return api_url = f"http://cut2.us/api?api={api_key}&url={msg.text}" response = requests.get(api_url) data = response.json() if data['status'] == 'success': bot_stats["links_shortened"] += 1 if bot_stats["today"] == date.today(): bot_stats["links_shortened_today"] += 1 else: bot_stats["today"] = date.today() bot_stats["links_shortened_today"] = 1 shortened_url = data['shortenedUrl'] qr_code_url = get_qr_code_url(shortened_url) await bot.send_message(user_id, f"إليك الرابط المختصر : {shortened_url}") await bot.send_photo(user_id, qr_code_url) else: await bot.send_message( user_id, "حث خطأ أثناء اختصار الرابط الخاص بك. يرجى المحاولة مرة أخرى.") if __name__ == '__main__': executor.start_polling(dp)
Editor is loading...