Untitled
unknown
plain_text
3 days ago
11 kB
4
Indexable
import json import logging import os import random import uuid from concurrent.futures import ThreadPoolExecutor from concurrent.futures import as_completed from copy import deepcopy from datetime import datetime, timedelta import boto3 import pandas as pd from data_reservoir.common_services import ( latest_exchange_rates, get_secret, get_api_gateway_token, ) from data_reservoir.master_data import get_reference_master from src import common_utils s3 = boto3.client("s3") logger = logging.getLogger() logger.setLevel(logging.INFO) region = os.getenv("region", os.environ["AWS_DEFAULT_REGION"]) env = os.environ["env"] atdd_scope = os.environ["atdd_scope"] config_path = os.path.join(os.path.dirname(__file__), 'transaction_type_config.json') with open( config_path, "r", encoding="utf-8" ) as transaction_type_config: transaction_type_config = json.load(transaction_type_config) def handle_special_transaction_type( net_transaction_amt_local_ccy, sec_intl_cd, transaction_type, tnxn_copy, ref_conf ): if transaction_type in ["Multiple D/C - Foreign ccy", "Multiple D/C - value date"]: if sec_intl_cd[3] in ["1", "3"]: tnxn_copy["accountingTransactionTypeId"] = int( ref_conf["accounting_transaction_type"]["OTHER INCOME"] ) if net_transaction_amt_local_ccy < 0: tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"]["Cash Disbursement"] ) elif net_transaction_amt_local_ccy > 0: tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"]["Cash Receipt"] ) elif sec_intl_cd[3] in ["2", "4"]: tnxn_copy["accountingTransactionTypeId"] = int( ref_conf["accounting_transaction_type"]["OTHER EXPENSE"] ) if net_transaction_amt_local_ccy < 0: tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"]["Deliver Free Cost Adjustment"] ) elif net_transaction_amt_local_ccy > 0: tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"]["Receive Free Cost Adjustment"] ) elif sec_intl_cd[3] == "7" and transaction_type == "Multiple D/C - Foreign ccy": tnxn_copy["accountingTransactionTypeId"] = int( ref_conf["accounting_transaction_type"]["UNDEFINED"] ) if net_transaction_amt_local_ccy < 0: tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"]["Cash Disbursement"] ) elif net_transaction_amt_local_ccy > 0: tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"]["Cash Receipt"] ) return tnxn_copy def invmnt_trxn( df: pd.DataFrame, system_id: int, ref_conf: dict, exchange_rates: dict, ) -> list: """Create investment_transaction from input""" logger.info("Investment Transaction field mapping started......") data_skeleton = { "header": { "destinationAddress": "PGI.ESDL.ACTG.TRANSACTION.IN", "messageSubject": "ATDD", "payloadType": "JSON", "batch": {"id": None, "rowCount": 1, "recordSequenceNumber": 1}, }, "body": {"accountingTransactions": None}, } cur_utc_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] tnxn_skeleton = { "accountingBasisTypeId": int(ref_conf["accounting_basis_types"]["GAAP"]), "accountingTransactionStatusId": int(ref_conf["transaction_status"]["NEW"]), "contractVersion": 8.6, "investmentTypeId": int(ref_conf["investment_types"]["Security"]), "systemId": system_id, "sourceProcessedDateTime": cur_utc_time + "Z", } invmnt_trxn_list = [] lot_numer = 0 for rows in df.itertuples(index=False): data_copy = data_skeleton.copy() data_copy["header"]["batch"]["id"] = str(uuid.uuid4()) tnxn_copy = tnxn_skeleton.copy() lot_numer = lot_numer + 1 tnxn_skeleton["lots"] = [ { "lotId": str(tnxn_copy["systemId"]) + "_" + cur_utc_time + "_" + str(lot_numer) } ] lots_copy = tnxn_skeleton["lots"][0].copy() exchng_rate = exchange_rates[ f"{getattr(rows, 'local_currency')}_{getattr(rows, 'trade_date')}" ] tnxn_copy["accountingTransactionId"] = str( getattr(rows, "accounting_transaction_id") ) tnxn_copy["accountingExchangeRate"] = exchng_rate # populate investmentId with CADIS_ID once it's available tnxn_copy["investmentId"] = str(random.randint(0, 1000000)) tnxn_copy["messageId"] = str(uuid.uuid4()) tnxn_copy["portfolioMasterId"] = getattr(rows, "PortfolioMasterId") tnxn_copy["postDate"] = getattr(rows, "entry_date") tnxn_copy["settleDate"] = getattr(rows, "settlement_date") tnxn_copy["tradeDate"] = getattr(rows, "trade_date") tnxn_copy["price"] = float(getattr(rows, "market_price_local_ccy")) sec_intl_cd = getattr(rows, "security_internal_code") net_transaction_amt_local_ccy = getattr(rows, "net_transaction_amt_local_ccy") transaction_type = getattr(rows, "transaction_type") if transaction_type in [ "Multiple D/C - Foreign ccy", "Multiple D/C - value date", ]: tnxn_copy = handle_special_transaction_type( net_transaction_amt_local_ccy, sec_intl_cd, transaction_type, tnxn_copy, ref_conf, ) else: config = transaction_type_config.get(transaction_type) if config: accounting_transaction_type = config["accountingTransactionType"] directional_impact = config["directionalImpact"] tnxn_copy["accountingTransactionTypeId"] = int( ref_conf["accounting_transaction_type"][accounting_transaction_type] ) tnxn_copy["directionalImpactId"] = int( ref_conf["directional_impact"][directional_impact] ) lots_copy["quantity"] = float(getattr(rows, "units__nominal_value")) lots_copy["bookValueLocal"] = float(getattr(rows, "book_cost_local_ccy")) lots_copy["totalLocal"] = float(getattr(rows, "gross_cost_local_ccy")) lots_copy["commissionLocal"] = float( getattr(rows, "broker_commission_local_ccy") ) lots_copy["withholdingTaxLocal"] = float(getattr(rows, "tax_amount_local_ccy")) lots_copy["tradeFeeLocal"] = float(getattr(rows, "total_fee_local_ccy")) lots_copy["accruedInterestLocal"] = float( getattr(rows, "transaction_accrued_interest_local_ccy") ) lots_copy["accruedInterestUSD"] = float( getattr(rows, "transaction_accrued_interest_fund_ccy") ) lots_copy["costLocal"] = float(getattr(rows, "net_transaction_amt_local_ccy")) lots_copy["costBase"] = float(getattr(rows, "net_transaction_amt_fund_ccy")) if getattr(rows, "realized_total_gl_local_ccy") < 0: lots_copy["longTermRealizedLossValueLocal"] = float( getattr(rows, "realized_total_gl_local_ccy") ) else: lots_copy["longTermRealizedGainValueLocal"] = float( getattr(rows, "realized_total_gl_local_ccy") ) if getattr(rows, "realized_total_gl_fund_ccy") < 0: lots_copy["longTermRealizedLossValueBase"] = float( getattr(rows, "realized_total_gl_fund_ccy") ) else: lots_copy["longTermRealizedGainValueBase"] = float( getattr(rows, "realized_total_gl_fund_ccy") ) if getattr(rows, "realized_market_gl_fund_ccy") < 0: lots_copy["foreignCurrencyRealizedLossValueBase"] = float( getattr(rows, "realized_market_gl_fund_ccy") ) else: lots_copy["foreignCurrencyRealizedGainValueBase"] = float( getattr(rows, "realized_market_gl_fund_ccy") ) tnxn_copy["lots"] = [lots_copy] data_copy["body"]["accountingTransactions"] = [tnxn_copy] invmnt_trxn_list.append(deepcopy(data_copy)) logger.info("Investment Transaction field mapping completes.") return invmnt_trxn_list def apply_src_rules(input_df: pd.DataFrame, event_json: dict) -> dict: """Source rule application""" ref_esdl_data = get_reference_master(env, region) ref_sys_id = ref_esdl_data["system_ids"]["CITI HK"] input_df[["entry_date", "trade_date", "settlement_date"]] = ( input_df[["entry_date", "trade_date", "settlement_date"]] .apply(pd.to_datetime, format='mixed') .astype(str) ) input_df = common_utils.get_src_with_master_data(input_df, event_json) exchange_rates = latest_exchange_rates( input_df.filter(["local_currency", "trade_date"], axis=1) .drop_duplicates() .values.tolist(), ref_esdl_data, ) investment_data = invmnt_trxn(input_df, ref_sys_id, ref_esdl_data, exchange_rates) # Perform schema validation here # schema_validation(investment_data, "investment_transaction") secrets = json.loads(get_secret()) token_response = get_api_gateway_token(atdd_scope, secrets) with ThreadPoolExecutor(100) as executor: futures = [] for data in investment_data: if datetime.now() - token_response["time"] >= timedelta(minutes=5): token_response = get_api_gateway_token(atdd_scope, secrets) futures.append( executor.submit( common_utils.write_invmnt_trxn_data, data, secrets["api_gateway_base_url"], token_response["token"], ) ) for future in as_completed(futures): if future.result().status_code != 200: logger.error(future.result().text) raise ConnectionError("Api exception occured") return {"status_code": 200}
Editor is loading...
Leave a Comment