Untitled
unknown
plain_text
9 months ago
11 kB
7
Indexable
import json
import logging
import os
import random
import uuid
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from copy import deepcopy
from datetime import datetime, timedelta
import boto3
import pandas as pd
from data_reservoir.common_services import (
latest_exchange_rates,
get_secret,
get_api_gateway_token,
)
from data_reservoir.master_data import get_reference_master
from src import common_utils
s3 = boto3.client("s3")
logger = logging.getLogger()
logger.setLevel(logging.INFO)
region = os.getenv("region", os.environ["AWS_DEFAULT_REGION"])
env = os.environ["env"]
atdd_scope = os.environ["atdd_scope"]
config_path = os.path.join(os.path.dirname(__file__), 'transaction_type_config.json')
with open(
config_path, "r", encoding="utf-8"
) as transaction_type_config:
transaction_type_config = json.load(transaction_type_config)
def handle_special_transaction_type(
net_transaction_amt_local_ccy, sec_intl_cd, transaction_type, tnxn_copy, ref_conf
):
if transaction_type in ["Multiple D/C - Foreign ccy", "Multiple D/C - value date"]:
if sec_intl_cd[3] in ["1", "3"]:
tnxn_copy["accountingTransactionTypeId"] = int(
ref_conf["accounting_transaction_type"]["OTHER INCOME"]
)
if net_transaction_amt_local_ccy < 0:
tnxn_copy["directionalImpactId"] = int(
ref_conf["directional_impact"]["Cash Disbursement"]
)
elif net_transaction_amt_local_ccy > 0:
tnxn_copy["directionalImpactId"] = int(
ref_conf["directional_impact"]["Cash Receipt"]
)
elif sec_intl_cd[3] in ["2", "4"]:
tnxn_copy["accountingTransactionTypeId"] = int(
ref_conf["accounting_transaction_type"]["OTHER EXPENSE"]
)
if net_transaction_amt_local_ccy < 0:
tnxn_copy["directionalImpactId"] = int(
ref_conf["directional_impact"]["Deliver Free Cost Adjustment"]
)
elif net_transaction_amt_local_ccy > 0:
tnxn_copy["directionalImpactId"] = int(
ref_conf["directional_impact"]["Receive Free Cost Adjustment"]
)
elif sec_intl_cd[3] == "7" and transaction_type == "Multiple D/C - Foreign ccy":
tnxn_copy["accountingTransactionTypeId"] = int(
ref_conf["accounting_transaction_type"]["UNDEFINED"]
)
if net_transaction_amt_local_ccy < 0:
tnxn_copy["directionalImpactId"] = int(
ref_conf["directional_impact"]["Cash Disbursement"]
)
elif net_transaction_amt_local_ccy > 0:
tnxn_copy["directionalImpactId"] = int(
ref_conf["directional_impact"]["Cash Receipt"]
)
return tnxn_copy
def invmnt_trxn(
df: pd.DataFrame,
system_id: int,
ref_conf: dict,
exchange_rates: dict,
) -> list:
"""Create investment_transaction from input"""
logger.info("Investment Transaction field mapping started......")
data_skeleton = {
"header": {
"destinationAddress": "PGI.ESDL.ACTG.TRANSACTION.IN",
"messageSubject": "ATDD",
"payloadType": "JSON",
"batch": {"id": None, "rowCount": 1, "recordSequenceNumber": 1},
},
"body": {"accountingTransactions": None},
}
cur_utc_time = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3]
tnxn_skeleton = {
"accountingBasisTypeId": int(ref_conf["accounting_basis_types"]["GAAP"]),
"accountingTransactionStatusId": int(ref_conf["transaction_status"]["NEW"]),
"contractVersion": 8.6,
"investmentTypeId": int(ref_conf["investment_types"]["Security"]),
"systemId": system_id,
"sourceProcessedDateTime": cur_utc_time
+ "Z",
}
invmnt_trxn_list = []
lot_numer = 0
for rows in df.itertuples(index=False):
data_copy = data_skeleton.copy()
data_copy["header"]["batch"]["id"] = str(uuid.uuid4())
tnxn_copy = tnxn_skeleton.copy()
lot_numer = lot_numer + 1
tnxn_skeleton["lots"] = [
{
"lotId": str(tnxn_copy["systemId"])
+ "_"
+ cur_utc_time
+ "_"
+ str(lot_numer)
}
]
lots_copy = tnxn_skeleton["lots"][0].copy()
exchng_rate = exchange_rates[
f"{getattr(rows, 'local_currency')}_{getattr(rows, 'trade_date')}"
]
tnxn_copy["accountingTransactionId"] = str(
getattr(rows, "accounting_transaction_id")
)
tnxn_copy["accountingExchangeRate"] = exchng_rate
# populate investmentId with CADIS_ID once it's available
tnxn_copy["investmentId"] = str(random.randint(0, 1000000))
tnxn_copy["messageId"] = str(uuid.uuid4())
tnxn_copy["portfolioMasterId"] = getattr(rows, "PortfolioMasterId")
tnxn_copy["postDate"] = getattr(rows, "entry_date")
tnxn_copy["settleDate"] = getattr(rows, "settlement_date")
tnxn_copy["tradeDate"] = getattr(rows, "trade_date")
tnxn_copy["price"] = float(getattr(rows, "market_price_local_ccy"))
sec_intl_cd = getattr(rows, "security_internal_code")
net_transaction_amt_local_ccy = getattr(rows, "net_transaction_amt_local_ccy")
transaction_type = getattr(rows, "transaction_type")
if transaction_type in [
"Multiple D/C - Foreign ccy",
"Multiple D/C - value date",
]:
tnxn_copy = handle_special_transaction_type(
net_transaction_amt_local_ccy,
sec_intl_cd,
transaction_type,
tnxn_copy,
ref_conf,
)
else:
config = transaction_type_config.get(transaction_type)
if config:
accounting_transaction_type = config["accountingTransactionType"]
directional_impact = config["directionalImpact"]
tnxn_copy["accountingTransactionTypeId"] = int(
ref_conf["accounting_transaction_type"][accounting_transaction_type]
)
tnxn_copy["directionalImpactId"] = int(
ref_conf["directional_impact"][directional_impact]
)
lots_copy["quantity"] = float(getattr(rows, "units__nominal_value"))
lots_copy["bookValueLocal"] = float(getattr(rows, "book_cost_local_ccy"))
lots_copy["totalLocal"] = float(getattr(rows, "gross_cost_local_ccy"))
lots_copy["commissionLocal"] = float(
getattr(rows, "broker_commission_local_ccy")
)
lots_copy["withholdingTaxLocal"] = float(getattr(rows, "tax_amount_local_ccy"))
lots_copy["tradeFeeLocal"] = float(getattr(rows, "total_fee_local_ccy"))
lots_copy["accruedInterestLocal"] = float(
getattr(rows, "transaction_accrued_interest_local_ccy")
)
lots_copy["accruedInterestUSD"] = float(
getattr(rows, "transaction_accrued_interest_fund_ccy")
)
lots_copy["costLocal"] = float(getattr(rows, "net_transaction_amt_local_ccy"))
lots_copy["costBase"] = float(getattr(rows, "net_transaction_amt_fund_ccy"))
if getattr(rows, "realized_total_gl_local_ccy") < 0:
lots_copy["longTermRealizedLossValueLocal"] = float(
getattr(rows, "realized_total_gl_local_ccy")
)
else:
lots_copy["longTermRealizedGainValueLocal"] = float(
getattr(rows, "realized_total_gl_local_ccy")
)
if getattr(rows, "realized_total_gl_fund_ccy") < 0:
lots_copy["longTermRealizedLossValueBase"] = float(
getattr(rows, "realized_total_gl_fund_ccy")
)
else:
lots_copy["longTermRealizedGainValueBase"] = float(
getattr(rows, "realized_total_gl_fund_ccy")
)
if getattr(rows, "realized_market_gl_fund_ccy") < 0:
lots_copy["foreignCurrencyRealizedLossValueBase"] = float(
getattr(rows, "realized_market_gl_fund_ccy")
)
else:
lots_copy["foreignCurrencyRealizedGainValueBase"] = float(
getattr(rows, "realized_market_gl_fund_ccy")
)
tnxn_copy["lots"] = [lots_copy]
data_copy["body"]["accountingTransactions"] = [tnxn_copy]
invmnt_trxn_list.append(deepcopy(data_copy))
logger.info("Investment Transaction field mapping completes.")
return invmnt_trxn_list
def apply_src_rules(input_df: pd.DataFrame, event_json: dict) -> dict:
"""Source rule application"""
ref_esdl_data = get_reference_master(env, region)
ref_sys_id = ref_esdl_data["system_ids"]["CITI HK"]
input_df[["entry_date", "trade_date", "settlement_date"]] = (
input_df[["entry_date", "trade_date", "settlement_date"]]
.apply(pd.to_datetime, format='mixed')
.astype(str)
)
input_df = common_utils.get_src_with_master_data(input_df, event_json)
exchange_rates = latest_exchange_rates(
input_df.filter(["local_currency", "trade_date"], axis=1)
.drop_duplicates()
.values.tolist(),
ref_esdl_data,
)
investment_data = invmnt_trxn(input_df, ref_sys_id, ref_esdl_data, exchange_rates)
# Perform schema validation here
# schema_validation(investment_data, "investment_transaction")
secrets = json.loads(get_secret())
token_response = get_api_gateway_token(atdd_scope, secrets)
with ThreadPoolExecutor(100) as executor:
futures = []
for data in investment_data:
if datetime.now() - token_response["time"] >= timedelta(minutes=5):
token_response = get_api_gateway_token(atdd_scope, secrets)
futures.append(
executor.submit(
common_utils.write_invmnt_trxn_data,
data,
secrets["api_gateway_base_url"],
token_response["token"],
)
)
for future in as_completed(futures):
if future.result().status_code != 200:
logger.error(future.result().text)
raise ConnectionError("Api exception occured")
return {"status_code": 200}
Editor is loading...
Leave a Comment