Untitled
unknown
plain_text
2 years ago
7.8 kB
6
Indexable
# TOKEN = '6125868858:AAH_F-s_DS0Tfgs0ihDHQzZIsO0gUdNGh4o'
# DEFAULT_API_KEY: Final = 'df6fce504580a5484bae747566a5c4f722930b30'
print('Starting up bot...')
from aiogram import types, Bot, Dispatcher, executor
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
import requests
from urllib.parse import urlparse
from datetime import date
import json
from datetime import date
DATA_FILE = "bot_data.json"
API_TOKENS_FILE = "api_tokens.json"
CHANNEL_ID = "@whisper_dz"
bot = Bot(token='6125868858:AAH_F-s_DS0Tfgs0ihDHQzZIsO0gUdNGh4o')
dp = Dispatcher(bot)
ADMIN_ID = 1408780397
user_api_tokens = {}
try:
with open(DATA_FILE, "r") as file:
bot_stats = json.load(file)
except FileNotFoundError:
# If the file doesn't exist, initialize the bot_stats object
bot_stats = {
"members": 0,
"links_shortened": 0,
"links_shortened_today": 0,
"today": str(date.today()),
"user_data": {}
}
# Load the user API tokens from the file on bot startup
try:
with open(API_TOKENS_FILE, "r") as file:
user_api_tokens = json.load(file)
except FileNotFoundError:
# If the file doesn't exist, initialize the user_api_tokens dictionary
user_api_tokens = {}
announcement_message = None
def get_qr_code_url(url):
return f"https://qr-code.abderrahmanemeh.repl.co/generate_qr?url={url}"
def get_admin_markup():
keyboard_markup = types.InlineKeyboardMarkup()
keyboard_markup.row(
types.InlineKeyboardButton("Members", callback_data="member_count"),
types.InlineKeyboardButton("Total", callback_data="total_links"))
keyboard_markup.row(
types.InlineKeyboardButton("Today", callback_data="today_links"),
types.InlineKeyboardButton("See", callback_data="see_members"))
keyboard_markup.row(
types.InlineKeyboardButton("Announce", callback_data="announce"),
types.InlineKeyboardButton("Send Message",
callback_data="send_message")) # New button
return keyboard_markup
def get_start_markup():
keyboard_markup = types.InlineKeyboardMarkup()
keyboard_markup.row(
types.InlineKeyboardButton("Shorten", callback_data="shorten"))
return keyboard_markup
@dp.message_handler(commands=['start'])
async def process_start_command(message: types.Message):
api_key = message.get_args()
if api_key:
user_api_tokens[message.from_user.id] = api_key
bot_stats["members"] += 1
bot_stats["user_data"][message.from_user.id] = message.from_user.username
save_bot_data()
save_api_tokens()
await bot.send_message(message.from_user.id,
"تم توصيل حسابك في cut2us بنجاح مع البوت",
reply_markup=get_start_markup())
else:
await bot.send_message(message.from_user.id,
"اظغط على الزر Shorten لأختصار رابطك",
reply_markup=get_start_markup())
@dp.callback_query_handler(lambda c: c.data == "shorten")
async def process_shorten_callback(callback_query: types.CallbackQuery):
user_id = callback_query.from_user.id
api_key = user_api_tokens.get(user_id)
if not api_key:
await bot.send_message(
user_id,
"حسابك ليس متصلا مع البوت لتوصيله اذهب الى لوحة التحكم على الموقع و اظغط على ايقونة البوت . هذا رابط لوحة التحكم : https://cut2.us/member/dashboard"
)
else:
await bot.send_message(user_id, "يرجى تقديم الرابط الذي ترغب في اختصاره.")
def save_bot_data():
# Save the bot_stats to the file
with open(DATA_FILE, "w") as file:
json.dump(bot_stats, file)
def save_api_tokens():
# Save the user_api_tokens to the file
with open(API_TOKENS_FILE, "w") as file:
json.dump(user_api_tokens, file)
@dp.message_handler(commands=['admin'])
async def process_admin_command(message: types.Message):
if message.from_user.id == ADMIN_ID:
await bot.send_message(message.from_user.id,
"Select an option:",
reply_markup=get_admin_markup())
else:
await bot.send_message(message.from_user.id,
"You are not authorized to use this command.")
@dp.callback_query_handler(lambda c: c.data in [
"member_count", "total_links", "today_links", "see_members", "announce",
"send_message"
])
async def process_callback(callback_query: types.CallbackQuery):
global announcement_message
if callback_query.from_user.id == ADMIN_ID:
if callback_query.data == "send_message": # New condition
announcement_message = "Please type the user id and the message you want to send in the format: user_id message"
await bot.send_message(callback_query.from_user.id, announcement_message)
if callback_query.from_user.id == ADMIN_ID:
if callback_query.data == "member_count":
await bot.answer_callback_query(
callback_query.id, f"Number of members: {bot_stats['members']}")
elif callback_query.data == "total_links":
await bot.answer_callback_query(
callback_query.id,
f"Number of links shortened: {bot_stats['links_shortened']}")
elif callback_query.data == "today_links":
await bot.answer_callback_query(
callback_query.id,
f"Number of links shortened today: {bot_stats['links_shortened_today']}"
)
elif callback_query.data == "see_members":
members = "\n".join([
f"{id}: {username}" for id, username in bot_stats["user_data"].items()
])
await bot.send_message(callback_query.from_user.id, members)
elif callback_query.data == "announce":
announcement_message = "Please type your announcement message."
await bot.send_message(callback_query.from_user.id, announcement_message)
else:
await bot.answer_callback_query(
callback_query.id, "You are not authorized to use this command.")
@dp.message_handler()
async def echo_message(msg: types.Message):
global announcement_message
user_id = msg.from_user.id
api_key = user_api_tokens.get(user_id)
if user_id == ADMIN_ID and announcement_message is not None:
user_id, message = msg.text.split(" ", 1)
if user_id.isdigit(): # Check if the user_id is a number
await bot.send_message(int(user_id), message, disable_notification=False)
announcement_message = None
return
if not api_key:
await bot.send_message(
msg.from_user.id,
"حسابك ليس متصلا مع البوت لتوصيله اذهب الى لوحة التحكم على الموقع و اظغط على ايقونة البوت . هذا رابط لوحة التحكم : https://cut2.us/member/dashboard"
)
return
parsed_url = urlparse(msg.text)
if not all([parsed_url.scheme, parsed_url.netloc]):
await bot.send_message(user_id,
"ها ليس رابطًا صالحًا. يرجى المحاولة مرة أخرى.")
return
api_url = f"http://cut2.us/api?api={api_key}&url={msg.text}"
response = requests.get(api_url)
data = response.json()
if data['status'] == 'success':
bot_stats["links_shortened"] += 1
if bot_stats["today"] == date.today():
bot_stats["links_shortened_today"] += 1
else:
bot_stats["today"] = date.today()
bot_stats["links_shortened_today"] = 1
shortened_url = data['shortenedUrl']
qr_code_url = get_qr_code_url(shortened_url)
await bot.send_message(user_id, f"إليك الرابط المختصر : {shortened_url}")
await bot.send_photo(user_id, qr_code_url)
else:
await bot.send_message(
user_id, "حث خطأ أثناء اختصار الرابط الخاص بك. يرجى المحاولة مرة أخرى.")
if __name__ == '__main__':
executor.start_polling(dp)
Editor is loading...