Untitled
from fastapi import APIRouter, Query, Request, HTTPException, Body from fastapi.encoders import jsonable_encoder import pandas as pd import json from app_instance import dbObj, logObj, configObj, setResponse from models.message import Message, ParameterFailed from util.validateUUID import fn_validate_uuid from models.arc_entity_details import EntityDetailsIn, EntityDetailsPatch from cassandra.util import OrderedMapSerializedKey from configparser import ConfigParser from sys import exc_info as sys_exc_info import json from uuid import uuid5, NAMESPACE_DNS, UUID, uuid4 from pydantic import BaseModel, UUID5, Field, validator, ConfigDict from typing import List, Optional, Dict from sqlalchemy.orm import Session from datetime import datetime, timedelta config = ConfigParser() config.read('/app/server/HOBS-Archival/archival_service_det/config/config.cfg') column_mapping = json.loads(config.get('entityDetails', 'columnMapping', raw=True)) reverse_column_mapping = {v: k for k, v in column_mapping.items()} class Error(Exception): """Base class for other exceptions""" pass class Exception_Validation(Error): pass async def log_event(event_type, event_name, event_request, event_status, remarks, start_time, end_time=None, batch_id=None): duration = None if end_time: duration = str(end_time - start_time) op_id = configObj.get('DEFAULT', 'opID') bu_id = configObj.get('DEFAULT', 'buID') if not batch_id: batch_id = uuid4() query = """ INSERT INTO archival.arc_entity_logs ( ael_batch_id, ael_year, ael_month, ael_day, ael_event_date, ael_event_type, ael_event_name, ael_event_status, ael_event_request, ael_remarks, ael_start_date, ael_end_date, ael_duration, ael_op_id, ael_bu_id, ael_created_by, ael_created_date, ael_updated_by, ael_updated_date ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """ args = ( batch_id, start_time.year, start_time.month, start_time.day, start_time, event_type, event_name, event_status, event_request, remarks, start_time, end_time, duration, op_id, bu_id, 'archivalAPI', datetime.now(), 'archivalAPI', datetime.now() ) await dbObj.dbExec(query, args) return batch_id entityMasterDetailsRouter = APIRouter() @entityMasterDetailsRouter.get("/metadata/getEntityDetails", responses={ 200: {"model": EntityDetailsIn, "description": "Success"}, 204: {"description": "No data"}, 400: {"model": Message, "description": "Details Not Found"}, 422: {"model": ParameterFailed, "description": "There was a field rule violation or missing required attribute."}, 500: {"model": Message, "description": "Internal Server Error"} }, summary="Retrieves the entity master table details", response_model=EntityDetailsIn ) async def get_data(request: Request , detailsID: str = Query(default=None , description="Entity unique id" , openapi_examples="3fa85f64-5717-4562-b3fc-2c963f66afa6") , tablename: str = Query(None, description="Enter value to retrieve entities associated with table name(s)") ): statusCode = 200 statusMessage = "Success" response = [] start_time = datetime.now() try: logObj.info("Invoked getEntityDetails #getEntityDetails#args") if detailsID and not fn_validate_uuid(detailsID): raise Exception_Validation("Invalid detailsID format") query = "SELECT * FROM archival.arc_entity_details" if detailsID and tablename: query += f" WHERE aed_seq={detailsID} AND aed_table_name='{tablename}' allow filtering" elif detailsID: query += f" WHERE aed_seq={detailsID} allow filtering" elif tablename: query += f" WHERE aed_table_name='{tablename}' allow filtering" logObj.debug(f"query: {query}") result = await dbObj.dbSelect(query) logObj.debug(f"result: {result}") logObj.debug(f"result: {type(result)}") resultDF = pd.DataFrame.from_dict(result, orient='columns') resultDF = resultDF.rename(columns=column_mapping) if not resultDF.empty: if resultDF['executionOrder'].isnull().any(): resultDF['executionOrder'].fillna(0, inplace=True) resultDF['createdDate'] = resultDF['createdDate'].astype(str) resultDF['updatedDate'] = resultDF['updatedDate'].astype(str) resultDF['seqNo'] = resultDF['seqNo'].astype(str) resultDF['parentEntity'] = resultDF['parentEntity'].astype(str) resultDF['masterKey'] = resultDF['masterKey'].astype(str) resultDF['metaDataMapping'] = resultDF['metaDataMapping'].apply( lambda x: dict(x) if x is not None else None) result_dict = resultDF.to_dict(orient='records') response = result_dict else: statusCode = 204 statusMessage = "No data for the input received." logObj.debug(f"response : {response}") raw_url = str(request.url) domain = str(request.base_url) path = str(request.url.path) query_params = str(request.query_params) params_dict = dict(request.query_params) logObj.debug(f"raw_url: {raw_url}") logObj.debug(f"domain: {domain}") logObj.debug(f"path: {path}") logObj.debug(f"query_params: {query_params}") logObj.debug(f"params_dict: {params_dict}") except Exception_Validation as expvald: statusCode = 422 statusMessage = ("ERROR - %s. Line No - %s" % (str(expvald), str(sys_exc_info()[-1].tb_lineno))) logObj.error(statusMessage) batch_id = await log_event('metaData', 'entityDetails', 'get', 'failed', statusMessage, start_time) except HTTPException as httpe: statusCode = httpe.status_code statusMessage = httpe.detail logObj.error(statusMessage) batch_id = await log_event('metaData', 'entityDetails', 'get', 'failed', statusMessage, start_time) except Exception as e: statusCode = 500 statusMessage = ("ERROR - %s. Line No - %s" % (str(e), str(sys_exc_info()[-1].tb_lineno))) logObj.error(statusMessage) batch_id = await log_event('metaData', 'entityDetails', 'get', 'failed', statusMessage, start_time) else: batch_id = await log_event('metaData', 'entityDetails', 'get', 'success', 'Record fetched successfully', start_time) return setResponse(status_code=statusCode , status_description=statusMessage , response=response ) @entityMasterDetailsRouter.post("/metadata/addEntityDetails", responses={ 201: {"model": EntityDetailsIn, "description": "Record added successfully"}, 422: {"model": Message, "description": "Validation error or missing required attribute."}, 500: {"model": Message, "description": "Internal Server Error"} }, tags=["Meta data operations"], summary="Inserts a new record into entity rules table", response_model=EntityDetailsIn ) async def add_data(request: EntityDetailsIn = Body(...)): statusCode = 201 statusMessage = "Record added successfully" response = None logObj.debug("post Initiating") logObj.debug(f"{request}") start_time = datetime.now() try: data = jsonable_encoder(request) logObj.debug(f"{data}") query_exists = f"SELECT COUNT(*) FROM archival.arc_entity_details WHERE aed_entity_name = %s AND aed_table_name = %s AND aed_table_key = %s AND aed_entity_pkey = %s allow filtering" args_exists = (data['entityName'], data['tableName'], data['tableKey'], data['entityPKey']) exists = await dbObj.dbSelect(query_exists, args_exists) print("hi") if exists[0]['count'] > 0: statusCode = 409 statusMessage = "Record already exists" logObj.debug(statusMessage) else: print("hiii") aed_seq_uuid = uuid5(NAMESPACE_DNS, data['tableName'] + data['tableKey']) logObj.debug(f"Generated UUID for aer_seq:{aed_seq_uuid}") data['seqNo'] = aed_seq_uuid current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f%z") print("hiiiii") if data['createdDate'] is None: data['createdDate'] = current_datetime else: try: datetime.strptime(data['createdDate'], "%Y-%m-%d %H:%M:%S.%f%z") except Exception as e: raise Exception_Validation("createdDate is in invalid format") print("wee") if data['updatedDate'] is None: data['updatedDate'] = current_datetime else: try: datetime.strptime(data['updatedDate'], "%Y-%m-%d %H:%M:%S.%f%z") except Exception as e: raise Exception_Validation("updatedDate is in invalid format") print("lll") if data['createdBy'] is None: data['createdBy'] = "archivalJob" if data['updatedBy'] is None: data['updatedBy'] = "archivalJob" if data['masterKey'] is not None: data['masterKey'] = UUID(data['masterKey']) print("ccc") if data.get('parentEntity') is not None: try: data['parentEntity'] = UUID(data['parentEntity']) except ValueError: raise Exception_Validation("parent entity is not valid uuid format") print("sss") logObj.debug(f"data : {data}") entity_dict = {key: data[value] for key, value in column_mapping.items() if value in data} logObj.debug(f"entity_dict : {entity_dict}") logObj.debug(f"Inserting record...") query = f"INSERT INTO archival.arc_entity_details ({', '.join(entity_dict.keys())}) VALUES ({', '.join(['%s'] * len(entity_dict))})" print(query) args = tuple(entity_dict.values()) logObj.debug(f"query : {query}") logObj.debug(f"args : {args}") await dbObj.dbExec(query, args) except HTTPException as httpe: statusCode = httpe.status_code statusMessage = httpe.detail logObj.error(statusMessage) batch_id = await log_event('metaData', 'entityDetails', 'insert', 'failed', statusMessage, start_time) except Exception as e: statusCode = 500 statusMessage = ("ERROR - %s. Line No - %s" % (str(e), str(sys.exc_info()[-1].tb_lineno))) logObj.error(statusMessage) batch_id = await log_event('metaData', 'entityDetails', 'insert', 'failed', statusMessage, start_time) else: batch_id = await log_event('metaData', 'entityDetails', 'insert', 'success', 'Record added successfully', start_time) return setResponse(status_code=statusCode, status_description=statusMessage, response=response) @entityMasterDetailsRouter.delete("/metadata/deleteEntityDetails", responses={ 200: {"model": Message, "description": "Record deleted successfully"}, 404: {"model": Message, "description": "Record not found"}, 422: {"model": Message, "description": "Duplicate record found"}, 500: {"model": Message, "description": "Internal Server Error"} }, tags=["Meta data operations"], summary="Deletes a record from the entity rules table", response_model=Message ) async def delete_data(opID: str = Query(None), buID: str = Query(None), tableName: str = Query(None), tableKey: str = Query(None), detailsID: UUID5 = Query(None)): statusCode = 200 statusMessage = "Record deleted successfully" response = None logObj.debug("delete Initiating") logObj.debug( f"opID: {opID}, buID: {buID}, tableName: {tableName}, tableKey: {tableKey}, detailsID: {detailsID}") start_time = datetime.now() try: if detailsID and not (opID and buID and tableName and tableKey): query_get_details = f"SELECT aed_op_id, aed_bu_id, aed_table_name, aed_table_key FROM archival.arc_entity_details WHERE aed_seq = {detailsID} allow filtering" logObj.debug(query_get_details) details = await dbObj.dbSelect(query_get_details) logObj.debug(details) details = list(details) if len(details) == 0: raise HTTPException(status_code=404, detail="Record not found") if len(details) == 1: opID = details[0]['aed_op_id'] buID = details[0]['aed_bu_id'] tableName = details[0]['aed_table_name'] tableKey = details[0]['aed_table_key'] else: raise HTTPException(status_code=422, detail="More than one record present in the DATABASE") elif (opID and buID and tableName and tableKey) and not detailsID: query_get_details = f"SELECT aed_op_id, aed_bu_id, aed_table_name, aed_table_key FROM archival.arc_entity_details WHERE aed_op_id = '{opID}' AND aed_bu_id = '{buID}' AND aed_table_name = '{tableName}' AND aed_table_key = '{tableKey}' allow filtering" logObj.debug(query_get_details) details
Leave a Comment