Untitled

 avatar
unknown
plain_text
3 days ago
11 kB
4
Indexable
import json
import logging
import os
import random
import uuid
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from copy import deepcopy
from datetime import datetime, timedelta

import boto3
import pandas as pd
from data_reservoir.common_services import (
    latest_exchange_rates,
    get_secret,
    get_api_gateway_token,
)
from data_reservoir.master_data import get_reference_master

from src import common_utils

s3 = boto3.client("s3")

logger = logging.getLogger()
logger.setLevel(logging.INFO)

region = os.getenv("region", os.environ["AWS_DEFAULT_REGION"])
env = os.environ["env"]
atdd_scope = os.environ["atdd_scope"]

config_path = os.path.join(os.path.dirname(__file__), 'transaction_type_config.json')
with open(
    config_path, "r", encoding="utf-8"
) as transaction_type_config:
    transaction_type_config = json.load(transaction_type_config)


def handle_special_transaction_type(
    net_transaction_amt_local_ccy, sec_intl_cd, transaction_type, tnxn_copy, ref_conf
):
    if transaction_type in ["Multiple D/C - Foreign ccy", "Multiple D/C - value date"]:
        if sec_intl_cd[3] in ["1", "3"]:
            tnxn_copy["accountingTransactionTypeId"] = int(
                ref_conf["accounting_transaction_type"]["OTHER INCOME"]
            )
            if net_transaction_amt_local_ccy < 0:
                tnxn_copy["directionalImpactId"] = int(
                    ref_conf["directional_impact"]["Cash Disbursement"]
                )
            elif net_transaction_amt_local_ccy > 0:
                tnxn_copy["directionalImpactId"] = int(
                    ref_conf["directional_impact"]["Cash Receipt"]
                )
        elif sec_intl_cd[3] in ["2", "4"]:
            tnxn_copy["accountingTransactionTypeId"] = int(
                ref_conf["accounting_transaction_type"]["OTHER EXPENSE"]
            )
            if net_transaction_amt_local_ccy < 0:
                tnxn_copy["directionalImpactId"] = int(
                    ref_conf["directional_impact"]["Deliver Free Cost Adjustment"]
                )
            elif net_transaction_amt_local_ccy > 0:
                tnxn_copy["directionalImpactId"] = int(
                    ref_conf["directional_impact"]["Receive Free Cost Adjustment"]
                )
        elif sec_intl_cd[3] == "7" and transaction_type == "Multiple D/C - Foreign ccy":
            tnxn_copy["accountingTransactionTypeId"] = int(
                ref_conf["accounting_transaction_type"]["UNDEFINED"]
            )
            if net_transaction_amt_local_ccy < 0:
                tnxn_copy["directionalImpactId"] = int(
                    ref_conf["directional_impact"]["Cash Disbursement"]
                )
            elif net_transaction_amt_local_ccy > 0:
                tnxn_copy["directionalImpactId"] = int(
                    ref_conf["directional_impact"]["Cash Receipt"]
                )
    return tnxn_copy


def invmnt_trxn(
    df: pd.DataFrame,
    system_id: int,
    ref_conf: dict,
    exchange_rates: dict,
) -> list:
    """Create investment_transaction from input"""
    logger.info("Investment Transaction field mapping started......")
    data_skeleton = {
        "header": {
            "destinationAddress": "PGI.ESDL.ACTG.TRANSACTION.IN",
            "messageSubject": "ATDD",
            "payloadType": "JSON",
            "batch": {"id": None, "rowCount": 1, "recordSequenceNumber": 1},
        },
        "body": {"accountingTransactions": None},
    }
    cur_utc_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
    tnxn_skeleton = {
        "accountingBasisTypeId": int(ref_conf["accounting_basis_types"]["GAAP"]),
        "accountingTransactionStatusId": int(ref_conf["transaction_status"]["NEW"]),
        "contractVersion": 8.6,
        "investmentTypeId": int(ref_conf["investment_types"]["Security"]),
        "systemId": system_id,
        "sourceProcessedDateTime": cur_utc_time
        + "Z",
    }

    invmnt_trxn_list = []
    lot_numer = 0
    for rows in df.itertuples(index=False):
        data_copy = data_skeleton.copy()
        data_copy["header"]["batch"]["id"] = str(uuid.uuid4())
        tnxn_copy = tnxn_skeleton.copy()
        lot_numer = lot_numer + 1
        tnxn_skeleton["lots"] = [
            {
                "lotId": str(tnxn_copy["systemId"])
                + "_"
                + cur_utc_time
                + "_"
                + str(lot_numer)
            }
        ]
        lots_copy = tnxn_skeleton["lots"][0].copy()
        exchng_rate = exchange_rates[
            f"{getattr(rows, 'local_currency')}_{getattr(rows, 'trade_date')}"
        ]
        tnxn_copy["accountingTransactionId"] = str(
            getattr(rows, "accounting_transaction_id")
        )
        tnxn_copy["accountingExchangeRate"] = exchng_rate
        # populate investmentId with CADIS_ID once it's available
        tnxn_copy["investmentId"] = str(random.randint(0, 1000000))
        tnxn_copy["messageId"] = str(uuid.uuid4())
        tnxn_copy["portfolioMasterId"] = getattr(rows, "PortfolioMasterId")
        tnxn_copy["postDate"] = getattr(rows, "entry_date")
        tnxn_copy["settleDate"] = getattr(rows, "settlement_date")
        tnxn_copy["tradeDate"] = getattr(rows, "trade_date")
        tnxn_copy["price"] = float(getattr(rows, "market_price_local_ccy"))

        sec_intl_cd = getattr(rows, "security_internal_code")
        net_transaction_amt_local_ccy = getattr(rows, "net_transaction_amt_local_ccy")
        transaction_type = getattr(rows, "transaction_type")
        if transaction_type in [
            "Multiple D/C - Foreign ccy",
            "Multiple D/C - value date",
        ]:
            tnxn_copy = handle_special_transaction_type(
                net_transaction_amt_local_ccy,
                sec_intl_cd,
                transaction_type,
                tnxn_copy,
                ref_conf,
            )
        else:
            config = transaction_type_config.get(transaction_type)
            if config:
                accounting_transaction_type = config["accountingTransactionType"]
                directional_impact = config["directionalImpact"]
                tnxn_copy["accountingTransactionTypeId"] = int(
                    ref_conf["accounting_transaction_type"][accounting_transaction_type]
                )
                tnxn_copy["directionalImpactId"] = int(
                    ref_conf["directional_impact"][directional_impact]
                )

        lots_copy["quantity"] = float(getattr(rows, "units__nominal_value"))
        lots_copy["bookValueLocal"] = float(getattr(rows, "book_cost_local_ccy"))
        lots_copy["totalLocal"] = float(getattr(rows, "gross_cost_local_ccy"))
        lots_copy["commissionLocal"] = float(
            getattr(rows, "broker_commission_local_ccy")
        )
        lots_copy["withholdingTaxLocal"] = float(getattr(rows, "tax_amount_local_ccy"))
        lots_copy["tradeFeeLocal"] = float(getattr(rows, "total_fee_local_ccy"))
        lots_copy["accruedInterestLocal"] = float(
            getattr(rows, "transaction_accrued_interest_local_ccy")
        )
        lots_copy["accruedInterestUSD"] = float(
            getattr(rows, "transaction_accrued_interest_fund_ccy")
        )
        lots_copy["costLocal"] = float(getattr(rows, "net_transaction_amt_local_ccy"))
        lots_copy["costBase"] = float(getattr(rows, "net_transaction_amt_fund_ccy"))

        if getattr(rows, "realized_total_gl_local_ccy") < 0:
            lots_copy["longTermRealizedLossValueLocal"] = float(
                getattr(rows, "realized_total_gl_local_ccy")
            )
        else:
            lots_copy["longTermRealizedGainValueLocal"] = float(
                getattr(rows, "realized_total_gl_local_ccy")
            )

        if getattr(rows, "realized_total_gl_fund_ccy") < 0:
            lots_copy["longTermRealizedLossValueBase"] = float(
                getattr(rows, "realized_total_gl_fund_ccy")
            )
        else:
            lots_copy["longTermRealizedGainValueBase"] = float(
                getattr(rows, "realized_total_gl_fund_ccy")
            )

        if getattr(rows, "realized_market_gl_fund_ccy") < 0:
            lots_copy["foreignCurrencyRealizedLossValueBase"] = float(
                getattr(rows, "realized_market_gl_fund_ccy")
            )
        else:
            lots_copy["foreignCurrencyRealizedGainValueBase"] = float(
                getattr(rows, "realized_market_gl_fund_ccy")
            )

        tnxn_copy["lots"] = [lots_copy]

        data_copy["body"]["accountingTransactions"] = [tnxn_copy]
        invmnt_trxn_list.append(deepcopy(data_copy))

    logger.info("Investment Transaction field mapping completes.")
    return invmnt_trxn_list


def apply_src_rules(input_df: pd.DataFrame, event_json: dict) -> dict:
    """Source rule application"""
    ref_esdl_data = get_reference_master(env, region)
    ref_sys_id = ref_esdl_data["system_ids"]["CITI HK"]
    input_df[["entry_date", "trade_date", "settlement_date"]] = (
        input_df[["entry_date", "trade_date", "settlement_date"]]
        .apply(pd.to_datetime, format='mixed')
        .astype(str)
    )
    input_df = common_utils.get_src_with_master_data(input_df, event_json)
    exchange_rates = latest_exchange_rates(
        input_df.filter(["local_currency", "trade_date"], axis=1)
        .drop_duplicates()
        .values.tolist(),
        ref_esdl_data,
    )

    investment_data = invmnt_trxn(input_df, ref_sys_id, ref_esdl_data, exchange_rates)

    # Perform schema validation here
    # schema_validation(investment_data, "investment_transaction")
    secrets = json.loads(get_secret())
    token_response = get_api_gateway_token(atdd_scope, secrets)

    with ThreadPoolExecutor(100) as executor:
        futures = []
        for data in investment_data:
            if datetime.now() - token_response["time"] >= timedelta(minutes=5):
                token_response = get_api_gateway_token(atdd_scope, secrets)
            futures.append(
                executor.submit(
                    common_utils.write_invmnt_trxn_data,
                    data,
                    secrets["api_gateway_base_url"],
                    token_response["token"],
                )
            )
        for future in as_completed(futures):
            if future.result().status_code != 200:
                logger.error(future.result().text)
                raise ConnectionError("Api exception occured")

    return {"status_code": 200}
Editor is loading...
Leave a Comment