Untitled
unknown
plain_text
6 months ago
6.9 kB
5
Indexable
import json import logging import os import uuid from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from copy import deepcopy from datetime import datetime, timedelta import pandas as pd from data_reservoir.common_services import ( latest_exchange_rates, schema_validation, get_secret, get_api_gateway_token, update_file_tracker, ) from data_reservoir.master_data import get_reference_master from src import common_utils logger = logging.getLogger() logger.setLevel(logging.INFO) region = os.getenv("region", os.environ["AWS_DEFAULT_REGION"]) env = os.environ["env"] atdd_scope = os.environ["atdd_scope"] def invmnt_trxn( df: pd.DataFrame, system_id: int, ref_conf: dict, exchange_rates: dict, as_of_date: str, ) -> list: """Create positions investment_transaction from input""" logger.info("Investment Transaction field mapping started......") data_skeleton = { "header": { "destinationAddress": "PGI.ESDL.ACTG.TRANSACTION.IN", "messageSubject": "ATDD", "payloadType": "JSON", "batch": {"id": None, "rowCount": 1, "recordSequenceNumber": 1}, }, "body": {"accountingTransactions": None}, } tnxn_skeleton = { "accountingBasisTypeId": int(ref_conf["accounting_basis_types"]["GAAP"]), "accountingTransactionStatusId": int(ref_conf["transaction_status"]["NEW"]), "contractVersion": 8.6, "investmentTypeId": int(ref_conf["investment_types"]["Security"]), "postDate": as_of_date, "systemId": system_id, "sourceProcessedDateTime": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z", "lots": [{"lotId": "1"}], } invmnt_trxn_list = [] for rows in df.itertuples(index=False): data_copy = data_skeleton.copy() data_copy["header"]["batch"]["id"] = str(uuid.uuid4()) tnxn_copy = tnxn_skeleton.copy() lots_copy = tnxn_skeleton["lots"][0].copy() exchng_rate = exchange_rates[ f"{getattr(rows, 'isocurrency')}_{getattr(rows, 'tradedate')}" ] tnxn_copy["accountingTransactionId"] = str(uuid.uuid4()) tnxn_copy["accountingExchangeRate"] = exchng_rate tnxn_copy["investmentId"] = getattr(rows, "CADIS_ID") tnxn_copy["messageId"] = str(uuid.uuid4()) tnxn_copy["linkingId"] = str(getattr(rows, "tanumber")) tnxn_copy["portfolioMasterId"] = getattr(rows, "PortfolioMasterId") tnxn_copy["settleDate"] = getattr(rows, "acqsettlementdate") tnxn_copy["tradeDate"] = getattr(rows, "tradedate") tnxn_copy["executingBrokerId"] = int(getattr(rows, "BrokerEsdlId")) if getattr(rows, "transactiondescript").strip().lower() == "sale": tnxn_copy["accountingTransactionTypeId"] = int( ref_conf["accounting_transaction_type"]["SELL"] ) tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"]["Cash Receipt"] ) else: tnxn_copy["accountingTransactionTypeId"] = int( ref_conf["accounting_transaction_type"]["BUY"] ) tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"]["Cash Disbursement"] ) lots_copy["quantity"] = float(getattr(rows, "units")) lots_copy["totalBase"] = float(getattr(rows, "principal")) lots_copy["totalLocal"] = float(getattr(rows, "principal")) lots_copy["totalUSD"] = float(getattr(rows, "principal")) / exchng_rate if getattr(rows, "gainloss") < 0: lots_copy["longTermRealizedLossValueLocal"] = float( getattr(rows, "gainloss") ) lots_copy["longTermRealizedLossValueBase"] = float( getattr(rows, "gainloss") ) lots_copy["longTermRealizedLossValueUSD"] = ( float(getattr(rows, "gainloss")) / exchng_rate ) else: lots_copy["longTermRealizedGainValueLocal"] = float( getattr(rows, "gainloss") ) lots_copy["longTermRealizedGainValueBase"] = float( getattr(rows, "gainloss") ) lots_copy["longTermRealizedGainValueUSD"] = ( float(getattr(rows, "gainloss")) / exchng_rate ) tnxn_copy["lots"] = [lots_copy] data_copy["body"]["accountingTransactions"] = [tnxn_copy] invmnt_trxn_list.append(deepcopy(data_copy)) logger.info("Investment Transaction field mapping completes.") return invmnt_trxn_list def apply_src_rules(input_df: pd.DataFrame, event_json: dict, as_of_date: str,) -> dict: """Source rule application""" ref_esdl_data = get_reference_master(env, region) ref_sys_id = ref_esdl_data["system_ids"]["ePAM SE ASIA"] input_df[["acqsettlementdate", "tradedate"]] = ( input_df[["acqsettlementdate", "tradedate"]].apply(pd.to_datetime).astype(str) ) input_df = common_utils.get_src_with_master_data(input_df, event_json) exchange_rates = latest_exchange_rates( input_df.filter(["isocurrency", "tradedate"], axis=1) .drop_duplicates() .values.tolist(), ref_esdl_data, ) investment_data = invmnt_trxn( input_df, ref_sys_id, ref_esdl_data, exchange_rates, as_of_date, ) schema_validation(investment_data, "investment_transaction") secrets = json.loads(get_secret()) token_response = get_api_gateway_token(atdd_scope, secrets) with ThreadPoolExecutor(100) as executor: futures = [] for data in investment_data: if datetime.now() - token_response["time"] >= timedelta(minutes=5): token_response = get_api_gateway_token(atdd_scope, secrets) futures.append( executor.submit( common_utils.write_invmnt_trxn_data, data, secrets["api_gateway_base_url"], token_response["token"], ) ) for future in as_completed(futures): if future.result().status_code != 200: logger.error(future.result().text) raise ConnectionError("Api exception occured") update_file_tracker( event_json, env, ref_sys_id, "ePAM SE ASIA", investment_data[0]["body"]["accountingTransactions"][0][ "sourceProcessedDateTime" ], "Investment Transaction", ) return {"status_code": 200}
Editor is loading...
Leave a Comment