# --- paste-site header (not part of the module) ---
# Untitled | unknown | plain_text | a year ago | 6.9 kB | 8 | Indexable
import json
import logging
import os
import uuid
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
from copy import deepcopy
from datetime import datetime, timedelta
import pandas as pd
from data_reservoir.common_services import (
latest_exchange_rates,
schema_validation,
get_secret,
get_api_gateway_token,
update_file_tracker,
)
from data_reservoir.master_data import get_reference_master
from src import common_utils
# Root logger (no name is passed), with its level forced to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Deployment settings, read from the process environment at import time.
# A missing variable raises KeyError immediately.
#
# ``region`` prefers the "region" variable and only consults
# AWS_DEFAULT_REGION when "region" is absent.  The fallback is looked up
# lazily: passing os.environ["AWS_DEFAULT_REGION"] as the default argument of
# os.getenv() would evaluate it unconditionally and raise KeyError even when
# "region" is set.
region = (
    os.environ["region"]
    if "region" in os.environ
    else os.environ["AWS_DEFAULT_REGION"]
)
env = os.environ["env"]
atdd_scope = os.environ["atdd_scope"]
def invmnt_trxn(
    df: pd.DataFrame,
    system_id: int,
    ref_conf: dict,
    exchange_rates: dict,
    as_of_date: str,
) -> list:
    """Map every row of ``df`` to one accounting-transaction message.

    Each row produces its own message: a header with a fresh batch id and a
    body holding exactly one transaction with exactly one lot.

    * Rows whose ``transactiondescript`` equals "sale" (ignoring case and
      surrounding whitespace) are mapped to SELL / "Cash Receipt"; every
      other row is mapped to BUY / "Cash Disbursement".
    * A negative ``gainloss`` fills the ``longTermRealizedLossValue*`` lot
      fields; zero or positive values fill ``longTermRealizedGainValue*``.

    Args:
        df: Rows to convert. Columns read: ``isocurrency``, ``tradedate``,
            ``CADIS_ID``, ``tanumber``, ``PortfolioMasterId``,
            ``acqsettlementdate``, ``BrokerEsdlId``, ``transactiondescript``,
            ``units``, ``principal`` and ``gainloss``.
        system_id: Stored as ``systemId`` on every transaction.
        ref_conf: Reference lookup. Sections read: ``accounting_basis_types``,
            ``transaction_status``, ``investment_types``,
            ``accounting_transaction_type`` and ``directional_impact``.
        exchange_rates: Rates keyed by ``"<isocurrency>_<tradedate>"``.
        as_of_date: Stored as ``postDate`` on every transaction.

    Returns:
        One message dict per row, in row order (empty list for an empty
        frame). The messages share no mutable state with each other.

    Raises:
        KeyError: If a row has no entry in ``exchange_rates`` (the message
            names the missing key) or a reference entry is missing from
            ``ref_conf``.
    """
    # Function-scope import: the module header only imports ``datetime`` and
    # ``timedelta`` from the datetime module.
    from datetime import timezone

    logger.info("Investment Transaction field mapping started......")

    # The trailing "Z" designates UTC, so the clock must be read in UTC
    # rather than in the host's local time zone.  Milliseconds precision.
    processed_at = (
        datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    )

    # Values that are identical for every row are resolved once, up front.
    basis_type_id = int(ref_conf["accounting_basis_types"]["GAAP"])
    status_id = int(ref_conf["transaction_status"]["NEW"])
    investment_type_id = int(ref_conf["investment_types"]["Security"])

    messages = []
    for row in df.itertuples(index=False):
        rate_key = f"{row.isocurrency}_{row.tradedate}"
        try:
            rate = exchange_rates[rate_key]
        except KeyError:
            raise KeyError(
                f"No exchange rate available for '{rate_key}'"
            ) from None

        # Anything that is not explicitly a sale is treated as a purchase.
        if row.transactiondescript.strip().lower() == "sale":
            type_key, impact_key = "SELL", "Cash Receipt"
        else:
            type_key, impact_key = "BUY", "Cash Disbursement"

        principal = float(row.principal)
        gain_loss = float(row.gainloss)
        realized_prefix = (
            "longTermRealizedLossValue"
            if gain_loss < 0
            else "longTermRealizedGainValue"
        )

        # Base and local amounts both carry the unconverted figure; the USD
        # amount is that figure divided by the row's exchange rate.
        lot = {
            "lotId": "1",
            "quantity": float(row.units),
            "totalBase": principal,
            "totalLocal": principal,
            "totalUSD": principal / rate,
            realized_prefix + "Local": gain_loss,
            realized_prefix + "Base": gain_loss,
            realized_prefix + "USD": gain_loss / rate,
        }

        transaction = {
            "accountingBasisTypeId": basis_type_id,
            "accountingTransactionStatusId": status_id,
            "contractVersion": 8.6,
            "investmentTypeId": investment_type_id,
            "postDate": as_of_date,
            "systemId": system_id,
            "sourceProcessedDateTime": processed_at,
            "lots": [lot],
            "accountingTransactionId": str(uuid.uuid4()),
            "accountingExchangeRate": rate,
            "investmentId": row.CADIS_ID,
            "messageId": str(uuid.uuid4()),
            "linkingId": str(row.tanumber),
            "portfolioMasterId": row.PortfolioMasterId,
            "settleDate": row.acqsettlementdate,
            "tradeDate": row.tradedate,
            "executingBrokerId": int(row.BrokerEsdlId),
            "accountingTransactionTypeId": int(
                ref_conf["accounting_transaction_type"][type_key]
            ),
            "directionalImpactId": int(
                ref_conf["directional_impact"][impact_key]
            ),
        }

        # Every container is created fresh per row, so no template dict is
        # mutated and no defensive deep copy is needed.
        messages.append(
            {
                "header": {
                    "destinationAddress": "PGI.ESDL.ACTG.TRANSACTION.IN",
                    "messageSubject": "ATDD",
                    "payloadType": "JSON",
                    "batch": {
                        "id": str(uuid.uuid4()),
                        "rowCount": 1,
                        "recordSequenceNumber": 1,
                    },
                },
                "body": {"accountingTransactions": [transaction]},
            }
        )

    logger.info("Investment Transaction field mapping completes.")
    return messages
def apply_src_rules(
    input_df: pd.DataFrame,
    event_json: dict,
    as_of_date: str,
) -> dict:
    """Convert the source rows to investment transactions and publish them.

    Steps, in order:

    1. Load the reference master and resolve the "ePAM SE ASIA" system id.
    2. Rewrite ``acqsettlementdate`` and ``tradedate`` on ``input_df`` as
       strings (parsed with ``pd.to_datetime`` first), then enrich the frame
       through ``common_utils.get_src_with_master_data``.
    3. Fetch exchange rates for the distinct (isocurrency, tradedate) pairs.
    4. Build the messages with ``invmnt_trxn`` and validate them against the
       "investment_transaction" schema.
    5. Submit every message to ``common_utils.write_invmnt_trxn_data`` on a
       pool of 100 threads, requesting a new gateway token whenever the
       current one is at least five minutes old at submission time.
    6. Record the run through ``update_file_tracker``.

    Args:
        input_df: Source rows; its two date columns are overwritten in place.
        event_json: Passed through to the master-data enrichment and to the
            file tracker.
        as_of_date: Forwarded to ``invmnt_trxn``.

    Returns:
        ``{"status_code": 200}`` once every write succeeded.

    Raises:
        ConnectionError: If any write responds with a status other than 200.
        IndexError: If no messages were produced, because the tracker update
            reads the first message's timestamp.
    """
    source_system = "ePAM SE ASIA"
    date_columns = ["acqsettlementdate", "tradedate"]

    reference_master = get_reference_master(env, region)
    source_system_id = reference_master["system_ids"][source_system]

    # Normalise both date columns to their string representation.
    parsed_dates = input_df[date_columns].apply(pd.to_datetime)
    input_df[date_columns] = parsed_dates.astype(str)
    input_df = common_utils.get_src_with_master_data(input_df, event_json)

    # One rate lookup per distinct currency/trade-date combination.
    currency_date_pairs = (
        input_df.filter(["isocurrency", "tradedate"], axis=1)
        .drop_duplicates()
        .values.tolist()
    )
    exchange_rates = latest_exchange_rates(currency_date_pairs, reference_master)

    messages = invmnt_trxn(
        input_df,
        source_system_id,
        reference_master,
        exchange_rates,
        as_of_date,
    )
    schema_validation(messages, "investment_transaction")

    gateway_secrets = json.loads(get_secret())
    token_info = get_api_gateway_token(atdd_scope, gateway_secrets)
    max_token_age = timedelta(minutes=5)

    with ThreadPoolExecutor(100) as pool:
        pending = []
        for message in messages:
            # presumably token_info["time"] is when the token was issued;
            # verify against get_api_gateway_token.
            token_age = datetime.now() - token_info["time"]
            if token_age >= max_token_age:
                token_info = get_api_gateway_token(atdd_scope, gateway_secrets)
            submitted = pool.submit(
                common_utils.write_invmnt_trxn_data,
                message,
                gateway_secrets["api_gateway_base_url"],
                token_info["token"],
            )
            pending.append(submitted)

        # Stop at the first unsuccessful response, in completion order.
        for finished in as_completed(pending):
            response = finished.result()
            if response.status_code == 200:
                continue
            logger.error(response.text)
            raise ConnectionError("Api exception occured")

    first_transaction = messages[0]["body"]["accountingTransactions"][0]
    update_file_tracker(
        event_json,
        env,
        source_system_id,
        source_system,
        first_transaction["sourceProcessedDateTime"],
        "Investment Transaction",
    )
    return {"status_code": 200}
# --- paste-site footer (not part of the module) ---
# Editor is loading... | Leave a Comment