Untitled

 avatar
unknown
plain_text
5 months ago
5.3 kB
4
Indexable
import os
import json
import logging
import boto3
import requests
from src.service.common_utils import get_token, get_secret
from src.service.share_class import get_portfolio_share_class_data

# Root logger; its level is forced to INFO when this module is imported.
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)

# NOTE(review): "suffx" looks like a typo for "suffix". Neither this constant nor
# `scope` is referenced in the visible part of this file - presumably they are
# OAuth token-request settings; confirm usage before renaming or removing them.
token_url_suffx = "/oauth-api/token/client"
scope = "*"

# Required environment variables: a missing one raises KeyError at import time.
# `env` and `region` are not referenced in the visible part of this file.
env = os.environ["env"]
region = os.environ["AWS_REGION"]
bucket_name = os.environ["bucket_name"]  # bucket that lambda_handler writes the reference data to

# boto3 clients are created once at import time and shared by every call in this module.
s3 = boto3.client("s3")
sns = boto3.client("sns")
# Module-level accumulator that lambda_handler fills and serializes to S3.
# Nothing in the visible code resets it, so entries persist for the module's lifetime.
reference_esdl_data = {}


def get_ref_types(
        ref_type: str, ref_key: str, ref_value: str, filter_data: list, api_conf: dict, token: str, retry_count: int = 2
) -> dict:
    """Fetch one reference type from the reference ESDL API as a key/value mapping.

    Args:
        ref_type: Reference type name; spaces are sent as ``%20`` in the URL path.
        ref_key: Name of the record field used as the mapping key, compared as
            ``field.capitalize() == ref_key`` against the fields of the first record.
        ref_value: Exact name of the record field used as the mapping value.
        filter_data: Key values to keep; an empty (or ``None``) filter keeps every record.
        api_conf: Configuration that holds ``"apigee_base_url"``.
        token: Bearer token sent in the ``Authorization`` header.
        retry_count: Number of extra attempts allowed after a non-200 response.

    Returns:
        Mapping of ``record[<key field>]`` to ``record[ref_value]``. It is empty
        when no field of the first record matches ``ref_key``.

    Raises:
        requests.exceptions.HTTPError: When the final attempt does not return 200.
        Exception: Any other failure (connection error, invalid JSON, unexpected
            payload shape such as an empty list) is logged once and re-raised.
    """
    endpoint = (f"/pgi-esdl-reference/api/v1/ReferenceData/ReferenceTypes/{ref_type.replace(' ', '%20')}/"
                f"ReferenceSystems/ESDL/True")
    auth_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    try:
        url = api_conf["apigee_base_url"] + endpoint

        # Retry in a loop (not by recursion) so a final failure is logged only once.
        attempts_left = retry_count
        while True:
            LOGGER.info("Calling Reference API for the reference type")
            response = requests.get(url=url, headers=auth_headers)
            if response.status_code == 200:
                break
            if attempts_left <= 0:
                # raise_for_status() raises only for 4xx/5xx and returns None otherwise,
                # so any other non-200 status needs an explicit error of its own.
                response.raise_for_status()
                raise requests.exceptions.HTTPError(
                    f"Unexpected status {response.status_code} from Reference API",
                    response=response,
                )
            attempts_left -= 1

        # Parse the body once instead of on every lookup.
        records = response.json()
        ref_data = {}
        for key in records[0]:
            if key.capitalize() != ref_key:
                continue
            for record in records:
                if not filter_data or record[key] in filter_data:
                    ref_data[record[key]] = record[ref_value]
    except Exception:
        LOGGER.exception("Failed to fetch reference type %r", ref_type)
        raise
    return ref_data


def get_enterprise_sources(enterprise_src_list: list, api_conf: dict, token: str, retry_count: int = 2) -> dict:
    """Fetch enterprise systems from the reference ESDL API and map their names to ids.

    Args:
        enterprise_src_list: Source names to look up; spaces are sent as ``%20``
            in the URL path. One request is issued per source.
        api_conf: Configuration that holds ``"apigee_base_url"``.
        token: Bearer token sent in the ``Authorization`` header.
        retry_count: Total number of extra attempts allowed after non-200
            responses. The budget is shared by all sources in the list, and only
            the failing request is repeated.

    Returns:
        Mapping of each response's ``"Name"`` to its ``"Id"``.

    Raises:
        requests.exceptions.HTTPError: When a request still does not return 200
            after the retry budget is used up.
        Exception: Any other failure (connection error, invalid JSON, missing
            ``"Name"``/``"Id"``) is logged once and re-raised.
    """
    endpoint = "/pgi-esdl-reference/api/v1/EnterpriseSystems/"
    auth_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {token}",
    }
    try:
        system_ids = {}
        base_url = api_conf["apigee_base_url"] + endpoint

        attempts_left = retry_count
        for source in enterprise_src_list:
            source_url = base_url + source.replace(" ", "%20")
            while True:
                response = requests.get(url=source_url, headers=auth_headers)
                if response.status_code == 200:
                    break
                if attempts_left <= 0:
                    # raise_for_status() raises only for 4xx/5xx and returns None otherwise,
                    # so any other non-200 status needs an explicit error of its own.
                    response.raise_for_status()
                    raise requests.exceptions.HTTPError(
                        f"Unexpected status {response.status_code} for enterprise source {source!r}",
                        response=response,
                    )
                attempts_left -= 1

            response_json = response.json()
            system_ids[response_json["Name"]] = response_json["Id"]
    except Exception:
        LOGGER.exception("Failed to fetch enterprise sources")
        raise
    return system_ids

def lambda_handler(_event: dict, _context: dict) -> str:
    """Refresh the reference data in S3 and trigger the portfolio/share-class load.

    Steps:
        1. Load the secrets and the local ``reference_types_config.json`` file.
        2. Fetch every configured reference type and enterprise source into the
           module-level ``reference_esdl_data`` dict.
        3. Write that dict as JSON to ``reference_data/reference_esdl_data.json``
           in the configured bucket.
        4. Call ``get_portfolio_share_class_data``.

    On any failure an SNS notification is attempted and the error is re-raised.

    Args:
        _event: Lambda event payload (unused).
        _context: Lambda context object (unused).

    Returns:
        The string ``"Success"`` when every step completes.

    Raises:
        ValueError: Wraps any exception raised by the steps above; the original
            exception is chained as ``__cause__``.
    """
    try:
        secrets = json.loads(get_secret())
        LOGGER.info("Reading the Reference Types JSON Configuration... ")
        with open("reference_types_config.json", "r", encoding="utf-8") as f:
            reference_config = json.load(f)

        LOGGER.info("Fetching the Reference Types")
        token = get_token(secrets)
        for reference, reference_type_dict in reference_config["referenceTypeList"].items():
            reference_esdl_data[reference] = get_ref_types(
                reference_type_dict["referenceType"],
                reference_type_dict["key_column"],
                reference_type_dict["value_column"],
                reference_type_dict["key_column_filter"],
                secrets,
                token,
            )

        LOGGER.info("Fetching the Enterprise Sources ")
        for source_group, enterprise_src_list in reference_config["enterpriseSource"].items():
            reference_esdl_data[source_group] = get_enterprise_sources(
                enterprise_src_list, secrets, token
            )

        LOGGER.info("Pushing the Reference Data to S3 bucket")
        s3.put_object(
            Body=json.dumps(reference_esdl_data),
            Bucket=bucket_name,
            Key="reference_data/reference_esdl_data.json",
        )

        LOGGER.info("Fetching Portfolio and Share Class data from Portfolio Master")
        get_portfolio_share_class_data(reference_config, secrets, bucket_name)

    except Exception as err:
        LOGGER.exception("Reference Data load failed")
        # The notification is best-effort: a missing env var or an SNS error must
        # not replace the original failure, which is what the caller needs to see.
        try:
            sns.publish(
                TopicArn=os.environ["topic_arn"],
                Message=f"Dear Team,\n\n"
                        f"The Reference Data failed due to the following error..\n"
                        f"{err}\n\nThanks,\n{os.environ['signature']}",
                Subject="[ERROR] Reference Data Failed",
            )
        except Exception:
            LOGGER.exception("Could not publish the failure notification to SNS")
        raise ValueError(err) from err
    return "Success"
Editor is loading...
Leave a Comment