Untitled
unknown
plain_text
5 months ago
4.4 kB
3
Indexable
import logging
import os

import awswrangler as wr

from reference_data_config import config as ref_data_conf
from src.data_validation import (
    securities_data_validation,
    portfolio_data_validation,
    broker_data_validation,
    portfolio_share_class_data_validation,
)
from data_reservoir import common_services

# The root logger is used (no name passed) and forced to INFO level.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Read once at import time; a missing "env" variable raises KeyError on load.
env = os.environ["env"]


def lambda_handler(event: dict, _context: dict) -> dict:
    """Check a source table against reference master data and report gaps.

    The handler looks up the configuration for ``event["table_name"]``,
    builds the source query by substituting placeholders with event values,
    reads the result through Athena, and runs every validator whose key is
    present under ``references_master_data`` in that configuration. When any
    validator reports missing entries, the notification helpers in
    ``common_services`` are called and, depending on the
    ``raise_missing_entries`` setting, a ``ReferenceError`` is raised.

    Args:
        event: Invocation payload. Keys read here: ``table_name``, ``yyyy``,
            ``mm``, ``dd``, ``hub_load_time_utc`` and, optionally,
            ``raise_missing_entries`` (falls back to the environment variable
            of the same name, then to ``"Yes"``).
        _context: Unused.

    Returns:
        ``{"status_code": 200}`` when the table has no configuration entry,
        when nothing is missing, or when gaps were found but raising is
        disabled.

    Raises:
        ReferenceError: Missing entries were found and
            ``raise_missing_entries`` equals ``"Yes"``.
        KeyError: A required event key, environment variable or
            configuration key is absent.
    """
    logger.info(event)

    security_gaps, portfolio_gaps, broker_gaps = [], [], []
    share_class_portfolio_gaps, share_class_gaps = [], []

    # Event value wins over the environment variable, which wins over "Yes".
    raise_flag = event.get(
        "raise_missing_entries",
        os.getenv("raise_missing_entries", "Yes"),
    )
    bucket_name = os.environ["reservoir_bucket"]

    table_conf = ref_data_conf.get(event["table_name"], "NoEntry")
    if table_conf == "NoEntry":
        # Nothing is configured for this table, so there is nothing to check.
        return {"status_code": 200}

    as_of_date = f"{event['yyyy']}-{event['mm']}-{event['dd']}"

    # Placeholders are substituted one after another, in this exact order.
    placeholder_to_event_key = (
        ("table_name_param", "table_name"),
        ("year_param", "yyyy"),
        ("month_param", "mm"),
        ("date_param", "dd"),
        ("hub_load_time_utc_param", "hub_load_time_utc"),
    )
    source_query = str(table_conf["source_query"])
    for placeholder, event_key in placeholder_to_event_key:
        source_query = source_query.replace(placeholder, event[event_key])

    logger.info("Reading source data")
    src_df = wr.athena.read_sql_query(
        sql=source_query,
        database=table_conf["db_name"],
        ctas_approach=False,
        s3_output=f"s3://{bucket_name}/athena_results/",
    )

    master_refs = table_conf["references_master_data"]

    # Validators that return a single collection of missing identifiers.
    single_list_checks = (
        (
            "securities",
            "Calling securities validation..",
            securities_data_validation,
            security_gaps,
        ),
        (
            "portfolio",
            "Calling portfolio validation..",
            portfolio_data_validation,
            portfolio_gaps,
        ),
        (
            "brokercode",
            "Calling brokercode validation..",
            broker_data_validation,
            broker_gaps,
        ),
    )
    for ref_key, log_line, validate, gaps in single_list_checks:
        if ref_key in master_refs:
            logger.info(log_line)
            gaps.extend(validate(src_df, master_refs[ref_key]))

    if "portfolio_share_class" in master_refs:
        logger.info("Calling portfolio_share_class validation..")
        share_class_result = portfolio_share_class_data_validation(
            src_df,
            master_refs["portfolio_share_class"],
        )
        # Index 0 feeds the portfolio list, index 1 the share-class list.
        share_class_portfolio_gaps.extend(share_class_result[0])
        share_class_gaps.extend(share_class_result[1])

    anything_missing = (
        security_gaps
        or portfolio_gaps
        or broker_gaps
        or share_class_portfolio_gaps
        or share_class_gaps
    )
    if not anything_missing:
        return {"status_code": 200}

    # CC recipients from the configuration are only used when env is 'prod'.
    cc_emails = table_conf.get("cc_emails", "") if env == 'prod' else ''

    logger.info("Missing reference master data found")
    mail_params = {
        "as_of_date": as_of_date,
        "table_name": event["table_name"],
        "system_name": table_conf.get("system_name", ""),
        "to_recipient": os.environ["to_emails"],
        "signature": os.environ["signature"],
        "cc_recipient": cc_emails,
    }

    # Every notifier is invoked, including for lists that are empty here.
    common_services.notify_missing_securities(security_gaps, mail_params, env)
    common_services.notify_missing_portfolios(portfolio_gaps, mail_params, env)
    common_services.notify_missing_portfolios(
        share_class_portfolio_gaps, mail_params, env
    )
    common_services.notify_missing_brokercodes(broker_gaps, mail_params, env)
    common_services.notify_missing_shareclass(share_class_gaps, mail_params, env)

    if raise_flag == "Yes":
        raise ReferenceError("Missing references in the master data..")

    return {"status_code": 200}
Editor is loading...
Leave a Comment