Untitled

 avatar
4ae4d
plain_text
4 months ago
38 kB
18
Indexable
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai
 > podman compose logs -f --tail=300 pomogai_backend
>>>> Executing external compose provider "/usr/bin/docker-compose". Please see podman-compose(1) for how to disable this message. <<<<

ERROR: 'network_mode' and 'networks' cannot be combined
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai  1
 > podman compose-vuild logs -f --tail=300 pomogai_backend
Error: unknown shorthand flag: 'f' in -f
See 'podman --help'
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai  125
 > podman compose-build logs -f --tail=300 pomogai_backend
Error: unknown shorthand flag: 'f' in -f
See 'podman --help'
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai  125
 > podman compose up logs -f --tail=300 pomogai_backend
>>>> Executing external compose provider "/usr/bin/docker-compose". Please see podman-compose(1) for how to disable this message. <<<<

ERROR: 'network_mode' and 'networks' cannot be combined
Error: executing /usr/bin/docker-compose up logs -f --tail=300 pomogai_backend: exit status 1
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai  1
 > podman compose-up logs -f --tail=300 pomogai_backend
Error: unknown shorthand flag: 'f' in -f
See 'podman --help'
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai  125
 > docker compose logs -f --tail=300 pomogai_backend
Emulate Docker CLI using podman. Create /etc/containers/nodocker to quiet msg.
>>>> Executing external compose provider "/usr/bin/docker-compose". Please see podman-compose(1) for how to disable this message. <<<<

ERROR: 'network_mode' and 'networks' cannot be combined
Error: executing /usr/bin/docker-compose logs -f --tail=300 pomogai_backend: exit status 1
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai  1
 > podman-compose logs -f tail=300 pomogai_backend
WARNING:podman_compose:missing services [tail=300,pomogai_backend]
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai  1
 > podman logs -f --tail=300 pomogai_backend
Error: no container with name or ID "pomogai_backend" found: no such container
 [email protected]  URB-WSN-0007608  /home/work/[email protected]/pomogai  125
 > journalctl -u pomogai-backend -n 300 -f
Hint: You are currently not seeing messages from other users and the system.
      Users in groups 'adm', 'systemd-journal' can see all messages.
      Pass -q to turn off this notice.
No journal files were opened due to insufficient permissions.





f=src/backend/app/utils/errors.py
f=src/backend/app/utils/logger.py
f=src/backend/app/filters/__init__.py
f=src/backend/app/filters/filters/tags.py
f=src/backend/app/filters/filters/participant.py
f=src/backend/app/filters/filters/report.py
f=src/backend/app/filters/filters/organizer.py
f=src/backend/app/db/models/event_report.py
f=src/backend/app/db/models/user.py
f=src/backend/app/schemas/event.py
f=src/backend/app/main.py
f=src/backend/app/api/__init__.py
f=src/backend/app/repositories/event.py
f=src/backend/app/services/event.py
f=src/backend/app/api/v1/event.py
Error: 'src/backend/app/utils/errors.py' is not a valid file
--- src/backend/app/utils/logger.py ---
import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

# Log file path; it is relative, so it resolves against the process working directory.
LOG_FILE = Path("backend.log")

LOG_FORMAT = "[%(asctime)s] [%(levelname)s] %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Shared application logger; other modules import this object directly.
logger = logging.getLogger("backend")

logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT)

file_handler = RotatingFileHandler(
    LOG_FILE,
    maxBytes=5_000_000,  # 5 MB
    backupCount=3,
    encoding="utf-8",
)
file_handler.setFormatter(formatter)

# Inspect this logger's OWN handler list. ``hasHandlers()`` also walks up to the
# ancestors, so a handler on the root logger (installed by the server or a test
# runner) would make it return True, nothing would be attached here, and with
# ``propagate = False`` below every record would be silently dropped.
if not logger.handlers:
    logger.addHandler(file_handler)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

# Records are handled only by the two handlers above, never by ancestor loggers.
logger.propagate = False

--- src/backend/app/filters/__init__.py ---
from .base import FilterContext, QueryFilter
from .builder import FilterBuilder
from .specification import FilterSpecification

# Public names of the filters package, as re-exported by the imports above.
__all__ = [
    "FilterContext",
    "QueryFilter",
    "FilterBuilder",
    "FilterSpecification",
]

--- src/backend/app/filters/filters/tags.py ---
from __future__ import annotations

from dataclasses import dataclass, field
from uuid import UUID

from sqlalchemy import exists, select
from sqlalchemy.sql import Select

from ...db.models import AppTagEvent, Event
from ..base import FilterContext, QueryFilter


@dataclass(slots=True)
class TagsEventsFilter(QueryFilter):
    """tag_ids (ANY): keep events that have at least one AppTag from the list."""

    # NOTE(review): ``enabled`` is declared but ``is_enabled`` never reads it.
    enabled: bool = True
    tag_ids: list[UUID] = field(default_factory=list)

    def is_enabled(self) -> bool:
        """Return True only when at least one tag id was supplied."""
        return True if self.tag_ids else False

    def apply(self, query: Select, ctx: FilterContext) -> Select:
        """Add an EXISTS condition matching events linked to any of ``tag_ids``."""
        linked_to_event = AppTagEvent.event_id == Event.id
        has_wanted_tag = AppTagEvent.app_tag_id.in_(self.tag_ids)
        tag_link = select(1).select_from(AppTagEvent).where(linked_to_event, has_wanted_tag)
        return query.where(exists(tag_link))

--- src/backend/app/filters/filters/participant.py ---
from __future__ import annotations

from dataclasses import dataclass

from sqlalchemy import exists, select
from sqlalchemy.sql import Select

from ...db.models import Event, Participation
from ..base import FilterContext, QueryFilter


@dataclass(slots=True)
class ParticipantEventsFilter(QueryFilter):
    """only_participant: keep events in which the requester is a participant."""

    enabled: bool = False

    def is_enabled(self) -> bool:
        """Return True when the filter was switched on."""
        return True if self.enabled else False

    def apply(self, query: Select, ctx: FilterContext) -> Select:
        """Add an EXISTS condition on a Participation row of the requester.

        The requester is read from ``ctx["requester_user_id"]``; when it is
        missing the query is returned unchanged.
        """
        user_id = ctx.get("requester_user_id")
        if user_id is not None:
            participation_link = (
                select(1)
                .select_from(Participation)
                .where(Participation.user_id == user_id)
                .where(Participation.event_id == Event.id)
            )
            return query.where(exists(participation_link))

        # no known requester: nothing to match against
        return query

--- src/backend/app/filters/filters/report.py ---
from __future__ import annotations

from dataclasses import dataclass

from sqlalchemy import exists, select
from sqlalchemy.sql import Select

from ...db.models import Event, EventReport
from ..base import FilterContext, QueryFilter


@dataclass(slots=True)
class ReportEventsFilter(QueryFilter):
    """
    has_report:
      None  -> no filtering
      True  -> archive only (an EventReport exists)
      False -> active only (no EventReport exists)
    """

    has_report: bool | None = None

    def is_enabled(self) -> bool:
        """Return True when ``has_report`` was set to True or False."""
        return not (self.has_report is None)

    def apply(self, query: Select, ctx: FilterContext) -> Select:
        """Require (or forbid) the existence of an EventReport for each event."""
        report_exists = exists(
            select(1).select_from(EventReport).where(EventReport.event_id == Event.id)
        )
        # anything other than an explicit True selects events WITHOUT a report
        wanted = report_exists if self.has_report is True else ~report_exists
        return query.where(wanted)

--- src/backend/app/filters/filters/organizer.py ---
from __future__ import annotations

from dataclasses import dataclass

from sqlalchemy.sql import Select

from ...db.models import Event
from ..base import FilterContext, QueryFilter


@dataclass(slots=True)
class OrganizerEventsFilter(QueryFilter):
    """only_organizer: keep events whose creator is the requester."""

    enabled: bool = False

    def is_enabled(self) -> bool:
        """Return True when the filter was switched on."""
        return True if self.enabled else False

    def apply(self, query: Select, ctx: FilterContext) -> Select:
        """Restrict ``query`` to events created by ``ctx["requester_user_id"]``.

        When the context carries no requester id the query is returned unchanged.
        """
        creator = ctx.get("requester_user_id")
        if creator is not None:
            return query.where(Event.creator_id == creator)
        return query

--- src/backend/app/db/models/event_report.py ---
from __future__ import annotations

from typing import TYPE_CHECKING
from uuid import UUID

from sqlalchemy import ForeignKey, Integer, Text, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base import Model

if TYPE_CHECKING:
    from .event import Event


class EventReport(Model):
    """ORM model mapped to the ``event_reports`` table.

    Each row references one event through ``event_id``, which is declared unique.
    """

    __tablename__ = "event_reports"

    # Owning event; the foreign key is declared with ON DELETE CASCADE.
    event_id: Mapped[UUID] = mapped_column(
        ForeignKey("events.id", ondelete="CASCADE"),
        nullable=False,
        unique=True,
    )
    # Required free-text field.
    how_it_went: Mapped[str] = mapped_column(Text, nullable=False)
    # may stay nullable for now so as not to break the demo/master branch
    beneficiaries_count: Mapped[int | None] = mapped_column(Integer, nullable=True)
    # presumably the participant count captured when the report was filed — TODO confirm
    participants_count_snapshot: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    # Scalar relationship paired with the ``report`` attribute on Event.
    event: Mapped[Event] = relationship(back_populates="report", uselist=False)
    # NOTE(review): uniqueness of event_id is declared twice (``unique=True`` on the
    # column and this named constraint) — confirm both are intended.
    __table_args__ = (UniqueConstraint("event_id", name="uix_event_reports_event_id"),)

--- src/backend/app/db/models/user.py ---
from typing import TYPE_CHECKING
from uuid import UUID

from sqlalchemy import BigInteger, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base import Model
from .enums import RoleType

if TYPE_CHECKING:
    from .city import City
    from .custom_tag import CustomTag
    from .event import Event
    from .participation import Participation
    from .user_tag import UserTag


class User(Model):
    """ORM model mapped to the ``users`` table."""

    __tablename__ = "users"

    # NOTE(review): firstname and city_id are nullable columns although annotated
    # as non-optional ``Mapped[str]`` / ``Mapped[UUID]`` — confirm the intended typing.
    firstname: Mapped[str] = mapped_column(String(100), nullable=True)
    messenger_id: Mapped[int] = mapped_column(BigInteger(), unique=True, nullable=False)
    employee_number: Mapped[int] = mapped_column(BigInteger(), unique=True, nullable=False)
    # Stored as text; defaults to the value of RoleType.user.
    role: Mapped[RoleType] = mapped_column(Text, default=RoleType.user.value)
    points: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    city_id: Mapped[UUID] = mapped_column(ForeignKey("cities.id"), nullable=True)

    # Relationships
    city: Mapped["City"] = relationship(back_populates="users")
    # The collections below use "all, delete-orphan" cascade.
    custom_tags: Mapped[list["CustomTag"]] = relationship(
        back_populates="user", cascade="all, delete-orphan", uselist=True
    )
    user_tags: Mapped[list["UserTag"]] = relationship(
        back_populates="user", cascade="all, delete-orphan", uselist=True
    )
    participations: Mapped[list["Participation"]] = relationship(
        back_populates="user", cascade="all, delete-orphan", uselist=True
    )
    events: Mapped[list["Event"]] = relationship(
        back_populates="creator", cascade="all, delete-orphan", uselist=True
    )

    def to_dict(self) -> dict:
        """Return a plain dict of the user's scalar fields.

        UUIDs are rendered as strings and timestamps as ISO-8601 strings
        (or None when unset). ``role`` is not included.
        """
        return {
            "id": str(self.id),
            "username": getattr(
                self, "username", None
            ),  # unclear where this is used; kept to be safe
            "firstname": self.firstname,
            "messenger_id": self.messenger_id,
            "employee_number": self.employee_number,
            "points": self.points,
            "city_id": str(self.city_id) if self.city_id else None,
            "created_at": self.created_at.isoformat() if self.created_at else None,
            "updated_at": self.updated_at.isoformat() if self.updated_at else None,
        }

    @property
    def tags(self):
        """Return the ``app_tag`` of every user_tag, skipping entries where it is None."""
        return [ut.app_tag for ut in (self.user_tags or []) if ut.app_tag is not None]

--- src/backend/app/schemas/event.py ---
from datetime import datetime
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field, constr

from .app_tag import ResponseAppTagSchema
from .base import TimestampSchema


# Fields shared by RequestEventSchema and ResponseEventSchema, which subclass this model.
class BaseEvent(BaseModel):
    # from_attributes=True: instances may be built from objects exposing these attributes.
    model_config = ConfigDict(from_attributes=True)

    name: str
    description: str | None = None
    location: str | None = None
    # A fresh empty list is used per instance when no tags are given.
    tags: list[ResponseAppTagSchema] = Field(default_factory=list)
    bonus_code: str
    points: int
    date: datetime


# Payload sent by the bot when an event is created (English rendering of the docstring).
# NOTE(review): the docstring itself is left untouched because pydantic exposes class
# docstrings as the schema description.
class EventCreateSchema(BaseModel):
    """то, что приходит от бота при создании мероприятия"""

    # Unknown fields are rejected.
    model_config = ConfigDict(extra="forbid")

    name: constr(max_length=128)
    description: str | None = None
    location: constr(max_length=255) | None = None
    app_tag_ids: list[UUID] = Field(default_factory=list)
    bonus_code: constr(max_length=16)
    points: int = 0
    date: datetime
    city_id: UUID
    creator_id: UUID | None = None


# Request-side event schema; adds nothing to BaseEvent.
class RequestEventSchema(BaseEvent):
    pass


# Event as returned by the API: BaseEvent and TimestampSchema fields plus identifiers.
class ResponseEventSchema(BaseEvent, TimestampSchema):
    id: UUID
    creator_id: UUID | None = (
        None  # TODO decide whether this stays optional or the creator's UUID is always set
    )
    creator_name: str | None = None
    city_id: UUID


# Sign-up request: which event, and the messenger id of the user signing up.
class EventSignUpSchema(BaseModel):
    event_id: UUID
    messenger_id: int


# Sign-out request: identifies the participation record to act on.
class EventSignOutSchema(BaseModel):
    participation_id: UUID


# Bonus-code submission: a participation id (inherited) plus the code entered.
class EventCodeSchema(EventSignOutSchema):
    bonus_code: str


# Partial-update payload for an event: every field is optional and unknown fields
# are rejected. Length limits mirror EventCreateSchema so that an update cannot
# store a value that creation would have refused.
class EventUpdateSchema(BaseModel):
    model_config = ConfigDict(extra="forbid")

    name: constr(max_length=128) | None = None
    description: str | None = None
    # Same 255-character cap as EventCreateSchema.location (was unbounded here).
    location: constr(max_length=255) | None = None
    app_tag_ids: list[UUID] | None = None
    bonus_code: constr(max_length=16) | None = None
    points: int | None = None
    date: datetime | None = None
    city_id: UUID | None = None
    creator_id: UUID | None = None


# One participant of an event (English rendering of the docstring).
# NOTE(review): the docstring itself is left untouched because pydantic exposes class
# docstrings as the schema description.
class ResponseEventParticipantSchema(BaseModel):
    """один участник мероприятия"""

    # Unknown fields are rejected.
    model_config = ConfigDict(extra="forbid")

    # Participation record fields.
    participation_id: UUID
    is_claimed: bool

    # Fields of the participating user.
    user_id: UUID
    firstname: str | None = None
    messenger_id: int
    employee_number: int
    points: int

Error: 'src/backend/app/main.py' is not a valid file
Error: 'src/backend/app/api/__init__.py' is not a valid file
--- src/backend/app/repositories/event.py ---
from datetime import datetime
from uuid import UUID

from sqlalchemy import and_, delete, func, or_, select, update
from sqlalchemy.dialects import postgresql
from sqlalchemy.orm import selectinload

from ..db.models import AppTagEvent, Event
from ..filters import FilterContext, FilterSpecification
from ..utils.logger import logger
from .base import SQLRepository


class EventRepository(SQLRepository):
    model = Event

    def _base_events_query(self):
        """Build a SELECT over the model that eagerly loads tags and the creator."""
        with_tags = selectinload(self.model.app_tag_events).selectinload(AppTagEvent.app_tag)
        with_creator = selectinload(self.model.creator)
        return select(self.model).options(with_tags).options(with_creator)

    def _base_city_events_query(self, city_id: UUID):
        """Return the base events query, narrowed to ``city_id`` unless it is None."""
        events = self._base_events_query()
        if city_id is None:
            return events
        return events.where(self.model.city_id == city_id)

    def _log_sql(self, query) -> None:
        """Log the SQL text of ``query`` at DEBUG level (best effort).

        The statement is compiled for the PostgreSQL dialect with bound
        parameters inlined. If compilation fails, the statement's ``repr`` and
        the error are logged instead; this method never raises for that reason.
        """
        import logging  # local import: only needed for the level constant

        # Skip the comparatively expensive compile step when nobody will see it.
        if not logger.isEnabledFor(logging.DEBUG):
            return

        try:
            sql_text = str(
                query.compile(
                    dialect=postgresql.dialect(),
                    compile_kwargs={"literal_binds": True},
                )
            )
        except Exception as e:
            # Deliberately broad: logging must not break the actual query path.
            logger.debug("SQL (repr):\n%r", query)
            logger.debug("error: %s", e)
        else:
            logger.debug("SQL:\n%s", sql_text)

    def _apply_keyset(
        self,
        query,
        *,
        cursor_date: datetime | None,
        cursor_id: UUID | None,
    ):
        """Apply the keyset (cursor) condition when both cursor parts are given.

        Ordering is (date desc, id desc), so the next page holds rows that are
        strictly smaller than the cursor. With an incomplete cursor the query
        is returned unchanged.
        """
        if cursor_date is not None and cursor_id is not None:
            earlier_date = Event.date < cursor_date
            same_date_smaller_id = and_(Event.date == cursor_date, Event.id < cursor_id)
            return query.where(or_(earlier_date, same_date_smaller_id))
        return query

    async def get_cities_events(
        self,
        city_id: UUID | None,
        offset: int,
        limit: int,
        *,
        cursor_date: datetime | None = None,
        cursor_id: UUID | None = None,
        spec: FilterSpecification | None = None,
        ctx: FilterContext | None = None,
    ):
        """Return one page of events ordered by (date desc, id desc).

        Args:
            city_id: restrict to this city; None means all cities.
            offset: rows to skip; used only when no complete cursor is given.
            limit: maximum number of events to return.
            cursor_date, cursor_id: keyset cursor; takes effect only when both
                parts are provided.
            spec: optional filter specification applied to the id query.
            ctx: context passed to ``spec``; an empty FilterContext by default.

        Returns:
            The events of the page with tags and creator loaded, or ``[]``.
        """
        # Phase 1: select ids with filters/joins, apply keyset and ordering, limit
        ids_q = select(Event.id, Event.date)
        if city_id is not None:
            ids_q = ids_q.where(Event.city_id == city_id)

        if spec is not None:
            ids_q = spec.apply(ids_q, ctx or FilterContext())

        ids_q = ids_q.group_by(Event.id, Event.date)
        ids_q = self._apply_keyset(ids_q, cursor_date=cursor_date, cursor_id=cursor_id)
        ids_q = ids_q.order_by(Event.date.desc(), Event.id.desc()).limit(limit)

        # NOTE: offset is ignored only when a COMPLETE cursor is in effect. The
        # keyset condition is skipped if either part is missing, so the offset
        # must apply in that case too; otherwise neither paging mode is used.
        if cursor_date is None or cursor_id is None:
            ids_q = ids_q.offset(offset)

        self._log_sql(ids_q)
        ids_res = await self.session.execute(ids_q)
        ids = [row[0] for row in ids_res.all()]
        if not ids:
            return []

        # Phase 2: load events with relationships, restoring the page order
        query = (
            self._base_events_query()
            .where(Event.id.in_(ids))
            .order_by(Event.date.desc(), Event.id.desc())
        )
        self._log_sql(query)
        result = await self.session.execute(query)
        return result.scalars().all()

    async def get_by_id(self, event_id: UUID) -> Event | None:
        """Fetch one event by id with tags and creator loaded, or None if absent."""
        lookup = self._base_events_query().where(self.model.id == event_id)
        found = await self.session.execute(lookup)
        return found.scalar_one_or_none()

    async def update_fields(self, event_id: UUID, data: dict) -> Event | None:
        """Update the given columns of one event and commit.

        Returns the row produced by RETURNING, or None when ``data`` is empty
        or no row matched ``event_id``.
        """
        if not data:
            return None

        stmt = update(self.model).where(self.model.id == event_id)
        stmt = stmt.values(**data).returning(self.model)
        outcome = await self.session.execute(stmt)
        await self.session.commit()
        return outcome.scalar_one_or_none()

    async def set_app_tags(self, event_id: UUID, app_tag_ids: list[UUID]) -> None:
        """Replace all tag links of an event with ``app_tag_ids`` and commit.

        Duplicate ids are dropped, keeping the first occurrence of each.
        """
        requested = app_tag_ids if app_tag_ids else []
        unique_ids = list(dict.fromkeys(requested))

        purge = delete(AppTagEvent).where(AppTagEvent.event_id == event_id)
        await self.session.execute(purge)

        links = [AppTagEvent(event_id=event_id, app_tag_id=tag_id) for tag_id in unique_ids]
        self.session.add_all(links)
        await self.session.commit()

    async def search_cities_events_with_total(
        self,
        city_id: UUID | None,
        offset: int,
        limit: int,
        *,
        cursor_date: datetime | None = None,
        cursor_id: UUID | None = None,
        spec: FilterSpecification | None = None,
        ctx: FilterContext | None = None,
    ) -> tuple[list[Event], int]:
        """
        Like get_cities_events, but additionally returns the total count
        for the current filters (and cursor, if given).
        The total is computed on the database side (without limit/offset).

        Returns:
            ``(events, total)``; ``events`` is empty when the page has no rows.
        """

        ids_base = select(Event.id, Event.date)
        if city_id is not None:
            ids_base = ids_base.where(Event.city_id == city_id)

        if spec is not None:
            ids_base = spec.apply(ids_base, ctx or FilterContext())

        ids_base = ids_base.group_by(Event.id, Event.date)
        ids_base = self._apply_keyset(ids_base, cursor_date=cursor_date, cursor_id=cursor_id)

        # Count the filtered rows before ordering/limit/offset are attached.
        total_q = select(func.count()).select_from(ids_base.subquery())
        self._log_sql(total_q)
        total_res = await self.session.execute(total_q)
        total = int(total_res.scalar() or 0)

        ids_q = ids_base.order_by(Event.date.desc(), Event.id.desc()).limit(limit)

        # NOTE: offset is ignored only when a COMPLETE cursor is in effect. The
        # keyset condition is skipped if either part is missing, so the offset
        # must apply in that case too; otherwise neither paging mode is used.
        if cursor_date is None or cursor_id is None:
            ids_q = ids_q.offset(offset)

        self._log_sql(ids_q)
        ids_res = await self.session.execute(ids_q)
        ids = [row[0] for row in ids_res.all()]
        if not ids:
            return ([], total)

        # Load the full events for the selected ids, restoring the page order.
        query = (
            self._base_events_query()
            .where(Event.id.in_(ids))
            .order_by(Event.date.desc(), Event.id.desc())
        )
        self._log_sql(query)
        result = await self.session.execute(query)
        return (result.scalars().all(), total)

--- src/backend/app/services/event.py ---
from typing import Any
from uuid import UUID

from ..dependencies.administrator import Administrator
from ..filters import FilterContext, FilterSpecification
from ..filters.filters.claimed import ClaimedEventsFilter
from ..filters.filters.organizer import OrganizerEventsFilter
from ..filters.filters.participant import ParticipantEventsFilter
from ..filters.filters.report import ReportEventsFilter
from ..filters.filters.tags import TagsEventsFilter
from ..schemas import (
    EventCreateSchema,
    EventSearchQuerySchema,
    EventSignOutSchema,
    EventSignUpSchema,
    EventUpdateSchema,
    ResponseEventParticipantSchema,
)
from ..utils.errors import BadRequestError, ConflictError, NotFoundError
from ..utils.logger import logger


class EventService:
    def __init__(self, administrator: Administrator):
        """Keep the administrator whose ``start()`` context the service methods open."""
        self.administrator = administrator

    async def get_cities_events(self, city_id: UUID, offset: int, limit: int):
        """Return one offset/limit page of a city's events from the event repository."""
        async with self.administrator.start() as admin:
            return await admin.event.get_cities_events(
                city_id=city_id,
                offset=offset,
                limit=limit,
            )

    async def search_events(self, query: EventSearchQuerySchema):
        """Search events with optional filters and return ``(events, total)``.

        Raises:
            BadRequestError: only one half of the keyset cursor was given, or a
                requester-dependent filter was requested without
                ``requester_messenger_id``.
            NotFoundError: ``requester_messenger_id`` matches no user.
        """
        logger.info(
            "search_events: city_id=%s offset=%s limit=%s cursor_date=%s cursor_id=%s requester_messenger_id=%s only_claimed=%s only_organizer=%s only_participant=%s has_report=%s tag_ids=%s",
            query.city_id,
            query.offset,
            query.limit,
            query.cursor_date,
            query.cursor_id,
            query.requester_messenger_id,
            query.only_claimed,
            query.only_organizer,
            query.only_participant,
            query.has_report,
            query.tag_ids,
        )

        # keyset cursor must be complete: both parts or neither
        if (query.cursor_date is None) ^ (query.cursor_id is None):
            raise BadRequestError("cursor_date and cursor_id must be provieded together")

        needs_requester = bool(query.only_claimed or query.only_organizer or query.only_participant)
        if needs_requester and query.requester_messenger_id is None:
            raise BadRequestError(
                "requester_messenger_id is required for claimed/organizer filters"
            )

        async with self.administrator.start() as admin:
            requester = None
            if query.requester_messenger_id is not None:
                requester = await admin.user.get_by_messenger_id(query.requester_messenger_id)
                if not requester:
                    raise NotFoundError("User not found")

            # Context shared by all filters; requester keys are set only when known.
            ctx_data: dict[str, Any] = {"tag_ids": query.tag_ids}
            if requester is not None:
                ctx_data["requester_user_id"] = getattr(requester, "id", None)
                ctx_data["requester_messenger_id"] = query.requester_messenger_id
            ctx = FilterContext(ctx_data)

            spec = FilterSpecification()
            spec.add(ClaimedEventsFilter(enabled=bool(query.only_claimed)))
            spec.add(OrganizerEventsFilter(enabled=bool(query.only_organizer)))
            spec.add(ParticipantEventsFilter(enabled=bool(query.only_participant)))
            spec.add(ReportEventsFilter(has_report=query.has_report))
            spec.add(TagsEventsFilter(tag_ids=list(query.tag_ids or [])))

            # The event repository names this method search_cities_events_with_total;
            # the former call to get_cities_events_with_total matched no method there.
            events, total = await admin.event.search_cities_events_with_total(
                city_id=query.city_id,
                offset=query.offset,
                limit=query.limit,
                cursor_date=query.cursor_date,
                cursor_id=query.cursor_id,
                spec=spec,
                ctx=ctx,
            )
            return events, total

    async def get_by_id(self, event_id: UUID):
        """Return whatever the event repository yields for ``event_id``."""
        async with self.administrator.start() as admin:
            return await admin.event.get_by_id(event_id)

    async def enter_code(self, participation_id: UUID, bonus_code: str):
        """Claim an event's points for a participation using the bonus code.

        Marks the participation as claimed and adds the event's points to the
        participating user.

        Returns:
            The result of ``admin.user.add_points``.

        Raises:
            NotFoundError: the participation, or the event it refers to, does
                not exist.
            BadRequestError: points were already claimed, or the code differs
                from the event's bonus code.
        """
        async with self.administrator.start() as admin:
            participation = await admin.participation.get_by_id(participation_id)
            if not participation:
                raise NotFoundError()
            if participation.is_claimed:
                raise BadRequestError("User already claimed points")

            event = await admin.event.get_by_id(participation.event_id)
            # Guard against a missing event instead of failing on ``None.bonus_code``.
            if not event:
                raise NotFoundError("Event not found")
            if event.bonus_code != bonus_code:
                raise BadRequestError("Codes do not match")

            await admin.participation.update(participation_id, {"is_claimed": True})
            updated_user = await admin.user.add_points(participation.user_id, event.points)

            return updated_user

    async def sign_up(self, data: EventSignUpSchema):
        """Register the user identified by ``data.messenger_id`` for ``data.event_id``.

        Raises:
            NotFoundError: no user has that messenger id.
            ConflictError: the user already has a participation for the event.
        """
        async with self.administrator.start() as admin:
            member = await admin.user.get_by_messenger_id(data.messenger_id)
            if not member:
                raise NotFoundError("User not found")

            already_joined = await admin.participation.get_participation(member.id, data.event_id)
            if already_joined:
                raise ConflictError()

            return await admin.participation.insert(
                {"user_id": member.id, "event_id": data.event_id}
            )

    async def sign_out(self, data: EventSignOutSchema):
        """Delete a participation that has not been claimed yet.

        Raises:
            NotFoundError: the participation does not exist.
            BadRequestError: the participation is already claimed.
        """
        async with self.administrator.start() as admin:
            found = await admin.participation.get_by_id(data.participation_id)
            if not found:
                raise NotFoundError()
            if found.is_claimed:
                raise BadRequestError()

            return await admin.participation.delete(found.id)

    async def get_users_events(self, user_id: int):
        """Return the participations of the user looked up by messenger id.

        Note that ``user_id`` is passed to ``get_by_messenger_id``, i.e. it is
        treated as a messenger id rather than the database id.

        Raises:
            NotFoundError: no user has that messenger id.
        """
        async with self.administrator.start() as admin:
            user = await admin.user.get_by_messenger_id(user_id)
            # Same handling as the other lookups in this service; previously a
            # missing user failed on ``None.id``.
            if not user:
                raise NotFoundError("User not found")

            return await admin.participation.get_user_participations(user.id)

    async def create_event(self, data: EventCreateSchema):
        """
        Create an event in the database.
        creator_id may be taken from the current user (a moderator);
        for now None is accepted as well, the model field being nullable.
        """
        copied_fields = (
            "creator_id",
            "city_id",
            "name",
            "description",
            "location",
            "bonus_code",
            "points",
            "date",
        )
        async with self.administrator.start() as admin:
            created = await admin.event.insert(
                {field_name: getattr(data, field_name) for field_name in copied_fields}
            )

            # tag links are written only when at least one tag id was supplied
            if data.app_tag_ids:
                await admin.event.set_app_tags(created.id, data.app_tag_ids)

            # re-read so the returned event reflects the stored state
            return await admin.event.get_by_id(created.id)

    async def delete_event(self, event_id: UUID) -> bool:
        """
        Delete an event by id.
        True  - the event existed and the delete was issued
        False - no such event
        """
        async with self.administrator.start() as admin:
            if not await admin.event.get_by_id(event_id):
                return False

            await admin.event.delete(event_id)
            return True

    async def update_event(self, event_id: UUID, data: EventUpdateSchema):
        """
        Update an event, checking that the points value is sane.

        Only fields explicitly set on ``data`` are written; ``app_tag_ids``,
        when given, replaces the event's tag links.

        Raises:
            NotFoundError: the event does not exist.
            BadRequestError: points are outside 1..2**31-1, the field update
                returned nothing, or the event is missing on re-read.
        """
        async with self.administrator.start() as admin:
            current = await admin.event.get_by_id(event_id)
            if not current:
                raise NotFoundError("Event not found")

            changes = data.model_dump(exclude_unset=True)
            new_tag_ids = changes.pop("app_tag_ids", None)

            new_points = changes.get("points")
            if new_points is not None and not (0 < new_points < 2**31):
                raise BadRequestError("Invalid points value")

            if changes:
                if not await admin.event.update_fields(event_id, changes):
                    raise BadRequestError("Nothing to update")

            if new_tag_ids is not None:
                await admin.event.set_app_tags(event_id, new_tag_ids)

            reloaded = await admin.event.get_by_id(event_id)
            if reloaded:
                return reloaded
            raise BadRequestError("Event not found")

    async def get_event_participants(
        self,
        event_id: UUID,
        requester_messenger_id: int,
        offset: int = 0,
        limit: int = 50,
    ) -> list[ResponseEventParticipantSchema]:
        """List an event's participants for an admin or the event's creator.

        Participations without a ``user`` are skipped.

        Raises:
            NotFoundError: the requester or the event does not exist.
            BadRequestError: the requester is neither admin nor the creator.
        """
        async with self.administrator.start() as admin:
            requester = await admin.user.get_by_messenger_id(requester_messenger_id)
            if not requester:
                raise NotFoundError("User not found")

            event = await admin.event.get_by_id(event_id)
            if not event:
                raise NotFoundError("Event not found")

            # role may be an enum (compare its .value) or a plain string
            role = getattr(requester, "role", None)
            is_admin = str(getattr(role, "value", role)) == "admin"

            if not is_admin and (
                not event.creator_id or str(event.creator_id) != str(requester.id)
            ):
                # TODO replace with AuthError
                raise BadRequestError("Access denied: only organizer can view participants")

            participations = await admin.participation.get_event_participations(
                event_id=event_id,
                offset=offset,
                limit=limit,
            )

            participants: list[ResponseEventParticipantSchema] = []
            for participation in participations:
                member = getattr(participation, "user", None)
                if member:
                    entry = ResponseEventParticipantSchema(
                        participation_id=participation.id,
                        is_claimed=bool(getattr(participation, "is_claimed", False)),
                        user_id=member.id,
                        firstname=getattr(member, "firstname", None),
                        messenger_id=int(getattr(member, "messenger_id", 0)),
                        employee_number=int(getattr(member, "employee_number", 0)),
                        points=int(getattr(member, "points", 0)),
                    )
                    participants.append(entry)

            return participants

--- src/backend/app/api/v1/event.py ---
from datetime import datetime
from typing import Annotated
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Path, Query, Response, status
from fastapi.responses import ORJSONResponse
from sqlalchemy.ext.asyncio import AsyncSession

from ...db.connection import get_db
from ...dependencies.administrator import Administrator
from ...schemas import (
    DefaultResponseSchema,
    EventCodeSchema,
    EventCreateSchema,
    EventSearchQuerySchema,
    EventSignOutSchema,
    EventSignUpSchema,
    EventUpdateSchema,
    ResponseEventParticipantSchema,
    ResponseEventSchema,
    ResponseUserSchema,
)
from ...services import EventService
from ...utils.logger import logger

# Router for the event endpoints; every route below is served under /events.
api_router = APIRouter(prefix="/events", tags=["Events"])


# Endpoint parameter type: an AsyncSession supplied by the get_db dependency.
SessionDep = Annotated[AsyncSession, Depends(get_db)]


@api_router.get(
    "/search",
    status_code=status.HTTP_200_OK,
    response_model=list[ResponseEventSchema],
)
async def search_events(
    session: SessionDep,
    response: Response,
    city_id: Annotated[UUID | None, Query()] = None,
    offset: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=1)] = 50,
    cursor_date: Annotated[datetime | None, Query()] = None,
    cursor_id: Annotated[UUID | None, Query()] = None,
    requester_messenger_id: Annotated[int | None, Query()] = None,
    only_claimed: Annotated[bool, Query()] = False,
    only_organizer: Annotated[bool, Query()] = False,
    only_participant: Annotated[bool, Query()] = False,
    has_report: Annotated[bool | None, Query()] = None,
    tag_ids: Annotated[list[UUID] | None, Query()] = None,
):
    """Search events; the total match count is returned in ``X-Total-Count``.

    All query parameters are logged, packed into an EventSearchQuerySchema and
    handed to ``EventService.search_events``. The response body is the list of
    events for the requested page.
    """
    logger.info(
        "HTTP /events/search: city_id=%s offset=%s limit=%s cursor_date=%s cursor_id=%s requester_messenger_id=%s only_claimed=%s only_organizer=%s only_participant=%s has_report=%s tag_ids=%s",
        city_id,
        offset,
        limit,
        cursor_date,
        cursor_id,
        requester_messenger_id,
        only_claimed,
        only_organizer,
        only_participant,
        has_report,
        tag_ids,
    )

    service = EventService(Administrator(session))

    # a missing tag_ids parameter is passed on as an empty list
    wanted_tags = tag_ids or []
    search_query = EventSearchQuerySchema(
        city_id=city_id,
        offset=offset,
        limit=limit,
        cursor_date=cursor_date,
        cursor_id=cursor_id,
        requester_messenger_id=requester_messenger_id,
        only_claimed=only_claimed,
        only_organizer=only_organizer,
        only_participant=only_participant,
        has_report=has_report,
        tag_ids=wanted_tags,
    )

    events, total_count = await service.search_events(search_query)
    response.headers["X-Total-Count"] = str(total_count)
    return events


@api_router.get(
    "/id/{city_id}", status_code=status.HTTP_200_OK, response_model=list[ResponseEventSchema]
)
async def get_cities_events(
    city_id: Annotated[UUID, Path()],
    offset: Annotated[int, Query(ge=0)],
    limit: Annotated[int, Query(ge=0)],
    session: SessionDep,
):
    """Return one page of events for the city identified by ``city_id``.

    Args:
        city_id: City identifier taken from the URL path.
        offset: Required query parameter; number of events to skip. Must not
            be negative.
        limit: Required query parameter; maximum number of events to return.
            Must not be negative (0 is still accepted for backward
            compatibility).
        session: Database session injected by FastAPI.

    Returns:
        The result of ``EventService.get_cities_events``, serialized as a list
        of ``ResponseEventSchema``.
    """
    # Negative pagination values are rejected at the HTTP layer (validation
    # error) instead of being forwarded to the service, matching the bounds
    # check already applied to `offset` in the /search endpoint.
    admin = Administrator(session)
    service = EventService(admin)
    return await service.get_cities_events(city_id, offset, limit)


@api_router.get(
    "/info/{event_id}",
    status_code=status.HTTP_200_OK,
    response_model=ResponseEventSchema,
)
async def get_event_info(event_id: Annotated[UUID, Path()], session: SessionDep):
    """Return the event identified by ``event_id``.

    The lookup is delegated to ``EventService.get_by_id`` and its result is
    serialized through ``ResponseEventSchema``.
    """
    event_service = EventService(Administrator(session))
    return await event_service.get_by_id(event_id)


@api_router.get(
    "/{event_id}/participants",
    status_code=status.HTTP_200_OK,
    response_model=list[ResponseEventParticipantSchema],
)
async def get_event_participants(
    event_id: Annotated[UUID, Path()],
    requester_messenger_id: Annotated[int, Query()],
    session: SessionDep,
    offset: Annotated[int, Query(ge=0)] = 0,
    limit: Annotated[int, Query(ge=0)] = 50,
):
    """Return one page of participants of the event ``event_id``.

    Args:
        event_id: Event identifier taken from the URL path.
        requester_messenger_id: Required query parameter identifying the
            caller; it is forwarded to the service together with the event id.
        session: Database session injected by FastAPI.
        offset: Number of participants to skip; must not be negative.
            Defaults to 0.
        limit: Maximum number of participants to return; must not be negative
            (0 is still accepted for backward compatibility). Defaults to 50.

    Returns:
        The result of ``EventService.get_event_participants``, serialized as a
        list of ``ResponseEventParticipantSchema``.
    """
    # Negative pagination values are rejected at the HTTP layer (validation
    # error) instead of being forwarded to the service, matching the bounds
    # check already applied to `offset` in the /search endpoint.
    admin = Administrator(session)
    service = EventService(admin)
    return await service.get_event_participants(
        event_id=event_id,
        requester_messenger_id=requester_messenger_id,
        offset=offset,
        limit=limit,
    )


@api_router.get("/my", status_code=status.HTTP_200_OK)
async def get_users_events(user_id: Annotated[int, Query()], session: SessionDep):
    """Return the events for the user given by the ``user_id`` query parameter.

    No response model is declared for this route, so the value produced by
    ``EventService.get_users_events`` is returned without response-model
    filtering.
    """
    event_service = EventService(Administrator(session))
    return await event_service.get_users_events(user_id)


@api_router.post("/code", status_code=status.HTTP_200_OK, response_model=ResponseUserSchema)
async def event_enter_code(body: EventCodeSchema, session: SessionDep):
    """Submit a bonus code for a participation.

    ``body.participation_id`` and ``body.bonus_code`` are handed to
    ``EventService.enter_code``; its result is serialized through
    ``ResponseUserSchema``.
    """
    event_service = EventService(Administrator(session))
    return await event_service.enter_code(body.participation_id, body.bonus_code)


@api_router.post("/sign-up", status_code=status.HTTP_200_OK, response_model=DefaultResponseSchema)
async def event_sign_up(body: EventSignUpSchema, session: SessionDep):
    """Sign a user up for an event.

    The request body is passed as-is to ``EventService.sign_up``. Once that
    call completes, a JSON confirmation naming the user's messenger id and
    the event id is returned.
    """
    event_service = EventService(Administrator(session))
    await event_service.sign_up(body)

    confirmation = {
        "status": "OK",
        "description": f"User {body.messenger_id} successfully signed up for event {body.event_id}",
    }
    return ORJSONResponse(confirmation)


@api_router.patch("/{event_id}", status_code=status.HTTP_200_OK, response_model=ResponseEventSchema)
async def update_event(
    event_id: Annotated[UUID, Path()],
    body: EventUpdateSchema,
    session: SessionDep,
):
    """Update the event identified by ``event_id`` with the fields in ``body``.

    The change is applied by ``EventService.update_event``; its result is
    serialized through ``ResponseEventSchema``.
    """
    event_service = EventService(Administrator(session))
    updated_event = await event_service.update_event(event_id, body)
    return updated_event


@api_router.delete(
    "/sign-out",
    status_code=status.HTTP_200_OK,
    response_model=DefaultResponseSchema,
)
async def event_sign_out(body: EventSignOutSchema, session: SessionDep):
    """Sign a user out of an event.

    The request body is passed as-is to ``EventService.sign_out``. Once that
    call completes, a JSON confirmation naming the participation id is
    returned.
    """
    event_service = EventService(Administrator(session))
    await event_service.sign_out(body)

    confirmation = {
        "status": "OK",
        "description": f"User successfully signed out for participation {body.participation_id}",
    }
    return ORJSONResponse(confirmation)


@api_router.post("", status_code=status.HTTP_201_CREATED, response_model=ResponseEventSchema)
async def create_event(body: EventCreateSchema, session: SessionDep):
    """
    Create an event.

    For now the event is not bound to a specific moderator:
    creator_id is passed as None (the model field is nullable).
    Later the id of the current user can be substituted instead.
    """  # TODO
    event_service = EventService(Administrator(session))
    return await event_service.create_event(body)


@api_router.delete("/{event_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_event(event_id: Annotated[UUID, Path()], session: SessionDep):
    """Delete an event by id.

    Responds with 204 and an empty body when ``EventService.delete_event``
    returns a truthy value; otherwise raises a 404 "Event not found" error.
    """
    event_service = EventService(Administrator(session))

    if await event_service.delete_event(event_id):
        return Response(status_code=status.HTTP_204_NO_CONTENT)

    raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found")

Editor is loading...
Leave a Comment