Untitled
unknown
plain_text
a year ago
4.4 kB
4
Indexable
import logging
import os
import awswrangler as wr
from reference_data_config import config as ref_data_conf
from src.data_validation import (
securities_data_validation,
portfolio_data_validation,
broker_data_validation,
portfolio_share_class_data_validation,
)
from data_reservoir import common_services
logger = logging.getLogger()
logger.setLevel(logging.INFO)
env = os.environ["env"]
def lambda_handler(event: dict, _context: dict) -> dict:
logger.info(event)
missing_securityid, missing_portfolioid, missing_brokercode = [], [], []
missing_portfolios, missing_share_class = [], []
raise_missing_entries = event.get(
"raise_missing_entries", os.getenv("raise_missing_entries", "Yes")
)
reservoir_bucket = os.environ["reservoir_bucket"]
source_ref_conf = ref_data_conf.get(event["table_name"], "NoEntry")
if source_ref_conf == "NoEntry":
return {"status_code": 200}
as_of_date = f"{event['yyyy']}-{event['mm']}-{event['dd']}"
source_query = (
str(source_ref_conf["source_query"])
.replace("table_name_param", event["table_name"])
.replace("year_param", event["yyyy"])
.replace("month_param", event["mm"])
.replace("date_param", event["dd"])
.replace("hub_load_time_utc_param", event["hub_load_time_utc"])
)
logger.info("Reading source data")
src_df = wr.athena.read_sql_query(
sql=source_query,
database=source_ref_conf["db_name"],
ctas_approach=False,
s3_output=f"s3://{reservoir_bucket}/athena_results/",
)
if "securities" in source_ref_conf["references_master_data"]:
logger.info("Calling securities validation..")
missing_securityid.extend(
securities_data_validation(
src_df,
source_ref_conf["references_master_data"]["securities"],
)
)
if "portfolio" in source_ref_conf["references_master_data"]:
logger.info("Calling portfolio validation..")
missing_portfolioid.extend(
portfolio_data_validation(
src_df,
source_ref_conf["references_master_data"]["portfolio"],
)
)
if "brokercode" in source_ref_conf["references_master_data"]:
logger.info("Calling brokercode validation..")
missing_brokercode.extend(
broker_data_validation(
src_df,
source_ref_conf["references_master_data"]["brokercode"],
)
)
if "portfolio_share_class" in source_ref_conf["references_master_data"]:
logger.info("Calling portfolio_share_class validation..")
portfolio_share_class = portfolio_share_class_data_validation(
src_df,
source_ref_conf["references_master_data"]["portfolio_share_class"],
)
missing_portfolios.extend(portfolio_share_class[0])
missing_share_class.extend(portfolio_share_class[1])
if (
missing_securityid
or missing_portfolioid
or missing_brokercode
or missing_portfolios
or missing_share_class
):
if env == 'prod':
cc_emails = source_ref_conf.get("cc_emails", "")
else:
cc_emails = ''
logger.info("Missing reference master data found")
mail_params = {
"as_of_date": as_of_date,
"table_name": event["table_name"],
"system_name": source_ref_conf.get("system_name", ""),
"to_recipient": os.environ["to_emails"],
"signature": os.environ["signature"],
"cc_recipient": cc_emails
}
common_services.notify_missing_securities(missing_securityid, mail_params, env)
common_services.notify_missing_portfolios(missing_portfolioid, mail_params, env)
common_services.notify_missing_portfolios(missing_portfolios, mail_params, env)
common_services.notify_missing_brokercodes(missing_brokercode, mail_params, env)
common_services.notify_missing_shareclass(missing_share_class, mail_params, env)
if raise_missing_entries == "Yes":
raise ReferenceError("Missing references in the master data..")
return {"status_code": 200}Editor is loading...
Leave a Comment