date_middlewares
unknown
python
3 years ago
4.5 kB
29
Indexable
from typing import Callable, Awaitable, Dict, Any, Union
import pytz, random
from aiogram import BaseMiddleware, Router, Bot, F
from aiogram.types import TelegramObject, Message, CallbackQuery, InlineKeyboardButton
from aiogram.fsm.context import FSMContext
from aiogram.filters import StateFilter
from aiogram.utils.keyboard import InlineKeyboardBuilder
from fluentogram import TranslatorRunner
from sqlalchemy.ext.asyncio import AsyncSession
from datetime import datetime, timedelta
from infrastructure.database.models.base import StorageUsers
# Pool of smiley emoji the captcha draws from. Written as escapes so the
# glyphs survive any re-encoding of this file.
# NOTE(review): the original literals reached review mis-encoded. Only
# U+1F605, U+1F929 and U+1F973 could be recovered with certainty; the other
# entries are stand-ins from the same emoji ranges - confirm the intended set.
_SMILES = (
    "\U0001F600",  # grinning face
    "\U0001F603",  # grinning face with big eyes
    "\U0001F605",  # grinning face with sweat
    "\U0001F929",  # star-struck
    "\U0001F60E",  # smiling face with sunglasses
    "\U0001F973",  # partying face
    "\U0001F60A",  # smiling face with smiling eyes
    "\U0001F917",  # hugging face
    "\U0001F609",  # winking face
    "\U0001F60D",  # smiling face with heart-eyes
    "\U0001F60B",  # face savoring food
    "\U0001F607",  # smiling face with halo
)


def generate_random_smiles(count: int = 5):
    """Pick the emoji shown in a captcha challenge.

    Args:
        count: How many distinct emoji to offer as answer options.

    Returns:
        A ``(options, target)`` tuple: ``options`` is a list of ``count``
        distinct emoji in random order and ``target`` is one element of
        ``options`` chosen at random (the expected answer).

    Raises:
        ValueError: If ``count`` is negative or larger than the emoji pool
            (raised by ``random.sample``).
        IndexError: If ``count`` is 0 (``random.choice`` on an empty list).
    """
    # Pick `count` random emoji without repetition, so every button is unique.
    random_smiles = random.sample(_SMILES, count)
    return random_smiles, random.choice(random_smiles)
class DateCaptchaMiddleware(BaseMiddleware):
    """Gate updates behind an emoji captcha.

    A captcha is issued when ``data["user"]`` is ``None`` or when
    ``user.user_date`` is at least two days old. Callback queries whose data
    starts with ``"emoji_"`` (captcha answers) always pass straight through.

    Expects ``data`` to contain the keys ``"user"``, ``"state"``, ``"i18n"``
    and ``"event_update"``.
    """

    # How long ``user.user_date`` keeps the user exempt from the captcha.
    _CAPTCHA_TTL = timedelta(days=2.0)
    _TZ_NAME = 'Europe/Moscow'

    async def __call__(
        self,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> Any:
        """Run ``handler`` or replace it with a captcha challenge.

        Returns:
            The handler's result when the update is let through, otherwise
            ``None`` (the update is consumed by the captcha).
        """
        user: StorageUsers = data["user"]
        state: FSMContext = data["state"]
        i18n: TranslatorRunner = data["i18n"]
        update = data["event_update"]

        # Captcha answers must reach their handler untouched. The check is
        # explicit (instead of a bare ``except`` around the call) so errors
        # raised by the handler itself propagate rather than being swallowed
        # and followed by a second captcha.
        callback = update.callback_query
        if (
            callback is not None
            and callback.data is not None
            and callback.data.startswith("emoji_")
        ):
            return await handler(event, data)

        if user is not None:
            tz = pytz.timezone(self._TZ_NAME)
            date_now = datetime.now(tz=tz)
            # NOTE(review): if user_date is stored naive, astimezone() treats
            # it as system-local time - confirm the column is tz-aware.
            valid_until: datetime = user.user_date.astimezone(tz) + self._CAPTCHA_TTL
            if date_now < valid_until:
                return await handler(event, data)

        # Only now is a message needed; resolving it lazily keeps updates of
        # other kinds from crashing for users who need no captcha.
        msg = self._resolve_message(update)
        if msg is None:
            # Nowhere to show the captcha: fail closed and drop the update
            # instead of letting an unverified user through.
            return None

        await self._send_captcha(msg, state, i18n, handler, event, data)
        return None

    @staticmethod
    def _resolve_message(update) -> Union[Message, None]:
        """Return the message to reply to for ``update``, or ``None``."""
        if update.message is not None:
            return update.message
        callback = update.callback_query
        return callback.message if callback is not None else None

    async def _send_captcha(
        self,
        msg: Message,
        state: FSMContext,
        i18n: TranslatorRunner,
        handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
        event: TelegramObject,
        data: Dict[str, Any],
    ) -> None:
        """Send the target emoji plus an answer keyboard and arm the FSM."""
        emojis, emoji = generate_random_smiles()

        kb = InlineKeyboardBuilder()
        for option in emojis:
            kb.row(InlineKeyboardButton(text=option, callback_data=f"emoji_{option}"))
        kb.adjust(3, 2)

        # First message shows the emoji to find, second one carries the buttons.
        msg_to_send = await msg.answer(emoji)
        await msg.answer(i18n.select.captcha(), reply_markup=kb.as_markup())

        # The intercepted call (event, data, handler) is stashed next to the
        # expected answer so it can be replayed after a correct answer.
        # NOTE(review): these are live Python objects, so this presumably only
        # works with an in-process FSM storage - confirm.
        await state.update_data({
            "emoji_suc": emoji,
            "msg_emoji": msg_to_send.message_id,
            "event_emoji": event,
            "data_emoji": data,
            "handler_emoji": handler,
        })
        await state.set_state("set_emoji")
# Router that the captcha-answer callback handler below is registered on.
router_to_test = Router()
@router_to_test.callback_query(StateFilter("set_emoji"), F.data.startswith("emoji_"))
async def state_to_test(c: CallbackQuery, state: FSMContext, bot: Bot, session: AsyncSession, user: StorageUsers):
    """Check a captcha answer.

    On a correct answer: clears the FSM state, deletes both captcha messages,
    stamps ``user.user_date`` with the current Moscow time, commits the
    session and replays the call stored in the FSM data under
    ``handler_emoji`` / ``event_emoji`` / ``data_emoji``.

    On a wrong answer: draws a new challenge and edits both captcha messages
    in place.

    Returns:
        The replayed handler's result on success, otherwise ``None``.
    """
    # Split on the first underscore only, so the whole payload after the
    # "emoji_" prefix is compared.
    emoji_str = c.data.split("_", 1)[1]
    data = await state.get_data()
    # The captcha lives in the chat of the keyboard message; this equals
    # c.from_user.id only in private chats.
    chat_id = c.message.chat.id

    if data["emoji_suc"] == emoji_str:
        await state.clear()
        await bot.delete_message(chat_id=chat_id, message_id=int(data["msg_emoji"]))
        await c.message.delete()
        # `user` can be None: the middleware also challenges unknown users.
        # NOTE(review): presumably the replayed handler creates the user
        # record in that case - confirm.
        if user is not None:
            user.user_date = datetime.now(tz=pytz.timezone('Europe/Moscow'))
            await session.commit()
        return await data["handler_emoji"](data["event_emoji"], data["data_emoji"])

    # Wrong answer: issue a fresh challenge. Re-draw while the target equals
    # the previous one, because editing a message to identical content is
    # rejected by Telegram ("message is not modified").
    emojis, emoji = generate_random_smiles()
    while emoji == data["emoji_suc"]:
        emojis, emoji = generate_random_smiles()

    kb = InlineKeyboardBuilder()
    for option in emojis:
        kb.row(InlineKeyboardButton(text=option, callback_data=f"emoji_{option}"))
    kb.adjust(3, 2)

    # Keyword arguments keep the calls correct regardless of the positional
    # parameter order of the installed aiogram version.
    await bot.edit_message_text(
        text=emoji,
        chat_id=chat_id,
        message_id=int(data["msg_emoji"]),
    )
    # NOTE(review): "Tese" looks like a leftover placeholder; the initial
    # prompt uses i18n.select.captcha() - confirm the intended text.
    await bot.edit_message_text(
        text="Tese",
        chat_id=chat_id,
        message_id=c.message.message_id,
        reply_markup=kb.as_markup(),
    )
    await state.update_data({"emoji_suc": emoji})
    await state.set_state("set_emoji")