Untitled

 avatar
unknown
plain_text
6 months ago
6.9 kB
5
Indexable
import json
import logging
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from copy import deepcopy
from datetime import datetime, timedelta

import pandas as pd
from data_reservoir.common_services import (
    latest_exchange_rates,
    schema_validation,
    get_secret,
    get_api_gateway_token,
    update_file_tracker,
)
from data_reservoir.master_data import get_reference_master

from src import common_utils

logger = logging.getLogger()
logger.setLevel(logging.INFO)

region = os.getenv("region", os.environ["AWS_DEFAULT_REGION"])
env = os.environ["env"]
atdd_scope = os.environ["atdd_scope"]


def invmnt_trxn(
    df: pd.DataFrame,
    system_id: int,
    ref_conf: dict,
    exchange_rates: dict,
    as_of_date: str,
) -> list:
    """Create positions investment_transaction from input"""
    logger.info("Investment Transaction field mapping started......")
    data_skeleton = {
        "header": {
            "destinationAddress": "PGI.ESDL.ACTG.TRANSACTION.IN",
            "messageSubject": "ATDD",
            "payloadType": "JSON",
            "batch": {"id": None, "rowCount": 1, "recordSequenceNumber": 1},
        },
        "body": {"accountingTransactions": None},
    }
    tnxn_skeleton = {
        "accountingBasisTypeId": int(ref_conf["accounting_basis_types"]["GAAP"]),
        "accountingTransactionStatusId": int(ref_conf["transaction_status"]["NEW"]),
        "contractVersion": 8.6,
        "investmentTypeId": int(ref_conf["investment_types"]["Security"]),
        "postDate": as_of_date,
        "systemId": system_id,
        "sourceProcessedDateTime": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
        + "Z",
        "lots": [{"lotId": "1"}],
    }

    invmnt_trxn_list = []
    for rows in df.itertuples(index=False):
        data_copy = data_skeleton.copy()
        data_copy["header"]["batch"]["id"] = str(uuid.uuid4())
        tnxn_copy = tnxn_skeleton.copy()
        lots_copy = tnxn_skeleton["lots"][0].copy()
        exchng_rate = exchange_rates[
            f"{getattr(rows, 'isocurrency')}_{getattr(rows, 'tradedate')}"
        ]
        tnxn_copy["accountingTransactionId"] = str(uuid.uuid4())
        tnxn_copy["accountingExchangeRate"] = exchng_rate
        tnxn_copy["investmentId"] = getattr(rows, "CADIS_ID")
        tnxn_copy["messageId"] = str(uuid.uuid4())
        tnxn_copy["linkingId"] = str(getattr(rows, "tanumber"))
        tnxn_copy["portfolioMasterId"] = getattr(rows, "PortfolioMasterId")
        tnxn_copy["settleDate"] = getattr(rows, "acqsettlementdate")
        tnxn_copy["tradeDate"] = getattr(rows, "tradedate")
        tnxn_copy["executingBrokerId"] = int(getattr(rows, "BrokerEsdlId"))

        if getattr(rows, "transactiondescript").strip().lower() == "sale":
            tnxn_copy["accountingTransactionTypeId"] = int(
                ref_conf["accounting_transaction_type"]["SELL"]
            )
            tnxn_copy["directionalImpactId"] = int(
                ref_conf["directional_impact"]["Cash Receipt"]
            )
        else:
            tnxn_copy["accountingTransactionTypeId"] = int(
                ref_conf["accounting_transaction_type"]["BUY"]
            )
            tnxn_copy["directionalImpactId"] = int(
                ref_conf["directional_impact"]["Cash Disbursement"]
            )

        lots_copy["quantity"] = float(getattr(rows, "units"))
        lots_copy["totalBase"] = float(getattr(rows, "principal"))
        lots_copy["totalLocal"] = float(getattr(rows, "principal"))
        lots_copy["totalUSD"] = float(getattr(rows, "principal")) / exchng_rate

        if getattr(rows, "gainloss") < 0:
            lots_copy["longTermRealizedLossValueLocal"] = float(
                getattr(rows, "gainloss")
            )
            lots_copy["longTermRealizedLossValueBase"] = float(
                getattr(rows, "gainloss")
            )
            lots_copy["longTermRealizedLossValueUSD"] = (
                float(getattr(rows, "gainloss")) / exchng_rate
            )
        else:
            lots_copy["longTermRealizedGainValueLocal"] = float(
                getattr(rows, "gainloss")
            )
            lots_copy["longTermRealizedGainValueBase"] = float(
                getattr(rows, "gainloss")
            )
            lots_copy["longTermRealizedGainValueUSD"] = (
                float(getattr(rows, "gainloss")) / exchng_rate
            )

        tnxn_copy["lots"] = [lots_copy]

        data_copy["body"]["accountingTransactions"] = [tnxn_copy]
        invmnt_trxn_list.append(deepcopy(data_copy))

    logger.info("Investment Transaction field mapping completes.")
    return invmnt_trxn_list


def apply_src_rules(input_df: pd.DataFrame, event_json: dict, as_of_date: str,) -> dict:
    """Source rule application"""
    ref_esdl_data = get_reference_master(env, region)
    ref_sys_id = ref_esdl_data["system_ids"]["ePAM SE ASIA"]
    input_df[["acqsettlementdate", "tradedate"]] = (
        input_df[["acqsettlementdate", "tradedate"]].apply(pd.to_datetime).astype(str)
    )
    input_df = common_utils.get_src_with_master_data(input_df, event_json)
    exchange_rates = latest_exchange_rates(
        input_df.filter(["isocurrency", "tradedate"], axis=1)
        .drop_duplicates()
        .values.tolist(),
        ref_esdl_data,
    )

    investment_data = invmnt_trxn(
        input_df,
        ref_sys_id,
        ref_esdl_data,
        exchange_rates,
        as_of_date,
    )

    schema_validation(investment_data, "investment_transaction")
    secrets = json.loads(get_secret())
    token_response = get_api_gateway_token(atdd_scope, secrets)

    with ThreadPoolExecutor(100) as executor:
        futures = []
        for data in investment_data:
            if datetime.now() - token_response["time"] >= timedelta(minutes=5):
                token_response = get_api_gateway_token(atdd_scope, secrets)
            futures.append(
                executor.submit(
                    common_utils.write_invmnt_trxn_data,
                    data,
                    secrets["api_gateway_base_url"],
                    token_response["token"],
                )
            )
        for future in as_completed(futures):
            if future.result().status_code != 200:
                logger.error(future.result().text)
                raise ConnectionError("Api exception occured")
    update_file_tracker(
        event_json,
        env,
        ref_sys_id,
        "ePAM SE ASIA",
        investment_data[0]["body"]["accountingTransactions"][0][
            "sourceProcessedDateTime"
        ],
        "Investment Transaction",
    )

    return {"status_code": 200}
Editor is loading...
Leave a Comment