Untitled

 avatar
unknown
plain_text
5 months ago
4.4 kB
3
Indexable
import logging
import os
import awswrangler as wr

from reference_data_config import config as ref_data_conf
from src.data_validation import (
    securities_data_validation,
    portfolio_data_validation,
    broker_data_validation,
    portfolio_share_class_data_validation,
)
from data_reservoir import common_services

logger = logging.getLogger()
logger.setLevel(logging.INFO)

env = os.environ["env"]


def lambda_handler(event: dict, _context: dict) -> dict:
    logger.info(event)
    missing_securityid, missing_portfolioid, missing_brokercode = [], [], []
    missing_portfolios, missing_share_class = [], []
    raise_missing_entries = event.get(
        "raise_missing_entries", os.getenv("raise_missing_entries", "Yes")
    )
    reservoir_bucket = os.environ["reservoir_bucket"]
    source_ref_conf = ref_data_conf.get(event["table_name"], "NoEntry")
    if source_ref_conf == "NoEntry":
        return {"status_code": 200}
    as_of_date = f"{event['yyyy']}-{event['mm']}-{event['dd']}"
    source_query = (
        str(source_ref_conf["source_query"])
        .replace("table_name_param", event["table_name"])
        .replace("year_param", event["yyyy"])
        .replace("month_param", event["mm"])
        .replace("date_param", event["dd"])
        .replace("hub_load_time_utc_param", event["hub_load_time_utc"])
    )

    logger.info("Reading source data")
    src_df = wr.athena.read_sql_query(
        sql=source_query,
        database=source_ref_conf["db_name"],
        ctas_approach=False,
        s3_output=f"s3://{reservoir_bucket}/athena_results/",
    )

    if "securities" in source_ref_conf["references_master_data"]:
        logger.info("Calling securities validation..")
        missing_securityid.extend(
            securities_data_validation(
                src_df,
                source_ref_conf["references_master_data"]["securities"],
            )
        )
    if "portfolio" in source_ref_conf["references_master_data"]:
        logger.info("Calling portfolio validation..")
        missing_portfolioid.extend(
            portfolio_data_validation(
                src_df,
                source_ref_conf["references_master_data"]["portfolio"],
            )
        )
    if "brokercode" in source_ref_conf["references_master_data"]:
        logger.info("Calling brokercode validation..")
        missing_brokercode.extend(
            broker_data_validation(
                src_df,
                source_ref_conf["references_master_data"]["brokercode"],
            )
        )
    if "portfolio_share_class" in source_ref_conf["references_master_data"]:
        logger.info("Calling portfolio_share_class validation..")
        portfolio_share_class = portfolio_share_class_data_validation(
            src_df,
            source_ref_conf["references_master_data"]["portfolio_share_class"],
        )

        missing_portfolios.extend(portfolio_share_class[0])
        missing_share_class.extend(portfolio_share_class[1])

    if (
        missing_securityid
        or missing_portfolioid
        or missing_brokercode
        or missing_portfolios
        or missing_share_class
    ):
        if env == 'prod':
            cc_emails = source_ref_conf.get("cc_emails", "")
        else:
            cc_emails = ''
        logger.info("Missing reference master data found")
        mail_params = {
            "as_of_date": as_of_date,
            "table_name": event["table_name"],
            "system_name": source_ref_conf.get("system_name", ""),
            "to_recipient": os.environ["to_emails"],
            "signature": os.environ["signature"],
            "cc_recipient": cc_emails
        }

        common_services.notify_missing_securities(missing_securityid, mail_params, env)
        common_services.notify_missing_portfolios(missing_portfolioid, mail_params, env)
        common_services.notify_missing_portfolios(missing_portfolios, mail_params, env)
        common_services.notify_missing_brokercodes(missing_brokercode, mail_params, env)
        common_services.notify_missing_shareclass(missing_share_class, mail_params, env)
        if raise_missing_entries == "Yes":
            raise ReferenceError("Missing references in the master data..")
    return {"status_code": 200}
Editor is loading...
Leave a Comment