Untitled
4ae4d
plain_text
2 months ago
28 kB
5
Indexable
f=src/backend/app/db/models/user.py
f=src/backend/app/db/models/enums.py
f=src/backend/app/db/models/gosb.py
f=src/backend/app/db/models/territorial_bank.py
f=src/backend/app/db/models/project.py
f=src/backend/app/db/models/request.py
f=src/backend/app/policy/event_access.py
f=src/backend/app/services/report.py
f=src/backend/app/services/event.py
f=src/backend/app/schemas/user.py
--- src/backend/app/db/models/user.py ---
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import BigInteger, ForeignKey, Integer, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .base import Model
from .enums import RoleType
if TYPE_CHECKING:
from .city import City
from .custom_tag import CustomTag
from .event import Event
from .event_organizer import EventOrganizer
from .participation import Participation
from .project import Project
from .request import Request
from .user_tag import UserTag
class User(Model):
__tablename__ = "users"
firstname: Mapped[str | None] = mapped_column(String(100), nullable=True)
messenger_id: Mapped[int] = mapped_column(BigInteger(), unique=True, nullable=False)
employee_number: Mapped[int] = mapped_column(BigInteger(), unique=True, nullable=False)
role: Mapped[str] = mapped_column(Text, default=RoleType.user.value)
points: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
city_id: Mapped[UUID | None] = mapped_column(ForeignKey("cities.id"), nullable=True)
# Relationships
city: Mapped["City"] = relationship(back_populates="users")
custom_tags: Mapped[list["CustomTag"]] = relationship(
back_populates="user", cascade="all, delete-orphan", uselist=True
)
user_tags: Mapped[list["UserTag"]] = relationship(
back_populates="user", cascade="all, delete-orphan", uselist=True
)
participations: Mapped[list["Participation"]] = relationship(
back_populates="user", cascade="all, delete-orphan", uselist=True
)
events: Mapped[list["Event"]] = relationship(
back_populates="creator", cascade="all, delete-orphan", uselist=True
)
organized_events_links: Mapped[list["EventOrganizer"]] = relationship(
back_populates="user", cascade="all, delete-orphan", uselist=True
)
led_projects: Mapped[list["Project"]] = relationship(back_populates="leader", uselist=True)
authored_requests: Mapped[list["Request"]] = relationship(
back_populates="author",
uselist=True,
foreign_keys="Request.author_user_id",
)
targeted_requests: Mapped[list["Request"]] = relationship(
back_populates="target",
uselist=True,
foreign_keys="Request.target_user_id",
)
def to_dict(self) -> dict:
return {
"id": str(self.id),
"username": getattr(
self, "username", None
), # непонятно где используется. оставил для безопасности
"firstname": self.firstname,
"messenger_id": self.messenger_id,
"employee_number": self.employee_number,
"points": self.points,
"city_id": str(self.city_id) if self.city_id else None,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
@property
def tags(self):
return [ut.app_tag for ut in (self.user_tags or []) if ut.app_tag is not None]
--- src/backend/app/db/models/enums.py ---
from enum import Enum
class RoleType(Enum):
user = "user"
moderator = "moderator"
admin = "admin"
--- src/backend/app/db/models/gosb.py ---
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .base import Model
if TYPE_CHECKING:
from .event import Event
from .territorial_bank import TerritorialBank
class Gosb(Model):
__tablename__ = "gosb"
territorial_bank_id: Mapped[UUID] = mapped_column(
ForeignKey("territorial_banks.id"),
nullable=False,
)
name: Mapped[str] = mapped_column(String(150), nullable=False)
address: Mapped[str | None] = mapped_column(String(255), nullable=True)
city_name: Mapped[str | None] = mapped_column(String(150), nullable=True)
territorial_bank: Mapped[TerritorialBank] = relationship(back_populates="gosb_list")
events: Mapped[list[Event]] = relationship(back_populates="gosb")
--- src/backend/app/db/models/territorial_bank.py ---
from __future__ import annotations
from typing import TYPE_CHECKING
from sqlalchemy import String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .base import Model
if TYPE_CHECKING:
from .gosb import Gosb
from .project import Project
class TerritorialBank(Model):
__tablename__ = "territorial_banks"
name: Mapped[str] = mapped_column(String(150), nullable=False)
region: Mapped[str | None] = mapped_column(String(150), nullable=True)
gosb_list: Mapped[list[Gosb]] = relationship(back_populates="territorial_bank")
projects: Mapped[list[Project]] = relationship(back_populates="territorial_bank")
--- src/backend/app/db/models/project.py ---
from __future__ import annotations
from typing import TYPE_CHECKING
from uuid import UUID
from sqlalchemy import Boolean, ForeignKey, String, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .base import Model
if TYPE_CHECKING:
from .event import Event
from .request import Request
from .territorial_bank import TerritorialBank
from .user import User
class Project(Model):
__tablename__ = "projects"
name: Mapped[str] = mapped_column(String(150), nullable=False)
description: Mapped[str | None] = mapped_column(Text, nullable=True)
leader_user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
territorial_bank_id: Mapped[UUID] = mapped_column(
ForeignKey("territorial_banks.id"),
nullable=False,
)
is_active: Mapped[bool] = mapped_column(Boolean, nullable=False)
leader: Mapped[User] = relationship(back_populates="led_projects")
territorial_bank: Mapped[TerritorialBank] = relationship(back_populates="projects")
events: Mapped[list[Event]] = relationship(back_populates="project")
requests: Mapped[list[Request]] = relationship(back_populates="project")
--- src/backend/app/db/models/request.py ---
from __future__ import annotations
import datetime
from typing import TYPE_CHECKING, Any
from uuid import UUID
from sqlalchemy import DateTime, ForeignKey, String, Text
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Mapped, mapped_column, relationship
from .base import Model
if TYPE_CHECKING:
from .project import Project
from .user import User
class Request(Model):
__tablename__ = "requests"
type: Mapped[str] = mapped_column(String(50), nullable=False)
author_user_id: Mapped[UUID] = mapped_column(ForeignKey("users.id"), nullable=False)
target_user_id: Mapped[UUID | None] = mapped_column(ForeignKey("users.id"), nullable=True)
project_id: Mapped[UUID | None] = mapped_column(ForeignKey("projects.id"), nullable=True)
status: Mapped[str] = mapped_column(String(50), nullable=False)
payload: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False)
comment: Mapped[str | None] = mapped_column(Text, nullable=True)
processed_at: Mapped[datetime.datetime | None] = mapped_column(
DateTime(timezone=True),
nullable=True,
)
author: Mapped[User] = relationship(
back_populates="authored_requests",
foreign_keys=[author_user_id],
)
target: Mapped[User] = relationship(
back_populates="targeted_requests",
foreign_keys=[target_user_id],
)
project: Mapped[Project | None] = relationship(back_populates="requests")
--- src/backend/app/policy/event_access.py ---
from __future__ import annotations
from ..db.models import Event, User
def _event_tb_id(event: Event | None) -> str | None:
if event is None:
return None
gosb = getattr(event, "gosb", None)
if gosb is None:
return None
tb_id = getattr(gosb, "territorial_bank_id", None)
return str(tb_id) if tb_id is not None else None
def _role_value(role: object) -> str | None:
if role is None:
return None
return str(getattr(role, "value", role))
def is_in_tb_scope(user: User | None, event: Event | None) -> bool:
"""
читаем event -> gosb -> tb и сравниваем с tb юзера в случае, если он модератор
совпадает -> true
иначе -> false
пока что заглушка, поскольку не реализована привязка tb <-> moderator
"""
if user is None or event is None:
return False
if not is_moderator(user):
return False
if _event_tb_id(event) is None:
return False
return False # TODO вернуть: является ли пользователь модератором в _event_tb_id
def is_admin(user: User | None) -> bool:
if user is None:
return False
return _role_value(getattr(user, "role", None)) == "admin"
def is_moderator(user: User | None) -> bool:
if user is None:
return False
return _role_value(getattr(user, "role", None)) == "moderator"
def is_event_organizer(user: User | None, event: Event | None) -> bool:
if user is None or event is None:
return False
user_id = str(getattr(user, "id", ""))
if not user_id:
return False
# legacy fallback
creator_id = getattr(event, "creator_id", None)
if creator_id is not None and str(creator_id) == user_id:
return True
# target model
for link in getattr(event, "event_organizers", []) or []:
if str(getattr(link, "user_id", "")) == user_id:
return True
return False
def can_view_event_participants(user: User | None, event: Event | None) -> bool:
return is_admin(user) or is_event_organizer(user, event) or is_in_tb_scope(user, event)
def can_create_event_report(user: User | None, event: Event | None) -> bool:
return is_admin(user) or is_event_organizer(user, event)
def can_view_event_report(user: User | None, event: Event | None) -> bool:
return is_admin(user) or is_in_tb_scope(user, event) or is_event_organizer(user, event)
def can_create_event(user: User | None) -> bool:
return is_admin(user) or is_moderator(user)
def can_update_event(user: User | None, event: Event | None) -> bool:
return is_admin(user) or is_event_organizer(user, event)
def can_delete_event(user: User | None, event: Event | None) -> bool:
return is_admin(user) or is_event_organizer(user, event)
--- src/backend/app/services/report.py ---
from __future__ import annotations
from datetime import date
from typing import Any
from uuid import UUID
from ..dependencies.administrator import Administrator
from ..policy import can_create_event_report, can_view_event_report
from ..utils.errors import AuthError, ConflictError, NotFoundError
class ReportService:
def __init__(self, administrator: Administrator):
self.administrator = administrator
async def create_report(
self,
*,
event_id: UUID,
requester_messenger_id: int,
how_it_went: str,
beneficiaries_count: int | None = None,
):
async with self.administrator.start() as admin:
requester = await admin.user.get_by_messenger_id(requester_messenger_id)
if not requester:
raise NotFoundError("User not found")
event = await admin.event.get_by_id(event_id)
if not event:
raise NotFoundError("Event not found")
if not can_create_event_report(requester, event):
raise AuthError("Access denied: only organizer or admin can create report")
exists = await admin.report.get_by_event_id(event_id)
if exists:
raise ConflictError("Report already exists for this event")
participants_count = await admin.participation.count_by_event_id(event_id)
payload = {
"event_id": event_id,
"how_it_went": how_it_went,
"beneficiaries_count": beneficiaries_count,
"participants_count_snapshot": participants_count,
}
return await admin.report.insert(payload)
async def get_report(
self,
*,
event_id: UUID,
requester_messenger_id: int,
):
async with self.administrator.start() as admin:
requester = await admin.user.get_by_messenger_id(requester_messenger_id)
if not requester:
raise NotFoundError("User not found")
event = await admin.event.get_by_id(event_id)
if not event:
raise NotFoundError("Event not found")
if not can_view_event_report(requester, event):
raise AuthError("Access denied")
report = await admin.report.get_by_event_id(event_id)
if not report:
raise NotFoundError("Report not found")
return report
async def export_reports_rows(
self,
*,
date_from: date | None = None,
date_to: date | None = None,
) -> list[list[Any]]:
"""
возвращает стоки для xlsx без шапки, по одному отчёту в строке.
пока выгружаем только существующие отчёты
"""
async with self.administrator.start() as admin:
reports = await admin.report.list_for_export(date_from=date_from, date_to=date_to)
rows: list[list[Any]] = []
for r in reports:
ev = getattr(r, "event", None)
if ev is None:
continue
dt = getattr(ev, "date", None)
d = dt.date() if dt else None
t = dt.time().replace(microsecond=0) if dt else None
city_name = getattr(getattr(ev, "city", None), "name", None) or ""
organizer = getattr(getattr(ev, "creator", None), "firstname", None) or ""
tags = ", ".join(
[
getattr(tag, "name", "")
for tag in (getattr(ev, "tags", None) or [])
if getattr(tag, "name", None)
]
)
rows.append(
[
getattr(ev, "name", "") or "",
city_name,
getattr(ev, "location", "") or "",
d,
t,
getattr(r, "participants_count_snapshot", 0) or 0,
getattr(ev, "points", 0) or 0,
tags,
organizer,
getattr(r, "beneficiaries_count", None),
getattr(r, "how_it_went", "") or "",
]
)
return rows
--- src/backend/app/services/event.py ---
from typing import Any
from uuid import UUID
from ..dependencies.administrator import Administrator
from ..filters import FilterContext, FilterSpecification
from ..filters.filters.claimed import ClaimedEventsFilter
from ..filters.filters.organizer import OrganizerEventsFilter
from ..filters.filters.participant import ParticipantEventsFilter
from ..filters.filters.report import ReportEventsFilter
from ..filters.filters.tags import TagsEventsFilter
from ..policy import (
can_create_event,
can_delete_event,
can_update_event,
can_view_event_participants,
)
from ..schemas import (
EventCreateSchema,
EventSearchQuerySchema,
EventSignOutSchema,
EventSignUpSchema,
EventUpdateSchema,
ResponseEventParticipantSchema,
)
from ..utils.errors import AuthError, BadRequestError, ConflictError, NotFoundError
from ..utils.logger import logger
class EventService:
def __init__(self, administrator: Administrator):
self.administrator = administrator
async def get_cities_events(self, city_id: UUID, offset: int, limit: int):
async with self.administrator.start() as admin:
return await admin.event.get_cities_events(city_id=city_id, offset=offset, limit=limit)
async def search_events(self, query: EventSearchQuerySchema):
logger.info(
"search_events: city_id=%s offset=%s limit=%s cursor_date=%s cursor_id=%s requester_messenger_id=%s only_claimed=%s only_organizer=%s only_participant=%s has_report=%s tag_ids=%s",
query.city_id,
query.offset,
query.limit,
query.cursor_date,
query.cursor_id,
query.requester_messenger_id,
query.only_claimed,
query.only_organizer,
query.only_participant,
query.has_report,
query.tag_ids,
)
# keyset cursor must be comlete
if (query.cursor_date is None) ^ (query.cursor_id is None):
raise BadRequestError("cursor_date and cursor_id must be provided together")
needs_requester = bool(query.only_claimed or query.only_organizer or query.only_participant)
if needs_requester and query.requester_messenger_id is None:
raise BadRequestError(
"requester_messenger_id is required for claimed/organizer/participant filters"
)
async with self.administrator.start() as admin:
requester = None
if query.requester_messenger_id is not None:
requester = await admin.user.get_by_messenger_id(query.requester_messenger_id)
if not requester:
raise NotFoundError("User not found")
ctx_data: dict[str, Any] = {"tag_ids": query.tag_ids}
if requester is not None:
ctx_data["requester_user_id"] = getattr(requester, "id", None)
ctx_data["requester_messenger_id"] = query.requester_messenger_id
ctx = FilterContext(ctx_data)
spec = FilterSpecification()
spec.add(ClaimedEventsFilter(enabled=bool(query.only_claimed)))
spec.add(OrganizerEventsFilter(enabled=bool(query.only_organizer)))
spec.add(ParticipantEventsFilter(enabled=bool(query.only_participant)))
spec.add(ReportEventsFilter(has_report=query.has_report))
spec.add(TagsEventsFilter(tag_ids=list(query.tag_ids or [])))
return await admin.event.get_cities_events_with_total(
city_id=query.city_id,
offset=query.offset,
limit=query.limit,
cursor_date=query.cursor_date,
cursor_id=query.cursor_id,
spec=spec,
ctx=ctx,
)
async def get_by_id(self, event_id: UUID):
async with self.administrator.start() as admin:
return await admin.event.get_by_id(event_id)
async def enter_code(self, participation_id: UUID, bonus_code: str):
async with self.administrator.start() as admin:
participation = await admin.participation.get_by_id(participation_id)
if not participation:
raise NotFoundError()
if participation.is_claimed:
raise BadRequestError("User already claimed points")
event = await admin.event.get_by_id(participation.event_id)
if event.bonus_code != bonus_code:
raise BadRequestError("Codes do not match")
await admin.participation.update(participation_id, {"is_claimed": True})
return await admin.user.add_points(participation.user_id, event.points)
async def sign_up(self, data: EventSignUpSchema):
async with self.administrator.start() as admin:
user = await admin.user.get_by_messenger_id(data.messenger_id)
if not user:
raise NotFoundError("User not found")
participation = await admin.participation.get_participation(user.id, data.event_id)
if participation:
raise ConflictError()
participation_data = {"user_id": user.id, "event_id": data.event_id}
return await admin.participation.insert(participation_data)
async def sign_out(self, data: EventSignOutSchema):
async with self.administrator.start() as admin:
participation = await admin.participation.get_by_id(data.participation_id)
if not participation:
raise NotFoundError()
if participation.is_claimed:
raise BadRequestError()
return await admin.participation.delete(participation.id)
async def get_users_events(self, user_id: int):
async with self.administrator.start() as admin:
user = await admin.user.get_by_messenger_id(user_id)
return await admin.participation.get_user_participations(user.id)
async def create_event(self, data: EventCreateSchema, requester_messenger_id: int):
"""
создать мероприятие в БД.
"""
async with self.administrator.start() as admin:
requester = await admin.user.get_by_messenger_id(requester_messenger_id)
if not requester:
raise NotFoundError("User not found")
if not can_create_event(requester):
raise AuthError("Access denied")
event_data = {
"creator_id": data.creator_id,
"city_id": data.city_id,
"name": data.name,
"description": data.description,
"location": data.location,
"bonus_code": data.bonus_code,
"points": data.points,
"date": data.date,
}
event = await admin.event.insert(event_data)
if data.app_tag_ids:
await admin.event.set_app_tags(event.id, data.app_tag_ids)
return await admin.event.get_by_id(event.id)
async def delete_event(self, event_id: UUID, requester_messenger_id: int) -> bool:
"""
удалить мероприятие по id.
true - получилось удалить
false - не получилось удалить
"""
async with self.administrator.start() as admin:
requester = await admin.user.get_by_messenger_id(requester_messenger_id)
if not requester:
raise NotFoundError("User not found")
event = await admin.event.get_by_id(event_id)
if not event:
return False
if not can_delete_event(requester, event):
raise AuthError("Access denied")
await admin.event.delete(event_id)
return True
async def update_event(
self,
event_id: UUID,
data: EventUpdateSchema,
requester_messenger_id: int,
):
"""
обновляем мероприятие,
проверяем что количество поинтов адекватное
"""
async with self.administrator.start() as admin:
requester = await admin.user.get_by_messenger_id(requester_messenger_id)
if not requester:
raise NotFoundError("User not found")
event = await admin.event.get_by_id(event_id)
if not event:
raise NotFoundError("Event not found")
if not can_update_event(requester, event):
raise AuthError("Access denied")
patch = data.model_dump(exclude_unset=True)
tag_ids = patch.pop("app_tag_ids", None)
if (
"points" in patch
and patch["points"] is not None
and (patch["points"] <= 0 or patch["points"] >= 2**31)
):
raise BadRequestError("Invalid points value")
if patch:
updated = await admin.event.update_fields(event_id, patch)
if not updated:
raise BadRequestError("Nothing to update")
if tag_ids is not None:
await admin.event.set_app_tags(event_id, tag_ids)
refreshed = await admin.event.get_by_id(event_id)
if not refreshed:
raise BadRequestError("Event not found")
return refreshed
async def get_event_participants(
self,
event_id: UUID,
requester_messenger_id: int,
offset: int = 0,
limit: int = 50,
) -> list[ResponseEventParticipantSchema]:
async with self.administrator.start() as admin:
requester = await admin.user.get_by_messenger_id(requester_messenger_id)
if not requester:
raise NotFoundError("User not found")
event = await admin.event.get_by_id(event_id)
if not event:
raise NotFoundError("Event not found")
if not can_view_event_participants(requester, event):
raise BadRequestError("Access denied: only organizer can view participants")
# TODO replace with AuthError?
parts = await admin.participation.get_event_participations(
event_id=event_id,
offset=offset,
limit=limit,
)
out: list[ResponseEventParticipantSchema] = []
for p in parts:
u = getattr(p, "user", None)
if not u:
continue
out.append(
ResponseEventParticipantSchema(
participation_id=p.id,
is_claimed=bool(getattr(p, "is_claimed", False)),
user_id=u.id,
firstname=getattr(u, "firstname", None),
messenger_id=int(getattr(u, "messenger_id", 0)),
employee_number=int(getattr(u, "employee_number", 0)),
points=int(getattr(u, "points", 0)),
)
)
return out
--- src/backend/app/schemas/user.py ---
from enum import Enum
from uuid import UUID
from pydantic import BaseModel, ConfigDict, Field
from .app_tag import ResponseAppTagSchema
from .base import ChangeOnDelta, TimestampSchema, UpdateValidator
from .city import ResponseCitySchema
from .event import ResponseEventSchema
class RoleType(str, Enum):
user = "user"
moderator = "moderator"
admin = "admin"
class BaseUser(BaseModel):
model_config = ConfigDict(from_attributes=True)
firstname: str
messenger_id: int
employee_number: int
role: RoleType
points: int
city_id: UUID
class RequestUserSchema(BaseUser):
pass
class ResponseUserSchema(TimestampSchema):
model_config = ConfigDict(from_attributes=True)
id: UUID
firstname: str | None = None
messenger_id: int
employee_number: int
role: RoleType
points: int
city_id: UUID | None = None
class UpdateUserSchema(BaseModel, UpdateValidator):
firstname: str | None = None
role: RoleType | None = None
points: ChangeOnDelta | None = None
city_id: UUID | None = None
class ResponseDetailUserSchema(BaseUser, TimestampSchema):
id: UUID
city: ResponseCitySchema | None = None
events: list[ResponseEventSchema] = Field(default_factory=list)
tags: list[ResponseAppTagSchema] = Field(default_factory=list)
class UserRankSchema(BaseModel):
rank: int
Error: 'alembic/versions' is not a valid file
Editor is loading...
Leave a Comment