Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
7.8 kB
2
Indexable
Never
# TOKEN = '6125868858:AAH_F-s_DS0Tfgs0ihDHQzZIsO0gUdNGh4o'
# DEFAULT_API_KEY: Final = 'df6fce504580a5484bae747566a5c4f722930b30'
# Startup banner; it is printed before the imports below are executed.
print("Starting up bot...")

from aiogram import types, Bot, Dispatcher, executor
from aiogram.types import InlineKeyboardButton, InlineKeyboardMarkup
import requests
from urllib.parse import urlparse
from datetime import date
import json
from datetime import date

# JSON file that `bot_stats` is loaded from at startup and written to by
# save_bot_data().
DATA_FILE = "bot_data.json"
# JSON file that `user_api_tokens` is loaded from at startup and written to by
# save_api_tokens().
API_TOKENS_FILE = "api_tokens.json"

# Not referenced anywhere else in the visible code.
CHANNEL_ID = "@whisper_dz"

# NOTE(review): the bot token is hard-coded in source. Anyone who can read this
# file can control the bot; the token should be rotated and supplied through
# the environment or a secret store instead.
bot = Bot(token='6125868858:AAH_F-s_DS0Tfgs0ihDHQzZIsO0gUdNGh4o')
dp = Dispatcher(bot)

# Telegram user id that the handlers compare `from_user.id` against to gate
# the /admin menu, its callbacks and the admin message-relay flow.
ADMIN_ID = 1408780397

# Telegram user id -> API key received through `/start <api_key>`.
# This empty default is replaced by the startup loading code further down.
user_api_tokens = {}

def _with_int_keys(mapping):
  """Return a copy of *mapping* with numeric string keys converted to ``int``.

  JSON object keys are always strings, so ids written by ``json.dump`` come
  back as ``"123"`` rather than ``123``. The handlers look users up by the
  integer ``from_user.id``, so the keys are converted back after loading.
  Keys that cannot be parsed as integers are kept unchanged.
  """
  converted = {}
  for key, value in mapping.items():
    try:
      converted[int(key)] = value
    except (TypeError, ValueError):
      converted[key] = value
  return converted


# Load the bot statistics from the file on bot startup
try:
  with open(DATA_FILE, "r") as file:
    bot_stats = json.load(file)
except FileNotFoundError:
  # If the file doesn't exist, initialize the bot_stats object
  bot_stats = {
    "members": 0,
    "links_shortened": 0,
    "links_shortened_today": 0,
    "today": str(date.today()),
    "user_data": {}
  }
else:
  # Restore integer user ids so reloaded entries match `from_user.id`.
  bot_stats["user_data"] = _with_int_keys(bot_stats.get("user_data", {}))

# Load the user API tokens from the file on bot startup
try:
  with open(API_TOKENS_FILE, "r") as file:
    # Without the key conversion every user looks unlinked after a restart,
    # because `user_api_tokens.get(<int id>)` never matches a string key.
    user_api_tokens = _with_int_keys(json.load(file))
except FileNotFoundError:
  # If the file doesn't exist, initialize the user_api_tokens dictionary
  user_api_tokens = {}

# Prompt text of the admin flow that is currently waiting for the admin's next
# message, or None when no such flow is pending.
announcement_message = None


def get_qr_code_url(url):
  """Return the QR-code generator URL that encodes *url*.

  *url* is percent-encoded before being placed in the ``url`` query
  parameter. Without that, characters such as ``&``, ``?``, ``#`` or spaces
  inside *url* would be read as part of the generator's own query string and
  the QR code would encode a truncated link.
  """
  from urllib.parse import quote

  encoded_target = quote(url, safe="")
  return f"https://qr-code.abderrahmanemeh.repl.co/generate_qr?url={encoded_target}"


def get_admin_markup():
  """Build the inline keyboard for the admin menu.

  The keyboard has three rows of two buttons; each button carries the
  callback data that the admin callback handler dispatches on.
  """
  # (label, callback_data) pairs, one inner tuple per keyboard row.
  layout = (
    (("Members", "member_count"), ("Total", "total_links")),
    (("Today", "today_links"), ("See", "see_members")),
    (("Announce", "announce"), ("Send Message", "send_message")),
  )
  markup = types.InlineKeyboardMarkup()
  for row in layout:
    buttons = [
      types.InlineKeyboardButton(label, callback_data=action)
      for label, action in row
    ]
    markup.row(*buttons)
  return markup


def get_start_markup():
  """Build the single-button inline keyboard attached to the /start replies."""
  shorten_button = types.InlineKeyboardButton("Shorten",
                                              callback_data="shorten")
  markup = types.InlineKeyboardMarkup()
  markup.row(shorten_button)
  return markup


@dp.message_handler(commands=['start'])
async def process_start_command(message: types.Message):
  """Handle /start, linking the sender's API key when one is supplied.

  ``/start <api_key>`` stores the key for the sender, records the sender's
  username, persists both JSON files and confirms the link. A bare ``/start``
  only shows the "Shorten" button.
  """
  api_key = message.get_args()
  if not api_key:
    await bot.send_message(message.from_user.id,
                           "اظغط على الزر Shorten لأختصار رابطك",
                           reply_markup=get_start_markup())
    return

  user_id = message.from_user.id
  # Entries reloaded from JSON may be keyed by the string form of the id.
  legacy_key = str(user_id)
  known_users = bot_stats["user_data"]

  # Count a member only on the first link; re-running /start with a new key
  # must not inflate the member counter.
  if user_id not in known_users and legacy_key not in known_users:
    bot_stats["members"] += 1

  # Drop string-keyed leftovers so the int-keyed entry is the only one and
  # the saved JSON does not contain the same id twice.
  known_users.pop(legacy_key, None)
  user_api_tokens.pop(legacy_key, None)
  known_users[user_id] = message.from_user.username
  user_api_tokens[user_id] = api_key

  save_bot_data()
  save_api_tokens()

  await bot.send_message(message.from_user.id,
                         "تم توصيل حسابك في cut2us بنجاح مع البوت",
                         reply_markup=get_start_markup())


@dp.callback_query_handler(lambda c: c.data == "shorten")
async def process_shorten_callback(callback_query: types.CallbackQuery):
  """Handle the "Shorten" button.

  Users with a linked API key are asked to send the link to shorten; users
  without one are pointed to the dashboard where the account can be linked.
  """
  # Acknowledge the button press; until the callback query is answered the
  # Telegram client keeps showing a progress indicator on the button.
  await bot.answer_callback_query(callback_query.id)

  user_id = callback_query.from_user.id
  # Tokens reloaded from JSON may be keyed by the string form of the id.
  api_key = user_api_tokens.get(user_id) or user_api_tokens.get(str(user_id))
  if not api_key:
    await bot.send_message(
      user_id,
      "حسابك ليس متصلا مع البوت لتوصيله اذهب الى لوحة التحكم على الموقع و اظغط على ايقونة البوت . هذا رابط لوحة التحكم : https://cut2.us/member/dashboard"
    )
  else:
    await bot.send_message(user_id, "يرجى تقديم الرابط الذي ترغب في اختصاره.")


def save_bot_data():
  """Write ``bot_stats`` to ``DATA_FILE`` as JSON.

  The ``today`` entry is written as a string even when it currently holds a
  ``datetime.date`` object, which ``json`` cannot serialise. The JSON text is
  produced before the file is opened, so a serialisation error cannot leave
  a truncated data file behind.
  """
  snapshot = dict(bot_stats)
  if "today" in snapshot:
    # str(date) yields the same ISO form that the startup default uses.
    snapshot["today"] = str(snapshot["today"])
  payload = json.dumps(snapshot)
  with open(DATA_FILE, "w") as file:
    file.write(payload)


def save_api_tokens():
  """Write the ``user_api_tokens`` mapping to ``API_TOKENS_FILE`` as JSON."""
  with open(API_TOKENS_FILE, mode="w") as tokens_file:
    json.dump(user_api_tokens, fp=tokens_file)


@dp.message_handler(commands=['admin'])
async def process_admin_command(message: types.Message):
  """Handle /admin: show the admin menu to the admin, refuse everyone else."""
  sender_id = message.from_user.id
  if sender_id != ADMIN_ID:
    await bot.send_message(sender_id,
                           "You are not authorized to use this command.")
    return

  await bot.send_message(sender_id,
                         "Select an option:",
                         reply_markup=get_admin_markup())


@dp.callback_query_handler(lambda c: c.data in [
  "member_count", "total_links", "today_links", "see_members", "announce",
  "send_message"
])
async def process_callback(callback_query: types.CallbackQuery):
  """Dispatch a button press from the admin menu.

  The three counter buttons are answered with a callback notification. The
  other buttons reply with a chat message: "see_members" lists the recorded
  users, while "announce" and "send_message" send a prompt and store it in
  ``announcement_message`` so the admin's next text message is treated as the
  reply. Anyone other than the admin gets an authorization notice.
  """
  global announcement_message
  sender_id = callback_query.from_user.id
  if sender_id != ADMIN_ID:
    await bot.answer_callback_query(
      callback_query.id, "You are not authorized to use this command.")
    return

  action = callback_query.data
  if action == "member_count":
    await bot.answer_callback_query(
      callback_query.id, f"Number of members: {bot_stats['members']}")
    return
  if action == "total_links":
    await bot.answer_callback_query(
      callback_query.id,
      f"Number of links shortened: {bot_stats['links_shortened']}")
    return
  if action == "today_links":
    await bot.answer_callback_query(
      callback_query.id,
      f"Number of links shortened today: {bot_stats['links_shortened_today']}"
    )
    return

  # The remaining actions reply in the chat. The callback still has to be
  # answered, otherwise the client keeps showing a progress indicator.
  await bot.answer_callback_query(callback_query.id)

  if action == "see_members":
    lines = [
      f"{member_id}: {username}"
      for member_id, username in bot_stats["user_data"].items()
    ]
    if not lines:
      # Telegram rejects an empty message text, so say so explicitly.
      await bot.send_message(sender_id, "No members registered yet.")
      return
    # Telegram limits a message to 4096 characters; split long lists on line
    # boundaries into several messages.
    max_length = 4096
    chunk = ""
    for line in lines:
      line = line[:max_length]
      if chunk and len(chunk) + 1 + len(line) > max_length:
        await bot.send_message(sender_id, chunk)
        chunk = line
      else:
        chunk = f"{chunk}\n{line}" if chunk else line
    await bot.send_message(sender_id, chunk)
  elif action == "announce":
    announcement_message = "Please type your announcement message."
    await bot.send_message(sender_id, announcement_message)
  elif action == "send_message":
    announcement_message = "Please type the user id and the message you want to send in the format: user_id message"
    await bot.send_message(sender_id, announcement_message)


def _request_short_link(api_key, long_url):
  """Ask the cut2.us API to shorten *long_url*; return its JSON reply or None.

  The query string is built by ``requests`` from ``params`` so that ``&``,
  ``?`` or ``#`` inside *long_url* are escaped instead of being read as part
  of the API's own parameters. Network errors, timeouts and non-JSON replies
  are reported as ``None``.
  """
  try:
    response = requests.get(
      "http://cut2.us/api",
      params={"api": api_key, "url": long_url},
      timeout=15,
    )
    data = response.json()
  except (requests.RequestException, ValueError):
    return None
  return data if isinstance(data, dict) else None


@dp.message_handler()
async def echo_message(msg: types.Message):
  """Handle every message that no other handler claimed.

  * When the admin has a pending prompt (``announcement_message`` is set),
    the text is read as ``"<user_id> <message>"`` and relayed to that user.
  * Otherwise the text is treated as a link to shorten: the sender must have
    a linked API key and the text must be a URL with a scheme and a host.
    On success the counters are updated and saved, and the short link and
    its QR code are sent back.
  """
  global announcement_message
  import asyncio

  user_id = msg.from_user.id
  text = msg.text or ""

  if user_id == ADMIN_ID and announcement_message is not None:
    # Clear the pending state first so that malformed input or a failed send
    # cannot leave the admin stuck in this mode.
    announcement_message = None
    # NOTE(review): the "Announce" prompt ends up here too, so an announcement
    # is only delivered when it starts with a numeric user id. Confirm whether
    # a broadcast to all members was intended.
    target_id, _, body = text.partition(" ")
    if target_id.isdigit() and body:
      await bot.send_message(int(target_id), body, disable_notification=False)
    return

  # Tokens reloaded from JSON may be keyed by the string form of the id.
  api_key = user_api_tokens.get(user_id) or user_api_tokens.get(str(user_id))
  if not api_key:
    await bot.send_message(
      msg.from_user.id,
      "حسابك ليس متصلا مع البوت لتوصيله اذهب الى لوحة التحكم على الموقع و اظغط على ايقونة البوت . هذا رابط لوحة التحكم : https://cut2.us/member/dashboard"
    )
    return

  parsed_url = urlparse(text)
  if not all([parsed_url.scheme, parsed_url.netloc]):
    await bot.send_message(user_id,
                           "ها ليس رابطًا صالحًا. يرجى المحاولة مرة أخرى.")
    return

  # `requests` is blocking; run it in the default executor so other users'
  # updates keep being processed while the API call is in flight.
  loop = asyncio.get_running_loop()
  data = await loop.run_in_executor(None, _request_short_link, api_key, text)
  if not data or data.get('status') != 'success' or 'shortenedUrl' not in data:
    await bot.send_message(
      user_id, "حث خطأ أثناء اختصار الرابط الخاص بك. يرجى المحاولة مرة أخرى.")
    return

  # Track the day as an ISO string: that is what startup stores, and it keeps
  # `bot_stats` JSON-serialisable.
  today = str(date.today())
  bot_stats["links_shortened"] += 1
  if str(bot_stats.get("today")) == today:
    bot_stats["links_shortened_today"] += 1
  else:
    bot_stats["today"] = today
    bot_stats["links_shortened_today"] = 1
  # Persist the counters; otherwise they are lost on the next restart.
  save_bot_data()

  shortened_url = data['shortenedUrl']
  qr_code_url = get_qr_code_url(shortened_url)
  await bot.send_message(user_id, f"إليك الرابط المختصر :  {shortened_url}")
  await bot.send_photo(user_id, qr_code_url)


# Start long polling only when the file is run as a script, not on import.
if __name__ == "__main__":
  executor.start_polling(dp)