Untitled

 avatar
4ae4d
plain_text
9 days ago
47 kB
4
Indexable
--- core/markups/project.py ---
from dialog_bot_sdk.interactive_media import Button, InteractiveMediaGroup, MediaGroupBuilder

from core.markups.pagination import pagination_keyboard
from core.schemas import ProjectCardSchema


def format_project_details(
    card: ProjectCardSchema,
    *,
    show_organizer: bool,
    show_active: bool = False,
    make_frame: bool = False,
) -> str:
    """Render a human-readable description of a project card.

    Args:
        card: Card whose ``project`` (title, active, leader_id, id) and
            ``terbank.name`` fields are rendered.
        show_organizer: Include the "Leader id" line in the technical section.
        show_active: Include the approval-status line.
        make_frame: Wrap the body in a ```plain fence and drop the inline
            ``*bold*`` / backtick markers from the body lines.

    Returns:
        The title line followed by the formatted body.
    """
    # With a frame the body is emitted without inline markers.
    bold_mark = "" if make_frame else "*"
    code_mark = "" if make_frame else "`"

    project = card.project
    parts: list[str] = [
        f"📣 {bold_mark}Название{bold_mark}: {project.title}",
        f"🏛️ {bold_mark}Тербанк{bold_mark}: {card.terbank.name}",
    ]

    if show_active:
        parts += ["", "✅ Согласовано" if project.active else "⏳ На согласовании"]

    parts += ["", "🔧 Техническая информация:"]
    if show_organizer:
        leader = project.leader_id or "—"
        parts.append(f"Leader id: {code_mark}{leader}{code_mark}")
    parts.append(f"ID: {code_mark}{project.id}{code_mark}")

    body = "\n".join(parts)

    # The framed variant puts the title directly above the fence; the plain
    # variant separates title and body with an empty line.
    if make_frame:
        return "🌳 *Проект*\n" + "```plain\n" + body + "\n```"
    return "🌳 *Проект*\n\n" + body


# for moderation
def all_projects_pagination_keyboard(*, offset: int, limit: int, has_prev: bool, has_next: bool):
    """Build the pagination row for the "all projects" list (moderation side).

    The paging arguments are forwarded unchanged to ``pagination_keyboard``;
    everything else is fixed configuration for this screen.
    """
    paging = {"offset": offset, "limit": limit, "has_prev": has_prev, "has_next": has_next}
    screen = {
        "prev_media_id": "all_projects_prev",
        "next_media_id": "all_projects_next",
        "leave_media_id": "moderation",
        "leave_label": "🛡️ В мастерскую",
        "keep_layout": True,
        "add_view_by_uuid_button": True,
        "view_by_uuid_value": "moderation",
    }
    return pagination_keyboard(**paging, **screen)


# for user
def user_projects_pagination_keyboard(*, offset: int, limit: int, has_prev: bool, has_next: bool):
    """Build the pagination row for the user-facing projects list.

    The paging arguments are forwarded unchanged to ``pagination_keyboard``;
    everything else is fixed configuration for this screen.
    """
    paging = {"offset": offset, "limit": limit, "has_prev": has_prev, "has_next": has_next}
    screen = {
        "prev_media_id": "user_projects_prev",
        "next_media_id": "user_projects_next",
        "leave_media_id": "volunteer_home",
        "leave_label": "🏡 В дом волонтёра",
        "keep_layout": True,
        "add_view_by_uuid_button": True,
        "view_by_uuid_value": "projects",
    }
    return pagination_keyboard(**paging, **screen)


def _project_button_label(project_card: ProjectCardSchema, ctx: str | None = None) -> str:
    """Формирует подпись для кнопки проекта"""
    title = getattr(project_card.project, "title", "—")

    if ctx == "my_projects":
        status = "на согласовании ⏳"
        if project_card.project.active:
            status = "согласовано ✅"
        return f"{title} — {status}"

    return f"{title}"


def project_select_keyboard(
    project_cards: list[ProjectCardSchema],
    *,
    open_media_id: str,
    start_index: int = 1,
    per_row: int = 1,
    label_max: int = 120,
    ctx: str | None = None,
) -> list[InteractiveMediaGroup]:
    """Build one button per project, grouped into rows.

    Pressing a button triggers the handler registered for ``open_media_id``
    with ``value`` set to the project id.

    Args:
        project_cards: Cards to render, in display order.
        open_media_id: ``media_id`` assigned to every button.
        start_index: Accepted for backward compatibility; the labels are not
            numbered, so the value is currently unused.
        per_row: Number of buttons per ``MediaGroupBuilder`` row; values below
            1 behave like 1.
        label_max: Maximum label length in characters. Longer labels are cut
            and end with "…". A non-positive value disables the limit.
        ctx: Context name forwarded to the label builder.

    Returns:
        The interactive media groups for all rows, in order.
    """

    def _fit(label: str) -> str:
        # Previously `label_max` was accepted but ignored, so arbitrarily long
        # titles were sent as button captions.
        if label_max > 0 and len(label) > label_max:
            return label[: label_max - 1].rstrip() + "…"
        return label

    buttons: list[Button] = [
        Button(
            media_id=open_media_id,
            value=str(getattr(card.project, "id", "")),
            label=_fit(_project_button_label(card, ctx)),
        )
        for card in project_cards
    ]

    # A non-positive per_row used to flush after every button; keep that.
    row_size = max(per_row, 1)

    groups: list[InteractiveMediaGroup] = []
    for start in range(0, len(buttons), row_size):
        groups.extend(MediaGroupBuilder(buttons[start : start + row_size]).build())
    return groups


def project_actions_keyboard(
    project_id: str,
    *,
    can_edit: bool,
    back_value: str | None,
    back_label: str,
    edit_media_id: str = "project_menu_edit",
    delete_media_id: str = "project_menu_delete",
    ctx: str | None = None,
) -> list[InteractiveMediaGroup]:
    """Build the action buttons shown under a project card.

    Edit and delete buttons are added only when ``can_edit`` is true; their
    value is ``"<project_id>|<ctx>"`` when ``ctx`` is set, otherwise just the
    project id. A back button is added when ``back_value`` is not ``None``;
    ``back_value`` is used as that button's ``media_id``.
    """
    payload = project_id if not ctx else f"{project_id}|{ctx}"

    buttons = []
    if can_edit:
        buttons += [
            Button(media_id=edit_media_id, value=payload, label="✏️ Редактировать"),
            Button(media_id=delete_media_id, value=payload, label="🗑 Удалить"),
        ]
    if back_value is not None:
        buttons.append(Button(media_id=back_value, value="", label=back_label))

    return MediaGroupBuilder(buttons).build()

--- core/handlers/projects.py ---
from dialog_bot_sdk.entities.messaging import UpdateInteractiveMediaEvent

from core.bot_kit.fsm import FSMContext
from core.bot_kit.router import Router
from core.config import bot
from core.handlers.projects_ui import (
    send_project_card,
    send_project_cards_page,
    split_project_value,
)
from core.schemas import UserSchema
from core.services import ProjectService
from core.utils import clear_context_keep_events_filters, delete_prev_message_by_peer

# Router object for the user-facing project handlers. The handlers visible in
# this module are only wrapped with `@bot.di`; none is registered on the router here.
user_projects_rt = Router()


@bot.di
def projects_menu_handler(
    event: UpdateInteractiveMediaEvent, context: FSMContext, project_service: ProjectService
):
    """Open the projects list on its first page.

    Removes the previous bot message for the peer, clears the FSM context via
    ``clear_context_keep_events_filters`` and sends the page at offset 0.
    """
    peer = event.peer

    delete_prev_message_by_peer(bot, peer)
    clear_context_keep_events_filters(context)

    send_project_cards_page(
        peer,
        ctx_name="projects",
        offset=0,
        context=context,
        project_service=project_service,
    )


@bot.di
def user_projects_page_handler(
    event: UpdateInteractiveMediaEvent, context: FSMContext, project_service: ProjectService
):
    """Show the projects list page selected by a pagination button.

    The button value carries the target offset. A negative or unparsable value
    falls back to the first page.
    """
    delete_prev_message_by_peer(bot, event.peer)

    try:
        offset = max(int(event.data.value), 0)
    except Exception:
        # Deliberately best-effort: any malformed payload opens the first page.
        offset = 0

    # The parsed offset must be forwarded. Passing a constant 0 here made the
    # prev/next buttons always reopen the first page.
    send_project_cards_page(
        event.peer,
        offset=offset,
        project_service=project_service,
        ctx_name="projects",
        context=context,
    )


@bot.di
def user_projects_open_handler(
    event: UpdateInteractiveMediaEvent,
    context: FSMContext,
    project_service: ProjectService,
    user: UserSchema | None,
):
    """Open the card of the project chosen in a list.

    The button value is ``"<project_id>"`` or ``"<project_id>|<ctx>"``; a
    missing context defaults to ``"projects"``. The FSM state is reset.
    """
    peer = event.peer

    delete_prev_message_by_peer(bot, peer)
    context.set_state(None)

    project_id, ctx = split_project_value(event.data.value)

    send_project_card(
        peer,
        project_id=project_id,
        ctx_name=ctx if ctx else "projects",
        user=user,
        project_service=project_service,
    )

--- core/handlers/projects_ui.py ---
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import Any

from dialog_bot_sdk.interactive_media import Button, InteractiveMediaGroup, MediaGroupBuilder

from core.bot_kit.fsm import FSMContext
from core.config import bot
from core.handlers.project_pages import PROJECTS_PAGE_SIZE, fetch_projects_page
from core.markups import (
    format_project_details,
    project_actions_keyboard,
    project_select_keyboard,
    user_projects_pagination_keyboard,
)
from core.schemas import ProjectCardSchema, ProjectCardsPageSchema, UserSchema
from core.services import ProjectService


@dataclass(frozen=True)
class ProjectUIContext:
    """Immutable per-context settings for the project list and project card screens."""

    # Context key; the visible `_UI` entry stores the instance under the same key.
    name: str
    # --- list screen ---
    # media_id assigned to every project button in the list.
    list_open_media_id: str
    # Called with offset/limit/has_prev/has_next keywords; returns the pagination groups.
    pagination_builder: Callable[..., list[InteractiveMediaGroup]]
    # Optional zero-argument builder of extra groups appended after the pagination groups.
    extra_groups_builder: Callable[[], list[InteractiveMediaGroup]] | None
    # --- card screen ---
    # Used as the media_id of the "back" button; a falsy value makes
    # build_back_keyboard return no buttons.
    back_value: str | None
    # Caption of the "back" button.
    back_label: str


# Registry of project UI contexts keyed by context name.
# get_ui() falls back to the "projects" entry for unknown names, so that key must exist.
_UI: dict[str, ProjectUIContext] = {
    # volunteer home -> all projects
    "projects": ProjectUIContext(
        name="projects",
        list_open_media_id="user_projects_open",
        pagination_builder=user_projects_pagination_keyboard,
        extra_groups_builder=None,
        back_value="projects",
        back_label="⬅️ К списку проектов",
    ),
}


def get_ui(ctx_name: str) -> ProjectUIContext:
    """Return the UI settings for ``ctx_name``, or the ``"projects"`` ones if unknown."""
    fallback = _UI["projects"]
    if ctx_name in _UI:
        return _UI[ctx_name]
    return fallback


def build_back_keyboard(ctx_name: str) -> list[InteractiveMediaGroup]:
    """Build a keyboard holding only the context's "back" button.

    Returns an empty list when the context has no ``back_value``.
    """
    ui = get_ui(ctx_name)
    if ui.back_value:
        back_button = Button(media_id=ui.back_value, value="", label=ui.back_label)
        return MediaGroupBuilder([back_button]).build()
    return []


def split_project_value(raw: Any) -> tuple[str, str | None]:
    """
    ожидаем:
      - "<project_id>"
      - "<project_id>|<ctx>"
    """
    s = str(raw or "").strip()
    if "|" in s:
        ev_id, ctx = s.split("|", 1)
        return ev_id.strip(), (ctx.strip() or None)
    return s, None


def _send_project_list_screen(
    peer,
    *,
    ui: ProjectUIContext,
    ctx_name: str,
    offset: int,
    limit: int,
    page_data: ProjectCardsPageSchema,
    note: str | None = None,
    header: str | None = None,
    empty_text: str | None = None,
    empty_groups: list[InteractiveMediaGroup] | None = None,
    filter_summary: str | None = None,
    show_active: bool = False,
):
    """Send the project list message: text plus selection and pagination keyboards.

    Text layout: optional ``note``, the header ("🌳 Проекты" unless ``header``
    is given), the page body and an optional ``filter_summary``.

    For an empty page the body is ``empty_text`` (or a default warning) and the
    keyboard is the pagination groups followed by ``empty_groups`` or, if that
    is ``None``, by the context's extra groups. For a non-empty page the
    keyboard is the project buttons, the pagination groups and the context's
    extra groups.

    ``show_active`` is accepted but not used by this function.
    """
    cards = page_data.project_cards
    paging = {
        "offset": offset,
        "limit": limit,
        "has_prev": page_data.has_prev,
        "has_next": page_data.has_next,
    }

    title = "🌳 Проекты" if header is None else header
    intro = (f"{note}\n\n" if note else "") + title + "\n\n"
    outro = f"\n\n{filter_summary}" if filter_summary else ""

    keyboard: list[InteractiveMediaGroup] = []

    if cards:
        body = f"Страница: {page_data.current_page} / {page_data.total_pages}\n\n" + "Выбери проект:"

        keyboard.extend(
            project_select_keyboard(
                project_cards=cards,
                ctx=ctx_name,
                open_media_id=ui.list_open_media_id,
                start_index=offset + 1,
                per_row=1,
            )
        )
        # TODO: keyboard += projects_filters_button_keyboard(ctx_name=ctx_name, offset=offset)
        keyboard.extend(ui.pagination_builder(**paging))
        if ui.extra_groups_builder:
            keyboard.extend(ui.extra_groups_builder())
    else:
        body = (
            empty_text
            or "⚠️ Проектов не найдено.\n\nПопробуй изменить критерии или вернись назад."
        )

        # Caller-supplied groups take precedence over the context's extra groups.
        if empty_groups is not None:
            trailing = empty_groups
        elif ui.extra_groups_builder:
            trailing = ui.extra_groups_builder()
        else:
            trailing = []

        # TODO: keyboard += projects_filters_button_keyboard(ctx_name=ctx_name, offset=offset)
        keyboard.extend(ui.pagination_builder(**paging))
        keyboard.extend(trailing)

    bot.messaging.send_message(
        peer=peer,
        text=intro + body + outro,
        interactive_media_groups=keyboard,
    )


def send_project_cards_page(
    peer,
    *,
    offset: int,
    project_service: ProjectService,
    ctx_name: str,
    note: str | None = None,
    context: FSMContext | None = None,
):
    """Fetch one page of project cards and send it as a list screen.

    The page size is ``PROJECTS_PAGE_SIZE`` and the requester is ``peer.id``.
    ``context`` is not used yet; it is reserved for the filter support marked
    with TODOs below.
    """
    ui = get_ui(ctx_name)
    page_size = PROJECTS_PAGE_SIZE

    # TODO: flt = get_projects_filter(context, ctx_name=ctx_name) if context is not None else None
    # TODO: summary = format_projects_filter_summary(flt) if context is not None else None

    page = fetch_projects_page(
        project_service=project_service,
        requester_messenger_id=peer.id,
        offset=offset,
        limit=page_size,
        # TODO: filters=flt,
    )

    _send_project_list_screen(
        peer,
        ui=ui,
        ctx_name=ctx_name,
        page_data=page,
        offset=offset,
        limit=page_size,
        note=note,
        # TODO: filter_summary=summary,
    )


def send_project_card(
    peer,
    *,
    project_id: str,
    project_service: ProjectService,
    user: UserSchema | None,
    ctx_name: str,
    note: str | None = None,
):
    """Send the card of a single project, or a "not found" message.

    The card is requested with ``peer.id`` as the requester. When it is
    missing, only the context's back keyboard is attached. Otherwise the
    details text is sent with the action keyboard; edit/delete buttons depend
    on ``card.allowed_actions.can_edit`` and the approval status is shown only
    in the ``"my_projects"`` context. ``note``, if given, is prepended to the text.
    """
    ui = get_ui(ctx_name)
    note_prefix = f"{note}\n\n" if note else ""

    card: ProjectCardSchema | None = project_service.get_project_by_id(project_id, peer.id)

    if card is None:
        bot.messaging.send_message(
            peer=peer,
            text=note_prefix + "⚠️ Проект не найден.",
            interactive_media_groups=build_back_keyboard(ctx_name),
        )
        return

    permissions = card.allowed_actions
    can_edit = permissions.can_edit if permissions else False

    details = format_project_details(
        card=card,
        show_organizer=False,
        show_active=(ctx_name == "my_projects"),
        make_frame=False,
    )

    keyboard = project_actions_keyboard(
        project_id=project_id,
        can_edit=can_edit,
        back_value=ui.back_value,
        back_label=ui.back_label,
        ctx=ctx_name,
    )
    bot.messaging.send_message(
        peer=peer,
        text=note_prefix + details,
        interactive_media_groups=keyboard,
    )

--- core/handlers/project_pages.py ---
from core.schemas import ProjectCardsPageSchema
from core.services import ProjectService

# Number of project cards per list page; passed as `limit` when a page is fetched.
PROJECTS_PAGE_SIZE = 10


def fetch_projects_page(
    *,
    project_service: ProjectService,
    offset: int,
    limit: int,
    requester_messenger_id: str,
    # TODO: filters: ProjectFilter | None = None,
    mine: bool = False,
) -> ProjectCardsPageSchema:
    """Load the page of project cards that contains ``offset``.

    The 1-based page number is derived as ``offset // limit + 1``. The tuple
    returned by the service (cards, current page, total pages) is repacked
    into a ``ProjectCardsPageSchema``.
    """
    # TODO: flt = filters or ProjectFilter()
    page_number = offset // limit + 1

    service_result = project_service.get_project_cards_page(
        requester_messenger_id=requester_messenger_id,
        page=page_number,
        limit=limit,
        mine=mine,
        # TODO: title_str=flt.title_str,
        # TODO: terbank_id=flt.terbank_id,
    )
    cards, current, total = service_result

    return ProjectCardsPageSchema(
        project_cards=cards,
        current_page=current,
        total_pages=total,
    )

--- core/markups/pagination.py ---
from __future__ import annotations

from typing import cast

from dialog_bot_sdk.interactive_media import Button, InteractiveMediaGroup, MediaGroupBuilder


def pagination_keyboard(  # legacy
    *,
    offset: int,
    limit: int,
    has_prev: bool,
    has_next: bool,
    prev_media_id: str,
    next_media_id: str,
    prev_value: str | None = None,
    next_value: str | None = None,
    prev_label: str = "⬅️ Назад",
    next_label: str = "➡️ Вперёд",
    include_leave: bool = True,
    leave_media_id: str | None = None,
    leave_value: str | None = "",
    leave_label: str | None = "⬅️ Назад",
    keep_layout: bool = False,
    placeholder_media_id: str = "noop",
    placeholder_value: str = "noop",
    placeholder_label: str = " ",
    hide_when_all_placeholders: bool = False,
    add_view_by_uuid_button: bool = False,
    view_by_uuid_media_id: str = "view_by_uuid",
    view_by_uuid_value: str = "events",
    view_by_uuid_label: str = "🔎 По UUID",
) -> list[InteractiveMediaGroup]:
    """
    Unified pagination row.

    Slots are filled left to right: prev, leave, "view by UUID", next.

    keep_layout=True (ph = placeholder button):
      with add_view_by_uuid_button=True:
        - include_leave=True: 4 slots [prev|ph] [leave|ph] [uuid] [next|ph]
        - include_leave=False: 3 slots [prev|ph] [uuid] [next|ph]
      with add_view_by_uuid_button=False:
        - include_leave=True: 3 slots [prev|ph] [leave|ph] [next|ph]
        - include_leave=False: 2 slots [prev|ph] [next|ph]

    keep_layout=False: only the real buttons are emitted, without placeholders.

    hide_when_all_placeholders=True: an empty list is returned when every
    button is a placeholder.

    Default button values: prev is ``max(offset - limit, 0)`` and next is
    ``offset + limit``, both as strings, unless ``prev_value`` /
    ``next_value`` are given.
    """
    leave_value = "" if leave_value is None else leave_value
    leave_label = "⬅️ Назад" if leave_label is None else leave_label

    placeholder_prefix = placeholder_media_id + "_"

    def _placeholder(slot: str) -> Button:
        # Each placeholder gets a slot-specific media_id: "<placeholder_media_id>_<slot>".
        return Button(
            media_id=placeholder_prefix + slot,
            value=placeholder_value,
            label=placeholder_label,
        )

    def _is_placeholder(button: Button) -> bool:
        return str(getattr(button, "media_id", "")).startswith(placeholder_prefix)

    slots: list[Button] = []

    # prev slot
    if has_prev:
        target = str(max(offset - limit, 0)) if prev_value is None else prev_value
        slots.append(Button(media_id=prev_media_id, value=target, label=prev_label))
    elif keep_layout:
        slots.append(_placeholder("prev"))

    # leave slot: a real button needs both include_leave and a media id; the
    # placeholder is used only when the fixed layout reserves the slot.
    if include_leave and leave_media_id is not None:
        slots.append(Button(media_id=leave_media_id, value=leave_value, label=leave_label))
    elif include_leave and keep_layout:
        slots.append(_placeholder("mid"))

    # "view by UUID" slot
    if add_view_by_uuid_button:
        slots.append(
            Button(
                media_id=view_by_uuid_media_id,
                value=str(view_by_uuid_value),
                label=str(view_by_uuid_label),
            )
        )

    # next slot
    if has_next:
        target = str(offset + limit) if next_value is None else next_value
        slots.append(Button(media_id=next_media_id, value=target, label=next_label))
    elif keep_layout:
        slots.append(_placeholder("next"))

    if not slots:
        return []
    if hide_when_all_placeholders and all(_is_placeholder(button) for button in slots):
        return []

    return cast(list[InteractiveMediaGroup], MediaGroupBuilder(slots).build())

--- core/markups/events_filters.py ---
from dialog_bot_sdk.interactive_media import Button, InteractiveMediaGroup, MediaGroupBuilder

from core.utils import EventsMode


def events_filters_button_keyboard(
    *, ctx_name: str, offset: int
) -> list[InteractiveMediaGroup]:  # legacy
    """Build the single "filters" button; its value is ``"<ctx_name>|<offset>"``."""
    payload = f"{ctx_name}|{offset}"
    open_button = Button(media_id="events_filters_open", value=payload, label="🧰 Фильтры")
    return MediaGroupBuilder([open_button]).build()


def events_filters_menu_keyboard(
    *, ctx_name: str, offset: int
) -> list[InteractiveMediaGroup]:  # legacy
    """Build the filters menu: tags row, date (and mode) row, apply/reset/back row.

    The "mode" button is omitted in the ``"part_events"`` context.
    """
    # Every button carries "ctx|offset" so that "back" returns to the same place.
    payload = f"{ctx_name}|{offset}"

    def _row(*specs: tuple[str, str]) -> list[InteractiveMediaGroup]:
        buttons = [Button(media_id=media_id, value=payload, label=label) for media_id, label in specs]
        return MediaGroupBuilder(buttons).build()

    middle_specs = [("events_filters_date", "🗓️ Дата/время")]
    if ctx_name != "part_events":
        middle_specs.append(("events_filters_mode", "🎛️ Режим"))

    tags_row = _row(("events_filters_tags", "🏷️ Теги"))
    middle_row = _row(*middle_specs)
    actions_row = _row(
        ("events_filters_apply", "✅ Применить"),
        ("events_filters_reset", "♻️ Сбросить"),
        ("events_filters_back", "⬅️ Назад"),
    )
    return tags_row + middle_row + actions_row


def events_filters_mode_keyboard(  # legacy
    *,
    ctx_name: str,
    back_offset: int,
    selected_mode: EventsMode,
    locked_mode: EventsMode | None,
) -> list[InteractiveMediaGroup]:
    """Build the mode picker: three mode options plus "done" and "back".

    Every button uses the ``events_filters_mode_select`` media id and sits in
    its own row. The option matching the effective mode is prefixed with "☑️ ".
    """
    # A locked mode, when set, overrides the user's selection for marking.
    effective_mode: EventsMode = locked_mode or selected_mode
    return_target = f"{ctx_name}|{back_offset}"

    # (label, value, marked)
    rows = [
        ("Все", "set:all", effective_mode == "all"),
        ("Я участвую", "set:participant", effective_mode == "participant"),
        ("Я организатор", "set:organizer", effective_mode == "organizer"),
        ("✅ Готово", f"done:{return_target}", False),
        ("⬅️ Назад", f"back:{return_target}", False),
    ]

    groups: list[InteractiveMediaGroup] = []
    for label, value, marked in rows:
        caption = ("☑️ " if marked else "") + label
        button = Button(media_id="events_filters_mode_select", value=value, label=caption)
        groups += MediaGroupBuilder([button]).build()
    return groups

--- core/handlers/event.py ---
from dialog_bot_sdk.entities.messaging import UpdateInteractiveMediaEvent, UpdateMessage

from core.bot_kit.fsm import FSMContext, State, StatesGroup
from core.bot_kit.router import Router
from core.config import bot
from core.handlers.events_ui import (
    build_back_to_event_keyboard,
    send_event_card,
    send_event_cards_page,
    send_part_events_page,
    split_event_value,
)
from core.schemas import UserSchema
from core.services import EventService, ReportService
from core.utils import (
    clear_context_keep_events_filters,
    delete_prev_message,
    delete_prev_message_by_peer,
)

# Router for the user-facing event handlers; the bonus-code message step is
# registered on it via `@user_events_rt.message(...)`.
user_events_rt = Router()


class UserEventEnterCodeState(StatesGroup):  # legacy
    """FSM states of the bonus-code entry flow."""

    # Set after the user presses "enter code"; the next text message is treated as the code.
    wait_code = State()


@bot.di
def events_menu_handler(  # legacy
    event: UpdateInteractiveMediaEvent, context: FSMContext, event_service: EventService
):
    """Open the events list on its first page.

    Removes the previous bot message for the peer, clears the FSM context via
    ``clear_context_keep_events_filters`` and sends the page at offset 0.
    """
    peer = event.peer

    delete_prev_message_by_peer(bot, peer)
    clear_context_keep_events_filters(context)

    send_event_cards_page(
        peer,
        ctx_name="events",
        offset=0,
        context=context,
        event_service=event_service,
    )


@bot.di
def user_events_page_handler(  # legacy
    event: UpdateInteractiveMediaEvent, context: FSMContext, event_service: EventService
):
    """Show the events list page selected by a pagination button.

    The button value carries the target offset. A negative or unparsable value
    falls back to the first page.
    """
    peer = event.peer
    delete_prev_message_by_peer(bot, peer)

    try:
        offset = max(int(event.data.value), 0)
    except Exception:
        # Best-effort: any malformed payload opens the first page.
        offset = 0

    send_event_cards_page(
        peer,
        ctx_name="events",
        offset=offset,
        context=context,
        event_service=event_service,
    )


@bot.di
def part_events_menu_handler(  # legacy
    event: UpdateInteractiveMediaEvent, context: FSMContext, event_service: EventService
):
    """Open the first page of the list sent by ``send_part_events_page``.

    Removes the previous bot message for the peer and clears the FSM context
    via ``clear_context_keep_events_filters`` first.
    """
    peer = event.peer

    delete_prev_message_by_peer(bot, peer)
    clear_context_keep_events_filters(context)

    send_part_events_page(
        peer,
        offset=0,
        context=context,
        event_service=event_service,
    )


@bot.di
def part_events_page_handler(  # legacy
    event: UpdateInteractiveMediaEvent, context: FSMContext, event_service: EventService
):
    """Show the page of the ``send_part_events_page`` list selected by a pagination button.

    The button value carries the target offset. A negative or unparsable value
    falls back to the first page.
    """
    peer = event.peer
    delete_prev_message_by_peer(bot, peer)

    try:
        offset = max(int(event.data.value), 0)
    except Exception:
        # Best-effort: any malformed payload opens the first page.
        offset = 0

    send_part_events_page(
        peer,
        offset=offset,
        context=context,
        event_service=event_service,
    )


@bot.di
def user_events_open_handler(  # legacy
    event: UpdateInteractiveMediaEvent,
    context: FSMContext,
    event_service: EventService,
    report_service: ReportService,
    user: UserSchema | None,
):
    """Open the card of the event chosen in a list.

    The button value is parsed with ``split_event_value``; a missing context
    defaults to ``"events"``. The FSM state is reset.
    """
    peer = event.peer

    delete_prev_message_by_peer(bot, peer)
    context.set_state(None)

    event_id, ctx = split_event_value(event.data.value)

    send_event_card(
        peer,
        event_id=event_id,
        ctx_name=ctx if ctx else "events",
        user=user,
        event_service=event_service,
        report_service=report_service,
    )


@bot.di
def event_user_sign_up_handler(
    event: UpdateInteractiveMediaEvent,
    context: FSMContext,
    event_service: EventService,
    report_service: ReportService,
    user: UserSchema | None,
):
    """Sign the user up for an event and re-send the event card with the outcome.

    ``event_service.sign_up`` returns ``None`` on success or an error code.
    Known codes are mapped to readable reasons; unknown codes are shown as is.
    The FSM state is reset unless the context is ``"ai_dialog_event"``.
    """
    peer = event.peer
    delete_prev_message_by_peer(bot, peer)

    event_id, ctx = split_event_value(event.data.value)
    ctx_name = ctx or "events"

    if ctx_name != "ai_dialog_event":
        context.set_state(None)

    failure = event_service.sign_up(event_id, peer.id)

    if failure is None:
        note = "✅ Запись на мероприятие прошла успешно!"
    else:
        reasons = {
            "INVALID_PARTICIPANTS_COUNT": "нет доступных мест.",
            "EVENT_ALREADY_ARCHIVED": "мероприятие уже прошло.",
            "ALREADY_PARTICIPATING": "ты уже записан на это мероприятие.",
            "EVENT_NOT_ACTIVE": "мероприятие не активировано.",
        }
        note = "⚠️ Не удалось записаться на мероприятие: " + reasons.get(failure, failure)

    send_event_card(
        peer,
        event_id=event_id,
        event_service=event_service,
        report_service=report_service,
        user=user,
        ctx_name=ctx_name,
        note=note,
    )


@bot.di
def event_user_sign_out_handler(
    event: UpdateInteractiveMediaEvent,
    context: FSMContext,
    event_service: EventService,
    report_service: ReportService,
    user: UserSchema | None,
):
    """Sign the user out of an event and re-send the event card with the outcome.

    Signing out is attempted only when a participation status exists and its
    state is not ``"confirmed"``. The FSM state is reset unless the context is
    ``"ai_dialog_event"``.
    """
    peer = event.peer
    delete_prev_message_by_peer(bot, peer)

    event_id, ctx = split_event_value(event.data.value)
    ctx_name = ctx or "events"

    if ctx_name != "ai_dialog_event":
        context.set_state(None)

    participation = event_service.get_participation_status(event_id, peer.id)

    if participation is not None and participation.state != "confirmed":
        if event_service.sign_out_by_event(event_id, peer.id):
            note = "✅ Ты успешно отписался от мероприятия."
        else:
            note = "⚠️ Не удалось отписаться."
    elif participation is None:
        note = "⚠️ Не удалось отписаться: ты не записан на это мероприятие."
    else:
        note = "⚠️ Нельзя отписаться: часы уже получены за это мероприятие."

    send_event_card(
        peer,
        event_id=event_id,
        event_service=event_service,
        report_service=report_service,
        user=user,
        ctx_name=ctx_name,
        note=note,
    )


@bot.di
def event_user_enter_code_start_handler(
    event: UpdateInteractiveMediaEvent, context: FSMContext
):  # legacy
    """Start the bonus-code flow: remember the event and ask for the code.

    Stores the event id and context name in the FSM data, switches the state
    to ``UserEventEnterCodeState.wait_code`` and sends the prompt with a
    "back to event" keyboard.
    """
    peer = event.peer
    delete_prev_message_by_peer(bot, peer)

    event_id, ctx = split_event_value(event.data.value)
    ctx_name = ctx if ctx else "events"

    context.update_data({"current_event_id": event_id, "current_event_ctx": ctx_name})
    context.set_state(UserEventEnterCodeState.wait_code)

    back_keyboard = build_back_to_event_keyboard(event_id, ctx_name)
    bot.messaging.send_message(
        peer=peer,
        text="Введи бонус-код мероприятия (без пробелов):",
        interactive_media_groups=back_keyboard,
    )


@user_events_rt.message(state=UserEventEnterCodeState.wait_code)
@bot.di
def event_user_enter_code_step(
    message: UpdateMessage,
    context: FSMContext,
    event_service: EventService,
    report_service: ReportService,
    user: UserSchema | None,
):
    """Handle the text message that carries the event bonus code.

    Reads the event id and context name stored by the "enter code" start
    handler, validates the user's participation, submits the code and re-sends
    the event card with a note describing the outcome. The FSM state is reset
    in every case.
    """
    delete_prev_message(bot, message)
    data = context.get_data()

    ctx_name = data.get("current_event_ctx") or "events"
    # TODO may be a source of bugs. do we really need to default context?

    # Check for a missing id BEFORE stringifying it: str(None) is "None", which
    # previously made the "event not found" branch unreachable.
    stored_event_id = data.get("current_event_id")
    # The card is still requested below with the stringified id. For a missing
    # id that is "None", which is expected not to match any event
    # (assumption carried over from the original comment).
    event_id = str(stored_event_id)

    note: str
    if stored_event_id is None:
        note = "⚠️ Код не принят: не удалось найти мероприятие."
    else:
        code = message.message.text_message.text.strip()
        status = event_service.get_participation_status(event_id, message.peer.id)

        if status is None:
            note = "⚠️ Код не принят: сначала нужно записаться на мероприятие."
        elif status.state == "confirmed":
            note = "⚠️ Код не принят: часы уже получены."
        else:
            result = event_service.enter_code_by_event(event_id, message.peer.id, code)
            if not result:
                note = "⚠️ Код не принят: проверь правильность кода."
            else:
                note = "✅ Код принят. Волонтёрские часы начислены."

    context.set_state(None)

    send_event_card(
        message.peer,
        event_id=event_id,
        event_service=event_service,
        report_service=report_service,
        user=user,
        ctx_name=ctx_name,
        note=note,
    )

--- core/handlers/events_pages.py ---
from core.schemas import EventCardsPageSchema
from core.services import EventService
from core.utils import EventsFilter

# Page size for event lists (presumably passed as `limit` to fetch_events_page — usage not visible here).
EVENTS_PAGE_SIZE = 10


def fetch_events_page(
    *,
    event_service: EventService,
    offset: int,
    limit: int,
    requester_messenger_id: str,
    filters: EventsFilter | None = None,
    mine: bool = False,
) -> EventCardsPageSchema:
    """Load the page of event cards that contains ``offset``.

    The 1-based page number is derived as ``offset // limit + 1``. Of the
    filter, only ``tag_ids`` is forwarded to the service, and an empty
    selection is sent as ``None``. The tuple returned by the service (cards,
    current page, total pages) is repacked into an ``EventCardsPageSchema``.
    """
    active_filter = filters or EventsFilter()
    selected_tags = list(active_filter.tag_ids or [])
    page_number = offset // limit + 1

    service_result = event_service.get_event_cards_page(
        requester_messenger_id=requester_messenger_id,
        page=page_number,
        limit=limit,
        tag_ids=selected_tags or None,
        mine=mine,
    )
    cards, current, total = service_result

    return EventCardsPageSchema(
        event_cards=cards,
        current_page=current,
        total_pages=total,
    )

--- core/handlers/events_ui.py ---
from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

from dialog_bot_sdk.interactive_media import Button, InteractiveMediaGroup, MediaGroupBuilder

from core.bot_kit.fsm import FSMContext
from core.config import bot
from core.handlers.events_pages import EVENTS_PAGE_SIZE, fetch_events_page
from core.markups import (
    all_events_pagination_keyboard,
    event_actions_keyboard,
    events_filters_button_keyboard,
    events_select_keyboard,
    format_event_details,
    my_events_pagination_keyboard,
    part_events_pagination_keyboard,
    points_events_pagination_keyboard,
    points_menu_keyboard,
    user_events_pagination_keyboard,
)
from core.schemas import EventCardsPageSchema, UserSchema
from core.services import EventService, ReportService
from core.utils import format_events_filter_summary, get_events_filter, logger

if TYPE_CHECKING:
    from core.schemas.event import EventCardSchema


@dataclass(frozen=True)
class EventUIContext:  # legacy
    """Immutable per-context settings for the event list and event card screens."""

    # Context key; the visible `_UI` entries store each instance under the same key.
    name: str
    # --- list screen ---
    # presumably the media_id of the event buttons in the list — usage not visible here
    list_open_media_id: str
    # Builder of the pagination groups for the list.
    pagination_builder: Callable[..., list[InteractiveMediaGroup]]
    # Optional zero-argument builder of additional groups.
    extra_groups_builder: Callable[[], list[InteractiveMediaGroup]] | None
    # --- card screen ---
    # Target and caption of the "back" button.
    back_value: str | None
    back_label: str
    # presumably the media_ids of the card's enter-code / sign-up / sign-out buttons — TODO confirm
    enter_code_media_id: str
    sign_up_media_id: str
    sign_out_media_id: str


# Registry of UI contexts keyed by context name; looked up through get_ui().
_UI: dict[str, EventUIContext] = {
    # volunteer home -> all events
    "events": EventUIContext(  # legacy
        name="events",
        list_open_media_id="user_events_open",
        pagination_builder=user_events_pagination_keyboard,
        extra_groups_builder=None,
        back_value="events",
        back_label="⬅️ К списку мероприятий",
        enter_code_media_id="event_user_enter_code",
        sign_up_media_id="event_user_sign_up",
        sign_out_media_id="event_user_sign_out",
    ),
    # volunteer home -> my events
    "part_events": EventUIContext(  # legacy
        name="part_events",
        list_open_media_id="user_events_open",
        pagination_builder=part_events_pagination_keyboard,
        extra_groups_builder=None,
        back_value="part_events",
        back_label="⬅️ К списку мероприятий",
        enter_code_media_id="event_user_enter_code",
        sign_up_media_id="event_user_sign_up",
        sign_out_media_id="event_user_sign_out",
    ),
    # volunteer home -> view by uuid
    "volunteer_home": EventUIContext(  # legacy
        name="volunteer_home",
        list_open_media_id="user_events_open",  # not used in this context
        pagination_builder=user_events_pagination_keyboard,
        extra_groups_builder=None,
        back_value="volunteer_home",
        back_label="⬅️ В дом волонтёра",
        enter_code_media_id="event_user_enter_code",
        sign_up_media_id="event_user_sign_up",
        sign_out_media_id="event_user_sign_out",
    ),
    # moderation home -> all events
    "moderation": EventUIContext(  # legacy
        name="moderation",
        list_open_media_id="all_events_open",
        pagination_builder=all_events_pagination_keyboard,
        extra_groups_builder=None,
        back_value="all_events",
        back_label="⬅️ К списку мероприятий",
        enter_code_media_id="event_menu_enter_code",
        sign_up_media_id="event_user_sign_up",
        sign_out_media_id="event_user_sign_out",
    ),
    # points menu -> events with awarded points
    "points": EventUIContext(  # legacy
        name="points",
        list_open_media_id="points_event_open",
        pagination_builder=points_events_pagination_keyboard,
        extra_groups_builder=points_menu_keyboard,  # leaderboard + back
        back_value="points",
        back_label="⬅️ К баллам",
        enter_code_media_id="event_user_enter_code",
        sign_up_media_id="event_user_sign_up",
        sign_out_media_id="event_user_sign_out",
    ),
    # moderation home -> view by uuid
    "moderation_menu": EventUIContext(  # legacy
        name="moderation_menu",
        list_open_media_id="all_events_open",  # not used in this context
        pagination_builder=all_events_pagination_keyboard,
        extra_groups_builder=None,
        back_value="moderation",
        back_label="⬅️ В мастерскую",
        enter_code_media_id="event_menu_enter_code",
        sign_up_media_id="event_user_sign_up",
        sign_out_media_id="event_user_sign_out",
    ),
    # moderation home -> my events
    "my_events": EventUIContext(  # legacy
        name="my_events",
        list_open_media_id="my_events_open",
        pagination_builder=my_events_pagination_keyboard,
        extra_groups_builder=None,
        back_value="my_events",
        back_label="⬅️ К моим мероприятиям",
        enter_code_media_id="event_menu_enter_code",
        sign_up_media_id="event_user_sign_up",
        sign_out_media_id="event_user_sign_out",
    ),
    # AI assistant reply -> event card (no back button: back_value is None)
    "ai_dialog_event": EventUIContext(  # legacy
        name="ai_dialog_event",
        list_open_media_id="user_events_open",
        pagination_builder=user_events_pagination_keyboard,
        extra_groups_builder=None,
        back_value=None,
        back_label="",
        enter_code_media_id="event_user_enter_code",
        sign_up_media_id="event_user_sign_up",
        sign_out_media_id="event_user_sign_out",
    ),
}


def get_ui(ctx_name: str) -> EventUIContext:  # legacy
    """Return the UI context registered under ``ctx_name``.

    Unknown names fall back to the ``"events"`` context.
    """
    fallback = _UI["events"]
    try:
        return _UI[ctx_name]
    except KeyError:
        return fallback


def build_back_keyboard(ctx_name: str) -> list[InteractiveMediaGroup]:
    """Build a one-button "back" keyboard for the given context.

    Returns an empty list when the context has no ``back_value``.
    """
    ui = get_ui(ctx_name)
    if ui.back_value:
        back_button = Button(media_id=ui.back_value, value="", label=ui.back_label)
        return MediaGroupBuilder([back_button]).build()
    return []


def split_event_value(raw: Any) -> tuple[str, str | None]:  # legacy
    """
    ожидаем:
      - "<event_id>"
      - "<event_id>|<ctx>"
    """
    s = str(raw or "").strip()
    if "|" in s:
        ev_id, ctx = s.split("|", 1)
        return ev_id.strip(), (ctx.strip() or None)
    return s, None


def _send_events_list_screen(  # legacy
    peer,
    *,
    ui: EventUIContext,
    ctx_name: str,
    offset: int,
    limit: int,
    page_data: EventCardsPageSchema,
    note: str | None = None,
    header: str | None = None,
    empty_text: str | None = None,
    empty_groups: list[InteractiveMediaGroup] | None = None,
    filter_summary: str | None = None,
    show_active: bool = False,
):
    """Send one page of the events list (or the "nothing found" screen) to ``peer``.

    Args:
        peer: Recipient passed to ``bot.messaging.send_message``.
        ui: UI context supplying the open-button id and keyboard builders.
        ctx_name: Context name forwarded to the select and filter keyboards.
        offset: Item offset of the page; also used to number the list entries.
        limit: Page size forwarded to the pagination builder.
        page_data: Page whose cards and paging flags are rendered.
        note: Optional text placed before the header.
        header: Title line; defaults to "📅 Мероприятия" when ``None``.
        empty_text: Replaces the default "nothing found" text when truthy.
        empty_groups: For the empty screen only: groups used instead of
            ``ui.extra_groups_builder()`` when not ``None``.
        filter_summary: Optional text appended at the end of the message.
        show_active: Accepted for compatibility; not read by this function.
    """
    cards = page_data.event_cards  # list[EventCardSchema]
    has_prev = page_data.has_prev
    has_next = page_data.has_next

    note_block = f"{note}\n\n" if note else ""
    title = "📅 Мероприятия" if header is None else header
    heading = title + "\n\n"
    filter_block = f"\n\n{filter_summary}" if filter_summary else ""

    def _navigation_groups():
        # Filter button first, then the pagination row.
        nav: list[InteractiveMediaGroup] = []
        nav.extend(events_filters_button_keyboard(ctx_name=ctx_name, offset=offset))
        nav.extend(
            ui.pagination_builder(
                offset=offset,
                limit=limit,
                has_prev=has_prev,
                has_next=has_next,
            )
        )
        return nav

    if cards:
        body = (
            f"Страница: {page_data.current_page} / {page_data.total_pages}\n\n"
            "Выбери мероприятие:"
        )
        groups: list[InteractiveMediaGroup] = []
        groups.extend(
            events_select_keyboard(
                cards,
                ctx=ctx_name,
                open_media_id=ui.list_open_media_id,
                start_index=offset + 1,
                per_row=1,
            )
        )
        groups.extend(_navigation_groups())
        if ui.extra_groups_builder:
            groups.extend(ui.extra_groups_builder())
    else:
        body = (
            empty_text
            or "⚠️ Мероприятий не найдено.\n\nПопробуй изменить критерии или вернись назад."
        )
        # Explicit empty_groups win over the context's extra groups.
        if empty_groups is not None:
            tail = empty_groups
        elif ui.extra_groups_builder:
            tail = ui.extra_groups_builder()
        else:
            tail = []
        groups = _navigation_groups()
        groups.extend(tail)

    message = "".join((note_block, heading, body, filter_block))
    bot.messaging.send_message(peer=peer, text=message, interactive_media_groups=groups)


def send_event_cards_page(
    peer,
    *,
    offset: int,
    event_service: EventService,
    ctx_name: str,
    note: str | None = None,
    context: FSMContext | None = None,
):
    """Fetch a page of event cards for ``peer`` and send it as a list screen.

    The events filter and its summary are only resolved when an FSM
    ``context`` is supplied; otherwise the page is fetched unfiltered and
    no summary is shown. ``peer.id`` is used as the requester id.
    """
    ui = get_ui(ctx_name)

    if context is None:
        active_filter = None
        filter_summary = None
    else:
        active_filter = get_events_filter(context, ctx_name=ctx_name)
        filter_summary = format_events_filter_summary(active_filter)

    page = fetch_events_page(
        event_service=event_service,
        offset=offset,
        limit=EVENTS_PAGE_SIZE,
        requester_messenger_id=peer.id,
        filters=active_filter,
    )

    _send_events_list_screen(
        peer,
        ui=ui,
        ctx_name=ctx_name,
        offset=offset,
        limit=EVENTS_PAGE_SIZE,
        page_data=page,
        note=note,
        filter_summary=filter_summary,
    )


def send_part_events_page(  # legacy
    peer,
    *,
    offset: int,
    event_service: EventService,
    context: FSMContext | None = None,
    note: str | None = None,
):
    """Send the placeholder for the "my events" (participation) screen.

    The screen is not implemented yet, so ``offset``, ``event_service`` and
    ``context`` are accepted only to keep the handler signature and are not
    read.

    Args:
        peer: Recipient passed to ``bot.messaging.send_message``.
        note: Optional text shown above the placeholder, separated by a
            blank line.
    """
    placeholder = "Мои мероприятия\n\nПока не реализовано"
    # The original literal was "f{note}\n\n" (the f sat inside the quotes), so
    # users saw the text "f{note}" instead of the note itself. The note is now
    # interpolated and used as a prefix to the placeholder.
    text = f"{note}\n\n{placeholder}" if note else placeholder
    bot.messaging.send_message(
        peer=peer,
        text=text,
        interactive_media_groups=build_back_keyboard("part_events"),
    )


def send_points_page(  # legacy
    peer,
    *,
    offset: int,
    event_service: EventService,
    context: FSMContext | None = None,
):
    """Send the placeholder for the "events I earned points for" screen.

    The screen is not implemented yet, so ``offset``, ``event_service`` and
    ``context`` are accepted only to keep the handler signature and are not
    read.

    Args:
        peer: Recipient passed to ``bot.messaging.send_message``.
    """
    bot.messaging.send_message(
        peer=peer,
        # Fixed the misspelled first word of the user-facing text ("Меорприятия").
        text="Мероприятия за которые я получил баллы\n\nПока не реализовано",
        # NOTE(review): the back button is built for the "part_events" context,
        # not "points" - looks like a copy-paste; confirm the intended target.
        interactive_media_groups=build_back_keyboard("part_events"),
    )


def send_my_events_page(  # legacy
    peer,
    *,
    offset: int,
    event_service: EventService,
    requester_messenger_id: str,
    ctx_name: str,
    note: str | None = None,
    context: FSMContext | None = None,
):
    """Fetch a page of the requester's own events and send it as a list screen.

    Works like the general events list but asks the service with
    ``mine=True`` and uses the "👤 Мои мероприятия" header. The filter and
    its summary are only resolved when an FSM ``context`` is supplied.

    NOTE(review): ``requester_messenger_id`` is accepted but never read;
    ``peer.id`` is what gets forwarded to the service. Confirm which one
    is intended.
    """
    ui = get_ui(ctx_name)

    if context is None:
        active_filter = None
        filter_summary = None
    else:
        active_filter = get_events_filter(context, ctx_name=ctx_name)
        filter_summary = format_events_filter_summary(active_filter)

    own_page = fetch_events_page(
        event_service=event_service,
        offset=offset,
        limit=EVENTS_PAGE_SIZE,
        requester_messenger_id=peer.id,
        filters=active_filter,
        mine=True,
    )

    _send_events_list_screen(
        peer,
        ui=ui,
        ctx_name=ctx_name,
        offset=offset,
        limit=EVENTS_PAGE_SIZE,
        page_data=own_page,
        note=note,
        header="👤 Мои мероприятия",
        filter_summary=filter_summary,
        show_active=True,
    )


def send_event_card(
    peer,
    *,
    event_id: str,
    event_service: EventService,
    report_service: ReportService | None = None,
    user: UserSchema | None,
    ctx_name: str,
    note: str | None = None,
    show_organizer: bool = True,
):
    """Send the detail card of one event together with its action keyboard.

    When the service returns no card, a "not found" message with the
    context's back keyboard is sent instead.

    Args:
        peer: Recipient; ``peer.id`` is also used as the requester id.
        event_id: Id of the event to load.
        event_service: Service used to load the card.
        report_service: Optional service asked whether a report exists; it is
            only consulted when the requester may edit the event.
        user: Accepted for compatibility; not read by this function.
        ctx_name: UI context name that selects ids and labels for the keyboard.
        note: Optional text placed above the card.
        show_organizer: Forwarded to ``format_event_details``.
    """
    ui = get_ui(ctx_name)

    card: EventCardSchema | None = event_service.get_event_by_id(event_id, peer.id)
    if card is None:
        not_found = "⚠️ Мероприятие не найдено."
        bot.messaging.send_message(
            peer=peer,
            text=f"{note}\n\n{not_found}" if note else not_found,
            interactive_media_groups=build_back_keyboard(ctx_name),
        )
        return

    # Every permission flag falls back to False when allowed_actions is missing.
    # TODO: 1. Move the edit-permission check into the service
    actions = card.allowed_actions
    can_edit = actions.can_edit if actions else False
    show_code = actions.can_view_code if actions else False
    can_sign_up = actions.can_sign_up if actions else False
    is_participant = card.participation is not None
    # TODO: 2. Separate check for the right to access the report
    can_access_reports = can_edit

    # NOTE(review): the comparison below would raise if participation_limit
    # were None - confirm the schema guarantees a number.
    logger.debug(f"""
        for event with id {event_id}:
        can_sign_up = {can_sign_up}
            card.event.participation_count < card.event.participation_limit is {card.event.participation_count < card.event.participation_limit}
            card.event.archived_at is not None is {card.event.archived_at is not None}
            not card.event.deleted is {not card.event.deleted}
            card.event.active is {card.event.active}
                 """)

    # Stays None (unknown) unless the requester may access reports and a
    # report service was supplied.
    report_exists: bool | None = None
    if can_access_reports and report_service is not None:
        report_exists = report_service.report_exists(
            event_id=str(card.event.id),
            requester_messenger_id=peer.id,
        )

    # TODO: 3. Separate permissions
    can_create_report = bool(can_edit and (report_exists is False))
    can_view_report = bool(can_access_reports and (report_exists is True))

    report_known = report_exists is not None
    details = format_event_details(
        card,
        show_active=ctx_name == "my_events",
        show_code=show_code,
        show_organizer=show_organizer,
        show_report_status=bool(can_access_reports and report_known),
        report_exists=bool(report_exists) if report_known else None,
    )

    keyboard = event_actions_keyboard(
        card.event.id,
        is_participant=is_participant,
        can_edit=can_edit,
        back_value=ui.back_value,
        back_label=ui.back_label,
        enter_code_media_id=ui.enter_code_media_id,
        sign_up_media_id=ui.sign_up_media_id,
        sign_out_media_id=ui.sign_out_media_id,
        ctx=ui.name,  # important: makes the button value "<event_id>|<ctx>"
        can_create_report=can_create_report,
        can_view_report=can_view_report,
        can_sign_up=can_sign_up,
    )

    bot.messaging.send_message(
        peer=peer,
        text=f"{note}\n\n{details}" if note else details,
        interactive_media_groups=keyboard,
    )


# TODO перенести клавиатуру туда где ей место
def build_back_to_event_keyboard(
    event_id: str, ctx_name: str
) -> list[InteractiveMediaGroup]:  # legacy
    """Build a one-button keyboard that reopens the event card.

    The button uses the context's ``list_open_media_id`` and carries
    "<event_id>|<ctx name>" as its value.
    """
    ui = get_ui(ctx_name)
    back_button = Button(
        media_id=ui.list_open_media_id,
        value=f"{event_id}|{ui.name}",
        label="⬅️ К мероприятию",
    )
    return MediaGroupBuilder([back_button]).build()


# TODO перенести клавиатуру туда где ей место
def build_back_to_event_editing_keyboard(
    event_id: str, ctx_name: str
) -> list[InteractiveMediaGroup]:  # legacy
    """Build a one-button keyboard that returns to the event editing menu.

    The button uses the fixed media id "event_menu_edit" and carries
    "<event_id>|<ctx_name>" as its value; ``ctx_name`` is used as given,
    without a lookup in the UI registry.
    """
    back_button = Button(
        media_id="event_menu_edit",
        value=f"{event_id}|{ctx_name}",
        label="⬅️ К редактированию мероприятия",
    )
    return MediaGroupBuilder([back_button]).build()

Editor is loading...
Leave a Comment