Untitled

 avatar
4ae4d
plain_text
2 months ago
28 kB
5
Indexable
f=src/backend/app/db/models/user.py
f=src/backend/app/db/models/enums.py
f=src/backend/app/db/models/gosb.py
f=src/backend/app/db/models/territorial_bank.py
f=src/backend/app/db/models/project.py
f=src/backend/app/db/models/request.py
f=src/backend/app/policy/event_access.py
f=src/backend/app/services/report.py
f=src/backend/app/services/event.py
f=src/backend/app/schemas/user.py
--- src/backend/app/db/models/user.py ---
from typing import TYPE_CHECKING
from uuid import UUID

from sqlalchemy import BigInteger, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base import Model
from .enums import RoleType

if TYPE_CHECKING:
    from .city import City
    from .custom_tag import CustomTag
    from .event import Event
    from .event_organizer import EventOrganizer
    from .participation import Participation
    from .project import Project
    from .request import Request
    from .user_tag import UserTag


class User(Model):
    """ORM model for the ``users`` table together with the relations a user owns."""

    __tablename__ = "users"

    firstname: Mapped[str | None] = mapped_column(String(100), nullable=True)
    messenger_id: Mapped[int] = mapped_column(BigInteger(), unique=True, nullable=False)
    employee_number: Mapped[int] = mapped_column(BigInteger(), unique=True, nullable=False)
    # Stored as plain text; the default is the value of RoleType.user.
    role: Mapped[str] = mapped_column(Text, default=RoleType.user.value)
    points: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    city_id: Mapped[UUID | None] = mapped_column(ForeignKey("cities.id"), nullable=True)

    # --- Relationships -------------------------------------------------
    # NOTE(review): city_id is nullable, so ``city`` can presumably be None
    # at runtime even though the annotation is not optional - confirm.
    city: Mapped["City"] = relationship(back_populates="users")

    # Collections removed together with the user (delete-orphan cascade).
    custom_tags: Mapped[list["CustomTag"]] = relationship(
        back_populates="user",
        cascade="all, delete-orphan",
        uselist=True,
    )
    user_tags: Mapped[list["UserTag"]] = relationship(
        back_populates="user",
        cascade="all, delete-orphan",
        uselist=True,
    )
    participations: Mapped[list["Participation"]] = relationship(
        back_populates="user",
        cascade="all, delete-orphan",
        uselist=True,
    )
    events: Mapped[list["Event"]] = relationship(
        back_populates="creator",
        cascade="all, delete-orphan",
        uselist=True,
    )
    organized_events_links: Mapped[list["EventOrganizer"]] = relationship(
        back_populates="user",
        cascade="all, delete-orphan",
        uselist=True,
    )

    # Collections without a cascade configured here.
    led_projects: Mapped[list["Project"]] = relationship(back_populates="leader", uselist=True)
    # Request has two foreign keys to users, hence the explicit foreign_keys.
    authored_requests: Mapped[list["Request"]] = relationship(
        back_populates="author",
        uselist=True,
        foreign_keys="Request.author_user_id",
    )
    targeted_requests: Mapped[list["Request"]] = relationship(
        back_populates="target",
        uselist=True,
        foreign_keys="Request.target_user_id",
    )

    def to_dict(self) -> dict:
        """Return the user's scalar fields as a plain dict.

        UUIDs are rendered as strings and timestamps as ISO-8601 strings;
        a missing city or timestamp is rendered as ``None``.
        """

        def _iso(moment):
            return moment.isoformat() if moment else None

        data: dict = {"id": str(self.id)}
        # Unclear where "username" is still consumed; kept for safety.
        data["username"] = getattr(self, "username", None)
        data["firstname"] = self.firstname
        data["messenger_id"] = self.messenger_id
        data["employee_number"] = self.employee_number
        data["points"] = self.points
        city_ref = self.city_id
        data["city_id"] = str(city_ref) if city_ref else None
        data["created_at"] = _iso(self.created_at)
        data["updated_at"] = _iso(self.updated_at)
        return data

    @property
    def tags(self):
        """Application tags reachable through ``user_tags``, skipping empty links."""
        collected = []
        for link in self.user_tags or []:
            tag = link.app_tag
            if tag is not None:
                collected.append(tag)
        return collected

--- src/backend/app/db/models/enums.py ---
from enum import Enum


class RoleType(Enum):
    """Roles a user can hold.

    The ``User.role`` column is plain text and defaults to the ``.value`` of
    ``user``; the access policy compares against the "admin" and "moderator"
    values.
    """

    user = "user"
    moderator = "moderator"
    admin = "admin"

--- src/backend/app/db/models/gosb.py ---
from __future__ import annotations

from typing import TYPE_CHECKING
from uuid import UUID

from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base import Model

if TYPE_CHECKING:
    from .event import Event
    from .territorial_bank import TerritorialBank


class Gosb(Model):
    """ORM model for the ``gosb`` table; every row belongs to one territorial bank."""

    __tablename__ = "gosb"

    # Required reference to the owning territorial bank.
    territorial_bank_id: Mapped[UUID] = mapped_column(
        ForeignKey("territorial_banks.id"),
        nullable=False,
    )
    name: Mapped[str] = mapped_column(String(150), nullable=False)
    # Optional descriptive fields.
    address: Mapped[str | None] = mapped_column(String(255), nullable=True)
    city_name: Mapped[str | None] = mapped_column(String(150), nullable=True)

    # Counterpart of TerritorialBank.gosb_list.
    territorial_bank: Mapped[TerritorialBank] = relationship(back_populates="gosb_list")
    # Events linked to this row; the other side is Event.gosb.
    events: Mapped[list[Event]] = relationship(back_populates="gosb")

--- src/backend/app/db/models/territorial_bank.py ---
from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base import Model

if TYPE_CHECKING:
    from .gosb import Gosb
    from .project import Project


class TerritorialBank(Model):
    """ORM model for the ``territorial_banks`` table."""

    __tablename__ = "territorial_banks"

    name: Mapped[str] = mapped_column(String(150), nullable=False)
    region: Mapped[str | None] = mapped_column(String(150), nullable=True)

    # Child Gosb rows; counterpart of Gosb.territorial_bank.
    gosb_list: Mapped[list[Gosb]] = relationship(back_populates="territorial_bank")
    # Projects attached to this bank; counterpart of Project.territorial_bank.
    projects: Mapped[list[Project]] = relationship(back_populates="territorial_bank")

--- src/backend/app/db/models/project.py ---
from __future__ import annotations

from typing import TYPE_CHECKING
from uuid import UUID

from sqlalchemy import Boolean, ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base import Model

if TYPE_CHECKING:
    from .event import Event
    from .request import Request
    from .territorial_bank import TerritorialBank
    from .user import User


class Project(Model):
    """ORM model for the ``projects`` table: a project led by a user within a territorial bank."""

    __tablename__ = "projects"

    name: Mapped[str] = mapped_column(String(150), nullable=False)
    description: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Both references are mandatory: a project always has a leader and a bank.
    leader_user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
    territorial_bank_id: Mapped[UUID] = mapped_column(
        ForeignKey("territorial_banks.id"),
        nullable=False,
    )
    # No default is declared here, so a value has to be supplied on insert.
    is_active: Mapped[bool] = mapped_column(Boolean, nullable=False)

    # Counterpart of User.led_projects.
    leader: Mapped[User] = relationship(back_populates="led_projects")
    # Counterpart of TerritorialBank.projects.
    territorial_bank: Mapped[TerritorialBank] = relationship(back_populates="projects")
    events: Mapped[list[Event]] = relationship(back_populates="project")
    # Counterpart of Request.project.
    requests: Mapped[list[Request]] = relationship(back_populates="project")

--- src/backend/app/db/models/request.py ---
from __future__ import annotations

import datetime
from typing import TYPE_CHECKING, Any
from uuid import UUID

from sqlalchemy import DateTime, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship

from .base import Model

if TYPE_CHECKING:
    from .project import Project
    from .user import User


class Request(Model):
    """ORM model for the ``requests`` table.

    A request always has an author and may optionally point at a target user
    and/or a project.
    """

    __tablename__ = "requests"

    type: Mapped[str] = mapped_column(String(50), nullable=False)
    author_user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
    target_user_id: Mapped[UUID | None] = mapped_column(ForeignKey("users.id"), nullable=True)
    project_id: Mapped[UUID | None] = mapped_column(ForeignKey("projects.id"), nullable=True)
    status: Mapped[str] = mapped_column(String(50), nullable=False)
    # Required JSONB document (PostgreSQL-specific column type).
    payload: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
    comment: Mapped[str | None] = mapped_column(Text, nullable=True)
    # Timezone-aware timestamp; NULL until it is set.
    processed_at: Mapped[datetime.datetime | None] = mapped_column(
        DateTime(timezone=True),
        nullable=True,
    )

    # Two foreign keys point at users, so each relationship names its own column.
    author: Mapped[User] = relationship(
        back_populates="authored_requests",
        foreign_keys=[author_user_id],
    )
    # target_user_id is nullable, so the related user may be absent.
    target: Mapped[User | None] = relationship(
        back_populates="targeted_requests",
        foreign_keys=[target_user_id],
    )
    project: Mapped[Project | None] = relationship(back_populates="requests")

--- src/backend/app/policy/event_access.py ---
from __future__ import annotations

from ..db.models import Event, User


def _event_tb_id(event: Event | None) -> str | None:
    if event is None:
        return None

    gosb = getattr(event, "gosb", None)
    if gosb is None:
        return None

    tb_id = getattr(gosb, "territorial_bank_id", None)
    return str(tb_id) if tb_id is not None else None

def _role_value(role: object) -> str | None:
    if role is None:
        return None
    return str(getattr(role, "value", role))


def is_in_tb_scope(user: User | None, event: Event | None) -> bool:
    """
    Intended check: follow event -> gosb -> territorial bank and compare it with
    the moderator's territorial bank (match -> True, otherwise False).

    Currently a stub that always returns False, because the
    territorial bank <-> moderator binding is not implemented yet.
    """
    if user is None or event is None or not is_moderator(user):
        return False

    event_tb = _event_tb_id(event)
    if event_tb is None:
        return False

    # TODO: return whether the user is a moderator of the bank in ``event_tb``.
    return False


def is_admin(user: User | None) -> bool:
    """Tell whether the user's normalised role equals "admin"; False for no user."""
    if user is not None:
        return _role_value(getattr(user, "role", None)) == "admin"
    return False


def is_moderator(user: User | None) -> bool:
    """Tell whether the user's normalised role equals "moderator"; False for no user."""
    if user is not None:
        return _role_value(getattr(user, "role", None)) == "moderator"
    return False


def is_event_organizer(user: User | None, event: Event | None) -> bool:
    """Tell whether ``user`` organises ``event``.

    A user counts as an organizer when their id equals ``event.creator_id``
    (legacy fallback) or the ``user_id`` of any entry in
    ``event.event_organizers`` (target model). Ids are compared by their
    string form.

    A missing id on either side never matches: previously ``None`` was
    stringified to "None", so a user without an id matched an organizer link
    without a ``user_id``.

    Returns:
        True if the user is the creator or a linked organizer, else False.
    """
    if user is None or event is None:
        return False

    raw_user_id = getattr(user, "id", None)
    if raw_user_id is None:
        return False
    user_id = str(raw_user_id)
    if not user_id:
        return False

    # Legacy fallback: the event creator is treated as an organizer.
    creator_id = getattr(event, "creator_id", None)
    if creator_id is not None and str(creator_id) == user_id:
        return True

    # Target model: explicit organizer links.
    for link in getattr(event, "event_organizers", None) or []:
        link_user_id = getattr(link, "user_id", None)
        if link_user_id is not None and str(link_user_id) == user_id:
            return True

    return False


def can_view_event_participants(user: User | None, event: Event | None) -> bool:
    """Admins, the event's organizers and users in the event's bank scope may view participants."""
    if is_admin(user):
        return True
    if is_event_organizer(user, event):
        return True
    return is_in_tb_scope(user, event)


def can_create_event_report(user: User | None, event: Event | None) -> bool:
    """Only admins and the event's organizers may create its report."""
    if is_admin(user):
        return True
    return is_event_organizer(user, event)


def can_view_event_report(user: User | None, event: Event | None) -> bool:
    """Admins, users in the event's bank scope and the event's organizers may view its report."""
    if is_admin(user):
        return True
    if is_in_tb_scope(user, event):
        return True
    return is_event_organizer(user, event)


def can_create_event(user: User | None) -> bool:
    """Admins and moderators may create events."""
    if is_admin(user):
        return True
    return is_moderator(user)


def can_update_event(user: User | None, event: Event | None) -> bool:
    """Admins and the event's organizers may update it."""
    if is_admin(user):
        return True
    return is_event_organizer(user, event)


def can_delete_event(user: User | None, event: Event | None) -> bool:
    """Admins and the event's organizers may delete it."""
    if is_admin(user):
        return True
    return is_event_organizer(user, event)

--- src/backend/app/services/report.py ---
from __future__ import annotations

from datetime import date
from typing import Any
from uuid import UUID

from ..dependencies.administrator import Administrator
from ..policy import can_create_event_report, can_view_event_report
from ..utils.errors import AuthError, ConflictError, NotFoundError


class ReportService:
    """Event-report use cases: create, fetch and export.

    Each public method runs inside its own ``administrator.start()`` context.
    """

    def __init__(self, administrator: Administrator):
        self.administrator = administrator

    async def create_report(
        self,
        *,
        event_id: UUID,
        requester_messenger_id: int,
        how_it_went: str,
        beneficiaries_count: int | None = None,
    ):
        """Create the report of an event on behalf of the requester.

        Raises:
            NotFoundError: the requester or the event does not exist.
            AuthError: the requester may not create a report for this event.
            ConflictError: the event already has a report.

        Returns:
            Whatever ``admin.report.insert`` returns for the new report.
        """
        async with self.administrator.start() as gateway:
            author = await gateway.user.get_by_messenger_id(requester_messenger_id)
            if not author:
                raise NotFoundError("User not found")

            target_event = await gateway.event.get_by_id(event_id)
            if not target_event:
                raise NotFoundError("Event not found")

            allowed = can_create_event_report(author, target_event)
            if not allowed:
                raise AuthError("Access denied: only organizer or admin can create report")

            if await gateway.report.get_by_event_id(event_id):
                raise ConflictError("Report already exists for this event")

            # The participant count is stored as a snapshot taken at creation time.
            snapshot = await gateway.participation.count_by_event_id(event_id)
            return await gateway.report.insert(
                {
                    "event_id": event_id,
                    "how_it_went": how_it_went,
                    "beneficiaries_count": beneficiaries_count,
                    "participants_count_snapshot": snapshot,
                }
            )

    async def get_report(
        self,
        *,
        event_id: UUID,
        requester_messenger_id: int,
    ):
        """Return the report of an event if the requester may view it.

        Raises:
            NotFoundError: the requester, the event or the report does not exist.
            AuthError: the requester may not view the report.
        """
        async with self.administrator.start() as gateway:
            viewer = await gateway.user.get_by_messenger_id(requester_messenger_id)
            if not viewer:
                raise NotFoundError("User not found")

            target_event = await gateway.event.get_by_id(event_id)
            if not target_event:
                raise NotFoundError("Event not found")

            allowed = can_view_event_report(viewer, target_event)
            if not allowed:
                raise AuthError("Access denied")

            found = await gateway.report.get_by_event_id(event_id)
            if found:
                return found
            raise NotFoundError("Report not found")

    async def export_reports_rows(
        self,
        *,
        date_from: date | None = None,
        date_to: date | None = None,
    ) -> list[list[Any]]:
        """
        Return xlsx rows without a header, one report per row.

        Only existing reports are exported for now; reports without an event
        are skipped.
        """
        async with self.administrator.start() as gateway:
            reports = await gateway.report.list_for_export(date_from=date_from, date_to=date_to)

            table: list[list[Any]] = []
            for report in reports:
                event = getattr(report, "event", None)
                if event is not None:
                    table.append(self._export_row(report, event))
            return table

    @staticmethod
    def _export_row(report: Any, event: Any) -> list[Any]:
        """Build one export row from a report and its event, defaulting missing values."""
        moment = getattr(event, "date", None)
        if moment:
            day = moment.date()
            # Sub-second precision is dropped from the exported time.
            clock = moment.time().replace(microsecond=0)
        else:
            day = None
            clock = None

        city = getattr(event, "city", None)
        creator = getattr(event, "creator", None)

        tag_names = []
        for tag in getattr(event, "tags", None) or []:
            if getattr(tag, "name", None):
                tag_names.append(getattr(tag, "name", ""))

        return [
            getattr(event, "name", "") or "",
            getattr(city, "name", None) or "",
            getattr(event, "location", "") or "",
            day,
            clock,
            getattr(report, "participants_count_snapshot", 0) or 0,
            getattr(event, "points", 0) or 0,
            ", ".join(tag_names),
            getattr(creator, "firstname", None) or "",
            getattr(report, "beneficiaries_count", None),
            getattr(report, "how_it_went", "") or "",
        ]

--- src/backend/app/services/event.py ---
from typing import Any
from uuid import UUID

from ..dependencies.administrator import Administrator
from ..filters import FilterContext, FilterSpecification
from ..filters.filters.claimed import ClaimedEventsFilter
from ..filters.filters.organizer import OrganizerEventsFilter
from ..filters.filters.participant import ParticipantEventsFilter
from ..filters.filters.report import ReportEventsFilter
from ..filters.filters.tags import TagsEventsFilter
from ..policy import (
    can_create_event,
    can_delete_event,
    can_update_event,
    can_view_event_participants,
)
from ..schemas import (
    EventCreateSchema,
    EventSearchQuerySchema,
    EventSignOutSchema,
    EventSignUpSchema,
    EventUpdateSchema,
    ResponseEventParticipantSchema,
)
from ..utils.errors import AuthError, BadRequestError, ConflictError, NotFoundError
from ..utils.logger import logger


class EventService:
    """Event use cases: listing/search, participation, bonus codes and CRUD.

    Each public method runs inside its own ``administrator.start()`` context;
    mutating operations are guarded by the access policy functions.
    """

    def __init__(self, administrator: Administrator):
        self.administrator = administrator

    async def get_cities_events(self, city_id: UUID, offset: int, limit: int):
        """Return one page of the events of a city."""
        async with self.administrator.start() as admin:
            return await admin.event.get_cities_events(city_id=city_id, offset=offset, limit=limit)

    async def search_events(self, query: EventSearchQuerySchema):
        """Search the events of a city with optional filters and a keyset cursor.

        Raises:
            BadRequestError: only one half of the cursor is given, or a
                requester-dependent filter is requested without a requester.
            NotFoundError: the requester is given but does not exist.
        """
        logger.info(
            "search_events: city_id=%s offset=%s limit=%s cursor_date=%s cursor_id=%s requester_messenger_id=%s only_claimed=%s only_organizer=%s only_participant=%s has_report=%s tag_ids=%s",
            query.city_id,
            query.offset,
            query.limit,
            query.cursor_date,
            query.cursor_id,
            query.requester_messenger_id,
            query.only_claimed,
            query.only_organizer,
            query.only_participant,
            query.has_report,
            query.tag_ids,
        )

        # The keyset cursor must be complete: both parts or neither.
        if (query.cursor_date is None) ^ (query.cursor_id is None):
            raise BadRequestError("cursor_date and cursor_id must be provided together")

        needs_requester = bool(query.only_claimed or query.only_organizer or query.only_participant)
        if needs_requester and query.requester_messenger_id is None:
            raise BadRequestError(
                "requester_messenger_id is required for claimed/organizer/participant filters"
            )

        async with self.administrator.start() as admin:
            requester = None
            if query.requester_messenger_id is not None:
                requester = await admin.user.get_by_messenger_id(query.requester_messenger_id)
                if not requester:
                    raise NotFoundError("User not found")

            ctx_data: dict[str, Any] = {"tag_ids": query.tag_ids}
            if requester is not None:
                ctx_data["requester_user_id"] = getattr(requester, "id", None)
                ctx_data["requester_messenger_id"] = query.requester_messenger_id
            ctx = FilterContext(ctx_data)

            spec = FilterSpecification()
            spec.add(ClaimedEventsFilter(enabled=bool(query.only_claimed)))
            spec.add(OrganizerEventsFilter(enabled=bool(query.only_organizer)))
            spec.add(ParticipantEventsFilter(enabled=bool(query.only_participant)))
            spec.add(ReportEventsFilter(has_report=query.has_report))
            spec.add(TagsEventsFilter(tag_ids=list(query.tag_ids or [])))

            return await admin.event.get_cities_events_with_total(
                city_id=query.city_id,
                offset=query.offset,
                limit=query.limit,
                cursor_date=query.cursor_date,
                cursor_id=query.cursor_id,
                spec=spec,
                ctx=ctx,
            )

    async def get_by_id(self, event_id: UUID):
        """Return the event with the given id (result of ``admin.event.get_by_id``)."""
        async with self.administrator.start() as admin:
            return await admin.event.get_by_id(event_id)

    async def enter_code(self, participation_id: UUID, bonus_code: str):
        """Claim the points of a participation by entering the event's bonus code.

        Raises:
            NotFoundError: the participation or its event does not exist.
            BadRequestError: points were already claimed or the code is wrong.

        Returns:
            The result of ``admin.user.add_points`` for the participant.
        """
        async with self.administrator.start() as admin:
            participation = await admin.participation.get_by_id(participation_id)
            if not participation:
                raise NotFoundError()
            if participation.is_claimed:
                raise BadRequestError("User already claimed points")

            event = await admin.event.get_by_id(participation.event_id)
            # Guard against a missing event instead of failing on attribute access.
            if not event:
                raise NotFoundError("Event not found")
            if event.bonus_code != bonus_code:
                raise BadRequestError("Codes do not match")

            await admin.participation.update(participation_id, {"is_claimed": True})
            return await admin.user.add_points(participation.user_id, event.points)

    async def sign_up(self, data: EventSignUpSchema):
        """Register a user for an event.

        Raises:
            NotFoundError: the user does not exist.
            ConflictError: the user is already signed up for the event.
        """
        async with self.administrator.start() as admin:
            user = await admin.user.get_by_messenger_id(data.messenger_id)
            if not user:
                raise NotFoundError("User not found")

            participation = await admin.participation.get_participation(user.id, data.event_id)
            if participation:
                raise ConflictError()

            participation_data = {"user_id": user.id, "event_id": data.event_id}
            return await admin.participation.insert(participation_data)

    async def sign_out(self, data: EventSignOutSchema):
        """Remove a participation unless its points were already claimed.

        Raises:
            NotFoundError: the participation does not exist.
            BadRequestError: the participation is already claimed.
        """
        async with self.administrator.start() as admin:
            participation = await admin.participation.get_by_id(data.participation_id)
            if not participation:
                raise NotFoundError()
            if participation.is_claimed:
                raise BadRequestError()

            return await admin.participation.delete(participation.id)

    async def get_users_events(self, user_id: int):
        """Return the participations of the user identified by messenger id.

        Note that ``user_id`` is looked up as a messenger id.

        Raises:
            NotFoundError: no user has this messenger id.
        """
        async with self.administrator.start() as admin:
            user = await admin.user.get_by_messenger_id(user_id)
            # Guard against an unknown user instead of failing on ``user.id``.
            if not user:
                raise NotFoundError("User not found")
            return await admin.participation.get_user_participations(user.id)

    async def create_event(self, data: EventCreateSchema, requester_messenger_id: int):
        """
        Create an event in the database and return it re-read by id.

        Raises:
            NotFoundError: the requester does not exist.
            AuthError: the requester may not create events.
        """
        async with self.administrator.start() as admin:
            requester = await admin.user.get_by_messenger_id(requester_messenger_id)
            if not requester:
                raise NotFoundError("User not found")

            if not can_create_event(requester):
                raise AuthError("Access denied")

            # NOTE(review): creator_id comes from the payload, not from the
            # authenticated requester - confirm this is intended.
            event_data = {
                "creator_id": data.creator_id,
                "city_id": data.city_id,
                "name": data.name,
                "description": data.description,
                "location": data.location,
                "bonus_code": data.bonus_code,
                "points": data.points,
                "date": data.date,
            }
            event = await admin.event.insert(event_data)

            if data.app_tag_ids:
                await admin.event.set_app_tags(event.id, data.app_tag_ids)

            return await admin.event.get_by_id(event.id)

    async def delete_event(self, event_id: UUID, requester_messenger_id: int) -> bool:
        """
        Delete an event by id.

        Returns True when the event was deleted and False when it does not exist.

        Raises:
            NotFoundError: the requester does not exist.
            AuthError: the requester may not delete this event.
        """
        async with self.administrator.start() as admin:
            requester = await admin.user.get_by_messenger_id(requester_messenger_id)
            if not requester:
                raise NotFoundError("User not found")

            event = await admin.event.get_by_id(event_id)
            if not event:
                return False

            if not can_delete_event(requester, event):
                raise AuthError("Access denied")

            await admin.event.delete(event_id)
            return True

    async def update_event(
        self,
        event_id: UUID,
        data: EventUpdateSchema,
        requester_messenger_id: int,
    ):
        """
        Update an event with the fields explicitly set in ``data`` and return
        the refreshed event; the points value is checked for sanity.

        Raises:
            NotFoundError: the requester or the event does not exist.
            AuthError: the requester may not update this event.
            BadRequestError: invalid points, nothing was updated, or the event
                vanished before it could be re-read.
        """
        async with self.administrator.start() as admin:
            requester = await admin.user.get_by_messenger_id(requester_messenger_id)
            if not requester:
                raise NotFoundError("User not found")

            event = await admin.event.get_by_id(event_id)
            if not event:
                raise NotFoundError("Event not found")

            if not can_update_event(requester, event):
                raise AuthError("Access denied")

            patch = data.model_dump(exclude_unset=True)
            # Tags are applied separately from the plain field update.
            tag_ids = patch.pop("app_tag_ids", None)

            # Points must be positive and below 2**31.
            points = patch.get("points")
            if points is not None and (points <= 0 or points >= 2**31):
                raise BadRequestError("Invalid points value")

            if patch:
                updated = await admin.event.update_fields(event_id, patch)
                if not updated:
                    raise BadRequestError("Nothing to update")

            if tag_ids is not None:
                await admin.event.set_app_tags(event_id, tag_ids)

            refreshed = await admin.event.get_by_id(event_id)
            if not refreshed:
                raise BadRequestError("Event not found")

            return refreshed

    async def get_event_participants(
        self,
        event_id: UUID,
        requester_messenger_id: int,
        offset: int = 0,
        limit: int = 50,
    ) -> list[ResponseEventParticipantSchema]:
        """Return one page of the participants of an event.

        Participations without a loaded ``user`` are skipped.

        Raises:
            NotFoundError: the requester or the event does not exist.
            BadRequestError: the requester may not view the participants.
        """
        async with self.administrator.start() as admin:
            requester = await admin.user.get_by_messenger_id(requester_messenger_id)
            if not requester:
                raise NotFoundError("User not found")

            event = await admin.event.get_by_id(event_id)
            if not event:
                raise NotFoundError("Event not found")

            if not can_view_event_participants(requester, event):
                # TODO replace with AuthError?
                raise BadRequestError("Access denied: only organizer can view participants")

            parts = await admin.participation.get_event_participations(
                event_id=event_id,
                offset=offset,
                limit=limit,
            )

            out: list[ResponseEventParticipantSchema] = []
            for p in parts:
                u = getattr(p, "user", None)
                if not u:
                    continue

                out.append(
                    ResponseEventParticipantSchema(
                        participation_id=p.id,
                        is_claimed=bool(getattr(p, "is_claimed", False)),
                        user_id=u.id,
                        firstname=getattr(u, "firstname", None),
                        messenger_id=int(getattr(u, "messenger_id", 0)),
                        employee_number=int(getattr(u, "employee_number", 0)),
                        points=int(getattr(u, "points", 0)),
                    )
                )

            return out

--- src/backend/app/schemas/user.py ---
from enum import Enum
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field

from .app_tag import ResponseAppTagSchema
from .base import ChangeOnDelta, TimestampSchema, UpdateValidator
from .city import ResponseCitySchema
from .event import ResponseEventSchema


class RoleType(str, Enum):
    # API-level role values. Members are also ``str`` instances, so they
    # compare equal to their plain string values.
    user = "user"
    moderator = "moderator"
    admin = "admin"


class BaseUser(BaseModel):
    # Common user fields, all required. ``from_attributes`` lets the schema be
    # populated from attribute-bearing objects rather than dicts only.
    model_config = ConfigDict(from_attributes=True)

    firstname: str
    messenger_id: int
    employee_number: int
    role: RoleType
    points: int
    city_id: UUID


class RequestUserSchema(BaseUser):
    # Request schema with exactly the BaseUser fields; adds nothing of its own.
    pass


class ResponseUserSchema(TimestampSchema):
    # User response schema. Unlike BaseUser, firstname and city_id are
    # optional here and default to None.
    model_config = ConfigDict(from_attributes=True)
    id: UUID
    firstname: str | None = None
    messenger_id: int
    employee_number: int
    role: RoleType
    points: int
    city_id: UUID | None = None


class UpdateUserSchema(BaseModel, UpdateValidator):
    # Partial-update schema: every field is optional and defaults to None.
    firstname: str | None = None
    role: RoleType | None = None
    # NOTE(review): ChangeOnDelta presumably describes a relative change of
    # the points balance - confirm against schemas/base.py.
    points: ChangeOnDelta | None = None
    city_id: UUID | None = None


class ResponseDetailUserSchema(BaseUser, TimestampSchema):
    # Detailed user response: the BaseUser fields plus id, city, events and tags.
    id: UUID
    # The users.firstname and users.city_id columns are nullable, so the
    # required declarations inherited from BaseUser are relaxed here (as
    # ResponseUserSchema already does); otherwise validating a user without
    # a first name or a city fails.
    firstname: str | None = None
    city_id: UUID | None = None
    city: ResponseCitySchema | None = None
    events: list[ResponseEventSchema] = Field(default_factory=list)
    tags: list[ResponseAppTagSchema] = Field(default_factory=list)


class UserRankSchema(BaseModel):
    # Response wrapper carrying a single integer rank.
    rank: int

Error: 'alembic/versions' is not a valid file
Editor is loading...
Leave a Comment