Untitled
unknown
plain_text
a year ago
7.3 kB
4
Indexable
import os
import json
import logging
import boto3
import requests
from requests_aws4auth import AWS4Auth
from src.service.common_utils import get_token, get_secret
from src.service.share_class import get_portfolio_share_class_data
from src.service.broker_reference_mapping import get_broker_data
from data_reservoir.common_services import send_mail
# Root logger, forced to INFO level at import time.
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
# Presumably the path suffix and scope for an OAuth client-token request;
# neither name is referenced in the visible part of this file — TODO confirm usage.
token_url_suffx = "/oauth-api/token/client"
scope = "*"
# Required environment variables: a missing one raises KeyError when the module is imported.
env = os.environ["env"]  # passed as the first argument to send_mail
region = os.environ["AWS_REGION"]
bucket_name = os.environ["bucket_name"]  # bucket the reference data is written to
mail_sign = os.environ["signature"]  # appended to the notification mail bodies
support_dl = os.environ["support_dl"].split(",")  # comma-separated value, used as send_mail's to_list
s3 = boto3.client("s3")
# Module-level accumulator: lambda_handler fills it and uploads it to S3 as JSON.
reference_esdl_data = {}
def get_ref_types(
    ref_type: str, ref_key: str, ref_value: str, filter_data: list, api_conf: dict, token: str, retry_count: int = 2
) -> dict:
    """Fetch a key -> value mapping for one reference type from the reference ESDL API.

    Args:
        ref_type: Reference type name; spaces are percent-encoded into the URL path.
        ref_key: Name of the key column. A column of the first returned record is
            selected when ``column.capitalize() == ref_key``.
        ref_value: Name of the column whose value is stored for each key.
        filter_data: Key values to keep. An empty (or ``None``) filter keeps every record.
        api_conf: Configuration containing ``"apigee_base_url"``.
        token: Bearer token sent in the ``Authorization`` header.
        retry_count: How many additional attempts are made after a non-200 response.

    Returns:
        Dict mapping each selected record's key-column value to its ``ref_value`` value.

    Raises:
        requests.HTTPError: The API kept answering with a non-200 status.
        ValueError: The API answered 200 with an empty payload.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    endpoint = (
        f"/pgi-esdl-reference/api/v1/ReferenceData/ReferenceTypes/{ref_type.replace(' ', '%20')}/"
        f"ReferenceSystems/ESDL/True"
    )
    try:
        ref_data = {}
        auth_headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        }
        url = api_conf["apigee_base_url"] + endpoint
        retries_left = retry_count
        while True:
            LOGGER.info("Calling Reference API for the reference type")
            response = requests.get(url=url, headers=auth_headers)
            if response.status_code == 200:
                break
            if retries_left <= 0:
                # raise_for_status() only raises for 4xx/5xx and returns None otherwise,
                # so any other non-200 status needs an explicit error.
                response.raise_for_status()
                raise requests.HTTPError(
                    f"Unexpected status {response.status_code} from Reference API for '{ref_type}'",
                    response=response,
                )
            retries_left -= 1

        # Parse the payload once instead of re-parsing it for every record loop.
        records = response.json()
        if not records:
            raise ValueError(f"Reference API returned no records for reference type '{ref_type}'")

        # Column names are taken from the first record only.
        matching_keys = [key for key in records[0] if key.capitalize() == ref_key]
        for key in matching_keys:
            for record in records:
                if not filter_data or record[key] in filter_data:
                    ref_data[record[key]] = record[ref_value]
    except Exception as e:
        LOGGER.error(e)
        raise
    return ref_data
def get_enterprise_sources(enterprise_src_list: list, api_conf: dict, token: str, retry_count: int = 2) -> dict:
    """Fetch the ids of the given enterprise sources from the reference ESDL API.

    Args:
        enterprise_src_list: Source names to look up; spaces are percent-encoded into the URL.
        api_conf: Configuration containing ``"apigee_base_url"``.
        token: Bearer token sent in the ``Authorization`` header.
        retry_count: Total number of additional attempts allowed after non-200
            responses, shared across all sources in the list.

    Returns:
        Dict mapping each response's ``"Name"`` to its ``"Id"``.

    Raises:
        requests.HTTPError: A request kept answering with a non-200 status after
            the retry budget was used up.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    endpoint = "/pgi-esdl-reference/api/v1/EnterpriseSystems/source_name"
    try:
        system_ids = {}
        auth_headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {token}",
        }
        endpoint_url = api_conf["apigee_base_url"] + endpoint
        retries_left = retry_count
        for source in enterprise_src_list:
            api_endpoint_url = endpoint_url.replace(
                "source_name", source.replace(" ", "%20")
            )
            # Retry only the failing request; sources already fetched are kept
            # instead of being requested again from the start of the list.
            while True:
                response = requests.get(url=api_endpoint_url, headers=auth_headers)
                if response.status_code == 200:
                    break
                if retries_left <= 0:
                    # raise_for_status() only raises for 4xx/5xx and returns None otherwise,
                    # so any other non-200 status needs an explicit error.
                    response.raise_for_status()
                    raise requests.HTTPError(
                        f"Unexpected status {response.status_code} from Reference API for '{source}'",
                        response=response,
                    )
                retries_left -= 1
            response_json = response.json()
            system_ids[response_json["Name"]] = response_json["Id"]
    except Exception as e:
        LOGGER.error(e)
        raise
    return system_ids
def check_execution_region(secrets: dict) -> tuple[bool, dict]:
    """Ask the health-check endpoint which region it reports and compare it to ours.

    The request to ``secrets["health_check_url"]`` is signed with the IAM user
    credentials found in ``secrets``.

    Returns:
        ``(matches, payload)`` where ``matches`` tells whether the reported
        ``"region"`` equals the ``AWS_REGION`` environment variable and ``payload``
        is the decoded health-check response. ``(False, {})`` is returned when the
        endpoint does not answer 200 or when anything raises along the way.
    """
    try:
        signer = AWS4Auth(
            secrets["my_iam_user_access_key"],
            secrets["my_iam_user_secret"],
            "us-west-2",
            "lambda",
        )
        health_response = requests.get(secrets["health_check_url"], auth=signer)

        # Guard clause: anything but a 200 is treated as "do not run here".
        if health_response.status_code != 200:
            LOGGER.info("Health check region was not current region of trigger")
            return False, {}

        payload = health_response.json()
        reported_region = payload["region"]
        LOGGER.info("health_check_region %s", reported_region)
        is_current_region = reported_region == os.environ.get("AWS_REGION")
        return is_current_region, payload
    except Exception as err:
        # Best effort: failures are logged and reported as "not this region".
        LOGGER.info("Error getting region check %s", str(err))
        return False, {}
def data_sync(response: dict) -> None:
    """Copy the reference data objects into the alternate region's bucket.

    The destination bucket name is derived from ``bucket_name`` by substituting
    ``response["region"]`` with ``response["alt_region"]``; object keys are kept.
    """
    replicated_keys = (
        "reference_data/reference_esdl_data.json",
        "reference_data/Broker_Data.csv",
        "reference_data/Portfolio_Share_Class_Data.csv",
    )
    target_bucket = bucket_name.replace(response["region"], response["alt_region"])
    for object_key in replicated_keys:
        source_location = {"Bucket": bucket_name, "Key": object_key}
        s3.copy_object(CopySource=source_location, Bucket=target_bucket, Key=object_key)
    LOGGER.info("Data copied to alt_region ... ")
def lambda_handler(_event: dict, _context: dict) -> str:
    """Refresh the reference data in S3 and notify the support list of the outcome.

    Steps:
        1. Load secrets and run the region health check; return early when
           ``check_execution_region`` says this region should not run.
        2. Read ``reference_types_config.json`` and fetch every configured
           reference type and enterprise source into ``reference_esdl_data``.
        3. Upload the collected data to S3, refresh the broker and portfolio /
           share-class data, and copy the files to the alternate region.
        4. Send a success mail; on any error send a failure mail and re-raise.

    Args:
        _event: Lambda event (unused).
        _context: Lambda context (unused).

    Returns:
        ``"Success"`` when the run completed or was skipped by the region check.

    Raises:
        Exception: The original error, after it has been logged and the
            failure notification has been attempted.
    """
    try:
        secrets = json.loads(get_secret())
        flag, response = check_execution_region(secrets)
        if not flag:
            return "Success"

        LOGGER.info("Reading the Reference Types JSON Configuration... ")
        with open("reference_types_config.json", "r", encoding="utf-8") as f:
            reference_config = json.load(f)

        LOGGER.info("Fetching the Reference Types")
        token = get_token(secrets)
        for reference, reference_type_dict in reference_config["referenceTypeList"].items():
            reference_esdl_data[reference] = get_ref_types(
                reference_type_dict["referenceType"],
                reference_type_dict["key_column"],
                reference_type_dict["value_column"],
                reference_type_dict["key_column_filter"],
                secrets,
                token,
            )

        LOGGER.info("Fetching the Enterprise Sources ")
        for enterprise_source, enterprise_src_list in reference_config["enterpriseSource"].items():
            reference_esdl_data[enterprise_source] = get_enterprise_sources(
                enterprise_src_list, secrets, token
            )

        LOGGER.info("Pushing the Reference Data to S3 bucket")
        s3.put_object(
            Body=json.dumps(reference_esdl_data),
            Bucket=bucket_name,
            Key="reference_data/reference_esdl_data.json",
        )

        LOGGER.info("Fetching Broker data from ESDL Reference Mapping")
        get_broker_data(reference_config, bucket_name, secrets)

        LOGGER.info("Fetching Portfolio and Share Class data from Portfolio Master")
        get_portfolio_share_class_data(reference_config, secrets, bucket_name)

        data_sync(response)

        send_mail(
            env,
            subject=os.environ["success_mail_sub"],
            body="Dear Team,\n\n"
            f"Fetching Reference Data Completed.\n"
            f"\nThanks,\n{mail_sign}",
            to_list=support_dl,
        )
    except Exception as err:
        LOGGER.exception(err)
        # The notification is best effort: a failure here (missing env var, mail
        # error) must not replace the original exception raised below.
        try:
            send_mail(
                env,
                subject=os.environ["failure_mail_sub"],
                body="Dear Team,\n\n"
                f"The Reference Data failed due to the following error..\n"
                f"{err}\n\nThanks,\n{mail_sign}",
                to_list=support_dl,
            )
        except Exception:
            LOGGER.exception("Failed to send the failure notification mail")
        raise
    return "Success"
Editor is loading...
Leave a Comment