date_middlewares
unknown
python
a year ago
4.5 kB
19
Indexable
Never
from typing import Callable, Awaitable, Dict, Any, Union import pytz, random from aiogram import BaseMiddleware, Router, Bot, F from aiogram.types import TelegramObject, Message, CallbackQuery, InlineKeyboardButton from aiogram.fsm.context import FSMContext from aiogram.filters import StateFilter from aiogram.utils.keyboard import InlineKeyboardBuilder from fluentogram import TranslatorRunner from sqlalchemy.ext.asyncio import AsyncSession from datetime import datetime, timedelta from infrastructure.database.models.base import StorageUsers def generate_random_smiles(): smile_list = ["๐", "๐", "๐ ", "๐คฉ", "๐", "๐ฅณ", "๐", "๐ค", "๐", "๐", "๐", "๐"] random_smiles = random.sample(smile_list, 5) # ะัะฑะธัะฐะตะผ 5 ัะปััะฐะนะฝัั ัะผะฐะนะปะฐ return random_smiles, random.choice(random_smiles) class DateCaptchaMiddleware(BaseMiddleware): def __init__(self): super().__init__() async def __call__( self, handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]], event: TelegramObject, data: Dict[str, Any], ) -> Any: user: StorageUsers = data["user"] state: FSMContext = data["state"] i18n: TranslatorRunner = data["i18n"] msg: Union[Message, CallbackQuery] = data["event_update"].message if data["event_update"].message != None else data["event_update"].callback_query.message try: dates = data["event_update"].callback_query.data if dates.startswith("emoji_"): return await handler(event, data) except: pass if user != None: date_now = datetime.now(tz=pytz.timezone('Europe/Moscow')) user_date: datetime = user.user_date.astimezone(pytz.timezone('Europe/Moscow')) + timedelta(days=2.0) if date_now >= user_date: emojis, emoji = generate_random_smiles() kb = InlineKeyboardBuilder() for i in emojis: kb.row(InlineKeyboardButton(text=i, callback_data=f"emoji_{i}")) kb.adjust(3, 2) msg_to_send = await msg.answer(emoji) await msg.answer(i18n.select.captcha(), reply_markup=kb.as_markup()) await state.update_data({"emoji_suc": emoji, "msg_emoji": msg_to_send.message_id, "event_emoji": event, "data_emoji": data, "handler_emoji": handler}) await state.set_state("set_emoji") return elif user == None: emojis, emoji = generate_random_smiles() kb = InlineKeyboardBuilder() for i in emojis: kb.row(InlineKeyboardButton(text=i, callback_data=f"emoji_{i}")) kb.adjust(3, 2) msg_to_send = await msg.answer(emoji) await msg.answer(i18n.select.captcha(), reply_markup=kb.as_markup()) await state.update_data({"emoji_suc": emoji, "msg_emoji": msg_to_send.message_id, "event_emoji": event, "data_emoji": data, "handler_emoji": handler}) await state.set_state("set_emoji") return return await handler(event, data) router_to_test = Router() @router_to_test.callback_query(StateFilter("set_emoji"), F.data.startswith("emoji_")) async def state_to_test(c: CallbackQuery, state: FSMContext, bot: Bot, session: AsyncSession, user: StorageUsers): emoji_str = c.data.split("_")[1] data = await state.get_data() if data["emoji_suc"] == emoji_str: await state.clear() await bot.delete_message(c.from_user.id, int(data["msg_emoji"])) await c.message.delete() date_now = datetime.now(tz=pytz.timezone('Europe/Moscow')) user.user_date = date_now await session.commit() return await data["handler_emoji"](data["event_emoji"], data["data_emoji"]) else: emojis, emoji = generate_random_smiles() kb = InlineKeyboardBuilder() for i in emojis: kb.row(InlineKeyboardButton(text=i, callback_data=f"emoji_{i}")) kb.adjust(3, 2) await bot.edit_message_text(emoji, c.from_user.id, int(data["msg_emoji"])) await bot.edit_message_text("Tese", c.from_user.id, c.message.message_id, reply_markup=kb.as_markup()) await state.update_data({"emoji_suc": emoji}) await state.set_state("set_emoji")