Untitled
unknown
plain_text
5 months ago
13 kB
4
Indexable
import json
import logging
import traceback
from collections import OrderedDict
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

from sqlalchemy import select, text
from sqlalchemy.exc import SQLAlchemyError
from strawberry import enum, input, mutation, type

from app.graphql.schema import Info
from app.graphql.types import DownstreamProductType, UserRoleType, UserType
from app.graphql.types.product_type import ProductDataInput, ProductType
from app.models import (
    Downstream,
    DownstreamOptionPublish,
    DownstreamProduct,
    DownstreamPublish,
    FieldDetail,
)
from app.models.dynamic_view import load_view

logger = logging.getLogger(__name__)

# View columns that are always stripped from a published snapshot, in
# addition to every field flagged ``is_published == "N"``.
_ALWAYS_EXCLUDED_COLUMNS = ("MDM_ID_2", "RECORD_ID_2")

# Prefix of the per-downstream flag columns in the product view; the text
# after the prefix is parsed as the ``Downstream`` id.
_DOWNSTREAM_COLUMN_PREFIX = "DWN_"


@type
class SaveProductDownstreamPayload:
    """Result returned by the ``product_down_stream_submit`` mutation."""

    # All DownstreamProduct rows as they exist after the mutation ran
    # (empty on failure).
    updated_products_downstream: List[DownstreamProductType]
    # True when the whole submission was processed without error.
    success: bool
    # Human-readable outcome description.
    message: str


def call_procedure_and_get_view_data(
    info: Info,
    filter_map: Optional[Dict[str, str]] = None,
    record_id: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Refresh the downstream product view and return its (filtered) rows.

    Calls the ``productDownstreamViewSP`` stored procedure, commits, then
    selects from ``mdm_downstream_product_view``.

    Args:
        info: GraphQL resolver info; ``info.context.session`` is the
            SQLAlchemy session used for every statement.
        filter_map: Optional ``column name -> substring`` filters. Each entry
            adds a ``LIKE '%<lower-cased value>%'`` condition on that column.
        record_id: When given, restricts the result to rows whose
            ``RECORD_ID`` column equals this value.

    Returns:
        The rows as produced by the view's ``result_as_json`` helper, or an
        empty list if anything goes wrong (the error is logged, not raised).
    """
    session = info.context.session
    try:
        logger.debug("Calling stored procedure productDownstreamViewSP()")
        session.execute(text("CALL productDownstreamViewSP()"))
        session.commit()
        logger.debug("Stored procedure called successfully")

        downstream_product_view = load_view(
            "mdm_downstream_product_view",
            session.connection(),
        )

        query = select(downstream_product_view)
        if record_id:
            query = query.where(
                downstream_product_view.columns.RECORD_ID == record_id
            )
        if filter_map:
            for key, value in filter_map.items():
                query = query.where(
                    downstream_product_view.columns[key].like(
                        f"%{value.lower()}%"
                    )
                )

        logger.debug(
            f"Query: {query} {query.compile()} {query.params} {record_id=}"
        )
        result = session.execute(query)
        data = downstream_product_view.result_as_json(result)
        logger.debug(f"Result: {data}")
        return data
    except Exception:
        # Deliberate best-effort: callers treat "no rows" and "lookup failed"
        # the same way, so log with traceback and fall back to an empty list.
        logger.exception(
            "Unexpected error in call_procedure_and_get_view_data()"
        )
        return []


def save_all_published_record(
    info: Info, record_id_array: List[str], current_user: str
) -> None:
    """Persist a JSON snapshot of each record's view row as a publish entry.

    For every record id the view row is fetched, unpublished fields are
    removed, datetimes are converted to ISO strings, and the result is stored
    as a ``DownstreamPublish`` row. Each ``DWN_<id>`` column whose value is
    ``"Y"`` additionally produces a ``DownstreamOptionPublish`` row.

    Args:
        info: GraphQL resolver info providing the SQLAlchemy session.
        record_id_array: Record ids to publish; ids with no view data are
            skipped.
        current_user: Login stored as creator/updater of the publish rows.
    """
    session = info.context.session

    unpublished_fields = (
        session.query(FieldDetail)
        .filter(FieldDetail.is_published == "N")
        .all()
    )
    # Loop-invariant, so build the exclusion set once.
    excluded_columns = {field.name for field in unpublished_fields}
    excluded_columns.update(_ALWAYS_EXCLUDED_COLUMNS)

    for record_id in record_id_array:
        view_data = call_procedure_and_get_view_data(info, None, record_id)
        if not view_data:
            continue

        logger.debug(f"View data for record_id {record_id}: {view_data}")

        # The query was filtered by record_id; only the first row is used.
        data_dict = {
            key: value.isoformat() if isinstance(value, datetime) else value
            for key, value in view_data[0].items()
            if key not in excluded_columns
        }

        dynamic_fields = {
            key: value
            for key, value in data_dict.items()
            if key.startswith(_DOWNSTREAM_COLUMN_PREFIX)
        }
        logger.debug("DATA MAP LIST: %s", [dynamic_fields])

        # Sort keys so the stored JSON has a stable field order.
        ordered_data_dict = OrderedDict(sorted(data_dict.items()))
        data_json = json.dumps(ordered_data_dict)

        # One timestamp so created/updated dates of a new row are identical.
        now = datetime.now()
        new_downstream = DownstreamPublish(
            record_id=record_id,
            dwn_strm_data=data_json,
            created_date=now,
            created_by_id=current_user,
            updated_date=now,
            updated_by_id=current_user,
        )
        session.add(new_downstream)
        # Flush + refresh so the generated primary key is available below.
        session.flush()
        session.refresh(new_downstream)

        down_id_map = {
            key[len(_DOWNSTREAM_COLUMN_PREFIX):]: value
            for key, value in dynamic_fields.items()
            if value == "Y"
        }
        save_to_mdm_downstream_option_publish(
            info.context,
            new_downstream.id,
            down_id_map,
        )


def save_to_mdm_downstream_option_publish(
    context,
    saved_id: int,
    down_id_map: Dict[str, str],
) -> bool:
    """Create a ``DownstreamOptionPublish`` row per selected downstream.

    Args:
        context: Request context exposing the SQLAlchemy ``session``.
        saved_id: Id of the ``DownstreamPublish`` row the options belong to.
        down_id_map: Mapping whose keys are downstream ids (as strings); ids
            with no matching ``Downstream`` row are skipped.

    Returns:
        True when the rows were added and committed, False if an exception
        occurred (it is logged, not raised).
    """
    try:
        for down_id in down_id_map:
            downstream = (
                context.session.query(Downstream)
                .filter(Downstream.id == int(down_id))
                .first()
            )
            if downstream is None:
                continue
            context.session.add(
                DownstreamOptionPublish(
                    dwn_strm_publish_id=saved_id,
                    dwn_strm_name=downstream.name,
                )
            )
        context.session.commit()
        return True
    except Exception:
        logger.exception(
            "Exception in save_to_mdm_downstream_option_publish()"
        )
        return False


def _process_row(info: Info, row: Any) -> None:
    """Apply every downstream selection carried by one submitted row."""
    for downstream_product in row.downstream_products or []:
        _process_downstream_product(info, row.mdm_id, downstream_product)


def _process_downstream_product(
    info: Info, mdm_id: str, downstream_product: Any
) -> None:
    """Activate, create, or deactivate one (mdm_id, downstream) association.

    A selection counts as active only when ``is_checked`` is exactly
    ``"Yes"``; any other value deactivates an existing association and is a
    no-op when none exists.
    """
    dwn_strm_id = downstream_product.dwn_strm_id
    is_active = downstream_product.is_checked == "Yes"

    existing_entry = (
        info.context.session.query(DownstreamProduct)
        .filter(
            DownstreamProduct.mdm_id == mdm_id,
            DownstreamProduct.dwn_strm_id == dwn_strm_id,
        )
        .first()
    )

    if is_active:
        _activate_or_create_entry(info, existing_entry, mdm_id, dwn_strm_id)
    elif existing_entry:
        _deactivate_entry(info, existing_entry)


def _activate_or_create_entry(
    info: Info,
    existing_entry: Optional[DownstreamProduct],
    mdm_id: str,
    dwn_strm_id: str,
) -> None:
    """Ensure an active association exists, creating it when missing.

    An already-active entry is left untouched (no audit fields updated, no
    commit issued).
    """
    current_user = info.context.current_user.login
    now = datetime.now()
    session = info.context.session

    if existing_entry is None:
        session.add(
            DownstreamProduct(
                dwn_strm_id=dwn_strm_id,
                mdm_id=mdm_id,
                created_by_id=current_user,
                updated_by_id=current_user,
                is_active=True,
                created_date=now,
                updated_date=now,
            )
        )
        session.commit()
        return

    if not existing_entry.is_active:
        existing_entry.is_active = True
        existing_entry.updated_by_id = current_user
        existing_entry.updated_date = now
        session.commit()


def _deactivate_entry(info: Info, existing_entry: DownstreamProduct) -> None:
    """Mark an existing association inactive and stamp the audit fields."""
    existing_entry.is_active = False
    existing_entry.updated_by_id = info.context.current_user.login
    existing_entry.updated_date = datetime.now()
    info.context.session.commit()


def _fetch_downstream_products(info: Info) -> List[DownstreamProduct]:
    """Return every ``DownstreamProduct`` row currently in the database."""
    return info.context.session.query(DownstreamProduct).all()


def _handle_exception(message: str) -> SaveProductDownstreamPayload:
    """Build the failure payload returned when the mutation cannot finish."""
    return SaveProductDownstreamPayload(
        updated_products_downstream=[],
        success=False,
        message=message,
    )


@type
class ProductDownstreamAssociation:
    """GraphQL mutations managing product <-> downstream associations."""

    @mutation
    def product_down_stream_submit(
        self, info: Info, input: ProductDataInput
    ) -> SaveProductDownstreamPayload:
        """Apply submitted downstream selections and publish the records.

        For every submitted row the downstream associations are activated,
        created, or deactivated; afterwards a publish snapshot is stored for
        each submitted record id.

        Args:
            info: GraphQL resolver info (session and current user).
            input: Submitted rows; each row exposes ``record_id``, ``mdm_id``
                and ``downstream_products``.

        Returns:
            A payload with all ``DownstreamProduct`` rows on success, or a
            failure payload with an empty list when a ``json.JSONDecodeError``
            or ``SQLAlchemyError`` is raised (the latter also rolls the
            session back).
        """
        try:
            rows_data = input.rows_data or []
            # One entry per submitted row; built once so no record is
            # published an extra time.
            record_id_array = [row.record_id for row in rows_data]

            for row in rows_data:
                _process_row(info, row)

            # Single fetch after all rows are processed, so the response does
            # not contain the table contents more than once.
            updated_products_downstream = _fetch_downstream_products(info)
            logger.debug(
                f"UPDATED DOWNSTREAM PRODUCT DATA: {updated_products_downstream}"
            )

            save_all_published_record(
                info, record_id_array, info.context.current_user.login
            )

            return SaveProductDownstreamPayload(
                updated_products_downstream=updated_products_downstream,
                success=True,
                message="Data processed successfully",
            )
        except json.JSONDecodeError:
            logger.exception("Exception in product_down_stream_submit")
            return _handle_exception("An error occurred")
        except SQLAlchemyError:
            info.context.session.rollback()
            logger.exception("Error adding downstream product")
            return _handle_exception("An error occurred")
Editor is loading...
Leave a Comment