Untitled

 avatar
unknown
plain_text
5 months ago
13 kB
4
Indexable
import json
import logging
import traceback
from collections import OrderedDict
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

from sqlalchemy import select, text
from sqlalchemy.exc import SQLAlchemyError
from strawberry import enum, input, mutation, type

from app.graphql.schema import Info
from app.graphql.types import DownstreamProductType, UserRoleType, UserType
from app.graphql.types.product_type import ProductDataInput, ProductType
from app.models import (
    Downstream,
    DownstreamOptionPublish,
    DownstreamProduct,
    DownstreamPublish,
    FieldDetail,
)
from app.models.dynamic_view import load_view

logger = logging.getLogger(__name__)


@type
class SaveProductDownstreamPayload:
    updated_products_downstream: List[DownstreamProductType]
    success: bool
    message: str


def call_procedure_and_get_view_data(
    info: Info,
    filter_map: Optional[Dict[str, str]] = None,
    record_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
    try:
        # Debugging statement before calling the procedure
        logger.debug("Calling stored procedure productDownstreamViewSP()")

        # Call the stored procedure
        info.context.session.execute(text("CALL productDownstreamViewSP()"))
        info.context.session.commit()

        # Debugging statement after calling the procedure
        logger.debug("Stored procedure called successfully")

        downstream_product_view = load_view(
            "mdm_downstream_product_view",
            info.context.session.connection(),
        )
        query = select(downstream_product_view)

        if record_id:
            query = query.where(downstream_product_view.columns.RECORD_ID == record_id)

        if filter_map:
            for key, value in filter_map.items():
                query = query.where(
                    downstream_product_view.columns[key].like(f"%{value.lower()}%")
                )

        # Fetch data from the view
        logger.debug(f"Query: {query} {query.compile()} {query.params} {record_id=}")
        result = info.context.session.execute(query)
        data = downstream_product_view.result_as_json(result)
        logger.debug(f"Result: {data}")
        return data

    except Exception:
        logger.exception("Unexpected error in call_procedure_and_get_view_data()")
        return []


def save_all_published_record(
    info: Info, record_id_array: List[str], current_user: str
):
    publish_no = (
        info.context.session.query(FieldDetail)
        .filter(FieldDetail.is_published == "N")
        .all()
    )

    for record_id in record_id_array:
        # Call the procedure and get data from the view for the specific record_id
        view_data = call_procedure_and_get_view_data(info, None, record_id)

        # Only proceed if view_data is present
        if view_data:
            # Print the view data for debugging
            logger.debug(f"View data for record_id {record_id}: {view_data}")

            # Initialize the list to store dynamic fields
            data_map_list = []

            # Since the data is already filtered by record_id, we can directly use it
            data_dict = view_data[0]  # Assuming there's only one record per record_id

            for field in publish_no:
                data_dict.pop(field.name, None)
            data_dict.pop("MDM_ID_2", None)
            data_dict.pop("RECORD_ID_2", None)

            # Convert datetime objects to strings
            for key, value in data_dict.items():
                if isinstance(value, datetime):
                    data_dict[key] = value.isoformat()

            # Extract dynamic fields and add to the list
            dynamic_fields = {
                key: value for key, value in data_dict.items() if key.startswith("DWN_")
            }
            data_map_list.append(dynamic_fields)
            logger.debug(f"DATA MAP LIST: {data_map_list}")

            # Use OrderedDict to maintain the order of fields
            ordered_data_dict = OrderedDict(sorted(data_dict.items()))

            data_json = json.dumps(ordered_data_dict)

            new_downstream = DownstreamPublish(
                record_id=record_id,
                dwn_strm_data=data_json,
                created_date=datetime.now(),
                created_by_id=current_user,
                updated_date=datetime.now(),
                updated_by_id=current_user,
            )
            info.context.session.add(new_downstream)
            info.context.session.flush()
            info.context.session.refresh(new_downstream)

            down_id_map = {
                key[4:]: value
                for key, value in data_dict.items()
                if key.startswith("DWN_") and value == "Y"
            }
            save_to_mdm_downstream_option_publish(
                info.context,
                new_downstream.id,
                down_id_map,
            )


def save_to_mdm_downstream_option_publish(
    context,
    saved_id: int,
    down_id_map: Dict[str, str],
) -> bool:
    try:
        for down_id, value in down_id_map.items():
            # Query to get the Downstream object
            publish_option = (
                context.session.query(Downstream)
                .filter(Downstream.id == int(down_id))
                .all()
            )

            if publish_option:
                pub_obj = publish_option[0]

                # Create a new DownStreamOptionPublish object
                dwn_option_publish = DownstreamOptionPublish(
                    dwn_strm_publish_id=saved_id, dwn_strm_name=pub_obj.name
                )

                # Save the object to the database
                context.session.add(dwn_option_publish)
                context.session.commit()
        return True
    except Exception:
        logger.exception("Exception in save_to_mdm_downstream_option_publish()")
        return False


def _process_row(info: Info, row: Any) -> None:
    for downstream_product in row.downstream_products or []:
        _process_downstream_product(info, row.mdm_id, downstream_product)


def _process_downstream_product(
    info: Info, mdm_id: str, downstream_product: Any
) -> None:
    dwn_strm_id = downstream_product.dwn_strm_id
    is_active = downstream_product.is_checked == "Yes"

    existing_entry = (
        info.context.session.query(DownstreamProduct)
        .filter(
            DownstreamProduct.mdm_id == mdm_id,
            DownstreamProduct.dwn_strm_id == dwn_strm_id,
        )
        .first()
    )

    if is_active:
        _activate_or_create_entry(info, existing_entry, mdm_id, dwn_strm_id)
    elif existing_entry:
        _deactivate_entry(info, existing_entry)


def _activate_or_create_entry(
    info: Info, existing_entry: DownstreamProduct, mdm_id: str, dwn_strm_id: str
) -> None:
    current_user = info.context.current_user.login
    now = datetime.now()

    if existing_entry:
        if not existing_entry.is_active:
            existing_entry.is_active = True
            existing_entry.updated_by_id = current_user
            existing_entry.updated_date = now
            info.context.session.commit()
    else:
        new_entry = DownstreamProduct(
            dwn_strm_id=dwn_strm_id,
            mdm_id=mdm_id,
            created_by_id=current_user,
            updated_by_id=current_user,
            is_active=True,
            created_date=now,
            updated_date=now,
        )
        info.context.session.add(new_entry)
        info.context.session.commit()


def _deactivate_entry(info: Info, existing_entry: DownstreamProduct) -> None:
    existing_entry.is_active = False
    existing_entry.updated_by_id = info.context.current_user.login
    existing_entry.updated_date = datetime.now()
    info.context.session.commit()


def _fetch_downstream_products(info: Info) -> list:
    return info.context.session.query(DownstreamProduct).all()


def _handle_exception(message: str) -> SaveProductDownstreamPayload:
    return SaveProductDownstreamPayload(
        updated_products_downstream=[],
        success=False,
        message=message,
    )


@type
class ProductDownstreamAssociation:
    @mutation
    def product_down_stream_submit(
        self, info: Info, input: ProductDataInput
    ) -> SaveProductDownstreamPayload:
        try:
            rows_data = input.rows_data
            record_id_array = [row.record_id for row in rows_data]

            for row in rows_data:
                _process_row(info, row)

            updated_products_downstream = _fetch_downstream_products(info)

            mdm_id = row.mdm_id
            record_id = row.record_id
            record_id_array.append(record_id)

            if row.downstream_products:
                for downstream_product in row.downstream_products:
                    logger.debug(f"DOWNSTREAM IDS: {downstream_product}")
                    dwn_stream_id = downstream_product.dwn_strm_id
                    down_id = "Y" if downstream_product.is_checked == "Yes" else "N"

                    logger.critical(
                        f"product_down_stream_submit {mdm_id=} {dwn_stream_id=} {down_id=}"
                    )
                    existing_entry = (
                        info.context.session.query(DownstreamProduct)
                        .filter(
                            DownstreamProduct.mdm_id == mdm_id,
                            DownstreamProduct.dwn_strm_id == dwn_stream_id,
                        )
                        .first()
                    )

                    if down_id == "Y":
                        if existing_entry:
                            if not existing_entry.is_active:
                                existing_entry.is_active = True
                                existing_entry.updated_by_id = (
                                    info.context.current_user.login
                                )
                                existing_entry.updated_date = datetime.now()
                                info.context.session.commit()
                        else:
                            downstream_product = DownstreamProduct(
                                dwn_strm_id=dwn_stream_id,
                                mdm_id=mdm_id,
                                created_by_id=info.context.current_user.login,
                                updated_by_id=info.context.current_user.login,
                                is_active=True,
                                created_date=datetime.now(),
                                updated_date=datetime.now(),
                            )
                            info.context.session.add(downstream_product)
                            info.context.session.commit()
                    elif down_id == "N" and existing_entry:
                        existing_entry.is_active = False
                        existing_entry.updated_by_id = info.context.current_user.login
                        existing_entry.updated_date = datetime.now()
                        info.context.session.commit()

            product_downstream_data = info.context.session.query(
                DownstreamProduct
            ).all()
            updated_products_downstream.extend(product_downstream_data)
            logger.debug(
                f"UPDATED DOWNSTREAM PRODUCT DATA: {updated_products_downstream}
            )
            save_all_published_record(
                info, record_id_array, info.context.current_user.login
            )

            return SaveProductDownstreamPayload(
                updated_products_downstream=updated_products_downstream,
                success=True,
                message="Data processed successfully",
            )

        except json.JSONDecodeError:
            logger.exception(f"Exception in product_down_stream_submit")
            return SaveProductDownstreamPayload(
                updated_products_downstream=[],
                success=False,
                message="An error occurred",
            )
        except SQLAlchemyError:
            info.context.session.rollback()
            logger.exception(f"Error adding downstream product")
            return SaveProductDownstreamPayload(
                updated_products_downstream=[],
                success=False,
                message="An error occurred",
            )
Editor is loading...
Leave a Comment