Untitled
unknown
plain_text
5 months ago
7.3 kB
3
Indexable
"""Lambda entry point that refreshes ESDL reference data in S3.

The handler reads reference types and enterprise sources from the reference
API, stores them as JSON in S3, triggers the broker and portfolio/share-class
extracts, copies the resulting files to the alternate-region bucket and sends
a success or failure notification mail.
"""
import os
import json
import logging

import boto3
import requests
from requests_aws4auth import AWS4Auth

from src.service.common_utils import get_token, get_secret
from src.service.share_class import get_portfolio_share_class_data
from src.service.broker_reference_mapping import get_broker_data
from data_reservoir.common_services import send_mail

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

token_url_suffx = "/oauth-api/token/client"
scope = "*"
env = os.environ["env"]
region = os.environ["AWS_REGION"]
bucket_name = os.environ["bucket_name"]
mail_sign = os.environ["signature"]
support_dl = os.environ["support_dl"].split(",")
s3 = boto3.client("s3")
reference_esdl_data = {}

# (connect, read) timeout in seconds for every outbound HTTP call, so a
# stalled endpoint cannot hang the invocation until the Lambda itself times out.
REQUEST_TIMEOUT = (10, 60)


def _auth_headers(token: str) -> dict:
    """Build the JSON + bearer-token headers used for the reference API."""
    return {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }


def _get_with_retry(url: str, headers: dict, retry_count: int) -> requests.Response:
    """GET ``url`` and return the response once it answers with HTTP 200.

    The request is repeated up to ``retry_count`` additional times while the
    status code is not 200. Transport errors (including timeouts) are not
    retried and propagate to the caller.

    Raises:
        requests.exceptions.HTTPError: if the last attempt is still not 200.
    """
    retries_left = retry_count
    while True:
        response = requests.get(url=url, headers=headers, timeout=REQUEST_TIMEOUT)
        if response.status_code == 200:
            return response
        if retries_left <= 0:
            break
        retries_left -= 1
        LOGGER.warning(
            "GET %s returned status %s; retrying (%s retries left)",
            url, response.status_code, retries_left,
        )

    # raise_for_status() only raises for 4xx/5xx and returns None otherwise,
    # so a non-200 success status (e.g. 204) needs an explicit error.
    response.raise_for_status()
    raise requests.exceptions.HTTPError(
        f"Unexpected status {response.status_code} from {url}", response=response
    )


def get_ref_types(
    ref_type: str,
    ref_key: str,
    ref_value: str,
    filter_data: list,
    api_conf: dict,
    token: str,
    retry_count: int = 2,
) -> dict:
    """Fetch data from reference esdl api.

    Args:
        ref_type: Reference type name; spaces are encoded as ``%20`` in the URL.
        ref_key: Capitalized name of the response field used as the mapping key
            (compared against ``field.capitalize()``).
        ref_value: Name of the response field used as the mapping value.
        filter_data: Key values to keep; an empty (or ``None``) filter keeps all.
        api_conf: Configuration dict providing ``apigee_base_url``.
        token: Bearer token for the API.
        retry_count: Number of extra attempts when the status is not 200.

    Returns:
        Mapping of key-field value to value-field value.

    Raises:
        requests.exceptions.HTTPError: if the API never answers with 200.
        IndexError: if the API returns an empty list.
    """
    endpoint = (
        f"/pgi-esdl-reference/api/v1/ReferenceData/ReferenceTypes/{ref_type.replace(' ', '%20')}/"
        f"ReferenceSystems/ESDL/True"
    )
    try:
        ref_data = {}
        LOGGER.info("Calling Reference API for the reference type")
        response = _get_with_retry(
            api_conf["apigee_base_url"] + endpoint, _auth_headers(token), retry_count
        )
        # Parse the payload once instead of on every loop iteration.
        records = response.json()
        # Field names are taken from the first record only.
        for key in records[0]:
            if key.capitalize() != ref_key:
                continue
            for record in records:
                if not filter_data or record[key] in filter_data:
                    ref_data[record[key]] = record[ref_value]
    except Exception:
        LOGGER.exception("Failed to fetch reference type %s", ref_type)
        raise
    return ref_data


def get_enterprise_sources(
    enterprise_src_list: list, api_conf: dict, token: str, retry_count: int = 2
) -> dict:
    """Fetch data from reference esdl api for enterprise sources.

    Args:
        enterprise_src_list: Source names to look up, one request per name.
        api_conf: Configuration dict providing ``apigee_base_url``.
        token: Bearer token for the API.
        retry_count: Number of extra attempts per source when the status is
            not 200. Only the failing request is retried; sources that were
            already fetched are not requested again.

    Returns:
        Mapping of the returned ``Name`` to the returned ``Id``.

    Raises:
        requests.exceptions.HTTPError: if a source never answers with 200.
    """
    endpoint = "/pgi-esdl-reference/api/v1/EnterpriseSystems/source_name"
    try:
        system_ids = {}
        headers = _auth_headers(token)
        endpoint_url = api_conf["apigee_base_url"] + endpoint
        for source in enterprise_src_list:
            api_endpoint_url = endpoint_url.replace(
                "source_name", source.replace(" ", "%20")
            )
            payload = _get_with_retry(api_endpoint_url, headers, retry_count).json()
            system_ids[payload["Name"]] = payload["Id"]
    except Exception:
        LOGGER.exception("Failed to fetch enterprise sources")
        raise
    return system_ids


def check_execution_region(secrets: dict) -> tuple[bool, dict]:
    """Ask the health-check endpoint whether this region should run.

    Args:
        secrets: Dict providing ``my_iam_user_access_key``,
            ``my_iam_user_secret`` and ``health_check_url``.

    Returns:
        ``(True, data)`` when the health check's ``region`` equals the
        ``AWS_REGION`` environment variable, ``(False, data)`` when it differs,
        and ``(False, {})`` when the check fails or does not return 200.
        This function never raises; any error is logged and treated as
        "do not run".
    """
    try:
        # NOTE(review): the signing region is hard-coded to us-west-2; confirm
        # that is where the health-check endpoint is hosted.
        auth = AWS4Auth(
            secrets["my_iam_user_access_key"],
            secrets["my_iam_user_secret"],
            "us-west-2",
            "lambda",
        )
        response = requests.get(
            secrets["health_check_url"], auth=auth, timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            LOGGER.warning(
                "Health check returned status %s; skipping execution",
                response.status_code,
            )
            return False, {}

        data = response.json()
        health_check_region = data["region"]
        LOGGER.info("health_check_region %s", health_check_region)
        is_current_region = health_check_region == os.environ.get("AWS_REGION")
        if not is_current_region:
            LOGGER.info("Health check region was not current region of trigger")
        return is_current_region, data
    except Exception:
        LOGGER.exception("Error getting region check")
        return False, {}


def data_sync(response: dict) -> None:
    """Copy the generated reference files to the alternate-region bucket.

    The destination bucket name is derived from ``bucket_name`` by replacing
    ``response['region']`` with ``response['alt_region']``.

    Args:
        response: Health-check payload providing ``region`` and ``alt_region``.
    """
    files_list = [
        "reference_data/reference_esdl_data.json",
        "reference_data/Broker_Data.csv",
        "reference_data/Portfolio_Share_Class_Data.csv",
    ]
    # The destination does not depend on the file, so compute it once.
    alt_bucket_name = bucket_name.replace(response["region"], response["alt_region"])
    for file_key in files_list:
        s3.copy_object(
            CopySource={"Bucket": bucket_name, "Key": file_key},
            Bucket=alt_bucket_name,
            Key=file_key,
        )
    LOGGER.info("Data copied to alt_region ... ")


def lambda_handler(_event: dict, _context: dict) -> str:
    """Refresh the reference data in S3 and notify the support list.

    Returns ``"Success"`` both when the refresh completes and when the region
    check says this region should not run. On any failure a notification mail
    is sent (best effort) and the original exception is re-raised.
    """
    try:
        secrets = json.loads(get_secret())
        flag, response = check_execution_region(secrets)
        if not flag:
            return "Success"

        # Module-level state survives warm invocations; start from a clean
        # mapping so entries from an earlier run cannot leak into this upload.
        reference_esdl_data.clear()

        LOGGER.info("Reading the Reference Types JSON Configuration... ")
        with open("reference_types_config.json", "r", encoding="utf-8") as f:
            reference_config = json.load(f)

        LOGGER.info("Fetching the Reference Types")
        token = get_token(secrets)
        for reference, reference_type_dict in reference_config["referenceTypeList"].items():
            reference_esdl_data[reference] = get_ref_types(
                reference_type_dict["referenceType"],
                reference_type_dict["key_column"],
                reference_type_dict["value_column"],
                reference_type_dict["key_column_filter"],
                secrets,
                token,
            )

        LOGGER.info("Fetching the Enterprise Sources ")
        for enterprise_source, enterprise_src_list in reference_config["enterpriseSource"].items():
            reference_esdl_data[enterprise_source] = get_enterprise_sources(
                enterprise_src_list, secrets, token
            )

        LOGGER.info("Pushing the Reference Data to S3 bucket")
        s3.put_object(
            Body=json.dumps(reference_esdl_data),
            Bucket=bucket_name,
            Key="reference_data/reference_esdl_data.json",
        )

        LOGGER.info("Fetching Broker data from ESDL Reference Mapping")
        get_broker_data(reference_config, bucket_name, secrets)

        LOGGER.info("Fetching Portfolio and Share Class data from Portfolio Master")
        get_portfolio_share_class_data(reference_config, secrets, bucket_name)

        data_sync(response)

        send_mail(
            env,
            subject=os.environ["success_mail_sub"],
            body="Dear Team,\n\n"
                 "Fetching Reference Data Completed.\n"
                 f"\nThanks,\n{mail_sign}",
            to_list=support_dl,
        )
    except Exception as err:
        LOGGER.exception(err)
        # A failure while notifying must not hide the error that caused it.
        try:
            send_mail(
                env,
                subject=os.environ["failure_mail_sub"],
                body="Dear Team,\n\n"
                     "The Reference Data failed due to the following error..\n"
                     f"{err}\n\nThanks,\n{mail_sign}",
                to_list=support_dl,
            )
        except Exception:
            LOGGER.exception("Unable to send the failure notification mail")
        raise
    return "Success"
Editor is loading...
Leave a Comment