Untitled
4ae4d
plain_text
4 months ago
38 kB
18
Indexable
[email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai > podman compose logs -f --tail=300 pomogai_backend >>>> Executing external compose provider "/usr/bin/docker-compose". Please see podman-compose(1) for how to disable this message. <<<< ERROR: 'network_mode' and 'networks' cannot be combined [email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai 1 > podman compose-vuild logs -f --tail=300 pomogai_backend Error: unknown shorthand flag: 'f' in -f See 'podman --help' [email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai 125 > podman compose-build logs -f --tail=300 pomogai_backend Error: unknown shorthand flag: 'f' in -f See 'podman --help' [email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai 125 > podman compose up logs -f --tail=300 pomogai_backend >>>> Executing external compose provider "/usr/bin/docker-compose". Please see podman-compose(1) for how to disable this message. <<<< ERROR: 'network_mode' and 'networks' cannot be combined Error: executing /usr/bin/docker-compose up logs -f --tail=300 pomogai_backend: exit status 1 [email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai 1 > podman compose-up logs -f --tail=300 pomogai_backend Error: unknown shorthand flag: 'f' in -f See 'podman --help' [email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai 125 > docker compose logs -f --tail=300 pomogai_backend Emulate Docker CLI using podman. Create /etc/containers/nodocker to quiet msg. >>>> Executing external compose provider "/usr/bin/docker-compose". Please see podman-compose(1) for how to disable this message. <<<< ERROR: 'network_mode' and 'networks' cannot be combined Error: executing /usr/bin/docker-compose logs -f --tail=300 pomogai_backend: exit status 1 [email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai 1 > podman-compose logs -f tail=300 pomogai_backend WARNING:podman_compose:missing services [tail=300,pomogai_backend] [email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai 1 > podman logs -f --tail=300 pomogai_backend Error: no container with name or ID "pomogai_backend" found: no such container [email protected] URB-WSN-0007608 /home/work/[email protected]/pomogai 125 > journalctl -u pomogai-backend -n 300 -f Hint: You are currently not seeing messages from other users and the system. Users in groups 'adm', 'systemd-journal' can see all messages. Pass -q to turn off this notice. No journal files were opened due to insufficient permissions. f=src/backend/app/utils/errors.py f=src/backend/app/utils/logger.py f=src/backend/app/filters/__init__.py f=src/backend/app/filters/filters/tags.py f=src/backend/app/filters/filters/participant.py f=src/backend/app/filters/filters/report.py f=src/backend/app/filters/filters/organizer.py f=src/backend/app/db/models/event_report.py f=src/backend/app/db/models/user.py f=src/backend/app/schemas/event.py f=src/backend/app/main.py f=src/backend/app/api/__init__.py f=src/backend/app/repositories/event.py f=src/backend/app/services/event.py f=src/backend/app/api/v1/event.py Error: 'src/backend/app/utils/errors.py' is not a valid file --- src/backend/app/utils/logger.py --- import logging from logging.handlers import RotatingFileHandler from pathlib import Path LOG_FILE = Path("backend.log") LOG_FORMAT = "[%(asctime)s] [%(levelname)s] %(message)s" DATE_FORMAT = "%Y-%m-%d %H:%M:%S" logger = logging.getLogger("backend") logger.setLevel(logging.DEBUG) formatter = logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT) file_handler = RotatingFileHandler( LOG_FILE, maxBytes=5_000_000, # 5 MB backupCount=3, encoding="utf-8", ) file_handler.setFormatter(formatter) if not logger.hasHandlers(): logger.addHandler(file_handler) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) logger.addHandler(console_handler) logger.propagate = False --- src/backend/app/filters/__init__.py --- from .base import FilterContext, QueryFilter from .builder import FilterBuilder from .specification import FilterSpecification __all__ = [ "FilterContext", "QueryFilter", "FilterBuilder", "FilterSpecification", ] --- src/backend/app/filters/filters/tags.py --- from __future__ import annotations from dataclasses import dataclass, field from uuid import UUID from sqlalchemy import exists, select from sqlalchemy.sql import Select from ...db.models import AppTagEvent, Event from ..base import FilterContext, QueryFilter @dataclass(slots=True) class TagsEventsFilter(QueryFilter): """tag_ids (ANY): события, у которых есть хотя бы один AppTag из списка""" enabled: bool = True tag_ids: list[UUID] = field(default_factory=list) def is_enabled(self) -> bool: return bool(self.tag_ids) def apply(self, query: Select, ctx: FilterContext) -> Select: subq = ( select(1) .select_from(AppTagEvent) .where( AppTagEvent.event_id == Event.id, AppTagEvent.app_tag_id.in_(self.tag_ids), ) ) return query.where(exists(subq)) --- src/backend/app/filters/filters/participant.py --- from __future__ import annotations from dataclasses import dataclass from sqlalchemy import exists, select from sqlalchemy.sql import Select from ...db.models import Event, Participation from ..base import FilterContext, QueryFilter @dataclass(slots=True) class ParticipantEventsFilter(QueryFilter): """only_participant: события, где requester является участником""" enabled: bool = False def is_enabled(self) -> bool: return bool(self.enabled) def apply(self, query: Select, ctx: FilterContext) -> Select: requester_user_id = ctx.get("requester_user_id") if requester_user_id is None: return query subq = ( select(1) .select_from(Participation) .where( Participation.user_id == requester_user_id, Participation.event_id == Event.id, ) ) return query.where(exists(subq)) --- src/backend/app/filters/filters/report.py --- from __future__ import annotations from dataclasses import dataclass from sqlalchemy import exists, select from sqlalchemy.sql import Select from ...db.models import Event, EventReport from ..base import FilterContext, QueryFilter @dataclass(slots=True) class ReportEventsFilter(QueryFilter): """ has_report: None -> не фильтруем True -> только архив (есть EventReport) False -> только активные (нет EventReport) """ has_report: bool | None = None def is_enabled(self) -> bool: return self.has_report is not None def apply(self, query: Select, ctx: FilterContext) -> Select: subq = select(1).select_from(EventReport).where(EventReport.event_id == Event.id) cond = exists(subq) if self.has_report is True: return query.where(cond) return query.where(~cond) --- src/backend/app/filters/filters/organizer.py --- from __future__ import annotations from dataclasses import dataclass from sqlalchemy.sql import Select from ...db.models import Event from ..base import FilterContext, QueryFilter @dataclass(slots=True) class OrganizerEventsFilter(QueryFilter): """only_organizer: события, где requester является creator""" enabled: bool = False def is_enabled(self) -> bool: return bool(self.enabled) def apply(self, query: Select, ctx: FilterContext) -> Select: requester_user_id = ctx.get("requester_user_id") if requester_user_id is None: return query return query.where(Event.creator_id == requester_user_id) --- src/backend/app/db/models/event_report.py --- from __future__ import annotations from typing import TYPE_CHECKING from uuid import UUID from sqlalchemy import ForeignKey, Integer, Text, UniqueConstraint from sqlalchemy.orm import Mapped, mapped_column, relationship from .base import Model if TYPE_CHECKING: from .event import Event class EventReport(Model): __tablename__ = "event_reports" event_id: Mapped[UUID] = mapped_column( ForeignKey("events.id", ondelete="CASCADE"), nullable=False, unique=True, ) how_it_went: Mapped[str] = mapped_column(Text, nullable=False) # можно пока оставить nullable, чтобы не ломать демо/мастер beneficiaries_count: Mapped[int | None] = mapped_column(Integer, nullable=True) participants_count_snapshot: Mapped[int] = mapped_column(Integer, nullable=False, default=0) event: Mapped[Event] = relationship(back_populates="report", uselist=False) __table_args__ = (UniqueConstraint("event_id", name="uix_event_reports_event_id"),) --- src/backend/app/db/models/user.py --- from typing import TYPE_CHECKING from uuid import UUID from sqlalchemy import BigInteger, ForeignKey, Integer, String, Text from sqlalchemy.orm import Mapped, mapped_column, relationship from .base import Model from .enums import RoleType if TYPE_CHECKING: from .city import City from .custom_tag import CustomTag from .event import Event from .participation import Participation from .user_tag import UserTag class User(Model): __tablename__ = "users" firstname: Mapped[str] = mapped_column(String(100), nullable=True) messenger_id: Mapped[int] = mapped_column(BigInteger(), unique=True, nullable=False) employee_number: Mapped[int] = mapped_column(BigInteger(), unique=True, nullable=False) role: Mapped[RoleType] = mapped_column(Text, default=RoleType.user.value) points: Mapped[int] = mapped_column(Integer, nullable=False, default=0) city_id: Mapped[UUID] = mapped_column(ForeignKey("cities.id"), nullable=True) # Relationships city: Mapped["City"] = relationship(back_populates="users") custom_tags: Mapped[list["CustomTag"]] = relationship( back_populates="user", cascade="all, delete-orphan", uselist=True ) user_tags: Mapped[list["UserTag"]] = relationship( back_populates="user", cascade="all, delete-orphan", uselist=True ) participations: Mapped[list["Participation"]] = relationship( back_populates="user", cascade="all, delete-orphan", uselist=True ) events: Mapped[list["Event"]] = relationship( back_populates="creator", cascade="all, delete-orphan", uselist=True ) def to_dict(self) -> dict: return { "id": str(self.id), "username": getattr( self, "username", None ), # непонятно где используется. оставил для безопасности "firstname": self.firstname, "messenger_id": self.messenger_id, "employee_number": self.employee_number, "points": self.points, "city_id": str(self.city_id) if self.city_id else None, "created_at": self.created_at.isoformat() if self.created_at else None, "updated_at": self.updated_at.isoformat() if self.updated_at else None, } @property def tags(self): return [ut.app_tag for ut in (self.user_tags or []) if ut.app_tag is not None] --- src/backend/app/schemas/event.py --- from datetime import datetime from uuid import UUID from pydantic import BaseModel, ConfigDict, Field, constr from .app_tag import ResponseAppTagSchema from .base import TimestampSchema class BaseEvent(BaseModel): model_config = ConfigDict(from_attributes=True) name: str description: str | None = None location: str | None = None tags: list[ResponseAppTagSchema] = Field(default_factory=list) bonus_code: str points: int date: datetime class EventCreateSchema(BaseModel): """то, что приходит от бота при создании мероприятия""" model_config = ConfigDict(extra="forbid") name: constr(max_length=128) description: str | None = None location: constr(max_length=255) | None = None app_tag_ids: list[UUID] = Field(default_factory=list) bonus_code: constr(max_length=16) points: int = 0 date: datetime city_id: UUID creator_id: UUID | None = None class RequestEventSchema(BaseEvent): pass class ResponseEventSchema(BaseEvent, TimestampSchema): id: UUID creator_id: UUID | None = ( None # TODO решить, оставляем ли или всегда приписываем UUID создателя ) creator_name: str | None = None city_id: UUID class EventSignUpSchema(BaseModel): event_id: UUID messenger_id: int class EventSignOutSchema(BaseModel): participation_id: UUID class EventCodeSchema(EventSignOutSchema): bonus_code: str class EventUpdateSchema(BaseModel): model_config = ConfigDict(extra="forbid") name: constr(max_length=128) | None = None description: str | None = None location: str | None = None app_tag_ids: list[UUID] | None = None bonus_code: constr(max_length=16) | None = None points: int | None = None date: datetime | None = None city_id: UUID | None = None creator_id: UUID | None = None class ResponseEventParticipantSchema(BaseModel): """один участник мероприятия""" model_config = ConfigDict(extra="forbid") participation_id: UUID is_claimed: bool user_id: UUID firstname: str | None = None messenger_id: int employee_number: int points: int Error: 'src/backend/app/main.py' is not a valid file Error: 'src/backend/app/api/__init__.py' is not a valid file --- src/backend/app/repositories/event.py --- from datetime import datetime from uuid import UUID from sqlalchemy import and_, delete, func, or_, select, update from sqlalchemy.dialects import postgresql from sqlalchemy.orm import selectinload from ..db.models import AppTagEvent, Event from ..filters import FilterContext, FilterSpecification from ..utils.logger import logger from .base import SQLRepository class EventRepository(SQLRepository): model = Event def _base_events_query(self): return ( select(self.model) .options(selectinload(self.model.app_tag_events).selectinload(AppTagEvent.app_tag)) .options(selectinload(self.model.creator)) ) def _base_city_events_query(self, city_id: UUID): query = self._base_events_query() if city_id is not None: query = query.where(self.model.city_id == city_id) return query def _log_sql(self, query) -> None: try: compiled = query.compile( dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}, ) logger.debug(f"SQL:\n{compiled}") except Exception as e: logger.debug(f"SQL (repr):\n{query!r}") logger.debug(f"error: {e}") def _apply_keyset( self, query, *, cursor_date: datetime | None, cursor_id: UUID | None, ): if cursor_date is None or cursor_id is None: return query # order is (date desc, id desc) => next page: strictly smaller return query.where( or_(Event.date < cursor_date, and_(Event.date == cursor_date, Event.id < cursor_id)) ) async def get_cities_events( self, city_id: UUID | None, offset: int, limit: int, *, cursor_date: datetime | None = None, cursor_id: UUID | None = None, spec: FilterSpecification | None = None, ctx: FilterContext | None = None, ): # Phase 1: select ids with filters/joins, apply keyset and ordering, limit ids_q = select(Event.id, Event.date) if city_id is not None: ids_q = ids_q.where(Event.city_id == city_id) if spec is not None: ids_q = spec.apply(ids_q, ctx or FilterContext()) ids_q = ids_q.group_by(Event.id, Event.date) ids_q = self._apply_keyset(ids_q, cursor_date=cursor_date, cursor_id=cursor_id) ids_q = ids_q.order_by(Event.date.desc(), Event.id.desc()).limit(limit) # NOTE: if cursor is used, offset is intentionally ignored if cursor_date is None and cursor_id is None: ids_q = ids_q.offset(offset) self._log_sql(ids_q) ids_res = await self.session.execute(ids_q) ids = [row[0] for row in ids_res.all()] if not ids: return [] # Phase 2: load events with relationships query = ( self._base_events_query() .where(Event.id.in_(ids)) .order_by(Event.date.desc(), Event.id.desc()) ) self._log_sql(query) result = await self.session.execute(query) return result.scalars().all() async def get_by_id(self, event_id: UUID) -> Event | None: query = ( select(self.model) .where(self.model.id == event_id) .options(selectinload(self.model.app_tag_events).selectinload(AppTagEvent.app_tag)) .options(selectinload(self.model.creator)) ) result = await self.session.execute(query) return result.scalar_one_or_none() async def update_fields(self, event_id: UUID, data: dict) -> Event | None: if not data: return None query = ( update(self.model).where(self.model.id == event_id).values(**data).returning(self.model) ) result = await self.session.execute(query) await self.session.commit() return result.scalar_one_or_none() async def set_app_tags(self, event_id: UUID, app_tag_ids: list[UUID]) -> None: uniq = list(dict.fromkeys(app_tag_ids or [])) await self.session.execute(delete(AppTagEvent).where(AppTagEvent.event_id == event_id)) self.session.add_all([AppTagEvent(event_id=event_id, app_tag_id=tid) for tid in uniq]) await self.session.commit() async def search_cities_events_with_total( self, city_id: UUID | None, offset: int, limit: int, *, cursor_date: datetime | None = None, cursor_id: UUID | None = None, spec: FilterSpecification | None = None, ctx: FilterContext | None = None, ) -> tuple[list[Event], int]: """ как get_cities_events, но дополнительно возвращает total count для текущих фильтров (и cursor, если задан). total считается на стороне бд (без limit/offset). """ ids_base = select(Event.id, Event.date) if city_id is not None: ids_base = ids_base.where(Event.city_id == city_id) if spec is not None: ids_base = spec.apply(ids_base, ctx or FilterContext()) ids_base = ids_base.group_by(Event.id, Event.date) ids_base = self._apply_keyset(ids_base, cursor_date=cursor_date, cursor_id=cursor_id) total_q = select(func.count()).select_from(ids_base.subquery()) self._log_sql(total_q) total_res = await self.session.execute(total_q) total = int(total_res.scalar() or 0) ids_q = ids_base.order_by(Event.date.desc(), Event.id.desc()).limit(limit) # NOTE: if cursor is used, offset is intentionally ignored if cursor_date is None and cursor_id is None: ids_q = ids_q.offset(offset) self._log_sql(ids_q) ids_res = await self.session.execute(ids_q) ids = [row[0] for row in ids_res.all()] if not ids: return ([], total) query = ( self._base_events_query() .where(Event.id.in_(ids)) .order_by(Event.date.desc(), Event.id.desc()) ) self._log_sql(query) result = await self.session.execute(query) return (result.scalars().all(), total) --- src/backend/app/services/event.py --- from typing import Any from uuid import UUID from ..dependencies.administrator import Administrator from ..filters import FilterContext, FilterSpecification from ..filters.filters.claimed import ClaimedEventsFilter from ..filters.filters.organizer import OrganizerEventsFilter from ..filters.filters.participant import ParticipantEventsFilter from ..filters.filters.report import ReportEventsFilter from ..filters.filters.tags import TagsEventsFilter from ..schemas import ( EventCreateSchema, EventSearchQuerySchema, EventSignOutSchema, EventSignUpSchema, EventUpdateSchema, ResponseEventParticipantSchema, ) from ..utils.errors import BadRequestError, ConflictError, NotFoundError from ..utils.logger import logger class EventService: def __init__(self, administrator: Administrator): self.administrator = administrator async def get_cities_events(self, city_id: UUID, offset: int, limit: int): async with self.administrator.start() as admin: events = await admin.event.get_cities_events( city_id=city_id, offset=offset, limit=limit ) return events async def search_events(self, query: EventSearchQuerySchema): logger.info( "search_events: city_id=%s offset=%s limit=%s cursor_date=%s cursor_id=%s requester_messenger_id=%s only_claimed=%s only_organizer=%s only_participant=%s has_report=%s tag_ids=%s", query.city_id, query.offset, query.limit, query.cursor_date, query.cursor_id, query.requester_messenger_id, query.only_claimed, query.only_organizer, query.only_participant, query.has_report, query.tag_ids, ) # keyset cursor must be comlete if (query.cursor_date is None) ^ (query.cursor_id is None): raise BadRequestError("cursor_date and cursor_id must be provieded together") needs_requester = bool(query.only_claimed or query.only_organizer or query.only_participant) if needs_requester and query.requester_messenger_id is None: raise BadRequestError( "requester_messenger_id is required for claimed/organizer filters" ) async with self.administrator.start() as admin: requester = None if query.requester_messenger_id is not None: requester = await admin.user.get_by_messenger_id(query.requester_messenger_id) if not requester: raise NotFoundError("User not found") ctx_data: dict[str, Any] = {"tag_ids": query.tag_ids} if requester is not None: ctx_data["requester_user_id"] = getattr(requester, "id", None) ctx_data["requester_messenger_id"] = query.requester_messenger_id ctx = FilterContext(ctx_data) spec = FilterSpecification() spec.add(ClaimedEventsFilter(enabled=bool(query.only_claimed))) spec.add(OrganizerEventsFilter(enabled=bool(query.only_organizer))) spec.add(ParticipantEventsFilter(enabled=bool(query.only_participant))) spec.add(ReportEventsFilter(has_report=query.has_report)) spec.add(TagsEventsFilter(tag_ids=list(query.tag_ids or []))) events, total = await admin.event.get_cities_events_with_total( city_id=query.city_id, offset=query.offset, limit=query.limit, cursor_date=query.cursor_date, cursor_id=query.cursor_id, spec=spec, ctx=ctx, ) return events, total async def get_by_id(self, event_id: UUID): async with self.administrator.start() as admin: event = await admin.event.get_by_id(event_id) return event async def enter_code(self, participation_id: UUID, bonus_code: str): async with self.administrator.start() as admin: participation = await admin.participation.get_by_id(participation_id) if not participation: raise NotFoundError() if participation.is_claimed: raise BadRequestError("User already claimed points") event = await admin.event.get_by_id(participation.event_id) if event.bonus_code != bonus_code: raise BadRequestError("Codes do not match") await admin.participation.update(participation_id, {"is_claimed": True}) updated_user = await admin.user.add_points(participation.user_id, event.points) return updated_user async def sign_up(self, data: EventSignUpSchema): async with self.administrator.start() as admin: user = await admin.user.get_by_messenger_id(data.messenger_id) if not user: raise NotFoundError("User not found") participation = await admin.participation.get_participation(user.id, data.event_id) if participation: raise ConflictError() participation_data = {"user_id": user.id, "event_id": data.event_id} participation = await admin.participation.insert(participation_data) return participation async def sign_out(self, data: EventSignOutSchema): async with self.administrator.start() as admin: participation = await admin.participation.get_by_id(data.participation_id) if not participation: raise NotFoundError() if participation.is_claimed: raise BadRequestError() deleted_participation = await admin.participation.delete(participation.id) return deleted_participation async def get_users_events(self, user_id: int): async with self.administrator.start() as admin: user = await admin.user.get_by_messenger_id(user_id) user_events = await admin.participation.get_user_participations(user.id) return user_events async def create_event(self, data: EventCreateSchema): """ создать мероприятие в БД. creator_id можно пробросить из текущего пользователя (модератора), пока можно передавать None - поле в модели nullable. """ async with self.administrator.start() as admin: event_data = { "creator_id": data.creator_id, "city_id": data.city_id, "name": data.name, "description": data.description, "location": data.location, "bonus_code": data.bonus_code, "points": data.points, "date": data.date, } event = await admin.event.insert(event_data) if data.app_tag_ids: await admin.event.set_app_tags(event.id, data.app_tag_ids) return await admin.event.get_by_id(event.id) async def delete_event(self, event_id: UUID) -> bool: """ удалить мероприятие по id. true - получилось удалить false - не получилось удалить """ async with self.administrator.start() as admin: event = await admin.event.get_by_id(event_id) if not event: return False await admin.event.delete(event_id) return True async def update_event(self, event_id: UUID, data: EventUpdateSchema): """ обновляем мероприятие, проверяем что количество поинтов адекватное """ async with self.administrator.start() as admin: event = await admin.event.get_by_id(event_id) if not event: raise NotFoundError("Event not found") patch = data.model_dump(exclude_unset=True) tag_ids = patch.pop("app_tag_ids", None) if "points" in patch and patch["points"] is not None: if patch["points"] <= 0 or patch["points"] >= 2**31: raise BadRequestError("Invalid points value") if patch: updated = await admin.event.update_fields(event_id, patch) if not updated: raise BadRequestError("Nothing to update") if tag_ids is not None: await admin.event.set_app_tags(event_id, tag_ids) refreshed = await admin.event.get_by_id(event_id) if not refreshed: raise BadRequestError("Event not found") return refreshed async def get_event_participants( self, event_id: UUID, requester_messenger_id: int, offset: int = 0, limit: int = 50, ) -> list[ResponseEventParticipantSchema]: async with self.administrator.start() as admin: requester = await admin.user.get_by_messenger_id(requester_messenger_id) if not requester: raise NotFoundError("User not found") event = await admin.event.get_by_id(event_id) if not event: raise NotFoundError("Event not found") role = getattr(requester, "role", None) role_val = getattr(role, "value", role) is_admin = str(role_val) == "admin" if not is_admin: if not event.creator_id or str(event.creator_id) != str(requester.id): raise BadRequestError("Access denied: only organizer can view participants") # TODO replace with AuthError parts = await admin.participation.get_event_participations( event_id=event_id, offset=offset, limit=limit, ) out: list[ResponseEventParticipantSchema] = [] for p in parts: u = getattr(p, "user", None) if not u: continue out.append( ResponseEventParticipantSchema( participation_id=p.id, is_claimed=bool(getattr(p, "is_claimed", False)), user_id=u.id, firstname=getattr(u, "firstname", None), messenger_id=int(getattr(u, "messenger_id", 0)), employee_number=int(getattr(u, "employee_number", 0)), points=int(getattr(u, "points", 0)), ) ) return out --- src/backend/app/api/v1/event.py --- from datetime import datetime from typing import Annotated from uuid import UUID from fastapi import APIRouter, Depends, HTTPException, Path, Query, Response, status from fastapi.responses import ORJSONResponse from sqlalchemy.ext.asyncio import AsyncSession from ...db.connection import get_db from ...dependencies.administrator import Administrator from ...schemas import ( DefaultResponseSchema, EventCodeSchema, EventCreateSchema, EventSearchQuerySchema, EventSignOutSchema, EventSignUpSchema, EventUpdateSchema, ResponseEventParticipantSchema, ResponseEventSchema, ResponseUserSchema, ) from ...services import EventService from ...utils.logger import logger api_router = APIRouter(prefix="/events", tags=["Events"]) SessionDep = Annotated[AsyncSession, Depends(get_db)] @api_router.get( "/search", status_code=status.HTTP_200_OK, response_model=list[ResponseEventSchema], ) async def search_events( session: SessionDep, response: Response, city_id: Annotated[UUID | None, Query()] = None, offset: Annotated[int, Query(ge=0)] = 0, limit: Annotated[int, Query(ge=1)] = 50, cursor_date: Annotated[datetime | None, Query()] = None, cursor_id: Annotated[UUID | None, Query()] = None, requester_messenger_id: Annotated[int | None, Query()] = None, only_claimed: Annotated[bool, Query()] = False, only_organizer: Annotated[bool, Query()] = False, only_participant: Annotated[bool, Query()] = False, has_report: Annotated[bool | None, Query()] = None, tag_ids: Annotated[list[UUID] | None, Query()] = None, ): logger.info( "HTTP /events/search: city_id=%s offset=%s limit=%s cursor_date=%s cursor_id=%s requester_messenger_id=%s only_claimed=%s only_organizer=%s only_participant=%s has_report=%s tag_ids=%s", city_id, offset, limit, cursor_date, cursor_id, requester_messenger_id, only_claimed, only_organizer, only_participant, has_report, tag_ids, ) admin = Administrator(session) service = EventService(admin) query = EventSearchQuerySchema( city_id=city_id, offset=offset, limit=limit, cursor_date=cursor_date, cursor_id=cursor_id, requester_messenger_id=requester_messenger_id, only_claimed=only_claimed, only_organizer=only_organizer, only_participant=only_participant, has_report=has_report, tag_ids=tag_ids or [], ) items, total = await service.search_events(query) response.headers["X-Total-Count"] = str(total) return items @api_router.get( "/id/{city_id}", status_code=status.HTTP_200_OK, response_model=list[ResponseEventSchema] ) async def get_cities_events( city_id: Annotated[UUID, Path()], offset: Annotated[int, Query()], limit: Annotated[int, Query()], session: SessionDep, ): admin = Administrator(session) service = EventService(admin) return_data = await service.get_cities_events(city_id, offset, limit) return return_data @api_router.get( "/info/{event_id}", status_code=status.HTTP_200_OK, response_model=ResponseEventSchema ) async def get_event_info(event_id: Annotated[UUID, Path()], session: SessionDep): admin = Administrator(session) service = EventService(admin) return_data = await service.get_by_id(event_id) return return_data @api_router.get( "/{event_id}/participants", status_code=status.HTTP_200_OK, response_model=list[ResponseEventParticipantSchema], ) async def get_event_participants( event_id: Annotated[UUID, Path()], requester_messenger_id: Annotated[int, Query()], session: SessionDep, offset: Annotated[int, Query()] = 0, limit: Annotated[int, Query()] = 50, ): admin = Administrator(session) service = EventService(admin) return await service.get_event_participants( event_id=event_id, requester_messenger_id=requester_messenger_id, offset=offset, limit=limit, ) @api_router.get("/my", status_code=status.HTTP_200_OK) async def get_users_events(user_id: Annotated[int, Query()], session: SessionDep): admin = Administrator(session) service = EventService(admin) return_data = await service.get_users_events(user_id) return return_data @api_router.post("/code", status_code=status.HTTP_200_OK, response_model=ResponseUserSchema) async def event_enter_code(body: EventCodeSchema, session: SessionDep): admin = Administrator(session) service = EventService(admin) return_data = await service.enter_code(body.participation_id, body.bonus_code) return return_data @api_router.post("/sign-up", status_code=status.HTTP_200_OK, response_model=DefaultResponseSchema) async def event_sign_up(body: EventSignUpSchema, session: SessionDep): admin = Administrator(session) service = EventService(admin) await service.sign_up(body) return ORJSONResponse( { "status": "OK", "description": f"User {body.messenger_id} successfully signed up for event {body.event_id}", } ) @api_router.patch("/{event_id}", status_code=status.HTTP_200_OK, response_model=ResponseEventSchema) async def update_event( event_id: Annotated[UUID, Path()], body: EventUpdateSchema, session: SessionDep, ): admin = Administrator(session) service = EventService(admin) return await service.update_event(event_id, body) @api_router.delete( "/sign-out", status_code=status.HTTP_200_OK, response_model=DefaultResponseSchema ) async def event_sign_out(body: EventSignOutSchema, session: SessionDep): admin = Administrator(session) service = EventService(admin) await service.sign_out(body) return ORJSONResponse( { "status": "OK", "description": f"User successfully signed out for participation {body.participation_id}", } ) @api_router.post("", status_code=status.HTTP_201_CREATED, response_model=ResponseEventSchema) async def create_event(body: EventCreateSchema, session: SessionDep): """ создать мероприятие. пока без привязки к конкретному модератору: creator_id передаём как None (в модели поле nullable) позже можно будет подставлять id текущего пользователя """ # TODO admin = Administrator(session) service = EventService(admin) event = await service.create_event(body) return event @api_router.delete("/{event_id}", status_code=status.HTTP_204_NO_CONTENT) async def delete_event(event_id: Annotated[UUID, Path()], session: SessionDep): """удалить мероприятие по id""" admin = Administrator(session) service = EventService(admin) deleted = await service.delete_event(event_id) if not deleted: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Event not found") return Response(status_code=status.HTTP_204_NO_CONTENT)
Editor is loading...
Leave a Comment