Untitled

 avatar
unknown
plain_text
5 months ago
12 kB
4
Indexable
from datetime import datetime
from typing import List

from sqlalchemy import update, text
from strawberry import mutation, type

from app.graphql.schema import Info
from app.graphql.types.field_detail_type import (
    CreateFieldDetailsInput,
    DeleteFieldDetailsInput,
    FieldDetailType,
    UpdateFieldDetailsInput,
)
from app.models import FieldDetail, Validation

# Maps the text-format name supplied on the input to the single-letter code
# stored in FieldDetail.format (looked up with .get(), so unknown names yield None).
field_format_mapping = {"Uppercase": "U", "Lowercase": "L", "Titlecase": "T"}
# Message returned in a mutation payload when _call_stored_procedure reports failure.
procedure_error = "Procedure error occurred! View is not updated."

# Result payload returned by FieldDetailsMutation.update_field_details.
@type
class UpdateFieldDetailsMutationPayload:
    # Rows re-read after the update; the mutation passes [] on every failure path.
    field_details: List[FieldDetailType]
    # True only when the mutation completed without reporting an error.
    success: bool
    # Human-readable outcome or error text; defaults to an empty string.
    message: str = ""

# Result payload returned by FieldDetailsMutation.create_field_details.
@type
class CreateFieldDetailsMutationPayload:
    # Rows re-read after creation; the mutation passes [] on every failure path.
    field_details: List[FieldDetailType]
    # True only when the mutation completed without reporting an error.
    success: bool
    # Human-readable outcome or error text; defaults to an empty string.
    message: str = ""

# Result payload returned by FieldDetailsMutation.delete_field_details.
@type
class DeleteFieldDetailsMutationPayload:
    # Rows that were flagged as deleted; the mutation passes [] on every failure path.
    field_details: List[FieldDetailType]
    # True only when the mutation completed without reporting an error.
    success: bool
    # Human-readable outcome or error text; defaults to an empty string.
    message: str = ""

def _call_stored_procedure(session):
    """Call the stored procedure to update the product view."""
    try:
        session.execute(text("CALL jsamdm_qa.productViewSP()"))
        session.commit()
        return True
    except Exception as e:
        session.rollback()
        return False, f"Stored procedure error: {str(e)}"

def _update_select_field(field_detail, current_user, session):
    """Issue the UPDATE for a select or select-multiple field.

    Args:
        field_detail: Input object carrying the new column values and the
            ``id`` of the row to update.
        current_user: Value written to ``updated_by_id``.
        session: Database session used to execute the statement. Nothing is
            committed here.
    """
    new_values = {
        "label": field_detail.label,
        "type": field_detail.type,
        "order": field_detail.order,
        # Falsy options / reltio_id are stored as NULL.
        "options": field_detail.options or None,
        "rcvd_frm_reltio": field_detail.reltio_id or None,
        "is_active": field_detail.is_active,
        "is_editable": field_detail.is_editable,
        "is_visible": field_detail.is_visible,
        "is_downstream": field_detail.is_downstream,
        "is_published": field_detail.is_published,
        "updated_date": datetime.now(),
        "updated_by_id": current_user,
    }
    row_filter = FieldDetail.id == field_detail.id
    session.execute(update(FieldDetail).where(row_filter).values(**new_values))

def _update_text_field(field_detail, current_user, session):
    """Issue the UPDATE for a text or textarea field.

    Args:
        field_detail: Input object carrying the new column values, the
            ``id`` of the row to update and the ``validation_value`` to
            resolve.
        current_user: Value written to ``updated_by_id``.
        session: Database session used for the lookup and the statement.
            Nothing is committed here.

    Returns:
        bool: ``True`` when a Validation row whose ``type`` equals
        ``field_detail.validation_value`` exists and the UPDATE was executed;
        ``False`` when no such row exists (nothing is executed).
    """
    # Translate the format name into its stored code; no format means NULL.
    format_code = None
    if field_detail.format:
        format_code = field_format_mapping.get(field_detail.format)

    matched_validation = (
        session.query(Validation).filter_by(type=field_detail.validation_value).first()
    )
    if not matched_validation:
        return False

    new_values = {
        "label": field_detail.label,
        "type": field_detail.type,
        "order": field_detail.order,
        "format": format_code,
        "validation_id": matched_validation.id,
        # A falsy reltio_id is stored as NULL.
        "rcvd_frm_reltio": field_detail.reltio_id or None,
        "is_active": field_detail.is_active,
        "is_editable": field_detail.is_editable,
        "is_visible": field_detail.is_visible,
        "is_downstream": field_detail.is_downstream,
        "is_published": field_detail.is_published,
        "updated_date": datetime.now(),
        "updated_by_id": current_user,
    }
    row_filter = FieldDetail.id == field_detail.id
    session.execute(update(FieldDetail).where(row_filter).values(**new_values))
    return True

@type
class FieldDetailsMutation:
    # GraphQL mutations that update, create and soft-delete FieldDetail rows.
    # Each mutation commits its own changes and then calls
    # _call_stored_procedure; failures are reported through the payload's
    # success/message fields instead of being raised.
    #
    # NOTE(review): private helpers are invoked through the class
    # (FieldDetailsMutation._helper(self, ...)) rather than through `self`.
    # Strawberry passes the parent/root value as `self` to resolvers, and that
    # can be None for a root mutation type. Calling through the class behaves
    # identically when `self` is an instance. TODO confirm how this type is
    # mounted in the schema.

    @mutation
    def update_field_details(
        self, info: Info, input: List[UpdateFieldDetailsInput]
    ) -> UpdateFieldDetailsMutationPayload:
        # Applies every input item, commits, calls the stored procedure and
        # returns the re-read rows. Any exception is turned into a failed payload.
        try:
            session = info.context.session
            current_user = info.context.current_user.login
            updated_field_details_ids = []

            for field_detail in input:
                if field_detail.type in ["select", "select-multiple"]:
                    _update_select_field(field_detail, current_user, session)
                elif field_detail.type in ["text", "textarea"]:
                    success = _update_text_field(field_detail, current_user, session)
                    if not success:
                        # Discard UPDATEs already issued for earlier items so a
                        # rejected request leaves nothing pending in the session.
                        session.rollback()
                        return UpdateFieldDetailsMutationPayload(
                            field_details=[],
                            success=False,
                            message=f"Validation type '{field_detail.validation_value}' not found",
                        )
                # Other field types issue no UPDATE but are still returned.
                updated_field_details_ids.append(field_detail.id)

            session.commit()

            if not _call_stored_procedure(session):
                return UpdateFieldDetailsMutationPayload(
                    field_details=[], success=False, message=procedure_error
                )

            updated_field_details = (
                session.query(FieldDetail)
                .filter(FieldDetail.id.in_(updated_field_details_ids))
                .all()
            )

            return UpdateFieldDetailsMutationPayload(
                field_details=updated_field_details,
                success=True,
                message="Field details updated successfully",
            )

        except Exception as e:
            # Use info.context.session, not the local name: the local is
            # unbound when the failure happened before it was assigned.
            info.context.session.rollback()
            return UpdateFieldDetailsMutationPayload(
                field_details=[], success=False, message=f"Update failed: {str(e)}"
            )

    def _prepare_field_detail_defaults(self, current_user):
        """Return the default column values applied to every created field.

        Args:
            current_user: Value stored in ``created_by_id`` and ``updated_by_id``.

        Returns:
            dict: Column name to default value.
        """
        # One timestamp so created_date and updated_date are identical.
        now = datetime.now()
        return dict(
            rcvd_frm_reltio=None,
            is_dynamic="Y",
            is_active="Y",
            is_editable="Y",
            is_deleted=0,
            is_visible="Y",
            is_downstream="Y",
            is_published="Y",
            default_option="Y",
            created_date=now,
            created_by_id=current_user,
            updated_date=now,
            updated_by_id=current_user,
        )

    def _get_validation_id(self, field_detail, session):
        """Resolve the validation ID string for a field being created.

        Looks up the Validation row whose ``type`` equals
        ``field_detail.validation_value`` and, when ``field_detail.is_mandatory``
        is ``"Y"``, the row whose ``type`` is ``"Required"``.

        Returns:
            str | None: The found IDs joined with ``","``, or ``None`` when
            neither lookup matched.
        """
        validation_ids = []
        validation_record = (
            session.query(Validation).filter_by(type=field_detail.validation_value).first()
        )
        if validation_record:
            validation_ids.append(str(validation_record.id))
        if field_detail.is_mandatory == "Y":
            required_validation = session.query(Validation).filter_by(type="Required").first()
            if required_validation:
                validation_ids.append(str(required_validation.id))
        return ",".join(validation_ids) if validation_ids else None

    def _create_field_detail_instance(self, field_detail, field_defaults, validation_id, current_user):
        """Build an unsaved FieldDetail from the input merged with defaults.

        Args:
            field_detail: Create input; supplies label, name, type and order.
            field_defaults: Defaults from ``_prepare_field_detail_defaults``;
                these override same-named attributes of the input.
            validation_id: Value stored in ``validation_id``.
            current_user: Unused here; kept so existing callers keep working.

        Returns:
            FieldDetail: A new instance that has not been added to a session.
        """
        is_text = field_detail.type in ["text", "textarea"]
        is_select = field_detail.type in ["select", "select-multiple"]

        # format applies only to text fields, options only to select fields.
        field_format_value = field_format_mapping.get(field_detail.format) if is_text else None
        options_value = field_detail.options if is_select else None
        rcvd_frm_reltio = field_detail.reltio_id if field_detail.rcvd_frm_reltio == "Y" else None

        # Later entries win: defaults override the input, computed values override both.
        merged_field_detail = {
            **field_detail.__dict__,
            **field_defaults,
            "format": field_format_value,
            "validation_id": validation_id,
            "options": options_value,
            "rcvd_frm_reltio": rcvd_frm_reltio,
        }

        return FieldDetail(
            label=merged_field_detail["label"],
            name=merged_field_detail["name"],
            type=merged_field_detail["type"],
            order=merged_field_detail["order"],
            format=merged_field_detail["format"],
            validation_id=merged_field_detail["validation_id"],
            options=merged_field_detail["options"],
            rcvd_frm_reltio=merged_field_detail["rcvd_frm_reltio"],
            is_dynamic=merged_field_detail["is_dynamic"],
            is_active=merged_field_detail["is_active"],
            is_editable=merged_field_detail["is_editable"],
            is_deleted=merged_field_detail["is_deleted"],
            is_visible=merged_field_detail["is_visible"],
            is_downstream=merged_field_detail["is_downstream"],
            is_published=merged_field_detail["is_published"],
            default_option=merged_field_detail["default_option"],
            created_date=merged_field_detail["created_date"],
            created_by_id=merged_field_detail["created_by_id"],
            updated_date=merged_field_detail["updated_date"],
            updated_by_id=merged_field_detail["updated_by_id"],
        )

    @mutation
    def create_field_details(
        self, info: Info, input: List[CreateFieldDetailsInput]
    ) -> CreateFieldDetailsMutationPayload:
        # Inserts one FieldDetail per input item, commits, calls the stored
        # procedure and returns the re-read rows. Any exception is turned into
        # a failed payload.
        try:
            session = info.context.session
            current_user = info.context.current_user.login
            field_defaults = FieldDetailsMutation._prepare_field_detail_defaults(
                self, current_user
            )
            created_field_details_ids = []

            for field_detail in input:
                validation_id = FieldDetailsMutation._get_validation_id(
                    self, field_detail, session
                )
                new_field_detail = FieldDetailsMutation._create_field_detail_instance(
                    self, field_detail, field_defaults, validation_id, current_user
                )

                session.add(new_field_detail)
                # Flush so the new row's id is available before the commit.
                session.flush()
                created_field_details_ids.append(new_field_detail.id)

            session.commit()

            if not _call_stored_procedure(session):
                return CreateFieldDetailsMutationPayload(
                    field_details=[], success=False, message=procedure_error
                )

            created_field_details = (
                session.query(FieldDetail)
                .filter(FieldDetail.id.in_(created_field_details_ids))
                .all()
            )

            return CreateFieldDetailsMutationPayload(
                field_details=created_field_details,
                success=True,
                message="Field details created successfully",
            )

        except Exception as e:
            info.context.session.rollback()
            return CreateFieldDetailsMutationPayload(
                field_details=[], success=False, message=f"Creation failed: {str(e)}"
            )

    def _delete_field_detail(self, field_detail_id, session):
        """Flag the FieldDetail row with this id as deleted (soft delete).

        Executes an UPDATE setting ``is_deleted``; nothing is committed here.
        """
        stmt = update(FieldDetail).where(FieldDetail.id == field_detail_id).values(is_deleted=True)
        session.execute(stmt)

    @mutation
    def delete_field_details(
        self, info: Info, input: List[DeleteFieldDetailsInput]
    ) -> DeleteFieldDetailsMutationPayload:
        # Soft-deletes every input id that matches an existing row, commits,
        # calls the stored procedure and returns the affected rows. Any
        # exception is turned into a failed payload.
        try:
            session = info.context.session
            deleted_field_details = []

            for field_detail in input:
                FieldDetailsMutation._delete_field_detail(self, field_detail.id, session)
                deleted_field_detail = session.query(FieldDetail).get(field_detail.id)
                # An unknown id yields None; skip it so the payload list holds
                # only real rows and the count below is accurate.
                if deleted_field_detail is not None:
                    deleted_field_details.append(deleted_field_detail)

            session.commit()

            if not _call_stored_procedure(session):
                return DeleteFieldDetailsMutationPayload(
                    field_details=[], success=False, message=procedure_error
                )

            if deleted_field_details:
                return DeleteFieldDetailsMutationPayload(
                    field_details=deleted_field_details,
                    success=True,
                    message=f"{len(deleted_field_details)} field detail(s) deleted successfully",
                )
            else:
                return DeleteFieldDetailsMutationPayload(
                    field_details=[], success=False, message="No field detail was deleted"
                )

        except Exception as e:
            info.context.session.rollback()
            return DeleteFieldDetailsMutationPayload(
                field_details=[], success=False, message=f"Deletion failed: {str(e)}"
            )
Editor is loading...
Leave a Comment